def convert_image(inpath, outpath, size):
    """Convert an image file using `sips`.

    Args:
        inpath (str): Path of source file.
        outpath (str): Path to destination file.
        size (int): Width and height of destination image in pixels.

    Raises:
        RuntimeError: Raised if `sips` exits with non-zero status.

    """
    # ``bytes`` has no ``format`` method on Python 3, so render the number
    # as text and encode it. The result is a byte string on both Python 2
    # and Python 3, matching the literal arguments around it.
    pixels = str(size).encode('ascii')
    cmd = [
        b'sips',
        b'-z', pixels, pixels,
        inpath,
        b'--out', outpath]

    # Everything sips prints is discarded; only the exit status matters.
    with open(os.devnull, 'w') as pipe:
        retcode = subprocess.call(cmd, stdout=pipe, stderr=subprocess.STDOUT)

    if retcode != 0:
        raise RuntimeError('sips exited with {0}'.format(retcode))
# Python source-code examples that use os.devnull
def test_err_in_fun(self):
    # Test that the original signal this process was hit with
    # is not returned in case fun raise an exception. Instead,
    # we're supposed to see retsig = 1.
    #
    # The child script below is dedented before it is run, so only the
    # common margin is stripped: the body of ``foo`` must stay indented
    # relative to its ``def`` line or the child fails to compile.
    # TESTFN is passed to format() but the template has a single
    # placeholder, so it is ignored here.
    ret = pyrun(textwrap.dedent(
        """
        import os, signal, imp, sys
        mod = imp.load_source("mod", r"{}")
        def foo():
            sys.stderr = os.devnull
            1 / 0
        sig = signal.SIGTERM if os.name == 'posix' else \
            signal.CTRL_C_EVENT
        mod.register_exit_fun(foo)
        os.kill(os.getpid(), sig)
        """.format(os.path.abspath(__file__), TESTFN)
    ))
    if POSIX:
        self.assertEqual(ret, 1)
        assert ret != signal.SIGTERM, strfsig(ret)
def epubcheck_help():
    """Fetch the command-line usage text printed by epubcheck.jar.

    Launches ``c.JAVA`` with ``-Duser.language=en -jar c.EPUBCHECK -h``,
    captures standard output and discards standard error.

    :return unicode: decoded standard output of the epubcheck process
    """
    command = [c.JAVA, '-Duser.language=en', '-jar', c.EPUBCHECK, '-h']
    with open(os.devnull, "w") as sink:
        process = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=sink,
        )
        captured, _ = process.communicate()
    return captured.decode()
def pipe_weighted_edgelist_to_convert(matrix, bin_filename, weight_filename):
    """Pipe a weighted edgelist (COO sparse matrix) to Louvain's convert utility.

    This operation is currently disabled: every call raises ``ValueError``
    before doing any work. The parameters are accepted only so the
    signature stays stable for callers.

    Args:
        matrix: Weighted edgelist to stream (unused while disabled).
        bin_filename: Destination for the binary graph (unused while disabled).
        weight_filename: Destination for the weights (unused while disabled).

    Raises:
        ValueError: Always.
    """
    raise ValueError('Unsupported method at the moment')
