def _read_eof(self):
# We've read to the end of the file, so we have to rewind in order
# to reread the 8 bytes containing the CRC and the file size.
# We check the that the computed CRC and size of the
# uncompressed data matches the stored values. Note that the size
# stored is the true file size mod 2**32.
self.fileobj.seek(-8, 1)
crc32 = read32(self.fileobj)
isize = read32(self.fileobj) # may exceed 2GB
if crc32 != self.crc:
raise IOError("CRC check failed %s != %s" % (hex(crc32),
hex(self.crc)))
elif isize != (self.size & 0xffffffffL):
raise IOError, "Incorrect length of data produced"
# Gzip files can be padded with zeroes and still have archives.
# Consume all zero bytes and set the file position to the first
# non-zero byte. See http://www.gzip.org/#faq8
c = "\x00"
while c == "\x00":
c = self.fileobj.read(1)
if c:
self.fileobj.seek(-1, 1)
python类crc32()的实例源码
def crc32(self):
"""
A little bit old-fashioned, but some encodings like yEnc require that
a crc32 value be used. This calculates it based on the file
"""
# block size defined as 2**16
block_size = 65536
# The mask to apply to all CRC checking
BIN_MASK = 0xffffffffL
# Initialize
_crc = 0
if self.open(mode=NNTPFileMode.BINARY_RO):
for chunk in iter(lambda: self.stream.read(block_size), b''):
_crc = crc32(chunk, _crc)
return format(_crc & BIN_MASK, '08x')
return None
def infer_id(self):
representation = self.__repr__(ignore_id=True)
# Clean the representation
representation = representation\
.replace(':bytes ', ':string ')\
.replace('?bytes ', '?string ')\
.replace('<', ' ').replace('>', '')\
.replace('{', '').replace('}', '')
representation = re.sub(
r' \w+:flags\.\d+\?true',
r'',
representation
)
return crc32(representation.encode('ascii'))
def write(self,data):
self._check_closed()
if self.mode != WRITE:
import errno
raise IOError(errno.EBADF, "write() on read-only GzipFile object")
if self.fileobj is None:
raise ValueError, "write() on closed GzipFile object"
# Convert data type if called by io.BufferedWriter.
if isinstance(data, memoryview):
data = data.tobytes()
if len(data) > 0:
self.size = self.size + len(data)
self.crc = zlib.crc32(data, self.crc) & 0xffffffffL
self.fileobj.write( self.compress.compress(data) )
self.offset += len(data)
return len(data)
def asciiDecompress(code):
""" decompress result of asciiCompress """
code = base64.decodestring(code)
csum = zlib.crc32(code)
data = zlib.decompress(code)
return data, csum
def test_img_cdn_hard_rewrite(self):
"""????html?CDN??? https://httpbin.org/"""
# ?????, ????CDN
self.rv = self.client.get(
self.url("/image/jpeg"),
environ_base=env(),
headers=headers()
) # type: Response
# ??flaks???, ???????????, ???????????
self.assertEqual("image/jpeg", self.rv.content_type, msg=self.dump())
self.assertEqual(200, self.rv.status_code, msg=self.dump())
self.assertEqual(0x97ca823f, crc32(self.rv.data), msg=self.dump())
with self.app.test_client() as c:
# ???? https://httpbin.org/image/jpeg ???, ??????????????CDN
self.rv2 = c.get(
self.url("/"),
environ_base=env(),
headers=headers()
) # type: Response
self.assertIn(b"cdn2.zmirror-unittest.com/image/jpeg", self.rv2.data, msg=self.dump())
def write(self,data):
self._check_closed()
if self.mode != WRITE:
import errno
raise IOError(errno.EBADF, "write() on read-only GzipFile object")
if self.fileobj is None:
raise ValueError, "write() on closed GzipFile object"
# Convert data type if called by io.BufferedWriter.
if isinstance(data, memoryview):
data = data.tobytes()
if len(data) > 0:
self.size = self.size + len(data)
self.crc = zlib.crc32(data, self.crc) & 0xffffffffL
self.fileobj.write( self.compress.compress(data) )
self.offset += len(data)
return len(data)
def _read_eof(self):
# We've read to the end of the file, so we have to rewind in order
# to reread the 8 bytes containing the CRC and the file size.
# We check the that the computed CRC and size of the
# uncompressed data matches the stored values. Note that the size
# stored is the true file size mod 2**32.
self.fileobj.seek(-8, 1)
crc32 = read32(self.fileobj)
isize = read32(self.fileobj) # may exceed 2GB
if crc32 != self.crc:
raise IOError("CRC check failed %s != %s" % (hex(crc32),
hex(self.crc)))
elif isize != (self.size & 0xffffffffL):
raise IOError, "Incorrect length of data produced"
# Gzip files can be padded with zeroes and still have archives.
# Consume all zero bytes and set the file position to the first
# non-zero byte. See http://www.gzip.org/#faq8
c = "\x00"
while c == "\x00":
c = self.fileobj.read(1)
if c:
self.fileobj.seek(-1, 1)
def load_data(self, record):
if isinstance(record,str):
# parse the text record
record = self.parse_station_record(record)
# loads the object from a database object
self.ReceiverCode = record['ReceiverCode']
self.ReceiverSerial = record['ReceiverSerial']
self.ReceiverFirmware = record['ReceiverFirmware']
self.AntennaCode = record['AntennaCode']
self.AntennaSerial = record['AntennaSerial']
self.AntennaHeight = float(record['AntennaHeight'])
self.AntennaNorth = float(record['AntennaNorth'])
self.AntennaEast = float(record['AntennaEast'])
self.HeightCode = record['HeightCode']
self.RadomeCode = record['RadomeCode']
self.ReceiverVers = record['ReceiverVers']
# create a hash record using the station information
# use only the information that can actually generate a change in the antenna position
self.hash = zlib.crc32('%.3f %.3f %.3f %s %s' % (self.AntennaNorth, self.AntennaEast, self.AntennaHeight, self.AntennaCode, self.RadomeCode))
return
def crc32_value(file_path, block_size=1048576):
"""
Calculate the CRC32 value of the data of the specified file.
NOTE: this function's result equals to other software generated.
but not equals to the value got by onedrive.
So we should use this function in our system.
:param str file_path:
:param int block_size:
:return int:
"""
crc = 0
with open(file_path, 'rb') as f:
while True:
data = f.read(block_size)
if not data:
break
crc = zlib.crc32(data, crc)
return format(crc & 0xFFFFFFFF, '08x').upper()
def infer_id(self):
representation = self.__repr__(ignore_id=True)
# Clean the representation
representation = representation\
.replace(':bytes ', ':string ')\
.replace('?bytes ', '?string ')\
.replace('<', ' ').replace('>', '')\
.replace('{', '').replace('}', '')
representation = re.sub(
r' \w+:flags\.\d+\?true',
r'',
representation
)
return crc32(representation.encode('ascii'))
def _recv_tcp_full(self):
"""
Receives a message from the network,
internally encoded using the TCP full protocol.
May raise InvalidChecksumError if the received data doesn't
match its valid checksum.
:return: the read message payload.
"""
packet_len_seq = self.read(8) # 4 and 4
packet_len, seq = struct.unpack('<ii', packet_len_seq)
body = self.read(packet_len - 12)
checksum = struct.unpack('<I', self.read(4))[0]
valid_checksum = crc32(packet_len_seq + body)
if checksum != valid_checksum:
raise InvalidChecksumError(checksum, valid_checksum)
return body
def write(self,data):
self._check_closed()
if self.mode != WRITE:
import errno
raise IOError(errno.EBADF, "write() on read-only GzipFile object")
if self.fileobj is None:
raise ValueError, "write() on closed GzipFile object"
# Convert data type if called by io.BufferedWriter.
if isinstance(data, memoryview):
data = data.tobytes()
if len(data) > 0:
self.fileobj.write(self.compress.compress(data))
self.size += len(data)
self.crc = zlib.crc32(data, self.crc) & 0xffffffffL
self.offset += len(data)
return len(data)
def _read_eof(self):
# We've read to the end of the file, so we have to rewind in order
# to reread the 8 bytes containing the CRC and the file size.
# We check the that the computed CRC and size of the
# uncompressed data matches the stored values. Note that the size
# stored is the true file size mod 2**32.
self.fileobj.seek(-8, 1)
crc32 = read32(self.fileobj)
isize = read32(self.fileobj) # may exceed 2GB
if crc32 != self.crc:
raise IOError("CRC check failed %s != %s" % (hex(crc32),
hex(self.crc)))
elif isize != (self.size & 0xffffffffL):
raise IOError, "Incorrect length of data produced"
# Gzip files can be padded with zeroes and still have archives.
# Consume all zero bytes and set the file position to the first
# non-zero byte. See http://www.gzip.org/#faq8
c = "\x00"
while c == "\x00":
c = self.fileobj.read(1)
if c:
self.fileobj.seek(-1, 1)
def write(self,data):
self._check_closed()
if self.mode != WRITE:
import errno
raise IOError(errno.EBADF, "write() on read-only GzipFile object")
if self.fileobj is None:
raise ValueError, "write() on closed GzipFile object"
# Convert data type if called by io.BufferedWriter.
if isinstance(data, memoryview):
data = data.tobytes()
if len(data) > 0:
self.fileobj.write(self.compress.compress(data))
self.size += len(data)
self.crc = zlib.crc32(data, self.crc) & 0xffffffffL
self.offset += len(data)
return len(data)
def _read_eof(self):
# We've read to the end of the file, so we have to rewind in order
# to reread the 8 bytes containing the CRC and the file size.
# We check the that the computed CRC and size of the
# uncompressed data matches the stored values. Note that the size
# stored is the true file size mod 2**32.
self.fileobj.seek(-8, 1)
crc32 = read32(self.fileobj)
isize = read32(self.fileobj) # may exceed 2GB
if crc32 != self.crc:
raise IOError("CRC check failed %s != %s" % (hex(crc32),
hex(self.crc)))
elif isize != (self.size & 0xffffffffL):
raise IOError, "Incorrect length of data produced"
# Gzip files can be padded with zeroes and still have archives.
# Consume all zero bytes and set the file position to the first
# non-zero byte. See http://www.gzip.org/#faq8
c = "\x00"
while c == "\x00":
c = self.fileobj.read(1)
if c:
self.fileobj.seek(-1, 1)
def compute_campaign_digests(test_campaign):
dc = b""
for ts in test_campaign:
dts = b""
for t in ts:
dt = t.test.strip().encode('ascii')
t.crc = crc32(dt)
dts += b"\0"+dt
ts.crc = crc32(dts)
dc += b"\0\x01"+dts
test_campaign.crc = crc32(dc)
if type(test_campaign.filename) is str and test_campaign.filename != '<stdin>':
test = open(test_campaign.filename, 'rb').read()
elif test_campaign.filename == '<stdin>':
test = sys.stdin.read().encode('ascii')
else:
raise Exception("Unknown test source %s" % test_campaign.filename)
test_campaign.sha = sha1(test)
#### FILTER CAMPAIGN #####
def pack_into(self, buff, offset):
"""Serialize and write to ``buff`` starting at offset ``offset``.
Intentionally follows the pattern of ``struct.pack_into``
:param buff: The buffer to write into
:param offset: The offset to start the write at
"""
# NB a length of 0 means an empty string, whereas -1 means null
len_key = -1 if self.partition_key is None else len(self.partition_key)
len_value = -1 if self.value is None else len(self.value)
fmt = '!BBi%dsi%ds' % (max(len_key, 0), max(len_value, 0))
args = (self.MAGIC,
self.compression_type,
len_key,
self.partition_key or b"",
len_value,
self.value or b"")
struct.pack_into(fmt, buff, offset + 4, *args)
fmt_size = struct.calcsize(fmt)
data = buffer(buff[(offset + 4):(offset + 4 + fmt_size)])
crc = crc32(data) & 0xffffffff
struct.pack_into('!I', buff, offset, crc)
def write(self,data):
self._check_closed()
if self.mode != WRITE:
import errno
raise IOError(errno.EBADF, "write() on read-only GzipFile object")
if self.fileobj is None:
raise ValueError("write() on closed GzipFile object")
# Convert data type if called by io.BufferedWriter.
if isinstance(data, memoryview):
data = data.tobytes()
if len(data) > 0:
self.size = self.size + len(data)
self.crc = zlib.crc32(data, self.crc) & 0xffffffff
self.fileobj.write( self.compress.compress(data) )
self.offset += len(data)
return len(data)
def __init__(self, *args):
super().__init__(*args)
self.packsize = None # uint32|64 packed size
self.origsize = None # uint32|64 original size
self.datetime = None # uint32 ctime
self.attribs = None # uint32 file attributes
self.crc32 = None # uint32 checksum over compressed file
self.comptype = None # uint8 compression type
self.compqual = None # uint8 compression quality
self.params = None # uint16 decompression parameters
self.reserved1 = None # uint16
self.filename = None # [uint16]
self.comment = b'' # [uint16] optional, compressed
self.ntsecurity = b'' # [uint16] optional
self.reserved2 = None # ?
self.dataoffset = None # position of data after hdr
def __init__(self, idx, filehdrs, f):
"""
Initialize an :class:`AceMember` object with index within archive *idx*,
initial file header *filehdr* and underlying file-like object *f*.
"""
self._idx = idx
self._file = f
self._headers = filehdrs
self.__attribs = filehdrs[0].attribs
self.__comment = filehdrs[0].comment.decode('utf-8',
errors='replace')
self.__crc32 = filehdrs[-1].crc32
self.__comptype = filehdrs[0].comptype
self.__compqual = filehdrs[0].compqual
self.__datetime = _dt_fromdos(filehdrs[0].datetime)
self.__dicsizebits = (filehdrs[0].params & 15) + 10
self.__dicsize = 1 << self.__dicsizebits
self.__raw_filename = filehdrs[0].filename
self.__filename = self._sanitize_filename(filehdrs[0].filename)
self.__ntsecurity = filehdrs[0].ntsecurity
self.__size = filehdrs[0].origsize
self.__packsize = 0
for hdr in filehdrs:
self.__packsize += hdr.packsize
def compute_campaign_digests(test_campaign):
dc = b""
for ts in test_campaign:
dts = b""
for t in ts:
dt = t.test.strip().encode('ascii')
t.crc = crc32(dt)
dts += b"\0"+dt
ts.crc = crc32(dts)
dc += b"\0\x01"+dts
test_campaign.crc = crc32(dc)
if type(test_campaign.filename) is str and test_campaign.filename != '<stdin>':
test = open(test_campaign.filename, 'rb').read()
elif test_campaign.filename == '<stdin>':
test = sys.stdin.read().encode('ascii')
else:
raise Exception("Unknown test source %s" % test_campaign.filename)
test_campaign.sha = sha1(test)
#### FILTER CAMPAIGN #####
def to_png(self, data, output):
''' Dump data to the image file. Data is bytes(RGBRGB...RGB).
Pure python PNG implementation.
http://inaps.org/journal/comment-fonctionne-le-png
'''
p__ = pack
line = self.width * 3
png_filter = p__('>B', 0)
scanlines = b''.join(
[png_filter + data[y * line:y * line + line]
for y in range(self.height)])
magic = p__('>8B', 137, 80, 78, 71, 13, 10, 26, 10)
# Header: size, marker, data, CRC32
ihdr = [b'', b'IHDR', b'', b'']
ihdr[2] = p__('>2I5B', self.width, self.height, 8, 2, 0, 0, 0)
ihdr[3] = p__('>I', crc32(b''.join(ihdr[1:3])) & 0xffffffff)
ihdr[0] = p__('>I', len(ihdr[2]))
# Data: size, marker, data, CRC32
idat = [b'', b'IDAT', compress(scanlines), b'']
idat[3] = p__('>I', crc32(b''.join(idat[1:3])) & 0xffffffff)
idat[0] = p__('>I', len(idat[2]))
# Footer: size, marker, None, CRC32
iend = [b'', b'IEND', b'', b'']
iend[3] = p__('>I', crc32(iend[1]) & 0xffffffff)
iend[0] = p__('>I', len(iend[2]))
with open(output, 'wb') as fileh:
fileh.write(magic)
fileh.write(b''.join(ihdr))
fileh.write(b''.join(idat))
fileh.write(b''.join(iend))
return
err = 'Error writing data to "{0}".'.format(output)
raise ScreenshotError(err)
def write(self, s):
"""Write string s to the stream.
"""
if self.comptype == "gz":
self.crc = self.zlib.crc32(s, self.crc)
self.pos += len(s)
if self.comptype != "tar":
s = self.cmp.compress(s)
self.__write(s)
def write(self, s):
"""Write string s to the stream.
"""
if self.comptype == "gz":
self.crc = self.zlib.crc32(s, self.crc)
self.pos += len(s)
if self.comptype != "tar":
s = self.cmp.compress(s)
self.__write(s)
def write_chunk(outfile, tag, data=b''):
"""
Write a PNG chunk to the output file, including length and
checksum.
"""
# http://www.w3.org/TR/PNG/#5Chunk-layout
outfile.write(struct.pack("!I", len(data)))
outfile.write(tag)
outfile.write(data)
checksum = zlib.crc32(tag)
checksum = zlib.crc32(data, checksum)
checksum &= 2**32-1
outfile.write(struct.pack("!I", checksum))
def write(self, s):
"""Write string s to the stream.
"""
if self.comptype == "gz":
self.crc = self.zlib.crc32(s, self.crc)
self.pos += len(s)
if self.comptype != "tar":
s = self.cmp.compress(s)
self.__write(s)
def send(self, packet):
"""Sends the given packet (bytes array) to the connected peer"""
if not self.tcp_client.connected:
raise ConnectionError('Client not connected to server.')
with BinaryWriter() as writer:
writer.write_int(len(packet) + 12) # 12 = size_of (integer) * 3
writer.write_int(self.send_counter)
writer.write(packet)
crc = crc32(writer.get_bytes())
writer.write_int(crc, signed=False)
self.send_counter += 1
self.tcp_client.write(writer.get_bytes())
def receive(self, **kwargs):
"""Receives a TCP message (tuple(sequence number, body)) from the
connected peer.
If a named 'timeout' parameter is present, it will override
'self.timeout', and this can be a 'timedelta' or 'None'.
"""
timeout = kwargs.get('timeout', self.timeout)
# First read everything we need
packet_length_bytes = self.tcp_client.read(4, timeout)
packet_length = int.from_bytes(packet_length_bytes, byteorder='little')
seq_bytes = self.tcp_client.read(4, timeout)
seq = int.from_bytes(seq_bytes, byteorder='little')
body = self.tcp_client.read(packet_length - 12, timeout)
checksum = int.from_bytes(
self.tcp_client.read(4, timeout), byteorder='little', signed=False)
# Then perform the checks
rv = packet_length_bytes + seq_bytes + body
valid_checksum = crc32(rv)
if checksum != valid_checksum:
raise InvalidChecksumError(checksum, valid_checksum)
# If we passed the tests, we can then return a valid TCP message
return seq, body
def _init_write(self, filename):
self.name = filename
self.crc = zlib.crc32("") & 0xffffffffL
self.size = 0
self.writebuf = []
self.bufsize = 0