def _GenerateCRCTable():
    """Build the 256-entry lookup table for CRC-32.

    ZIP encryption scrambles some internal keys with the one-byte CRC-32
    primitive; a direct table-driven implementation was observed to be
    faster than going through binascii.crc32().
    """
    polynomial = 0xedb88320

    def _entry(value):
        # Shift out eight bits, folding in the polynomial every time the
        # bit that drops off the low end is set.
        for _ in range(8):
            low_bit_set = value & 1
            value = (value >> 1) & 0x7FFFFFFF
            if low_bit_set:
                value ^= polynomial
        return value

    return [_entry(index) for index in range(256)]
# Example source code using Python's crc32()
def checksum(self, file):
    """Return the CRC-32 of the preprocessed form of *file*.

    The source is passed through ``self.compiler.preprocess_source`` and
    the resulting text is UTF-8 encoded before being checksummed.
    """
    from binascii import crc32

    preprocessed_text = self.compiler.preprocess_source(file)
    encoded = preprocessed_text.encode('utf-8')
    return crc32(encoded)
def _calc_crc(self, decoded):
    """
    Fold the decoded content passed in into the running CRC.

    ``self._escape`` carries the running crc32 value from call to call and
    ``self._crc`` is kept as its bitwise complement.
    """
    running = crc32(decoded, self._escape)
    self._escape = running
    self._crc = ~running
def crc32(self):
    """ Returns the calculated crc32 for the decoded data as a lowercase
    hex string, zero-padded to at least eight digits.
    """
    masked = self._crc ^ BIN_MASK
    return "%08x" % masked
def read_body(self, f, body_checksum):
    """Read fixed-size segment records from *f* into ``self.segments``.

    Each record is 32 bytes, unpacked with ``<2IH2B20s`` into: segment id,
    incremental, base, encryption, compression and a 20-byte SHA-1 digest.
    Reading stops at the first read that returns fewer than 32 bytes.

    :param f: binary file-like object positioned at the first record
    :param body_checksum: expected running crc32 over all full records
    :raises Exception: if the computed checksum differs from *body_checksum*
    """
    record_size = 32
    checksum = 0
    data = f.read(record_size)
    while len(data) == record_size:
        # All six fields must be unpacked in a single assignment; splitting
        # the targets over separate statements leaves only the last name
        # bound (to the whole tuple).
        (segment, incremental, base, encryption, compression,
         sha1) = unpack('<2IH2B20s', data)
        self.segments[segment] = {
            'incremental': incremental,
            'base': base,
            'encryption': encryption,
            'compression': compression,
            'sha1_hash': b2a_hex(sha1),
        }
        checksum = crc32(data, checksum)
        data = f.read(record_size)
    if checksum != body_checksum:
        raise Exception('Body checksum does not match')
def update_fuzzer(self):
    """Perform the request and refresh the fuzzer widgets with the response.

    The URL field is set to "<mime> <length> <crc32 hex>" followed by the
    response URL on a second line; the frame is then updated with the
    body (raw and text), the URL, the MIME type and ``self.pb_resp`` if
    that attribute exists (``None`` otherwise).
    """
    resp = self.transport.perform_request(self.pb_request, self.get_params)
    data = resp.content
    text = resp.text
    url = resp.url
    # Keep only the part of Content-Type before the first ';'.
    mime = resp.headers['Content-Type'].split(';')[0]

    body_crc = crc32(data) & 0xffffffff
    meta = '%s %d %08x\n%s' % (mime, len(data), body_crc, resp.url)
    self.fuzzer.urlField.setText(meta)
    self.fuzzer.frame.update_frame(
        data, text, url, mime, getattr(self, 'pb_resp', None))
