def read_word_data(self, addr, cmd):
    """Read a 16-bit word from register ``cmd`` of the device at ``addr``.

    The two bytes are interpreted in the byte order of the processor running
    Python (typically little endian).

    Returns:
        int: the value stored in the ``c_uint16`` filled by the read message.
    """
    assert self._device is not None, 'Bus must be opened before operations are made against it!'
    # ctypes storage handed to the ioctl.
    register = c_uint8(cmd)
    word = c_uint16()
    messages = [
        (addr, 0, 1, pointer(register)),                              # write cmd register
        (addr, I2C_M_RD, 2, cast(pointer(word), POINTER(c_uint8))),   # read 2 bytes
    ]
    request = make_i2c_rdwr_data(messages)
    # Both messages are submitted in a single I2C_RDWR call.
    ioctl(self._device.fileno(), I2C_RDWR, request)
    return word.value
# Example source code using Python's ioctl()
def process_call(self, addr, cmd, val):
    """Perform an SMBus process call and return the word that is read back.

    One byte (``cmd``) followed by a 16-bit value (``val``) is written to the
    device at ``addr``; two bytes of response are then read and returned as
    an integer.
    """
    assert self._device is not None, 'Bus must be opened before operations are made against it!'
    # Outgoing payload: register byte + 16-bit value ('=' means no padding).
    payload = create_string_buffer(struct.pack('=BH', cmd, val))
    reply = c_uint16()
    write_msg = (addr, 0, 3, cast(pointer(payload), POINTER(c_uint8)))
    read_msg = (addr, I2C_M_RD, 2, cast(pointer(reply), POINTER(c_uint8)))
    request = make_i2c_rdwr_data([write_msg, read_msg])
    ioctl(self._device.fileno(), I2C_RDWR, request)
    # python-smbus appears not to return the response; this does.
    return reply.value
def get_iphostname():
    """Return the hostname and IPv4 address of this Linux machine.

    The address is queried on interface ``eth0``; if that query raises
    ``IOError`` the interface ``eno1`` is tried instead.

    Returns:
        dict: ``{'hostname': <hostname>, 'ip': <dotted-quad string>}``.

    Raises:
        IOError: if the ``eno1`` query fails as well.
    """
    def get_ip(ifname):
        # struct.pack('256s', ...) requires bytes on Python 3.
        if not isinstance(ifname, bytes):
            ifname = ifname.encode('ascii')
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            ifreq = fcntl.ioctl(
                sock.fileno(),
                0x8915,  # SIOCGIFADDR
                struct.pack('256s', ifname[:15]),
            )
        finally:
            # Release the socket even when the interface does not exist.
            sock.close()
        # Bytes 20..23 of the reply are converted to the dotted-quad address.
        return socket.inet_ntoa(ifreq[20:24])

    try:
        ip = get_ip('eth0')
    except IOError:
        ip = get_ip('eno1')
    hostname = socket.gethostname()
    return {'hostname': hostname, 'ip': ip}
def read_i2c_block_data(self, addr, cmd, length=32):
    """Read ``length`` bytes starting at register ``cmd`` of device ``addr``.

    Returns:
        bytearray: the ``length`` bytes (32 by default) filled in by the read.
    """
    assert self._device is not None, 'Bus must be opened before operations are made against it!'
    register = c_uint8(cmd)
    buf = create_string_buffer(length)
    messages = [
        (addr, 0, 1, pointer(register)),                        # write cmd register
        (addr, I2C_M_RD, length, cast(buf, POINTER(c_uint8))),  # read data
    ]
    request = make_i2c_rdwr_data(messages)
    ioctl(self._device.fileno(), I2C_RDWR, request)
    # .raw keeps embedded NUL bytes; .value would stop at the first one.
    return bytearray(buf.raw)
