def _GenerateCRCTable():
"""Generate a CRC-32 table.
ZIP encryption uses the CRC32 one-byte primitive for scrambling some
internal keys. We noticed that a direct implementation is faster than
relying on binascii.crc32().
"""
poly = 0xedb88320
table = [0] * 256
for i in range(256):
crc = i
for j in range(8):
if crc & 1:
crc = ((crc >> 1) & 0x7FFFFFFF) ^ poly
else:
crc = ((crc >> 1) & 0x7FFFFFFF)
table[i] = crc
return table
python类crc32()的实例源码
def build_header(self):
    """Pack the fixed-size header, append every base, and checksum the lot.

    Returns:
        A ``(data, checksum)`` tuple. ``data`` is the packed header followed
        by each item of ``self.metadata['bases']``; ``checksum`` is the
        running ``crc32`` over those same bytes.
    """
    timestamp = utctimestamp()
    # 28 NUL bytes are built here; the ``14s`` field stores only the first 14.
    filler = str.encode('\0\0' * 14)
    meta = self.metadata
    bases = meta['bases']
    header = pack(
        '<i2IQH14s',
        timestamp,
        meta['incremental'],
        meta['segment_size'],
        meta['sectors'],
        len(bases),
        filler,
    )
    running = crc32(header)
    for base in bases:
        header += base
        running = crc32(base, running)
    return header, running
def write_body(self, f):
    """Write one packed record per segment, then backfill the body checksum.

    Args:
        f: Seekable, writable binary file object positioned where the body
            should start.
    """
    running = 0
    for segment_id, info in self.segments.items():
        record = pack(
            '<2IH2B20s',
            segment_id,
            info['incremental'],
            info['base'],
            info['encryption'],
            info['compression'],
            info['sha1_hash'],
        )
        f.write(record)
        running = crc32(record, running)
    # Backfill the body_checksum at absolute offset 24.
    f.seek(24, 0)
    f.write(pack('<I', running))
def __init__(self, dump):
    """Split a NOR dump into regions and collect the images in the firmware region.

    Args:
        dump: Raw dump bytes; must be exactly ``NOR_SIZE`` long.

    Raises:
        AssertionError: If the length is wrong or the CRC stored at bytes
            48..52 does not match ``crc32`` of the first 48 bytes.
    """
    assert len(dump) == NOR_SIZE
    (img2_magic, self.block_size, unused, firmware_block,
     firmware_block_count) = struct.unpack('<4s4I', dump[:20])
    (img2_crc,) = struct.unpack('<I', dump[48:52])
    assert img2_crc == binascii.crc32(dump[:48]) & 0xffffffff
    self.firmware_offset = self.block_size * firmware_block
    self.firmware_length = self.block_size * firmware_block_count
    firmware_end = self.firmware_offset + self.firmware_length
    self.parts = [
        dump[0:52],
        dump[52:512],
        dump[512:self.firmware_offset],
        dump[self.firmware_offset:firmware_end],
        dump[firmware_end:]
    ]
    firmware = self.parts[3]
    # Bytes literal so the comparison also works where unpack yields bytes.
    img3_magic = b'Img3'[::-1]
    self.images = []
    offset = 0
    # Stop once fewer than 8 header bytes remain; unpacking a short slice
    # would raise struct.error when images fill the region exactly.
    while offset + 8 <= len(firmware):
        (magic, size) = struct.unpack_from('<4sI', firmware, offset)
        if magic != img3_magic or size == 0:
            break
        self.images.append(firmware[offset:offset + size])
        offset += size
