def handle_file(self, fname):
    """Unpack an archive into ``self.name`` and index what it contains.

    If *fname* starts with the ``dex\\n`` magic it is first converted with
    the external ``dex2jar`` tool and the resulting JAR is processed
    instead. Every archive member is extracted, then classified by suffix:

    * ``.class`` -> dotted class name appended to ``self.classes``
    * ``.dex``   -> processed recursively
    * ``.proto`` -> decoded text stored in ``self.bonus_protos``
    * ``.so``    -> result of ``walk_binary`` merged into ``self.bonus_protos``
    """
    with open(fname, 'rb') as stream:
        magic = stream.read(4)

    if magic == b'dex\n':
        converted = self.name + '/classes-dex2jar.jar'
        run([dex2jar, fname, '-f', '-o', converted], cwd=self.name, stderr=DEVNULL)
        fname = converted

    with ZipFile(fname) as archive:
        archive.extractall(self.name)
        for entry in archive.namelist():
            if entry.endswith('.class'):
                # "a/b/C.class" -> "a.b.C" (drop the 6-char ".class" suffix)
                dotted = entry.replace('/', '.')
                self.classes.append(dotted[:-6])
            elif entry.endswith('.dex'):
                self.handle_file(self.name + '/' + entry)
            elif entry.endswith('.proto'):
                self.bonus_protos[entry] = archive.read(entry).decode('utf8')
            elif entry.endswith('.so'):
                extracted = self.name + '/' + entry
                self.bonus_protos.update(walk_binary(extracted))
python类DEVNULL的实例源码
def run_cmake(self):
    """Create a ``build`` directory and run the configured CMake command in it.

    Prints a diagnostic and returns early when the directory cannot be
    created, when ``cmake_build_info["build_dir"]`` is not a directory, or
    when ``cmake_build_info["comp_data_cmake"]`` is missing afterwards.
    Always returns ``None``.
    """
    print("Running CMake")
    mkdir_status = subprocess.call(
        ["mkdir", "build"],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL)
    if mkdir_status != 0:
        print("Can\'t setup CMake build directory.")
        return

    build_dir = self.cmake_build_info["build_dir"]
    if not build_dir.is_dir():
        print("Couldn't setup CMake Project")
        return

    try:
        subprocess.check_output(
            self.cmake_cmd_info["cmake_cmd"],
            cwd=str(build_dir))
    except subprocess.CalledProcessError as err:
        # A failing CMake run is reported but not fatal by itself; the
        # check below decides whether the project was set up.
        print(err.output)

    # NOTE(review): source indentation was lost; this check is assumed to
    # run after the try/except rather than inside the handler - confirm.
    if not self.cmake_build_info["comp_data_cmake"].is_file():
        print("Couldn't setup CMake Project")
        return
def _openDownloadFile(self, buildId, suffix):
    """Download an artifact into a temporary file via the configured shell command.

    The command in ``self.__downloadCmd`` is run by ``/bin/bash -ec`` with a
    whitelisted copy of the environment plus ``BOB_LOCAL_ARTIFACT`` (the
    temporary file) and ``BOB_REMOTE_ARTIFACT`` (the URL). On success a
    ``CustomDownloader`` for the temporary file is returned; on a non-zero
    exit ``ArtifactDownloadError`` is raised and the temporary file removed.
    """
    (handle, localPath) = mkstemp()
    url = self._makeUrl(buildId, suffix)
    handedOver = False
    try:
        # Only the path is needed; the command writes the file itself.
        os.close(handle)
        env = {}
        for (key, value) in os.environ.items():
            if key in self.__whiteList:
                env[key] = value
        env["BOB_LOCAL_ARTIFACT"] = localPath
        env["BOB_REMOTE_ARTIFACT"] = url
        status = subprocess.call(["/bin/bash", "-ec", self.__downloadCmd],
            stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL,
            cwd="/tmp", env=env)
        if status != 0:
            raise ArtifactDownloadError("failed (exit {})".format(status))
        # From here on the downloader owns the file, even if its
        # constructor fails, so it must no longer be unlinked below.
        handedOver = True
        return CustomDownloader(localPath)
    finally:
        if not handedOver:
            os.unlink(localPath)
