Source code for msg_parser.data_models

# coding=utf-8
from datetime import datetime
from datetime import timedelta
from struct import unpack

from .properties import DATA_TYPE_MAP


class DataModel(object):
    """Convert the raw bytes of a MSG property into a Python value.

    Each ``Ptyp*`` static method converts one property data type and
    :meth:`get_value` dispatches to the converter by name.  Numeric values
    are unpacked explicitly as little-endian, the byte order used by the
    MSG property types (see MS-OXCDATA), so results do not depend on the
    host platform.  Types without a dedicated conversion return the input
    unchanged.
    """

    def __init__(self):
        # Name of the converter selected by the most recent get_value() call.
        self.data_type_name = None

    @staticmethod
    def lookup_data_type_name(data_type):
        """Return the converter name mapped to ``data_type``, or None."""
        return DATA_TYPE_MAP.get(data_type)

    def get_value(self, data_value, data_type_name=None, data_type=None):
        """Convert ``data_value`` with the converter for the given type.

        :param data_value: raw property bytes.
        :param data_type_name: converter name such as ``"PtypString"``;
            takes precedence over ``data_type`` when both are given.
        :param data_type: key looked up through
            :meth:`lookup_data_type_name` to find the converter name.
        :return: the converted value, or None when no converter exists for
            the requested type.
        :raises Exception: when neither ``data_type_name`` nor
            ``data_type`` is provided.
        """
        if data_type_name:
            self.data_type_name = data_type_name
        elif data_type:
            self.data_type_name = self.lookup_data_type_name(data_type)
        else:
            raise Exception(
                "required arguments not provided to the constructor of the class."
            )

        # An unmapped data_type yields None here; probing an attribute
        # named None would raise TypeError instead of reporting "no value".
        if not self.data_type_name:
            return None

        converter = getattr(self, self.data_type_name, None)
        if not callable(converter):
            return None
        return converter(data_value)

    # ------------------------------------------------------------------
    # Private helpers shared by the converters below.
    # ------------------------------------------------------------------

    @staticmethod
    def _strip_nul_bytes(data_value):
        """Return ``data_value`` with every NUL byte removed.

        Falsy input (None, empty) is returned unchanged.
        """
        # NOTE(review): this also drops NUL bytes that are legitimate payload
        # in binary/object values; kept because callers may rely on it.
        if data_value and b"\x00" in data_value:
            return data_value.replace(b"\x00", b"")
        return data_value

    @staticmethod
    def _split_fixed(data_value, width):
        """Split ``data_value`` into consecutive ``width``-byte chunks.

        Trailing bytes that do not fill a whole chunk are ignored.
        """
        count = len(data_value) // width
        return [data_value[i * width:(i + 1) * width] for i in range(count)]

    @staticmethod
    def _unpack_array(type_code, width, data_value):
        """Unpack every complete little-endian ``type_code`` item into a list.

        ``width`` is the byte size of one item; trailing bytes that do not
        fill a whole item are ignored.
        """
        count = len(data_value) // width
        fmt = "<%d%s" % (count, type_code)
        return list(unpack(fmt, data_value[:count * width]))

    # ------------------------------------------------------------------
    # Single-value converters.
    # ------------------------------------------------------------------

    @staticmethod
    def PtypUnspecified(data_value):
        """Return the value unchanged."""
        return data_value

    @staticmethod
    def PtypNull(_):
        """Return None regardless of the input."""
        return None

    @staticmethod
    def PtypInteger16(data_value):
        """Return the signed 16-bit integer held in the first two bytes."""
        return unpack("<h", data_value[:2])[0]

    @staticmethod
    def PtypInteger32(data_value):
        """Return the signed 32-bit integer held in the first four bytes."""
        return unpack("<i", data_value[:4])[0]

    @staticmethod
    def PtypFloating32(data_value):
        """Return the 32-bit float encoded by ``data_value`` (4 bytes)."""
        return unpack("<f", data_value)[0]

    @staticmethod
    def PtypFloating64(data_value):
        """Return the 64-bit float encoded by ``data_value`` (8 bytes)."""
        return unpack("<d", data_value)[0]

    @staticmethod
    def PtypCurrency(data_value):
        """Return the value unchanged (no currency decoding is done)."""
        return data_value

    @staticmethod
    def PtypFloatingTime(data_value):
        """Return the value unchanged.

        NOTE(review): unlike PtypMultipleFloatingTime, this does not convert
        to ``datetime``; left as-is so the return type callers see is kept.
        """
        return data_value

    @staticmethod
    def PtypErrorCode(data_value):
        """Return the unsigned 32-bit code encoded by ``data_value``."""
        return unpack("<I", data_value)[0]

    @staticmethod
    def PtypBoolean(data_value):
        """Return True when the first byte of ``data_value`` is non-zero."""
        # Slice rather than index: indexing bytes yields an int on Python 3,
        # which unpack() rejects.
        return unpack("B", data_value[:1])[0] != 0

    @staticmethod
    def PtypObject(data_value):
        """Return the value with NUL bytes removed."""
        return DataModel._strip_nul_bytes(data_value)

    @staticmethod
    def PtypInteger64(data_value):
        """Return the signed 64-bit integer encoded by ``data_value``."""
        return unpack("<q", data_value)[0]

    @staticmethod
    def PtypString8(data_value):
        """Return the byte string with NUL bytes removed (not decoded)."""
        return DataModel._strip_nul_bytes(data_value)

    @staticmethod
    def PtypString(data_value):
        """Decode UTF-16-LE bytes to text and drop NUL characters.

        Undecodable sequences are ignored; falsy input is returned unchanged.
        """
        if data_value:
            text = data_value.decode("utf-16-le", errors="ignore")
            data_value = text.replace("\x00", "")
        return data_value

    @staticmethod
    def PtypTime(data_value):
        """Return the ``datetime`` produced by ``get_time`` for the value."""
        return get_time(data_value)

    @staticmethod
    def PtypGuid(data_value):
        """Return the value unchanged."""
        return data_value

    @staticmethod
    def PtypServerId(data_value):
        """Return the value unchanged."""
        return data_value

    @staticmethod
    def PtypRestriction(data_value):
        """Return the value unchanged."""
        return data_value

    @staticmethod
    def PtypRuleAction(data_value):
        """Return the value unchanged."""
        return data_value

    @staticmethod
    def PtypBinary(data_value):
        """Return the value with NUL bytes removed."""
        return DataModel._strip_nul_bytes(data_value)

    # ------------------------------------------------------------------
    # Multi-value converters: the input is a packed run of fixed-size items.
    # ------------------------------------------------------------------

    @staticmethod
    def PtypMultipleInteger16(data_value):
        """Return the list of signed 16-bit integers packed in the value."""
        return DataModel._unpack_array("h", 2, data_value)

    @staticmethod
    def PtypMultipleInteger32(data_value):
        """Return the list of signed 32-bit integers packed in the value."""
        return DataModel._unpack_array("i", 4, data_value)

    @staticmethod
    def PtypMultipleFloating32(data_value):
        """Return the list of 32-bit floats packed in the value."""
        return DataModel._unpack_array("f", 4, data_value)

    @staticmethod
    def PtypMultipleFloating64(data_value):
        """Return the list of 64-bit floats packed in the value."""
        return DataModel._unpack_array("d", 8, data_value)

    @staticmethod
    def PtypMultipleCurrency(data_value):
        """Return the value unchanged."""
        return data_value

    @staticmethod
    def PtypMultipleFloatingTime(data_value):
        """Return a list with ``get_floating_time`` applied to each 8 bytes."""
        return [
            get_floating_time(chunk)
            for chunk in DataModel._split_fixed(data_value, 8)
        ]

    @staticmethod
    def PtypMultipleInteger64(data_value):
        """Return the list of signed 64-bit integers packed in the value."""
        return DataModel._unpack_array("q", 8, data_value)

    @staticmethod
    def PtypMultipleString(data_value):
        """Return the value unchanged.

        NOTE: the items are not decoded; splitting the value into individual
        UTF-16-LE strings is not implemented here.
        """
        return data_value

    @staticmethod
    def PtypMultipleString8(data_value):
        """Return the value unchanged."""
        return data_value

    @staticmethod
    def PtypMultipleTime(data_value):
        """Return a list with ``get_time`` applied to each 8-byte item."""
        return [get_time(chunk) for chunk in DataModel._split_fixed(data_value, 8)]

    @staticmethod
    def PtypMultipleGuid(data_value):
        """Return the list of raw 16-byte GUID chunks in the value."""
        return DataModel._split_fixed(data_value, 16)

    @staticmethod
    def PtypMultipleBinary(data_value):
        """Return the value unchanged."""
        return data_value
