def _stop_tunnel(cmd):
    """Run the command line ``cmd`` through ``pexpect.run``.

    Whatever ``pexpect.run`` returns is discarded; this function returns
    None.
    """
    pexpect.run(cmd)
python类run()的实例源码
def _stop_tunnel(cmd):
    """Execute ``cmd`` with ``pexpect.run`` and ignore its result.

    Nothing is returned to the caller.
    """
    pexpect.run(cmd)
def get_process_info():
    """Run ``ps ax -O ppid`` through ``pexpect.run``.

    The captured output is bound to a local name but not processed any
    further in the visible code; the function returns None.
    """
    ps_output = pexpect.run('ps ax -O ppid')
def runu(command, timeout=-1, withexitstatus=False, events=None,
         extra_args=None, logfile=None, cwd=None, env=None, **kwargs):
    """Unicode flavour of :func:`run` with an identical interface.

    As with :class:`spawnu`, ``encoding`` and ``errors`` may be supplied as
    extra keyword arguments; they apply to both input and output.

    Every argument is forwarded unchanged to ``_run``, with ``spawnu``
    selected as the spawn class.
    """
    return _run(
        command,
        timeout=timeout,
        withexitstatus=withexitstatus,
        events=events,
        extra_args=extra_args,
        logfile=logfile,
        cwd=cwd,
        env=env,
        _spawn=spawnu,
        **kwargs
    )
def readlines(self, sizehint=-1):
    '''Collect lines from readline() until it returns an empty (falsy)
    value, and return them as a list. The optional 'sizehint' argument is
    accepted but ignored.

    Because reading continues until EOF, the child process should have
    closed its stdout. Calling this on a child that is still running with
    its stdout open blocks until the read times out.'''
    collected = []
    current = self.readline()
    while current:
        collected.append(current)
        current = self.readline()
    return collected
def get_process_info():
    """Run ``ps ax -O ppid`` through ``pexpect.run``.

    The captured output is bound to a local name but not processed any
    further in the visible code; the function returns None.
    """
    # This seems to work on both Linux and BSD, but should otherwise be
    # considered highly UNportable.
    ps_output = pexpect.run('ps ax -O ppid')
def test_expect_eof(self):
    """Check that expecting EOF on a spawned ``ls`` captures the same text
    as running the same command through ``subprocess``."""

    def _normalise(raw):
        # Collapse every line-ending variant to a single '\n' so that the
        # two captures can be compared byte for byte.
        unified = raw.replace(b'\r\n', b'\n').replace(b'\r', b'\n')
        return unified.replace(b'\n\n', b'\n').rstrip()

    listing = subprocess.Popen(args=['/bin/ls', '-l', '/bin'],
                               stdout=subprocess.PIPE)
    the_old_way = listing.communicate()[0].rstrip()

    child = pexpect.spawn('/bin/ls -l /bin')
    # This basically tells it to read everything. Same as pexpect.run() function.
    child.expect(pexpect.EOF)

    the_new_way = _normalise(child.before)
    the_old_way = _normalise(the_old_way)
    assert the_old_way == the_new_way, hex_diff(the_old_way, the_new_way)
def test_env(self):
    """Check that the ``env`` argument of ``pexpect.run`` changes what the
    ``env`` command prints."""
    custom_env = {'foo': 'pexpect'}
    default = pexpect.run('env')
    userenv = pexpect.run('env', env=custom_env)

    assert default != userenv, "'default' and 'userenv' should be different"
    found_both = b'foo' in userenv and b'pexpect' in userenv
    assert found_both, "'foo' and 'pexpect' should be in 'userenv'"
def test_cwd(self):
    """Check that the ``cwd`` argument of ``pexpect.run`` changes what
    ``pwd`` prints."""
    # This assumes 'pwd' and '/tmp' exist on this platform.
    default = pexpect.run('pwd')
    tmpdir = pexpect.run('pwd', cwd='/tmp')

    assert default != tmpdir, "'default' and 'tmpdir' should be different"
    assert b'tmp' in tmpdir, "'tmp' should be returned by 'pwd' command"
