def _get_close_args(self, data):
""" this functions extracts the code, reason from the close body
if they exists, and if the self.on_close except three arguments """
import inspect
# if the on_close callback is "old", just return empty list
if sys.version_info < (3, 0):
if not self.on_close or len(inspect.getargspec(self.on_close).args) != 3:
return []
else:
if not self.on_close or len(inspect.getfullargspec(self.on_close).args) != 3:
return []
if data and len(data) >= 2:
code = 256*six.byte2int(data[0:1]) + six.byte2int(data[1:2])
reason = data[2:].decode('utf-8')
return [code, reason]
return [None, None]
python类byte2int()的实例源码
def _get_close_args(self, data):
""" this functions extracts the code, reason from the close body
if they exists, and if the self.on_close except three arguments """
import inspect
# if the on_close callback is "old", just return empty list
if sys.version_info < (3, 0):
if not self.on_close or len(inspect.getargspec(self.on_close).args) != 3:
return []
else:
if not self.on_close or len(inspect.getfullargspec(self.on_close).args) != 3:
return []
if data and len(data) >= 2:
code = 256*six.byte2int(data[0:1]) + six.byte2int(data[1:2])
reason = data[2:].decode('utf-8')
return [code, reason]
return [None, None]
def _bytes_text(code_point_iter, quote, prefix=b'', suffix=b''):
quote_code_point = six.byte2int(quote)
buf = BytesIO()
buf.write(prefix)
buf.write(quote)
for code_point in code_point_iter:
if code_point == quote_code_point:
buf.write(b'\\' + quote)
elif code_point == six.byte2int(b'\\'):
buf.write(b'\\\\')
elif _is_printable_ascii(code_point):
buf.write(six.int2byte(code_point))
else:
buf.write(_escape(code_point))
buf.write(quote)
buf.write(suffix)
return buf.getvalue()
def generate_scalars_binary(scalars_map, preceding_symbols=0):
for ion_type, values in six.iteritems(scalars_map):
for native, expected in values:
native_expected = expected
has_symbols = False
if native is None:
# An un-adorned 'None' doesn't contain enough information to determine its Ion type
native_expected = b'\x0f'
elif ion_type is IonType.CLOB:
# All six.binary_type are treated as BLOBs unless wrapped by an _IonNature
tid = six.byte2int(expected) + 0x10 # increment upper nibble for clob -> blob; keep lower nibble
native_expected = bytearray([tid]) + expected[1:]
elif ion_type is IonType.SYMBOL and native is not None:
has_symbols = True
elif ion_type is IonType.STRING:
# Encode all strings as symbols too.
symbol_expected = _serialize_symbol(
IonEvent(IonEventType.SCALAR, IonType.SYMBOL, SymbolToken(None, 10 + preceding_symbols)))
yield _Parameter(IonType.SYMBOL.name + ' ' + native,
IonPyText.from_value(IonType.SYMBOL, native), symbol_expected, True)
yield _Parameter('%s %s' % (ion_type.name, native), native, native_expected, has_symbols)
wrapper = _FROM_ION_TYPE[ion_type].from_value(ion_type, native)
yield _Parameter(repr(wrapper), wrapper, expected, has_symbols)
def _get_close_args(self, data):
""" this functions extracts the code, reason from the close body
if they exists, and if the self.on_close except three arguments """
import inspect
# if the on_close callback is "old", just return empty list
if sys.version_info < (3, 0):
if not self.on_close or len(inspect.getargspec(self.on_close).args) != 3:
return []
else:
if not self.on_close or len(inspect.getfullargspec(self.on_close).args) != 3:
return []
if data and len(data) >= 2:
code = 256*six.byte2int(data[0:1]) + six.byte2int(data[1:2])
reason = data[2:].decode('utf-8')
return [code, reason]
return [None, None]
def get_time(self):
"""Read out stations internal time.
@return: datetime
"""
bintime = self._send_command(SIReader.C_GET_TIME, b'')[1]
year = byte2int(bintime[0])
month = byte2int(bintime[1])
day = byte2int(bintime[2])
am_pm = byte2int(bintime[3]) & 0b1
second = SIReader._to_int(bintime[4:6])
hour = am_pm * 12 + second // 3600
second %= 3600
minute = second // 60
second %= 60
ms = int(round(byte2int(bintime[6]) / 256.0 * 1000000))
self.beep()
try:
return datetime(year, month, day, hour, minute, second, ms)
except ValueError:
# return None if the time reported by the station is impossible
return None
def _update_proto_config(self):
# Read protocol configuration
ret = self._send_command(SIReader.C_GET_SYS_VAL, SIReader.O_PROTO + b'\x01')
config_byte = byte2int(ret[1][1])
self.proto_config = {}
self.proto_config['ext_proto'] = config_byte & (1 << 0) != 0
self.proto_config['auto_send'] = config_byte & (1 << 1) != 0
self.proto_config['handshake'] = config_byte & (1 << 2) != 0
self.proto_config['pw_access'] = config_byte & (1 << 4) != 0
self.proto_config['punch_read'] = config_byte & (1 << 7) != 0
# Read operating mode
ret = self._send_command(SIReader.C_GET_SYS_VAL, SIReader.O_MODE + b'\x01')
self.proto_config['mode'] = byte2int(ret[1][1])
return self.proto_config
def _get_close_args(self, data):
""" this functions extracts the code, reason from the close body
if they exists, and if the self.on_close except three arguments """
import inspect
# if the on_close callback is "old", just return empty list
if sys.version_info < (3, 0):
if not self.on_close or len(inspect.getargspec(self.on_close).args) != 3:
return []
else:
if not self.on_close or len(inspect.getfullargspec(self.on_close).args) != 3:
return []
if data and len(data) >= 2:
code = 256 * six.byte2int(data[0:1]) + six.byte2int(data[1:2])
reason = data[2:].decode('utf-8')
return [code, reason]
return [None, None]
def decode(self, encoded_payload):
"""Decode a transmitted payload."""
self.packets = []
while encoded_payload:
if six.byte2int(encoded_payload[0:1]) <= 1:
packet_len = 0
i = 1
while six.byte2int(encoded_payload[i:i + 1]) != 255:
packet_len = packet_len * 10 + six.byte2int(
encoded_payload[i:i + 1])
i += 1
self.packets.append(packet.Packet(
encoded_packet=encoded_payload[i + 1:i + 1 + packet_len]))
else:
i = encoded_payload.find(b':')
if i == -1:
raise ValueError('invalid payload')
packet_len = int(encoded_payload[0:i])
pkt = encoded_payload[i + 1: i + 1 + packet_len]
self.packets.append(packet.Packet(encoded_packet=pkt))
encoded_payload = encoded_payload[i + 1 + packet_len:]
def _get_close_args(self, data):
""" this functions extracts the code, reason from the close body
if they exists, and if the self.on_close except three arguments """
import inspect
# if the on_close callback is "old", just return empty list
if sys.version_info < (3, 0):
if not self.on_close or len(inspect.getargspec(self.on_close).args) != 3:
return []
else:
if not self.on_close or len(inspect.getfullargspec(self.on_close).args) != 3:
return []
if data and len(data) >= 2:
code = 256 * six.byte2int(data[0:1]) + six.byte2int(data[1:2])
reason = data[2:].decode('utf-8')
return [code, reason]
return [None, None]
def decode(self, encoded_payload):
"""Decode a transmitted payload."""
self.packets = []
while encoded_payload:
if six.byte2int(encoded_payload[0:1]) <= 1:
packet_len = 0
i = 1
while six.byte2int(encoded_payload[i:i + 1]) != 255:
packet_len = packet_len * 10 + six.byte2int(
encoded_payload[i:i + 1])
i += 1
self.packets.append(packet.Packet(
encoded_packet=encoded_payload[i + 1:i + 1 + packet_len]))
else:
i = encoded_payload.find(b':')
if i == -1:
raise ValueError('invalid payload')
packet_len = int(encoded_payload[0:i])
pkt = encoded_payload[i + 1: i + 1 + packet_len]
self.packets.append(packet.Packet(encoded_packet=pkt))
encoded_payload = encoded_payload[i + 1 + packet_len:]
def _get_close_args(self, data):
""" this functions extracts the code, reason from the close body
if they exists, and if the self.on_close except three arguments """
import inspect
# if the on_close callback is "old", just return empty list
if sys.version_info < (3, 0):
if not self.on_close or len(inspect.getargspec(self.on_close).args) != 3:
return []
else:
if not self.on_close or len(inspect.getfullargspec(self.on_close).args) != 3:
return []
if data and len(data) >= 2:
code = 256*six.byte2int(data[0:1]) + six.byte2int(data[1:2])
reason = data[2:].decode('utf-8')
return [code, reason]
return [None, None]
def decode(self, encoded_payload):
"""Decode a transmitted payload."""
self.packets = []
while encoded_payload:
if six.byte2int(encoded_payload[0:1]) <= 1:
packet_len = 0
i = 1
while six.byte2int(encoded_payload[i:i + 1]) != 255:
packet_len = packet_len * 10 + six.byte2int(
encoded_payload[i:i + 1])
i += 1
self.packets.append(packet.Packet(
encoded_packet=encoded_payload[i + 1:i + 1 + packet_len]))
else:
i = encoded_payload.find(b':')
if i == -1:
raise ValueError('invalid payload')
packet_len = int(encoded_payload[0:i])
pkt = encoded_payload[i + 1: i + 1 + packet_len]
self.packets.append(packet.Packet(encoded_packet=pkt))
encoded_payload = encoded_payload[i + 1 + packet_len:]
def _get_close_args(self, data):
""" this functions extracts the code, reason from the close body
if they exists, and if the self.on_close except three arguments """
import inspect
# if the on_close callback is "old", just return empty list
if sys.version_info < (3, 0):
if not self.on_close or len(inspect.getargspec(self.on_close).args) != 3:
return []
else:
if not self.on_close or len(inspect.getfullargspec(self.on_close).args) != 3:
return []
if data and len(data) >= 2:
code = 256*six.byte2int(data[0:1]) + six.byte2int(data[1:2])
reason = data[2:].decode('utf-8')
return [code, reason]
return [None, None]
def _get_close_args(self, data):
""" this functions extracts the code, reason from the close body
if they exists, and if the self.on_close except three arguments """
import inspect
# if the on_close callback is "old", just return empty list
if sys.version_info < (3, 0):
if not self.on_close or len(inspect.getargspec(self.on_close).args) != 3:
return []
else:
if not self.on_close or len(inspect.getfullargspec(self.on_close).args) != 3:
return []
if data and len(data) >= 2:
code = 256 * six.byte2int(data[0:1]) + six.byte2int(data[1:2])
reason = data[2:].decode('utf-8')
return [code, reason]
return [None, None]
def validate(self, skip_utf8_validation=False):
"""
validate the ABNF frame.
skip_utf8_validation: skip utf8 validation.
"""
if self.rsv1 or self.rsv2 or self.rsv3:
raise WebSocketProtocolException("rsv is not implemented, yet")
if self.opcode not in ABNF.OPCODES:
raise WebSocketProtocolException("Invalid opcode %r", self.opcode)
if self.opcode == ABNF.OPCODE_PING and not self.fin:
raise WebSocketProtocolException("Invalid ping frame.")
if self.opcode == ABNF.OPCODE_CLOSE:
l = len(self.data)
if not l:
return
if l == 1 or l >= 126:
raise WebSocketProtocolException("Invalid close frame.")
if l > 2 and not skip_utf8_validation and not validate_utf8(self.data[2:]):
raise WebSocketProtocolException("Invalid close frame.")
code = 256*six.byte2int(self.data[0:1]) + six.byte2int(self.data[1:2])
if not self._is_valid_close_status(code):
raise WebSocketProtocolException("Invalid close opcode.")
def validate(self, skip_utf8_validation=False):
"""
validate the ABNF frame.
skip_utf8_validation: skip utf8 validation.
"""
if self.rsv1 or self.rsv2 or self.rsv3:
raise WebSocketProtocolException("rsv is not implemented, yet")
if self.opcode not in ABNF.OPCODES:
raise WebSocketProtocolException("Invalid opcode %r", self.opcode)
if self.opcode == ABNF.OPCODE_PING and not self.fin:
raise WebSocketProtocolException("Invalid ping frame.")
if self.opcode == ABNF.OPCODE_CLOSE:
l = len(self.data)
if not l:
return
if l == 1 or l >= 126:
raise WebSocketProtocolException("Invalid close frame.")
if l > 2 and not skip_utf8_validation and not validate_utf8(self.data[2:]):
raise WebSocketProtocolException("Invalid close frame.")
code = 256*six.byte2int(self.data[0:1]) + six.byte2int(self.data[1:2])
if not self._is_valid_close_status(code):
raise WebSocketProtocolException("Invalid close opcode.")
def _scalar_event_pairs(data, events, info):
"""Generates event pairs for all scalars.
Each scalar is represented by a sequence whose first element is the raw data and whose following elements are the
expected output events.
"""
first = True
delimiter, in_container = info
space_delimited = not (b',' in delimiter)
for event in events:
input_event = NEXT
if first:
input_event = e_read(data + delimiter)
if space_delimited and event.value is not None \
and ((event.ion_type is IonType.SYMBOL) or
(event.ion_type is IonType.STRING and
six.byte2int(b'"') != six.indexbytes(data, 0))): # triple-quoted strings
# Because annotations and field names are symbols, a space delimiter after a symbol isn't enough to
# generate a symbol event immediately. Similarly, triple-quoted strings may be followed by another
# triple-quoted string if only delimited by whitespace or comments.
yield input_event, INC
if in_container:
# Within s-expressions, these types are delimited in these tests by another value - in this case,
# int 0 (but it could be anything).
yield e_read(b'0' + delimiter), event
input_event, event = (NEXT, e_int(0))
else:
# This is a top-level value, so it may be flushed with NEXT after INCOMPLETE.
input_event, event = (NEXT, event)
first = False
yield input_event, event
def _decode_received(self, msg):
"""Returns either bytes or str, depending on message type."""
if not isinstance(msg, six.binary_type):
# already decoded - do nothing
return msg
# only decode from utf-8 if message is not binary data
type = six.byte2int(msg[0:1])
if type >= 48: # no binary
return msg.decode('utf-8')
# binary message, don't try to decode
return msg
def decode(self, encoded_packet):
"""Decode a transmitted package."""
b64 = False
if not isinstance(encoded_packet, binary_types):
encoded_packet = encoded_packet.encode('utf-8')
elif not isinstance(encoded_packet, bytes):
encoded_packet = bytes(encoded_packet)
self.packet_type = six.byte2int(encoded_packet[0:1])
if self.packet_type == 98: # 'b' --> binary base64 encoded packet
self.binary = True
encoded_packet = encoded_packet[1:]
self.packet_type = six.byte2int(encoded_packet[0:1])
self.packet_type -= 48
b64 = True
elif self.packet_type >= 48:
self.packet_type -= 48
self.binary = False
else:
self.binary = True
self.data = None
if len(encoded_packet) > 1:
if self.binary:
if b64:
self.data = base64.b64decode(encoded_packet[1:])
else:
self.data = encoded_packet[1:]
else:
try:
self.data = self.json.loads(
encoded_packet[1:].decode('utf-8'))
except ValueError:
self.data = encoded_packet[1:].decode('utf-8')
def extract_message(cls, raw_bytes):
if len(raw_bytes) < 2:
return None, raw_bytes
if six.byte2int(raw_bytes) != 0x1e:
raise FramingError(
'Start marker is missing: %s' % raw_bytes)
if b'\x0a' in raw_bytes:
b_msg, rest = raw_bytes.split(b'\x0a', 1)
return b_msg[1:], rest
else:
if b'\x1e' in raw_bytes[1:]:
raise FramingError(
'End marker is missing: %s' % raw_bytes)
return None, raw_bytes
def extract_message(cls, raw_bytes):
if len(raw_bytes) < 2:
return None, raw_bytes
if six.byte2int(raw_bytes) != 123:
raise FramingError(
'Broken state. Expected JSON Object, got: %s' % raw_bytes)
stack = [123]
uniesc = 0
poppers = {91: [93], 123: [125], 34: [34]}
adders = {91: [34, 91, 123], 123: [34, 91, 123], 34: [92], 92: [117]}
for idx in range(1, len(raw_bytes)):
cbyte = six.indexbytes(raw_bytes, idx)
if cbyte in poppers.get(stack[-1], []):
stack.pop()
elif cbyte in adders.get(stack[-1], []):
stack.append(cbyte)
elif stack[-1] == 92:
stack.pop()
elif stack[-1] == 117:
uniesc += 1
if uniesc >= 4:
stack = stack[:-2]
uniesc = 0
if not stack:
return raw_bytes[:idx + 1], raw_bytes[idx + 1:]
return None, raw_bytes
def extract_message(cls, raw_bytes):
if len(raw_bytes) < 2:
return None, raw_bytes
if six.byte2int(raw_bytes) != 0x1e:
raise FramingError(
'Start marker is missing: %s' % raw_bytes)
if b'\x0a' in raw_bytes:
b_msg, rest = raw_bytes.split(b'\x0a', 1)
return b_msg[1:], rest
else:
if b'\x1e' in raw_bytes[1:]:
raise FramingError(
'End marker is missing: %s' % raw_bytes)
return None, raw_bytes
def extract_message(cls, raw_bytes):
if len(raw_bytes) < 2:
return None, raw_bytes
if six.byte2int(raw_bytes) != 123:
raise FramingError(
'Broken state. Expected JSON Object, got: %s' % raw_bytes)
stack = [123]
uniesc = 0
poppers = {91: [93], 123: [125], 34: [34]}
adders = {91: [34, 91, 123], 123: [34, 91, 123], 34: [92], 92: [117]}
for idx in range(1, len(raw_bytes)):
cbyte = six.indexbytes(raw_bytes, idx)
if cbyte in poppers.get(stack[-1], []):
stack.pop()
elif cbyte in adders.get(stack[-1], []):
stack.append(cbyte)
elif stack[-1] == 92:
stack.pop()
elif stack[-1] == 117:
uniesc += 1
if uniesc >= 4:
stack = stack[:-2]
uniesc = 0
if not stack:
return raw_bytes[:idx + 1], raw_bytes[idx + 1:]
return None, raw_bytes
def validate(self, skip_utf8_validation=False):
"""
validate the ABNF frame.
skip_utf8_validation: skip utf8 validation.
"""
if self.rsv1 or self.rsv2 or self.rsv3:
raise WebSocketProtocolException("rsv is not implemented, yet")
if self.opcode not in ABNF.OPCODES:
raise WebSocketProtocolException("Invalid opcode %r", self.opcode)
if self.opcode == ABNF.OPCODE_PING and not self.fin:
raise WebSocketProtocolException("Invalid ping frame.")
if self.opcode == ABNF.OPCODE_CLOSE:
l = len(self.data)
if not l:
return
if l == 1 or l >= 126:
raise WebSocketProtocolException("Invalid close frame.")
if l > 2 and not skip_utf8_validation and not validate_utf8(self.data[2:]):
raise WebSocketProtocolException("Invalid close frame.")
code = 256*six.byte2int(self.data[0:1]) + six.byte2int(self.data[1:2])
if not self._is_valid_close_status(code):
raise WebSocketProtocolException("Invalid close opcode.")
def _get_close_args(self, data):
if data and len(data) >= 2:
code = 256 * six.byte2int(data[0:1]) + six.byte2int(data[1:2])
reason = data[2:].decode('utf-8')
return [code, reason]
return [None, None]
def _get_close_args(self, data):
""" this functions extracts the code, reason from the close body
if they exists, and if the self.on_close except three arguments """
import inspect
# if the on_close callback is "old", just return empty list
if not self.on_close or len(inspect.getargspec(self.on_close).args) != 3:
return []
if data and len(data) >= 2:
code = 256*six.byte2int(data[0:1]) + six.byte2int(data[1:2])
reason = data[2:].decode('utf-8')
return [code, reason]
return [None, None]
def validate(self, skip_utf8_validation=False):
"""
validate the ABNF frame.
skip_utf8_validation: skip utf8 validation.
"""
if self.rsv1 or self.rsv2 or self.rsv3:
raise WebSocketProtocolException("rsv is not implemented, yet")
if self.opcode not in ABNF.OPCODES:
raise WebSocketProtocolException("Invalid opcode %r", self.opcode)
if self.opcode == ABNF.OPCODE_PING and not self.fin:
raise WebSocketProtocolException("Invalid ping frame.")
if self.opcode == ABNF.OPCODE_CLOSE:
l = len(self.data)
if not l:
return
if l == 1 or l >= 126:
raise WebSocketProtocolException("Invalid close frame.")
if l > 2 and not skip_utf8_validation and not validate_utf8(self.data[2:]):
raise WebSocketProtocolException("Invalid close frame.")
code = 256*six.byte2int(self.data[0:1]) + six.byte2int(self.data[1:2])
if not self._is_valid_close_status(code):
raise WebSocketProtocolException("Invalid close opcode.")
def byte2int(x):
try:
return x[0]
except TypeError:
return x
def _decode_cardnr(number):
"""Decodes a 4 byte cardnr to an int. SI-Card numbering is a bit odd:
SI-Card 5:
- byte 0: always 0 (not stored on the card)
- byte 1: card series (stored on the card as CNS)
- byte 2,3: card number
- printed: 100'000*CNS + card number
- nr range: 1-65'000 + 200'001-265'000 + 300'001-365'000 + 400'001-465'000
SI-Card 6/6*/8/9/10/11/pCard/tCard/fCard/SIAC1:
- byte 0: card series (SI6:00, SI9:01, SI8:02, pCard:04, tCard:06, fCard:0E, SI10+SI11+SIAC1:0F)
- byte 1-3: card number
- printed: only card number
- nr range:
- SI6: 500'000-999'999 + 2'003'000-2'003'400 (WM2003) + 16'711'680-16'777'215 (SI6*)
- SI9: 1'000'000-1'999'999, SI8: 2'000'000-2'999'999
- pCard: 4'000'000-4'999'999, tCard: 6'000'000-6'999'999
- SI10: 7'000'000-7'999'999, SIAC1: 8'000'000-8'999'999
- SI11: 9'000'000-9'999'999, fCard: 14'000'000-14'999'999
The card nr ranges guarantee that no ambigous values are possible
(500'000 = 0x07A120 > 0x04FFFF = 465'535 = highest technically possible value on a SI5)
"""
if number[0:1] != b'\x00':
raise SIReaderException('Unknown card series')
nr = SIReader._to_int(number[1:4])
if nr < 500000:
# SI5 card
ret = SIReader._to_int(number[2:4])
if byte2int(number[1]) < 2:
# Card series 0 and 1 do not have the 0/1 printed on the card
return ret
else:
return byte2int(number[1]) * 100000 + ret
else:
# SI6/8/9
return nr