def getBigEndian(hex_val):
    """Return the hex digits of ``hex_val`` with their byte order reversed.

    ``hex_val`` is a hex string without the ``0x`` prefix.  It is parsed by
    ``BitArray`` and read back through ``.hex``; that digit string is then
    rebuilt from two-character groups taken from its end, keeping the digit
    order inside each group (for example ``"a1b2c3"`` -> ``"c3b2a1"``,
    assuming ``.hex`` yields the bare digits).
    """
    digits = BitArray("0x" + hex_val).hex
    mirrored = digits[::-1]
    # Each pair cut from the mirrored string is itself mirrored back, which
    # restores the digit order within a byte while the bytes stay reversed.
    return "".join(
        mirrored[pos:pos + 2][::-1] for pos in range(0, len(digits), 2)
    )
# Example source code for the Python class BitArray()
def getDifficult(bits):
    """Convert a compact ``bits`` hex string into a difficulty figure.

    The input is byte-reversed with ``getBigEndian``.  The first hex pair of
    the result is used as an exponent and the following three pairs as a
    coefficient, giving ``coefficient * 2 ** (8 * (exponent - 3))``.  The
    fixed 256-bit constant below is divided by that product and the quotient
    is rounded to two decimal places.

    NOTE(review): this looks like Bitcoin's compact "nBits" target encoding;
    confirm against the caller.

    :param bits: hex string without the ``0x`` prefix.
    :return: the rounded quotient.
    :raises ZeroDivisionError: when the coefficient digits are all zero.
    """
    compact = BitArray("0x" + getBigEndian(bits)).hex
    exponent_bits = BitArray("0x" + compact[:2])
    coefficient_bits = BitArray("0x" + compact[2:8])
    # NOTE(review): ``.int`` is the signed interpretation, so a field whose
    # top bit is set is read as a negative number.
    exponent = exponent_bits.int
    coefficient = coefficient_bits.int
    target = coefficient * 2 ** (8 * (exponent - 3))
    reference = BitArray(
        "0x00000000FFFF0000000000000000000000000000000000000000000000000000"
    ).int
    return round(reference / target, 2)
def getBigEndian(hex_val):
    """Reverse the byte order of a hex string.

    ``hex_val`` (no ``0x`` prefix) is normalised by a round trip through
    ``BitArray`` and its ``.hex`` property.  Two-character groups are then
    collected starting from the end of that digit string and concatenated,
    so the last byte comes first.  With an odd number of digits the leftover
    single leading digit ends up last.
    """
    digits = BitArray("0x" + hex_val).hex
    pairs = [
        digits[max(end - 2, 0):end]
        for end in range(len(digits), 0, -2)
    ]
    return "".join(pairs)
def getDifficult(bits):
    """Convert a compact ``bits`` hex string into a difficulty figure.

    After byte-reversing ``bits`` with ``getBigEndian``, the leading hex pair
    is read as an exponent and the next three pairs as a coefficient.  The
    fixed 256-bit constant below is divided by
    ``coefficient * 2 ** (8 * (exponent - 3))`` and the quotient is returned
    as a ``float`` without rounding.

    NOTE(review): this looks like Bitcoin's compact "nBits" target encoding;
    confirm against the caller.

    :param bits: hex string without the ``0x`` prefix.
    :return: the quotient converted with ``float``.
    :raises ZeroDivisionError: when the coefficient digits are all zero.
    """
    normalised = BitArray("0x" + getBigEndian(bits)).hex
    head = BitArray("0x" + normalised[:2])
    tail = BitArray("0x" + normalised[2:8])
    # NOTE(review): ``.int`` is the signed interpretation, so a field whose
    # top bit is set is read as a negative number.
    exponent, coefficient = head.int, tail.int
    scale = 2 ** (8 * (exponent - 3))
    denominator = coefficient * scale
    ceiling = BitArray(
        "0x00000000FFFF0000000000000000000000000000000000000000000000000000"
    )
    numerator = ceiling.int
    return float(numerator / denominator)