def pipe_unweighted_edgelist_to_convert(matrix, bin_filename):
    """Pipe an unweighted edgelist (COO sparse matrix) to Louvain's convert utility.

    Every ``(row, col)`` pair of ``matrix`` is written as one
    tab-separated text line to the standard input of
    ``LOUVAIN_CONVERT_BINPATH``, which is told to read ``/dev/stdin`` and
    write to ``bin_filename``. The tool's stdout and stderr are discarded
    and its exit status is not checked.

    Args:
        matrix: Object exposing parallel ``row`` and ``col`` sequences of
            integers (presumably a scipy COO matrix -- confirm with callers).
        bin_filename: Path handed to the tool's ``-o`` option.
    """
    # itertools.izip only exists on Python 2; on Python 3 the builtin zip
    # is already lazy.
    lazy_zip = getattr(itertools, 'izip', zip)

    with open(os.devnull, 'w') as devnull:
        proc = subprocess.Popen([LOUVAIN_CONVERT_BINPATH,
                                 '-i', '/dev/stdin',
                                 '-o', bin_filename,
                                 ], stdin=subprocess.PIPE, stdout=devnull, stderr=devnull)
        try:
            # Stream text pairs to 'convert'. The pipe is binary, so the
            # formatted line is encoded explicitly.
            for ij in lazy_zip(matrix.row, matrix.col):
                proc.stdin.write(('%d\t%d\n' % ij).encode('ascii'))
        finally:
            # Always signal end-of-input and reap the child, even when
            # writing failed part-way through.
            try:
                proc.stdin.close()
            finally:
                proc.wait()
def get_audio_streams(self):
    """Probe ``self.input_video`` with ffprobe and return its audio streams.

    Runs ``FFPROBE -i <input> -of json -show_streams`` with stderr
    discarded, parses the JSON printed on stdout and hands the result to
    ``get_audio_streams``. That name is looked up in module scope, so when
    this ``def`` lives in a class body it refers to a module-level helper
    rather than to this method -- NOTE(review): confirm that helper exists.

    If the ffprobe executable cannot be found, an error is logged and the
    process exits with status 0. Any other ``OSError`` is re-raised.
    """
    # os.errno was an undocumented alias that was removed in Python 3.7.
    import errno

    with open(os.devnull, 'w') as DEV_NULL:
        # Get file info and parse it
        try:
            proc = subprocess.Popen([
                FFPROBE,
                '-i', self.input_video,
                '-of', 'json',
                '-show_streams'
            ], stdout=subprocess.PIPE, stderr=DEV_NULL)
        except OSError as e:
            if e.errno != errno.ENOENT:
                # Previously this fell through and failed on an unbound
                # ``proc``; surface the real error instead.
                raise
            Logger.error("FFPROBE not found, install on your system to use this script")
            sys.exit(0)
        # communicate() reads stdout to EOF and waits for the child.
        output, _ = proc.communicate()
    return get_audio_streams(json.loads(output))
def launch(self, cfg, path, flags):
    """Start the rpc-agent binary matching the host OS and architecture.

    The executable is ``<base>/bin/rpc-agent-<os>-<arch>`` where ``<base>``
    is ``$WPW_HOME`` when that variable is set and
    ``<path>/wpwithinpy/iot-core-component`` otherwise; ``<arch>`` comes
    from ``self.detectHostArchitecture()``. The agent's stdout and stderr
    are discarded.

    Args:
        cfg: Configuration passed to ``self.validateConfig``.
        path: Base directory used when ``WPW_HOME`` is not set.
        flags: Extra command-line arguments appended to the command.

    Returns:
        The ``subprocess.Popen`` object of the started agent, or ``None``
        when ``self.validateConfig`` rejects the OS/architecture pair.
    """
    logging.debug("Determine the OS and Architecture this application is currently running on")
    hostOS = platform.system().lower()
    logging.debug("hostOS: " + str(hostOS))
    is_64bits = sys.maxsize > 2 ** 32
    hostArchitecture = 'x64' if is_64bits else 'ia32'
    logging.debug("hostArchitecture: " + str(hostArchitecture))

    if not self.validateConfig(cfg, hostOS, hostArchitecture):
        logging.debug("Invalid OS/Architecture combination detected")
        return None

    agent = 'rpc-agent-' + hostOS + '-' + self.detectHostArchitecture()
    if os.environ.get("WPW_HOME") is not None:
        cmd = [os.environ["WPW_HOME"] + '/bin/' + agent]
    else:
        cmd = [path + '/wpwithinpy/iot-core-component/bin/' + agent]
    cmd.extend(flags)

    # The child receives its own copy of the descriptor when it is
    # spawned, so the parent's handle can be closed right away.
    with open(os.devnull, 'w') as fnull:
        proc = subprocess.Popen(cmd, stdin=None, stdout=fnull, stderr=subprocess.STDOUT)
    return proc