def get_iphostname():
    """Return the hostname and IPv4 address of this Linux machine.

    The address is queried on interface ``eth0``; if that query raises
    ``IOError`` the interface ``eno1`` is tried instead.

    Returns:
        dict: ``{'hostname': <hostname>, 'ip': <dotted-quad string>}``.

    Raises:
        IOError: if the ``eno1`` query fails as well.
    """
    def get_ip(ifname):
        # struct.pack('256s', ...) requires bytes on Python 3.
        if not isinstance(ifname, bytes):
            ifname = ifname.encode('ascii')
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            ifreq = fcntl.ioctl(
                sock.fileno(),
                0x8915,  # SIOCGIFADDR
                struct.pack('256s', ifname[:15]),
            )
        finally:
            # Release the socket even when the interface does not exist.
            sock.close()
        # Bytes 20..23 of the reply are converted to the dotted-quad address.
        return socket.inet_ntoa(ifreq[20:24])

    try:
        ip = get_ip('eth0')
    except IOError:
        ip = get_ip('eno1')
    hostname = socket.gethostname()
    return {'hostname': hostname, 'ip': ip}
def get_terminal_size():
    """Return the terminal size as ``(columns, lines)``.

    Sources are tried in order: a TIOCGWINSZ ioctl on file descriptors 0, 1
    and 2; the same ioctl on the controlling terminal; the ``LINES`` and
    ``COLUMNS`` environment variables; and finally the default 80x25.
    """
    def ioctl_GWINSZ(fd):
        # Returns (lines, columns), or None when fd cannot be queried.
        try:
            import fcntl
            import termios
            import struct
            cr = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ,
                                                 '1234'))
        except Exception:
            return None
        return cr

    cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
    if not cr:
        try:
            fd = os.open(os.ctermid(), os.O_RDONLY)
        except (AttributeError, OSError):
            # No ctermid on this platform, or no controlling terminal.
            pass
        else:
            try:
                cr = ioctl_GWINSZ(fd)
            finally:
                os.close(fd)
    if not cr:
        try:
            # The original read the non-existent ``os.env`` and therefore
            # always fell through to the hard-coded default.
            cr = (int(os.environ['LINES']), int(os.environ['COLUMNS']))
        except (KeyError, ValueError):
            cr = (25, 80)
    return int(cr[1]), int(cr[0])
def check_vmx_pt():
    """Check that /dev/kvm exists and answers the KVM_VMX_PT_SUPPORTED ioctl.

    A red diagnostic line is printed for each failure mode.

    Returns:
        bool: ``True`` when the ioctl succeeds with a non-zero result,
        ``False`` when /dev/kvm cannot be opened, the ioctl raises
        ``IOError``, or the ioctl returns 0.
    """
    from fcntl import ioctl
    KVMIO = 0xAE
    KVM_VMX_PT_SUPPORTED = KVMIO << (8) | 0xe4
    try:
        fd = open("/dev/kvm", "wb")
    except (IOError, OSError):
        print('\033[91m' + error_prefix + "KVM is not loaded!" + '\033[0m')
        return False
    try:
        ret = ioctl(fd, KVM_VMX_PT_SUPPORTED, 0)
    except IOError:
        print('\033[91m' + error_prefix + "VMX_PT is not loaded!" + '\033[0m')
        return False
    finally:
        # Close the device on the failure path too (it used to leak there).
        fd.close()
    if ret == 0:
        print('\033[91m' + error_prefix + "Intel PT is not supported on this CPU!" + '\033[0m')
        return False
    return True
def getipaddr(self, ifname='eth0'):
    """Return an IPv4 address for this host as a dotted-quad string.

    The host's fully qualified name is resolved first. If that fails or
    yields ``127.0.0.1``, the address of interface ``ifname`` is queried
    with the SIOCGIFADDR ioctl instead. Failures are deliberately
    swallowed (best effort); ``'127.0.0.1'`` is returned when nothing
    better is found.

    Args:
        ifname: interface name (str or bytes) used for the ioctl fallback.
    """
    import socket
    import struct
    ret = '127.0.0.1'
    try:
        ret = socket.gethostbyname(socket.getfqdn(socket.gethostname()))
    except Exception:
        pass
    if ret == '127.0.0.1':
        try:
            import fcntl
            # struct.pack('256s', ...) requires bytes on Python 3; passing a
            # str used to fail silently and disable this fallback.
            name = ifname if isinstance(ifname, bytes) else ifname.encode('ascii')
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            try:
                ifreq = fcntl.ioctl(s.fileno(), 0x8915, struct.pack('256s', name[:15]))
            finally:
                s.close()  # the socket was previously never closed
            ret = socket.inet_ntoa(ifreq[20:24])
        except Exception:
            pass
    return ret
