def screenshot(self, filename=None):
    """
    Capture the screen of the device identified by ``self.udid``.

    The ``idevice("screenshot", ...)`` helper writes a TIFF into a temporary
    file, which is loaded into memory and then removed.

    Args:
        filename: optional path; when truthy the image is also saved there.

    Return:
        PIL.Image

    Raises:
        EnvironmentError: the screenshot command exited with a non-zero
            status.
    """
    # mkstemp creates the file atomically; mktemp only returned a name that
    # another process could claim before the tool wrote to it.
    # NOTE(review): assumes the screenshot tool overwrites an existing
    # (empty) file at the given path - confirm.
    fd, tmpfile = tempfile.mkstemp(prefix='atx-screencap-', suffix='.tiff')
    os.close(fd)
    try:
        try:
            idevice("screenshot", "--udid", self.udid, tmpfile)
        except subprocess.CalledProcessError as e:
            # Raise the documented exception instead of terminating the
            # interpreter; `e.message` does not exist on Python 3.
            raise EnvironmentError(
                'screenshot failed (exit status %s): %s'
                % (e.returncode, e.output))
        image = Image.open(tmpfile)
        # Force the pixel data into memory before the file is deleted.
        image.load()
        if filename:
            image.save(filename)
        return image
    finally:
        # Runs on the failure path too, so the temporary file never leaks.
        if os.path.exists(tmpfile):
            os.unlink(tmpfile)
python类CalledProcessError()的实例源码
def screenshot(self, filename=None):
    """
    Take a screenshot of the device with UDID ``self.udid``.

    The image is written by ``idevice("screenshot", ...)`` to a temporary
    TIFF file, fully loaded, and the temporary file is deleted afterwards.

    Args:
        filename: optional destination path; if truthy, the image is also
            saved to it.

    Return:
        PIL.Image

    Raises:
        EnvironmentError: the screenshot command returned a non-zero exit
            status.
    """
    # Use mkstemp rather than mktemp: the latter only hands out a name and
    # is open to a race between name generation and file creation.
    # NOTE(review): assumes the screenshot tool overwrites an existing
    # (empty) file at the given path - confirm.
    handle, tmpfile = tempfile.mkstemp(prefix='atx-screencap-',
                                       suffix='.tiff')
    os.close(handle)
    try:
        try:
            idevice("screenshot", "--udid", self.udid, tmpfile)
        except subprocess.CalledProcessError as err:
            # The docstring promises EnvironmentError; sys.exit(e.message)
            # killed the process and fails on Python 3 (no `.message`).
            raise EnvironmentError(
                'screenshot failed (exit status %s): %s'
                % (err.returncode, err.output))
        image = Image.open(tmpfile)
        # Read the pixel data now; the backing file is removed below.
        image.load()
        if filename:
            image.save(filename)
        return image
    finally:
        # Clean up on both the success and the failure path.
        if os.path.exists(tmpfile):
            os.unlink(tmpfile)
def _services(ip, timeout, *extra_args):
    """List the services declared on a remote host.

    Runs ``cat /etc/aeriscloud.d/*`` on ``ip`` through ``ssh`` (batch mode,
    host key checking disabled) and parses every non-empty output line as a
    comma-separated ``name,port,path`` record.

    :param ip: host passed to ssh.
    :param timeout: value for ssh's ``ConnectTimeout`` option (formatted
        with ``%d``).
    :param extra_args: additional ssh arguments inserted before the options.
    :return: list of dicts keyed by ``name``, ``port`` and ``path``; a line
        with fewer than three fields yields only the leading keys. An empty
        list is returned when the command exits with a non-zero status.
    """
    args = ['ssh', ip, '-t']
    args += list(extra_args)
    args += ['-o', 'StrictHostKeyChecking no',
             '-o', 'ConnectTimeout %d' % timeout,
             '-o', 'BatchMode yes',
             '--',
             'cat', '/etc/aeriscloud.d/*']
    try:
        output = check_output(args)
    except CalledProcessError:
        return []

    # On Python 3 the output is bytes and cannot be split on a str
    # separator; decode it first (a no-op branch on Python 2).
    if not isinstance(output, str):
        output = output.decode('utf-8', 'replace')

    fields = ['name', 'port', 'path']
    return [
        dict(zip(fields, service.strip().split(',')))
        for service in output.split('\n')
        if service
    ]