def win64_available(self):
    """Report whether a 64-bit wine binary can be run.

    Executes ``<wine_bin>64`` with its output discarded and treats exit
    status 1 as "available". In that case the architecture combobox is
    shown and ``True`` is returned. Otherwise, including when the binary
    cannot be started, the combobox is hidden, ``self.winearch`` is set to
    ``'win32'`` and ``False`` is returned.
    """
    wine_bin, _wineserver_bin, _wine_lib = self.get_wine_bin_path()

    try:
        # The child gets its own copy of the descriptor, so the handle is
        # closed as soon as the process has been spawned, or spawning failed.
        with open(os.devnull, 'w') as dev_null:
            proc = subprocess.Popen([wine_bin + '64'], stdout=dev_null,
                                    stdin=subprocess.PIPE, stderr=subprocess.STDOUT)
        proc.communicate()
        if proc.returncode == 1:
            self.combobox_winearch.set_visible(True)
            return True
    except Exception:
        # Best effort: any failure means "no 64-bit wine". Unlike a bare
        # ``except:``, this lets KeyboardInterrupt/SystemExit propagate.
        pass

    self.combobox_winearch.set_visible(False)
    self.winearch = 'win32'
    return False
def isDaemonAlive(hostAndPort="{0}:{1}".format(IPFSAPI_IP, IPFSAPI_PORT)):
    """Ensure that the IPFS daemon is running via HTTP before proceeding.

    Args:
        hostAndPort: ``host:port`` text used only in the error message.
            The client itself always connects to ``IPFSAPI_IP`` /
            ``IPFSAPI_PORT``.

    Returns:
        ``True`` if ``client.id()`` succeeds, ``False`` otherwise. An
        ``OSError`` terminates the process via ``sys.exit()``.
    """
    client = ipfsapi.Client(IPFSAPI_IP, IPFSAPI_PORT)

    try:
        # ConnectionError/AttributeError if IPFS daemon not running
        client.id()
        return True
    except (ConnectionError, AttributeError):
        # The builtin AttributeError is the same class the Python 2-only
        # ``exceptions`` module exposed.
        logError("Daemon is not running at http://" + hostAndPort)
        return False
    except OSError:
        logError("IPFS is likely not installed. "
                 "See https://ipfs.io/docs/install/")
        sys.exit()
    except Exception:
        logError('Unknown error in retrieving daemon status')
        logError(sys.exc_info()[0])
        # Previously fell off the end and returned None; be explicit.
        return False
def _query(self):
    """
    Returns the value of deepsea_minions

    Refreshes the pillar on all minions, then asks every minion for its
    ``deepsea_minions`` pillar value and returns the first non-empty
    answer. Logs an error and returns an empty list when no minion
    reports a value.
    """
    # When search matches no minions, salt prints to stdout.
    # Suppress stdout, and always restore it, even if a salt call raises.
    _stdout = sys.stdout
    with open(os.devnull, 'w') as devnull:
        sys.stdout = devnull
        try:
            # Called only for its side effect; the result is not needed.
            self.local.cmd('*', 'saltutil.pillar_refresh')
            minions = self.local.cmd('*', 'pillar.get', ['deepsea_minions'],
                                     expr_form="compound")
        finally:
            sys.stdout = _stdout

    for minion in minions:
        if minions[minion]:
            return minions[minion]

    log.error("deepsea_minions is not set")
    return []
def _matches(self):
    """
    Returns the list of matched minions

    Queries ``pillar.get id`` on the compound target
    ``self.deepsea_minions`` and returns the keys of the result. Returns
    an empty list when ``self.deepsea_minions`` is empty.
    """
    if not self.deepsea_minions:
        return []

    # When search matches no minions, salt prints to stdout.
    # Suppress stdout, and always restore it, even if the salt call raises.
    _stdout = sys.stdout
    with open(os.devnull, 'w') as devnull:
        sys.stdout = devnull
        try:
            result = self.local.cmd(self.deepsea_minions,
                                    'pillar.get',
                                    ['id'],
                                    expr_form="compound")
        finally:
            sys.stdout = _stdout
    return result.keys()
