def designPrimers(p3_args, input_log=None, output_log=None, err_log=None):
    ''' Run primer3_core on the provided primer3 args and return the parsed
    boulderIO-format output.

    p3_args    - dict of primer3 arguments. NOTE: it is modified in place;
                 'PRIMER_THERMODYNAMIC_PARAMETERS_PATH' is filled in with the
                 config directory under PRIMER3_HOME when it is absent.
    input_log  - optional writable object that receives the exact input
                 handed to primer3_core
    output_log - optional writable object that receives the raw primer3_core
                 output
    err_log    - optional writable object for stderr output (see note below)

    Returns the result of _parseBoulderIO applied to the raw output
    (described upstream as an ordered dict -- confirm against
    _parseBoulderIO).
    '''
    p3_args.setdefault('PRIMER_THERMODYNAMIC_PARAMETERS_PATH',
                       pjoin(PRIMER3_HOME, 'primer3_config/'))
    # Serialize and log the input BEFORE spawning the child: if either step
    # raises, no primer3_core process is left behind waiting on its stdin.
    in_str = _formatBoulderIO(p3_args)
    if input_log:
        input_log.write(in_str)
        input_log.flush()
    sp = subprocess.Popen([pjoin(PRIMER3_HOME, 'primer3_core')],
                          stdout=subprocess.PIPE, stdin=subprocess.PIPE,
                          stderr=subprocess.STDOUT)
    out_str, err_str = sp.communicate(input=in_str)
    if output_log:
        output_log.write(out_str)
        output_log.flush()
    # stderr is merged into stdout above, so communicate() reports None for
    # err_str and this branch is effectively dormant; it is kept so the
    # err_log parameter keeps working if the redirection is ever changed.
    if err_log and err_str is not None:
        err_log.write(err_str)
        err_log.flush()
    return _parseBoulderIO(out_str)
python类STDOUT的实例源码
def call(self, args, devnull=False):
    """Run an external command and hand back its exit status.

    args - list of command args
    devnull - when true, stdout is sent to self.FNULL and stderr is folded
        into it

    In debug mode the command line is echoed and the user is asked to
    confirm before anything runs (declining aborts).

    Returns the exit code from subprocess.call, or False if a
    CalledProcessError is raised.
    """
    if self.debug:
        click.echo(subprocess.list2cmdline(args))
        click.confirm('Continue?', default=True, abort=True)

    try:
        if devnull:
            # Pipe to /dev/null (or equivalent).
            redirect = {'stderr': subprocess.STDOUT, 'stdout': self.FNULL}
        else:
            redirect = {}
        return subprocess.call(args, **redirect)
    except subprocess.CalledProcessError:
        return False
def _start_from_profile_path(self, path):
    """Launch the browser binary against the profile directory at `path`.

    Records the profile path in the child environment, adjusts the link
    library path on Linux, and stores the spawned process on self.process.
    The child's stdout and stderr both go to self._log_file.
    """
    self._firefox_env["XRE_PROFILE_PATH"] = path

    running_on_linux = platform.system().lower() == 'linux'
    if running_on_linux:
        self._modify_link_library_path()

    # The upstream implementation first ran the binary with "-silent" and
    # waited on it via communicate(); that is known to create hanging
    # firefoxes, leading to zombies. Here the second argv slot is
    # '-foreground' and the process is started exactly once, unwaited.
    command = [self._start_cmd, '-foreground']
    if self.command_line is not None:
        command.extend(self.command_line)

    self.process = subprocess.Popen(
        command,
        stdout=self._log_file,
        stderr=subprocess.STDOUT,
        env=self._firefox_env,
    )