def computeTransHash(rawHash):
    """Hash a hex-encoded payload with SHA-256 twice and post-process it.

    ``rawHash`` (hex digits, no ``0x`` prefix) is turned into bytes through
    ``BitArray``.  The bytes are hashed with SHA-256, the raw digest is
    hashed again, and the hex form of the second digest is wrapped in a
    ``BitArray`` and handed to ``hexHashReverser``.

    :return: whatever ``hexHashReverser`` returns for the double hash
        (that helper is defined elsewhere; presumably it reverses the byte
        order -- confirm).
    """
    payload = BitArray("0x" + rawHash).bytes
    inner_digest = hashlib.sha256(payload).digest()
    outer_hex = hashlib.sha256(inner_digest).hexdigest()
    return hexHashReverser(BitArray("0x" + outer_hex))
# The address must be in decimal form before it is passed to base58encode.
def recv_bitfield(self, length):
    """
    Received only at the start of a connection when a peer wants to
    tell you all the pieces it has in a very compact form.

    Reads the payload, interprets it as one bit per piece and records a
    ``self.PIECE`` entry in ``self.pieces`` for every piece of the torrent.

    :param length: The size of the payload, as passed to ``self.recv``;
                   the payload holds one bit per piece
    :raises IndexError: if the payload holds fewer bits than there are
                        pieces
    """
    payload = self.recv(length)
    bits = BitArray(bytes=payload)
    # Walk the mapping once.  Subscripting ``items()`` rebuilt the item list
    # on every iteration under Python 2 and is not possible at all under
    # Python 3, where ``items()`` returns a view.
    for index, (sha, _) in enumerate(self.torrent.pieces.items()):
        # Bit ``index`` tells whether the peer has the piece at that position.
        self.pieces[sha] = self.PIECE(sha, self, have=bits[index])
def send_bitfield(self):
    """
    Tell the peer all the pieces you have in a really compact form.

    Builds one bit per entry of ``self.torrent.pieces`` (in mapping order)
    and sends the packed bytes with ``self.send_payload`` under message
    id 5.
    """
    # A piece counts as held when its ``digest`` equals the sha it is keyed
    # under.  A comprehension replaces the tuple-unpacking lambda, which is a
    # syntax error on Python 3.
    field = BitArray(
        [sha == piece.digest for sha, piece in self.torrent.pieces.items()]
    )
    # tobytes() pads the final byte with zero bits when the piece count is
    # not a multiple of eight.
    self.send_payload(5, field.tobytes())
def checklength(pdu, speclen):
    """Check the byte length of a hex-encoded PDU and return its body bits.

    pdu is hex encoded, 4 bits per char.

    :param pdu: hex string with an even number of characters.
    :param speclen: expected length of ``pdu`` in bytes, or ``None`` to skip
        the length check.
    :return: ``BitArray`` built from ``pdu`` without its first three bytes
        and without its last byte.
    :raises DataError: when ``speclen`` is given and does not match the
        actual byte count.
    """
    assert isinstance(pdu, str)
    assert speclen is None or isinstance(speclen, int)
    assert len(pdu) % 2 == 0
    # Floor division keeps the byte count an integer on Python 3 as well, so
    # the error message reports e.g. "4" rather than "4.0".
    byte_count = len(pdu) // 2
    if speclen is not None and byte_count != speclen:
        raise DataError("mtype=0x%s: Incorrect length %s (got %s) body: %s"
                        % (pdu[:3*2], speclen, byte_count, pdu[3*2:]))
    return BitArray(hex=pdu[3*2:-1*2])