def _GenerateCRCTable():
"""Generate a CRC-32 table.
ZIP encryption uses the CRC32 one-byte primitive for scrambling some
internal keys. We noticed that a direct implementation is faster than
relying on binascii.crc32().
"""
poly = 0xedb88320
table = [0] * 256
for i in range(256):
crc = i
for j in range(8):
if crc & 1:
crc = ((crc >> 1) & 0x7FFFFFFF) ^ poly
else:
crc = ((crc >> 1) & 0x7FFFFFFF)
table[i] = crc
return table
def find_xorpad(titleid, crc32):
    """Locate the exheader xorpad for a title under the ``xorpads`` directory.

    Loose ``*.xorpad`` files match on either the CRC-qualified name or the
    legacy ``<titleid>.Main.exheader.xorpad`` name (case-insensitive). Zip
    archives are searched for the CRC-qualified name only; a matching member
    is extracted into the module-level ``tmpdir``.

    Args:
        titleid: Title id string used as the file name prefix.
        crc32: Integer rendered as 8 hex digits in the expected name.

    Returns:
        Path of the matching (or extracted) xorpad, or None if none matches.
    """
    expectedname = "%s.%08lx.Main.exheader.xorpad" % (titleid, crc32)
    legacyname = titleid + ".Main.exheader.xorpad"
    # Lower-case once instead of on every comparison.
    wanted = expectedname.lower()
    accepted_loose = (wanted, legacyname.lower())
    xorpads = glob.glob(os.path.join("xorpads", "*.[xX][oO][rR][pP][aA][dD]"))
    xorpads += glob.glob(os.path.join("xorpads", "*.[zZ][iI][pP]"))
    for xorpad in xorpads:
        if zipfile.is_zipfile(xorpad):
            with zipfile.ZipFile(xorpad, "r") as archive:
                for entry in archive.infolist():
                    if os.path.basename(entry.filename).lower() != wanted:
                        continue
                    filename = os.path.join(tmpdir, expectedname)
                    # open() replaces the Python 2-only file() builtin; one
                    # with-statement closes the member even if the target
                    # cannot be opened.
                    with archive.open(entry, "r") as source, \
                            open(filename, "wb") as target:
                        shutil.copyfileobj(source, target)
                    return filename
        elif os.path.basename(xorpad).lower() in accepted_loose:
            return xorpad
    return None
def _GenerateCRCTable():
"""Generate a CRC-32 table.
ZIP encryption uses the CRC32 one-byte primitive for scrambling some
internal keys. We noticed that a direct implementation is faster than
relying on binascii.crc32().
"""
poly = 0xedb88320
table = [0] * 256
for i in range(256):
crc = i
for j in range(8):
if crc & 1:
crc = ((crc >> 1) & 0x7FFFFFFF) ^ poly
else:
crc = ((crc >> 1) & 0x7FFFFFFF)
table[i] = crc
return table
def _update_new_data(self, off, data_to_insert):
    """
    Overwrite part of ``_new_data`` and, when ``_os_ver`` is 10, refresh the
    CRC32 stored right after the queue footer.

    :param off: start offset in _new_data to insert data into
    :param data_to_insert: data that overwrites whatever was at that offset
    :return: void
    """
    BITSStateFile._log_instance_message('updating new_data in offset %s' % hex(off))
    self._new_data = override_data(self._new_data, off, data_to_insert)
    if _os_ver != 10:
        return
    footer = BITSStateFile.QUEUE_FOOTER_HEX[_os_ver].decode('hex')
    # NOTE(review): find() yields -1 when the footer is absent, which would
    # place the CRC at len(footer) - 1 -- confirm the footer is always present.
    crc32_off = self._new_data.find(footer) + len(footer)
    packed_crc = struct.pack("i", crc32(self._new_data[:crc32_off]))
    self._new_data = override_data(self._new_data, crc32_off, packed_crc)