def _search_variables(search_path, variable):
    """Find the files under ``search_path`` that assign ``variable``.

    Runs ``grep -rI`` for the text ``<variable> = `` and collects the
    file-name prefix of every matching output line. Names starting with
    ``.`` are skipped.

    :param search_path: file or directory handed to grep.
    :param variable: variable name to look for (used as a grep pattern).
    :return: set of file names, or an empty list when grep exits with a
        non-zero status (grep also does so when nothing matches).
    """
    # Pass an argument list instead of a shell string: `variable` used to be
    # interpolated unquoted into a shell command, which allowed injection.
    # `-e` / `--` keep a pattern or path starting with '-' from being read
    # as an option.
    cmd = ['grep', '-rI', '-e', '%s = ' % variable, '--', search_path]
    try:
        grep = subprocess32.check_output(cmd)
    except subprocess32.CalledProcessError:
        return []

    files = set()
    for line in grep.split('\n'):
        if not line.strip():
            continue
        filename, sep, _ = line.partition(':')
        if not sep:
            # No "file:" prefix on this line, so there is nothing to record
            # (the old slice silently dropped the last character instead).
            continue
        filename = filename.strip()
        if filename.startswith('.'):
            continue
        files.add(filename)
    return files
def do_run(cmd):
try:
cwd = os.getcwd() if inherit_cwd else None
if not async:
if stdin:
return subprocess.check_output(cmd, shell=True,
stderr=stderr, stdin=subprocess.PIPE, env=env_dict, cwd=cwd)
output = subprocess.check_output(cmd, shell=True, stderr=stderr, env=env_dict, cwd=cwd)
return output.decode(DEFAULT_ENCODING)
# subprocess.Popen is not thread-safe, hence use a mutex here..
try:
mutex_popen.acquire()
stdin_arg = subprocess.PIPE if stdin else None
stdout_arg = open(outfile, 'wb') if isinstance(outfile, six.string_types) else outfile
process = subprocess.Popen(cmd, shell=True, stdin=stdin_arg, bufsize=-1,
stderr=stderr, stdout=stdout_arg, env=env_dict, cwd=cwd)
return process
finally:
mutex_popen.release()
except subprocess.CalledProcessError as e:
if print_error:
print("ERROR: '%s': %s" % (cmd, e.output))
raise e
def do_run(cmd):
try:
cwd = os.getcwd() if inherit_cwd else None
if not async:
if stdin:
return subprocess.check_output(cmd, shell=True,
stderr=stderr, stdin=subprocess.PIPE, env=env_dict, cwd=cwd)
output = subprocess.check_output(cmd, shell=True, stderr=stderr, env=env_dict, cwd=cwd)
return output.decode(DEFAULT_ENCODING)
# subprocess.Popen is not thread-safe, hence use a mutex here..
try:
mutex_popen.acquire()
stdin_arg = subprocess.PIPE if stdin else None
stdout_arg = open(outfile, 'wb') if isinstance(outfile, six.string_types) else outfile
process = subprocess.Popen(cmd, shell=True, stdin=stdin_arg, bufsize=-1,
stderr=stderr, stdout=stdout_arg, env=env_dict, cwd=cwd)
return process
finally:
mutex_popen.release()
except subprocess.CalledProcessError as e:
if print_error:
print("ERROR: '%s': %s" % (cmd, e.output))
raise e
def _call_phantom(self, token, arns, output_file):
    """
    Shell out to phantomjs.

    - Writes ARNs to a file that phantomjs will read as an input.
    - Phantomjs exchanges the token for session cookies.
    - Phantomjs then navigates to the IAM page and executes JavaScript
      to call GenerateServiceLastAccessedDetails for each ARN.
    - Every 10 seconds, Phantomjs calls GetServiceLastAccessedDetails
    - Phantom saves output to a file that is used by `persist()`

    :param token: passed to the phantomjs script as its first argument.
    :param arns: JSON-serialisable collection written to the input file.
    :param output_file: path passed to the script as its output location.
    :return: Exit code from the phantomjs process; 1 if it timed out.
    """
    path = os.path.dirname(__file__)
    console_js = os.path.join(path, 'awsconsole.js')
    with tempfile.NamedTemporaryFile() as f:
        json.dump(arns, f)
        # Push the buffered JSON to disk before phantomjs opens f.name.
        f.flush()
        p = subprocess32.Popen([
            self.current_app.config.get('PHANTOMJS'),
            console_js,
            token,
            f.name,
            output_file],
            stdout=subprocess32.PIPE, stderr=subprocess32.STDOUT)
        try:
            output, errs = p.communicate(timeout=1200)  # 20 mins
        except subprocess32.TimeoutExpired:
            # communicate() does not stop the child on timeout; kill it and
            # reap it so it does not outlive the temporary input file.
            p.kill()
            p.communicate()
            self.current_app.logger.error('PhantomJS timed out')
            return 1  # return code 1 for timeout

        # stderr is merged into stdout above, so `errs` is always None.
        self.current_app.logger.debug('Phantom Output: \n{}'.format(output))
        self.current_app.logger.debug('Phantom Errors: \n{}'.format(errs))

        # Popen/communicate never raise CalledProcessError, so the exit
        # status has to be inspected explicitly.
        if p.returncode != 0:
            self.current_app.logger.error('PhantomJS exited: {}'
                                          ''.format(p.returncode))
            return p.returncode
        self.current_app.logger.info('PhantomJS exited: 0')
        return 0
