def test_add_signal_handler_install_error(self, m_signal):
m_signal.NSIG = signal.NSIG
def set_wakeup_fd(fd):
if fd == -1:
raise ValueError()
m_signal.set_wakeup_fd = set_wakeup_fd
class Err(OSError):
errno = errno.EFAULT
m_signal.signal.side_effect = Err
self.assertRaises(
Err,
self.loop.add_signal_handler,
signal.SIGINT, lambda: True)
python类EFAULT的实例源码
def test_bind_by_addr(self, llc, raw, ldl, dlc):
llc.bind(raw, 16)
assert llc.getsockname(raw) == 16
for i, sock in enumerate([ldl, dlc]):
with pytest.raises(nfc.llcp.Error) as excinfo:
llc.bind(sock, 16)
assert excinfo.value.errno == errno.EACCES
llc.bind(ldl, 63)
assert llc.getsockname(ldl) == 63
with pytest.raises(nfc.llcp.Error) as excinfo:
llc.bind(dlc, 63)
assert excinfo.value.errno == errno.EADDRINUSE
with pytest.raises(nfc.llcp.Error) as excinfo:
llc.bind(dlc, 64)
assert excinfo.value.errno == errno.EFAULT
with pytest.raises(nfc.llcp.Error) as excinfo:
llc.bind(dlc, -1)
assert excinfo.value.errno == errno.EFAULT
llc.bind(dlc, 62)
assert llc.getsockname(dlc) == 62
def test_bind_by_name(self, llc, raw, ldl, dlc):
with pytest.raises(nfc.llcp.Error) as excinfo:
llc.bind(dlc, 'urn:nfc:snep')
assert excinfo.value.errno == errno.EFAULT
llc.bind(dlc, 'urn:nfc:sn:snep')
assert llc.getsockname(dlc) == 4
with pytest.raises(nfc.llcp.Error) as excinfo:
llc.bind(ldl, 'urn:nfc:sn:snep')
assert excinfo.value.errno == errno.EADDRINUSE
llc.bind(ldl, 'urn:nfc:xsn:nfcpy.org:service')
assert llc.getsockname(ldl) == 16
for sap in range(17, 32):
sock = llc.socket(nfc.llcp.llc.RAW_ACCESS_POINT)
llc.bind(sock, 'urn:nfc:sn:use_sap-{}'.format(sap))
assert llc.getsockname(sock) == sap
with pytest.raises(nfc.llcp.Error) as excinfo:
llc.bind(raw, 'urn:nfc:sn:sap-32')
assert excinfo.value.errno == errno.EADDRNOTAVAIL
def _bind_by_name(self, socket, name):
if not service_name_format.match(name):
raise err.Error(errno.EFAULT)
with self.lock:
if self.snl.get(name) is not None:
raise err.Error(errno.EADDRINUSE)
addr = wks_map.get(name)
if addr is None:
try:
addr = 16 + self.sap[16:32].index(None)
except ValueError:
raise err.Error(errno.EADDRNOTAVAIL)
socket.bind(addr)
self.sap[addr] = ServiceAccessPoint(addr, self)
self.sap[addr].insert_socket(socket)
self.snl[name] = addr
def test_add_signal_handler_install_error(self, m_signal):
m_signal.NSIG = signal.NSIG
def set_wakeup_fd(fd):
if fd == -1:
raise ValueError()
m_signal.set_wakeup_fd = set_wakeup_fd
class Err(OSError):
errno = errno.EFAULT
m_signal.signal.side_effect = Err
self.assertRaises(
Err,
self.loop.add_signal_handler,
signal.SIGINT, lambda: True)
def test_add_signal_handler_install_error(self, m_signal):
m_signal.NSIG = signal.NSIG
def set_wakeup_fd(fd):
if fd == -1:
raise ValueError()
m_signal.set_wakeup_fd = set_wakeup_fd
class Err(OSError):
errno = errno.EFAULT
m_signal.signal.side_effect = Err
self.assertRaises(
Err,
self.loop.add_signal_handler,
signal.SIGINT, lambda: True)
def sys_read(self, fd, buf, count):
data = ''
if count != 0:
# TODO check count bytes from buf
if not buf in self.current.memory: # or not self.current.memory.isValid(buf+count):
logger.info("READ: buf points to invalid address. Returning EFAULT")
return -errno.EFAULT
try:
# Read the data and put in tin memory
data = self._get_fd(fd).read(count)
except BadFd:
logger.info(("READ: Not valid file descriptor on read."
" Returning EBADF"))
return -errno.EBADF
self.syscall_trace.append(("_read", fd, data))
self.current.write_bytes(buf, data)
return len(data)
def sys_readlink(self, path, buf, bufsize):
'''
Read
:rtype: int
:param path: the "link path id"
:param buf: the buffer where the bytes will be putted.
:param bufsize: the max size for read the link.
:todo: Out eax number of bytes actually sent | EAGAIN | EBADF | EFAULT | EINTR | errno.EINVAL | EIO | ENOSPC | EPIPE
'''
if bufsize <= 0:
return -errno.EINVAL
filename = self.current.read_string(path)
if filename == '/proc/self/exe':
data = os.path.abspath(self.program)
else:
data = os.readlink(filename)[:bufsize]
self.current.write_bytes(buf, data)
return len(data)
def bind(self, socket, addr_or_name=None):
if not isinstance(socket, tco.TransmissionControlObject):
raise err.Error(errno.ENOTSOCK)
if socket.addr is not None:
raise err.Error(errno.EINVAL)
if addr_or_name is None:
self._bind_by_none(socket)
elif isinstance(addr_or_name, int):
self._bind_by_addr(socket, addr_or_name)
elif isinstance(addr_or_name, bytes):
self._bind_by_name(socket, addr_or_name)
else:
raise err.Error(errno.EFAULT)
def _bind_by_addr(self, socket, addr):
if addr < 0 or addr > 63:
raise err.Error(errno.EFAULT)
with self.lock:
if addr in range(32, 64) or isinstance(socket, tco.RawAccessPoint):
if self.sap[addr] is None:
socket.bind(addr)
self.sap[addr] = ServiceAccessPoint(addr, self)
self.sap[addr].insert_socket(socket)
else:
raise err.Error(errno.EADDRINUSE)
else:
raise err.Error(errno.EACCES)
def sys_getcwd(self, buf, size):
'''
getcwd - Get the current working directory
:param int buf: Pointer to dest array
:param size: size in bytes of the array pointed to by the buf
:return: buf (Success), or 0
'''
try:
current_dir = os.getcwd()
length = len(current_dir) + 1
if size > 0 and size < length:
logger.info("GETCWD: size is greater than 0, but is smaller than the length"
"of the path + 1. Returning ERANGE")
return -errno.ERANGE
if not self.current.memory.access_ok(slice(buf, buf+length), 'w'):
logger.info("GETCWD: buf within invalid memory. Returning EFAULT")
return -errno.EFAULT
self.current.write_string(buf, current_dir)
logger.debug("getcwd(0x%08x, %u) -> <%s> (Size %d)", buf, size, current_dir, length)
return length
except OSError as e:
return -e.errno
def test_SocketListener_accept_errors():
class FakeSocket(tsocket.SocketType):
def __init__(self, events):
self._events = iter(events)
type = tsocket.SOCK_STREAM
# Fool the check for SO_ACCEPTCONN in SocketListener.__init__
def getsockopt(self, level, opt):
return True
def setsockopt(self, level, opt, value):
pass
# Fool the check for connection in SocketStream.__init__
def getpeername(self):
pass
async def accept(self):
await _core.checkpoint()
event = next(self._events)
if isinstance(event, BaseException):
raise event
else:
return event, None
fake_server_sock = FakeSocket([])
fake_listen_sock = FakeSocket(
[
OSError(errno.ECONNABORTED, "Connection aborted"),
OSError(errno.EPERM, "Permission denied"),
OSError(errno.EPROTO, "Bad protocol"),
fake_server_sock,
OSError(errno.EMFILE, "Out of file descriptors"),
OSError(errno.EFAULT, "attempt to write to read-only memory"),
OSError(errno.ENOBUFS, "out of buffers"),
fake_server_sock,
]
)
l = SocketListener(fake_listen_sock)
with assert_checkpoints():
s = await l.accept()
assert s.socket is fake_server_sock
for code in [errno.EMFILE, errno.EFAULT, errno.ENOBUFS]:
with assert_checkpoints():
with pytest.raises(OSError) as excinfo:
await l.accept()
assert excinfo.value.errno == code
with assert_checkpoints():
s = await l.accept()
assert s.socket is fake_server_sock
def sys_write(self, fd, buf, count):
''' write - send bytes through a file descriptor
The write system call writes up to count bytes from the buffer pointed
to by buf to the file descriptor fd. If count is zero, write returns 0
and optionally sets *tx_bytes to zero.
:param fd a valid file descriptor
:param buf a memory buffer
:param count number of bytes to send
:return: 0 Success
EBADF fd is not a valid file descriptor or is not open.
EFAULT buf or tx_bytes points to an invalid address.
'''
data = []
cpu = self.current
if count != 0:
try:
write_fd = self._get_fd(fd)
except BadFd:
logger.error("WRITE: Not valid file descriptor. Returning EBADFD %d", fd)
return -errno.EBADF
# TODO check count bytes from buf
if buf not in cpu.memory or buf + count not in cpu.memory:
logger.debug("WRITE: buf points to invalid address. Returning EFAULT")
return -errno.EFAULT
if fd > 2 and write_fd.is_full():
cpu.PC -= cpu.instruction.size
self.wait([], [fd], None)
raise RestartSyscall()
data = cpu.read_bytes(buf, count)
data = self._transform_write_data(data)
write_fd.write(data)
for line in ''.join([str(x) for x in data]).split('\n'):
logger.debug("WRITE(%d, 0x%08x, %d) -> <%.48r>",
fd,
buf,
count,
line)
self.syscall_trace.append(("_write", fd, data))
self.signal_transmit(fd)
return len(data)