def crc32(self):
    """Return the CRC-32 of the file's contents as 8 upper-case hex digits.

    The file is read in fixed-size chunks so large files are not loaded
    into memory at once.

    Raises:
        TypeError: If ``self.isfile()`` is false.
        FileNotFoundError: If the file disappears before it can be opened.
        PermissionError: If the file cannot be opened for reading.
        RuntimeError: For any other failure while reading.
    """
    if not self.isfile():
        raise TypeError('cannot compute crc32, not a file: {}'.format(self.abspath()))
    path = self.abspath()
    checksum = 0
    try:
        with open(path, 'rb') as stream:
            for chunk in iter(lambda: stream.read(1 << 20), b''):
                checksum = binascii.crc32(chunk, checksum)
    except FileNotFoundError as err:
        raise FileNotFoundError('failed to compute crc32 for: {}'.format(path)) from err
    except PermissionError as err:
        raise PermissionError('failed to compute crc32 for: {}'.format(path)) from err
    except Exception as err:
        # Exception, not a bare except, so KeyboardInterrupt/SystemExit
        # propagate instead of being turned into RuntimeError.
        raise RuntimeError('failed to compute crc32 for: {}'.format(path)) from err
    return "%08X" % (checksum & 0xFFFFFFFF)
def _GenerateCRCTable():
"""Generate a CRC-32 table.
ZIP encryption uses the CRC32 one-byte primitive for scrambling some
internal keys. We noticed that a direct implementation is faster than
relying on binascii.crc32().
"""
poly = 0xedb88320
table = [0] * 256
for i in range(256):
crc = i
for j in range(8):
if crc & 1:
crc = ((crc >> 1) & 0x7FFFFFFF) ^ poly
else:
crc = ((crc >> 1) & 0x7FFFFFFF)
table[i] = crc
return table
def compensate(buf, wanted):
    """Derive a packed 32-bit value from ``crc32(buf)`` and ``wanted``.

    Presumably the four bytes that, appended to ``buf``, make its CRC-32
    equal ``wanted`` -- confirm against the definitions of CRCPOLY, CRCINV
    and FINALXOR, which live outside this block.

    Returns:
        The result packed as a little-endian unsigned 32-bit integer.
    """
    target = wanted ^ FINALXOR
    newBits = 0
    for _ in range(32):
        shifted_out = newBits & 1
        newBits >>= 1
        if shifted_out:
            newBits ^= CRCPOLY
        if target & 1:
            newBits ^= CRCINV
        target >>= 1
    newBits ^= crc32(buf) ^ FINALXOR
    return pack('<L', newBits)
def _GenerateCRCTable():
"""Generate a CRC-32 table.
ZIP encryption uses the CRC32 one-byte primitive for scrambling some
internal keys. We noticed that a direct implementation is faster than
relying on binascii.crc32().
"""
poly = 0xedb88320
table = [0] * 256
for i in range(256):
crc = i
for j in range(8):
if crc & 1:
crc = ((crc >> 1) & 0x7FFFFFFF) ^ poly
else:
crc = ((crc >> 1) & 0x7FFFFFFF)
table[i] = crc
return table
def fix_header(self):
    """
    Recompute the header's magic, section sizes and CRC32 from current state.
    """
    header = self.header
    # Magic
    header.magic = header._MAGIC
    # Size of the serialized system section
    system_raw = self.system.dumps()
    header.system_data_size = len(system_raw)
    # Size of the raw user section
    user_raw = self._user_raw
    header.user_data_size = len(user_raw)
    # CRC32 chained over header, system and user bytes, in that order; the
    # header is serialized only after the size fields above were updated.
    running = 0
    for piece in (header.dumps(False), system_raw, user_raw):
        running = crc32(piece, running)
    # Force an unsigned 32-bit value (crc32 is signed on Python 2).
    header.crc32 = running & 0xffffffff