def convert_image(inpath, outpath, size):
    """Convert an image file using `sips`.

    Args:
        inpath (str): Path of source file.
        outpath (str): Path to destination file.
        size (int): Width and height of destination image in pixels.

    Raises:
        RuntimeError: Raised if `sips` exits with non-zero status.

    """
    # ``bytes`` has no ``format`` method on Python 3, so render the number
    # as text and encode it. The result is a byte string on both Python 2
    # and Python 3, matching the literal arguments around it.
    pixels = str(size).encode('ascii')
    cmd = [
        b'sips',
        b'-z', pixels, pixels,
        inpath,
        b'--out', outpath]

    # Everything sips prints is discarded; only the exit status matters.
    with open(os.devnull, 'w') as pipe:
        retcode = subprocess.call(cmd, stdout=pipe, stderr=subprocess.STDOUT)

    if retcode != 0:
        raise RuntimeError('sips exited with {0}'.format(retcode))
def convert_image(inpath, outpath, size):
    """Convert an image file using `sips`.

    Args:
        inpath (str): Path of source file.
        outpath (str): Path to destination file.
        size (int): Width and height of destination image in pixels.

    Raises:
        RuntimeError: Raised if `sips` exits with non-zero status.

    """
    # ``bytes`` has no ``format`` method on Python 3, so render the number
    # as text and encode it. The result is a byte string on both Python 2
    # and Python 3, matching the literal arguments around it.
    pixels = str(size).encode('ascii')
    cmd = [
        b'sips',
        b'-z', pixels, pixels,
        inpath,
        b'--out', outpath]

    # Everything sips prints is discarded; only the exit status matters.
    with open(os.devnull, 'w') as pipe:
        retcode = subprocess.call(cmd, stdout=pipe, stderr=subprocess.STDOUT)

    if retcode != 0:
        raise RuntimeError('sips exited with {0}'.format(retcode))
def test_exception(self):
    # Make sure handler fun is called on exception.
    #
    # The child script below is dedented before it is run, so only the
    # common margin is stripped: the bodies of ``foo`` and of the ``with``
    # block must stay indented or the child fails to compile.
    ret = pyrun(textwrap.dedent(
        """
        import os, imp, sys
        mod = imp.load_source("mod", r"{}")
        def foo():
            with open(r"{}", "ab") as f:
                f.write(b"1")
        mod.register_exit_fun(foo)
        sys.stderr = os.devnull
        1 / 0
        """.format(os.path.abspath(__file__), TESTFN)
    ))
    self.assertEqual(ret, 1)
    # The exit function appends a single byte to TESTFN when it runs.
    with open(TESTFN, "rb") as f:
        self.assertEqual(f.read(), b"1")
def test_kinterrupt(self):
    # Simulates CTRL + C and make sure the exit function is called.
    #
    # The child script below is dedented before it is run, so only the
    # common margin is stripped: the bodies of ``foo`` and of the ``with``
    # block must stay indented or the child fails to compile.
    ret = pyrun(textwrap.dedent(
        """
        import os, imp, sys
        mod = imp.load_source("mod", r"{}")
        def foo():
            with open(r"{}", "ab") as f:
                f.write(b"1")
        mod.register_exit_fun(foo)
        sys.stderr = os.devnull
        raise KeyboardInterrupt
        """.format(os.path.abspath(__file__), TESTFN)
    ))
    self.assertEqual(ret, 1)
    # The exit function appends a single byte to TESTFN when it runs.
    with open(TESTFN, "rb") as f:
        self.assertEqual(f.read(), b"1")