def __init__(self, project, arguments=None, stdout="file", stdin=None, timeout=10.0, name=None):
    """Set up a process agent for `project`.

    project   - forwarded to ProjectAgent.__init__
    arguments - command line as a list (program first); None means empty
    stdout    - stored unchanged on self.stdout
    stdin     - None (no stdin entry in popen_args) or "null" (child reads
                /dev/null); any other value raises ValueError
    timeout   - stored on self.timeout (presumably seconds -- confirm
                against the code that consumes it)
    name      - agent name; when empty it defaults to
                "process:<basename of arguments[0]>", or plain "process"
                when there are no arguments

    Raises ValueError for an unsupported `stdin` value.
    """
    # Normalize first: building the default name indexes into arguments, so
    # doing this afterwards crashed with the default arguments=None.
    if arguments is None:
        arguments = []
    if not name:
        if arguments:
            name = "process:%s" % basename(arguments[0])
        else:
            # No program to derive a name from; use a generic label
            # instead of failing on arguments[0].
            name = "process"
    ProjectAgent.__init__(self, project, name)
    self.env = Environment(self)
    self.cmdline = CommandLine(self, arguments)
    self.timeout = timeout
    self.max_memory = 100*1024*1024  # 100 MiB
    self.stdout = stdout
    # stderr is always merged into the child's stdout.
    self.popen_args = {
        'stderr': STDOUT,
    }
    if stdin is not None:
        if stdin == "null":
            # NOTE(review): this handle is kept in popen_args and is not
            # closed here; confirm whoever launches the process releases it.
            self.popen_args['stdin'] = open('/dev/null', 'r')
        else:
            raise ValueError("Invalid stdin value: %r" % stdin)
test_subscription_transport.py 文件源码
项目:graphql-python-subscriptions
作者: hballard
项目源码
文件源码
阅读 30
收藏 0
点赞 0
评论 0
def test_should_trigger_on_connect_if_client_connect_valid(server_with_mocks):
    """Connecting a JS SubscriptionClient should fire `on_connect` once.

    A short node script opens a websocket client against the test server.
    When the node invocation raises, the next mock is fetched from
    `server_with_mocks` (presumably a queue of recorded mocks -- confirm
    against the fixture) and checked.
    """
    # The JS source is kept flush-left: it is a runtime string and is passed
    # to node verbatim.
    node_script = '''
module.paths.push('{0}')
WebSocket = require('ws')
const SubscriptionClient =
require('subscriptions-transport-ws').SubscriptionClient
new SubscriptionClient('ws://localhost:{1}/socket')
'''.format(
        os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT)
    try:
        subprocess.check_output(
            ['node', '-e', node_script], stderr=subprocess.STDOUT, timeout=.2)
    # Was a bare `except:`, which also swallowed KeyboardInterrupt and
    # SystemExit. NOTE(review): the 0.2s timeout looks like the expected
    # path (the client never exits on its own) -- confirm.
    except Exception:
        mock = server_with_mocks.get_nowait()
        assert mock.name == 'on_connect'
        mock.assert_called_once()
test_subscription_transport.py 文件源码
项目:graphql-python-subscriptions
作者: hballard
项目源码
文件源码
阅读 30
收藏 0
点赞 0
评论 0
def test_should_trigger_on_connect_with_correct_cxn_params(server_with_mocks):
    """`on_connect` should receive the client's connectionParams.

    A short node script opens a websocket client with
    `connectionParams = {test: true}`. When the node invocation raises, the
    next mock is fetched from `server_with_mocks` (presumably a queue of
    recorded mocks -- confirm against the fixture) and checked to have been
    called once with `{'test': True}`.
    """
    # The JS source is kept flush-left: it is a runtime string and is passed
    # to node verbatim. Doubled braces are str.format escapes.
    node_script = '''
module.paths.push('{0}')
WebSocket = require('ws')
const SubscriptionClient =
require('subscriptions-transport-ws').SubscriptionClient
const connectionParams = {{test: true}}
new SubscriptionClient('ws://localhost:{1}/socket', {{
connectionParams,
}})
'''.format(
        os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT)
    try:
        subprocess.check_output(
            ['node', '-e', node_script], stderr=subprocess.STDOUT, timeout=.2)
    # Was a bare `except:`, which also swallowed KeyboardInterrupt and
    # SystemExit. NOTE(review): the 0.2s timeout looks like the expected
    # path (the client never exits on its own) -- confirm.
    except Exception:
        mock = server_with_mocks.get_nowait()
        assert mock.name == 'on_connect'
        mock.assert_called_once()
        mock.assert_called_with({'test': True})
