'''
'' PyIRCIoT-router (PyIRCIoT_router class)
''
'' Copyright (c) 2019-2021 Alexey Y. Woronov
''
'' By using this file, you agree to the terms and conditions set
'' forth in the LICENSE file which can be found at the top level
'' of this package
''
'' Authors:
'' Alexey Y. Woronov <alexey@woronov.ru>
'''
# These global options override the default behavior and memory usage
#
CAN_debug_library = False
try: # insecure, but for development
from irciot import PyLayerIRCIoT
except:
from PyIRCIoT.irciot import PyLayerIRCIoT
from copy import deepcopy
from time import sleep
from time import time
import threading
import random
try:
import json
except:
import simplejson as json
def __init__(self):
    """Initialize routing tables, tracking state and LMR/GMR defaults.

    Raises:
        ValueError: when the router library version constant differs
            from the base IRC-IoT library version constant.
    """
    # External object that routes and prefilters the input messages
    # at a level outside of this IRC-IoT router class
    self.input_plugin = None
    # List of tuples, each one holding a source and a destination
    # IRC-IoT address or mask for the input messages
    self.input_routes = [("*", "*")]
    # List of tuples, each one holding a function and its required
    # parameters. The functions are applied sequentially to the
    # message flow and must share the same inbound and outbound
    # interface. When a nested list is used as an item, its first
    # item may hold a special tuple with a function that defines the
    # condition for running the list; the remaining items of such a
    # list are used to process the IRC-IoT messages
    self.router_graphs = [(self.do_router_forwarding_, {})]
    # List of tuples, each one holding a source and a destination
    # IRC-IoT address or mask for the output messages
    self.output_routes = [("*", "*")]
    # External object that routes and postfilters the output messages
    # at a level outside of this IRC-IoT router class
    self.output_plugin = None
    # Private state created before the parent constructor runs
    self.__connections_tracking = {}
    self.__dup_detection_pipeline = []
    self.__LMR_table = {}
    self.__GMR_table = {}
    super(PyIRCIoT_router, self).__init__()
    # Limits taken from the defaults in CONST
    self.maximal_detect_dup_messages = \
        self.CONST.default_maximal_detect_dup_messages
    self.maximal_connection_tracking = \
        self.CONST.default_maximal_connection_tracking
    self.timeout_connection_tracking = \
        self.CONST.default_timeout_connection_tracking
    self.irciot_crc16_init_()
    # A protocol version mismatch is only reported ...
    if self.CONST.irciot_router_protocol_version \
            != self.CONST.irciot_protocol_version:
        self.irciot_error_(self.CONST.err_PROTO_VER_MISMATCH, 0)
    # ... while a library version mismatch is reported and is fatal
    if self.CONST.irciot_router_library_version \
            != self.CONST.irciot_library_version:
        self.irciot_error_(self.CONST.err_LIB_VER_MISMATCH, 0)
        raise ValueError(
            self.CONST.irciot_library_version,
            self.CONST.irciot_router_library_version)
    self.__encoding = self.CONST.irciot_default_encoding
    self.errors = self.CONST.err_DESCRIPTIONS
    self.irciot_set_locale_(self.lang)
    # Local (LMR) and global (GMR) message router state:
    # pools, worker threads, run flags and latencies
    self.__LMR_pool = {}
    self.__GMR_pool = {}
    self.__LMR_task = None
    self.__GMR_task = None
    self.__LMR_run = False
    self.__GMR_run = False
    self.__LMR_latency = self.CONST.default_LMR_latency
    self.__GMR_latency = self.CONST.default_GMR_latency
    self.__LMR_announce_interval = \
        self.CONST.default_LMR_announce_interval
    self.__GMR_connect_try_interval = \
        self.CONST.default_GMR_connect_try_interval
    self.__primary_irciot_address = ""
#
# End of PyIRCIoT_router.__init__()
def __del__(self):
    """Stop the LMR and GMR workers when the router object is destroyed.

    The run flags are assigned late in __init__(), after the point where
    the constructor may raise on a library version mismatch, so this
    finalizer can be invoked on an instance that never received them.
    In that case nothing was started and there is nothing to stop.
    """
    try:
        self.__LMR_run
        self.__GMR_run
    except AttributeError:
        # Partially constructed instance: no worker was ever started
        return
    if self.__LMR_run:
        self.__stop_LMR_()
    if self.__GMR_run:
        self.__stop_GMR_()