def check_output(cmds, shell=False):
    """Run ``cmds`` and return its output with stderr merged into stdout.

    :param cmds: command passed straight to ``subprocess.check_output``.
    :param shell: forwarded as the ``shell`` argument.
    :raises subprocess.CalledProcessError: on a non-zero exit status.
    """
    return subprocess.check_output(cmds, stderr=subprocess.STDOUT,
                                   shell=shell)
def idevice(name, *args):
    """Run the ``idevice<name>`` executable and return its output.

    The executable is located with ``look_exec``; stderr is merged into the
    returned output.

    :param name: suffix appended to ``idevice`` to form the executable name.
    :param args: command-line arguments for the executable.
    :raises EnvironmentError: if ``look_exec`` finds no such executable.
    :raises subprocess.CalledProcessError: on a non-zero exit status.
    """
    exec_name = 'idevice' + name
    exec_path = look_exec(exec_name)
    if not exec_path:
        raise EnvironmentError('Necessary binary ("%s") not found.' % exec_name)
    command = [exec_path] + list(args)
    return subprocess.check_output(command, stderr=subprocess.STDOUT,
                                   shell=False)
def get_file_size(adb, remote_path):
    """Return the first whitespace-preceded integer in ``ls -l`` output.

    Calls ``adb.run_cmd('shell', 'ls', '-l', remote_path)`` and parses the
    text it returns.

    :return: the parsed integer, or 0 when no number is found or the command
        raises ``subprocess.CalledProcessError`` (logged, followed by a
        short sleep).
    """
    try:
        listing = adb.run_cmd('shell', 'ls', '-l', remote_path)
        match = re.search(r'\s(\d+)', listing)
        return int(match.group(1)) if match else 0
    except subprocess.CalledProcessError as e:
        log.warn("call error: %s", e)
        time.sleep(.1)
        return 0