def __init__(self, raw_email, debug=False):
    '''
    Prepare the copy/convert run for one raw email.

    Creates the in-memory processing and content logs, walks the email
    via ``self.tree``, routes DEBUG-and-above log events into the
    processing log, and picks where debug stdout/stderr go: files under
    ``debug_logs/`` when ``debug`` is true, the null device otherwise.
    '''
    self.raw_email = raw_email
    self.log_processing = StringIO()
    self.log_content = StringIO()
    self.tree(self.raw_email)

    processing_output = outputs.StreamOutput(formats.shell_format,
                                             stream=self.log_processing)
    emitters['*'] = filters.Emitter(levels.DEBUG, True, processing_output)
    self.log_name = log.name('files')

    self.cur_attachment = None
    self.debug = debug

    if not self.debug:
        self.log_debug_err = os.devnull
        self.log_debug_out = os.devnull
        return

    debug_dir = 'debug_logs'
    if not os.path.exists(debug_dir):
        os.makedirs(debug_dir)
    self.log_debug_err = os.path.join(debug_dir, 'debug_stderr.log')
    self.log_debug_out = os.path.join(debug_dir, 'debug_stdout.log')
def call(self, args, devnull=False):
    """Call other processes.

    args - list of command args
    devnull - whether to pipe stdout to /dev/null (or equivalent)

    Returns the exit status of the process. When ``self.debug`` is set,
    the command line is echoed and the user is asked to confirm first;
    the confirmation prompt is created with ``abort=True``.
    """
    if self.debug:
        click.echo(subprocess.list2cmdline(args))
        click.confirm('Continue?', default=True, abort=True)

    kwargs = {}
    if devnull:
        # Pipe to /dev/null (or equivalent).
        kwargs['stderr'] = subprocess.STDOUT
        kwargs['stdout'] = self.FNULL

    # subprocess.call reports failure through its return value and never
    # raises CalledProcessError, so no handler for it is needed.
    return subprocess.call(args, **kwargs)
def _check_ssl_cert(self, cert, key):
    """Validate ``cert`` with openssl and wrap the HTTP server socket in TLS.

    ``openssl x509 -text -noout -in <cert>`` is run with its output
    discarded. If openssl cannot be started the check is skipped and the
    certificate is treated as acceptable.

    Raises:
        TelegramError: If openssl rejects the certificate, or if wrapping
            the socket fails with ``ssl.SSLError``.
    """
    # Check SSL-Certificate with openssl, if possible
    try:
        with open(os.devnull, 'wb') as devnull:
            exit_code = subprocess.call(
                ["openssl", "x509", "-text", "-noout", "-in", cert],
                stdout=devnull,
                stderr=subprocess.STDOUT)
    except OSError:
        exit_code = 0

    # Compare by value; ``is`` on an int literal only works by accident.
    if exit_code != 0:
        raise TelegramError('SSL Certificate invalid')

    try:
        self.httpd.socket = ssl.wrap_socket(
            self.httpd.socket, certfile=cert, keyfile=key, server_side=True)
    except ssl.SSLError as error:
        self.logger.exception('Failed to init SSL socket')
        raise TelegramError(str(error))
def convert_image(inpath, outpath, size):
    """Convert an image file using `sips`.

    Args:
        inpath (str): Path of source file.
        outpath (str): Path to destination file.
        size (int): Width and height of destination image in pixels.

    Raises:
        RuntimeError: Raised if `sips` exits with non-zero status.

    """
    # ``bytes`` has no ``format`` method on Python 3, so render the number
    # as text and encode it. The result is a byte string on both Python 2
    # and Python 3, matching the literal arguments around it.
    pixels = str(size).encode('ascii')
    cmd = [
        b'sips',
        b'-z', pixels, pixels,
        inpath,
        b'--out', outpath]

    # Everything sips prints is discarded; only the exit status matters.
    with open(os.devnull, 'w') as pipe:
        retcode = subprocess.call(cmd, stdout=pipe, stderr=subprocess.STDOUT)

    if retcode != 0:
        raise RuntimeError('sips exited with {0}'.format(retcode))