def interface_exists(interface):
    '''
    Checks if interface exists on node.

    Runs `ip link show <interface>` with all output discarded and returns
    True when the command exits with status 0, False when it exits non-zero.
    '''
    # Open the null device in a context manager so the handle is closed on
    # every path; it was previously opened inline and never closed.
    with open(os.devnull, 'w') as devnull:
        try:
            subprocess.check_call(['ip', 'link', 'show', interface],
                                  stdout=devnull,
                                  stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError:
            return False
    return True
def systemv_services_running():
    """Return the names of services that `service --status-all` marks '[ + ]'.

    stderr is merged into stdout before parsing. For each output line
    containing '[ + ]', the last whitespace-separated token is collected.
    """
    raw = subprocess.check_output(
        ['service', '--status-all'], stderr=subprocess.STDOUT)
    running = []
    for line in raw.decode('UTF-8').split('\n'):
        if '[ + ]' in line:
            running.append(line.split()[-1])
    return running
def service_available(service_name):
    """Determine whether a system service is available

    Runs `service <service_name> status`. A zero exit status means the
    service is available. On a non-zero exit status the service is still
    considered available unless the combined output contains
    'unrecognized service'.
    """
    try:
        # Only the exit status matters on success. The output used to be
        # decoded and thrown away, which could raise UnicodeDecodeError for
        # no benefit.
        subprocess.check_output(
            ['service', service_name, 'status'],
            stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as e:
        return b'unrecognized service' not in e.output
    else:
        return True
def _call_security(self, action, service, account, *args):
    """Invoke the ``security`` CLI program, which provides keychain access.

    The command run is
    ``security <action> -s <service> -a <account> [args...]`` with stderr
    merged into stdout.

    :param action: The ``security`` action to call, e.g.
        ``add-generic-password``
    :param service: Name of the service.
    :param account: name of the account the password is for, e.g.
        "Pinboard"
    :param *args: further command line arguments to be passed to
        ``security``
    :returns: the command's output, stripped of surrounding whitespace and
        decoded as UTF-8
    :raises PasswordNotFound: exit status 44 (password does not exist)
    :raises PasswordExists: exit status 45 (password already exists)
    :raises KeychainError: any other positive exit status; the status is
        attached to the exception as ``retcode``
    """
    argv = ['security', action, '-s', service, '-a', account]
    argv += list(args)
    proc = subprocess.Popen(argv, stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)
    output, _ = proc.communicate()
    status = proc.returncode

    if status == 44:  # password does not exist
        raise PasswordNotFound()
    if status == 45:  # password already exists
        raise PasswordExists()
    if status > 0:
        failure = KeychainError('Unknown Keychain error : %s' % output)
        failure.retcode = status
        raise failure

    return output.strip().decode('utf-8')
def run_script(self, script):
    """Write `script` to a temporary file and start executing it.

    The file path is kept on self.tmp_script_filename (the file is not
    removed here). The file is made executable (mode 0o744) and launched
    through the shell with stderr merged into stdout. The running process
    is stored on self.process and its stdout pipe on self.stream.
    """
    fd, self.tmp_script_filename = tempfile.mkstemp()
    # Wrap the descriptor mkstemp already opened instead of closing it and
    # reopening by name; the context manager closes it even if the write
    # fails.
    with os.fdopen(fd, 'w') as f:
        f.write(script)
    os.chmod(self.tmp_script_filename, 0o744)
    p = subprocess.Popen(
        self.tmp_script_filename, shell=True, stdin=subprocess.PIPE,
        stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True)
    self.process = p
    self.stream = p.stdout
def scrapeDoi(url):
    """Dump the DOM of `url` with headless Chrome and extract a DOI from it.

    Chrome is run under `timeout 30s`, with stderr merged into stdout.
    Returns None (after printing 'UTOH') when the command exits non-zero,
    and raises IOError when the output contains a headless_shell error
    marker. Otherwise the raw output is saved under ~/files/scibot/, named
    after the percent-quoted URL cut to at most 200 characters, and the
    result of getDoi on the parsed document is returned.
    """
    chrome_cmd = ['timeout', '30s', 'google-chrome-unstable',
                  '--headless', '--dump-dom', url]
    proc = subprocess.Popen(
        chrome_cmd,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        env=os.environ.copy(),
    )
    out, _ = proc.communicate()

    if proc.returncode:
        print('UTOH')
        return None
    if b'ERROR:headless_shell.cc' in out:
        print(out)
        raise IOError('Something is wrong...')

    # Slicing already caps the name at 200 characters; shorter names pass
    # through untouched.
    qurl = quote(url, '')[:200]
    with open(os.path.expanduser(f'~/files/scibot/{qurl}'), 'wb') as f:
        f.write(out)

    both = BeautifulSoup(out, 'lxml')
    return getDoi(both, both)
def sys_run(command,check=False):
    """Run `command` through the shell and return the CompletedProcess.

    stdout is captured and stderr is merged into it. With check=True a
    non-zero exit status raises subprocess.CalledProcessError.
    """
    return subprocess.run(
        command,
        shell=True,
        check=check,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )
def list_links():
    """List link names parsed from `ip link show`.

    Returns [True, <list of keys from ipcontrol.parse>] on success, or
    [False, <message containing the command output>] when the command exits
    non-zero.
    """
    cmd = ['ip', 'link', 'show']
    try:
        proc = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            shell=False,
            check=True,
        )
        parsed = ipcontrol.parse(proc.stdout.decode('utf-8'))
        return [True, list(parsed.keys())]
    except subprocess.CalledProcessError as suberror:
        output = suberror.stdout.decode('utf-8')
        return [False, "list links failed : %s" % output]
def link_exist(linkname):
    """Return True when `ip link show dev <linkname>` exits with status 0,
    False when it exits non-zero."""
    cmd = ['ip', 'link', 'show', 'dev', str(linkname)]
    try:
        subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            shell=False,
            check=True,
        )
    except subprocess.CalledProcessError:
        return False
    else:
        return True