def callGit(self, workspacePath, *args):
    """Run ``git`` with *args* in the SCM directory and return its stdout.

    The working directory is ``<cwd>/<workspacePath>/<self.__dir>``; stderr
    is discarded. A non-zero exit is converted into a ``BuildError`` that
    names the directory, the command line and the captured output.
    """
    cmdLine = ['git'] + list(args)
    runDir = os.path.join(os.getcwd(), workspacePath, self.__dir)
    try:
        return subprocess.check_output(cmdLine, cwd=runDir,
            universal_newlines=True, stderr=subprocess.DEVNULL)
    except subprocess.CalledProcessError as e:
        where = os.path.join(workspacePath, self.__dir)
        what = " ".join(cmdLine)
        raise BuildError("git error:\n Directory: '{}'\n Command: '{}'\n'{}'".format(
            where, what, e.output.rstrip()))
# Get GitSCM status. The purpose of this function is to return the status of the given directory.
#
# return values:
# - error: The SCM is in an error state. Use this if git returned an error code.
# - dirty: SCM is dirty. Could be: modified files, switched to another branch/tag/commit/repo, unpushed commits.
# - clean: Same branch/tag/commit as specified in the recipe and no local changes.
# - empty: Directory does not exist.
#
# This function is called when building with --clean-checkout. 'error' and 'dirty' SCMs are moved to the attic,
# while empty and clean directories are not.
def callSubversion(self, workspacePath, *args):
    """Run ``svn`` with *args* in *workspacePath* and return its stdout.

    stderr is discarded. A non-zero exit is converted into a ``BuildError``
    that names the directory, the command line and the captured output.
    """
    cmdLine = ['svn'] + list(args)
    try:
        return subprocess.check_output(cmdLine, cwd=workspacePath,
            universal_newlines=True, stderr=subprocess.DEVNULL)
    except subprocess.CalledProcessError as e:
        # NOTE(review): the command runs in workspacePath, but the message
        # reports workspacePath/<self.__dir> - kept as in the original.
        where = os.path.join(workspacePath, self.__dir)
        what = " ".join(cmdLine)
        raise BuildError("svn error:\n Directory: '{}'\n Command: '{}'\n'{}'".format(
            where, what, e.output.rstrip()))
# Get SvnSCM status. The purpose of this function is to return the status of the given directory.
#
# return values:
# - error: The SCM is in an error state. Use this if the svn call returns an error code.
# - dirty: SCM is dirty. Could be: modified files, switched to another URL or revision.
# - clean: Same URL and revision as specified in the recipe and no local changes.
# - empty: Directory does not exist.
#
# This function is called when building with --clean-checkout. 'error' and 'dirty' SCMs are moved to the attic,
# while empty and clean directories are not.
def version_getter(config):
    """Return the tag on HEAD, or HEAD's short SHA1 when it is untagged.

    Returns ``None`` when the current directory is not inside a git work
    tree. *config* is accepted but not used.

    .. note:: ``git describe`` is called without ``--tags``, so only
       annotated tags are considered.

    TODO: Support non-annotated tags?
    """
    inside_tree_cmd = ['git', 'rev-parse', '--is-inside-work-tree']
    exact_tag_cmd = ['git', 'describe', '--exact-match']
    short_sha_cmd = ['git', 'rev-parse', '--short', 'HEAD']

    try:
        check_output(inside_tree_cmd, stderr=DEVNULL)
    except CalledProcessError:
        return None

    encoding = getpreferredencoding(do_setlocale=False)
    try:
        raw = check_output(exact_tag_cmd, stderr=DEVNULL)
    except CalledProcessError:
        # HEAD carries no (annotated) tag: fall back to the abbreviated hash.
        raw = check_output(short_sha_cmd)
    return raw.decode(encoding).strip()
def _check_system(self):
    """Verify the repository tool can be run by invoking it with ``--help``.

    Raises:
        RepositorySystemError: if the executable cannot be found, or if it
            exits with a non-zero status.
    """
    probe = self.commands + ["--help"]
    # Python 2 has no FileNotFoundError; a missing binary shows up as IOError.
    missing_binary_error = IOError if six.PY2 else FileNotFoundError
    try:
        if not six.PY2:
            subprocess.check_call(probe, stdout=subprocess.DEVNULL)
        else:
            # subprocess.DEVNULL does not exist on Python 2; capture instead.
            subprocess.check_output(probe)
    except missing_binary_error:
        raise RepositorySystemError("Cannot run {}".format(self.command))
    except subprocess.CalledProcessError:
        raise RepositorySystemError(
            "Error running {}".format(self.command))