def runu(command, timeout=-1, withexitstatus=False, events=None,
         extra_args=None, logfile=None, cwd=None, env=None, **kwargs):
    """Unicode flavour of :func:`run` with an identical interface.

    As with :class:`spawnu`, ``encoding`` and ``errors`` may be supplied as
    extra keyword arguments; they apply to both input and output.

    Every argument is forwarded unchanged to ``_run``, with ``spawnu``
    selected as the spawn class.
    """
    return _run(
        command,
        timeout=timeout,
        withexitstatus=withexitstatus,
        events=events,
        extra_args=extra_args,
        logfile=logfile,
        cwd=cwd,
        env=env,
        _spawn=spawnu,
        **kwargs
    )
def readlines(self, sizehint=-1):
    '''Collect lines from readline() until it returns an empty (falsy)
    value, and return them as a list. The optional 'sizehint' argument is
    accepted but ignored.

    Because reading continues until EOF, the child process should have
    closed its stdout. Calling this on a child that is still running with
    its stdout open blocks until the read times out.'''
    collected = []
    current = self.readline()
    while current:
        collected.append(current)
        current = self.readline()
    return collected
def execute_command():
    """Handle a request asking the server to run an ``idaw``/``idaw64`` command.

    Reads ``command`` (required) and ``timeout`` (optional, integer) from
    ``request.form``, runs the command with ``pexpect.run`` and afterwards
    removes the files IDA created for the file named in the command.

    Returns:
        A ``(response, status)`` tuple:

        * 422 -- ``command`` is missing, does not start with ``idaw `` or
          ``idaw64 ``, or ``timeout`` is not an integer.
        * 408 -- ``pexpect.TIMEOUT`` was raised while running the command.
        * 500 -- the command finished with an exit code other than 0.
        * 200 -- the command finished with exit code 0.
    """
    logger.info('Got incoming request')

    if 'command' not in request.form:
        return jsonify(error="Missing parameter 'command'"), 422

    command = request.form['command']
    file_name = _extract_filename_from_command(command)
    if file_name is not None and not os.path.isfile(file_name):
        # A missing input file is only reported; the command is still run.
        logger.warning("Couldn't find file %s", file_name)

    # NOTE(review): the command line comes straight from the request and is
    # only checked for its prefix -- confirm this endpoint is not reachable
    # by untrusted clients.
    if not command.startswith(('idaw ', 'idaw64 ')):
        return jsonify(error="'idaw' and 'idaw64' are the only valid commands"), 422

    # Validate the timeout before anything is started, so a malformed value
    # is reported as a client error instead of an unhandled ValueError.
    timeout = None
    if 'timeout' in request.form:
        try:
            timeout = int(request.form['timeout'])
        except ValueError:
            return jsonify(error="Parameter 'timeout' must be an integer"), 422

    try:
        logger.info('Executing %s', command)
        _, exit_code = pexpect.run(command, timeout=timeout, withexitstatus=True)
    except pexpect.TIMEOUT:
        return jsonify(error='request to ida timed out'), 408
    finally:
        # Runs on success, on timeout and on any other error.
        if file_name is not None:
            _remove_ida_created_files(file_name)
            logger.info('Removed ida leftover files')

    if exit_code == 0:
        logger.info('Command %s finished executing successfully', command)
        return jsonify(message='OK'), 200

    logger.warning("Command %s didn't finish correctly, IDA returned exit code %s", command, exit_code)
    return jsonify(error='ida finish with status code %s' % exit_code), 500