def terminal_size():
    """Get the width and height of the terminal.

    http://code.activestate.com/recipes/440694-determine-size-of-console-window-on-windows/
    http://stackoverflow.com/questions/17993814/why-the-irrelevant-code-made-a-difference

    :return: Width (number of characters) and height (number of lines) of the terminal.
    :rtype: tuple
    """
    if not hasattr(ctypes, 'windll'):
        # No ctypes.windll: query file descriptor 0 with TIOCGWINSZ.
        try:
            winsize = fcntl.ioctl(0, termios.TIOCGWINSZ, '\0' * 8)
        except IOError:
            return DEFAULT_WIDTH, DEFAULT_HEIGHT
        rows, cols = struct.unpack('hhhh', winsize)[:2]
        return cols, rows

    # Only works on Microsoft Windows platforms.
    csbi = ctypes.create_string_buffer(22)  # To be written to by GetConsoleScreenBufferInfo.
    kernel32 = ctypes.windll.kernel32
    kernel32.GetConsoleScreenBufferInfo(kernel32.GetStdHandle(-11), csbi)
    fields = struct.unpack('hhhhHhhhhhh', csbi.raw)
    left, top, right, bottom = fields[5:-2]
    width = right - left
    height = bottom - top
    if width >= 1 and height >= 1:
        return width, height
    return DEFAULT_WIDTH, DEFAULT_HEIGHT
def read_word_data(self, addr, cmd):
    """Read a 16-bit word from register ``cmd`` of the device at ``addr``.

    The two bytes are interpreted in the byte order of the processor running
    Python (typically little endian).

    Returns:
        int: the value stored in the ``c_uint16`` filled by the read message.
    """
    assert self._device is not None, 'Bus must be opened before operations are made against it!'
    # ctypes storage handed to the ioctl.
    register = c_uint8(cmd)
    word = c_uint16()
    messages = [
        (addr, 0, 1, pointer(register)),                              # write cmd register
        (addr, I2C_M_RD, 2, cast(pointer(word), POINTER(c_uint8))),   # read 2 bytes
    ]
    request = make_i2c_rdwr_data(messages)
    # Both messages are submitted in a single I2C_RDWR call.
    ioctl(self._device.fileno(), I2C_RDWR, request)
    return word.value
def read_i2c_block_data(self, addr, cmd, length=32):
    """Read ``length`` bytes starting at register ``cmd`` of device ``addr``.

    Returns:
        bytearray: the ``length`` bytes (32 by default) filled in by the read.
    """
    assert self._device is not None, 'Bus must be opened before operations are made against it!'
    register = c_uint8(cmd)
    buf = create_string_buffer(length)
    messages = [
        (addr, 0, 1, pointer(register)),                        # write cmd register
        (addr, I2C_M_RD, length, cast(buf, POINTER(c_uint8))),  # read data
    ]
    request = make_i2c_rdwr_data(messages)
    ioctl(self._device.fileno(), I2C_RDWR, request)
    # .raw keeps embedded NUL bytes; .value would stop at the first one.
    return bytearray(buf.raw)
def process_call(self, addr, cmd, val):
    """Perform an SMBus process call and return the word that is read back.

    One byte (``cmd``) followed by a 16-bit value (``val``) is written to the
    device at ``addr``; two bytes of response are then read and returned as
    an integer.
    """
    assert self._device is not None, 'Bus must be opened before operations are made against it!'
    # Outgoing payload: register byte + 16-bit value ('=' means no padding).
    payload = create_string_buffer(struct.pack('=BH', cmd, val))
    reply = c_uint16()
    write_msg = (addr, 0, 3, cast(pointer(payload), POINTER(c_uint8)))
    read_msg = (addr, I2C_M_RD, 2, cast(pointer(reply), POINTER(c_uint8)))
    request = make_i2c_rdwr_data([write_msg, read_msg])
    ioctl(self._device.fileno(), I2C_RDWR, request)
    # python-smbus appears not to return the response; this does.
    return reply.value