def _convert(self, command_path, input_path, settings=(),
             prefix='', fileformat='png'):
    """Run an external converter and return the page files it produced.

    The command line is ``command_path *settings input_path output_path``
    where ``output_path`` is ``<self.dest_pages>/<prefix->%04d.<fileformat>``
    (the ``%04d`` placeholder is passed to the tool literally).

    Args:
        command_path: executable to run.
        input_path: document handed to the converter.
        settings: extra command-line arguments inserted before the paths.
        prefix: optional file-name prefix; a ``-`` separator is appended.
        fileformat: extension of the generated files.

    Returns:
        List of paths in ``self.dest_pages`` matching
        ``<prefix->NNNN.<fileformat>``.

    Raises:
        ProcessingError: if the converter exits with a non-zero status.
    """
    if prefix:
        prefix = '{}-'.format(prefix)

    output_path = path.join(
        self.dest_pages,
        '{}%04d.{}'.format(prefix, fileformat)
    )

    # The converter's output is never read. Leaving stdout on an unread
    # PIPE while blocking in wait() deadlocks once the pipe buffer fills,
    # so both streams are discarded instead.
    exit_code = subprocess.call(
        (command_path, *settings, input_path, output_path),
        stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
    )

    if exit_code != 0:
        raise ProcessingError(
            'Was unable to convert document to {}s ({}).'.format(fileformat,
                                                                 exit_code)
        )

    return glob.glob(path.join(
        self.dest_pages, '{}[0-9][0-9][0-9][0-9].{}'.format(prefix,
                                                            fileformat)
    ))
test_execution.py 文件源码
项目:integration-prototype
作者: SKA-ScienceDataProcessor
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def run_vis(self, config):
    """Run the ``sip.emulators.csp_visibility_sender`` module with *config*.

    ``self.poll_master_stdout`` is called before and after the run
    (presumably to wait for the master's log lines - confirm against its
    definition). When the module-level ``DEBUG`` flag is set, the sender's
    stdout and stderr are printed.
    """
    self.poll_master_stdout(5, 'Waiting to receive',
                            'run_vis initialisation')
    sender = subprocess.Popen(['/usr/bin/python', '-m',
                               'sip.emulators.csp_visibility_sender',
                               config],
                              stdin=subprocess.DEVNULL,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE)
    # communicate() both drains the pipes and waits for exit. Calling
    # wait() first can deadlock when the child fills a pipe buffer.
    output = sender.communicate()
    if DEBUG:
        print('--------------------------')
        print('nvis_sender, stdout:')
        print('--------------------------')
        print(output[0].decode("utf-8"))
        print('--------------------------')
        print('nvis_sender, stderr:')
        print('--------------------------')
        print(output[1].decode("utf-8"))
    self.poll_master_stdout(5, 'Received heap 9', 'run_vis termination')
def getDeployedIps(deploymentName):
    """Return the static IP configuration of a deployed BOSH deployment.

    Args:
        deploymentName: name passed to ``bosh -e lite``.

    Returns:
        ``None`` when ``bosh deployments`` lists no line matching the name;
        otherwise a dict with key ``"global"`` (the ``"static"`` entry of
        ``getTestSubnet(manifest)``) plus one key per job whose name is in
        ``commonUtils.POOL_TYPES``, mapped to that job's ``static_ips``.
    """
    # NOTE(review): deploymentName is interpolated into a shell command
    # line; callers must only pass trusted names.
    deploymentCount = subprocess.check_output(
        "bosh -e lite deployments | grep {} | wc -l".format(deploymentName),
        shell=True, stderr=subprocess.DEVNULL)
    if int(deploymentCount) == 0:
        return None

    manifestText = subprocess.check_output(
        "bosh -e lite -d {} manifest".format(deploymentName),
        shell=True, stderr=subprocess.DEVNULL)
    # safe_load: yaml.load() without an explicit Loader can construct
    # arbitrary objects and is rejected outright by PyYAML >= 6.
    deployedManifest = yaml.safe_load(manifestText)

    deployedIpConfig = {}
    deployedIpConfig["global"] = getTestSubnet(deployedManifest)["static"]
    for job in deployedManifest["jobs"]:
        if job["name"] in commonUtils.POOL_TYPES:
            # Only the first network's static addresses are recorded.
            deployedIpConfig[job["name"]] = job["networks"][0]["static_ips"]
    return deployedIpConfig
def find_class_having_main(self, classes):
    """Return the stem of the first class file that declares ``main``.

    Each path in *classes* is disassembled with the configured
    ``disassembler`` binary (run with ``-s``). A class matches when a
    ``public static ... void main(`` line is immediately followed by a line
    equal to ``PSVMAIN_DESCRIPTOR``. Files whose disassembly fails are
    skipped. Returns ``None`` when no class matches.
    """
    for file in classes:
        # run javap(1) with type signatures
        try:
            stdout = subprocess.check_output(
                [self.extra_binaries['disassembler'].cmd, '-s', str(file)],
                stderr=subprocess.DEVNULL, env=self.compiler.env)
        except subprocess.SubprocessError:  # noqa
            continue
        # iterate on lines to find p s v main() signature and then
        # its descriptor on the line below; we don't rely on the type
        # from the signature, because it could be String[], String... or
        # some other syntax I'm not even aware of
        lines = iter(stdout.decode().split('\n'))
        for line in lines:
            line = line.lstrip()
            if line.startswith('public static') and 'void main(' in line:
                # Default to '' so a signature on the very last line does
                # not leak StopIteration out of this function.
                if next(lines, '').lstrip() == PSVMAIN_DESCRIPTOR:
                    return file.stem
    return None