def get_floating_time(data_value):
    """Convert an 8-byte floating time value to a ``datetime``.

    The value is a little-endian double counting days from 1899-12-30.
    It is decoded with OLE Automation date semantics: the integer part is
    a signed day offset and the magnitude of the fractional part is the
    time of day, so ``-1.25`` is 06:00 on 1899-12-29.

    :param data_value: exactly 8 bytes holding the double.
    :return: a naive ``datetime``.
    :raises struct.error: if ``data_value`` is not 8 bytes long.
    """
    days = unpack("<d", data_value)[0]

    # int() truncates toward zero, which is the split OLE dates use; the
    # fraction always moves forward in time, even for negative day counts.
    whole_days = int(days)
    day_fraction = abs(days - whole_days)

    return (
        datetime(year=1899, month=12, day=30)
        + timedelta(days=whole_days)
        + timedelta(days=day_fraction)
    )
def get_time(data_value):
    """Convert an 8-byte tick count to a ``datetime``.

    The value is a little-endian signed 64-bit count of 100-nanosecond
    intervals since 1601-01-01.

    :param data_value: exactly 8 bytes holding the tick count.
    :return: a naive ``datetime``, rounded to the nearest microsecond.
    :raises struct.error: if ``data_value`` is not 8 bytes long.
    """
    ticks = unpack("<q", data_value)[0]

    # Split off whole seconds with integer arithmetic first: present-day
    # tick counts exceed 2**53, so converting the full count to a float
    # would lose low-order digits and could shift the result by a
    # microsecond.
    seconds, sub_second_ticks = divmod(ticks, 10000000)

    return datetime(year=1601, month=1, day=1) + timedelta(
        seconds=seconds, microseconds=sub_second_ticks / 10.0
    )
def get_multi_value_offsets(data_value):
    """Read the item count and item offsets from a multi-value header.

    The first 4 bytes are an unsigned little-endian item count.  When the
    count is 1 the single offset is taken to be 8; otherwise one unsigned
    64-bit little-endian offset per item follows the count.  The total
    length of ``data_value`` is appended as a final end marker.

    :param data_value: raw bytes starting with the count field.
    :return: tuple ``(count, offsets)`` where ``offsets`` has
        ``count + 1`` entries.
    :raises struct.error: if ``data_value`` is too short for the header.
    """
    ul_count = unpack("<I", data_value[:4])[0]

    if ul_count == 1:
        rgul_data_offsets = [8]
    else:
        table_end = 4 + ul_count * 8
        rgul_data_offsets = list(
            unpack("<%dQ" % ul_count, data_value[4:table_end])
        )

    rgul_data_offsets.append(len(data_value))
    return ul_count, rgul_data_offsets