def line_number(self):
"""Get the current line number the frame is executing."""
# We don't keep f_lineno up to date, so calculate it based on the
# instruction address and the line number table.
lnotab = self.f_code.co_lnotab
byte_increments = six.iterbytes(lnotab[0::2])
line_increments = six.iterbytes(lnotab[1::2])
byte_num = 0
line_num = self.f_code.co_firstlineno
for byte_incr, line_incr in zip(byte_increments, line_increments):
byte_num += byte_incr
if byte_num > self.f_lasti:
break
line_num += line_incr
return line_num
python类iterbytes()的实例源码
def _check_basic_auth(req, hdr, credentials):
if isinstance(credentials, six.text_type): # ``token68`` form
try:
credentials = base64.b64decode(credentials)
except Exception as e:
req.complain(1210, header=hdr, error=e)
else:
# RFC 7617 section 2 requires that,
# whatever the encoding of the credentials,
# it must be ASCII-compatible, so we don't need to know it.
if b':' not in credentials:
req.complain(1211, header=hdr)
for c in six.iterbytes(credentials):
if CTL.match(six.int2byte(c)):
req.complain(1212, header=hdr, char=hex(c))
else:
req.complain(1209, header=hdr)
def hex(self, s):
digits = []
for ch in six.iterbytes(s):
if ch == 0:
digits.append(Color.set(Color.lightgray, '00'))
else:
digits.append('%02X' % ch)
return ' '.join(digits)
def string(self, s):
def printable(ch):
if 32 <= ch <= 127:
return six.unichr(ch)
else:
return Color.set(Color.lightgray, '.')
return ''.join(map(printable, six.iterbytes(s)))
def _serialize_clob(ion_event):
value = ion_event.value
return _bytes_text(six.iterbytes(value), _DOUBLE_QUOTE, prefix=_LOB_START, suffix=_LOB_END)
def _seq(s):
"""Converts bytes to a sequence of integer code points."""
return tuple(six.iterbytes(s))
def extend(self, values):
if isinstance(values, six.text_type):
self.__text += values
else:
assert isinstance(values, six.binary_type)
for b in six.iterbytes(values):
self.append(b)
def feed(self, data):
"""
Feed data to the parser.
"""
assert isinstance(data, binary_type)
for b in iterbytes(data):
self._parser.send(int2byte(b))
def feed(self, data):
"""
Feed data to the parser.
"""
assert isinstance(data, binary_type)
for b in iterbytes(data):
self._parser.send(int2byte(b))
def main():
database_name = 'NIXNET_example'
cluster_name = 'CAN_Cluster'
signal_names = ['CANEventSignal1', 'CANEventSignal2']
with convert.SignalConversionSinglePointSession(
database_name,
cluster_name,
signal_names) as session:
user_value = six.moves.input('Enter {} signal values [float, float]: '.format(len(signal_names)))
try:
expected_signals = [float(x.strip()) for x in user_value.split(",")]
except ValueError:
expected_signals = [24.5343, 77.0129]
print('Unrecognized input ({}). Setting data buffer to {}'.format(user_value, expected_signals))
if len(expected_signals) != len(signal_names):
expected_signals = [24.5343, 77.0129]
print('Invalid number of signal values entered. Setting data buffer to {}'.format(expected_signals))
frames = session.convert_signals_to_frames(expected_signals)
print('Frames:')
for frame in frames:
print(' {}'.format(frame))
print(' payload={}'.format(list(six.iterbytes(frame.payload))))
converted_signals = session.convert_frames_to_signals(frames)
print('Signals: {}'.format([v for (_, v) in converted_signals]))
def bwt_transform(L):
# Semi-inefficient way to get the character counts
if six.PY3:
F = bytes(sorted(L))
else:
F = b''.join(sorted(L))
base = []
for i in range(256):
base.append(F.find(six.int2byte(i)))
pointers = [-1] * len(L)
for i, symbol in enumerate(six.iterbytes(L)):
pointers[base[symbol]] = i
base[symbol] += 1
return pointers
def feed(self, data):
"""
Feed data to the parser.
"""
assert isinstance(data, binary_type)
for b in iterbytes(data):
self._parser.send(int2byte(b))
def _to_int(s):
"""Computes the integer value of a raw byte string."""
value = 0
for offset, c in enumerate(iterbytes(s[::-1])):
value += c << offset * 8
return value
def decrypt(self, str):
# block - UChar[]
block = []
for i in six.iterbytes(str):
block.append(i)
# print block
block = des_ecb_encrypt(block, self.KeySched, 0)
res = b''
for i in block:
res = res + six.int2byte(i)
return res
def str_to_key56(key_str):
if not type(key_str) == six.binary_type:
# TODO rsanders high - figure out how to make this not necessary
key_str = key_str.encode('ascii')
if len(key_str) < 7:
key_str = key_str + b'\000\000\000\000\000\000\000'[:(7 - len(key_str))]
key_56 = []
for i in six.iterbytes(key_str[:7]):
key_56.append(i)
return key_56
def test_bytesiter():
it = six.iterbytes(six.b("hi"))
assert six.next(it) == ord("h")
assert six.next(it) == ord("i")
py.test.raises(StopIteration, six.next, it)
def feed(self, data):
"""
Feed data to the parser.
"""
assert isinstance(data, binary_type)
for b in iterbytes(data):
self._parser.send(int2byte(b))
def decrypt(self, str):
# block - UChar[]
block = []
for i in six.iterbytes(str):
block.append(i)
# print block
block = des_ecb_encrypt(block, self.KeySched, 0)
res = b''
for i in block:
res = res + six.int2byte(i)
return res
def str_to_key56(key_str):
if not type(key_str) == six.binary_type:
# TODO rsanders high - figure out how to make this not necessary
key_str = key_str.encode('ascii')
if len(key_str) < 7:
key_str = key_str + b'\000\000\000\000\000\000\000'[:(7 - len(key_str))]
key_56 = []
for i in six.iterbytes(key_str[:7]):
key_56.append(i)
return key_56
def decrypt(self, str):
# block - UChar[]
block = []
for i in six.iterbytes(str):
block.append(i)
# print block
block = des_ecb_encrypt(block, self.KeySched, 0)
res = b''
for i in block:
res = res + six.int2byte(i)
return res
def str_to_key56(key_str):
if not type(key_str) == six.binary_type:
# TODO rsanders high - figure out how to make this not necessary
key_str = key_str.encode('ascii')
if len(key_str) < 7:
key_str = key_str + b'\000\000\000\000\000\000\000'[:(7 - len(key_str))]
key_56 = []
for i in six.iterbytes(key_str[:7]):
key_56.append(i)
return key_56
def test_bytesiter():
it = six.iterbytes(six.b("hi"))
assert six.next(it) == ord("h")
assert six.next(it) == ord("i")
py.test.raises(StopIteration, six.next, it)
def _generate_kernel_bin(self, k, ctx):
gk = gpuarray.GpuKernel(k.code, k.name, k.params, context=ctx,
**k._get_py_flags())
bin = gk._binary
bcode = ','.join(hex(c) for c in iterbytes(bin))
return ("""static const char %(bname)s[] = { %(bcode)s };""" %
dict(bname=k.binvar, bcode=bcode))
def _check_protocol_id(complain, encoded_id):
# Since there is only one correct way to encode
# an ALPN protocol ID into an RFC 7838 ``protocol-id``,
# we just compute it and compare to what's in the message.
decoded_id = pct_decode(force_bytes(encoded_id))
correct_encoded_id = u''
for b in six.iterbytes(decoded_id):
c = six.int2byte(b)
if (tchar - '%').match(c):
correct_encoded_id += force_unicode(c)
else:
correct_encoded_id += pct_encode(c, safe='').upper()
if encoded_id != correct_encoded_id:
complain(1256, actual=encoded_id, correct=correct_encoded_id)
return decoded_id
def url_encoded_data(self):
if self.headers.content_type == \
media.application_x_www_form_urlencoded and \
okay(self.decoded_body) and self.content_is_full:
for byte in six.iterbytes(self.decoded_body):
if not URL_ENCODED_GOOD_BYTES[byte]:
char = six.int2byte(byte)
self.complain(1040, char=format_chars([char]))
return Unavailable(self.decoded_body)
# pylint: disable=no-member
return parse_qs(self.decoded_body.decode('ascii'))
else:
return None
def test_bytesiter():
it = six.iterbytes(six.b("hi"))
assert six.next(it) == ord("h")
assert six.next(it) == ord("i")
py.test.raises(StopIteration, six.next, it)
def feed(self, data):
"""
Feed data to the parser.
"""
assert isinstance(data, binary_type)
for b in iterbytes(data):
self._parser.send(int2byte(b))
def feed(self, data):
"""
Feed data to the parser.
"""
assert isinstance(data, binary_type)
for b in iterbytes(data):
self._parser.send(int2byte(b))