def intdecoder(body, width=8, signed=False):
    """Render ``body`` as a sequence of little-endian integers.

    :param body: ``BitArray`` to decode.
    :param width: field width in bits; only 8 and 16 are accepted.
    :param signed: read each field as ``intle`` instead of ``uintle``.
    :return: ``[("ints", text), ("strbody", body.hex)]`` where ``text`` is
        the zero-padded decimal fields joined by single spaces.
    """
    assert type(body) == BitArray
    assert width % 8 == 0
    assert width in [8, 16]  # for now, due to fmt.
    # Pad to the widest decimal rendering of the field: 3 digits for 8 bits,
    # 5 digits for 16 bits.
    fmt = "%05i" if width == 16 else "%03i"
    fields = (body[start:start + width] for start in range(0, body.len, width))
    rendered = [
        fmt % (field.intle if signed else field.uintle) for field in fields
    ]
    return [("ints", " ".join(rendered)), ('strbody', body.hex)]
def pack(self):
    """Pack the report's values into consecutive ``self.size``-bit fields.

    A ``_BitArray`` of ``self.count * self.size`` bits is created and field
    ``i`` receives ``self._values[i]`` after it has been passed through
    ``self.physical_range.scale_to(self.logical_range, ...)`` and converted
    with ``int``.

    :return: the filled ``_BitArray``.
    :raises ArithmeticError: re-raised from the scaling/conversion unless
        ``ReportFlags.NULL_STATE`` is set in ``self.flags``.
    """
    packed = _BitArray(self.count * self.size)
    for index in range(self.count):
        start = index * self.size
        try:
            packed[start:start + self.size] = int(
                self.physical_range.scale_to(
                    self.logical_range, self._values[index]
                )
            )
        except ArithmeticError:
            # If the value is outside of the physical range, and NULLs are
            # allowed, then leave this field as it was created.
            if self.flags & ReportFlags.NULL_STATE:
                continue
            raise
    return packed
def serialize(self, report_type: ReportType) -> bytes:
    """Concatenate the serialized form of every item for ``report_type``.

    ``Report`` items contribute ``item.pack()`` only when their
    ``report_type`` is the requested one; every other item contributes
    ``item.serialize(report_type)``.

    NOTE(review): the object returned is the ``_BitArray`` accumulator even
    though the annotation says ``bytes`` -- confirm what callers expect.
    """
    buffer = _BitArray()
    for entry in self.items:
        if not isinstance(entry, Report):
            # Non-report entries serialize themselves for the requested type.
            buffer.append(entry.serialize(report_type))
        elif entry.report_type is report_type:
            buffer.append(entry.pack())
    return buffer
def toHex(self):
    """Return the EPC number's bit string as upper-case hexadecimal digits.

    ``self._bits`` is parsed as a base-2 integer, so leading zero bits do
    not contribute digits to the result.
    """
    value = int(self._bits, 2)
    # hex() prefixes its output with "0x"; drop it before upper-casing.
    return hex(value)[2:].upper()
def toHex(self):
    """Return a hex representation of the GDTI-113.

    A single ``0`` bit is prepended to ``self._bits`` before the conversion;
    the original note says the bit count needs to be made divisible by 4.

    NOTE(review): the first two characters of ``.hex`` are discarded,
    presumably to strip a ``0x`` prefix emitted by older bitstring
    releases -- confirm against the bitstring version in use.
    """
    padded = BitArray("0b0%s" % (self._bits,))
    hex_text = padded.hex
    return hex_text[2:].upper()
def getBits(self):
    """Return the field value as a string of bits.

    When both a field value and a positive bit length are set, the value is
    converted with ``Conversion().uint32`` and encoded as an unsigned
    integer of ``self._bitLength`` bits.  Otherwise a string of zeros padded
    to ``self._bitLength`` characters is returned (a single ``"0"`` when the
    length is zero or negative).

    NOTE(review): the first two characters of ``.bin`` are discarded,
    presumably to strip a ``0b`` prefix emitted by older bitstring
    releases -- confirm against the bitstring version in use.

    :raises TypeError: from ``str.zfill`` when ``self._bitLength`` is
        ``None``.
    """
    # Identity checks against None instead of ``!= None`` comparisons.
    if (self._fieldValue is not None
            and self._bitLength is not None
            and self._bitLength > 0):
        ui = BitArray(uint=Conversion().uint32(self._fieldValue),
                      length=self._bitLength)
        return ui.bin[2:]
    return "0".zfill(self._bitLength)
