def encode(self, input, final=False):
state = self.state
encoded = []
for c in input.upper():
if c in HEXDIGITS:
z = HEXDIGITS.index(c)
if state:
encoded.append(z + (state & 0xf0))
state = 0
else:
state = 0x100 + (z << 4)
elif c == ' ': # allow spaces to separate values
if state and self.errors == 'strict':
raise UnicodeError('odd number of hex digits')
state = 0
else:
if self.errors == 'strict':
raise UnicodeError('non-hex digit found: %r' % c)
self.state = state
return serial.to_bytes(encoded)
python类to_bytes()的实例源码
def encode(self, input, final=False):
state = self.state
encoded = []
for c in input.upper():
if c in HEXDIGITS:
z = HEXDIGITS.index(c)
if state:
encoded.append(z + (state & 0xf0))
state = 0
else:
state = 0x100 + (z << 4)
elif c == ' ': # allow spaces to separate values
if state and self.errors == 'strict':
raise UnicodeError('odd number of hex digits')
state = 0
else:
if self.errors == 'strict':
raise UnicodeError('non-hex digit found: %r' % c)
self.state = state
return serial.to_bytes(encoded)
def encode(self, data, final=False):
"""\
Incremental encode, keep track of digits and emit a byte when a pair
of hex digits is found. The space is optional unless the error
handling is defined to be 'strict'.
"""
state = self.state
encoded = []
for c in data.upper():
if c in HEXDIGITS:
z = HEXDIGITS.index(c)
if state:
encoded.append(z + (state & 0xf0))
state = 0
else:
state = 0x100 + (z << 4)
elif c == ' ': # allow spaces to separate values
if state and self.errors == 'strict':
raise UnicodeError('odd number of hex digits')
state = 0
else:
if self.errors == 'strict':
raise UnicodeError('non-hex digit found: {!r}'.format(c))
self.state = state
return serial.to_bytes(encoded)
def encode(self, data, final=False):
"""\
Incremental encode, keep track of digits and emit a byte when a pair
of hex digits is found. The space is optional unless the error
handling is defined to be 'strict'.
"""
state = self.state
encoded = []
for c in data.upper():
if c in HEXDIGITS:
z = HEXDIGITS.index(c)
if state:
encoded.append(z + (state & 0xf0))
state = 0
else:
state = 0x100 + (z << 4)
elif c == ' ': # allow spaces to separate values
if state and self.errors == 'strict':
raise UnicodeError('odd number of hex digits')
state = 0
else:
if self.errors == 'strict':
raise UnicodeError('non-hex digit found: {!r}'.format(c))
self.state = state
return serial.to_bytes(encoded)
def encode(self, input, final=False):
state = self.state
encoded = []
for c in input.upper():
if c in HEXDIGITS:
z = HEXDIGITS.index(c)
if state:
encoded.append(z + (state & 0xf0))
state = 0
else:
state = 0x100 + (z << 4)
elif c == ' ': # allow spaces to separate values
if state and self.errors == 'strict':
raise UnicodeError('odd number of hex digits')
state = 0
else:
if self.errors == 'strict':
raise UnicodeError('non-hex digit found: %r' % c)
self.state = state
return serial.to_bytes(encoded)
def encode(self, data, final=False):
"""\
Incremental encode, keep track of digits and emit a byte when a pair
of hex digits is found. The space is optional unless the error
handling is defined to be 'strict'.
"""
state = self.state
encoded = []
for c in data.upper():
if c in HEXDIGITS:
z = HEXDIGITS.index(c)
if state:
encoded.append(z + (state & 0xf0))
state = 0
else:
state = 0x100 + (z << 4)
elif c == ' ': # allow spaces to separate values
if state and self.errors == 'strict':
raise UnicodeError('odd number of hex digits')
state = 0
else:
if self.errors == 'strict':
raise UnicodeError('non-hex digit found: {!r}'.format(c))
self.state = state
return serial.to_bytes(encoded)
def encode(self, data, final=False):
"""\
Incremental encode, keep track of digits and emit a byte when a pair
of hex digits is found. The space is optional unless the error
handling is defined to be 'strict'.
"""
state = self.state
encoded = []
for c in data.upper():
if c in HEXDIGITS:
z = HEXDIGITS.index(c)
if state:
encoded.append(z + (state & 0xf0))
state = 0
else:
state = 0x100 + (z << 4)
elif c == ' ': # allow spaces to separate values
if state and self.errors == 'strict':
raise UnicodeError('odd number of hex digits')
state = 0
else:
if self.errors == 'strict':
raise UnicodeError('non-hex digit found: {!r}'.format(c))
self.state = state
return serial.to_bytes(encoded)
def handle_socket_read(self):
"""Read from socket"""
try:
# read a chunk from the serial port
data = self.socket.recv(1024)
if data:
# Process RFC 2217 stuff when enabled
if self.rfc2217:
data = serial.to_bytes(self.rfc2217.filter(data))
# add data to buffer
self.buffer_net2ser += data
else:
# empty read indicates disconnection
self.handle_disconnect()
except socket.error:
self.handle_socket_error()
def encode(self, data, final=False):
"""\
Incremental encode, keep track of digits and emit a byte when a pair
of hex digits is found. The space is optional unless the error
handling is defined to be 'strict'.
"""
state = self.state
encoded = []
for c in data.upper():
if c in HEXDIGITS:
z = HEXDIGITS.index(c)
if state:
encoded.append(z + (state & 0xf0))
state = 0
else:
state = 0x100 + (z << 4)
elif c == ' ': # allow spaces to separate values
if state and self.errors == 'strict':
raise UnicodeError('odd number of hex digits')
state = 0
else:
if self.errors == 'strict':
raise UnicodeError('non-hex digit found: {!r}'.format(c))
self.state = state
return serial.to_bytes(encoded)
def encode(self, data, final=False):
"""\
Incremental encode, keep track of digits and emit a byte when a pair
of hex digits is found. The space is optional unless the error
handling is defined to be 'strict'.
"""
state = self.state
encoded = []
for c in data.upper():
if c in HEXDIGITS:
z = HEXDIGITS.index(c)
if state:
encoded.append(z + (state & 0xf0))
state = 0
else:
state = 0x100 + (z << 4)
elif c == ' ': # allow spaces to separate values
if state and self.errors == 'strict':
raise UnicodeError('odd number of hex digits')
state = 0
else:
if self.errors == 'strict':
raise UnicodeError('non-hex digit found: {!r}'.format(c))
self.state = state
return serial.to_bytes(encoded)
def reader(self):
"""loop forever and copy serial->socket"""
self.log.debug('reader thread started')
while self.alive:
try:
data = self.serial.read(1) # read one, blocking
n = self.serial.inWaiting() # look if there is more
if n:
data = data + self.serial.read(n) # and get as much as possible
if data:
# escape outgoing data when needed (Telnet IAC (0xff) character)
data = serial.to_bytes(self.rfc2217.escape(data))
self._write_lock.acquire()
try:
self.socket.sendall(data) # send it over TCP
finally:
self._write_lock.release()
except socket.error, msg:
self.log.error('%s' % (msg,))
# probably got disconnected
break
self.alive = False
self.log.debug('reader thread terminated')
def handle_socket_read(self):
"""Read from socket"""
try:
# read a chunk from the serial port
data = self.socket.recv(1024)
if data:
# Process RFC 2217 stuff when enabled
if self.rfc2217:
data = serial.to_bytes(self.rfc2217.filter(data))
# add data to buffer
self.buffer_net2ser += data
else:
# empty read indicates disconnection
self.handle_disconnect()
except socket.error:
self.handle_socket_error()
def hex_encode(input, errors='strict'):
return (serial.to_bytes([int(h, 16) for h in input.split()]), len(input))
def encode(self, input, errors='strict'):
return serial.to_bytes([int(h, 16) for h in input.split()])
def hex_encode(input, errors='strict'):
return (serial.to_bytes([int(h, 16) for h in input.split()]), len(input))
def encode(self, input, errors='strict'):
return serial.to_bytes([int(h, 16) for h in input.split()])
def hex_encode(data, errors='strict'):
"""'40 41 42' -> b'@ab'"""
return (serial.to_bytes([int(h, 16) for h in data.split()]), len(data))
def encode(self, data, errors='strict'):
"""'40 41 42' -> b'@ab'"""
return serial.to_bytes([int(h, 16) for h in data.split()])
def hex_encode(data, errors='strict'):
"""'40 41 42' -> b'@ab'"""
return (serial.to_bytes([int(h, 16) for h in data.split()]), len(data))
def encode(self, data, errors='strict'):
"""'40 41 42' -> b'@ab'"""
return serial.to_bytes([int(h, 16) for h in data.split()])
def hex_encode(input, errors='strict'):
return (serial.to_bytes([int(h, 16) for h in input.split()]), len(input))
def encode(self, input, errors='strict'):
return serial.to_bytes([int(h, 16) for h in input.split()])
def hex_encode(data, errors='strict'):
"""'40 41 42' -> b'@ab'"""
return (serial.to_bytes([int(h, 16) for h in data.split()]), len(data))
def encode(self, data, errors='strict'):
"""'40 41 42' -> b'@ab'"""
return serial.to_bytes([int(h, 16) for h in data.split()])
def hex_encode(data, errors='strict'):
"""'40 41 42' -> b'@ab'"""
return (serial.to_bytes([int(h, 16) for h in data.split()]), len(data))
def encode(self, data, errors='strict'):
"""'40 41 42' -> b'@ab'"""
return serial.to_bytes([int(h, 16) for h in data.split()])
def reader(self):
"""loop forever and copy serial->socket"""
self.log.debug('reader thread started')
while self.alive:
try:
data = self.serial.read(self.serial.in_waiting or 1)
if data:
# escape outgoing data when needed (Telnet IAC (0xff) character)
self.write(serial.to_bytes(self.rfc2217.escape(data)))
except socket.error as msg:
self.log.error('{}'.format(msg))
# probably got disconnected
break
self.alive = False
self.log.debug('reader thread terminated')
def writer(self):
"""loop forever and copy socket->serial"""
while self.alive:
try:
data = self.socket.recv(1024)
if not data:
break
self.serial.write(serial.to_bytes(self.rfc2217.filter(data)))
except socket.error as msg:
self.log.error('{}'.format(msg))
# probably got disconnected
break
self.stop()
def handle_serial_read(self):
"""Reading from serial port"""
try:
data = os.read(self.serial.fileno(), 1024)
if data:
# store data in buffer if there is a client connected
if self.socket is not None:
# escape outgoing data when needed (Telnet IAC (0xff) character)
if self.rfc2217:
data = serial.to_bytes(self.rfc2217.escape(data))
self.buffer_ser2net += data
else:
self.handle_serial_error()
except Exception as msg:
self.handle_serial_error(msg)
def test_to_bytes(self):
self.assertEqual(serial.to_bytes([1, 2, 3]), b'\x01\x02\x03')
self.assertEqual(serial.to_bytes(b'\x01\x02\x03'), b'\x01\x02\x03')
self.assertEqual(serial.to_bytes(bytearray([1,2,3])), b'\x01\x02\x03')
# unicode is not supported test. use decode() instead of u'' syntax to be
# compatible to Python 3.x < 3.4
self.assertRaises(TypeError, serial.to_bytes, b'hello'.decode('utf-8'))