def poll_punch(self):
"""Polls for new punches.
@return: list of (cardnr, punchtime) tuples, empty list if no new punches are available
"""
if not self.proto_config['ext_proto']:
raise SIReaderException('This command only supports stations in "Extended Protocol" '
'mode. Switch mode first')
if not self.proto_config['auto_send']:
raise SIReaderException('This command only supports stations in "Autosend" '
'mode. Switch mode first')
punches = []
while True:
try:
c = self._read_command(timeout=0)
except SIReaderTimeout:
break
if c[0] == SIReader.C_TRANS_REC:
cur_offset = SIReader._to_int(c[1][SIReader.T_OFFSET:SIReader.T_OFFSET + 3])
if self._next_offset is not None:
while self._next_offset < cur_offset:
# recover lost punches
punches.append(self._read_punch(self._next_offset))
self._next_offset += SIReader.REC_LEN
self._next_offset = cur_offset + SIReader.REC_LEN
punches.append((self._decode_cardnr(c[1][SIReader.T_CN:SIReader.T_CN + 4]),
self._decode_time(c[1][SIReader.T_TIME:SIReader.T_TIME + 2])))
else:
raise SIReaderException('Unexpected command %s received' % hex(byte2int(c[0])))
return punches
python类byte2int()的实例源码
def __init__(self, value=0):
if not isinstance(value, six.integer_types):
value = six.byte2int(value)
self.v = C + norm(abs(int(value)))
def validate(self, skip_utf8_validation=False):
"""
validate the ABNF frame.
skip_utf8_validation: skip utf8 validation.
"""
if self.rsv1 or self.rsv2 or self.rsv3:
raise WebSocketProtocolException("rsv is not implemented, yet")
if self.opcode not in ABNF.OPCODES:
raise WebSocketProtocolException("Invalid opcode %r", self.opcode)
if self.opcode == ABNF.OPCODE_PING and not self.fin:
raise WebSocketProtocolException("Invalid ping frame.")
if self.opcode == ABNF.OPCODE_CLOSE:
l = len(self.data)
if not l:
return
if l == 1 or l >= 126:
raise WebSocketProtocolException("Invalid close frame.")
if l > 2 and not skip_utf8_validation and not validate_utf8(self.data[2:]):
raise WebSocketProtocolException("Invalid close frame.")
code = 256 * \
six.byte2int(self.data[0:1]) + six.byte2int(self.data[1:2])
if not self._is_valid_close_status(code):
raise WebSocketProtocolException("Invalid close opcode.")
def decode(self, encoded_packet):
"""Decode a transmitted package."""
b64 = False
if not isinstance(encoded_packet, six.binary_type):
encoded_packet = encoded_packet.encode('utf-8')
self.packet_type = six.byte2int(encoded_packet[0:1])
if self.packet_type == 98: # 'b' --> binary base64 encoded packet
self.binary = True
encoded_packet = encoded_packet[1:]
self.packet_type = six.byte2int(encoded_packet[0:1])
self.packet_type -= 48
b64 = True
elif self.packet_type >= 48:
self.packet_type -= 48
self.binary = False
else:
self.binary = True
self.data = None
if len(encoded_packet) > 1:
if self.binary:
if b64:
self.data = base64.b64decode(encoded_packet[1:])
else:
self.data = encoded_packet[1:]
else:
try:
self.data = self.json.loads(
encoded_packet[1:].decode('utf-8'))
except ValueError:
self.data = encoded_packet[1:].decode('utf-8')
def byte2int(bytes):
return int(bytes[0])
def test_byte2int():
assert six.byte2int(six.b("\x03")) == 3
assert six.byte2int(six.b("\x03\x04")) == 3
py.test.raises(IndexError, six.byte2int, six.b(""))
def validate(self, skip_utf8_validation=False):
"""
validate the ABNF frame.
skip_utf8_validation: skip utf8 validation.
"""
if self.rsv1 or self.rsv2 or self.rsv3:
raise WebSocketProtocolException("rsv is not implemented, yet")
if self.opcode not in ABNF.OPCODES:
raise WebSocketProtocolException("Invalid opcode %r", self.opcode)
if self.opcode == ABNF.OPCODE_PING and not self.fin:
raise WebSocketProtocolException("Invalid ping frame.")
if self.opcode == ABNF.OPCODE_CLOSE:
l = len(self.data)
if not l:
return
if l == 1 or l >= 126:
raise WebSocketProtocolException("Invalid close frame.")
if l > 2 and not skip_utf8_validation and not validate_utf8(self.data[2:]):
raise WebSocketProtocolException("Invalid close frame.")
code = 256 * \
six.byte2int(self.data[0:1]) + six.byte2int(self.data[1:2])
if not self._is_valid_close_status(code):
raise WebSocketProtocolException("Invalid close opcode.")
def decode(self, encoded_packet):
"""Decode a transmitted package."""
b64 = False
if not isinstance(encoded_packet, six.binary_type):
encoded_packet = encoded_packet.encode('utf-8')
self.packet_type = six.byte2int(encoded_packet[0:1])
if self.packet_type == 98: # 'b' --> binary base64 encoded packet
self.binary = True
encoded_packet = encoded_packet[1:]
self.packet_type = six.byte2int(encoded_packet[0:1])
self.packet_type -= 48
b64 = True
elif self.packet_type >= 48:
self.packet_type -= 48
self.binary = False
else:
self.binary = True
self.data = None
if len(encoded_packet) > 1:
if self.binary:
if b64:
self.data = base64.b64decode(encoded_packet[1:])
else:
self.data = encoded_packet[1:]
else:
try:
self.data = self.json.loads(
encoded_packet[1:].decode('utf-8'))
except ValueError:
self.data = encoded_packet[1:].decode('utf-8')
def validate(self, skip_utf8_validation=False):
"""
validate the ABNF frame.
skip_utf8_validation: skip utf8 validation.
"""
if self.rsv1 or self.rsv2 or self.rsv3:
raise WebSocketProtocolException("rsv is not implemented, yet")
if self.opcode not in ABNF.OPCODES:
raise WebSocketProtocolException("Invalid opcode %r", self.opcode)
if self.opcode == ABNF.OPCODE_PING and not self.fin:
raise WebSocketProtocolException("Invalid ping frame.")
if self.opcode == ABNF.OPCODE_CLOSE:
l = len(self.data)
if not l:
return
if l == 1 or l >= 126:
raise WebSocketProtocolException("Invalid close frame.")
if l > 2 and not skip_utf8_validation and not validate_utf8(self.data[2:]):
raise WebSocketProtocolException("Invalid close frame.")
code = 256*six.byte2int(self.data[0:1]) + six.byte2int(self.data[1:2])
if not self._is_valid_close_status(code):
raise WebSocketProtocolException("Invalid close opcode.")
def __init__(self, value=0):
if not isinstance(value, six.integer_types):
value = six.byte2int(value)
self.v = C + norm(abs(int(value)))
def __init__(self, value=0):
if not isinstance(value, six.integer_types):
value = six.byte2int(value)
self.v = C + norm(abs(int(value)))
def test_byte2int():
assert six.byte2int(six.b("\x03")) == 3
assert six.byte2int(six.b("\x03\x04")) == 3
py.test.raises(IndexError, six.byte2int, six.b(""))
def decode(self, encoded_packet):
"""Decode a transmitted package."""
b64 = False
self.packet_type = six.byte2int(encoded_packet[0:1])
if self.packet_type == 98: # 'b' --> binary base64 encoded packet
self.binary = True
encoded_packet = encoded_packet[1:]
self.packet_type = six.byte2int(encoded_packet[0:1])
self.packet_type -= 48
b64 = True
elif self.packet_type >= 48:
self.packet_type -= 48
self.binary = False
else:
self.binary = True
self.data = None
if len(encoded_packet) > 1:
if self.binary:
if b64:
self.data = base64.b64decode(encoded_packet[1:])
else:
self.data = encoded_packet[1:]
else:
try:
self.data = self.json.loads(
encoded_packet[1:].decode('utf-8'))
except ValueError:
self.data = encoded_packet[1:].decode('utf-8')
def validate(self, skip_utf8_validation=False):
"""
validate the ABNF frame.
skip_utf8_validation: skip utf8 validation.
"""
if self.rsv1 or self.rsv2 or self.rsv3:
raise WebSocketProtocolException("rsv is not implemented, yet")
if self.opcode not in ABNF.OPCODES:
raise WebSocketProtocolException("Invalid opcode %r", self.opcode)
if self.opcode == ABNF.OPCODE_PING and not self.fin:
raise WebSocketProtocolException("Invalid ping frame.")
if self.opcode == ABNF.OPCODE_CLOSE:
l = len(self.data)
if not l:
return
if l == 1 or l >= 126:
raise WebSocketProtocolException("Invalid close frame.")
if l > 2 and not skip_utf8_validation and not validate_utf8(self.data[2:]):
raise WebSocketProtocolException("Invalid close frame.")
code = 256*six.byte2int(self.data[0:1]) + six.byte2int(self.data[1:2])
if not self._is_valid_close_status(code):
raise WebSocketProtocolException("Invalid close opcode.")
def test_byte2int():
assert six.byte2int(six.b("\x03")) == 3
assert six.byte2int(six.b("\x03\x04")) == 3
py.test.raises(IndexError, six.byte2int, six.b(""))
def _get_close_args(self, data):
""" this functions extracts the code, reason from the close body
if they exists, and if the self.on_close except three arguments """
import inspect
# if the on_close callback is "old", just return empty list
if not self.on_close or len(inspect.getargspec(self.on_close).args) != 3:
return []
if data and len(data) >= 2:
code = 256*six.byte2int(data[0:1]) + six.byte2int(data[1:2])
reason = data[2:].decode('utf-8')
return [code, reason]
return [None, None]
def validate(self, skip_utf8_validation=False):
"""
validate the ABNF frame.
skip_utf8_validation: skip utf8 validation.
"""
if self.rsv1 or self.rsv2 or self.rsv3:
raise WebSocketProtocolException("rsv is not implemented, yet")
if self.opcode not in ABNF.OPCODES:
raise WebSocketProtocolException("Invalid opcode %r", self.opcode)
if self.opcode == ABNF.OPCODE_PING and not self.fin:
raise WebSocketProtocolException("Invalid ping frame.")
if self.opcode == ABNF.OPCODE_CLOSE:
l = len(self.data)
if not l:
return
if l == 1 or l >= 126:
raise WebSocketProtocolException("Invalid close frame.")
if l > 2 and not skip_utf8_validation and not validate_utf8(self.data[2:]):
raise WebSocketProtocolException("Invalid close frame.")
code = 256*six.byte2int(self.data[0:1]) + six.byte2int(self.data[1:2])
if not self._is_valid_close_status(code):
raise WebSocketProtocolException("Invalid close opcode.")
def _get_close_args(self, data):
""" this functions extracts the code, reason from the close body
if they exists, and if the self.on_close except three arguments """
import inspect
# if the on_close callback is "old", just return empty list
if not self.on_close or len(inspect.getargspec(self.on_close).args) != 3:
return []
if data and len(data) >= 2:
code = 256*six.byte2int(data[0:1]) + six.byte2int(data[1:2])
reason = data[2:].decode('utf-8')
return [code, reason]
return [None, None]
def validate(self, skip_utf8_validation=False):
"""
validate the ABNF frame.
skip_utf8_validation: skip utf8 validation.
"""
if self.rsv1 or self.rsv2 or self.rsv3:
raise WebSocketProtocolException("rsv is not implemented, yet")
if self.opcode not in ABNF.OPCODES:
raise WebSocketProtocolException("Invalid opcode %r", self.opcode)
if self.opcode == ABNF.OPCODE_PING and not self.fin:
raise WebSocketProtocolException("Invalid ping frame.")
if self.opcode == ABNF.OPCODE_CLOSE:
l = len(self.data)
if not l:
return
if l == 1 or l >= 126:
raise WebSocketProtocolException("Invalid close frame.")
if l > 2 and not skip_utf8_validation and not validate_utf8(self.data[2:]):
raise WebSocketProtocolException("Invalid close frame.")
code = 256*six.byte2int(self.data[0:1]) + six.byte2int(self.data[1:2])
if not self._is_valid_close_status(code):
raise WebSocketProtocolException("Invalid close opcode.")
def _get_close_args(self, data):
""" this functions extracts the code, reason from the close body
if they exists, and if the self.on_close except three arguments """
import inspect
# if the on_close callback is "old", just return empty list
if not self.on_close or len(inspect.getargspec(self.on_close).args) != 3:
return []
if data and len(data) >= 2:
code = 256*six.byte2int(data[0:1]) + six.byte2int(data[1:2])
reason = data[2:].decode('utf-8')
return [code, reason]
return [None, None]