def __default_error_handler_(self, in_error_code, in_mid,
        in_vuid = None, in_addon = None):
    """Print the description of an error code to standard output.

    The text is looked up in self.errors by in_error_code; when in_addon
    is a string it is appended in parentheses. Nothing is printed when
    the resulting text is empty. in_mid and in_vuid are accepted but
    not used by this implementation.
    """
    # Warning: This version of error handler is made for
    # testing and does not comply with the specification
    known_errors = self.errors
    description = known_errors[in_error_code] \
        if in_error_code in known_errors.keys() else ""
    if isinstance(in_addon, str):
        description += " ({})".format(in_addon)
    if description != "":
        print('PyIRCIoT error:', description)
def copy_string_(self, from_string):
    """Return a string equal to from_string, or None for non-string input."""
    if isinstance(from_string, str):
        # One character is appended by the format and sliced off again,
        # presumably to obtain a separately built string object
        padded = '{}_'.format(from_string)
        return padded[:-1]
    return None
def is_json_(self, in_message):
    """Tell whether in_message is a string that json.loads() can parse."""
    if isinstance(in_message, str):
        try:
            json.loads(in_message)
        except ValueError:
            return False
        return True
    return False
def get_enc_by_enc_(self, in_encoding):
    """Resolve an encoding name or alias to a known encoding name.

    The name is lower-cased, replaced by the key of the first entry of
    self.CONST.enc_aliases that lists it, and then checked against
    self.get_encs_list_(). Returns None for non-string input or when
    the resolved name is not in that list.
    """
    if not isinstance(in_encoding, str):
        return None
    candidate = in_encoding.lower()
    aliases = self.CONST.enc_aliases
    for canonical in aliases.keys():
        if candidate in aliases[canonical]:
            candidate = canonical
            break
    return candidate if candidate in self.get_encs_list_() else None
def get_langs_by_enc_(self, in_encoding):
    """List the language keys of self.CONST.hl_old_enc that use an encoding.

    in_encoding is first resolved with get_enc_by_enc_(); an empty list
    is returned when it cannot be resolved.
    """
    resolved = self.get_enc_by_enc_(in_encoding)
    if resolved is None:
        return []
    legacy = self.CONST.hl_old_enc
    return [ lang for lang in legacy.keys() if resolved in legacy[lang] ]
def get_encs_by_lang_(self, in_human_language):
    """List the encodings for a language key.

    The result always starts with self.CONST.enc_UTF8, followed by the
    entries stored for the language in self.CONST.hl_old_enc (if any).
    Non-string input gives an empty list.
    """
    if not isinstance(in_human_language, str):
        return []
    encodings = [ self.CONST.enc_UTF8 ]
    legacy = self.CONST.hl_old_enc
    if in_human_language in legacy.keys():
        encodings.extend(legacy[in_human_language])
    return encodings
def get_langs_list_(self):
    """Return the keys of self.CONST.hl_old_enc as a new list."""
    return list(self.CONST.hl_old_enc.keys())
def get_encs_list_(self):
    """List every known encoding once.

    Starts with self.CONST.enc_UTF8 and adds the entries of
    self.CONST.hl_old_enc in iteration order, skipping duplicates.
    """
    encodings = [ self.CONST.enc_UTF8 ]
    for legacy in self.CONST.hl_old_enc.values():
        for name in legacy:
            if name in encodings:
                continue
            encodings.append(name)
    return encodings
def is_ipv4_address_(self, in_ipv4_address):
    """Tell whether in_ipv4_address is a string holding an IPv4 address.

    Validation is delegated to socket.inet_pton(); any input it rejects,
    and any non-string input, yields False.
    """
    if not isinstance(in_ipv4_address, str):
        return False
    try:
        socket.inet_pton(socket.AF_INET, in_ipv4_address)
    except (socket.error, ValueError):
        # ValueError is raised for text that cannot be handed to the
        # C call at all, e.g. a string with an embedded NUL character
        return False
    return True