def router_pointer (self, in_compatibility, in_messages_pack):
    """Placeholder handler: ignores both arguments and returns False.

    Warning: the interface may be changed while developing.
    """
    return False
def __start_LMR_(self):
    """Start the local message router thread unless it already exists.

    Does nothing when the LMR run flag is set or a task object is
    already stored; otherwise a thread targeting
    __local_message_router_() is created, the run flag is raised and
    the thread is started.
    """
    if self.__LMR_run or self.__LMR_task is not None:
        return
    my_thread = threading.Thread(target = self.__local_message_router_)
    self.__LMR_task = my_thread
    self.__LMR_run = True
    my_thread.start()
def __start_GMR_(self):
    """Start the global message router thread unless it already exists.

    Does nothing when the GMR run flag is set or a task object is
    already stored; otherwise a thread targeting
    __global_message_router_() is created, the run flag is raised and
    the thread is started.
    """
    if self.__GMR_run or self.__GMR_task is not None:
        return
    my_thread = threading.Thread(target = self.__global_message_router_)
    self.__GMR_task = my_thread
    self.__GMR_run = True
    my_thread.start()
def irciot_set_GMR_connect_try_interval_(self, in_delay):
    """Set the interval between GMR connection attempts.

    Args:
        in_delay: the new interval; must be exactly an int or a float
            lying within CONST.min_GMR_connect_try_interval and
            CONST.max_GMR_connect_try_interval (both inclusive).

    Returns:
        True when the value was accepted and stored, False otherwise.
    """
    # Exact type match on purpose: bool is a subclass of int and
    # must keep being rejected
    if type(in_delay) not in (int, float):
        return False
    # The chained comparison also rejects NaN, which slipped through
    # separate "<" and ">" tests because every NaN comparison is False
    if not (self.CONST.min_GMR_connect_try_interval <= in_delay
            <= self.CONST.max_GMR_connect_try_interval):
        return False
    self.__GMR_connect_try_interval = in_delay
    return True
def router_error_(self, in_error_code, in_addon = None):
    """Placeholder error handler: ignores its arguments, returns None.

    Warning: the interface may be changed while developing.
    """
    return None
def is_router_route_(self, in_route):
    """Check that a route is a pair of strings.

    Args:
        in_route: candidate route, expected to unpack into exactly two
            items: the left and the right address (or mask).

    Returns:
        True when both items are strings, False otherwise.
    """
    if isinstance(in_route, str):
        # A two-character string would unpack into two one-character
        # strings and be mistaken for a valid route
        return False
    try:
        (left_addr, right_addr) = in_route
    except (TypeError, ValueError):
        # Not iterable, or not exactly two items
        return False
    return isinstance(left_addr, str) and isinstance(right_addr, str)
def is_router_routes_(self, in_routes):
    """Check that in_routes is a list holding only valid routes.

    Every item is validated with is_router_route_(); an empty list is
    accepted.
    """
    if not isinstance(in_routes, list):
        return False
    return all(map(self.is_router_route_, in_routes))
def is_router_graph_(self, in_graph):
    """Check that a graph item is a ( function, parameters ) pair.

    Args:
        in_graph: candidate graph item, expected to unpack into
            exactly two items.

    Returns:
        True when the first item is callable and the second one is
        a dict, False otherwise.
    """
    try:
        (my_func, my_params) = in_graph
    except (TypeError, ValueError):
        # Not iterable, or not exactly two items
        return False
    # isinstance(x, object) is true for everything; the function is
    # called later by do_router_graph_(), so it must be callable
    if not callable(my_func):
        return False
    return isinstance(my_params, dict)
def is_router_graphs_(self, in_graphs):
    """Check a possibly nested list of graph items.

    List items are validated recursively with this method, any other
    item is validated with is_router_graph_(). An empty list is
    accepted.
    """
    if not isinstance(in_graphs, list):
        return False
    for my_entry in in_graphs:
        if isinstance(my_entry, list):
            my_check_ = self.is_router_graphs_
        else:
            my_check_ = self.is_router_graph_
        if not my_check_(my_entry):
            return False
    return True