def link_info(linkname):
    """Fetch the parsed `ip address show dev <linkname>` entry for a link.

    Returns [True, <ipcontrol.parse entry keyed by the link name>] on
    success, or [False, <message containing the command output>] when the
    command exits non-zero.
    """
    try:
        proc = subprocess.run(
            ['ip', 'address', 'show', 'dev', str(linkname)],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            shell=False,
            check=True,
        )
        parsed = ipcontrol.parse(proc.stdout.decode('utf-8'))
        return [True, parsed[str(linkname)]]
    except subprocess.CalledProcessError as suberror:
        output = suberror.stdout.decode('utf-8')
        return [False, "get link info failed : %s" % output]
def link_state(linkname):
    """Look up the 'state' field of a link from `ip link show dev <linkname>`.

    Returns [True, <state value from ipcontrol.parse>] on success, or
    [False, <message containing the command output>] when the command exits
    non-zero.
    """
    try:
        proc = subprocess.run(
            ['ip', 'link', 'show', 'dev', str(linkname)],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            shell=False,
            check=True,
        )
        entry = ipcontrol.parse(proc.stdout.decode('utf-8'))[str(linkname)]
        return [True, entry['state']]
    except subprocess.CalledProcessError as suberror:
        output = suberror.stdout.decode('utf-8')
        return [False, "get link state failed : %s" % output]
def down_link(linkname):
    """Bring a link down via `ip link set dev <linkname> down`.

    Returns [True, <link name as str>] on success, or
    [False, <message containing the command output>] when the command exits
    non-zero.
    """
    cmd = ['ip', 'link', 'set', 'dev', str(linkname), 'down']
    try:
        subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            shell=False,
            check=True,
        )
    except subprocess.CalledProcessError as suberror:
        output = suberror.stdout.decode('utf-8')
        return [False, "set link down failed : %s" % output]
    else:
        return [True, str(linkname)]
def add_addr(linkname, address):
    """Assign `address` to a link via `ip address add <address> dev <linkname>`.

    Returns [True, <link name as str>] on success, or
    [False, <message containing the command output>] when the command exits
    non-zero.
    """
    cmd = ['ip', 'address', 'add', address, 'dev', str(linkname)]
    try:
        subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            shell=False,
            check=True,
        )
    except subprocess.CalledProcessError as suberror:
        output = suberror.stdout.decode('utf-8')
        return [False, "add address failed : %s" % output]
    else:
        return [True, str(linkname)]
def list_bridges():
    """List bridge names reported by `ovs-vsctl list-br`.

    Returns [True, <whitespace-split command output>] on success, or
    [False, <message containing the command output>] when the command exits
    non-zero.
    """
    cmd = ['ovs-vsctl', 'list-br']
    try:
        proc = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            shell=False,
            check=True,
        )
    except subprocess.CalledProcessError as suberror:
        output = suberror.stdout.decode('utf-8')
        return [False, "list bridges failed : %s" % output]
    else:
        return [True, proc.stdout.decode('utf-8').split()]