def runTarsnap(args, timeout = None):
    """Run tarsnap with *args* and return its captured result.

    The command is ``config.tarsnap_bin`` + ``config.tarsnap_extra_args`` +
    *args*. If *timeout* (seconds) expires, SIGQUIT is sent and the process
    is then waited for without a limit. A non-zero exit status terminates
    the program via ``sys.exit`` with the command and both output streams.

    Returns:
        A ``CheapoCompletedProcess`` with ``stdout``, ``stderr`` and
        ``returncode`` filled in.
    """
    command = [config.tarsnap_bin] + config.tarsnap_extra_args + args
    proc = subprocess.Popen(command,
        stdout = subprocess.PIPE, stderr = subprocess.PIPE,
        stdin = subprocess.DEVNULL, universal_newlines = True)
    result = CheapoCompletedProcess()

    timedOut = False
    try:
        captured = proc.communicate(timeout = timeout)
    except subprocess.TimeoutExpired:
        timedOut = True
        print("Tarsnap timed out, sending SIGQUIT...")
        proc.send_signal(signal.SIGQUIT)
        captured = proc.communicate()

    result.stdout, result.stderr = captured
    result.returncode = proc.wait()
    if timedOut:
        print("Tarsnap finished")
    if result.returncode:
        sys.exit("Error running tarsnap:\nCommand: {}\nSTDOUT:\n{}\nSTDERR:\n{}\n".format(" ".join(command),result.stdout,result.stderr))
    return result
def parse_text(self, img):
    """OCR this object's bounding box with tesseract and store it in ``self.text``.

    The region is written to ``obj_<id(self)>.png`` via
    ``self.save_bounding_box``; tesseract is run on that file with its
    stderr discarded, and the stripped, lower-cased output is stored.
    """
    filename = "obj_{}.png".format(id(self))
    self.save_bounding_box(img, filename)

    whitelist = 'tessedit_char_whitelist=abcdefghijklmnopqrstuvwxyz0123456789=+-*/'
    cmd = ['tesseract', filename, 'stdout']
    cmd += ['--psm', '7']
    cmd += ['-l', 'eng+equ']
    cmd += ['-c', whitelist]

    result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
    recognized = result.stdout.decode('ascii').strip().lower()
    self.text = recognized if recognized else ''
def running_old_sell(manager, installer):
    """Report whether the VM named "21" still has a third bridged adapter.

    Returns ``False`` when any dependency reported by
    ``installer.check_dependencies()`` is not installed, when
    ``manager.machine`` is not a ``Two1MachineVirtual``, or when
    ``VBoxManage showvminfo`` fails. Otherwise returns whether
    ``bridgeadapter3`` appears in the VM's machine-readable info.
    """
    for _package, installed in installer.check_dependencies():
        if installed is False:
            return False

    if not isinstance(manager.machine, Two1MachineVirtual):
        return False

    try:
        vbox_conf = subprocess.check_output(["VBoxManage", "showvminfo", "21", "--machinereadable"],
                                            stderr=subprocess.DEVNULL)
    except subprocess.CalledProcessError:
        return False

    if type(vbox_conf) is bytes:
        vbox_conf = vbox_conf.decode()
    return "bridgeadapter3" in vbox_conf