def do_router_graph_(self, in_datum, in_graph, \
        in_direction, in_vuid = None):
    """Run a single graph function on one IRC-IoT datum.

    The graph function is first asked, by passing
    CONST.rgn_REQUEST_PARAMS in place of its parameters, which
    parameter keys it requires. If any of them is missing from the
    graph parameters, err_MISSING_PARAMETER is reported and None is
    returned.

    Returns:
        Whatever the graph function returns, or None when the datum
        or the graph item is invalid or a parameter is missing.
    """
    if not isinstance(in_datum, dict) \
            or not self.is_router_graph_(in_graph) \
            or not self.is_irciot_datum_(in_datum, None, None, None):
        return None
    my_handler_, my_options = in_graph
    my_required = my_handler_(None, self.CONST.rgn_REQUEST_PARAMS, None)
    for my_name in my_required:
        if my_name in my_options:
            continue
        self.irciot_error_(self.CONST.err_MISSING_PARAMETER, 0, \
            in_addon = my_name)
        return None
    return my_handler_(in_datum, my_options, in_direction, in_vuid)
def do_router_route_(self, in_datum, in_route, in_vuid = None):
    """Validate a datum against a single route.

    Currently only the argument types and the datum itself are
    checked; the route content is not applied.

    Returns:
        The unchanged datum when it is a valid IRC-IoT datum dict and
        the route is a tuple, otherwise None.
    """
    my_valid = isinstance(in_datum, dict) \
        and isinstance(in_route, tuple) \
        and self.is_irciot_datum_(in_datum, None, None, None)
    return in_datum if my_valid else None
def do_router_graphs_(self, in_datum, in_graphs, \
        in_direction, in_vuid = None):
    """Pass one datum sequentially through a list of graph items.

    Args:
        in_datum: the datum (dict) to process.
        in_graphs: list whose items are either graph tuples, handled
            by do_router_graph_(), or nested lists handled recursively.
        in_direction: direction value forwarded to every graph call.
        in_vuid: optional identifier forwarded to every graph call.

    Returns:
        The resulting datum, or None when an argument has a wrong
        type, a nested list is empty, or a graph function returns
        something that is neither a dict nor an accepted boolean.
    """
    if not isinstance(in_datum, dict):
        return None
    if not isinstance(in_graphs, list):
        return None
    my_datum = in_datum
    for my_index, my_graph in enumerate(in_graphs):
        if isinstance(my_graph, list):
            if not my_graph:
                return None
            # The direction used to be passed through a misspelled
            # name, which raised NameError on every nested list
            my_datum = self.do_router_graphs_(my_datum, my_graph, \
                in_direction, in_vuid)
            continue
        my_result = self.do_router_graph_(my_datum, my_graph, \
            in_direction, in_vuid)
        if my_index == 0 and isinstance(my_result, bool):
            # The first item may act as a condition for the whole
            # list: False stops processing, the datum stays as it is
            if not my_result:
                break
        elif isinstance(my_result, dict):
            my_datum = my_result
        else:
            return None
    return my_datum
#
# End of PyIRCIoT_router.do_router_graphs_()
def do_router_routes_(self, in_datum, in_routes, in_vuid = None):
    """Apply a list of routes to a datum (placeholder implementation).

    The routes are iterated but not yet applied, so a dict datum is
    returned unchanged.

    Returns:
        The datum, or None when the datum is not a dict or the routes
        are not a list.
    """
    if not (isinstance(in_datum, dict) and isinstance(in_routes, list)):
        return None
    for _my_route in in_routes:
        # Route matching is not implemented yet
        pass
    return in_datum