def convert_image(inpath, outpath, size):
    """Convert an image file using `sips`.

    Args:
        inpath (str): Path of source file.
        outpath (str): Path to destination file.
        size (int): Width and height of destination image in pixels.

    Raises:
        RuntimeError: Raised if `sips` exits with non-zero status.

    """
    # ``bytes`` has no ``format`` method on Python 3, so render the number
    # as text and encode it. The result is a byte string on both Python 2
    # and Python 3, matching the literal arguments around it.
    pixels = str(size).encode('ascii')
    cmd = [
        b'sips',
        b'-z', pixels, pixels,
        inpath,
        b'--out', outpath]

    # Everything sips prints is discarded; only the exit status matters.
    with open(os.devnull, 'w') as pipe:
        retcode = subprocess.call(cmd, stdout=pipe, stderr=subprocess.STDOUT)

    if retcode != 0:
        raise RuntimeError('sips exited with {0}'.format(retcode))
def run_script(script):
    """Run ``script`` under ``bash -c`` in strict mode and return its output.

    The script is prefixed with ``set -euo pipefail``, stdin is connected
    to the null device, and stderr is merged into stdout.

    Returns the combined output decoded as UTF-8. Raises ``PheWebError``
    carrying the exit status and the output when bash exits non-zero.
    """
    strict_script = 'set -euo pipefail\n' + script
    exit_status = 0
    try:
        # Reading from the null device keeps the script from waiting on stdin.
        with open(os.devnull) as null_input:
            raw_output = subprocess.check_output(
                ['bash', '-c', strict_script],
                stderr=subprocess.STDOUT,
                stdin=null_input)
    except subprocess.CalledProcessError as failure:
        raw_output, exit_status = failure.output, failure.returncode

    text = raw_output.decode('utf8')
    if exit_status == 0:
        return text
    raise PheWebError(
        'FAILED with status {}\n'.format(exit_status) +
        'output was:\n' +
        text)
def __init__(self, targetfd, tmpfile=None):
    # Prepare capturing of file descriptor ``targetfd``.
    #
    # If the descriptor cannot be duplicated, ``start`` and ``done`` are
    # replaced by no-ops and nothing else is set up. Otherwise a backing
    # file and a matching sys-level capture object are chosen: stdin
    # (fd 0) is backed by the null device, any other descriptor by the
    # given ``tmpfile`` or a fresh temporary file.
    self.targetfd = targetfd
    try:
        self.targetfd_save = os.dup(self.targetfd)
    except OSError:
        self.start = lambda: None
        self.done = lambda: None
        return

    if targetfd != 0:
        if tmpfile is None:
            with TemporaryFile() as scratch:
                tmpfile = safe_text_dupfile(scratch, mode="wb+")
        if targetfd in patchsysdict:
            self.syscapture = SysCapture(targetfd, tmpfile)
        else:
            self.syscapture = NoCapture()
    else:
        assert not tmpfile, "cannot set tmpfile with stdin"
        tmpfile = open(os.devnull, "r")
        self.syscapture = SysCapture(targetfd)

    self.tmpfile = tmpfile
    self.tmpfile_fd = tmpfile.fileno()