def transfer(self, method, source_path, remote_filename):
    """Create Par2 files and transfer the given file and the Par2 files
    with the wrapped backend.

    Par2 must run on the real filename or it would restore the
    temp-filename later on. So first of all create a tempdir and symlink
    the source_path with remote_filename into this.

    ``method`` is called as ``method(path, remote_name)``: once for
    ``source_path`` itself and then once for every file found in the
    temporary directory, but only when ``par2`` returned a zero exit
    status. If ``par2`` fails, only the source file is transferred.

    The symlink and the temporary directory are removed even when running
    ``par2`` or one of the transfers raises.
    """
    import pexpect

    par2temp = source_path.get_temp_in_same_dir()
    par2temp.mkdir()
    try:
        source_symlink = par2temp.append(remote_filename)
        source_target = source_path.get_canonical()
        if not os.path.isabs(source_target):
            source_target = os.path.join(os.getcwd(), source_target)

        os.symlink(source_target, source_symlink.get_canonical())
        source_symlink.setdata()

        log.Info("Create Par2 recovery files")
        par2create = 'par2 c -r%d -n1 %s %s' % (self.redundancy, self.common_options, source_symlink.get_canonical())
        try:
            _, returncode = pexpect.run(par2create, timeout=None, withexitstatus=True)
        finally:
            # Remove the symlink before listing the directory so that it is
            # not picked up as a file to transfer.
            source_symlink.delete()

        files_to_transfer = []
        if not returncode:
            files_to_transfer = [par2temp.append(name) for name in par2temp.listdir()]

        method(source_path, remote_filename)
        for par2_file in files_to_transfer:
            method(par2_file, par2_file.get_filename())
    finally:
        par2temp.deltree()
def script(self, logininfo, filepath):
    '''
    run script on managed cloud server using /usr/bin/env expect

    ``filepath`` is either a local path or an ``https://`` URL. A URL is
    downloaded once into ``~/.cache/hammercloud/`` and reused from there on
    later calls. The script is uploaded with ``sftp`` and executed through
    ``cmd``; unless ``logininfo.no_clean`` is set, the remote copy is
    removed afterwards.

    Raises ``Exception`` when ``logininfo.admin_password`` is not set, and
    whatever ``requests`` raises when the download fails or returns an
    HTTP error status.
    '''
    if not logininfo.admin_password:
        raise Exception('Unmanaged Cloud Server: no rack password')

    # The last path component is the script name; without a '/' this is
    # the whole of filepath.
    logininfo.script = filepath.rsplit('/', 1)[-1]

    if filepath.startswith('https://'):
        newpath = os.path.expanduser(
            '~/.cache/hammercloud/{login.script}'.format(login=logininfo)
        )
        if not os.path.exists(newpath):
            # Download before creating the file, so that a failed request
            # does not leave an empty cache entry for later calls to reuse.
            resp = requests.get(filepath)
            resp.raise_for_status()
            cache_dir = os.path.dirname(newpath)
            if not os.path.isdir(cache_dir):
                os.makedirs(cache_dir)
            # Write the raw bytes; print() would store the b'...' repr of
            # the content on Python 3.
            with open(newpath, 'wb') as newfile:
                newfile.write(resp.content)
        filepath = newpath

    sftp(
        logininfo, 'put', filepath, logininfo.script,
        quiet=True, executable=True
    )

    # The {login...} placeholders are left unexpanded here.
    # NOTE(review): presumably cmd() formats them -- confirm.
    command = '/home/{login.ssh_user}/{login.script} {login.extraargs}; '
    if not logininfo.no_clean:
        command += 'rm /home/{login.ssh_user}/{login.script}'
    logininfo.command = command
    cmd(logininfo)