def _GenerateCRCTable():
"""Generate a CRC-32 table.
ZIP encryption uses the CRC32 one-byte primitive for scrambling some
internal keys. We noticed that a direct implementation is faster than
relying on binascii.crc32().
"""
poly = 0xedb88320
table = [0] * 256
for i in range(256):
crc = i
for j in range(8):
if crc & 1:
crc = ((crc >> 1) & 0x7FFFFFFF) ^ poly
else:
crc = ((crc >> 1) & 0x7FFFFFFF)
table[i] = crc
return table
def _read_comment_v3(self, inf, psw=None):
    """Read, decompress and decode the comment described by ``inf``.

    Args:
        inf: Entry record supplying the volume file, offset, sizes, flags,
            expected CRC and salt.
        psw: Optional password passed through to ``rar_decompress``.

    Returns:
        The decoded comment, or None when CRC checking is enabled and the
        decompressed data does not match ``inf.CRC``.
    """
    # read data; close the volume even if seek/read raises
    rf = XFile(inf.volume_file)
    try:
        rf.seek(inf.file_offset)
        data = rf.read(inf.compress_size)
    finally:
        rf.close()

    # decompress
    cmt = rar_decompress(inf.extract_version, inf.compress_type, data,
                         inf.file_size, inf.flags, inf.CRC, psw, inf.salt)

    # check crc (masked to unsigned so a signed crc32 result compares equal)
    if self._crc_check and (crc32(cmt) & 0xffffffff) != inf.CRC:
        return None

    return self._decode_comment(cmt)
# write in-memory archive to temp file - needed for solid archives
def read(self, cnt = None):
    """Read all or specified amount of data from archive entry."""
    # Clamp the request: None, negative or oversized all mean "the rest".
    if cnt is None or cnt < 0 or cnt > self.remain:
        cnt = self.remain
    if cnt == 0:
        return EMPTY

    # Pull the bytes and fold them into the running CRC.
    chunk = self._read(cnt)
    if chunk:
        self.CRC = crc32(chunk, self.CRC)
        self.remain -= len(chunk)
    if len(chunk) != cnt:
        raise BadRarFile("Failed the read enough data")

    # Entry exhausted: run the final check.
    if self.remain == 0 or not chunk:
        self._check()
    return chunk
def readinto(self, buf):
    """Zero-copy read directly into buffer."""
    # Never read past what is left of the entry.
    limit = min(len(buf), self.remain)
    view = memoryview(buf)
    filled = 0
    while filled < limit:
        received = self.fd.readinto(view[filled:limit])
        if not received:
            break
        if self.crc_check:
            self.CRC = crc32(view[filled:filled + received], self.CRC)
        self.remain -= received
        filled += received
    return filled
def readinto(self, buf):
    """Zero-copy read directly into buffer."""
    capacity = len(buf)
    view = memoryview(buf)
    filled = 0
    while filled < capacity:
        # Current volume drained: try to move on to the next one.
        if self.cur_avail == 0 and not self._open_next():
            break

        # Read at most what the current volume still holds.
        want = min(capacity - filled, self.cur_avail)
        received = self.fd.readinto(view[filled:filled + want])
        if not received:
            break
        if self.crc_check:
            self.CRC = crc32(view[filled:filled + received], self.CRC)
        self.cur_avail -= received
        self.remain -= received
        filled += received
    return filled
def dl_link(context, template='footer/dl_link.html'):
    """Render one link record chosen deterministically from the site and path.

    Two CRC32 values over the site domain and request path pick first a
    link set from ``conf.DL_LINKS`` and then a record inside that set.
    Returns an empty string when the context carries no request.
    """
    request = context.get('request')
    if not request:
        return ''

    website = get_current_site(request).domain
    page_info = request.path_info

    # Pick the link set.
    set_names = tuple(sorted(conf.DL_LINKS.keys()))
    set_key = ('set:%s:%s' % (website, page_info)).encode()
    url_set = conf.DL_LINKS[set_names[binascii.crc32(set_key) % len(set_names)]]

    # Pick the record within that set.
    records = tuple(sorted(url_set, key=lambda x: x['url']))
    url_key = ('url:%s:%s' % (website, page_info)).encode()
    record = records[binascii.crc32(url_key) % len(url_set)]

    return loader.render_to_string(template, record, request=request)