def _printProgessBar(self, f, startTime):
    """Write a one-line progress bar for transfer ``f`` to the transport.

    The line holds the file name on the left and, right-aligned to the
    terminal width, the percentage done, bytes transferred, transfer rate
    and estimated time left.

    Args:
        f: object providing ``total`` (bytes done), ``size`` (bytes overall)
            and ``name``.
        startTime: ``time.time()`` value taken when the transfer started.
    """
    diff = time.time() - startTime
    total = f.total
    try:
        winSize = struct.unpack('4H',
                                fcntl.ioctl(0, tty.TIOCGWINSZ, '12345679'))
    except IOError:
        # stdin is not a terminal: assume 80 columns.
        winSize = [None, 80]
    # Guard against a zero elapsed time (coarse clocks, immediate call).
    speed = total / diff if diff else 0.0
    if speed:
        timeLeft = (f.size - total) / speed
    else:
        timeLeft = 0
    front = f.name
    # An empty file is complete by definition; the float factor also keeps
    # Python 2 integer division from flooring the ratio to 0.
    percentage = 100.0 * total / f.size if f.size else 100
    back = '%3i%% %s %sps %s ' % (percentage, self._abbrevSize(total),
                                  self._abbrevSize(speed), self._abbrevTime(timeLeft))
    spaces = (winSize[1] - (len(front) + len(back) + 1)) * ' '
    self.transport.write('\r%s%s%s' % (front, spaces, back))
def getFPVersion():
    """Return the FP version of the box, or ``None`` if it cannot be read.

    Sources, in order:
      1. ``/proc/stb/info/micomver`` (first line, stripped) when
         ``getBrandOEM()`` is ``"blackbox"``; otherwise
         ``/proc/stb/fp/version`` -- returned as raw text for the listed
         ``dm*`` box types and converted with ``long`` for all others.
      2. On ``IOError``: the result of ``ioctl(fd, 0)`` on ``/dev/dbox/fp0``.
      3. On ``IOError`` again: the contents of
         ``/sys/firmware/devicetree/base/bolt/tag`` with trailing NULs removed.
    """
    ret = None
    try:
        # ``with`` closes each handle even if reading or parsing raises.
        if getBrandOEM() == "blackbox":
            with open("/proc/stb/info/micomver", "r") as micom:
                ret = micom.readline().strip()
        elif getBoxType() in ('dm7080','dm820','dm520','dm525','dm900'):
            with open("/proc/stb/fp/version", "r") as version:
                ret = version.read()
        else:
            with open("/proc/stb/fp/version", "r") as version:
                ret = long(version.read())
    except IOError:
        try:
            with open("/dev/dbox/fp0") as fp:
                ret = ioctl(fp.fileno(),0)
        except IOError:
            try:
                with open("/sys/firmware/devicetree/base/bolt/tag", "r") as tag:
                    ret = tag.read().rstrip("\0")
            except Exception:
                # Best effort: report and fall through with ret = None.
                print("getFPVersion failed!")
    return ret
def get(self):
    """Return the current value of this property on its DRM object.

    DRM_IOCTL_MODE_OBJ_GETPROPERTIES is issued twice: the first call fills
    in ``count_props``, which sizes the id and value arrays passed to the
    second call.

    Raises:
        IndexError: if no returned property id equals ``self.id``.
    """
    request = DrmModeObjGetPropertiesC()
    request.obj_id = self.obj_id
    request.obj_type = self.obj_type
    fcntl.ioctl(self._drm.fd, DRM_IOCTL_MODE_OBJ_GETPROPERTIES, request)

    # Allocate result arrays and pass their addresses as plain integers.
    ids = (ctypes.c_uint32 * request.count_props)()
    request.props_ptr = ctypes.cast(ctypes.pointer(ids), ctypes.c_void_p).value
    values = (ctypes.c_uint64 * request.count_props)()
    request.prop_values_ptr = ctypes.cast(ctypes.pointer(values), ctypes.c_void_p).value
    fcntl.ioctl(self._drm.fd, DRM_IOCTL_MODE_OBJ_GETPROPERTIES, request)
    self._arg = request

    matches = [value for prop_id, value in zip(ids, values) if prop_id == self.id]
    return matches[0]