def do_router_(self, in_message, in_direction, in_vuid = None):
    """Route one encapsulated IRC-IoT message and re-encapsulate it.

    The message is de-encapsulated with irciot_deinencap_(), parsed
    as JSON into one or more datums, and every datum goes through the
    input routes (for dir_in / dir_both), the router graphs, and the
    output routes (for dir_out / dir_both). The surviving datums are
    packed with irciot_encap_all_().

    Args:
        in_message: the incoming encapsulated message.
        in_direction: one of the CONST.dir_* direction values.
        in_vuid: optional identifier passed to the helper calls.

    Returns:
        The first message of the resulting pack, or "" when the
        routes or graphs are invalid, the message cannot be decoded,
        or no datum survives the routing.
    """
    my_dirs_in = [ self.CONST.dir_in, self.CONST.dir_both ]
    my_dirs_out = [ self.CONST.dir_out, self.CONST.dir_both ]
    if in_direction in my_dirs_in \
            and not self.is_router_routes_(self.input_routes):
        return ""
    if in_direction in my_dirs_out \
            and not self.is_router_routes_(self.output_routes):
        return ""
    if not self.is_router_graphs_(self.router_graphs):
        return ""
    self.irciot_check_encoding_()
    my_json = self.irciot_deinencap_(in_message, in_vuid)
    if my_json == "":
        return ""
    try:
        my_datums = json.loads(my_json)
    except Exception:
        # Undecodable input is dropped; unlike a bare "except" this
        # no longer swallows KeyboardInterrupt and SystemExit
        return ""
    if isinstance(my_datums, dict):
        my_datums = [ my_datums ]
    if not isinstance(my_datums, list):
        return ""
    my_outdat = []
    for my_datum in my_datums:
        if in_direction in my_dirs_in:
            my_datum = self.do_router_routes_(my_datum, \
                self.input_routes, in_vuid)
        my_datum = self.do_router_graphs_(my_datum, \
            self.router_graphs, in_direction, in_vuid)
        if in_direction in my_dirs_out:
            my_datum = self.do_router_routes_(my_datum, \
                self.output_routes, in_vuid)
        if my_datum is not None:
            my_outdat.append(my_datum)
    if not my_outdat:
        return ""
    out_message = ""
    out_pack = self.irciot_encap_all_(my_outdat, in_vuid)
    if isinstance(out_pack, list) and len(out_pack) > 0:
        # Only the first packed message is returned to the caller
        ( out_message, _ ) = out_pack[0]
    if not isinstance(out_message, str):
        out_message = ""
    return out_message
#
# End of PyIRCIoT_router.do_router_()
def dup_detection_(self, in_datum, in_direction, in_vuid = None):
    """Tell whether a datum must be dropped as invalid or duplicate.

    A signature is built from the VUID (or CONST.api_vuid_all), the
    sorted datum items without the date-time tag, their CRC16 and
    CRC32, and the truncated date-time. Signatures are remembered in
    a pipeline bounded by maximal_detect_dup_messages.

    Args:
        in_datum: the datum (dict) to check; it is not modified.
        in_direction: must be one of CONST.dir_any.
        in_vuid: optional identifier mixed into the signature.

    Returns:
        True when the datum is not a dict, the direction is invalid,
        or the same signature is already in the pipeline; False when
        the datum is new (its signature is then remembered).
    """
    if not isinstance(in_datum, dict):
        return True # Drop invalid messages
    if in_direction not in self.CONST.dir_any:
        self.irciot_error_(self.CONST.err_INVALID_DIRECTION, 0)
        return True
    my_datum = deepcopy(in_datum)
    if self.CONST.tag_DATE_TIME in my_datum:
        # The tag may carry a number instead of a string; format it
        # so that the length checks below cannot raise TypeError
        my_dt = "{}".format(my_datum.pop(self.CONST.tag_DATE_TIME))
    else:
        my_dt = "{}".format(time())
    my_dt_len = len(my_dt)
    if 11 < my_dt_len < 21:
        my_dt = my_dt[:11] # cut to POSIX timestamp
    elif my_dt_len > 20:
        my_dt = my_dt[:19] # cut to ISO 8601:2004
    if in_vuid is None:
        my_string = self.CONST.api_vuid_all
    else:
        my_string = "{}".format(in_vuid)
    for my_key, my_value in sorted(my_datum.items()):
        my_string += "{}{}".format(my_key, my_value)
    del my_datum
    my_bytes = bytes(my_string, self.__encoding)
    my_string = "{}{}".format( \
        self.irciot_crc16_(my_bytes), \
        self.irciot_crc32_(my_bytes))
    del my_bytes
    my_string += "{}".format(my_dt)
    if my_string in self.__dup_detection_pipeline:
        return True # Drop duplicate messages
    self.__dup_detection_pipeline.append(my_string)
    if len(self.__dup_detection_pipeline) \
            > self.maximal_detect_dup_messages:
        # Forget the oldest signature to keep the pipeline bounded
        self.__dup_detection_pipeline.pop(0)
    return False