def _start_sell_service(self, service_name, failed_to_start_hook, started_hook, failed_to_up_hook, up_hook,
                        timeout=Two1Composer.SERVICE_START_TIMEOUT):
    """Start one docker-compose service and report progress through hooks.

    Args:
        service_name: compose service to bring up with ``up -d``.
        failed_to_start_hook: called with the name if docker-compose fails.
        started_hook: called with the name once docker-compose succeeded.
        failed_to_up_hook: called if the probe is still running at timeout.
        up_hook: called if the probe finished within the timeout.
        timeout: seconds to wait for the probe to finish.

    ``router`` just gets a fixed 5 second grace period and ``base`` is not
    probed at all. Every other service is probed by running
    ``curl <service>:5000`` inside the ``sell_router`` container.
    """
    try:
        subprocess.check_output(["docker-compose", "-f", Two1Composer.COMPOSE_FILE, "up", "-d", service_name],
                                stderr=subprocess.DEVNULL, env=self.machine_env)
    except subprocess.CalledProcessError:
        failed_to_start_hook(service_name)
    else:
        started_hook(service_name)
        if service_name == 'router':
            time.sleep(5)
        elif service_name != 'router' and service_name != 'base':
            # time.clock() was removed in Python 3.8 (and measured CPU time
            # on Unix); a monotonic wall clock is what a timeout needs.
            start = time.monotonic()

            exec_id = self.docker_client.exec_create('sell_router', "curl %s:5000" % service_name)['Id']
            self.docker_client.exec_start(exec_id)
            running = True

            while time.monotonic() - start < timeout and running is True:
                running = self.docker_client.exec_inspect(exec_id)['Running']

            if running is True:
                failed_to_up_hook(service_name)
            else:
                up_hook(service_name)
def start_networking(self):
    """ Start ZeroTier daemon.

    Returns:
        0 if the service is already running or comes up within 60 seconds,
        1 if it could not be launched or did not come up in time.
    """
    if self.status_networking():
        return 0

    try:
        subprocess.Popen(['sudo', 'service', 'zerotier-one', 'start'], stderr=subprocess.DEVNULL)
    except (OSError, subprocess.CalledProcessError):
        # Popen does not wait for the command, so it never raises
        # CalledProcessError; a launch failure (e.g. sudo missing)
        # surfaces as OSError and is reported as a failed start.
        return 1

    deadline = time.time() + 60
    while time.time() <= deadline:
        if self.status_networking():
            return 0
    return 1
def status_networking(self):
    """ Get status of ZeroTier One service.

    Copies ``ps_zerotier.sh`` to the docker machine if it is not listed
    there yet, runs it, and returns ``True`` when any output line contains
    ``zerotier-one -d``. Any failing command yields ``False``.
    """
    script = "ps_zerotier.sh"
    try:
        remote_files = self.docker_ssh("ls", stderr=subprocess.DEVNULL).decode().split("\n")
        if script not in remote_files:
            here = os.path.dirname(os.path.abspath(__file__))
            local_script = os.path.join(here, "util", "scripts", "ps_zerotier.sh")
            subprocess.check_output(["docker-machine",
                                     "scp",
                                     local_script,
                                     "21:~/ps_zerotier.sh"])
            self.docker_ssh("chmod a+x ~/ps_zerotier.sh", stderr=subprocess.DEVNULL)

        listing = self.docker_ssh("./ps_zerotier.sh", stderr=subprocess.DEVNULL).decode()
        return any("zerotier-one -d" in entry for entry in listing.split("\n"))
    except subprocess.CalledProcessError:
        return False
def connect_market(self, client, market):
    """Join the ZeroTier network of *market* and wait for the interface.

    The device address is read from ``zerotier-cli info -j`` and passed to
    ``client.join``; when the response is OK the returned network id is
    joined and ``self.wait_for_zt_confirmation()`` is called. A server
    error with status 400 and any failing command are logged; other server
    errors are re-raised. Finally sleeps 10 seconds and returns ``None``.
    """
    try:
        info_json = self.docker_ssh("sudo ./zerotier-cli info -j",
                                    stderr=subprocess.DEVNULL).decode()
        zt_device_address = json.loads(info_json)["address"]
        response = client.join(market, zt_device_address)
        if response.ok:
            network_id = response.json().get("networkid")
            self.docker_ssh("sudo ./zerotier-cli join %s" % network_id, stderr=subprocess.DEVNULL)
            # The confirmation result is intentionally not acted upon.
            if self.wait_for_zt_confirmation():
                pass
    except exceptions.ServerRequestError as e:
        if e.status_code != 400:
            raise e
        logger.info(uxstring.UxString.invalid_network)
    except subprocess.CalledProcessError as e:
        logger.info(str(e))
    time.sleep(10)  # wait for interface to come up
    return
def _get_market_address(self):
    """ Get status of 21mkt network connection.

    Looks at the first network named ``21mkt`` in the output of
    ``zerotier-cli listnetworks -j``.

    Returns:
        zt_ip (str): the first assigned address matching
        ``Two1Machine.ZEROTIER_CLI_IPV6_PATTERN``, wrapped in square
        brackets; an empty string when the network is absent, not ``OK``,
        has no matching address, or anything goes wrong.
    """
    try:
        raw = self.docker_ssh("sudo ./zerotier-cli listnetworks -j", stderr=subprocess.DEVNULL)
        if type(raw) == bytes:
            raw = raw.decode()

        for net in json.loads(raw):
            if net["name"] != "21mkt":
                continue
            if net["status"] != "OK":
                return ""
            for addr in net["assignedAddresses"]:
                found = Two1Machine.ZEROTIER_CLI_IPV6_PATTERN.search(addr)
                if found is not None:
                    return "[%s]" % found.group(0)
            return ""
        return ""
    except Exception:
        return ""