def fetch(self):
    """Reload this CRTC's state with DRM_IOCTL_MODE_GETCRTC.

    Updates ``fb``, ``x``, ``y``, ``gamma_size``, ``mode_valid``, ``mode``,
    ``width`` and ``height``; the raw ioctl structure is kept in ``_arg``.
    When the mode is not valid, ``mode`` is ``None`` and the size is 0x0.
    """
    state = DrmModeCrtcC()
    state.crtc_id = self.id
    fcntl.ioctl(self._drm.fd, DRM_IOCTL_MODE_GETCRTC, state)
    self._arg = state

    self.fb = self._drm.get_framebuffer(state.fb_id) if state.fb_id else None
    self.x = int(state.x)
    self.y = int(state.y)
    self.gamma_size = int(state.gamma_size)
    self.mode_valid = bool(state.mode_valid)

    if not self.mode_valid:
        self.mode = None
        self.width = 0
        self.height = 0
        return
    self.mode = DrmMode(state.mode)
    self.width = self.mode.hdisplay
    self.height = self.mode.vdisplay
def set(self, fb, x, y, mode, *conns):
    """Apply a configuration to this CRTC with DRM_IOCTL_MODE_SETCRTC.

    Args:
        fb: object whose ``id`` is stored as the framebuffer id.
        x, y: values stored in the request's ``x`` and ``y`` fields.
        mode: object whose ``_arg`` is stored as the request's mode.
        *conns: objects whose ``id`` values form the connector id array.

    The cached state is refreshed with ``fetch()`` afterwards.
    """
    request = DrmModeCrtcC()
    request.crtc_id = self.id
    request.fb_id = fb.id
    request.x = x
    request.y = y
    request.mode_valid = 1
    request.mode = mode._arg

    count = len(conns)
    ids = [conn.id for conn in conns]
    connector_ids = (ctypes.c_uint32 * count)(*ids)
    # The array address is passed as a plain integer.
    request.set_connectors_ptr = ctypes.cast(ctypes.pointer(connector_ids), ctypes.c_void_p).value
    request.count_connectors = count

    fcntl.ioctl(self._drm.fd, DRM_IOCTL_MODE_SETCRTC, request)
    self.fetch()
def __init__(self, drm):
    """Query version information with DRM_IOCTL_VERSION.

    Sets ``major``, ``minor`` and ``patchlevel`` as ints, and ``name``,
    ``date`` and ``desc`` as strings cut to the lengths left in the request
    after the ioctl. The raw request is kept in ``_arg``.
    """
    self._drm = drm
    request = DrmVersionC()
    text_fields = ('name', 'date', 'desc')
    # Give each text field a 256-byte buffer and advertise 255 usable bytes.
    for field in text_fields:
        setattr(request, field, create_string_buffer(256))
        setattr(request, field + '_len', 255)
    fcntl.ioctl(self._drm.fd, DRM_IOCTL_VERSION, request)
    self._arg = request

    self.major = int(request.major)
    self.minor = int(request.minor)
    self.patchlevel = int(request.patchlevel)
    for field in text_fields:
        raw = getattr(request, field)
        used = getattr(request, field + '_len')
        setattr(self, field, str(raw[:used]))
def prime_import(cls, drm, fd, format_, width, height):
    """Build an instance of ``cls`` around an existing PRIME file descriptor.

    ``fd`` is converted to a handle with DRM_IOCTL_PRIME_FD_TO_HANDLE; pitch
    and length are derived from ``format.cpp[0]``, ``width`` and ``height``.

    Raises:
        NotImplementedError: if the format has more than one plane.
    """
    request = DrmPrimeHandleC()
    request.fd = fd
    fcntl.ioctl(drm.fd, DRM_IOCTL_PRIME_FD_TO_HANDLE, request)

    buf = cls(drm, format_, width, height)
    buf._arg = request
    buf.fd = fd
    buf.id = int(request.handle)
    buf.pitch = buf.format.cpp[0] * width
    buf.len = height * buf.pitch
    # Only the first plane slot is populated.
    buf.handles[0] = buf.id
    buf.pitches[0] = buf.pitch
    buf.offsets[0] = 0
    if buf.format.planes > 1:
        raise NotImplementedError("support for format %s is not implemented yet\n" % buf.format.name)
    return buf