def is_ipv6_address_(self, in_ipv6_address):
    """Tell whether in_ipv6_address is a string holding an IPv6 address.

    Validation is delegated to socket.inet_pton(); any input it rejects,
    and any non-string input, yields False.
    """
    if not isinstance(in_ipv6_address, str):
        return False
    try:
        socket.inet_pton(socket.AF_INET6, in_ipv6_address)
    except (socket.error, ValueError):
        # ValueError is raised for text that cannot be handed to the
        # C call at all, e.g. a string with an embedded NUL character
        return False
    return True
def is_ip_address_(self, in_ip_address):
    """Tell whether in_ip_address passes the IPv4 or the IPv6 check."""
    # The IPv6 check only runs when the IPv4 check fails
    return bool(self.is_ipv4_address_(in_ip_address)
        or self.is_ipv6_address_(in_ip_address))
def is_ip_port_(self, in_ip_port):
    """Tell whether in_ip_port is an integer in the range 1..65535.

    Booleans are rejected explicitly: bool is a subclass of int, so
    True would otherwise be accepted as port 1.
    """
    if isinstance(in_ip_port, bool) or not isinstance(in_ip_port, int):
        return False
    return 1 <= in_ip_port <= 65535
def dns_ipv4_resolver_(self, in_name):
    """Resolve a host name to a list of unique IPv4 address strings.

    Uses the 'dns' package (imported on demand) to query A records.
    Returns None for non-string input, when the package cannot be
    imported, or when the lookup fails; otherwise the list of addresses
    found (possibly empty).
    """
    if not isinstance(in_name, str):
        return None
    my_ip_list = []
    try:
        from dns import resolver
        # NOTE(review): newer dnspython releases deprecate query() in
        # favour of resolve(); confirm the minimum supported version
        my_result = resolver.query(in_name, 'A')
        for my_answer in my_result.response.answer:
            for my_item in my_answer.items:
                # The answer section can also carry records without an
                # address (e.g. a CNAME chain); those must be skipped
                # instead of aborting the whole lookup
                my_ip = getattr(my_item, 'address', None)
                if not self.is_ipv4_address_(my_ip):
                    continue
                if my_ip not in my_ip_list:
                    my_ip_list.append(my_ip)
    except Exception:
        # Import failure and resolver errors both mean "no result"
        return None
    return my_ip_list
def dns_reverse_resolver_(self, in_server_ip):
    """Return the PTR host name for an IP address, or the input itself.

    The input is returned unchanged when it is not an IP address, when
    the 'dns' package cannot be imported, or when the lookup fails.
    """
    if not self.is_ip_address_(in_server_ip):
        return in_server_ip
    try:
        from dns import resolver, reversename
        my_reverse = reversename.from_address(in_server_ip)
        my_answer = resolver.query(my_reverse, 'PTR')
        # Drop the last character of the textual answer
        # (presumably the trailing dot of the absolute name)
        return str(my_answer[0])[:-1]
    except Exception:
        # Best effort: fall back to the address itself
        return in_server_ip
def get_os_name_(self):
    """Return the operating system name.

    self.os_override wins when it is set. Otherwise the first element
    of os.uname() is returned; when os.uname() is unavailable or fails,
    self.CONST.os_windows is returned if os.name is 'nt', else None.
    """
    if self.os_override is not None:
        return self.os_override
    try:
        return os.uname()[0]
    except (AttributeError, OSError):
        # os.uname() does not exist on every platform
        if os.name == 'nt':
            return self.CONST.os_windows
    return None