def status_machine(self):
    """ Get status of virtual machine.

    Returns:
        ``VmState.RUNNING`` or ``VmState.STOPPED`` according to the output
        of ``docker-machine status <self.name>``, ``VmState.UNKNOWN`` for
        any other output, and ``VmState.NOEXIST`` when the command fails.
    """
    try:
        status = subprocess.check_output(["docker-machine", "status",
                                          self.name],
                                         stderr=subprocess.DEVNULL).decode().rstrip()
        normalized = status.lower()
        if normalized == "running":
            return VmState.RUNNING
        elif normalized == "stopped":
            return VmState.STOPPED
        else:
            return VmState.UNKNOWN
    except Exception:
        # Was a bare ``except:``, which also swallowed KeyboardInterrupt
        # and SystemExit; ordinary failures still map to NOEXIST.
        return VmState.NOEXIST
# private methods
def runRedisServer(port=6379):
    """Start ``redis-server`` on *port* and block until it reports ready.

    Startup output is read line by line until a line contains
    ``_REDIS_READY_MESSAGE + port``; everything the server prints after
    that is piped into a ``cat`` process writing to DEVNULL.

    Returns:
        ``(server, dumper)``: the redis-server and ``cat`` Popen objects.

    Raises:
        RuntimeError: if the server's stdout reaches end-of-file before the
            ready message was seen.
    """
    port = str(port)
    server = subprocess.Popen(['redis-server', '--port', port],
                              stdout=subprocess.PIPE,
                              universal_newlines=True)
    message = _REDIS_READY_MESSAGE + port
    while True:
        line = server.stdout.readline()
        if message in line:
            break
        if not line:
            # readline() returns '' forever at EOF; without this check a
            # server that dies during startup would hang the caller.
            server.wait()
            raise RuntimeError(
                'redis-server exited before it was ready on port ' + port)
    dumper = subprocess.Popen(['cat'],
                              stdin=server.stdout,
                              stdout=subprocess.DEVNULL)
    return server, dumper
def devserver(port):
    """Generator that runs the dev server on *port* around a single ``yield``.

    Points ``config.rc_root`` / ``config.rc_api_root`` at
    ``http://localhost:<port>``, launches ``python devserver/__init__.py``
    with ``PORT`` set and all standard streams discarded, and kills the
    process once the generator is resumed or closed.
    (Presumably wrapped as a context manager or fixture - decorator not
    visible here.)
    """
    root = 'http://localhost:{}'.format(port)
    config.rc_root = root
    config.rc_api_root = config.rc_root + '/api/v1'

    child_env = dict(os.environ)
    child_env['PORT'] = str(port)
    proc = subprocess.Popen(
        ['python', 'devserver/__init__.py'],
        stdin=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        env=child_env
    )

    # wait for the dev server to come up
    time.sleep(1)

    try:
        yield
    finally:
        proc.kill()
        proc.wait()
def clone(self, repo, branch="master"):
    """Make sure *repo* at *branch* is available in the temp directory.

    The checkout lives at ``<tempdir>/<basename of "repo#branch">``. An
    existing directory is updated via ``self.pull``; otherwise the branch
    is cloned with git. The location is recorded in ``self.repos`` and an
    empty list in ``self.copies``.

    Returns:
        The checkout path, or ``None`` when the repo/branch pair was
        already registered in ``self.repos``.

    Raises:
        subprocess.CalledProcessError: if ``git clone`` fails.
    """
    location_index = '#'.join((repo, branch))
    if self.repos.get(location_index) is not None:
        logger.debug("Repo %s already exists, reusing old." % location_index)
        return

    checkout_name = os.path.basename(location_index)
    location = os.path.join(tempfile.gettempdir(), checkout_name)
    if not os.path.exists(location):
        logger.debug("Cloning %s with branch %s" % (repo, branch))
        subprocess.run(
            ["git", "clone", "-b", branch, repo, location],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=True,
        )
    else:
        logger.debug("Repo %s with branch %s already exists on disk, reusing and pulling." % (repo, branch))
        self.pull(location)

    self.repos[location_index] = location
    self.copies[location_index] = []
    return location
