def do_execute(self, stdin_file, stdout_file, stderr_file, cgroup_file):
pid = fork()
if not pid:
chdir('/in/package')
if stdin_file:
fd = os_open(stdin_file, O_RDONLY)
dup2(fd, STDIN_FILENO)
os_close(fd)
if stdout_file:
fd = os_open(stdout_file, O_WRONLY)
dup2(fd, STDOUT_FILENO)
os_close(fd)
if stderr_file:
fd = os_open(stderr_file, O_WRONLY)
dup2(fd, STDERR_FILENO)
os_close(fd)
if cgroup_file:
enter_cgroup(cgroup_file)
execve(self.execute_file, self.execute_args, SPAWN_ENV)
return wait_and_reap_zombies(pid)
python类STDOUT_FILENO的实例源码
def test__copy_eof_on_all(self):
"""Test the empty read EOF case on both master_fd and stdin."""
read_from_stdout_fd, mock_stdout_fd = self._pipe()
pty.STDOUT_FILENO = mock_stdout_fd
mock_stdin_fd, write_to_stdin_fd = self._pipe()
pty.STDIN_FILENO = mock_stdin_fd
socketpair = socket.socketpair()
masters = [s.fileno() for s in socketpair]
self.fds.extend(masters)
os.close(masters[1])
socketpair[1].close()
os.close(write_to_stdin_fd)
# Expect two select calls, the last one will cause IndexError
pty.select = self._mock_select
self.select_rfds_lengths.append(2)
self.select_rfds_results.append([mock_stdin_fd, masters[0]])
# We expect that both fds were removed from the fds list as they
# both encountered an EOF before the second select call.
self.select_rfds_lengths.append(0)
with self.assertRaises(IndexError):
pty._copy(masters[0])
def test__copy_eof_on_all(self):
"""Test the empty read EOF case on both master_fd and stdin."""
read_from_stdout_fd, mock_stdout_fd = self._pipe()
pty.STDOUT_FILENO = mock_stdout_fd
mock_stdin_fd, write_to_stdin_fd = self._pipe()
pty.STDIN_FILENO = mock_stdin_fd
socketpair = socket.socketpair()
masters = [s.fileno() for s in socketpair]
self.fds.extend(masters)
os.close(masters[1])
socketpair[1].close()
os.close(write_to_stdin_fd)
# Expect two select calls, the last one will cause IndexError
pty.select = self._mock_select
self.select_rfds_lengths.append(2)
self.select_rfds_results.append([mock_stdin_fd, masters[0]])
# We expect that both fds were removed from the fds list as they
# both encountered an EOF before the second select call.
self.select_rfds_lengths.append(0)
with self.assertRaises(IndexError):
pty._copy(masters[0])
def test__copy_eof_on_all(self):
"""Test the empty read EOF case on both master_fd and stdin."""
read_from_stdout_fd, mock_stdout_fd = self._pipe()
pty.STDOUT_FILENO = mock_stdout_fd
mock_stdin_fd, write_to_stdin_fd = self._pipe()
pty.STDIN_FILENO = mock_stdin_fd
socketpair = socket.socketpair()
masters = [s.fileno() for s in socketpair]
self.fds.extend(masters)
os.close(masters[1])
socketpair[1].close()
os.close(write_to_stdin_fd)
# Expect two select calls, the last one will cause IndexError
pty.select = self._mock_select
self.select_rfds_lengths.append(2)
self.select_rfds_results.append([mock_stdin_fd, masters[0]])
# We expect that both fds were removed from the fds list as they
# both encountered an EOF before the second select call.
self.select_rfds_lengths.append(0)
with self.assertRaises(IndexError):
pty._copy(masters[0])
def test__copy_eof_on_all(self):
"""Test the empty read EOF case on both master_fd and stdin."""
read_from_stdout_fd, mock_stdout_fd = self._pipe()
pty.STDOUT_FILENO = mock_stdout_fd
mock_stdin_fd, write_to_stdin_fd = self._pipe()
pty.STDIN_FILENO = mock_stdin_fd
socketpair = self._socketpair()
masters = [s.fileno() for s in socketpair]
os.close(masters[1])
socketpair[1].close()
os.close(write_to_stdin_fd)
# Expect two select calls, the last one will cause IndexError
pty.select = self._mock_select
self.select_rfds_lengths.append(2)
self.select_rfds_results.append([mock_stdin_fd, masters[0]])
# We expect that both fds were removed from the fds list as they
# both encountered an EOF before the second select call.
self.select_rfds_lengths.append(0)
with self.assertRaises(IndexError):
pty._copy(masters[0])
def test__copy_eof_on_all(self):
"""Test the empty read EOF case on both master_fd and stdin."""
read_from_stdout_fd, mock_stdout_fd = self._pipe()
pty.STDOUT_FILENO = mock_stdout_fd
mock_stdin_fd, write_to_stdin_fd = self._pipe()
pty.STDIN_FILENO = mock_stdin_fd
socketpair = socket.socketpair()
masters = [s.fileno() for s in socketpair]
self.fds.extend(masters)
os.close(masters[1])
socketpair[1].close()
os.close(write_to_stdin_fd)
# Expect two select calls, the last one will cause IndexError
pty.select = self._mock_select
self.select_rfds_lengths.append(2)
self.select_rfds_results.append([mock_stdin_fd, masters[0]])
# We expect that both fds were removed from the fds list as they
# both encountered an EOF before the second select call.
self.select_rfds_lengths.append(0)
with self.assertRaises(IndexError):
pty._copy(masters[0])
def test__copy_eof_on_all(self):
"""Test the empty read EOF case on both master_fd and stdin."""
read_from_stdout_fd, mock_stdout_fd = self._pipe()
pty.STDOUT_FILENO = mock_stdout_fd
mock_stdin_fd, write_to_stdin_fd = self._pipe()
pty.STDIN_FILENO = mock_stdin_fd
socketpair = self._socketpair()
masters = [s.fileno() for s in socketpair]
os.close(masters[1])
socketpair[1].close()
os.close(write_to_stdin_fd)
# Expect two select calls, the last one will cause IndexError
pty.select = self._mock_select
self.select_rfds_lengths.append(2)
self.select_rfds_results.append([mock_stdin_fd, masters[0]])
# We expect that both fds were removed from the fds list as they
# both encountered an EOF before the second select call.
self.select_rfds_lengths.append(0)
with self.assertRaises(IndexError):
pty._copy(masters[0])
def test__copy_eof_on_all(self):
"""Test the empty read EOF case on both master_fd and stdin."""
read_from_stdout_fd, mock_stdout_fd = self._pipe()
pty.STDOUT_FILENO = mock_stdout_fd
mock_stdin_fd, write_to_stdin_fd = self._pipe()
pty.STDIN_FILENO = mock_stdin_fd
socketpair = socket.socketpair()
masters = [s.fileno() for s in socketpair]
self.fds.extend(masters)
os.close(masters[1])
socketpair[1].close()
os.close(write_to_stdin_fd)
# Expect two select calls, the last one will cause IndexError
pty.select = self._mock_select
self.select_rfds_lengths.append(2)
self.select_rfds_results.append([mock_stdin_fd, masters[0]])
# We expect that both fds were removed from the fds list as they
# both encountered an EOF before the second select call.
self.select_rfds_lengths.append(0)
with self.assertRaises(IndexError):
pty._copy(masters[0])
def test__copy_eof_on_all(self):
"""Test the empty read EOF case on both master_fd and stdin."""
read_from_stdout_fd, mock_stdout_fd = self._pipe()
pty.STDOUT_FILENO = mock_stdout_fd
mock_stdin_fd, write_to_stdin_fd = self._pipe()
pty.STDIN_FILENO = mock_stdin_fd
socketpair = self._socketpair()
masters = [s.fileno() for s in socketpair]
os.close(masters[1])
socketpair[1].close()
os.close(write_to_stdin_fd)
# Expect two select calls, the last one will cause IndexError
pty.select = self._mock_select
self.select_rfds_lengths.append(2)
self.select_rfds_results.append([mock_stdin_fd, masters[0]])
# We expect that both fds were removed from the fds list as they
# both encountered an EOF before the second select call.
self.select_rfds_lengths.append(0)
with self.assertRaises(IndexError):
pty._copy(masters[0])
def __interact_copy(self, escape_character=None, input_filter=None, output_filter=None):
"""This is used by the interact() method.
"""
while self.isalive():
r, w, e = self.__select([self.child_fd, self.STDIN_FILENO], [], [])
if self.child_fd in r:
data = self.__interact_read(self.child_fd)
if output_filter:
data = output_filter(data)
if self.logfile is not None:
self.logfile.write(data)
self.logfile.flush()
os.write(self.STDOUT_FILENO, data)
if self.STDIN_FILENO in r:
data = self.__interact_read(self.STDIN_FILENO)
if input_filter:
data = input_filter(data)
i = data.rfind(escape_character)
if i != -1:
data = data[:i]
self.__interact_writen(self.child_fd, data)
break
self.__interact_writen(self.child_fd, data)
def _signal_winch(self, signum, frame):
if self.set_pty_size is not None:
buf = array.array('h', [0, 0, 0, 0])
fcntl.ioctl(pty.STDOUT_FILENO, termios.TIOCGWINSZ, buf, True)
self.set_pty_size(buf[0], buf[1], buf[2], buf[3])
def set_pty_size(self, p1, p2, p3, p4):
buf = array.array('h', [p1, p2, p3, p4])
#fcntl.ioctl(pty.STDOUT_FILENO, termios.TIOCSWINSZ, buf)
fcntl.ioctl(self.master, termios.TIOCSWINSZ, buf)
def __interact_copy(self, escape_character=None,
input_filter=None, output_filter=None):
'''This is used by the interact() method.
'''
while self.isalive():
r, w, e = select_ignore_interrupts([self.child_fd, self.STDIN_FILENO], [], [])
if self.child_fd in r:
try:
data = self.__interact_read(self.child_fd)
except OSError as err:
if err.args[0] == errno.EIO:
# Linux-style EOF
break
raise
if data == b'':
# BSD-style EOF
break
if output_filter:
data = output_filter(data)
self._log(data, 'read')
os.write(self.STDOUT_FILENO, data)
if self.STDIN_FILENO in r:
data = self.__interact_read(self.STDIN_FILENO)
if input_filter:
data = input_filter(data)
i = -1
if escape_character is not None:
i = data.rfind(escape_character)
if i != -1:
data = data[:i]
if data:
self._log(data, 'send')
self.__interact_writen(self.child_fd, data)
break
self._log(data, 'send')
self.__interact_writen(self.child_fd, data)
def do_build(self, output_file, cgroup_file):
pid = fork()
if not pid:
chdir('/out')
os_close(STDIN_FILENO)
if output_file:
fd = os_open(output_file, O_WRONLY)
dup2(fd, STDOUT_FILENO)
dup2(fd, STDERR_FILENO)
os_close(fd)
if cgroup_file:
enter_cgroup(cgroup_file)
execve(self.compiler_file, self.compiler_args, SPAWN_ENV)
return wait_and_reap_zombies(pid)
def _signal_winch(self, signum, frame):
if self.set_pty_size is not None:
buf = array.array('h', [0, 0, 0, 0])
fcntl.ioctl(pty.STDOUT_FILENO, termios.TIOCGWINSZ, buf, True)
self.set_pty_size(buf[0], buf[1], buf[2], buf[3])
def set_pty_size(self, p1, p2, p3, p4):
buf = array.array('h', [p1, p2, p3, p4])
#fcntl.ioctl(pty.STDOUT_FILENO, termios.TIOCSWINSZ, buf)
fcntl.ioctl(self.master, termios.TIOCSWINSZ, buf)
def __interact_copy(self, escape_character=None,
input_filter=None, output_filter=None):
'''This is used by the interact() method.
'''
while self.isalive():
r, w, e = select_ignore_interrupts([self.child_fd, self.STDIN_FILENO], [], [])
if self.child_fd in r:
try:
data = self.__interact_read(self.child_fd)
except OSError as err:
if err.args[0] == errno.EIO:
# Linux-style EOF
break
raise
if data == b'':
# BSD-style EOF
break
if output_filter:
data = output_filter(data)
self._log(data, 'read')
os.write(self.STDOUT_FILENO, data)
if self.STDIN_FILENO in r:
data = self.__interact_read(self.STDIN_FILENO)
if input_filter:
data = input_filter(data)
i = -1
if escape_character is not None:
i = data.rfind(escape_character)
if i != -1:
data = data[:i]
if data:
self._log(data, 'send')
self.__interact_writen(self.child_fd, data)
break
self._log(data, 'send')
self.__interact_writen(self.child_fd, data)
def setUp(self):
self.orig_stdin_fileno = pty.STDIN_FILENO
self.orig_stdout_fileno = pty.STDOUT_FILENO
self.orig_pty_select = pty.select
self.fds = [] # A list of file descriptors to close.
self.select_rfds_lengths = []
self.select_rfds_results = []
def tearDown(self):
pty.STDIN_FILENO = self.orig_stdin_fileno
pty.STDOUT_FILENO = self.orig_stdout_fileno
pty.select = self.orig_pty_select
for fd in self.fds:
try:
os.close(fd)
except:
pass
def test__copy_to_each(self):
"""Test the normal data case on both master_fd and stdin."""
read_from_stdout_fd, mock_stdout_fd = self._pipe()
pty.STDOUT_FILENO = mock_stdout_fd
mock_stdin_fd, write_to_stdin_fd = self._pipe()
pty.STDIN_FILENO = mock_stdin_fd
socketpair = socket.socketpair()
masters = [s.fileno() for s in socketpair]
self.fds.extend(masters)
# Feed data. Smaller than PIPEBUF. These writes will not block.
os.write(masters[1], b'from master')
os.write(write_to_stdin_fd, b'from stdin')
# Expect two select calls, the last one will cause IndexError
pty.select = self._mock_select
self.select_rfds_lengths.append(2)
self.select_rfds_results.append([mock_stdin_fd, masters[0]])
self.select_rfds_lengths.append(2)
with self.assertRaises(IndexError):
pty._copy(masters[0])
# Test that the right data went to the right places.
rfds = select.select([read_from_stdout_fd, masters[1]], [], [], 0)[0]
self.assertEqual([read_from_stdout_fd, masters[1]], rfds)
self.assertEqual(os.read(read_from_stdout_fd, 20), b'from master')
self.assertEqual(os.read(masters[1], 20), b'from stdin')
def setUp(self):
self.orig_stdin_fileno = pty.STDIN_FILENO
self.orig_stdout_fileno = pty.STDOUT_FILENO
self.orig_pty_select = pty.select
self.fds = [] # A list of file descriptors to close.
self.select_rfds_lengths = []
self.select_rfds_results = []
def tearDown(self):
pty.STDIN_FILENO = self.orig_stdin_fileno
pty.STDOUT_FILENO = self.orig_stdout_fileno
pty.select = self.orig_pty_select
for fd in self.fds:
try:
os.close(fd)
except:
pass
def test__copy_to_each(self):
"""Test the normal data case on both master_fd and stdin."""
read_from_stdout_fd, mock_stdout_fd = self._pipe()
pty.STDOUT_FILENO = mock_stdout_fd
mock_stdin_fd, write_to_stdin_fd = self._pipe()
pty.STDIN_FILENO = mock_stdin_fd
socketpair = socket.socketpair()
masters = [s.fileno() for s in socketpair]
self.fds.extend(masters)
# Feed data. Smaller than PIPEBUF. These writes will not block.
os.write(masters[1], b'from master')
os.write(write_to_stdin_fd, b'from stdin')
# Expect two select calls, the last one will cause IndexError
pty.select = self._mock_select
self.select_rfds_lengths.append(2)
self.select_rfds_results.append([mock_stdin_fd, masters[0]])
self.select_rfds_lengths.append(2)
with self.assertRaises(IndexError):
pty._copy(masters[0])
# Test that the right data went to the right places.
rfds = select.select([read_from_stdout_fd, masters[1]], [], [], 0)[0]
self.assertEqual([read_from_stdout_fd, masters[1]], rfds)
self.assertEqual(os.read(read_from_stdout_fd, 20), b'from master')
self.assertEqual(os.read(masters[1], 20), b'from stdin')
def setUp(self):
self.orig_stdin_fileno = pty.STDIN_FILENO
self.orig_stdout_fileno = pty.STDOUT_FILENO
self.orig_pty_select = pty.select
self.fds = [] # A list of file descriptors to close.
self.select_rfds_lengths = []
self.select_rfds_results = []
def tearDown(self):
pty.STDIN_FILENO = self.orig_stdin_fileno
pty.STDOUT_FILENO = self.orig_stdout_fileno
pty.select = self.orig_pty_select
for fd in self.fds:
try:
os.close(fd)
except:
pass
def test__copy_to_each(self):
"""Test the normal data case on both master_fd and stdin."""
read_from_stdout_fd, mock_stdout_fd = self._pipe()
pty.STDOUT_FILENO = mock_stdout_fd
mock_stdin_fd, write_to_stdin_fd = self._pipe()
pty.STDIN_FILENO = mock_stdin_fd
socketpair = socket.socketpair()
masters = [s.fileno() for s in socketpair]
self.fds.extend(masters)
# Feed data. Smaller than PIPEBUF. These writes will not block.
os.write(masters[1], b'from master')
os.write(write_to_stdin_fd, b'from stdin')
# Expect two select calls, the last one will cause IndexError
pty.select = self._mock_select
self.select_rfds_lengths.append(2)
self.select_rfds_results.append([mock_stdin_fd, masters[0]])
self.select_rfds_lengths.append(2)
with self.assertRaises(IndexError):
pty._copy(masters[0])
# Test that the right data went to the right places.
rfds = select.select([read_from_stdout_fd, masters[1]], [], [], 0)[0]
self.assertEqual([read_from_stdout_fd, masters[1]], rfds)
self.assertEqual(os.read(read_from_stdout_fd, 20), b'from master')
self.assertEqual(os.read(masters[1], 20), b'from stdin')
def setUp(self):
self.orig_stdin_fileno = pty.STDIN_FILENO
self.orig_stdout_fileno = pty.STDOUT_FILENO
self.orig_pty_select = pty.select
self.fds = [] # A list of file descriptors to close.
self.files = []
self.select_rfds_lengths = []
self.select_rfds_results = []
def tearDown(self):
pty.STDIN_FILENO = self.orig_stdin_fileno
pty.STDOUT_FILENO = self.orig_stdout_fileno
pty.select = self.orig_pty_select
for file in self.files:
try:
file.close()
except OSError:
pass
for fd in self.fds:
try:
os.close(fd)
except OSError:
pass
def test__copy_to_each(self):
"""Test the normal data case on both master_fd and stdin."""
read_from_stdout_fd, mock_stdout_fd = self._pipe()
pty.STDOUT_FILENO = mock_stdout_fd
mock_stdin_fd, write_to_stdin_fd = self._pipe()
pty.STDIN_FILENO = mock_stdin_fd
socketpair = self._socketpair()
masters = [s.fileno() for s in socketpair]
# Feed data. Smaller than PIPEBUF. These writes will not block.
os.write(masters[1], b'from master')
os.write(write_to_stdin_fd, b'from stdin')
# Expect two select calls, the last one will cause IndexError
pty.select = self._mock_select
self.select_rfds_lengths.append(2)
self.select_rfds_results.append([mock_stdin_fd, masters[0]])
self.select_rfds_lengths.append(2)
with self.assertRaises(IndexError):
pty._copy(masters[0])
# Test that the right data went to the right places.
rfds = select.select([read_from_stdout_fd, masters[1]], [], [], 0)[0]
self.assertEqual([read_from_stdout_fd, masters[1]], rfds)
self.assertEqual(os.read(read_from_stdout_fd, 20), b'from master')
self.assertEqual(os.read(masters[1], 20), b'from stdin')
def _write_stdout(self, data):
assert isinstance(data, bytes), '`data` must be a bytes'
os.write(pty.STDOUT_FILENO, data)