def execute_command():
    """Handle a request asking the server to run an ``idal``/``idal64`` command.

    Reads ``command`` (required) and ``timeout`` (optional, integer) from
    ``request.form``, runs the command with ``pexpect.run`` and afterwards
    removes the files IDA created for the file named in the command.

    Returns:
        A ``(response, status)`` tuple:

        * 422 -- ``command`` is missing, does not start with ``idal `` or
          ``idal64 ``, or ``timeout`` is not an integer.
        * 408 -- ``pexpect.TIMEOUT`` was raised while running the command.
        * 500 -- the command finished with an exit code other than 0.
        * 200 -- the command finished with exit code 0.
    """
    logger.info('Got incoming request')

    if 'command' not in request.form:
        return jsonify(error="Missing parameter 'command'"), 422

    command = request.form['command']
    file_name = _extract_filename_from_command(command)
    if file_name is not None and not os.path.isfile(file_name):
        # A missing input file is only reported; the command is still run.
        logger.warning("Couldn't find file %s", file_name)

    # NOTE(review): the command line comes straight from the request and is
    # only checked for its prefix -- confirm this endpoint is not reachable
    # by untrusted clients.
    if not command.startswith(('idal ', 'idal64 ')):
        return jsonify(error="'idal' and 'idal64' are the only valid commands"), 422

    # Validate the timeout before anything is started, so a malformed value
    # is reported as a client error instead of an unhandled ValueError.
    timeout = None
    if 'timeout' in request.form:
        try:
            timeout = int(request.form['timeout'])
        except ValueError:
            return jsonify(error="Parameter 'timeout' must be an integer"), 422

    try:
        logger.info('Executing %s', command)
        _, exit_code = pexpect.run(command, timeout=timeout, withexitstatus=True)
    except pexpect.TIMEOUT:
        return jsonify(error='request to ida timed out'), 408
    finally:
        # Runs on success, on timeout and on any other error.
        if file_name is not None:
            _remove_ida_created_files(file_name)
            logger.info('Removed ida leftover files')

    if exit_code == 0:
        logger.info('Command %s finished executing successfully', command)
        return jsonify(message='OK'), 200

    logger.warning("Command %s didn't finish correctly, IDA returned exit code %s", command, exit_code)
    return jsonify(error='ida finish with status code %s' % exit_code), 500
def runu(command, timeout=-1, withexitstatus=False, events=None,
         extra_args=None, logfile=None, cwd=None, env=None, **kwargs):
    """Unicode flavour of :func:`run` with an identical interface.

    As with :class:`spawnu`, ``encoding`` and ``errors`` may be supplied as
    extra keyword arguments; they apply to both input and output.

    Every argument is forwarded unchanged to ``_run``, with ``spawnu``
    selected as the spawn class.
    """
    return _run(
        command,
        timeout=timeout,
        withexitstatus=withexitstatus,
        events=events,
        extra_args=extra_args,
        logfile=logfile,
        cwd=cwd,
        env=env,
        _spawn=spawnu,
        **kwargs
    )
def readlines(self, sizehint=-1):
    '''Collect lines from readline() until it returns an empty (falsy)
    value, and return them as a list. The optional 'sizehint' argument is
    accepted but ignored.

    Because reading continues until EOF, the child process should have
    closed its stdout. Calling this on a child that is still running with
    its stdout open blocks until the read times out.'''
    collected = []
    current = self.readline()
    while current:
        collected.append(current)
        current = self.readline()
    return collected
def get_process_info():
    """Run ``ps ax -O ppid`` through ``pexpect.run``.

    The captured output is bound to a local name but not processed any
    further in the visible code; the function returns None.
    """
    # This seems to work on both Linux and BSD, but should otherwise be
    # considered highly UNportable.
    ps_output = pexpect.run('ps ax -O ppid')
def test_expect_eof(self):
    """Check that expecting EOF on a spawned ``ls`` captures the same text
    as running the same command through ``subprocess``."""

    def _normalise(raw):
        # Collapse every line-ending variant to a single '\n' so that the
        # two captures can be compared byte for byte.
        unified = raw.replace(b'\r\n', b'\n').replace(b'\r', b'\n')
        return unified.replace(b'\n\n', b'\n').rstrip()

    listing = subprocess.Popen(args=['/bin/ls', '-l', '/bin'],
                               stdout=subprocess.PIPE)
    the_old_way = listing.communicate()[0].rstrip()

    child = pexpect.spawn('/bin/ls -l /bin')
    # This basically tells it to read everything. Same as pexpect.run() function.
    child.expect(pexpect.EOF)

    the_new_way = _normalise(child.before)
    the_old_way = _normalise(the_old_way)
    assert the_old_way == the_new_way, hex_diff(the_old_way, the_new_way)
def test_env(self):
    """Check that the ``env`` argument of ``pexpect.run`` changes what the
    ``env`` command prints."""
    custom_env = {'foo': 'pexpect'}
    default = pexpect.run('env')
    userenv = pexpect.run('env', env=custom_env)

    assert default != userenv, "'default' and 'userenv' should be different"
    found_both = b'foo' in userenv and b'pexpect' in userenv
    assert found_both, "'foo' and 'pexpect' should be in 'userenv'"