def _which(editor):
    """Resolve ``editor`` to a path using the ``which`` command.

    :return: the stripped output of ``which`` (decoded to text when
        ``six.PY3`` is true), or None when ``which`` exits non-zero.
    """
    try:
        located = subprocess.check_output(['which', editor],
                                          stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError:
        return None
    located = located.strip()
    if six.PY3:
        located = located.decode()
    return located
def check_output(cmds, shell=False):
    """Execute ``cmds`` and return the combined stdout/stderr output.

    :param cmds: command handed to ``subprocess.check_output`` unchanged.
    :param shell: whether to run the command through the shell.
    :raises subprocess.CalledProcessError: if the command exits non-zero.
    """
    options = {'stderr': subprocess.STDOUT, 'shell': shell}
    return subprocess.check_output(cmds, **options)
def idevice(name, *args):
    """Invoke the ``idevice<name>`` tool with ``args`` and return its output.

    ``look_exec`` resolves the tool's path. The returned output contains
    both stdout and stderr.

    :param name: tool suffix, e.g. the part after ``idevice``.
    :param args: arguments appended to the command line.
    :raises EnvironmentError: when the tool cannot be located.
    :raises subprocess.CalledProcessError: when the tool exits non-zero.
    """
    exec_name = 'idevice' + name
    exec_path = look_exec(exec_name)
    if exec_path:
        return subprocess.check_output([exec_path] + list(args),
                                       stderr=subprocess.STDOUT,
                                       shell=False)
    raise EnvironmentError('Necessary binary ("%s") not found.' % exec_name)
def get_file_size(adb, remote_path):
    """Parse a number out of ``ls -l`` output for ``remote_path``.

    The listing comes from ``adb.run_cmd('shell', 'ls', '-l', remote_path)``;
    the first run of digits that follows a whitespace character is returned.

    :return: that number as an int; 0 if there is none, or if the command
        raises ``subprocess.CalledProcessError`` (which is logged and
        followed by a 0.1 s sleep).
    """
    try:
        text = adb.run_cmd('shell', 'ls', '-l', remote_path)
        found = re.search(r'\s(\d+)', text)
        if found:
            return int(found.group(1))
        return 0
    except subprocess.CalledProcessError as e:
        log.warn("call error: %s", e)
        time.sleep(.1)
        return 0
def test_check_call_realtime_failure():
    """A failing command must raise CalledProcessError after yielding output."""
    collected = ''
    with pytest.raises(subprocess.CalledProcessError) as excinfo:
        for chunk in check_call_realtime(['make', 'doesnotexist']):
            collected += chunk
    failure = excinfo.value
    assert failure.returncode != 0
    assert failure.cmd == ['make', 'doesnotexist']
    assert len(collected) != 0
def check_call_realtime(args):
    """Run command with arguments and yield the output as it comes.

    Output is yielded one line at a time (line ending included), exactly as
    read from the pipe. Stderr is piped into stdout.

    :param args: command and arguments passed to ``subprocess.Popen``.
    :raises subprocess.CalledProcessError: if exit code is non-zero
    """
    p = subprocess.Popen(args, stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT)
    try:
        # read() without a size blocks until EOF, so nothing was delivered
        # until the command had finished; readline() returns as soon as a
        # full line is available. The b'' sentinel marks EOF.
        for line in iter(p.stdout.readline, b''):
            yield line
    finally:
        # Also runs when the consumer abandons the generator early.
        p.stdout.close()
    returncode = p.wait()
    if returncode != 0:
        raise subprocess.CalledProcessError(returncode, args)
def english_g2p(self, text):
    """Convert ``text`` to IPA via the external ``t2p`` command.

    The text is normalised with ``self.normalize``, wrapped in double quotes
    and passed to ``t2p``; the decoded output is handed to
    ``self.arpa_to_ipa``. If ``t2p`` cannot be started or exits non-zero, a
    warning is logged and an empty string is converted instead.
    """
    normalized = self.normalize(text)
    command = ['t2p', '"{}"'.format(normalized)]
    arpa = ''
    try:
        arpa = subprocess.check_output(command).decode('utf-8')
    except OSError:
        logging.warning('t2p (from flite) is not installed.')
    except subprocess.CalledProcessError:
        logging.warning('Non-zero exit status from t2p.')
    return self.arpa_to_ipa(arpa)
def english_g2p(self, text):
    """Convert ``text`` to IPA via the external ``lex_lookup`` command.

    The text is normalised with ``self.normalize`` and lower-cased before
    being passed to ``lex_lookup``; the decoded output is handed to
    ``self.arpa_to_ipa``. If the command cannot be started or exits
    non-zero, a warning is logged and an empty string is converted instead.
    """
    lowered = self.normalize(text).lower()
    arpa = ''
    try:
        raw = subprocess.check_output(['lex_lookup', lowered])
        arpa = raw.decode('utf-8')
    except OSError:
        logging.warning('lex_lookup (from flite) is not installed.')
    except subprocess.CalledProcessError:
        logging.warning('Non-zero exit status from lex_lookup.')
    return self.arpa_to_ipa(arpa)
def run_nmap(net):
    """Scan ``net`` with nmap and return its XML report.

    Runs ``nmap -oX - -R -p 22-443 -sV <net>`` and returns whatever nmap
    writes to stdout. If nmap exits with a non-zero status, an error message
    is printed and the interpreter exits with status 1.

    :param net: target specification passed to nmap as its last argument.
    """
    import sys

    try:
        out = subprocess.check_output(
            ["nmap", "-oX", "-", "-R", "-p", "22-443", "-sV", net])
    # Qualify the exception through the module that is certainly in scope;
    # the bare name would be a NameError unless imported separately.
    except subprocess.CalledProcessError:
        print("Error in caller\n")
        # sys.exit is always available; the bare exit() helper is injected
        # by the site module and is meant for interactive use.
        sys.exit(1)
    return out
def subproc_call(cmd, timeout=None):
    """Run ``cmd`` through the shell and return its combined output.

    :param cmd: command string executed with ``shell=True``.
    :param timeout: seconds to wait before giving up; None waits forever.
    :return: stdout and stderr of the command, or None when it times out or
        exits with a non-zero status (both cases are logged as warnings).
    """
    try:
        return subprocess.check_output(
            cmd, stderr=subprocess.STDOUT,
            shell=True, timeout=timeout)
    except subprocess.TimeoutExpired as err:
        logger.warn("Command timeout!")
        logger.warn(err.output)
    except subprocess.CalledProcessError as err:
        logger.warn("Commnad failed: {}".format(err.returncode))
        logger.warn(err.output)
    return None
def create_thumbnail(self, video_path, time, output_path):
    """Extract a single frame from a video with ffmpeg.

    :param video_path: path of the source video (text; encoded as UTF-8 for
        the command line).
    :param time: seek position passed to ffmpeg's ``-ss`` option.
    :param output_path: where the frame is written; an existing file is
        overwritten (``-y``).
    :return: ``output_path``, both on success and when ffmpeg exits with a
        non-zero status (the failure is logged).
    """
    # `b'{0}'.format(...)` was an identity operation on Python 2 and raises
    # AttributeError on Python 3 (bytes has no format method); passing the
    # encoded path directly is equivalent and works on both.
    encoded_path = video_path.encode('utf-8')
    try:
        subprocess.check_call(['ffmpeg',
                               '-y',
                               '-ss',
                               time,
                               '-i',
                               encoded_path,
                               '-vframes',
                               '1',
                               output_path])
    except subprocess.CalledProcessError as error:
        logger.error(error, exc_info=True)
    return output_path
def get_video_meta(self, video_path):
    '''
    Get video meta information by running ffprobe on the first video stream.

    :param video_path: the absolute path of video file
    :return: a dictionary
        {
            'width': integer,
            'height': integer,
            'duration': integer (millisecond)
        }
        Keys that ffprobe does not report are omitted.
        If ffprobe exits with a non-zero status, this method logs the error
        and returns None.
    '''
    # `b'{0}'.format(...)` was an identity operation on Python 2 and raises
    # AttributeError on Python 3 (bytes has no format method); passing the
    # encoded path directly is equivalent and works on both.
    encoded_path = video_path.encode('utf-8')
    try:
        output = subprocess.check_output([
            'ffprobe',
            '-v',
            'error',
            '-show_entries',
            'format=duration:stream=width:stream=height',
            '-select_streams',
            'v:0',
            '-of',
            'json',
            encoded_path
        ])
    except subprocess.CalledProcessError as error:
        logger.error(error)
        return None

    meta = json.loads(output)
    result = {}
    container = meta.get('format', {})
    if 'duration' in container:
        # ffprobe reports seconds as a decimal string; store milliseconds.
        result['duration'] = int(float(container['duration']) * 1000)
    streams = meta.get('streams', [])
    if len(streams) and 'width' in streams[0] and 'height' in streams[0]:
        result['width'] = streams[0]['width']
        result['height'] = streams[0]['height']
    return result
def run_and_wait(args, timeout=None, logfile=None, append=False,
                 env=None, cwd=None):
    """Execute a command in a subprocess and block until it completes.

    Parameters
    ----------
    args : string or list[string]
        the command to execute, either as one command string or as a list
        holding the command followed by its arguments. The list form is
        preferred; see the Python subprocess documentation.
    timeout : float or None
        maximum number of seconds to wait for the command.
        None means wait without limit.
    logfile : string or None
        if not None, the captured output is also written to this file.
    append : bool
        True to append to the logfile rather than replace it.
        Defaults to False.
    env : dict[string, any]
        if not None, the subprocess gets exactly these environment
        variables instead of inheriting those of the current process.
    cwd : string or None
        working directory for the subprocess.

    Returns
    -------
    output : string
        standard output and standard error of the command, decoded with
        ``bag_encoding`` / ``bag_codec_error``.

    Raises
    ------
    subprocess.CalledProcessError
        if the command exits with a non-zero status.
    subprocess.TimeoutExpired
        if ``timeout`` elapses before the command finishes.
    """
    raw = subprocess.check_output(args, stderr=subprocess.STDOUT,
                                  timeout=timeout, env=env, cwd=cwd)
    text = raw.decode(encoding=bag_encoding, errors=bag_codec_error)
    if logfile is None:
        return text
    write_file(logfile, text, append=append)
    return text