def zip_to_file(file_path, destination):
    """Zip a file or directory tree into a fresh temp file under ``destination``.

    Args:
        file_path: File or directory to archive. Directory members are stored
            relative to the directory; a single file is stored under
            ``file_path`` as given.
        destination: Existing directory in which the ``.zip`` is created.

    Returns:
        ``(zip_filename, checksum)`` where ``checksum`` is the hex CRC-32 of
        the concatenated decimal CRCs of all archive members.
    """
    fd, zip_filename = tempfile.mkstemp(suffix=".zip", dir=destination)
    # mkstemp returns an open descriptor; ZipFile reopens the file by name,
    # so close it here or it leaks.
    os.close(fd)
    with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED, allowZip64=True) as myzip:
        if os.path.isdir(file_path):
            abs_src = os.path.abspath(file_path)
            for root, _dirs, files in os.walk(file_path):
                for current_file in files:
                    absname = os.path.abspath(os.path.join(root, current_file))
                    arcname = absname[len(abs_src) + 1:]
                    myzip.write(absname, arcname)
        else:
            myzip.write(file_path, file_path)
        zip_info = ''.join(str(member.CRC) for member in myzip.infolist())
    # crc32 needs bytes on Python 3; the text is decimal digits, so ASCII is safe.
    checksum = hex(binascii.crc32(zip_info.encode('ascii')) & 0xffffffff)
    return zip_filename, checksum
def test_returned_value(self):
    """Round-trip raw data through every b2a/a2b pair and check result types."""
    # Limit to the minimum of all limits (b2a_uu)
    MAX_ALL = 45
    raw = self.rawdata[:MAX_ALL]
    for decoder_name, encoder_name in zip(a2b_functions, b2a_functions):
        decode = getattr(binascii, decoder_name)
        encode = getattr(binascii, encoder_name)
        try:
            encoded = encode(self.type2test(raw))
            decoded = decode(self.type2test(encoded))
        except Exception as err:
            self.fail("{}/{} conversion raises {!r}".format(
                encoder_name, decoder_name, err))
        if encoder_name == 'b2a_hqx':
            # the hqx round trip yields a 2-tuple; keep only the data part
            decoded, _ = decoded
        self.assertEqual(decoded, raw, "{}/{} conversion: "
                         "{!r} != {!r}".format(
                             encoder_name, decoder_name, decoded, raw))
        self.assertIsInstance(decoded, bytes)
        self.assertIsInstance(encoded, bytes)
        # every encoded byte must be 7-bit
        self.assertLess(max(encoded), 128)
    self.assertIsInstance(binascii.crc_hqx(raw, 0), int)
    self.assertIsInstance(binascii.crc32(raw), int)
def _GenerateCRCTable():
"""Generate a CRC-32 table.
ZIP encryption uses the CRC32 one-byte primitive for scrambling some
internal keys. We noticed that a direct implementation is faster than
relying on binascii.crc32().
"""
poly = 0xedb88320
table = [0] * 256
for i in range(256):
crc = i
for j in range(8):
if crc & 1:
crc = ((crc >> 1) & 0x7FFFFFFF) ^ poly
else:
crc = ((crc >> 1) & 0x7FFFFFFF)
table[i] = crc
return table
def test_extract(self):
    """Extract the sample archive with and without data verification and
    compare sizes, CRCs and contents against the archive headers."""
    for v in (True, False):
        with TempDir() as tdir:
            extract(simple_rar, tdir, verify_data=v)
            h = {
                normalize(os.path.abspath(os.path.join(tdir, hdr['filename']))): hdr
                for hdr in headers(simple_rar)}
            data = {}
            for dirpath, dirnames, filenames in os.walk(tdir):
                for f in filenames:
                    path = normalize(os.path.join(dirpath, f))
                    # Close each file promptly; an open handle can block
                    # removal of the temp dir on some platforms.
                    with open(path, 'rb') as stream:
                        d = stream.read()
                    data[os.path.relpath(path, tdir).replace(os.sep, '/')] = d
                    if f == 'one.txt':
                        self.ae(os.path.getmtime(path), 1098472879)
                    self.ae(h[path]['unpack_size'], len(d))
                    self.ae(h[path]['file_crc'] & 0xffffffff, crc32(d) & 0xffffffff)
            # Only entries with non-empty expected data are compared.
            q = {name: content for name, content in sr_data.items() if content}
            del q['symlink']
            self.ae(data, q)