def __init__(self, fileobj, mode, zipinfo, decrypter=None):
    """Initialise the read state for one archive member.

    :param fileobj: file object stored as ``self._fileobj``
    :param mode: open mode; ``self._universal`` records whether it has a 'U'
    :param zipinfo: member metadata providing ``compress_type``,
        ``compress_size``, ``filename`` and, optionally, ``CRC``
    :param decrypter: optional decrypter stored as ``self._decrypter``
    """
    self._fileobj = fileobj
    self._decrypter = decrypter

    self._compress_type = zipinfo.compress_type
    self._compress_size = zipinfo.compress_size
    self._compress_left = zipinfo.compress_size

    if self._compress_type == ZIP_DEFLATED:
        self._decompressor = zlib.decompressobj(-15)
    self._unconsumed = ''

    self._readbuffer = ''
    self._offset = 0

    self._universal = 'U' in mode
    self.newlines = None

    # For encrypted files the first 12 bytes hold the encryption/password
    # information, so they do not count towards the data left to read.
    if self._decrypter is not None:
        self._compress_left -= 12

    self.mode = mode
    self.name = zipinfo.filename

    if not hasattr(zipinfo, 'CRC'):
        self._expected_crc = None
    else:
        self._expected_crc = zipinfo.CRC
        self._running_crc = crc32(b'') & 0xffffffff
def _update_crc(self, newdata, eof):
    """Fold *newdata* into the running CRC and verify it once *eof* is set.

    Does nothing when there is no expected CRC to compare against.

    :raises BadZipfile: at end of file if the running CRC differs from the
        expected one
    """
    expected = self._expected_crc
    if expected is None:
        # Without a reference value there is nothing worth computing.
        return

    self._running_crc = crc32(newdata, self._running_crc) & 0xffffffff

    # The comparison only makes sense once all data has been seen.
    if not eof:
        return
    if self._running_crc != expected:
        raise BadZipfile("Bad CRC-32 for file %r" % self.name)
def _get_record_metadata(self):
    """Return a fixed ConsumerRecord for TOPIC_STATES.

    The record uses key ``b'NY'`` and value ``b'foo'``; the checksum is the
    crc32 of the value and the two ``serialized_*_size`` fields hold the
    byte lengths of the key and value.
    """
    key = b'NY'
    value = b'foo'
    return ConsumerRecord(
        topic=TOPIC_STATES,
        partition=0,
        offset=42,
        timestamp=1467649216540,
        timestamp_type=0,
        key=key,
        value=value,
        checksum=binascii.crc32(value),
        # Size fields are lengths, not the payloads themselves.
        serialized_key_size=len(key),
        serialized_value_size=len(value))
def mock_consumer(self, KafkaConsumer, value, max_calls=1):
    """Make *KafkaConsumer* instantiate a fake consumer and return that fake.

    The fake yields the same record (built around *value*) from
    ``__next__`` up to *max_calls* times and then raises StopIteration.
    ``partitions_for_topic`` reports partitions 0 and 1.

    :param KafkaConsumer: the (patched) consumer class to configure
    :param value: bytes used as the record value
    :param max_calls: number of records to hand out before stopping
    :return: the fake consumer object
    """
    # Mock a consumer object
    fake_kafka_consumer = MagicMock()

    key = b'NY'
    record = ConsumerRecord(
        topic=TOPIC_STATES,
        partition=0,
        offset=42,
        timestamp=1467649216540,
        timestamp_type=0,
        key=key,
        value=value,
        checksum=binascii.crc32(value),
        # Size fields are lengths, not the payloads themselves.
        serialized_key_size=len(key),
        serialized_value_size=len(value))

    # A mutable cell keeps the call count writable from the closure.
    meta = {'i': 0}

    def _iter(*args, **kwargs):
        if meta['i'] >= max_calls:
            raise StopIteration()
        meta['i'] += 1
        return record

    # Should return a record when used as an iterator, up to max_calls.
    fake_kafka_consumer.__next__.side_effect = _iter

    # Return some partitions
    fake_kafka_consumer.partitions_for_topic.return_value = {0, 1}

    # Make class instantiation return our mock
    KafkaConsumer.return_value = fake_kafka_consumer
    return fake_kafka_consumer