def __init__(self, drm, format_, width, height):
    """Create a dumb buffer with DRM_IOCTL_MODE_CREATE_DUMB.

    The request uses ``format.cpp[0] * 8`` as bits per pixel. The handle,
    size and pitch reported back are stored as ``id``, ``len`` and
    ``pitch``, and copied into the first plane slot.

    Raises:
        NotImplementedError: if the format has more than one plane.
    """
    super(DrmDumbBuffer, self).__init__(drm, format_, width, height)
    request = DrmModeCreateDumbC()
    request.bpp = self.format.cpp[0] * 8
    request.width = self.width
    request.height = self.height
    fcntl.ioctl(self._drm.fd, DRM_IOCTL_MODE_CREATE_DUMB, request)

    self.id = int(request.handle)
    self.len = int(request.size)
    self.pitch = int(request.pitch)
    self._arg = request
    self.handles[0] = self.id
    self.pitches[0] = self.pitch
    self.offsets[0] = 0
    if self.format.planes > 1:
        raise NotImplementedError("support for format %s is not implemented yet\n" % self.format.name)
def read_i2c_block_data(self, i2c_addr, register, length):
    # type: (int, int, int) -> list
    """
    Read a block of bytes starting at a given register.

    :param i2c_addr: i2c address
    :param register: Start register
    :param length: Desired block length
    :return: List of bytes
    :rtype: list
    :raises ValueError: if ``length`` exceeds ``I2C_SMBUS_BLOCK_MAX``
    """
    if length > I2C_SMBUS_BLOCK_MAX:
        raise ValueError("Desired block length over %d bytes" % I2C_SMBUS_BLOCK_MAX)
    self._set_address(i2c_addr)
    request = i2c_smbus_ioctl_data.create(
        read_write=I2C_SMBUS_READ,
        command=register,
        size=I2C_SMBUS_I2C_BLOCK_DATA,
    )
    # The requested length goes into the data union before the call.
    request.data.contents.byte = length
    ioctl(self.fd, I2C_SMBUS, request)
    # block[0] is skipped; the payload is block[1 .. length].
    block = request.data.contents.block
    return block[1:length + 1]
def write_i2c_block_data(self, i2c_addr, register, data):
    # type: (int, int, list) -> None
    """
    Write a block of bytes starting at a given register.

    :param i2c_addr: i2c address
    :param register: Start register
    :param data: List of bytes
    :raises ValueError: if ``data`` is longer than ``I2C_SMBUS_BLOCK_MAX``
    """
    count = len(data)
    if count > I2C_SMBUS_BLOCK_MAX:
        raise ValueError("Data length cannot exceed %d bytes" % I2C_SMBUS_BLOCK_MAX)
    self._set_address(i2c_addr)
    request = i2c_smbus_ioctl_data.create(
        read_write=I2C_SMBUS_WRITE,
        command=register,
        size=I2C_SMBUS_I2C_BLOCK_DATA,
    )
    # block[0] carries the length, block[1 .. count] the payload.
    block = request.data.contents.block
    block[0] = count
    block[1:count + 1] = data
    ioctl(self.fd, I2C_SMBUS, request)
def _set_rs485_mode(self, rs485_settings):
    """Apply ``rs485_settings`` to the port via TIOCGRS485 / TIOCSRS485.

    The current configuration is read, modified and written back. Passing
    ``None`` zeroes the flags word.

    Raises:
        ValueError: if either ioctl fails with ``IOError``.
    """
    # int slots: flags, delaytx, delayrx, padding
    rs485 = array.array('i', [0] * 8)
    try:
        fcntl.ioctl(self.fd, TIOCGRS485, rs485)
        if rs485_settings is None:
            rs485[0] = 0  # clear SER_RS485_ENABLED
        else:
            # NOTE(review): no branch here sets an enable bit in the flags;
            # confirm the value read back above already carries it.
            flag_choices = (
                (rs485_settings.loopback, SER_RS485_RX_DURING_TX),
                (rs485_settings.rts_level_for_tx, SER_RS485_RTS_ON_SEND),
                (rs485_settings.rts_level_for_rx, SER_RS485_RTS_AFTER_SEND),
            )
            flags = rs485[0]
            for wanted, bit in flag_choices:
                if wanted:
                    flags |= bit
                else:
                    flags &= ~bit
            rs485[0] = flags
            rs485[1] = int(rs485_settings.delay_rts_before_send * 1000)
            rs485[2] = int(rs485_settings.delay_rts_after_send * 1000)
        fcntl.ioctl(self.fd, TIOCSRS485, rs485)
    except IOError as err:
        raise ValueError('Failed to set RS485 mode: %s' % (err,))
