irciot_shared.py
import socket
import struct
import ifaddr
import ipaddress
import datetime
import sys
import os
try:
import json
except:
import simplejson as json
class irciot_shared_(object):
class CONST(object):
#
### IRC-IoT API:
#
api_GET_LMID = 101 # Get last Message ID
api_SET_LMID = 102 # Set last Message ID
api_GET_OMID = 111 # Get Own last Message ID
api_SET_OMID = 112 # Set Own last Message ID
api_GET_EKEY = 301 # Get Encryption Key
api_SET_EKEY = 302 # Set Encryption Key
api_GET_EKTO = 351 # Get Encryption Key Timeout
api_SET_EKTO = 352 # Set Encyrption Key Timeout
api_GET_BKEY = 501 # Get Blockchain key
api_SET_BKEY = 502 # Set Blockchain Key
api_GET_BKTO = 551 # Get Blockchain Key Timeout
api_SET_BKTO = 552 # Set Blockchain Key Timeout
api_GET_iMTU = 600 # Get initial Maximum Transmission Unit
api_GET_iENC = 601 # Get initial Encoding
api_GET_VUID = 700 # Get list of Virutal User IDs
#
api_vuid_cfg = 'c' # VUID prefix for users from config
api_vuid_tmp = 't' # VUID prefix for temporal users
api_vuid_srv = 's' # VUID prefix for IRC-IoT Services
api_vuid_all = '*' # Means All users VUIDs when sending messages
#
api_vuid_any = [ api_vuid_cfg, api_vuid_tmp, api_vuid_srv ]
api_vuid_not_srv = [ api_vuid_cfg, api_vuid_tmp, api_vuid_all ]
#
api_vuid_self = 'c0' # Default preconfigured VUID
#
### Basic IRC-IoT Services
#
api_vuid_CRS = 'sC' # Cryptographic Repository Service
api_vuid_GDS = 'sD' # Global Dictionary Service
api_vuid_GRS = 'sR' # Global Resolving Service
api_vuid_GTS = 'sT' # Global Time Service
#
api_vuid_PRS = 'sr' # Primary Routing Service
#
api_first_temporal_vuid = 1000
#
# for Python 3.x: 19 Jan 3001 08:00 UTC
api_epoch_maximal = 32536799999
#
### For IoT Bot creation:
#
default_bot_name = 'iotBot'
default_bot_python = 'python3'
default_bot_background_parameter = 'background'
#
default_max_config_size = 1024 * 1024 # bytes
#
### OS depended:
#
os_aix = 'AIX'
os_freebsd = 'FreeBSD'
os_hpux = 'HP-UX'
os_hurd = 'GNU'
os_irix = 'IRIX'
os_linux = 'Linux'
os_macosx = 'Darwin'
os_minix3 = 'Minix3'
os_netbsd = 'NetBSD'
os_openbsd = 'OpenBSD'
os_os400 = 'OS400'
os_solaris = 'SunOS'
os_windows = 'WindowsNT'
os_qnx = 'QNX'
#
os_all_UNIX = [
os_aix, os_hpux, os_freebsd, os_linux,
os_macosx, os_minix3, os_netbsd, os_openbsd,
os_solaris, os_qnx ]
#
os_linux_proc_ipv4_route = '/proc/net/route'
os_linux_proc_ipv6_route = '/proc/net/ipv6_route'
#
### Human Languages (ISO 639-1):
#
hl_Arabic = 'ar'
hl_Armenian = 'hy'
hl_Basque = 'eu'
hl_Bashkir = 'ba'
hl_Bengali = 'bn'
hl_Chinese = 'cn' # Simplified
hl_Church = 'cu' # Slavonic
hl_Croatian = 'hr'
hl_Czech = 'cz'
hl_Danish = 'dk'
hl_Deutsch = 'de'
hl_English = 'en'
hl_Estonian = 'ee'
hl_Finnish = 'fi'
hl_French = 'fr'
hl_Georgian = 'ka'
hl_Greek = 'gr'
hl_Hrvatski = 'hr'
hl_Hebrew = 'il'
hl_Ido = 'io'
hl_Irish = 'ga'
hl_Italian = 'it'
hl_Kazakh = 'kk'
hl_Korean = 'ko'
hl_Kurdish = 'ku'
hl_Kyrgyz = 'ky'
hl_Latvian = 'lv'
hl_Maltese = 'mt'
hl_Nauru = 'na'
hl_Persian = 'fa'
hl_Polish = 'pl'
hl_Pushto = 'ps'
hl_Romanian = 'ro'
hl_Russian = 'ru'
hl_Sanskrit = 'sa'
hl_Serbian = 'rs'
hl_Slovak = 'sk'
hl_Somali = 'so'
hl_Swahili = 'sw'
hl_Swedish = 'se'
hl_Spanish = 'es'
hl_Tamil = 'ta'
hl_Tajik = 'tg'
hl_Thai = 'th'
hl_Turkish = 'tr'
hl_Turkmen = 'tk'
hl_Urdu = 'ur'
hl_Uzbek = 'uz'
hl_Japanese = 'jp'
#
hl_default = hl_English
#
enc_ASCII = "ascii"
for enc in [ "7", "8", "8a", "16u" ]:
locals()["enc_ArmSCII{}".format(enc.upper())] \
= "armscii-{}".format(enc)
for enc in [ "CN", "TW", "JP", "KR" ]:
locals()["enc_EUC{}".format(enc)] = "EUC-{}".format(enc)
for enc in [ "c", "r", "ru", "t", "u" ]:
locals()["enc_KOI8{}".format(enc.upper())] \
= "koi8-{}".format(enc)
for enc in [ 7, 8, 16, 32 ]:
locals()["enc_UTF{}".format(enc)] = "utf-{}".format(enc)
for enc in [ 273, 278, 280, 297, 423, 437, 737, 775,
850, 851, 852, 855, 856, 857, 858, 860, 861, 862,
863, 865, 866, 869, 874, 875, 880, 905, 922, 935,
1025, 1097, 1250, 1251, 1252, 1253, 1254, 1255, 1256, 1257,
1258, 1259 ]:
locals()["enc_{}".format(enc)] = "cp{}".format(enc)
for enc in [ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 13, 14, 15 ]:
locals()["enc_ISO{}".format(enc)] = "iso-8859-{}".format(enc)
enc_ISO22 = "iso-2022"
#
enc_aliases = {
enc_ASCII : [ "cp-367", "cp367", "us-ascii" ],
enc_273 : [ "cp-273", "ibm273", "ibm-273" ],
enc_278 : [ "cp-278", "ibm278", "ibm-278" ],
enc_280 : [ "cp-280", "ibm280", "ibm-280" ],
enc_297 : [ "cp-297", "ibm297", "ibm-297" ],
enc_423 : [ "cp-423", "ibm423", "ibm-423" ],
enc_437 : [ "cp-437", "ibm437", "ibm-437" ],
enc_737 : [ "cp-737", "windows-737", "ibm737", "ibm-737" ],
enc_775 : [ "cp-775", "windows-755", "ibm775", "ibm-775" ],
enc_850 : [ "cp-850", "windows-850", "ibm850", "ibm-850" ],
enc_851 : [ "cp-851", "ibm851", "ibm-851" ],
enc_852 : [ "cp-852", "windows-852", "ibm852", "ibm-852" ],
enc_855 : [ "cp-855", "windows-855", "ibm855", "ibm-855" ],
enc_856 : [ "cp-856", "ibm856", "ibm-856" ],
enc_857 : [ "cp-857", "windows-857", "ibm857", "ibm-857" ],
enc_858 : [ "cp-858", "windows-858", "ibm-858" ],
enc_860 : [ "cp-860", "ibm860", "ibm-860" ],
enc_861 : [ "cp-861", "windows-861", "ibm861", "ibm-861" ],
enc_862 : [ "cp-862", "windows-862", "dos-862", "ibm862" ],
enc_863 : [ "cp-863", "ibm863", "ibm-863" ],
enc_865 : [ "cp-865", "ibm865", "ibm-865" ],
enc_866 : [ "cp-866", "windows-866", "ibm866", "ibm-866" ],
enc_869 : [ "cp-869", "windows-869", "ibm869", "ibm-869" ],
enc_874 : [ "cp-874", "windows-874" ],
enc_875 : [ "cp-875", "ibm875", "ibm-875" ],
enc_880 : [ "cp-880", "ibm880", "ibm-880" ],
enc_905 : [ "cp-905", "ibm905", "ibm-905" ],
enc_922 : [ "cp-922", "ibm922", "ibm-922" ],
enc_935 : [ "cp-935", "ibm935", "ibm-935" ],
enc_1025 : [ "cp-1025", "ibm1025", "ibm-1025" ],
enc_1097 : [ "cp-1097", "ibm1097", "ibm-1097" ],
enc_1250 : [ "cp-1250", "windows-1250", "cp5346" ],
enc_1251 : [ "cp-1251", "windows-1251", "cp5347" ],
enc_1252 : [ "cp-1252", "windows-1252", "cp5348" ],
enc_1253 : [ "cp-1253", "windows-1253", "cp5349" ],
enc_1254 : [ "cp-1254", "windows-1254", "cp5350" ],
enc_1255 : [ "cp-1255", "windows-1255" ],
enc_1256 : [ "cp-1256", "windows-1256" ],
enc_1257 : [ "cp-1257", "windows-1257" ],
enc_1258 : [ "cp-1258", "windows-1258" ],
enc_1259 : [ "cp-1259", "windows-1259" ],
enc_EUCCN : [ "euc_cn" ],
enc_EUCTW : [ "euc_tw" ],
enc_EUCJP : [ "euc_jp" ],
enc_EUCKR : [ "euc_kr", "cp970" ],
enc_ISO1 : [ "iso8859-1", "latin1", "ibm819", "ibm-819" ],
enc_ISO2 : [ "iso8859-2", "latin2", "ibm912", "ibm-912" ],
enc_ISO3 : [ "iso8859-3", "latin3", "ibm913", "ibm-913" ],
enc_ISO4 : [ "iso8859-4", "latin4", "ibm914", "ibm-914" ],
enc_ISO5 : [ "iso8859-5", "cp915", "ibm915", "ibm-915" ],
enc_ISO6 : [ "iso8859-6", "cp1089", "cp-1089" ],
enc_ISO7 : [ "iso8859-7", "greek8", "ibm813", "ibm-813" ],
enc_ISO8 : [ "iso8859-8", "hebrew8" ],
enc_ISO9 : [ "iso8859-9", "latin5", "ibm920", "ibm-920" ],
enc_ISO10 : [ "iso8859-10", "latin6" ],
enc_ISO13 : [ "iso8859-13", "cp921", "cp-921" ],
enc_ISO15 : [ "iso8859-15", "latin9", "ibm923", "ibm-923" ],
enc_ISO22 : [ "iso2022" ],
enc_KOI8R : [ "koi8r", "cp878", "ibm-878" ],
enc_KOI8U : [ "koi8u", "cp1168", "ibm-1168" ],
enc_UTF7 : [ "utf7" ],
enc_UTF8 : [ "utf8" ],
enc_UTF16 : [ "utf16" ],
enc_UTF32 : [ "utf32" ],
}
#
default_encoding = enc_UTF8
#
hl_old_enc = {
hl_Arabic : [ enc_ISO6 ],
hl_Armenian : [ enc_ArmSCII7, enc_ArmSCII8, enc_ArmSCII8A ],
hl_Basque : [ enc_ISO1 ],
hl_Bashkir : [],
hl_Bengali : [],
hl_Chinese : [ enc_935, enc_EUCTW ],
hl_Church : [],
hl_Croatian : [ enc_ISO2 ],
hl_Czech : [ enc_ISO2 ],
hl_Danish : [ enc_ISO1 ],
hl_Deutsch : [ enc_ISO1, enc_273 ],
hl_English : [ enc_ISO1, enc_437, enc_UTF7, enc_ASCII ],
hl_Estonian : [ enc_ISO15, enc_ISO13, enc_922, enc_775 ],
hl_Finnish : [ enc_ISO1, enc_278 ],
hl_French : [ enc_ISO1, enc_863, enc_297 ],
hl_Georgian : [],
hl_Greek : [ enc_ISO7, enc_423, enc_737, enc_869,
enc_875, enc_1253 ],
hl_Hrvatski : [ enc_ISO1 ],
hl_Hebrew : [ enc_ISO8, enc_862, enc_856, enc_1255 ],
hl_Ido : [],
hl_Irish : [ enc_ISO14 ],
hl_Italian : [ enc_ISO1, enc_280 ],
hl_Kazakh : [],
hl_Korean : [ enc_EUCKR ],
hl_Kurdish : [],
hl_Kyrgyz : [],
hl_Latvian : [ enc_1257, enc_ISO13, enc_775 ],
hl_Maltese : [ enc_ISO3 ],
hl_Nauru : [],
hl_Persian : [ enc_1097 ],
hl_Polish : [ enc_ISO2 ],
hl_Pushto : [],
hl_Romanian : [ enc_ISO2 ],
hl_Russian : [ enc_ISO5, enc_855, enc_866, enc_1251,
enc_880, enc_KOI8C, enc_KOI8R, enc_KOI8RU ],
hl_Sanskrit : [],
hl_Serbian : [ enc_1025 ],
hl_Slovak : [ enc_ISO2 ],
hl_Somali : [],
hl_Swahili : [],
hl_Swedish : [ enc_ISO1, enc_278 ],
hl_Spanish : [ enc_ISO1 ],
hl_Tamil : [],
hl_Tajik : [ enc_KOI8T ],
hl_Thai : [ enc_874 ],
hl_Turkish : [ enc_ISO9, enc_857, enc_905, enc_1254 ],
hl_Turkmen : [],
hl_Urdu : [],
hl_Uzbek : [],
hl_Japanese : [ enc_EUCJP ]
}
#
err_SEC = 5
err_MIN = 6
err_HOURS = 7
err_BYTES = 8
err_TRY = 9
err_CLOSED = 10
err_RECONN = 11
err_CONNTO = 12
err_DEVEL = 13
err_SENDTO = 15
err_USAGE = 16
err_OPTIONS = 17
err_LOADCFG = 18
err_IMPORT = 19
err_BASEDON = 64
err_UNKNOWN = 100
#
err_DESCRIPTIONS = {
err_SEC: " sec.",
err_MIN: " min.",
err_HOURS: " hr.",
err_BYTES: " byte(s)",
err_TRY: " (try: {})",
err_CLOSED: "Connection closed",
err_RECONN: "reconnecting to ",
err_CONNTO: "Connecting to ",
err_SENDTO: "Sending to ",
err_USAGE: "Usage: ",
err_OPTIONS: "[<options>]",
err_LOADCFG: "Problem reading the configuration file",
err_IMPORT: "Library import error",
err_BASEDON: "based on IRC-IoT demo library",
err_UNKNOWN: "Unknown error",
err_DEVEL: "You are using the test part of library code" \
+ ", it may be unstable or insecure, if you are not sure" \
+ " - disable it"
}
#
def __setattr__(self, *_):
pass
def __init__(self):
#
self.bot_name = self.CONST.default_bot_name
self.bot_python = self.CONST.default_bot_python
self.bot_background_parameter \
= self.CONST.default_bot_background_parameter
self.errors = self.CONST.err_DESCRIPTIONS
self.max_config_size = self.CONST.default_max_config_size
self.lang = self.CONST.hl_default
# Only for testing:
self.os_override = None
self.os_linux_proc_ipv4_route \
= self.CONST.os_linux_proc_ipv4_route
self.os_linux_proc_ipv6_route \
= self.CONST.os_linux_proc_ipv6_route
self.__config = None
self.__error_handler_ = self.__default_error_handler_
def __default_error_handler_(self, in_error_code, in_mid, \
in_vuid = None, in_addon = None):
# Warning: This version of error handler is made for
# testing and does not comply with the specification
my_descr = ""
if in_error_code in self.errors.keys():
my_descr = self.errors[in_error_code]
if isinstance(in_addon, str):
my_descr += " ({})".format(in_addon)
if my_descr != "":
print('PyIRCIoT error:', my_descr)
def copy_string_(self, from_string):
if not isinstance(from_string, str):
return None
return '{}_'.format(from_string)[:-1]
def wipe_string_(self, in_hash):
try:
import SecureString
SecureString.clearmem(in_hash)
except:
pass
return "U" * 128
def td2ms_(self, in_td):
return in_td.days * 86400 \
+ in_td.seconds \
+ in_td.microseconds / 1000000
def is_json_(self, in_message):
if not isinstance(in_message, str):
return False
try:
my_json = json.loads(in_message)
except ValueError:
return False
return True
def get_enc_by_enc_(self, in_encoding):
if not isinstance(in_encoding, str):
return None
my_encoding = in_encoding.lower()
for my_key in self.CONST.enc_aliases.keys():
if my_encoding in self.CONST.enc_aliases[ my_key ]:
my_encoding = my_key
break
if my_encoding not in self.get_encs_list_():
return None
return my_encoding
def get_langs_by_enc_(self, in_encoding):
my_encoding = self.get_enc_by_enc_(in_encoding)
if my_encoding == None:
return []
my_langs = []
for my_key in self.CONST.hl_old_enc.keys():
if my_encoding in self.CONST.hl_old_enc[ my_key ]:
my_langs += [ my_key ]
return my_langs
def get_lang_by_enc_(self, in_encoding):
my_langs = self.get_langs_by_enc_(in_encoding)
if len(my_langs) != 1:
return None
return my_langs.pop()
def get_encs_by_lang_(self, in_human_language):
if not isinstance(in_human_language, str):
return []
my_encs = [ self.CONST.enc_UTF8 ]
if in_human_language in self.CONST.hl_old_enc.keys():
my_encs += self.CONST.hl_old_enc[ in_human_language ]
return my_encs
def get_langs_list_(self):
my_langs = []
for my_key in self.CONST.hl_old_enc.keys():
my_langs += [ my_key ]
return my_langs
def get_encs_list_(self):
my_encs = [ self.CONST.enc_UTF8 ]
for my_lang in self.CONST.hl_old_enc.keys():
for my_enc in self.CONST.hl_old_enc[ my_lang ]:
if my_enc not in my_encs:
my_encs += [ my_enc ]
return my_encs
def is_ipv4_address_(self, in_ipv4_address):
if not isinstance(in_ipv4_address, str):
return False
try:
socket.inet_pton(socket.AF_INET, in_ipv4_address)
except socket.error:
return False
return True
def is_ipv6_address_(self, in_ipv6_address):
if not isinstance(in_ipv6_address, str):
return False
try:
socket.inet_pton(socket.AF_INET6, in_ipv6_address)
except socket.error:
return False
return True
def is_ip_address_(self, in_ip_address):
if self.is_ipv4_address_(in_ip_address):
return True
if self.is_ipv6_address_(in_ip_address):
return True
return False
def is_ip_port_(self, in_ip_port):
if not isinstance(in_ip_port, int):
return False
if in_ip_port < 1 or in_ip_port > 65535:
return False
return True
def is_hostname_(self, in_name):
try:
socket.gethostbyname(in_name)
return True
except socket.error:
return False
# incomplete
def is_local_ip_address_(self, in_ip):
return False
def dns_ipv4_resolver_(self, in_name):
if not isinstance(in_name, str):
return None
my_ip_list = []
try:
from dns import resolver
my_result = resolver.query(in_name, 'A')
for my_answer in my_result.response.answer:
for my_item in my_answer.items:
my_ip = my_item.address
if not self.is_ipv4_address_(my_ip):
continue
if my_ip not in my_ip_list:
my_ip_list += [ my_ip ]
return my_ip_list
except:
return None
def dns_reverse_resolver_(self, in_server_ip):
if self.is_ip_address_(in_server_ip):
try:
from dns import resolver, reversename
my_reverse = reversename.from_address(in_server_ip)
my_answer = resolver.query(my_reverse, 'PTR')
return str(my_answer[0])[:-1]
except:
pass
return in_server_ip
def get_os_name_(self):
if self.os_override != None:
return self.os_override
try:
return os.uname()[0]
except:
if os.name == 'nt':
return self.CONST.os_windows
return None
def get_ipv6_route_linux_(self, in_server_ip):
def unpack_ipv6_(in_string):
my_unpack = ''
for my_idx in range(8):
if my_idx != 0:
my_unpack += ':'
my_unpack += in_string[my_idx*4:my_idx*4+4]
return my_unpack
my_proc = self.os_linux_proc_ipv6_route
if self.get_os_name_() != self.CONST.os_linux:
return None
if not os.path.exists(my_proc):
return None
if not os.access(my_proc, os.R_OK):
return None
if not self.is_ipv6_address_(in_server_ip):
return None
my_route = None
my_check = ipaddress.IPv6Address(in_server_ip)
with open(my_proc) as my_handler:
for my_line in my_handler:
my_fields = my_line.strip().split()
my_network = my_fields[0]
my_netmask = my_fields[1]
my_gateway = my_fields[4]
if len(my_network) != 32 \
or len(my_gateway) != 32 \
or len(my_netmask) != 2:
continue
my_iunpack = unpack_ipv6_(my_network)
my_gunpack = unpack_ipv6_(my_gateway)
if my_gateway == '::':
my_gateway = None
my_netmask = int(my_netmask)
my_metric = int(my_fields[5], 16)
my_netbase = ipaddress.ip_network(my_iunpack \
+ "/{:d}".format(my_netmask), False)
my_gateway = ipaddress.ip_address(my_gunpack)
my_network = my_iunpack
my_if_name = my_fields[9]
if my_check in my_netbase:
my_get = False
if my_route == None:
my_get = True
else:
( chk_if_name, chk_network, chk_netmask, \
chk_metric, chk_gateway ) = my_route
if my_netmask > chk_netmask:
my_get = True
elif my_netmask == chk_netmask:
if my_metric < chk_metric:
my_get = True
if my_get:
my_route = ( my_if_name, my_network, \
my_netmask, my_metric, my_gateway )
return my_route
#
# End of get_ipv6_route_linux_()
def get_ipv4_route_linux_(self, in_server_ip):
def unpack_ipv4_(in_string):
return socket.inet_ntoa(struct.pack("<L", int(in_string, 16)))
my_proc = self.os_linux_proc_ipv4_route
if self.get_os_name_() != self.CONST.os_linux:
return None
if not os.path.exists(my_proc):
return None
if not os.access(my_proc, os.R_OK):
return None
if not self.is_ipv4_address_(in_server_ip):
return None
my_route = None
my_check = ipaddress.ip_address(in_server_ip)
# Warning: this method only checks the default routing table,
# but it is more correct to take the desired routing table
# by the number based on the current table selection rules,
# which is often used in the routers and in the Android OS
with open(my_proc) as my_handler:
for my_line in my_handler:
my_fields = my_line.strip().split()
my_network = my_fields[1]
my_netmask = my_fields[7]
if len(my_network) != 8 or len(my_netmask) != 8:
continue
my_if_name = my_fields[0]
my_gateway = my_fields[2]
my_metric = int(my_fields[6])
my_network = unpack_ipv4_(my_network)
my_gateway = unpack_ipv4_(my_gateway)
my_netmask = unpack_ipv4_(my_netmask)
if my_gateway == '0.0.0.0':
my_gateway = None
my_ip_mask = ipaddress.ip_address(my_netmask)
my_netbase = ipaddress.ip_network(my_network \
+ '/' + my_netmask)
if my_check in my_netbase:
my_get = False
if my_route == None:
my_get = True
else:
( chk_if_name, chk_network, chk_netmask, \
chk_metric, chk_gateway ) = my_route
chk_ip_mask = ipaddress.ip_address(chk_netmask)
if my_ip_mask > chk_ip_mask:
my_get = True
elif chk_ip_mask == my_ip_mask:
if my_metric < chk_metric:
my_get = True
if my_get:
my_route = ( my_if_name, my_network, \
my_netmask, my_metric, my_gateway )
return my_route
#
# End of get_ipv4_route_linux_()
def get_ipv4_route_(self, in_server_ip):
if not self.is_ipv4_address_(in_server_ip):
return None
my_os = self.get_os_name_()
if my_os == self.CONST.os_linux:
return self.get_ipv4_route_linux_(in_server_ip)
# Other OS's methods will be here
return None
def get_ipv6_route_(self,in_server_ip):
if not self.is_ipv6_address_(in_server_ip):
return None
my_os = self.get_os_name_()
if my_os == self.CONST.os_linux:
return self.get_ipv6_route_linux_(in_server_ip)
# Other OS's methods will be here
return None
def get_src_ip_by_dst_ip_(self,in_server_ip):
my_ipv4_route = None
my_ipv6_route = None
if self.is_ipv4_address_(in_server_ip):
my_ipv4_route = self.get_ipv4_route_(in_server_ip)
if my_ipv4_route == None:
return None
( my_if_name, my_network, my_netmask, \
my_metric, my_gateway ) = my_ipv4_route
elif self.is_ipv6_address_(in_server_ip):
my_ipv6_route = self.get_ipv6_route_(in_server_ip)
if my_ipv6_route == None:
return None
( my_if_name, my_network, my_netmask, \
my_metric, my_gateway ) = my_ipv6_route
else:
return None
my_ip_out = None
my_ip_mask = ipaddress.ip_address(in_server_ip)
my_adapters = ifaddr.get_adapters()
for my_adapter in my_adapters:
for my_ip in my_adapter.ips:
if self.is_ipv4_address_(my_ip.ip) \
and my_ipv4_route != None:
my_network_str \
= "{}/{:d}".format(my_ip.ip, my_ip.network_prefix)
my_netbase = ipaddress.ip_network(my_network_str, False)
if my_ip_mask in my_netbase:
return my_ip.ip
if my_adapter.name == my_if_name:
if my_gateway == None:
my_ip_out = my_ip.ip
else:
my_netbase = ipaddress.ip_network(my_network \
+ '/' + my_netmask)
my_ip_check = ipaddress.ip_address(my_ip.ip)
if my_ip_check in my_netbase:
return my_ip.ip
elif isinstance(my_ip.ip, tuple) \
and my_ipv6_route != None:
( my_ipv6, my_ipv6_flowinfo, my_ipv6_scope_id ) = my_ip.ip
if self.is_ipv6_address_(my_ipv6):
my_network_str \
= "{}/{:d}".format(my_ipv6, my_ip.network_prefix)
my_netbase = ipaddress.ip_network(my_network_str, False)
if my_ip_mask in my_netbase:
return my_ipv6
if my_adapter.name == my_if_name:
if my_gateway == None:
my_ip_out = my_ipv6
else:
my_netbase = ipaddress.ip_network(my_network \
+ "/{:d}".format(my_netmask), False)
my_ip_check = ipaddress.ip_address(my_ipv6)
if my_ip_check in my_netbase:
return my_ipv6
return my_ip_out
#
# End of get_src_ip_by_dst_ip_()
def validate_descriptions_(self, in_dict):
if not isinstance(in_dict, dict):
return {}
my_dict = in_dict
for my_key in in_dict.keys():
if not isinstance(my_key, int):
del my_dict[ my_key ]
continue
my_item = in_dict[ my_key ]
if not isinstance(my_item, str):
del my_dict[ my_key ]
continue
my_a = my_item.count('{')
my_b = my_item.count('}')
my_c = my_item.count('{}')
if my_a != my_c or my_b != my_c:
del my_dict[ my_key ]
continue
return my_dict
def irciot_set_locale_(self, in_lang):
if not isinstance(in_lang, str): return
self.lang = in_lang
my_desc = {}
try:
from PyIRCIoT.irciot_errors \
import irciot_get_all_error_descriptions_
my_desc = irciot_get_all_error_descriptions_(in_lang)
my_desc = self.validate_descriptions_(my_desc)
if my_desc != {}:
self.errors.update(my_desc)
except:
pass
def get_config_value_(self, in_item, in_section = None):
if not isinstance(self.__config, dict): return None
if not isinstance(in_item, str): return None
if not in_item in self.__config.keys(): return None
return self.__config[in_item]
def set_config_value_(self, in_item, in_value, in_section = None):
if not isinstance(in_item, str): return False
if not isinstance(self.__config, dict): self.__config = {}
self.__config[in_item] = in_value
return True
def load_config_defaults_(self, in_defaults):
if not isinstance(in_defaults, dict): return False
self.__config = in_defaults
return True
# incomplete
def load_config_file_(self, in_filename, in_defaults = None):
if self.__config is None: self.__config = {}
self.load_config_defaults_(in_defaults)
if not isinstance(in_filename, str): return False
if not os.path.isfile(in_filename): return False
if not os.access(in_filename, os.R_OK):
self._error_handler_(self.CONST.err_LOADCFG, 0)
return False
try:
import configparser
import random
except Exception as my_ex:
self._error_handler_(self.CONST.err_IMPORT, 0, in_addon = str(my_ex))
return False
random.seed()
my_dummy = 'dummy{:d}'.format(random.randint(10000, 99999))
my_parser = configparser.ConfigParser()
try:
file_fd = open(in_filename, 'r')
my_cfg_str = '[{}]\n'.format(my_dummy)
my_cfg_str += file_fd.read(self.max_config_size)
file_fd.close()
my_parser.read_string(my_cfg_str)
my_config = my_parser._sections[my_dummy]
if isinstance(my_config, dict):
for config_key in my_config.keys():
self.__config[config_key] = my_config[config_key]
del my_config
except Exception as my_ex:
self.__error_handler_(self.CONST.err_LOADCFG, 0, in_addon = str(my_ex))
return False
return True
def bot_usage_handler (self):
print('{} ({})'.format(self.bot_name, \
self.errors[self.CONST.err_BASEDON]))
print('\n{}{} {}\n'.format( \
self.errors[self.CONST.err_USAGE], \
sys.argv[0], self.errors[self.CONST.err_OPTIONS]))
def bot_set_options_(self, in_options):
if not isinstance(in_options, str): return False
self.errors[self.CONST.err_OPTIONS] = in_options
return True
def bot_redirect_output_(self, in_filename):
if not isinstance(in_filename, str):
return
try:
io_redir = open(in_filename, 'w+')
except:
return
sys.stdout = io_redir
sys.stderr = io_redir
io_handler1 = io_redir.fileno()
os.dup2(io_handler1, 1)
os.close(1)
io_handler2 = io_redir.fileno()
os.dup2(io_handler2, 2)
os.close(2)
def bot_process_kill_(self):
if self.get_os_name_() in self.CONST.os_all_UNIX:
try:
import signal
os.kill(os.getpid(), signal.SIGKILL)
except:
pass
def bot_process_kill_timeout_(self, in_timeout):
if type(in_timeout) not in [ int, float ]: return
if self.get_os_name_() in self.CONST.os_all_UNIX \
and in_timeout > 0:
try:
import signal
signal.signal(signal.SIGALRM, self.bot_process_kill_)
signal.alarm(in_timeout)
except:
pass
def bot_background_start_(self):
import subprocess
my_count = len(sys.argv)
if my_count == 1:
self.bot_usage_handler ()
elif my_count > 1:
my_list = [ self.bot_python ]
for my_idx in range(1, my_count):
my_list += [ sys.argv[my_idx] ]
if my_list[ my_count - 1 ] != self.bot_background_parameter:
my_list = [ os.path.expanduser(sys.argv[0]) ] + my_list
my_list += [ self.bot_background_parameter ]
print("Starting {} ...".format(self.bot_name))
my_process = subprocess.Popen( my_list )
sys.exit(0)
[ Get back ]
|
IRC-IoT
|