def pack_data(self, buf):
    """Wrap *buf* in a length/CRC/Adler-32 frame and return the frame.

    Layout of the result:
      * 2 bytes, big-endian: total frame length
      * 2 bytes, little-endian: low 16 bits of crc32 of the length field
      * the bytes returned by ``self.rnd_data(len(buf))`` followed by *buf*
      * 4 bytes, little-endian: adler32 of everything before it
    """
    body = self.rnd_data(len(buf)) + buf
    # 8 bytes of overhead: length (2) + crc (2) + adler32 (4).
    length_field = struct.pack('>H', len(body) + 8)
    crc_field = struct.pack('<H', binascii.crc32(length_field) & 0xFFFF)
    frame = length_field + crc_field + body
    trailer = struct.pack('<I', zlib.adler32(frame) & 0xFFFFFFFF)
    return frame + trailer
def pack_auth_data(self, buf):
    """Wrap *buf* in a length/CRC/HMAC frame; an empty *buf* yields ``b''``.

    Layout of the result:
      * 2 bytes, big-endian: total frame length
      * 4 bytes, little-endian: crc32 of length field + ``self.salt`` +
        ``self.server_info.key``
      * the bytes returned by ``self.rnd_data(len(buf))`` followed by *buf*
      * first 10 bytes of HMAC-SHA1 over everything before it, keyed with
        ``self.server_info.iv + self.server_info.key``
    """
    if len(buf) == 0:
        return b''

    body = self.rnd_data(len(buf)) + buf
    # 16 bytes of overhead: length (2) + crc (4) + truncated HMAC (10).
    length_field = struct.pack('>H', len(body) + 16)
    crc_input = length_field + self.salt + self.server_info.key
    crc_field = struct.pack('<I', binascii.crc32(crc_input) & 0xFFFFFFFF)
    frame = length_field + crc_field + body

    mac_key = self.server_info.iv + self.server_info.key
    tag = hmac.new(mac_key, frame, hashlib.sha1).digest()[:10]
    return frame + tag
def client_post_decrypt(self, buf):
    """Append *buf* to the receive buffer and return any completed payload.

    In raw-transfer mode *buf* is returned untouched.  Otherwise complete
    frames are consumed from ``self.recv_buf``: the 2-byte length is
    checked against its 16-bit CRC, the frame against its trailing
    Adler-32, and the payload after the leading padding is collected.
    ``self.decrypt_packet_num`` is incremented once per call that yields
    data.

    :raises Exception: on a bad header CRC, an out-of-range length, or a
        bad frame checksum (the latter two also switch to raw mode and
        drop the buffer)
    """
    if self.raw_trans:
        return buf

    self.recv_buf += buf
    payload = b''
    while len(self.recv_buf) > 4:
        header = self.recv_buf[:2]
        expected_crc = struct.pack('<H', binascii.crc32(header) & 0xFFFF)
        if self.recv_buf[2:4] != expected_crc:
            raise Exception('client_post_decrypt data uncorrect crc')

        length = struct.unpack('>H', header)[0]
        if not 7 <= length < 8192:
            self.raw_trans = True
            self.recv_buf = b''
            raise Exception('client_post_decrypt data error')
        if len(self.recv_buf) < length:
            # Frame not fully received yet; wait for more data.
            break

        body_end = length - 4
        expected_sum = struct.pack(
            '<I', zlib.adler32(self.recv_buf[:body_end]) & 0xFFFFFFFF)
        if self.recv_buf[body_end:length] != expected_sum:
            self.raw_trans = True
            self.recv_buf = b''
            raise Exception('client_post_decrypt data uncorrect checksum')

        # Byte 4 gives the payload offset; the value 255 means the offset
        # is instead stored as a big-endian 16-bit value in bytes 5..6.
        marker = common.ord(self.recv_buf[4])
        if marker < 255:
            start = marker + 4
        else:
            start = struct.unpack('>H', self.recv_buf[5:7])[0] + 4
        payload += self.recv_buf[start:body_end]
        self.recv_buf = self.recv_buf[length:]

    if payload:
        self.decrypt_packet_num += 1
    return payload