def _get_terminal_size():
    """Return the terminal size as ``[lines, columns]``.

    Sources are tried in order: a TIOCGWINSZ ioctl on file descriptors 0, 1
    and 2; the same ioctl on the controlling terminal; and finally the
    ``LINES`` / ``COLUMNS`` environment variables, defaulting to 25 and 80.
    """
    def ioctl_GWINSZ(fd):
        # Returns (lines, columns), or None when fd cannot be queried.
        try:
            # The module is ``termios``; the mangled name ``termi_os`` made
            # this import fail every time, so the ioctl was never attempted.
            import fcntl, termios, struct
            cr = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234'))
        except Exception:
            return
        return cr

    cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
    if not cr:
        try:
            fd = _os.open(_os.ctermid(), _os.O_RDONLY)
        except (AttributeError, OSError):
            # No ctermid on this platform, or no controlling terminal.
            pass
        else:
            try:
                cr = ioctl_GWINSZ(fd)
            finally:
                # Was ``_os.cl_ose(fd)``, which raised and leaked the fd.
                _os.close(fd)
    if not cr:
        cr = (_os.environ.get('LINES', 25), _os.environ.get('COLUMNS', 80))
    return list(map(int, cr))
def make_uinput():
    """Create a virtual keyboard through /dev/uinput and return the open file.

    Key events are enabled (``EV_KEY``) for key codes 0..255, a device
    description named "Virtual Keyboard" is written, and the device is
    created with ``UI_DEV_CREATE``.

    Returns:
        The open binary file object for ``/dev/uinput``. The caller owns it;
        the device can be torn down later with ioctl ``0x5502``
        (UI_DEV_DESTROY).

    Raises:
        IOError/OSError: if the device cannot be opened or configured; the
        file is closed before the error propagates.
    """
    import fcntl, struct
    # Requires uinput driver, but it's usually available.
    uinput = open("/dev/uinput", 'wb')
    try:
        UI_SET_EVBIT = 0x40045564
        fcntl.ioctl(uinput, UI_SET_EVBIT, EV_KEY)
        UI_SET_KEYBIT = 0x40045565
        for i in range(256):
            fcntl.ioctl(uinput, UI_SET_KEYBIT, i)
        BUS_USB = 0x03
        # 80-byte name, four unsigned shorts, one int, then 4 x 64 ints.
        uinput_user_dev = "80sHHHHi64i64i64i64i"
        axis = [0] * 64 * 4
        uinput.write(struct.pack(uinput_user_dev, b"Virtual Keyboard", BUS_USB, 1, 1, 1, 0, *axis))
        uinput.flush()  # Without this you may get Errno 22: Invalid argument.
        UI_DEV_CREATE = 0x5501
        fcntl.ioctl(uinput, UI_DEV_CREATE)
    except Exception:
        # Do not leak the handle when setup fails part-way.
        uinput.close()
        raise
    return uinput
def _get_terminal_size_linux():
    """Return the terminal size as ``(columns, lines)``, or ``None``.

    Tries a TIOCGWINSZ ioctl on file descriptors 0, 1 and 2, then on the
    controlling terminal, and finally the ``LINES`` / ``COLUMNS``
    environment variables. ``None`` is returned when those are unset too.
    """
    def ioctl_GWINSZ(fd):
        # Returns (lines, columns), or None on any failure.
        try:
            import fcntl
            import termios
            packed = fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234')
            return struct.unpack('hh', packed)
        except:
            return None

    size = None
    for std_fd in (0, 1, 2):
        size = ioctl_GWINSZ(std_fd)
        if size:
            break
    if not size:
        try:
            tty_fd = os.open(os.ctermid(), os.O_RDONLY)
            size = ioctl_GWINSZ(tty_fd)
            os.close(tty_fd)
        except:
            pass
    if not size:
        try:
            size = (os.environ['LINES'], os.environ['COLUMNS'])
        except:
            return None
    columns = int(size[1])
    lines = int(size[0])
    return columns, lines