def get_ipv6_route_linux_(self, in_server_ip):
    """Find the best IPv6 route towards in_server_ip on Linux.

    Parses the file named by self.os_linux_proc_ipv6_route and selects,
    among the routes whose network contains the address, the one with
    the longest prefix; ties are broken by the lowest metric.

    Returns None when the OS is not Linux, the file is missing or
    unreadable, the address is not IPv6, or no route matches. Otherwise
    returns a tuple (if_name, network, prefix_len, metric, gateway)
    where network is the uncompressed textual address, prefix_len and
    metric are integers, and gateway is an ipaddress object or None
    when the next hop is the unspecified address.
    """
    def unpack_ipv6_(in_string):
        # 32 hex digits -> eight colon-separated groups of four
        return ':'.join(in_string[my_idx:my_idx + 4]
            for my_idx in range(0, 32, 4))
    my_proc = self.os_linux_proc_ipv6_route
    if self.get_os_name_() != self.CONST.os_linux:
        return None
    if not os.path.exists(my_proc):
        return None
    if not os.access(my_proc, os.R_OK):
        return None
    if not self.is_ipv6_address_(in_server_ip):
        return None
    my_route = None
    my_check = ipaddress.IPv6Address(in_server_ip)
    with open(my_proc) as my_handler:
        for my_line in my_handler:
            my_fields = my_line.split()
            # Fields used below: 0 network, 1 prefix length, 4 gateway,
            # 5 metric, 9 interface name
            if len(my_fields) < 10:
                continue
            my_network = my_fields[0]
            my_netmask = my_fields[1]
            my_gateway = my_fields[4]
            if len(my_network) != 32 \
                    or len(my_gateway) != 32 \
                    or len(my_netmask) != 2:
                continue
            try:
                # The prefix length is written in hexadecimal,
                # exactly like the metric ("40" means /64)
                my_netmask = int(my_netmask, 16)
                my_metric = int(my_fields[5], 16)
                my_network = unpack_ipv6_(my_network)
                my_netbase = ipaddress.ip_network(
                    "{}/{:d}".format(my_network, my_netmask), False)
                my_gateway = ipaddress.ip_address(unpack_ipv6_(my_gateway))
            except ValueError:
                # Not a well-formed route line
                continue
            if my_gateway.is_unspecified:
                # No next hop: the destination is reached directly
                my_gateway = None
            my_if_name = my_fields[9]
            if my_check not in my_netbase:
                continue
            # my_route[2] is the prefix length, my_route[3] the metric
            my_better = my_route is None \
                or my_netmask > my_route[2] \
                or (my_netmask == my_route[2] and my_metric < my_route[3])
            if my_better:
                my_route = ( my_if_name, my_network,
                    my_netmask, my_metric, my_gateway )
    return my_route
#
# End of get_ipv6_route_linux_()
def get_ipv4_route_linux_(self, in_server_ip):
    """Find the best IPv4 route towards in_server_ip on Linux.

    Parses the file named by self.os_linux_proc_ipv4_route and selects,
    among the routes whose network contains the address, the one with
    the greatest netmask; ties are broken by the lowest metric.

    Returns None when the OS is not Linux, the file is missing or
    unreadable, the address is not IPv4, or no route matches. Otherwise
    returns a tuple (if_name, network, netmask, metric, gateway) where
    network and netmask are dotted-quad strings, metric is an integer
    and gateway is a dotted-quad string or None for '0.0.0.0'.
    """
    def unpack_ipv4_(in_string):
        # The table prints each address as a host-order 32-bit integer,
        # so packing in native order restores the network-order bytes
        return socket.inet_ntoa(struct.pack("=L", int(in_string, 16)))
    my_proc = self.os_linux_proc_ipv4_route
    if self.get_os_name_() != self.CONST.os_linux:
        return None
    if not os.path.exists(my_proc):
        return None
    if not os.access(my_proc, os.R_OK):
        return None
    if not self.is_ipv4_address_(in_server_ip):
        return None
    my_route = None
    my_check = ipaddress.ip_address(in_server_ip)
    # Warning: this method only checks the default routing table,
    # but it is more correct to take the desired routing table
    # by the number based on the current table selection rules,
    # which is often used in the routers and in the Android OS
    with open(my_proc) as my_handler:
        for my_line in my_handler:
            my_fields = my_line.split()
            # Fields used below: 0 interface, 1 network, 2 gateway,
            # 6 metric, 7 netmask
            if len(my_fields) < 8:
                continue
            my_network = my_fields[1]
            my_netmask = my_fields[7]
            if len(my_network) != 8 or len(my_netmask) != 8:
                # Also skips the column header line
                continue
            my_if_name = my_fields[0]
            try:
                my_metric = int(my_fields[6])
                my_network = unpack_ipv4_(my_network)
                my_gateway = unpack_ipv4_(my_fields[2])
                my_netmask = unpack_ipv4_(my_netmask)
                my_ip_mask = ipaddress.ip_address(my_netmask)
                my_netbase = ipaddress.ip_network(my_network
                    + '/' + my_netmask)
            except ValueError:
                # Not a well-formed route line
                continue
            if my_gateway == '0.0.0.0':
                my_gateway = None
            if my_check not in my_netbase:
                continue
            my_better = my_route is None
            if not my_better:
                # my_route[2] is the netmask, my_route[3] the metric
                chk_ip_mask = ipaddress.ip_address(my_route[2])
                my_better = my_ip_mask > chk_ip_mask \
                    or (my_ip_mask == chk_ip_mask and my_metric < my_route[3])
            if my_better:
                my_route = ( my_if_name, my_network,
                    my_netmask, my_metric, my_gateway )
    return my_route