def analyze_structure(seq, filename, ensemble=False):
    """Fold ``seq`` with UNAFold and file the resulting structure.

    Changes into ``project_dir``, writes the sequence to
    ``<filename>.seq``, runs ``UNAFold.pl`` (when ``ensemble`` is true) or
    ``hybrid-ss-min`` on it with all tool output discarded, moves any
    ``<filename>*.ct`` results into ``tmp/structures/`` and removes the
    remaining ``<filename>*`` files.

    Always returns 1.

    NOTE(review): ``seq`` and ``filename`` are interpolated into shell
    commands unquoted; callers must not pass untrusted values.
    """
    chdir(project_dir)

    system("echo '" + str(seq) + "' > " + filename + ".seq")

    tool = "./3rdParty/unafold/UNAFold.pl" if ensemble else "./3rdParty/unafold/hybrid-ss-min"
    # Redirecting to the null device omits the output generated by UNAFOLD;
    # the handle is closed even if the call raises.
    with open(devnull, 'w') as fnull:
        call(tool + " -n RNA " + filename + ".seq", shell=True, stdout=fnull, stderr=fnull)

    if os.path.isfile(filename + ".ct"):
        system("mv %s*.ct tmp/structures/" % filename)

    # remove tmp files
    system("rm %s*" % filename)

    return 1
def _run_cdhit(self, fasta_path, identity=0.95, word_size=5, description_length=0, cdhit_path=None):
    """ Cluster sequences with CDHIT-EST (install with sudo apt install cd-hit on Ubuntu).

    Output goes to ``<tmp_path>/<project>_IdentityClusters_<identity>``;
    the tool's stdout is discarded. Returns the path of the matching
    ``.clstr`` file without checking that the tool succeeded.
    """
    self.messages.get_cdhit_message(identity)

    executable = "cd-hit-est" if cdhit_path is None else cdhit_path

    base_name = self.project + "_IdentityClusters_" + str(identity)
    output_path = os.path.join(self.tmp_path, base_name)
    clusters_path = os.path.join(self.tmp_path, base_name + '.clstr')

    command = [
        executable,
        "-i", fasta_path,
        "-o", output_path,
        "-c", str(identity),
        "-n", str(word_size),
        "-d", str(description_length),
    ]
    with open(os.devnull, "w") as sink:
        call(command, stdout=sink)

    return clusters_path
def disassemble_it(filename, address=None):
    """
    Wrapper for the ruby disassembler script.

    Runs ``analysis_tools/disassfunc.rb`` on ``filename`` with graph and
    SVG output enabled and waits for it to finish. When ``address`` is
    given, its hex form is passed as an extra argument. All three
    standard streams of the script are attached to the null device.

    Returns the output path ``<filename>_disass_<hex address or None>``;
    the script's exit status is not checked.
    """
    suffix = "None" if address is None else hex(address)
    outfile = filename + "_disass_" + suffix

    args = ['ruby', 'analysis_tools/disassfunc.rb',
            "-graph", "-svg", "-o", outfile, filename]
    if address is not None:
        args.append(hex(address))

    # The context manager closes the handle even if spawning fails.
    with open(os.devnull, 'w') as FNULL:
        proc = Popen(args, stdin=FNULL, stdout=FNULL, stderr=FNULL)
        proc.wait()

    app.logger.debug("Disassembly just finished!")
    return outfile
def analyze_it(self):
    """
    Wrapper for the ruby analysis script.
    Executes and get results from files.

    Runs ``analysis_tools/AnalyzeIt.rb`` on ``self.storage_file`` with all
    standard streams attached to the null device. If the script produced
    ``<storage_file>.txt``, its content with every non-ASCII byte removed
    is stored in ``self.txt_report``.

    Always returns True; the script's exit status is not checked.
    """
    args = ['ruby', 'analysis_tools/AnalyzeIt.rb', self.storage_file]
    with open(os.devnull, 'w') as FNULL:
        proc = Popen(args, stdin=FNULL, stdout=FNULL, stderr=FNULL)
        proc.wait()

    # TEXT report, just UTF-8 decode/parsing
    fname = self.storage_file + '.txt'
    if os.path.exists(fname):
        with open(fname, 'rb') as report:
            data = report.read()
        # The file is read as bytes, so the pattern and the replacement
        # must be bytes as well for this to work on Python 3.
        self.txt_report = re.sub(br'[^\x00-\x7F]', b'', data).decode('utf-8')
    return True
