def encode_fallback(fallback, currency):
""" Encode all supported fallback addresses.
"""
if currency == 'bc' or currency == 'tb':
fbhrp, witness = bech32_decode(fallback)
if fbhrp:
if fbhrp != currency:
raise ValueError("Not a bech32 address for this currency")
wver = witness[0]
if wver > 16:
raise ValueError("Invalid witness version {}".format(witness[0]))
wprog = u5_to_bitarray(witness[1:])
else:
addr = base58.b58decode_check(fallback)
if is_p2pkh(currency, addr[0]):
wver = 17
elif is_p2sh(currency, addr[0]):
wver = 18
else:
raise ValueError("Unknown address type for {}".format(currency))
wprog = addr[1:]
return tagged('f', bitstring.pack("uint:5", wver) + wprog)
else:
raise NotImplementedError("Support for currency {} not implemented".format(currency))
python类pack()的实例源码
def encode_fallback(fallback, currency):
""" Encode all supported fallback addresses.
"""
if currency == 'bc' or currency == 'tb':
fbhrp, witness = bech32_decode(fallback)
if fbhrp:
if fbhrp != currency:
raise ValueError("Not a bech32 address for this currency")
wver = witness[0]
if wver > 16:
raise ValueError("Invalid witness version {}".format(witness[0]))
wprog = u5_to_bitarray(witness[1:])
else:
addr = base58.b58decode_check(fallback)
if is_p2pkh(currency, addr[0]):
wver = 17
elif is_p2sh(currency, addr[0]):
wver = 18
else:
raise ValueError("Unknown address type for {}".format(currency))
wprog = addr[1:]
return tagged('f', bitstring.pack("uint:5", wver) + wprog)
else:
raise NotImplementedError("Support for currency {} not implemented".format(currency))
def _updateBitString(self):
self._packStringFormat = 'uint:8, uint:3, uint:3, uint:%s, uint:%s' % (self.getField("CompanyPrefix").getBitLength(),self.getField("IndividualAssetReference").getBitLength())
#Pack the bitstring
bsp= bitstring.pack(self._packStringFormat,
self.getFieldValue("Header"),
self.getFieldValue("Filter"),
self.getFieldValue("Partition"),
self.getFieldValue("CompanyPrefix"),
self.getFieldValue("IndividualAssetReference"))
#Set the _bits for the SGTIN-96
self._bits = bsp.unpack("bin")[0][2:]
def _updateBitString(self):
self._packStringFormat = 'uint:8, uint:3, uint:3, uint:%s, uint:%s, uint:24' % (self.getField("CompanyPrefix").getBitLength(),self.getField("ServiceReference").getBitLength())
#Pack the bitstring
bsp= bitstring.pack(self._packStringFormat,
self.getFieldValue("Header"),
self.getFieldValue("Filter"),
self.getFieldValue("Partition"),
self.getFieldValue("CompanyPrefix"),
self.getFieldValue("ServiceReference"),
self.getFieldValue("Reserved"))
#Set the _bits for the GSRN-96
self._bits = bsp.unpack("bin")[0][2:]
def encode(self,generalManager,indicatorDigit,objectClass,filter_value,serialNumber=0):
"""
encodes an GID-96.
"""
#Reload fields
self._loadFields()
#General Manager
generalManagerField = self.getField("GeneralManager")
generalManagerField.setBitLength(28)
generalManagerField.setDigitLength(28/4)
generalManagerField.setOffset(8)
self.setFieldValue("GeneralManager",int(generalManager))
#ObjectClass
objectClassField = self.getField("ObjectClass")
objectClassField.setBitLength(24)
objectClassField.setDigitLength(24/8)
objectClassField.setOffset(36)
self.setFieldValue("ObjectClass",int(objectClass))
#SerialNumber
serialNumberField = self.getField("SerialNumber")
serialNumberField.setBitLength(36)
serialNumberField.setDigitLength(36/4)
self.setFieldValue("SerialNumber",int(serialNumber))
#Set the PackString Format
self._packStringFormat = 'uint:8, uint:28, uint:24, uint:36'
#Pack the bitstring
bsp= bitstring.pack(self._packStringFormat,
int(self.getFieldValue("Header")),
int(self.getFieldValue("GeneralManager")),
int(self.getFieldValue("ObjectClass")),
int(self.getFieldValue("SerialNumber")))
#Set the _bits for the GID-96
self._bits = bsp.unpack("bin")[0][2:]
return self
def _updateBitString(self):
#Set the PackString Format
self._packStringFormat = 'uint:8, uint:3, uint:3, uint:%s, uint:%s, uint:41' % (self.getField("CompanyPrefix").getBitLength(),self.getField("LocationReference").getBitLength())
#Pack the bitstring
bsp= bitstring.pack(self._packStringFormat,
self.getFieldValue("Header"),
self.getFieldValue("Filter"),
self.getFieldValue("Partition"),
self.getFieldValue("CompanyPrefix"),
self.getFieldValue("LocationReference"),
self.getFieldValue("Extension"))
#Set the _bits for the SGLN-96
self._bits = bsp.unpack("bin")[0][2:]
def _updateBitString(self):
self._packStringFormat = 'uint:8, uint:3, uint:3, uint:%s, uint:%s, uint:24' % (self.getField("CompanyPrefix").getBitLength(),self.getField("SerialReference").getBitLength())
#Pack the bitstring
bsp= bitstring.pack(self._packStringFormat,
self.getFieldValue("Header"),
self.getFieldValue("Filter"),
self.getFieldValue("Partition"),
self.getFieldValue("CompanyPrefix"),
self.getFieldValue("SerialReference"),
self.getFieldValue("Reserved"))
#Set the _bits for the SSCC-96
self._bits = bsp.unpack("bin")[0][2:]
def _updateBitString(self):
#Set the PackString Format
self._packStringFormat = 'uint:8, uint:3, uint:3, uint:%s, uint:%s, uint:38' % (self.getField("CompanyPrefix").getBitLength(),self.getField("AssetType").getBitLength())
#Pack the bitstring
bsp= bitstring.pack(self._packStringFormat,
self.getFieldValue("Header"),
self.getFieldValue("Filter"),
self.getFieldValue("Partition"),
self.getFieldValue("CompanyPrefix"),
self.getFieldValue("AssetType"),
self.getFieldValue("Serial"))
#Set the _bits for the GRAI-96
self._bits = bsp.unpack("bin")[0][2:]
def updateBitString(self):
self._packStringFormat = 'uint:8, uint:3, uint:3, uint:%s, uint:%s, uint:38' % (self.getField("CompanyPrefix").getBitLength(),self.getField("ItemReference").getBitLength())
#Pack the bitstring
bsp= bitstring.pack(self._packStringFormat,
self.getFieldValue("Header"),
self.getFieldValue("Filter"),
self.getFieldValue("Partition"),
self.getFieldValue("CompanyPrefix"),
self.getFieldValue("ItemReference"),
int(self.getFieldValue("SerialNumber")))
#Set the _bits for the SGTIN-96
self._bits = bsp.unpack("bin")[0]
def _updateBitString(self):
#Set the PackString Format
self._packStringFormat = 'uint:8, uint:3, uint:3, uint:%s, uint:%s, uint:58' % (self.getField("CompanyPrefix").getBitLength(),self.getField("DocumentType").getBitLength())
#Pack the bitstring
bsp= bitstring.pack(self._packStringFormat,
self.getFieldValue("Header"),
self.getFieldValue("Filter"),
self.getFieldValue("Partition"),
self.getFieldValue("CompanyPrefix"),
self.getFieldValue("DocumentType"),
self.getFieldValue("Serial"))
#Set the _bits for the GDTI-113
self._bits = bsp.unpack("bin")[0][2:]
def u5_to_bitarray(arr):
ret = bitstring.BitArray()
for a in arr:
ret += bitstring.pack("uint:5", a)
return ret
def tagged(char, l):
# Tagged fields need to be zero-padded to 5 bits.
while l.len % 5 != 0:
l.append('0b0')
return bitstring.pack("uint:5, uint:5, uint:5",
CHARSET.find(char),
(l.len / 5) / 32, (l.len / 5) % 32) + l
# Tagged field containing bytes
def packing(gateway_addr, node_addr , trans_direct, func_code, wind_direct, wind_speed, model, on_off, work_mode, temp):
return bitstring.pack('bin, bin, bin, bin, bin, bin, bin, bin, bin, bin', gateway_addr, node_addr , trans_direct, func_code, wind_direct, wind_speed, model, on_off, work_mode, temp)
def return_air_con_mqtt_str_bytes_to_send(str_bin):
units,crc = return_crc(str_bin)
str_bytes=struct.pack('7B', units[0], units[1], units[2], units[3], units[4], units[5], crc)
print(len(str_bytes))
print(repr(str_bytes))
return str_bytes
def mqtt_pub_air_con(data):
# {'node_select': 2, 'wind_speed': 2, 'temp_setting': 28, 'wind_directtion': 1, 'switch': 1, 'working_model': 1}
node_addr = bitstring.pack('uint:13',data['node_select']).bin
wind_direct = bitstring.pack('uint:2',data['wind_directtion']).bin
wind_speed = bitstring.pack('uint:2',data['wind_speed']).bin
on_off = bitstring.pack('uint:2',data['switch']).bin
work_mode = bitstring.pack('uint:3',data['working_model']).bin
temp = bitstring.pack('uint:5',data['temp_setting']).bin
str_bin = return_str_bin(node_addr, wind_direct, wind_speed, on_off, work_mode, temp)
str_bytes = return_air_con_mqtt_str_bytes_to_send(str_bin)
transmitMQTT(str_bytes)
def mqtt_pub_node_setting():
gateway_addr = '0b001' # 1
node_addr = '0b0000000010100' # 1
trans_direct = '0b1' # 1
func_code = '0b0010010' # 18
new_gateway_addr = '0b001'
# new_node_addr = '0b0000000001111'
new_node_addr = node_addr
reserve = '0b0000'
sleep_time = '0b0100101100' #5 minute
# sleep_time = '0b0000110111' #55 second
# sleep_time = '0b0001111000' #120 second
send_power = '0b11'
str_replaced = replace_0b(gateway_addr) + replace_0b(node_addr) + replace_0b(trans_direct) + replace_0b(func_code) + replace_0b(new_gateway_addr) + replace_0b(new_node_addr) + replace_0b(reserve) + replace_0b(sleep_time) + replace_0b(send_power)
print('str_repalced:', str_replaced)
str_bin = BitStream('0b' + str_replaced)
print('str_bin:', str_bin)
units, crc = return_crc(str_bin)
str_bytes = struct.pack('8B', units[0], units[1], units[2], units[3], units[4], units[5], units[6], crc)
print(str_bytes)
print(len(str_bytes))
print(repr(str_bytes))
transmitMQTT(str_bytes)
def replace_0b(input):
return input.replace('0b', '')
# str_bin = return_str_bin(gateway_addr, node_addr, trans_direct, func_code, wind_direct, wind_speed, model, on_off, work_mode, temp)
# str_bin = bitstring.pack(gateway_addr, node_addr, trans_direct, func_code, wind_direct, wind_speed, model, on_off, work_mode, temp)
def bitwise_reverse(int_src):
bin_str = bitstring.pack('uint:8',int_src).bin
ret = "".join(map(lambda x: "1" if x == "0" else "0", bin_str))
return int(ret, 2)
def padding(length): # pragma: no cover
def make_padding_field(parent, **field_options):
def encode_pad(value):
return pack('pad:n', n=value)
def decode_pad(encoded):
return None
return BreadField(
length, encode_pad, decode_pad,
str_format=field_options.get('str_format', None))
return make_padding_field
def u5_to_bitarray(arr):
ret = bitstring.BitArray()
for a in arr:
ret += bitstring.pack("uint:5", a)
return ret
def tagged(char, l):
# Tagged fields need to be zero-padded to 5 bits.
while l.len % 5 != 0:
l.append('0b0')
return bitstring.pack("uint:5, uint:5, uint:5",
CHARSET.find(char),
(l.len / 5) / 32, (l.len / 5) % 32) + l
# Tagged field containing bytes
def get_payload(self):
self.payload_fields.append(("Service", self.service))
self.payload_fields.append(("Port", self.port))
service = little_endian(bitstring.pack("8", self.service))
port = little_endian(bitstring.pack("32", self.port))
payload = service + port
return payload
def get_payload(self):
self.payload_fields.append(("Signal (mW)", self.signal))
self.payload_fields.append(("TX (bytes since on)", self.tx))
self.payload_fields.append(("RX (bytes since on)", self.rx))
self.payload_fields.append(("Reserved", self.reserved1))
signal = little_endian(bitstring.pack("32", self.signal))
tx = little_endian(bitstring.pack("32", self.tx))
rx = little_endian(bitstring.pack("32", self.rx))
reserved1 = little_endian(bitstring.pack("16", self.reserved1))
payload = signal + tx + rx + reserved1
return payload
def get_payload(self):
self.payload_fields.append(("Timestamp of Build", self.build))
self.payload_fields.append(("Reserved", self.reserved1))
self.payload_fields.append(("Version", self.version))
build = little_endian(bitstring.pack("64", self.build))
reserved1 = little_endian(bitstring.pack("64", self.reserved1))
version = little_endian(bitstring.pack("32", self.version))
payload = build + reserved1 + version
return payload
def get_payload(self):
self.payload_fields.append(("Signal (mW)", self.signal))
self.payload_fields.append(("TX (bytes since on)", self.tx))
self.payload_fields.append(("RX (bytes since on)", self.rx))
self.payload_fields.append(("Reserved", self.reserved1))
signal = little_endian(bitstring.pack("32", self.signal))
tx = little_endian(bitstring.pack("32", self.tx))
rx = little_endian(bitstring.pack("32", self.rx))
reserved1 = little_endian(bitstring.pack("16", self.reserved1))
payload = signal + tx + rx + reserved1
return payload
def get_payload(self):
self.payload_fields.append(("Timestamp of Build", self.build))
self.payload_fields.append(("Reserved", self.reserved1))
self.payload_fields.append(("Version", self.version))
build = little_endian(bitstring.pack("64", self.build))
reserved1 = little_endian(bitstring.pack("64", self.reserved1))
version = little_endian(bitstring.pack("32", self.version))
payload = build + reserved1 + version
return payload
def get_payload(self):
self.payload_fields.append(("Power", self.power_level))
power_level = little_endian(bitstring.pack("16", self.power_level))
payload = power_level
return payload
def get_payload(self):
self.payload_fields.append(("Label", self.label))
field_len_bytes = 32
label = b"".join(little_endian(bitstring.pack("8", ord(c))) for c in self.label)
padding = b"".join(little_endian(bitstring.pack("8", 0)) for i in range(field_len_bytes-len(self.label)))
payload = label + padding
return payload
def get_payload(self):
self.payload_fields.append(("Label", self.label))
field_len_bytes = 32
label = b"".join(little_endian(bitstring.pack("8", ord(c))) for c in self.label)
padding = b"".join(little_endian(bitstring.pack("8", 0)) for i in range(field_len_bytes-len(self.label)))
payload = label + padding
return payload
def get_payload(self):
self.payload_fields.append(("Vendor", self.vendor))
self.payload_fields.append(("Reserved", self.product))
self.payload_fields.append(("Version", self.version))
vendor = little_endian(bitstring.pack("32", self.vendor))
product = little_endian(bitstring.pack("32", self.product))
version = little_endian(bitstring.pack("32", self.version))
payload = vendor + product + version
return payload