#
# End of get_ipv4_route_linux_()
def get_ipv4_route_(self, in_server_ip):
    """Return the IPv4 route towards in_server_ip, or None.

    Only Linux is handled: the call is forwarded to
    get_ipv4_route_linux_() when the address is IPv4 and
    get_os_name_() equals self.CONST.os_linux.
    """
    if self.is_ipv4_address_(in_server_ip) \
            and self.get_os_name_() == self.CONST.os_linux:
        return self.get_ipv4_route_linux_(in_server_ip)
    # Other OS's methods will be here
    return None
def get_ipv6_route_(self,in_server_ip):
    """Return the IPv6 route towards in_server_ip, or None.

    Only Linux is handled: the call is forwarded to
    get_ipv6_route_linux_() when the address is IPv6 and
    get_os_name_() equals self.CONST.os_linux.
    """
    if self.is_ipv6_address_(in_server_ip) \
            and self.get_os_name_() == self.CONST.os_linux:
        return self.get_ipv6_route_linux_(in_server_ip)
    # Other OS's methods will be here
    return None
# Choose the local (source) IP address for reaching in_server_ip.
# The route is obtained from get_ipv4_route_() or get_ipv6_route_()
# depending on the address family; None is returned when the input
# is not an IP address or no route is found. The addresses of the
# adapters reported by ifaddr.get_adapters() are then examined.
# NOTE(review): the nesting described in the comments below is
# inferred from statement order; confirm it against the indented
# original before relying on it.
def get_src_ip_by_dst_ip_(self,in_server_ip):
my_ipv4_route = None
my_ipv6_route = None
# Exactly one of the two route variables is filled in below
if self.is_ipv4_address_(in_server_ip):
my_ipv4_route = self.get_ipv4_route_(in_server_ip)
if my_ipv4_route == None:
return None
( my_if_name, my_network, my_netmask, \
my_metric, my_gateway ) = my_ipv4_route
elif self.is_ipv6_address_(in_server_ip):
my_ipv6_route = self.get_ipv6_route_(in_server_ip)
if my_ipv6_route == None:
return None
( my_if_name, my_network, my_netmask, \
my_metric, my_gateway ) = my_ipv6_route
else:
return None
# Fallback result, set when an address of the route's interface
# is seen and the route has no gateway
my_ip_out = None
# Despite its name this holds the destination address itself
my_ip_mask = ipaddress.ip_address(in_server_ip)
my_adapters = ifaddr.get_adapters()
for my_adapter in my_adapters:
for my_ip in my_adapter.ips:
# IPv4 entries: my_ip.ip is tested as a plain address string
if self.is_ipv4_address_(my_ip.ip) \
and my_ipv4_route != None:
my_network_str \
= "{}/{:d}".format(my_ip.ip, my_ip.network_prefix)
my_netbase = ipaddress.ip_network(my_network_str, False)
# Destination lies inside this address's own network
if my_ip_mask in my_netbase:
return my_ip.ip
if my_adapter.name == my_if_name:
if my_gateway == None:
my_ip_out = my_ip.ip
else:
# Here the netmask is concatenated as a string
my_netbase = ipaddress.ip_network(my_network \
+ '/' + my_netmask)
my_ip_check = ipaddress.ip_address(my_ip.ip)
if my_ip_check in my_netbase:
return my_ip.ip
# IPv6 entries: my_ip.ip is unpacked as a 3-item tuple
elif isinstance(my_ip.ip, tuple) \
and my_ipv6_route != None:
( my_ipv6, my_ipv6_flowinfo, my_ipv6_scope_id ) = my_ip.ip
if self.is_ipv6_address_(my_ipv6):
my_network_str \
= "{}/{:d}".format(my_ipv6, my_ip.network_prefix)
my_netbase = ipaddress.ip_network(my_network_str, False)
# Destination lies inside this address's own network
if my_ip_mask in my_netbase:
return my_ipv6
if my_adapter.name == my_if_name:
if my_gateway == None:
my_ip_out = my_ipv6
else:
# Here the netmask is formatted as an integer prefix length
my_netbase = ipaddress.ip_network(my_network \
+ "/{:d}".format(my_netmask), False)
my_ip_check = ipaddress.ip_address(my_ipv6)
if my_ip_check in my_netbase:
return my_ipv6
# No direct match was returned above: use the fallback (may be None)
return my_ip_out
#
# End of get_src_ip_by_dst_ip_()
def validate_descriptions_(self, in_dict):
    """Return the valid entries of an error-description dictionary.

    An entry is kept when its key is an int, its value is a str, and
    every brace in the value belongs to an empty '{}' placeholder.
    A new dictionary is built, so in_dict itself is never modified
    (deleting from it while iterating would raise RuntimeError).
    Non-dict input gives an empty dictionary.
    """
    if not isinstance(in_dict, dict):
        return {}
    my_dict = {}
    for my_key, my_item in in_dict.items():
        if not isinstance(my_key, int) or not isinstance(my_item, str):
            continue
        my_pairs = my_item.count('{}')
        if my_item.count('{') != my_pairs \
                or my_item.count('}') != my_pairs:
            # A stray or non-empty brace would break str.format()
            continue
        my_dict[my_key] = my_item
    return my_dict
