def read(self):
self.fileptr.seek(self.offset, 0)
rec_fmt = '=LBBHLLHHlBBH'
rec_len = struct.calcsize(rec_fmt)
rec_unpack = struct.Struct(rec_fmt).unpack_from
s = rec_unpack(self.fileptr.read(rec_len))
self.STX = s[1]
self.typeOfDatagram = chr(s[2])
self.EMModel = s[3]
self.RecordDate = s[4]
self.Time = float(s[5]/1000.0)
self.Counter = s[6]
self.SerialNumber = s[7]
self.Height = float (s[8] / float (100))
self.HeightType = s[9]
# now read the footer
self.ETX, self.checksum = readFooter(self.numberOfBytes, self.fileptr)
###############################################################################
python类Struct()的实例源码
def readFooter(numberOfBytes, fileptr):
rec_fmt = '=BH'
rec_len = struct.calcsize(rec_fmt)
rec_unpack = struct.Struct(rec_fmt).unpack_from
s = rec_unpack(fileptr.read(rec_len))
ETX = s[0]
checksum = s[1]
# self.DatagramAsReceived = s[0].decode('utf-8').rstrip('\x00')
# if numberOfBytes % 2 == 0:
# # skip the spare byte
# ETX = s[2]
# checksum = s[3]
# else:
# ETX = s[1]
# checksum = s[2]
# #read any trailing bytes. We have seen the need for this with some .all files.
# if bytesRead < self.numberOfBytes:
# self.fileptr.read(int(self.numberOfBytes - bytesRead))
return ETX, checksum
###############################################################################
def _85encode(b, chars, chars2, pad=False, foldnuls=False, foldspaces=False):
# Helper function for a85encode and b85encode
if not isinstance(b, bytes_types):
b = memoryview(b).tobytes()
padding = (-len(b)) % 4
if padding:
b = b + b'\0' * padding
words = struct.Struct('!%dI' % (len(b) // 4)).unpack(b)
chunks = [b'z' if foldnuls and not word else
b'y' if foldspaces and word == 0x20202020 else
(chars2[word // 614125] +
chars2[word // 85 % 7225] +
chars[word % 85])
for word in words]
if padding and not pad:
if chunks[-1] == b'z':
chunks[-1] = chars[0] * 5
chunks[-1] = chunks[-1][:-padding]
return b''.join(chunks)
def get_packer(self, obj):
has_bitmap, none_bitmap, present_bitmap = self._get_bitmaps(obj)
rv = self.packer_cache.get(present_bitmap)
if rv is None:
packer = struct.Struct("".join([
self.bitmap_packer.format,
] + [
self.slot_struct_types[slot]
for i,slot in enumerate(self.slot_keys)
if present_bitmap & (cython.cast(cython.ulonglong, 1) << i)
]))
alignment = self.alignment
size = packer.size
padding = (size + alignment - 1) / alignment * alignment - size
self.packer_cache[present_bitmap] = rv = (packer, padding)
return rv
def get_unpacker(self, has_bitmap, none_bitmap):
present_bitmap = has_bitmap & ~none_bitmap
if self._last_unpacker is not None and present_bitmap == self._last_unpacker_bitmap:
return self._last_unpacker
rv = self.unpacker_cache.get(present_bitmap)
if rv is None:
pformat = "".join([
self.slot_struct_types[slot]
for i,slot in enumerate(self.slot_keys)
if present_bitmap & (cython.cast(cython.ulonglong, 1) << i)
])
unpacker = struct.Struct(pformat)
alignment = self.alignment
size = unpacker.size
padding = (size + self.bitmap_size + alignment - 1) / alignment * alignment - size
gfactory = GenericProxyClass(self.slot_keys, self.slot_types, present_bitmap, self.bitmap_size)
rv = (unpacker, padding, pformat, gfactory)
self.unpacker_cache[present_bitmap] = rv
self._last_unpacker_bitmap = present_bitmap
self._last_unpacker = rv
return rv
def parse_station_record(self, line):
fieldnames = ['StationCode', 'StationName', 'DateStart', 'DateEnd', 'AntennaHeight', 'HeightCode', 'AntennaNorth', 'AntennaEast',
'ReceiverCode', 'ReceiverVers', 'ReceiverFirmware', 'ReceiverSerial', 'AntennaCode', 'RadomeCode', 'AntennaSerial']
fieldwidths = (
1, 6, 18, 19, 19, 9, 7, 9, 9, 22, 22, 7, 22, 17, 7, 20) # negative widths represent ignored padding fields
fmtstring = ' '.join('{}{}'.format(abs(fw), 'x' if fw < 0 else 's') for fw in fieldwidths)
fieldstruct = struct.Struct(fmtstring)
parse = fieldstruct.unpack_from
if line[0] == ' ' and len(line) >= 77:
record = dict(zip(fieldnames, map(str.strip, parse(line.ljust(fieldstruct.size))[1:])))
else:
return None
# convert to datetime object
DateStart, DateEnd = self.stninfodate2datetime(record['DateStart'], record['DateEnd'])
record['DateStart'] = DateStart
record['DateEnd'] = DateEnd
record['StationCode'] = record['StationCode'].lower()
return record
def __init__(self, cloexec=True, nonblock=True):
self._init1, self._add_watch, self._rm_watch, self._read = load_inotify()
flags = 0
if cloexec:
flags |= self.CLOEXEC
if nonblock:
flags |= self.NONBLOCK
self._inotify_fd = self._init1(flags)
if self._inotify_fd == -1:
raise INotifyError(os.strerror(ctypes.get_errno()))
self._buf = ctypes.create_string_buffer(5000)
self.fenc = get_preferred_file_name_encoding()
self.hdr = struct.Struct(b'iIII')
# We keep a reference to os to prevent it from being deleted
# during interpreter shutdown, which would lead to errors in the
# __del__ method
self.os = os
def _85encode(b, chars, chars2, pad=False, foldnuls=False, foldspaces=False):
# Helper function for a85encode and b85encode
if not isinstance(b, bytes_types):
b = memoryview(b).tobytes()
padding = (-len(b)) % 4
if padding:
b = b + b'\0' * padding
words = struct.Struct('!%dI' % (len(b) // 4)).unpack(b)
chunks = [b'z' if foldnuls and not word else
b'y' if foldspaces and word == 0x20202020 else
(chars2[word // 614125] +
chars2[word // 85 % 7225] +
chars[word % 85])
for word in words]
if padding and not pad:
if chunks[-1] == b'z':
chunks[-1] = chars[0] * 5
chunks[-1] = chunks[-1][:-padding]
return b''.join(chunks)
def _binary_write(filepath, faces):
with open(filepath, 'wb') as data:
fw = data.write
# header
# we write padding at header beginning to avoid to
# call len(list(faces)) which may be expensive
fw(struct.calcsize('<80sI') * b'\0')
# 3 vertex == 9f
pack = struct.Struct('<9f').pack
# number of vertices written
nb = 0
for face in faces:
# calculate face normal
# write normal + vertexes + pad as attributes
fw(struct.pack('<3f', *normal(*face)) + pack(*itertools.chain.from_iterable(face)))
# attribute byte count (unused)
fw(b'\0\0')
nb += 1
# header, with correct value now
data.seek(0)
fw(struct.pack('<80sI', _header_version().encode('ascii'), nb))
def read(self):
self.fileptr.seek(self.offset, 0)
rec_fmt = '=LBBHLLHHLLBBH'
rec_len = struct.calcsize(rec_fmt)
rec_unpack = struct.Struct(rec_fmt).unpack
# bytesRead = rec_len
s = rec_unpack(self.fileptr.read(rec_len))
# self.numberOfBytes = s[0]
self.STX = s[1]
self.typeOfDatagram = chr(s[2])
self.EMModel = s[3]
self.RecordDate = s[4]
self.Time = float(s[5]/1000.0)
self.ClockCounter = s[6]
self.SerialNumber = s[7]
self.ExternalDate = s[8]
self.ExternalTime = s[9]
self.PPS = s[10]
self.ETX = s[11]
self.checksum = s[12]
###############################################################################
def test_unpack_from(self):
test_string = b'abcd01234'
fmt = '4s'
s = struct.Struct(fmt)
for cls in (bytes, bytearray):
data = cls(test_string)
self.assertEqual(s.unpack_from(data), (b'abcd',))
self.assertEqual(s.unpack_from(data, 2), (b'cd01',))
self.assertEqual(s.unpack_from(data, 4), (b'0123',))
for i in range(6):
self.assertEqual(s.unpack_from(data, i), (data[i:i+4],))
for i in range(6, len(test_string) + 1):
self.assertRaises(struct.error, s.unpack_from, data, i)
for cls in (bytes, bytearray):
data = cls(test_string)
self.assertEqual(struct.unpack_from(fmt, data), (b'abcd',))
self.assertEqual(struct.unpack_from(fmt, data, 2), (b'cd01',))
self.assertEqual(struct.unpack_from(fmt, data, 4), (b'0123',))
for i in range(6):
self.assertEqual(struct.unpack_from(fmt, data, i), (data[i:i+4],))
for i in range(6, len(test_string) + 1):
self.assertRaises(struct.error, struct.unpack_from, fmt, data, i)
def _85encode(b, chars, chars2, pad=False, foldnuls=False, foldspaces=False):
# Helper function for a85encode and b85encode
if not isinstance(b, bytes_types):
b = memoryview(b).tobytes()
padding = (-len(b)) % 4
if padding:
b = b + b'\0' * padding
words = struct.Struct('!%dI' % (len(b) // 4)).unpack(b)
chunks = [b'z' if foldnuls and not word else
b'y' if foldspaces and word == 0x20202020 else
(chars2[word // 614125] +
chars2[word // 85 % 7225] +
chars[word % 85])
for word in words]
if padding and not pad:
if chunks[-1] == b'z':
chunks[-1] = chars[0] * 5
chunks[-1] = chunks[-1][:-padding]
return b''.join(chunks)
def parse(cls, buff, offset):
"""
Given a buffer and offset, returns the parsed value and new offset.
Parses the ``size_primitive`` first to determine how many more bytes to
consume to extract the value.
"""
size, offset = cls.size_primitive.parse(buff, offset)
if size == -1:
return None, offset
var_struct = struct.Struct("!%ds" % size)
value = var_struct.unpack_from(buff, offset)[0]
value = cls.parse_value(value)
offset += var_struct.size
return value, offset
def send_message(self, message):
""" Send the message to client
:param message: the message to send
"""
output = self.request.wfile
output.write(struct.Struct(">B").pack(129))
length = len(message)
if length <= 125:
output.write(struct.Struct(">B").pack(length))
elif length >= 126 and length <= 65535:
output.write(struct.Struct(">B").pack(126))
output.write(struct.pack(">H", length))
else:
output.write(struct.Struct(">B").pack(127))
output.write(struct.pack(">Q", length))
output.write(message)
logging.debug(message)
def _85encode(b, chars, chars2, pad=False, foldnuls=False, foldspaces=False):
# Helper function for a85encode and b85encode
if not isinstance(b, bytes_types):
b = memoryview(b).tobytes()
padding = (-len(b)) % 4
if padding:
b = b + b'\0' * padding
words = struct.Struct('!%dI' % (len(b) // 4)).unpack(b)
chunks = [b'z' if foldnuls and not word else
b'y' if foldspaces and word == 0x20202020 else
(chars2[word // 614125] +
chars2[word // 85 % 7225] +
chars[word % 85])
for word in words]
if padding and not pad:
if chunks[-1] == b'z':
chunks[-1] = chars[0] * 5
chunks[-1] = chunks[-1][:-padding]
return b''.join(chunks)
def _85encode(b, chars, chars2, pad=False, foldnuls=False, foldspaces=False):
# Helper function for a85encode and b85encode
if not isinstance(b, bytes_types):
b = memoryview(b).tobytes()
padding = (-len(b)) % 4
if padding:
b = b + b'\0' * padding
words = struct.Struct('!%dI' % (len(b) // 4)).unpack(b)
chunks = [b'z' if foldnuls and not word else
b'y' if foldspaces and word == 0x20202020 else
(chars2[word // 614125] +
chars2[word // 85 % 7225] +
chars[word % 85])
for word in words]
if padding and not pad:
if chunks[-1] == b'z':
chunks[-1] = chars[0] * 5
chunks[-1] = chunks[-1][:-padding]
return b''.join(chunks)
def test_ceph_key(self, mock_urandom):
result = utils.JinjaUtils.ceph_key()
# First, decode the base64
raw_result = base64.b64decode(result.encode('ascii'))
# Decompose into a header and a key
hdr_struct = struct.Struct('<hiih')
header = raw_result[:hdr_struct.size]
key = raw_result[hdr_struct.size:]
# Interpret the header
_type, _secs, _nanosecs, key_len = hdr_struct.unpack(header)
assert key_len == len(key)
# Verify that the key is what it should be
assert key == b'0123456789012345'
def __new__(mcs, clsname, clsbases, clsdict):
headers = clsdict.get('__header__', [])
if headers:
header_attrs, header_fmt = zip(*headers)
header_format_order = clsdict.get('__byte_order__', '>')
header_format = [header_format_order] + list(header_fmt)
header_struct = struct.Struct(''.join(header_format))
clsdict['__slots__'] = ('_fields', '_view', '_payload')
clsdict['_header_fields'] = tuple(header_attrs)
clsdict['_header_bytes_order'] = header_format_order
clsdict['_header_struct'] = header_struct
clsdict['_header_size'] = header_struct.size
return type.__new__(mcs, clsname, clsbases, clsdict)
def b85decode(b):
_b85dec = [None] * 256
for i, c in enumerate(iterbytes(_b85alphabet)):
_b85dec[c] = i
padding = (-len(b)) % 5
b = b + b'~' * padding
out = []
packI = struct.Struct('!I').pack
for i in range(0, len(b), 5):
chunk = b[i:i + 5]
acc = 0
try:
for c in iterbytes(chunk):
acc = acc * 85 + _b85dec[c]
except TypeError:
for j, c in enumerate(iterbytes(chunk)):
if _b85dec[c] is None:
raise ValueError(
'bad base85 character at position %d' % (i + j)
)
raise
try:
out.append(packI(acc))
except struct.error:
raise ValueError('base85 overflow in hunk starting at byte %d'
% i)
result = b''.join(out)
if padding:
result = result[:-padding]
return result
def __init__(self,format_string):
self._struct = _Struct(format_string)
def _unpack_bytes_from_pybuffer(buf, offs, idmap):
if idmap is not None and offs in idmap:
return idmap[offs]
if cython.compiled:
try:
buf = _likebuffer(buf)
PyObject_GetBuffer(buf, cython.address(pybuf), PyBUF_SIMPLE) # lint:ok
rv = _unpack_bytes_from_cbuffer(cython.cast(cython.p_char, pybuf.buf), offs, pybuf.len, None) # lint:ok
finally:
PyBuffer_Release(cython.address(pybuf)) # lint:ok
else:
hpacker = struct.Struct('=H')
objlen = hpacker.unpack_from(buf, offs)[0]
offs = int(offs)
dataoffs = offs + hpacker.size
compressed = (objlen & 0x8000) != 0
if (objlen & 0x7FFF) == 0x7FFF:
qpacker = struct.Struct('=HQ')
objlen = qpacker.unpack_from(buf, offs)[1]
dataoffs = offs + qpacker.size
else:
objlen = objlen & 0x7FFF
rv = buffer(buf, dataoffs, objlen)
if compressed:
rv = lz4_decompress(rv)
else:
rv = bytes(rv)
if idmap is not None:
idmap[offs] = rv
return rv
def set_nomenclature(self):
"""
As in get_nomenclature, but set the title of the file header
in the file, encoded as a pascal string containing
15 characters and stored as 16 bytes of binary data.
"""
self.file.seek(0)
title = 'DAC2 objects'
st = struct.Struct( '<B15sH' )
header_rec = [len(title), title, 18] # constant header
header_chr = st.pack( *header_rec )
self.header_size = len( header_chr )
self.file.write( header_chr )
def set_nomenclature(self):
"""
As in get_nomenclature, but set the title of the file header
in the file, encoded as a pascal string containing
15 characters and stored as 16 bytes of binary data.
"""
self.file.seek(0)
title = 'DAC2 objects'
st = struct.Struct( '<B15sH' )
header_rec = [len(title), title, 18] # constant header
header_chr = st.pack( *header_rec )
self.header_size = len( header_chr )
self.file.write( header_chr )
def _make_packer(format_string):
packer = struct.Struct(format_string)
pack = packer.pack
unpack = lambda s: packer.unpack(s)[0]
return pack, unpack
def b85decode(b):
_b85dec = [None] * 256
for i, c in enumerate(iterbytes(_b85alphabet)):
_b85dec[c] = i
padding = (-len(b)) % 5
b = b + b'~' * padding
out = []
packI = struct.Struct('!I').pack
for i in range(0, len(b), 5):
chunk = b[i:i + 5]
acc = 0
try:
for c in iterbytes(chunk):
acc = acc * 85 + _b85dec[c]
except TypeError:
for j, c in enumerate(iterbytes(chunk)):
if _b85dec[c] is None:
raise ValueError(
'bad base85 character at position %d' % (i + j)
)
raise
try:
out.append(packI(acc))
except struct.error:
raise ValueError('base85 overflow in hunk starting at byte %d'
% i)
result = b''.join(out)
if padding:
result = result[:-padding]
return result
def b85decode(b):
_b85dec = [None] * 256
for i, c in enumerate(iterbytes(_b85alphabet)):
_b85dec[c] = i
padding = (-len(b)) % 5
b = b + b'~' * padding
out = []
packI = struct.Struct('!I').pack
for i in range(0, len(b), 5):
chunk = b[i:i + 5]
acc = 0
try:
for c in iterbytes(chunk):
acc = acc * 85 + _b85dec[c]
except TypeError:
for j, c in enumerate(iterbytes(chunk)):
if _b85dec[c] is None:
raise ValueError(
'bad base85 character at position %d' % (i + j)
)
raise
try:
out.append(packI(acc))
except struct.error:
raise ValueError('base85 overflow in hunk starting at byte %d'
% i)
result = b''.join(out)
if padding:
result = result[:-padding]
return result
def __init__(self, rich_format):
self.rich_fmt = rich_format
self.fmt = format_from_rich(rich_format)
self.compiled = struct.Struct(self.fmt)
self.size = self.compiled.size
def __init__(self, rich_format):
self.rich_fmt = rich_format
self.fmt = format_from_rich(rich_format)
self.compiled = struct.Struct(self.fmt)
self.size = self.compiled.size
def b85decode(b):
_b85dec = [None] * 256
for i, c in enumerate(iterbytes(_b85alphabet)):
_b85dec[c] = i
padding = (-len(b)) % 5
b = b + b'~' * padding
out = []
packI = struct.Struct('!I').pack
for i in range(0, len(b), 5):
chunk = b[i:i + 5]
acc = 0
try:
for c in iterbytes(chunk):
acc = acc * 85 + _b85dec[c]
except TypeError:
for j, c in enumerate(iterbytes(chunk)):
if _b85dec[c] is None:
raise ValueError(
'bad base85 character at position %d' % (i + j)
)
raise
try:
out.append(packI(acc))
except struct.error:
raise ValueError('base85 overflow in hunk starting at byte %d'
% i)
result = b''.join(out)
if padding:
result = result[:-padding]
return result
def b85decode(b):
_b85dec = [None] * 256
for i, c in enumerate(iterbytes(_b85alphabet)):
_b85dec[c] = i
padding = (-len(b)) % 5
b = b + b'~' * padding
out = []
packI = struct.Struct('!I').pack
for i in range(0, len(b), 5):
chunk = b[i:i + 5]
acc = 0
try:
for c in iterbytes(chunk):
acc = acc * 85 + _b85dec[c]
except TypeError:
for j, c in enumerate(iterbytes(chunk)):
if _b85dec[c] is None:
raise ValueError(
'bad base85 character at position %d' % (i + j)
)
raise
try:
out.append(packI(acc))
except struct.error:
raise ValueError('base85 overflow in hunk starting at byte %d'
% i)
result = b''.join(out)
if padding:
result = result[:-padding]
return result