def test_returned_value(self):
    """Round-trip raw data through every b2a/a2b pair and check result types."""
    # Limit to the minimum of all limits (b2a_uu)
    MAX_ALL = 45
    raw = self.rawdata[:MAX_ALL]
    for fa, fb in zip(a2b_functions, b2a_functions):
        a2b = getattr(binascii, fa)
        b2a = getattr(binascii, fb)
        try:
            a = b2a(self.type2test(raw))
            res = a2b(self.type2test(a))
        # "as" form parses on Python 2.6+ and 3; "except X, err" is 2-only.
        except Exception as err:
            self.fail("{}/{} conversion raises {!r}".format(fb, fa, err))
        if fb == 'b2a_hqx':
            # the hqx round trip yields a 2-tuple; keep only the data part
            res, _ = res
        self.assertEqual(res, raw, "{}/{} conversion: "
                         "{!r} != {!r}".format(fb, fa, res, raw))
        # bytes is an alias of str on Python 2, so this holds on both lines.
        self.assertIsInstance(res, bytes)
        self.assertIsInstance(a, bytes)
        # bytearray yields ints on both Python 2 and 3.
        self.assertLess(max(bytearray(a)), 128)
    self.assertIsInstance(binascii.crc_hqx(raw, 0), int)
    self.assertIsInstance(binascii.crc32(raw), int)
def _GenerateCRCTable():
"""Generate a CRC-32 table.
ZIP encryption uses the CRC32 one-byte primitive for scrambling some
internal keys. We noticed that a direct implementation is faster than
relying on binascii.crc32().
"""
poly = 0xedb88320
table = [0] * 256
for i in range(256):
crc = i
for j in range(8):
if crc & 1:
crc = ((crc >> 1) & 0x7FFFFFFF) ^ poly
else:
crc = ((crc >> 1) & 0x7FFFFFFF)
table[i] = crc
return table
def test_returned_value(self):
    """Round-trip raw data through every b2a/a2b pair and check result types."""
    # Limit to the minimum of all limits (b2a_uu)
    MAX_ALL = 45
    raw = self.rawdata[:MAX_ALL]
    for fa, fb in zip(a2b_functions, b2a_functions):
        a2b = getattr(binascii, fa)
        b2a = getattr(binascii, fb)
        try:
            a = b2a(self.type2test(raw))
            res = a2b(self.type2test(a))
        # "as" form parses on Python 2.6+ and 3; "except X, err" is 2-only.
        except Exception as err:
            self.fail("{}/{} conversion raises {!r}".format(fb, fa, err))
        if fb == 'b2a_hqx':
            # the hqx round trip yields a 2-tuple; keep only the data part
            res, _ = res
        self.assertEqual(res, raw, "{}/{} conversion: "
                         "{!r} != {!r}".format(fb, fa, res, raw))
        # bytes is an alias of str on Python 2, so this holds on both lines.
        self.assertIsInstance(res, bytes)
        self.assertIsInstance(a, bytes)
        # bytearray yields ints on both Python 2 and 3.
        self.assertLess(max(bytearray(a)), 128)
    self.assertIsInstance(binascii.crc_hqx(raw, 0), int)
    self.assertIsInstance(binascii.crc32(raw), int)