def irciot_set_locale_(self, in_lang):
    """Set self.lang and try to load error descriptions for it.

    Non-string input is ignored. The descriptions are loaded from
    PyIRCIoT.irciot_errors on a best-effort basis: they are validated
    with validate_descriptions_() and merged into self.errors; any
    failure (including a missing module) leaves self.errors as is.
    """
    if not isinstance(in_lang, str): return
    self.lang = in_lang
    try:
        from PyIRCIoT.irciot_errors \
            import irciot_get_all_error_descriptions_
        my_desc = irciot_get_all_error_descriptions_(in_lang)
        my_desc = self.validate_descriptions_(my_desc)
        if my_desc != {}:
            self.errors.update(my_desc)
    except Exception:
        # Deliberately best effort; KeyboardInterrupt and SystemExit
        # are no longer swallowed
        pass
def get_config_value_(self, in_item, in_section = None):
    """Return the stored configuration value for in_item, or None.

    None is returned when the configuration is not a dict, in_item is
    not a string, or the key is absent. in_section is not used.
    """
    if isinstance(self.__config, dict) and isinstance(in_item, str):
        if in_item in self.__config.keys():
            return self.__config[in_item]
    return None
def set_config_value_(self, in_item, in_value, in_section = None):
    """Store in_value under the key in_item; return True on success.

    Returns False without storing anything when in_item is not a
    string. The configuration is replaced by an empty dict first when
    it is not a dict yet. in_section is not used.
    """
    if isinstance(in_item, str):
        if not isinstance(self.__config, dict):
            self.__config = {}
        self.__config[in_item] = in_value
        return True
    return False