def _instancecheck_impl(self, value, info: Info = NoInfo()):
    """Check that *value* is a list of strings that ``perf stat`` accepts as events.

    When perf is not available only the list-of-strings check is done.
    ``wall-clock`` is dropped (first occurrence) before asking perf. The
    list is rejected when ``perf stat -e <props> -- /bin/echo`` exits with
    a positive status; the first line of its stderr goes into the message.
    """
    if not isinstance(value, List(Str())):
        return info.errormsg(self)
    if not is_perf_available():
        return info.wrap(True)
    assert isinstance(value, list)

    if "wall-clock" in value:
        # Work on a copy so the caller's list is left untouched.
        value = value.copy()
        value.remove("wall-clock")

    events = ",".join(value)
    cmd = "perf stat -x ';' -e {props} -- /bin/echo".format(props=events)
    proc = subprocess.Popen(["/bin/sh", "-c", cmd], stdout=subprocess.DEVNULL,
                            stderr=subprocess.PIPE, universal_newlines=True)
    out, err = proc.communicate()
    if proc.poll() > 0:
        first_line = str(err).split("\n")[0].strip()
        return info.errormsg(self, "Not a valid properties list: " + first_line)
    return info.wrap(True)
def _exec(self, cmd: str, fail_on_error: bool = False,
          error_message: str = "Failed executing {cmd!r}: out={out!r}, err={err!r}",
          timeout: int = 10) -> bool:
    """
    Run a command through ``/bin/sh -c`` and report whether it succeeded.

    :param cmd: shell command line
    :param fail_on_error: capture the output and call ``self._fail`` with the
        formatted message when the exit status is positive
    :param error_message: error message format (fields: cmd, out, err)
    :param timeout: time in seconds after which the command is aborted
    :return: ``False`` on a positive exit status when *fail_on_error* is
        not set, ``True`` otherwise
    """
    # Output is only needed to build the failure message.
    if fail_on_error:
        sink = subprocess.PIPE
    else:
        sink = subprocess.DEVNULL
    proc = subprocess.Popen(["/bin/sh", "-c", cmd], stdout=sink, stderr=sink,
                            universal_newlines=True)
    out, err = proc.communicate(timeout=timeout)

    if proc.wait() <= 0:
        return True
    if not fail_on_error:
        return False
    self._fail(error_message.format(cmd=cmd, out=out, err=err))
    return True
def cpp_verify(data, user_file):
    """Compile the user's C++ answer with the task's test header and run it.

    Args:
        data: bytes fed to the compiled test program on stdin.
        user_file: path of the user's answer source.

    Returns:
        The stdout (bytes) of the compiled ``./test`` program.

    Exits the program with status 1 when ``task/test.h`` cannot be copied
    or when compilation fails. The temporary files ``answer.cpp``,
    ``test.h`` and ``test`` in the current directory are removed in every
    case.
    """
    with open(user_file) as source:
        answer = source.read()

    try:
        # write to temp file
        with open('answer.cpp', 'w') as temp:
            temp.write(answer)

        try:
            with open('task/test.h') as header:
                test = header.read()
            with open('test.h', 'w') as temp:
                temp.write(test)
        except IOError:  # user has not requested task yet
            print("A task folder could not be found. Request a task\nusing python client.py request <task_id>")
            sys.exit(1)  # terminate program

        proc = sp.Popen(['g++', 'answer.cpp', 'test.h', '-o', 'test'], stderr=sp.DEVNULL, stdout=sp.DEVNULL)
        proc.wait()

        if proc.returncode:
            print("Answer failed to compile.")
            sys.exit(1)

        proc = sp.Popen(['./test'], stdin=sp.PIPE, stdout=sp.PIPE)
        test_result, _ = proc.communicate(input=data)
        return test_result
    finally:
        # dispose of temporary files, including on the early-exit paths
        # where only some of them have been created
        for leftover in ('answer.cpp', 'test.h', 'test'):
            if os.path.exists(leftover):
                os.remove(leftover)
def create_ssl_cert():
    """Create a self-signed certificate at ``ssl_cert_path`` using openssl.

    Key and certificate are written to the same file, valid for 365 days.
    openssl's stdout and stderr are discarded. Logs an error and exits
    with status 1 when ``OpenSslExecutableNotFoundError`` is raised.
    """
    opened_devnull = None
    if PY3:
        from subprocess import DEVNULL
    else:
        # Python 2 has no subprocess.DEVNULL; open the null device and
        # remember the handle so it can be closed again below.
        DEVNULL = opened_devnull = open(os.devnull, 'wb')

    try:
        try:
            ssl_exec_list = ['openssl', 'req', '-new', '-x509', '-keyout', ssl_cert_path,
                             '-out', ssl_cert_path, '-days', '365', '-nodes',
                             '-subj', '/CN=www.talhasch.com/O=Talhasch Inc./C=TR']

            # NOTE(review): the exit status of openssl is not checked, so
            # the success message below is logged even if it failed.
            call(ssl_exec_list, stdout=DEVNULL, stderr=DEVNULL)
        except OpenSslExecutableNotFoundError:
            logging.error('openssl executable not found!')
            exit(1)
    finally:
        if opened_devnull is not None:
            opened_devnull.close()

    logging.info('Self signed ssl certificate created at {}'.format(ssl_cert_path))
