def test_join_pairs_all_samples_w_no_joined_seqs(self):
# minmergelen is set very high here, resulting in no sequences
# being joined across the three samples.
with redirected_stdio(stderr=os.devnull):
obs = join_pairs(self.input_seqs, minmergelen=500)
# manifest is as expected
self._test_manifest(obs)
# expected number of fastq files are created
output_fastqs = list(obs.sequences.iter_views(FastqGzFormat))
self.assertEqual(len(output_fastqs), 3)
for fastq_name, fastq_path in output_fastqs:
with redirected_stdio(stderr=os.devnull):
seqs = skbio.io.read(str(fastq_path), format='fastq',
compression='gzip', constructor=skbio.DNA)
seqs = list(seqs)
seq_lengths = np.asarray([len(s) for s in seqs])
self.assertEqual(len(seq_lengths), 0)
python类devnull()的实例源码
def get_parsed_args(self, comp_words):
""" gets the parsed args from a patched parser """
active_parsers = self._patch_argument_parser()
parsed_args = argparse.Namespace()
self.completing = True
if USING_PYTHON2:
# Python 2 argparse only properly works with byte strings.
comp_words = [ensure_bytes(word) for word in comp_words]
try:
stderr = sys.stderr
sys.stderr = io.open(os.devnull, "w")
active_parsers[0].parse_known_args(comp_words, namespace=parsed_args)
sys.stderr.close()
sys.stderr = stderr
except BaseException:
pass
self.completing = False
return parsed_args
def convert_image(inpath, outpath, size):
"""Convert an image file using `sips`.
Args:
inpath (str): Path of source file.
outpath (str): Path to destination file.
size (int): Width and height of destination image in pixels.
Raises:
RuntimeError: Raised if `sips` exits with non-zero status.
"""
cmd = [
b'sips',
b'-z', b'{0}'.format(size), b'{0}'.format(size),
inpath,
b'--out', outpath]
# log().debug(cmd)
with open(os.devnull, 'w') as pipe:
retcode = subprocess.call(cmd, stdout=pipe, stderr=subprocess.STDOUT)
if retcode != 0:
raise RuntimeError('sips exited with {0}'.format(retcode))
def convert_image(inpath, outpath, size):
"""Convert an image file using `sips`.
Args:
inpath (str): Path of source file.
outpath (str): Path to destination file.
size (int): Width and height of destination image in pixels.
Raises:
RuntimeError: Raised if `sips` exits with non-zero status.
"""
cmd = [
b'sips',
b'-z', b'{0}'.format(size), b'{0}'.format(size),
inpath,
b'--out', outpath]
# log().debug(cmd)
with open(os.devnull, 'w') as pipe:
retcode = subprocess.call(cmd, stdout=pipe, stderr=subprocess.STDOUT)
if retcode != 0:
raise RuntimeError('sips exited with {0}'.format(retcode))
def get_cloud_project():
cmd = [
'gcloud', '-q', 'config', 'list', 'project',
'--format=value(core.project)'
]
with open(os.devnull, 'w') as dev_null:
try:
res = subprocess.check_output(cmd, stderr=dev_null).strip()
if not res:
raise Exception('--cloud specified but no Google Cloud Platform '
'project found.\n'
'Please specify your project name with the --project '
'flag or set a default project: '
'gcloud config set project YOUR_PROJECT_NAME')
return res
except OSError as e:
if e.errno == errno.ENOENT:
raise Exception('gcloud is not installed. The Google Cloud SDK is '
'necessary to communicate with the Cloud ML service. '
'Please install and set up gcloud.')
raise
def convert_image(inpath, outpath, size):
"""Convert an image file using ``sips``.
Args:
inpath (str): Path of source file.
outpath (str): Path to destination file.
size (int): Width and height of destination image in pixels.
Raises:
RuntimeError: Raised if ``sips`` exits with non-zero status.
"""
cmd = [
b'sips',
b'-z', str(size), str(size),
inpath,
b'--out', outpath]
# log().debug(cmd)
with open(os.devnull, 'w') as pipe:
retcode = subprocess.call(cmd, stdout=pipe, stderr=subprocess.STDOUT)
if retcode != 0:
raise RuntimeError('sips exited with %d' % retcode)
def validate(opt, agent):
old_datatype = agent.opt['datatype']
agent.opt['datatype'] = 'valid'
opt = deepcopy(opt)
opt['datatype'] = 'valid'
opt['terminate'] = True
opt['batchsize'] = 1
old_stdout = sys.stdout
sys.stdout = open(os.devnull, 'w')
valid_world = create_task(opt, agent)
sys.stdout = old_stdout
for _ in valid_world:
valid_world.parley()
stats = valid_world.report()
agent.opt['datatype'] = old_datatype
return stats
def build(self):
self.line('[<comment>Building tzdata</>]')
dest_path = os.path.join(self.path, 'tz')
# Getting VERSION
with open(os.path.join(dest_path, 'version')) as f:
version = f.read().strip()
self.write('<comment>Building</> version <fg=cyan>{}</>'.format(version))
os.chdir(dest_path)
with open(os.devnull, 'w') as temp:
subprocess.call(
['make', 'TOPDIR={}'.format(dest_path), 'install'],
stdout=temp,
stderr=temp
)
self.overwrite('<info>Built</> version <fg=cyan>{}</>'.format(version))
self.line('')
def convert_image(inpath, outpath, size):
"""Convert an image file using `sips`.
Args:
inpath (str): Path of source file.
outpath (str): Path to destination file.
size (int): Width and height of destination image in pixels.
Raises:
RuntimeError: Raised if `sips` exits with non-zero status.
"""
cmd = [
b'sips',
b'-z', b'{0}'.format(size), b'{0}'.format(size),
inpath,
b'--out', outpath]
# log().debug(cmd)
with open(os.devnull, 'w') as pipe:
retcode = subprocess.call(cmd, stdout=pipe, stderr=subprocess.STDOUT)
if retcode != 0:
raise RuntimeError('sips exited with {0}'.format(retcode))
def convert_image(inpath, outpath, size):
"""Convert an image file using `sips`.
Args:
inpath (str): Path of source file.
outpath (str): Path to destination file.
size (int): Width and height of destination image in pixels.
Raises:
RuntimeError: Raised if `sips` exits with non-zero status.
"""
cmd = [
b'sips',
b'-z', b'{0}'.format(size), b'{0}'.format(size),
inpath,
b'--out', outpath]
# log().debug(cmd)
with open(os.devnull, 'w') as pipe:
retcode = subprocess.call(cmd, stdout=pipe, stderr=subprocess.STDOUT)
if retcode != 0:
raise RuntimeError('sips exited with {0}'.format(retcode))
def TeeCmd(cmd, logfile, fail_hard=True):
"""Runs cmd and writes the output to both stdout and logfile."""
# Reading from PIPE can deadlock if one buffer is full but we wait on a
# different one. To work around this, pipe the subprocess's stderr to
# its stdout buffer and don't give it a stdin.
# shell=True is required in cmd.exe since depot_tools has an svn.bat, and
# bat files only work with shell=True set.
proc = subprocess.Popen(cmd, bufsize=1, shell=sys.platform == 'win32',
stdin=open(os.devnull), stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
for line in iter(proc.stdout.readline,''):
Tee(line, logfile)
if proc.poll() is not None:
break
exit_code = proc.wait()
if exit_code != 0 and fail_hard:
print 'Failed:', cmd
sys.exit(1)
def TeeCmd(cmd, logfile, fail_hard=True):
"""Runs cmd and writes the output to both stdout and logfile."""
# Reading from PIPE can deadlock if one buffer is full but we wait on a
# different one. To work around this, pipe the subprocess's stderr to
# its stdout buffer and don't give it a stdin.
# shell=True is required in cmd.exe since depot_tools has an svn.bat, and
# bat files only work with shell=True set.
proc = subprocess.Popen(cmd, bufsize=1, shell=sys.platform == 'win32',
stdin=open(os.devnull), stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
for line in iter(proc.stdout.readline,''):
Tee(line, logfile)
if proc.poll() is not None:
break
exit_code = proc.wait()
if exit_code != 0 and fail_hard:
print 'Failed:', cmd
sys.exit(1)
def run(self, with_intermediate_file=False, cwd=None):
"""Method to run the backup command where it applies."""
command = self.build_dump_command()
if with_intermediate_file:
try:
backup_file_f = open('%s/%s' % (self.output_directory,
self.backup_file), 'w')
except IOError as exc:
raise
p = subprocess.Popen(command.split(), stdout=backup_file_f,
env=self.env, cwd=cwd)
p.wait()
backup_file_f.flush()
else:
FNULL = open(os.devnull, 'w')
p = subprocess.Popen(command.split(), env=self.env, cwd=cwd,
stdout=FNULL, stderr=subprocess.STDOUT)
def restore(self, backup_filename,with_intermediate_file=False):
"""Method to restore the backup."""
self.store.get(self.swift_container, backup_filename,
self.output_directory)
command = self.build_restore_command(backup_filename)
if with_intermediate_file:
file_path = '%s/%s' % (self.output_directory, backup_filename)
backup_file_content = open(file_path, 'r').read()
p = subprocess.Popen(command.split(), stdin=subprocess.PIPE)
p.communicate(backup_file_content)
else:
FNULL = open(os.devnull, 'w')
p = subprocess.Popen(command.split(), stdout=FNULL,
stderr=subprocess.STDOUT)
p.wait()
if self.clean_local_copy:
self._clean_local_copy(backup_filename)
def dump_database(id):
"""Dump the database to a temporary directory."""
tmp_dir = tempfile.mkdtemp()
current_dir = os.getcwd()
os.chdir(tmp_dir)
FNULL = open(os.devnull, 'w')
heroku_app = HerokuApp(dallinger_uid=id, output=FNULL)
heroku_app.backup_capture()
heroku_app.backup_download()
for filename in os.listdir(tmp_dir):
if filename.startswith("latest.dump"):
os.rename(filename, "database.dump")
os.chdir(current_dir)
return os.path.join(tmp_dir, "database.dump")
def setUp(self):
super(PostArgParseSetupTest, self).setUp()
self.config.debug = False
self.config.max_log_backups = 1000
self.config.quiet = False
self.config.verbose_count = constants.CLI_DEFAULTS['verbose_count']
self.devnull = open(os.devnull, 'w')
from certbot.log import ColoredStreamHandler
self.stream_handler = ColoredStreamHandler(six.StringIO())
from certbot.log import MemoryHandler, TempHandler
self.temp_handler = TempHandler()
self.temp_path = self.temp_handler.path
self.memory_handler = MemoryHandler(self.temp_handler)
self.root_logger = mock.MagicMock(
handlers=[self.memory_handler, self.stream_handler])