def client_encode(self, buf):
    """Encode outgoing client data.

    * Once raw sending is active, *buf* is passed through unchanged.
    * On the first call, *buf* is queued and a random header is returned
      instead: random bytes followed by the little-endian complement of
      their crc32.
    * Afterwards data is queued until ``self.raw_trans_recv`` is set, at
      which point the whole queue is flushed and raw sending begins.
    """
    if self.raw_trans_sent:
        return buf

    self.send_buffer += buf

    if not self.has_sent_header:
        self.has_sent_header = True
        head_len = common.ord(os.urandom(1)[0]) % 96 + 4
        head = os.urandom(head_len)
        complement = (0xffffffff - binascii.crc32(head)) & 0xffffffff
        return head + struct.pack('<I', complement)

    if not self.raw_trans_recv:
        return b''

    pending = self.send_buffer
    self.send_buffer = b''
    self.raw_trans_sent = True
    return pending
def server_decode(self, buf):
    """Inspect the first packet from the client and decide how to proceed.

    Returns a tuple
    ``(buffer_to_recv, is_need_decrypt, is_need_to_encode_and_send_back)``.
    After the first call every packet is passed straight through.  The
    first packet is accepted as a header when its crc32 is 0xffffffff;
    otherwise the header is marked as already sent and the packet is
    either replaced by 2048 ``b'E'`` bytes (method ``'random_head'``) or
    passed through.
    """
    if self.has_recv_header:
        return (buf, True, False)

    self.has_recv_header = True
    if (binascii.crc32(buf) & 0xffffffff) == 0xffffffff:
        # (buffer_to_recv, is_need_decrypt, is_need_to_encode_and_send_back)
        return (b'', False, True)

    self.has_sent_header = True
    if self.method == 'random_head':
        return (b'E'*2048, False, False)
    return (buf, True, False)
def __init__(self, ffi, preamble, tmpdir=None, modulename=None,
             ext_package=None, tag='', force_generic_engine=False,
             source_extension='.c', flags=None, relative_to=None, **kwds):
    """Set up verification state and derive the source/module file names.

    When *modulename* is not given, a name is derived from two CRC-32
    values over a key made of the Python major.minor version, the
    verifier-modules version, *preamble*, the flattened extension keywords
    and the ffi's cdef sources.

    :raises ffiplatform.VerificationError: if the ffi uses a feature that
        is only available through ``ffi.set_source()``
    :raises TypeError: if both *modulename* and *tag* are given
    """
    if ffi._parser._uses_new_feature:
        raise ffiplatform.VerificationError(
            "feature not supported with ffi.verify(), but only "
            "with ffi.set_source(): %s" % (ffi._parser._uses_new_feature,))
    self.ffi = ffi
    self.preamble = preamble
    if not modulename:
        # Flatten before the engine gets a chance to patch the keywords.
        flattened_kwds = ffiplatform.flatten(kwds)
    vengine_class = _locate_engine_class(ffi, force_generic_engine)
    self._vengine = vengine_class(self)
    self._vengine.patch_extension_kwds(kwds)
    self.flags = flags
    self.kwds = self.make_relative_to(kwds, relative_to)
    #
    if modulename:
        if tag:
            raise TypeError("can't specify both 'modulename' and 'tag'")
    else:
        # sys.version[:3] would truncate "3.10" to "3.1"; build the
        # major.minor string from version_info instead (identical output
        # for single-digit minor versions).
        python_version = '%d.%d' % sys.version_info[:2]
        key = '\x00'.join([python_version, __version_verifier_modules__,
                           preamble, flattened_kwds] +
                          ffi._cdefsources)
        if sys.version_info >= (3,):
            key = key.encode('utf-8')
        k1 = hex(binascii.crc32(key[0::2]) & 0xffffffff)
        # Strips every leading '0' and 'x' character, and a trailing 'L'.
        k1 = k1.lstrip('0x').rstrip('L')
        k2 = hex(binascii.crc32(key[1::2]) & 0xffffffff)
        # Only the leading '0' is stripped here, so k2 keeps its 'x';
        # left as is so that generated module names do not change.
        k2 = k2.lstrip('0').rstrip('L')
        modulename = '_cffi_%s_%s%s%s' % (tag, self._vengine._class_key,
                                          k1, k2)
    suffix = _get_so_suffixes()[0]
    self.tmpdir = tmpdir or _caller_dir_pycache()
    self.sourcefilename = os.path.join(self.tmpdir, modulename + source_extension)
    self.modulefilename = os.path.join(self.tmpdir, modulename + suffix)
    self.ext_package = ext_package
    self._has_source = False
    self._has_module = False