def load_config_defaults_(self, in_defaults):
    """Replace the configuration with in_defaults when it is a dict.

    The dictionary is stored as is (not copied). Returns True when it
    was stored and False for non-dict input.
    """
    if isinstance(in_defaults, dict):
        self.__config = in_defaults
        return True
    return False
# incomplete
def load_config_file_(self, in_filename, in_defaults = None):
    """Load 'key = value' settings from a file into the configuration.

    in_defaults (when a dict) is applied first via
    load_config_defaults_(). The file content, limited to
    self.max_config_size characters, is parsed by configparser under a
    generated section header, and the options of that section are
    merged into the configuration.

    Returns True on success. Returns False when in_filename is not a
    string, is not an existing file, is not readable, or cannot be
    parsed; the last two cases are also reported to the error handler.
    """
    if self.__config is None: self.__config = {}
    self.load_config_defaults_(in_defaults)
    if not isinstance(in_filename, str): return False
    if not os.path.isfile(in_filename): return False
    # NOTE(review): this method refers to both self._error_handler_
    # and self.__error_handler_; confirm which attribute the class
    # really defines
    if not os.access(in_filename, os.R_OK):
        self._error_handler_(self.CONST.err_LOADCFG, 0)
        return False
    try:
        import configparser
        import random
    except Exception as my_ex:
        self._error_handler_(self.CONST.err_IMPORT, 0, in_addon = str(my_ex))
        return False
    # A private generator is used so that the state of the
    # process-wide random module is left untouched
    my_dummy = 'dummy{:d}'.format(random.Random().randint(10000, 99999))
    my_parser = configparser.ConfigParser()
    try:
        # The context manager closes the file even if reading fails
        with open(in_filename, 'r') as file_fd:
            my_cfg_str = '[{}]\n'.format(my_dummy)
            my_cfg_str += file_fd.read(self.max_config_size)
        my_parser.read_string(my_cfg_str)
        # The generated header gives the leading section-less options
        # a section; _sections is a private configparser attribute
        my_config = my_parser._sections[my_dummy]
        if isinstance(my_config, dict):
            for config_key in my_config.keys():
                self.__config[config_key] = my_config[config_key]
    except Exception as my_ex:
        self.__error_handler_(self.CONST.err_LOADCFG, 0, in_addon = str(my_ex))
        return False
    return True
def bot_process_kill_(self, in_signum = None, in_frame = None):
    """Kill the current process with SIGKILL on UNIX-like systems.

    Nothing happens when get_os_name_() is not listed in
    self.CONST.os_all_UNIX. in_signum and in_frame are ignored; they
    exist so that the method can be installed directly as a signal
    handler, which is always called with those two arguments.
    """
    if self.get_os_name_() in self.CONST.os_all_UNIX:
        try:
            import signal
            os.kill(os.getpid(), signal.SIGKILL)
        except Exception:
            # Best effort: a failed kill is not reported
            pass
def bot_process_kill_timeout_(self, in_timeout):
    """Arrange for the process to be killed after in_timeout seconds.

    Only acts when in_timeout is an int or float greater than zero and
    get_os_name_() is listed in self.CONST.os_all_UNIX. A SIGALRM
    handler calling bot_process_kill_() is installed and a real-time
    timer is started. Failures are ignored.
    """
    if type(in_timeout) not in [ int, float ]: return
    if self.get_os_name_() in self.CONST.os_all_UNIX \
            and in_timeout > 0:
        try:
            import signal
            # Signal handlers are called with (signum, frame); the
            # wrapper drops them before calling the kill method
            signal.signal(signal.SIGALRM,
                lambda in_signum, in_frame: self.bot_process_kill_())
            # Unlike alarm(), setitimer() accepts fractional seconds,
            # so float timeouts are honoured as well
            signal.setitimer(signal.ITIMER_REAL, in_timeout)
        except Exception:
            # Best effort: the timeout is simply not armed
            pass