def test_no_clustering(self):
    # At 100% identity the table must come back unchanged and the
    # sequences in the expected abundance order.
    with redirected_stdio(stderr=os.devnull):
        clustered = cluster_features_de_novo(
            sequences=self.input_sequences, table=self.input_table,
            perc_identity=1.0)
    obs_table, obs_sequences = clustered

    # order of identifiers is important for biom.Table equality
    expected_order = self.input_table.ids(axis='observation')
    obs_table = obs_table.sort_order(expected_order, axis='observation')
    self.assertEqual(obs_table, self.input_table)

    obs_seqs = _read_seqs(obs_sequences)
    # sequences are reverse-sorted by abundance in output
    exp_seqs = [self.input_sequences_list[i] for i in (0, 3, 2, 1)]
    self.assertEqual(obs_seqs, exp_seqs)
def test_99_percent_clustering(self):
    # Clustering at 99% identity is expected to leave three features.
    exp_counts = np.array([[104, 106, 109],
                           [1, 1, 2],
                           [7, 8, 9]])
    exp_table = biom.Table(exp_counts,
                           ['feature1', 'feature2', 'feature4'],
                           ['sample1', 'sample2', 'sample3'])

    with redirected_stdio(stderr=os.devnull):
        clustered = cluster_features_de_novo(
            sequences=self.input_sequences, table=self.input_table,
            perc_identity=0.99)
    obs_table, obs_sequences = clustered

    # order of identifiers is important for biom.Table equality
    expected_order = exp_table.ids(axis='observation')
    obs_table = obs_table.sort_order(expected_order, axis='observation')
    self.assertEqual(obs_table, exp_table)

    # sequences are reverse-sorted by abundance in output
    obs_seqs = _read_seqs(obs_sequences)
    exp_seqs = [self.input_sequences_list[i] for i in (0, 3, 1)]
    self.assertEqual(obs_seqs, exp_seqs)
def test_97_percent_clustering_feature1_most_abundant(self):
    # Clustering at 97% identity is expected to leave two features.
    exp_counts = np.array([[111, 114, 118],
                           [1, 1, 2]])
    exp_table = biom.Table(exp_counts,
                           ['feature1', 'feature2'],
                           ['sample1', 'sample2', 'sample3'])

    with redirected_stdio(stderr=os.devnull):
        clustered = cluster_features_de_novo(
            sequences=self.input_sequences, table=self.input_table,
            perc_identity=0.97)
    obs_table, obs_sequences = clustered

    # order of identifiers is important for biom.Table equality
    expected_order = exp_table.ids(axis='observation')
    obs_table = obs_table.sort_order(expected_order, axis='observation')
    self.assertEqual(obs_table, exp_table)

    # sequences are reverse-sorted by abundance in output
    obs_seqs = _read_seqs(obs_sequences)
    exp_seqs = [self.input_sequences_list[i] for i in (0, 1)]
    self.assertEqual(obs_seqs, exp_seqs)
def test_uchime_denovo(self):
    # Checks the chimera / non-chimera split and the stats report.
    with redirected_stdio(stderr=os.devnull):
        outputs = uchime_denovo(
            sequences=self.input_sequences, table=self.input_table)
    chime, nonchime, stats = outputs

    obs_chime = _read_seqs(chime)
    exp_chime = [self.input_sequences_list[3]]
    self.assertEqual(obs_chime, exp_chime)

    # sequences are reverse-sorted by abundance in output
    obs_nonchime = _read_seqs(nonchime)
    exp_nonchime = [self.input_sequences_list[i] for i in (0, 1, 2)]
    self.assertEqual(obs_nonchime, exp_nonchime)

    with stats.open() as stats_fh:
        stats_text = stats_fh.read()

    # Every feature must be mentioned, on exactly four non-empty lines.
    for feature_id in ('feature1', 'feature2', 'feature3', 'feature4'):
        self.assertTrue(feature_id in stats_text)
    stats_lines = [line for line in stats_text.split('\n') if len(line) > 0]
    self.assertEqual(len(stats_lines), 4)