def _encode_service_payload(self, service_check):
    """Pack *service_check* into a binary payload that embeds its own CRC.

    The fields are first packed with a zero in the CRC slot, crc32 is
    computed over that packing, and the fields are packed again with the
    CRC filled in.  Hostname, name and output are UTF-8 encoded.
    """
    fmt = self.service_payload_fmt
    crc_slot = 2
    fields = [
        self.protocol_version,
        0,  # Padding
        0,  # Placeholder for CRC
        service_check.timestamp,
        service_check.status,
        service_check.hostname.encode('utf8'),
        service_check.name.encode('utf8'),
        service_check.output.encode('utf8'),
        0,  # Padding
    ]
    zeroed_packet = struct.pack(fmt, *fields)
    fields[crc_slot] = binascii.crc32(zeroed_packet)
    return struct.pack(fmt, *fields)
def parse(self, data, offset=0):
    """ Parse the u-boot environment variables from bytearray.
    :param data: The data in bytes array
    :param offset: The offset of input data
    :raises ValueError: If the stored CRC does not match the data, or an
        entry has no '=' separator
    """
    self._env = collections.OrderedDict()
    fmt = ">IB" if self._bigendian else "<IB"
    (read_crc, flag) = struct.unpack_from(fmt, data, offset)
    # A byte of 0x01 right after the CRC is taken as the redundant-env
    # flag, which makes the header 5 bytes long instead of 4.
    self._redundant = (flag == 0x01)
    header_size = 5 if self._redundant else 4
    read_data = data[offset + header_size:]
    calc_crc = binascii.crc32(read_data) & 0xffffffff
    if read_crc != calc_crc:
        raise ValueError("Wrong CRC")
    # Split on raw bytes and decode each entry on its own: the fill bytes
    # after the last entry (e.g. 0xFF) need not be valid UTF-8, so the
    # blob cannot be decoded as a whole.
    # b'\xc3\xbf' is the UTF-8 encoding of U+00FF, which the previous
    # text-based check also treated as padding.
    padding_prefixes = (b'\xff', b'\xc3\xbf')
    for entry in bytes(read_data).split(b'\0'):
        if not entry or entry.startswith(padding_prefixes):
            break
        key, value = entry.decode('utf-8').split('=', 1)
        self._env[key] = value
def export(self):
    """ Export the u-boot environment variables into bytearray.
    :return The environment variables in bytearray
    :raises Exception: If the encoded variables do not fit into the
        configured size
    """
    # The header takes 4 bytes (CRC), or 5 with the redundant-env flag.
    env_size = self.size - (5 if self._redundant else 4)

    lines = ["{0:s}={1:s}".format(k, self._env[k]) for k in self._env]
    # Each line is terminated by "\0"; one extra "\0" marks end of file.
    text = "".join(line + "\0" for line in lines) + "\0"
    data = text.encode('utf-8')
    # Measure the encoded bytes, not characters, against the space left.
    if len(data) > env_size:
        raise Exception("ERROR: ENV size out of range, extend required size !")

    # Pad with the raw fill byte.  Building the padding as text and then
    # UTF-8 encoding it would turn any fill value >= 0x80 into two bytes.
    padding = struct.pack("B", self._empty_value) * (env_size - len(data))
    env_blob = data + padding

    crc = binascii.crc32(env_blob) & 0xffffffff
    fmt = ">I" if self._bigendian else "<I"
    if self._redundant:
        header = struct.pack(fmt + "B", crc, 0x01)
    else:
        header = struct.pack(fmt, crc)
    return header + env_blob
def read(self, name):
    """Return file bytes (as a string) for name.

    :raises zipfile.BadZipfile: if the CRC-32 of the data read does not
        match the CRC recorded for *name*
    """
    f = self.readfile(name)
    zinfo = self.getinfo(name)
    data = f.read()
    # binascii.crc32 may return a signed value on Python 2; compare both
    # sides as unsigned 32-bit integers.
    crc = binascii.crc32(data) & 0xffffffff
    if crc != (zinfo.CRC & 0xffffffff):
        raise zipfile.BadZipfile("Bad CRC-32 for file %s" % name)
    return data