def u5_to_bitarray(arr):
    """Concatenate the items of ``arr`` as consecutive 5-bit unsigned fields.

    :param arr: iterable of integers, each packed with the ``uint:5`` format.
    :return: ``bitstring.BitArray`` holding the fields in input order.
    """
    bits = bitstring.BitArray()
    for value in arr:
        bits.append(bitstring.pack("uint:5", value))
    return bits
def tagged_bytes(char, l):
    """Wrap ``l`` in a ``bitstring.BitArray`` and pass it to ``tagged``.

    :return: whatever ``tagged(char, ...)`` returns for the wrapped value.
    """
    payload = bitstring.BitArray(l)
    return tagged(char, payload)
# Discard trailing bits, convert to bytes.
def __init__(self, src_ase, dst_ase, block_list, options, resume_mgr):
# type: (Descriptor, blobxfer.models.azure.StorageEntity,
# blobxfer.models.azure.StorageEntity, list,
# blobxfer.models.options.SyncCopy,
# blobxfer.operations.resume.SyncCopyResumeManager) -> None
"""Ctor for Descriptor
:param Descriptor self: this
:param blobxfer.models.azure.StorageEntity src_ase:
source Azure Storage Entity
:param blobxfer.models.azure.StorageEntity dst_ase:
destination Azure Storage Entity
:param list block_list: source blob block list
:param blobxfer.models.options.SyncCopy options: synccopy options
(not referenced by the statements visible in this constructor)
:param blobxfer.operations.resume.SyncCopyResumeManager resume_mgr:
synccopy resume manager
"""
# progress bookkeeping starts at zero / not finalized
self._offset = 0
self._chunk_num = 0
self._finalized = False
self._meta_lock = threading.Lock()
self._resume_mgr = resume_mgr
self._src_ase = src_ase
self._dst_ase = dst_ase
self._src_block_list = block_list
# chunk size comes from a helper; the total chunk count is derived from it
self._chunk_size = self._compute_chunk_size()
# calculate the total number of ops required for transfer
self._total_chunks = self._compute_total_chunks(self._chunk_size)
self._outstanding_ops = self._total_chunks
# every replica target adds one more full set of ops on top of the
# primary destination, hence the "+ 1"
if blobxfer.util.is_not_empty(self._dst_ase.replica_targets):
self._outstanding_ops *= len(self._dst_ase.replica_targets) + 1
# one bit per chunk, created when a resume manager was supplied
# NOTE(review): indentation is missing here, so it cannot be told whether
# the _replica_counters assignment below is also conditional -- confirm
if self._resume_mgr:
self._completed_chunks = bitstring.BitArray(
length=self._total_chunks)
self._replica_counters = {}
def __init__(self, node_id):
    """Set up the table around this node's own id.

    :param node_id: initializer handed to ``BitArray``; the resulting bit
        array is stored as ``self.node_id`` and used to create the root
        ``Bucket``.
    """
    own_id = BitArray(node_id)
    self.node_id = own_id
    self.root = Bucket(self.node_id)
    # Starts out empty; nothing in this constructor fills it.
    self.__nodes = dict()
def get_neighbor(self, info_hash):
    """Return the first element of the root bucket's neighbor lookup.

    :param info_hash: initializer handed to ``BitArray`` before the lookup.
    :return: element ``0`` of ``self.root.get_neighbor(...)``.
    """
    target = BitArray(info_hash)
    matches = self.root.get_neighbor(target)
    return matches[0]
def __init__(self, node_id, addr):
    """Record a node's id and address and mark it as freshly seen.

    :param node_id: raw id bytes, stored as a ``BitArray`` in ``self.nid``.
    :param addr: stored unchanged as ``self.addr``.
    """
    self.nid = BitArray(bytes=node_id)
    self.addr = addr
    # A new node starts with both status flags cleared.
    self.dubious = self.is_bad = False
    # Creation time, in seconds since the epoch.
    self.last_fresh = time.time()