def getpending(self):
    """Return one 'key' event combining all pending input.

    The ``data`` and ``raw`` text of every event waiting in
    ``self.event_queue`` is concatenated, followed by whatever bytes are
    currently readable from ``self.input_fd``, decoded with
    ``self.encoding`` (undecodable bytes are replaced).
    """
    e = Event('key', '', '')
    while not self.event_queue.empty():
        e2 = self.event_queue.get()
        e.data += e2.data
        # Was ``e.raw += e.raw``, which dropped the queued event's raw text.
        e.raw += e2.raw
    # Use the FIONREAD result as the number of bytes to read from input_fd.
    amount = struct.unpack(
        "i", ioctl(self.input_fd, FIONREAD, "\0\0\0\0"))[0]
    data = os.read(self.input_fd, amount)
    raw = unicode(data, self.encoding, 'replace')
    #XXX: something is wrong here
    e.data += raw
    e.raw += raw
    return e
def GetHelpWidth():
    """Returns: an integer, the width of help lines that is used in TextWrap."""
    can_query = sys.stdout.isatty() and termios is not None and fcntl is not None
    if not can_query:
        return _help_width
    try:
        winsize = fcntl.ioctl(sys.stdout, termios.TIOCGWINSZ, '1234')
        width = struct.unpack('hh', winsize)[1]
        # Emacs mode returns 0; any value below 40 is treated as unreasonable
        # and replaced by $COLUMNS (or the default width).
        if width < 40:
            # The default is already an int; int(int) just returns it.
            return int(os.getenv('COLUMNS', _help_width))
        return width
    except (TypeError, IOError, struct.error):
        return _help_width
def _set_rs485_mode(self, rs485_settings):
    """Apply ``rs485_settings`` to the port via TIOCGRS485 / TIOCSRS485.

    The current configuration is read, modified and written back. Passing
    ``None`` zeroes the flags word.

    Raises:
        ValueError: if either ioctl fails with ``IOError``.
    """
    # int slots: flags, delaytx, delayrx, padding
    rs485 = array.array('i', [0] * 8)
    try:
        fcntl.ioctl(self.fd, TIOCGRS485, rs485)
        if rs485_settings is None:
            rs485[0] = 0  # clear SER_RS485_ENABLED
        else:
            # NOTE(review): no branch here sets an enable bit in the flags;
            # confirm the value read back above already carries it.
            flag_choices = (
                (rs485_settings.loopback, SER_RS485_RX_DURING_TX),
                (rs485_settings.rts_level_for_tx, SER_RS485_RTS_ON_SEND),
                (rs485_settings.rts_level_for_rx, SER_RS485_RTS_AFTER_SEND),
            )
            flags = rs485[0]
            for wanted, bit in flag_choices:
                if wanted:
                    flags |= bit
                else:
                    flags &= ~bit
            rs485[0] = flags
            rs485[1] = int(rs485_settings.delay_rts_before_send * 1000)
            rs485[2] = int(rs485_settings.delay_rts_after_send * 1000)
        fcntl.ioctl(self.fd, TIOCSRS485, rs485)
    except IOError as err:
        raise ValueError('Failed to set RS485 mode: %s' % (err,))
def get_terminal_size(fallback=(80, 24)):
    """
    Return tuple containing columns and rows of controlling terminal, trying harder
    than shutil.get_terminal_size to find a tty before returning fallback.

    Theoretically, stdout, stderr, and stdin could all be different ttys that could
    cause us to get the wrong measurements (instead of using the fallback) but the much more
    common case is that IO is piped.

    Args:
        fallback: ``(columns, lines)`` returned when no stream is a terminal.

    Returns:
        tuple: ``(columns, lines)``.
    """
    for stream in [sys.__stdout__, sys.__stderr__, sys.__stdin__]:
        try:
            # Make WINSIZE call to terminal
            data = fcntl.ioctl(stream.fileno(), TIOCGWINSZ, b"\x00\x00\00\x00")
        except (IOError, OSError, AttributeError, ValueError):
            # Not a tty -- or the stream is None (AttributeError) or
            # closed (ValueError); try the next one.
            continue
        # Unpack two shorts from ioctl call
        lines, columns = struct.unpack("hh", data)
        return columns, lines
    columns, lines = fallback
    return columns, lines