def _DEPREC_bash_run(cmd, suppress_err=0):
    """Deprecated: run *cmd* in an interactive bash shell.

    stderr is discarded when *suppress_err* is truthy and merged into
    stdout otherwise. The decoded output is not returned; the function
    always returns ``None``. A non-zero exit propagates as the
    ``CalledProcessError`` raised by ``check_output``.
    """
    bashrc_present = U.f_exists('~/.bashrc')
    if not bashrc_present:
        print('WARNING: ~/.bashrc not found. '
              'bash_output() will not be able to read the aliases.', file=sys.stderr)
    # hack: if you don't echo something first, your alias such as `ll` will
    # kill the script after it's run.
    # https://stackoverflow.com/questions/45558993/subprocess-interactive-bash-behavior
    cmd = 'printf ""; ' + cmd
    if suppress_err:
        err = pc.DEVNULL
    else:
        err = pc.STDOUT
    raw_output = pc.check_output(['/bin/bash', '-i', '-c', cmd],
                                 stderr=err,
                                 )
    raw_output.decode()
    # try:
    #     err = pc.DEVNULL if suppress_err else None
    #     return pc.check_output(['/bin/bash', '-i', '-c', cmd],
    #                            stderr=err,
    #                            ).decode()
    # except pc.CalledProcessError as exc:
    #     if suppress_err:
    #         print('Failed bash call:\n', exc.output.decode(), file=sys.stderr)
    #     raise
def __start(self, args):
    """Launch aireplay with *args* and store the process in ``self._proc``.

    The BSSID flag (with ``self._bssid``) and ``--ignore-negative-one`` are
    appended when not already present, followed by ``self._miff``. The
    child gets an environment holding only ``PATH`` and all its standard
    streams are discarded.

    Args:
        args: list of aireplay command-line flags; it is not modified.

    Returns:
        ``True`` once the process has been spawned.
    """
    # build aireplay flags
    # Copy first: extending ``args`` itself would silently grow the
    # caller's list on every call.
    flags = list(args)
    if self.DEAUTH_BSSID_FLAG not in flags:
        flags.extend([self.DEAUTH_BSSID_FLAG, self._bssid])
    if '--ignore-negative-one' not in flags:
        flags.append("--ignore-negative-one")
    # build Popen command as list
    cmd = [self.AIREPLAY] + flags + [self._miff]
    print("\t" + " ".join(cmd))
    self._proc = Popen(cmd, env={'PATH': os.environ['PATH']},
                       stderr=DEVNULL, stdin=DEVNULL, stdout=DEVNULL)
    return True
def cut_video(input_file: str, output_file: str, start_time: str, end_time: str, is_concat: bool = True):
    """
    Cut the span between two timestamps out of a video with ffmpeg (stream copy).

    :param input_file: when ``is_concat`` is true, the text of an ffmpeg
        concat script that is fed to ffmpeg on stdin; otherwise the path of
        the input video
    :param output_file: path of the output file (overwritten, ``-y``)
    :param start_time: value passed to ffmpeg ``-ss``
    :param end_time: value passed to ffmpeg ``-to``
    :param is_concat: whether ``input_file`` is a Virtual Concatenation Script
    :return: None
    """
    ffmpeg_command = ['ffmpeg', '-y']
    if is_concat:
        ffmpeg_command.extend(['-protocol_whitelist', 'file,pipe',
                               '-safe', '0',
                               '-f', 'concat',
                               '-i', '-'])
    else:
        ffmpeg_command.extend(['-i', input_file])
    ffmpeg_command.extend(['-ss', start_time,
                           '-to', end_time,
                           '-c', 'copy',
                           output_file])

    stdin_data = None
    if is_concat:
        # The stdin pipe is binary: handing communicate() a str raises
        # TypeError, so text scripts are encoded first (bytes pass through).
        stdin_data = input_file.encode() if isinstance(input_file, str) else input_file

    child = subprocess.Popen(ffmpeg_command, stdin=subprocess.PIPE, stderr=subprocess.DEVNULL)
    child.communicate(stdin_data)