def _GenerateCRCTable():
"""Generate a CRC-32 table.
ZIP encryption uses the CRC32 one-byte primitive for scrambling some
internal keys. We noticed that a direct implementation is faster than
relying on binascii.crc32().
"""
poly = 0xedb88320
table = [0] * 256
for i in range(256):
crc = i
for j in range(8):
if crc & 1:
crc = ((crc >> 1) & 0x7FFFFFFF) ^ poly
else:
crc = ((crc >> 1) & 0x7FFFFFFF)
table[i] = crc
return table
def ChkMainCrc(filename):
    '''
    Brute-force search for the offset of the main CRC embedded in a file.

    For every 4-byte window the CRC of the whole file, with that window
    zeroed, is computed and compared to the window's little-endian value.
    The first match is printed; nothing is printed when none matches.
    '''
    with open(filename, 'rb') as stream:
        data = stream.read()
    # Upper bound is len(data) - 3 so the final 4-byte window (a CRC stored
    # at the very end of the file) is tested too.
    for i in range(len(data) - 3):
        # these 4 bytes are the CRC candidate embedded in the file
        block = data[i:i+4]
        # calculate CRC for the rest of the data (replace the 4 bytes with 0's)
        c = (crc32(data[:i] + b"\0"*4 + data[i+4:], 0xFFFFFFFF) ^ 0xFFFFFFFF) & 0xffffffff
        if pack("<I", c) == block:
            print("Found at offset dec=%d hex=%08X" % (i, i))
            print("CRC=%08X" % c)
            break
def CalSectCrc(data):
    """Recompute and patch the CRC stored in every section of ``data``.

    The section count is a little-endian uint32 at offset 15 and the section
    table starts at offset 27 with 10-byte ``<HII`` entries (type, offset,
    size). Entries of type 0xC002 are skipped. For each other entry the CRC
    is computed over the section with bytes 24..28 zeroed and written back
    into those bytes.

    Returns:
        The patched data.
    """
    sections = unpack("<I", data[15:15+4])[0]
    ptr = 27
    print("[+]Patching section CRC...")
    for _ in range(sections):
        # "kind" rather than "type" so the builtin is not shadowed.
        kind, offset, size = unpack("<HII", data[ptr:ptr+10])
        ptr += 10
        if kind == 0xc002:
            print("\tUnknown header type C002 ignored...")
            continue
        section_data = data[offset:offset+size]
        # Bytes literal for the zero padding so this also runs on Python 3.
        c = (crc32(section_data[:24] + b"\0"*4 + section_data[28:],
                   0xFFFFFFFF) ^ 0xFFFFFFFF) & 0xffffffff
        print("\tCalculated crc: %s" % ("0x%08X" % c))
        section_data = section_data[:24] + pack("<I", c) + section_data[28:]
        data = data[:offset] + section_data + data[offset+size:]
    return data
def buildResponse(query, packetNumber):
    """Build the hex-text form of a response message for ``query``.

    The answer's 4-byte address field is the CRC-32 of the first label of
    ``query``.

    Args:
        query: Dotted host name.
        packetNumber: Integer rendered as 4 hex digits at the start.

    Returns:
        The message as a string of hex digits and spaces.
    """
    labels = query.split(".")
    parts = [
        "%04x" % packetNumber,  # TID
        '81 80',  # Flags
        '00 00',  # question count
        '00 01',  # answer count
        '00 00',  # authority count
        '00 00',  # additional count
    ]
    for chunk in labels:
        parts.append(' %02x' % len(chunk))
        parts.extend(' %02x' % ord(c) for c in chunk)
    parts.append(' 00')
    parts.append('00 01')  # type A query
    parts.append('00 01')  # Class IN
    parts.append("00 00 00 01")
    parts.append('00 04')  # response length
    hostchunk = labels[0]
    # crc32 needs bytes on Python 3; on Python 2 str is already bytes.
    if not isinstance(hostchunk, bytes):
        hostchunk = hostchunk.encode('utf-8')
    respIP = binascii.crc32(hostchunk) & 0xffffffff
    parts.append("%08x" % respIP)
    return ''.join(parts)