#
# End of PyIRCIoT_router.dup_detection_()
# incomplete
def __direct_router_message_(self, in_dict, in_vuid):
    """Send a router message directly through the transport handler.

    The dict is serialized to compact JSON and offered to
    irc_pointer() together with the VUID. Nothing is done when
    in_dict is not a dict or in_vuid is not a string.
    """
    if not (isinstance(in_dict, dict) and isinstance(in_vuid, str)):
        return
    my_pack = (json.dumps(in_dict, separators=(',',':')), in_vuid)
    my_compat = self.irciot_compatibility_()
    if self.irc_pointer (my_compat, my_pack):
        return
    # Warning: if the transport handler was not attached, the
    # message is added to the output_pool, but delivering it at
    # a specified time is not guaranteed and the routing protocol
    # may not work correctly !!!
    self.output_pool.append(my_pack)
def get_LMR_list_(self):
    """Return a new list with the keys of the LMR pool."""
    return list(self.__LMR_pool.keys())
def get_GMR_list_(self):
    """Return a new list with the keys of the GMR pool."""
    return list(self.__GMR_pool.keys())
def __check_LMR_id_(self, in_LMR_id = None):
    """Check that an LMR id is an int key of the LMR pool.

    A missing id (None) is silently rejected; any other invalid id
    is also passed to __invalid_LMR_id_() before returning False.
    """
    if in_LMR_id is None:
        return False
    if isinstance(in_LMR_id, int) and in_LMR_id in self.__LMR_pool:
        return True
    self.__invalid_LMR_id_(in_LMR_id)
    return False
def __check_GMR_id_(self, in_GMR_id = None):
    """Check that a GMR id is an int key of the GMR pool.

    A missing id (None) is silently rejected; any other invalid id
    is also passed to __invalid_GMR_id_() before returning False.
    """
    if in_GMR_id is None:
        return False
    if isinstance(in_GMR_id, int) and in_GMR_id in self.__GMR_pool:
        return True
    self.__invalid_GMR_id_(in_GMR_id)
    return False
def __get_LMR_id_(self, in_LMR_id):
    """Resolve an optional LMR id.

    A given id is returned as is. When no id is given and the LMR
    pool holds exactly one entry, the key of that entry is returned;
    otherwise the result is None.
    """
    if in_LMR_id is None and len(self.__LMR_pool) == 1:
        return next(iter(self.__LMR_pool))
    return in_LMR_id
def __get_GMR_id_(self, in_GMR_id):
    """Resolve an optional GMR id.

    A given id is returned as is. When no id is given and the GMR
    pool holds exactly one entry, the key of that entry is returned;
    otherwise the result is None.
    """
    if in_GMR_id is None and len(self.__GMR_pool) == 1:
        return next(iter(self.__GMR_pool))
    return in_GMR_id
def get_LMR_status_(self, in_LMR_id = None):
    """Return the status stored for an LMR pool entry.

    The id is resolved with __get_LMR_id_() and validated with
    __check_LMR_id_(); None is returned when the validation fails.
    """
    my_LMR_id = self.__get_LMR_id_(in_LMR_id)
    if self.__check_LMR_id_(my_LMR_id):
        my_entry = self.__LMR_pool[my_LMR_id]
        return my_entry[self.CONST.intag_LMR_status]
    return None
def get_GMR_status_(self, in_GMR_id = None):
    """Return the status stored for a GMR pool entry.

    The id is resolved with __get_GMR_id_() and validated with
    __check_GMR_id_(); None is returned when the validation fails.
    """
    my_GMR_id = self.__get_GMR_id_(in_GMR_id)
    if self.__check_GMR_id_(my_GMR_id):
        my_entry = self.__GMR_pool[my_GMR_id]
        return my_entry[self.CONST.intag_GMR_status]
    return None
def get_GMR_vuid_(self, in_GMR_id = None):
    """Return the VUID stored for a GMR pool entry.

    The id is resolved with __get_GMR_id_() and validated with
    __check_GMR_id_(); None is returned when the validation fails.
    """
    my_GMR_id = self.__get_GMR_id_(in_GMR_id)
    if self.__check_GMR_id_(my_GMR_id):
        my_entry = self.__GMR_pool[my_GMR_id]
        return my_entry[self.CONST.intag_GMR_vuid]
    return None