python类devnull()的实例源码

notify.py 文件源码 项目:alfred-mpd 作者: deanishe 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def convert_image(inpath, outpath, size):
    """Convert an image file using `sips`.

    Runs ``sips -z <size> <size> <inpath> --out <outpath>`` and discards
    everything the tool prints (stdout and stderr go to the null device).

    Args:
        inpath (str): Path of source file.
        outpath (str): Path to destination file.
        size (int): Width and height of destination image in pixels.

    Raises:
        RuntimeError: Raised if `sips` exits with non-zero status.
    """
    # ``bytes`` has no ``format()`` method on Python 3, so format as text
    # first and encode afterwards; on Python 2 the result is the same
    # byte string the original expression produced.
    dimension = '{0}'.format(size).encode('ascii')
    cmd = [
        b'sips',
        b'-z', dimension, dimension,
        inpath,
        b'--out', outpath]
    # log().debug(cmd)
    with open(os.devnull, 'w') as pipe:
        retcode = subprocess.call(cmd, stdout=pipe, stderr=subprocess.STDOUT)

    if retcode != 0:
        raise RuntimeError('sips exited with {0}'.format(retcode))
recipe-580672.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def test_err_in_fun(self):
    # The child script registers an exit function that raises
    # ZeroDivisionError and then signals itself. The status reported by
    # pyrun must then be 1 rather than the signal the process was hit
    # with (only checked on POSIX).
    template = """
                import os, signal, imp, sys
                mod = imp.load_source("mod", r"{}")

                def foo():
                    sys.stderr = os.devnull
                    1 / 0

                sig = signal.SIGTERM if os.name == 'posix' else \
                    signal.CTRL_C_EVENT
                mod.register_exit_fun(foo)
                os.kill(os.getpid(), sig)
                """
    script = textwrap.dedent(
        template.format(os.path.abspath(__file__), TESTFN))
    exit_status = pyrun(script)
    if not POSIX:
        return
    self.assertEqual(exit_status, 1)
    assert exit_status != signal.SIGTERM, strfsig(exit_status)
utils.py 文件源码 项目:epubcheck 作者: titusz 项目源码 文件源码 阅读 45 收藏 0 点赞 0 评论 0
def epubcheck_help():
    """Return epubcheck.jar commandline help text.

    Invokes the jar with ``-h`` (forcing an English JVM locale), captures
    its stdout and throws its stderr away.

    :return unicode: helptext from epubcheck.jar
    """
    command = [c.JAVA, '-Duser.language=en', '-jar', c.EPUBCHECK, '-h']

    with open(os.devnull, "w") as sink:
        process = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=sink,
        )
        captured = process.communicate()

    help_bytes = captured[0]
    return help_bytes.decode()
graphclust.py 文件源码 项目:cellranger 作者: 10XGenomics 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def pipe_weighted_edgelist_to_convert(matrix, bin_filename, weight_filename):
    """Pipe a weighted edgelist (COO sparse matrix) to Louvain's convert utility.

    This entry point is currently switched off: it raises before doing any
    work, so the streaming code below the ``raise`` is never executed.

    Raises:
        ValueError: Always.
    """
    raise ValueError('Unsupported method at the moment')

    # Disabled implementation, kept for when the method is supported again.
    sink = open(os.devnull, 'w')
    convert_cmd = [
        LOUVAIN_CONVERT_BINPATH,
        '-i', '/dev/stdin',
        '-o', bin_filename,
        '-w', weight_filename,
    ]
    converter = subprocess.Popen(convert_cmd, stdin=subprocess.PIPE,
                                 stdout=sink, stderr=sink)

    # Stream text triplets to 'convert'
    triplets = itertools.izip(matrix.row, matrix.col, matrix.data)
    for triplet in triplets:
        converter.stdin.write('%d\t%d\t%f\n' % triplet)

    converter.stdin.close()
    converter.wait()
    sink.close()
graphclust.py 文件源码 项目:cellranger 作者: 10XGenomics 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def pipe_unweighted_edgelist_to_convert(matrix, bin_filename):
    """Pipe an unweighted edgelist (COO sparse matrix) to Louvain's convert utility.

    Every ``(row, col)`` pair of ``matrix`` is written as one tab-separated
    text line to the stdin of ``LOUVAIN_CONVERT_BINPATH``, which is told to
    read ``/dev/stdin`` and write to ``bin_filename``. The utility's stdout
    and stderr are discarded and its exit status is not inspected.

    Args:
        matrix: Object exposing parallel ``row`` and ``col`` sequences
            (presumably a scipy COO matrix -- confirm against callers).
        bin_filename (str): Output path handed to the utility via ``-o``.
    """
    # ``izip`` only exists on Python 2; on Python 3 the builtin ``zip`` is
    # already lazy, so pick whichever is available.
    lazy_zip = getattr(itertools, 'izip', zip)

    # The with-block guarantees the null-device handle is closed even when
    # launching the utility or streaming to it fails.
    with open(os.devnull, 'w') as devnull:
        proc = subprocess.Popen([LOUVAIN_CONVERT_BINPATH,
                                 '-i', '/dev/stdin',
                                 '-o', bin_filename,
                                 ], stdin=subprocess.PIPE, stdout=devnull, stderr=devnull)
        try:
            # Stream text pairs to 'convert'. The pipe is binary, so encode
            # each line (a no-op on Python 2 byte strings).
            for ij in lazy_zip(matrix.row, matrix.col):
                proc.stdin.write(('%d\t%d\n' % ij).encode('ascii'))
        finally:
            # Always signal EOF and reap the child so a failed write does
            # not leave a stuck or zombie process behind.
            try:
                proc.stdin.close()
            finally:
                proc.wait()
conv2mp4-server.py 文件源码 项目:conv2mp4-py 作者: Kameecoding 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def get_audio_streams(self):
    """Probe ``self.input_video`` with ffprobe and extract its audio streams.

    Runs ``FFPROBE -i <input> -of json -show_streams`` with stderr sent to
    the null device, parses the JSON written to stdout and hands the parsed
    document to the global ``get_audio_streams`` callable (inside a class
    body this name resolves to a module-level function, not this method --
    NOTE(review): confirm that helper exists in the module).

    Returns:
        Whatever the global ``get_audio_streams`` returns for the parsed
        ffprobe output.

    Raises:
        SystemExit: If the ffprobe executable cannot be found.
        OSError: If ffprobe cannot be started for any other reason.
    """
    # ``os.errno`` was an accidental re-export and no longer exists on
    # Python 3.7+; use the real module.
    import errno

    with open(os.devnull, 'w') as DEV_NULL:
        # Get file info and Parse it
        try:
            proc = subprocess.Popen([
                FFPROBE,
                '-i', self.input_video,
                '-of', 'json',
                '-show_streams'
            ], stdout=subprocess.PIPE, stderr=DEV_NULL)
        except OSError as e:
            if e.errno == errno.ENOENT:
                Logger.error("FFPROBE not found, install on your system to use this script")
                sys.exit(0)
            # Previously any other launch error fell through and crashed on
            # an unbound ``proc``; surface the real error instead.
            raise
        # communicate() drains stdout and waits for the child, so no zombie
        # process is left behind.
        output = proc.communicate()[0]

    return get_audio_streams(json.loads(output))
launcher.py 文件源码 项目:wpw-sdk-python 作者: WPTechInnovation 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def launch(self, cfg, path, flags):
    """Start the rpc-agent binary for this host and return the process.

    The binary is looked up under ``$WPW_HOME/bin`` when ``WPW_HOME`` is
    set, otherwise under ``<path>/wpwithinpy/iot-core-component/bin``. Its
    file name is ``rpc-agent-<os>-<arch>`` where ``<os>`` is the lower-cased
    ``platform.system()`` and ``<arch>`` comes from
    ``self.detectHostArchitecture()``. The agent's stdout and stderr are
    sent to the null device.

    Args:
        cfg: Configuration passed to ``self.validateConfig``.
        path (str): Installation root used when ``WPW_HOME`` is not set.
        flags (list): Extra command-line arguments for the agent.

    Returns:
        subprocess.Popen: The started agent, or ``None`` when
        ``self.validateConfig`` rejects the OS/architecture combination.
    """
    logging.debug("Determine the OS and Architecture this application is currently running on")
    hostOS = platform.system().lower()
    logging.debug("hostOS: " + str(hostOS))
    is_64bits = sys.maxsize > 2 ** 32
    hostArchitecture = 'x64' if is_64bits else 'ia32'
    logging.debug("hostArchitecture: " + str(hostArchitecture))

    if not self.validateConfig(cfg, hostOS, hostArchitecture):
        logging.debug("Invalid OS/Architecture combination detected")
        return None

    wpw_home = os.environ.get("WPW_HOME")
    if wpw_home is not None:
        bin_prefix = wpw_home + '/bin/rpc-agent-'
    else:
        bin_prefix = path + '/wpwithinpy/iot-core-component/bin/rpc-agent-'
    cmd = [bin_prefix + hostOS + '-' + self.detectHostArchitecture()]
    cmd.extend(flags)

    # The child receives its own duplicate of the descriptor, so the
    # parent's handle can be closed as soon as Popen returns instead of
    # staying open for the lifetime of this process.
    with open(os.devnull, 'w') as fnull:
        proc = subprocess.Popen(cmd, stdin=None, stdout=fnull, stderr=subprocess.STDOUT)
    return proc
launcher_wine.py 文件源码 项目:games_nebula 作者: yancharkin 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def win64_available(self):
    """Report whether the 64-bit wine binary can be used.

    Runs ``<wine_bin>64`` without arguments and treats an exit status of 1
    as "available". The architecture combobox is shown in that case; in
    every other case (different exit status, or the binary could not be
    run) the combobox is hidden and ``self.winearch`` is forced to
    ``'win32'``.

    Returns:
        bool: True when the 64-bit binary exited with status 1.
    """
    wine_bin, wineserver_bin, wine_lib = self.get_wine_bin_path()

    dev_null = open(os.devnull, 'w')
    try:
        try:
            proc = subprocess.Popen([wine_bin + '64'], stdout=dev_null,
                                    stdin=subprocess.PIPE,
                                    stderr=subprocess.STDOUT)
        finally:
            # Close our handle whether or not the launch succeeded; the
            # child keeps its own copy of the descriptor.
            dev_null.close()
        proc.communicate()
        if proc.returncode == 1:
            self.combobox_winearch.set_visible(True)
            return True
    except Exception:
        # Best effort: any failure means "no 64-bit wine". Unlike a bare
        # ``except`` this lets KeyboardInterrupt/SystemExit propagate.
        pass

    self.combobox_winearch.set_visible(False)
    self.winearch = 'win32'
    return False
util.py 文件源码 项目:ipwb 作者: oduwsdl 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def isDaemonAlive(hostAndPort="{0}:{1}".format(IPFSAPI_IP, IPFSAPI_PORT)):
    """Ensure that the IPFS daemon is running via HTTP before proceeding

    Returns True when the daemon answers ``client.id()``, False when the
    connection fails, and None after logging an unexpected error. Exits the
    interpreter if an OSError is raised.
    """
    client = ipfsapi.Client(IPFSAPI_IP, IPFSAPI_PORT)

    try:
        # ConnectionError/AttributeError if IPFS daemon not running
        client.id()
    except (ConnectionError, exceptions.AttributeError):
        not_running = "Daemon is not running at http://" + hostAndPort
        logError(not_running)
        return False
    except OSError:
        not_installed = ("IPFS is likely not installed. "
                         "See https://ipfs.io/docs/install/")
        logError(not_installed)
        sys.exit()
    except:
        logError('Unknown error in retrieving daemon status')
        exc_type = sys.exc_info()[0]
        logError(exc_type)
    else:
        return True
deepsea_minions.py 文件源码 项目:DeepSea 作者: SUSE 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def _query(self):
    """
    Returns the value of deepsea_minions

    Refreshes the pillar on all minions, then asks every minion for its
    ``deepsea_minions`` pillar value and returns the first non-empty answer.
    Logs an error and returns an empty list when no minion reports a value.
    """
    # When search matches no minions, salt prints to stdout.
    # Suppress stdout.
    _stdout = sys.stdout
    with open(os.devnull, 'w') as devnull:
        sys.stdout = devnull
        try:
            # Called only for its side effect of refreshing the pillar.
            self.local.cmd('*', 'saltutil.pillar_refresh')
            minions = self.local.cmd('*', 'pillar.get', ['deepsea_minions'],
                                     expr_form="compound")
        finally:
            # Restore stdout even when salt raises; otherwise the rest of
            # the process would keep printing into the null device.
            sys.stdout = _stdout

    for minion in minions:
        if minions[minion]:
            return minions[minion]

    log.error("deepsea_minions is not set")
    return []
deepsea_minions.py 文件源码 项目:DeepSea 作者: SUSE 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _matches(self):
    """
    Returns the list of matched minions

    Targets ``self.deepsea_minions`` as a compound expression, asks each
    matched minion for its ``id`` pillar value and returns the keys of the
    response. Returns an empty list when ``self.deepsea_minions`` is empty.
    """
    if not self.deepsea_minions:
        return []

    # When search matches no minions, salt prints to stdout.
    # Suppress stdout.
    _stdout = sys.stdout
    with open(os.devnull, 'w') as devnull:
        sys.stdout = devnull
        try:
            result = self.local.cmd(self.deepsea_minions,
                                    'pillar.get',
                                    ['id'],
                                    expr_form="compound")
        finally:
            # Restore stdout even when salt raises, and let the with-block
            # close the null-device handle instead of leaking it.
            sys.stdout = _stdout
    return result.keys()
notify.py 文件源码 项目:Gank-Alfred-Workflow 作者: hujiaweibujidao 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def convert_image(inpath, outpath, size):
    """Convert an image file using `sips`.

    Runs ``sips -z <size> <size> <inpath> --out <outpath>`` and discards
    everything the tool prints (stdout and stderr go to the null device).

    Args:
        inpath (str): Path of source file.
        outpath (str): Path to destination file.
        size (int): Width and height of destination image in pixels.

    Raises:
        RuntimeError: Raised if `sips` exits with non-zero status.
    """
    # ``bytes`` has no ``format()`` method on Python 3, so format as text
    # first and encode afterwards; on Python 2 the result is the same
    # byte string the original expression produced.
    dimension = '{0}'.format(size).encode('ascii')
    cmd = [
        b'sips',
        b'-z', dimension, dimension,
        inpath,
        b'--out', outpath]
    # log().debug(cmd)
    with open(os.devnull, 'w') as pipe:
        retcode = subprocess.call(cmd, stdout=pipe, stderr=subprocess.STDOUT)

    if retcode != 0:
        raise RuntimeError('sips exited with {0}'.format(retcode))
notify.py 文件源码 项目:Gank-Alfred-Workflow 作者: hujiaweibujidao 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def convert_image(inpath, outpath, size):
    """Convert an image file using `sips`.

    Runs ``sips -z <size> <size> <inpath> --out <outpath>`` and discards
    everything the tool prints (stdout and stderr go to the null device).

    Args:
        inpath (str): Path of source file.
        outpath (str): Path to destination file.
        size (int): Width and height of destination image in pixels.

    Raises:
        RuntimeError: Raised if `sips` exits with non-zero status.
    """
    # ``bytes`` has no ``format()`` method on Python 3, so format as text
    # first and encode afterwards; on Python 2 the result is the same
    # byte string the original expression produced.
    dimension = '{0}'.format(size).encode('ascii')
    cmd = [
        b'sips',
        b'-z', dimension, dimension,
        inpath,
        b'--out', outpath]
    # log().debug(cmd)
    with open(os.devnull, 'w') as pipe:
        retcode = subprocess.call(cmd, stdout=pipe, stderr=subprocess.STDOUT)

    if retcode != 0:
        raise RuntimeError('sips exited with {0}'.format(retcode))
recipe-580672.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def test_exception(self):
    # The child script registers an exit function that appends b"1" to
    # TESTFN and then dies from an uncaught ZeroDivisionError; the handler
    # must still have been called.
    template = """
                import os, imp, sys
                mod = imp.load_source("mod", r"{}")

                def foo():
                    with open(r"{}", "ab") as f:
                        f.write(b"1")

                mod.register_exit_fun(foo)
                sys.stderr = os.devnull
                1 / 0
                """
    script = textwrap.dedent(
        template.format(os.path.abspath(__file__), TESTFN))
    exit_status = pyrun(script)
    self.assertEqual(exit_status, 1)
    with open(TESTFN, "rb") as marker:
        written = marker.read()
        self.assertEqual(written, b"1")
recipe-580672.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def test_kinterrupt(self):
    # Simulates CTRL + C: the child script registers an exit function that
    # appends b"1" to TESTFN and then raises KeyboardInterrupt; the exit
    # function must still have been called.
    template = """
                import os, imp, sys
                mod = imp.load_source("mod", r"{}")

                def foo():
                    with open(r"{}", "ab") as f:
                        f.write(b"1")

                mod.register_exit_fun(foo)
                sys.stderr = os.devnull
                raise KeyboardInterrupt
                """
    script = textwrap.dedent(
        template.format(os.path.abspath(__file__), TESTFN))
    exit_status = pyrun(script)
    self.assertEqual(exit_status, 1)
    with open(TESTFN, "rb") as marker:
        written = marker.read()
        self.assertEqual(written, b"1")
helpers.py 文件源码 项目:PyCIRCLeanMail 作者: CIRCL 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def __init__(self, raw_email, debug=False):
    '''
        Setup the base options of the copy/convert setup

        Stores the raw email, creates in-memory log buffers, routes twiggy
        output at DEBUG level into the processing buffer, and chooses where
        debug stdout/stderr go: files under ``debug_logs`` when ``debug``
        is set, the null device otherwise.
    '''
    self.raw_email = raw_email
    self.log_processing = StringIO()
    self.log_content = StringIO()
    self.tree(self.raw_email)

    # Send every twiggy message to the in-memory processing log.
    processing_output = outputs.StreamOutput(formats.shell_format,
                                             stream=self.log_processing)
    emitters['*'] = filters.Emitter(levels.DEBUG, True, processing_output)

    self.log_name = log.name('files')
    self.cur_attachment = None
    self.debug = debug

    if not self.debug:
        self.log_debug_err = self.log_debug_out = os.devnull
        return

    debug_dir = 'debug_logs'
    if not os.path.exists(debug_dir):
        os.makedirs(debug_dir)
    self.log_debug_err = os.path.join(debug_dir, 'debug_stderr.log')
    self.log_debug_out = os.path.join(debug_dir, 'debug_stdout.log')
cli.py 文件源码 项目:wib 作者: chengsoonong 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def call(self, args, devnull=False):
    """Call other processes.

    In debug mode the command line is echoed and the user is asked to
    confirm before it runs.

    args - list of command args
    devnull - whether to pipe stdout to /dev/null (or equivalent)

    Returns the exit code of the process, or False if
    subprocess.CalledProcessError is raised.
    """
    if self.debug:
        click.echo(subprocess.list2cmdline(args))
        click.confirm('Continue?', default=True, abort=True)
    try:
        # Pipe to /dev/null (or equivalent) only when asked to.
        if devnull:
            options = {'stdout': self.FNULL, 'stderr': subprocess.STDOUT}
        else:
            options = {}
        return subprocess.call(args, **options)
    except subprocess.CalledProcessError:
        return False
updater.py 文件源码 项目:mobot 作者: JokerQyou 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def _check_ssl_cert(self, cert, key):
    """Validate the certificate with openssl, then wrap the HTTP socket in SSL.

    ``openssl x509 -text -noout -in <cert>`` is run with its output
    discarded. If openssl cannot be executed the check is skipped and the
    certificate is treated as valid.

    Args:
        cert: Path to the certificate file.
        key: Path to the private key file.

    Raises:
        TelegramError: If openssl rejects the certificate, or if wrapping
            ``self.httpd.socket`` fails with ``ssl.SSLError``.
    """
    # Check SSL-Certificate with openssl, if possible
    try:
        # The with-block closes the null-device handle once openssl is done.
        with open(os.devnull, 'wb') as devnull:
            exit_code = subprocess.call(
                ["openssl", "x509", "-text", "-noout", "-in", cert],
                stdout=devnull,
                stderr=subprocess.STDOUT)
    except OSError:
        # openssl could not be run: skip the pre-check.
        exit_code = 0

    # Compare by value; ``is 0`` only works by accident of CPython's
    # small-integer caching and triggers a SyntaxWarning on Python 3.8+.
    if exit_code != 0:
        raise TelegramError('SSL Certificate invalid')

    try:
        self.httpd.socket = ssl.wrap_socket(
            self.httpd.socket, certfile=cert, keyfile=key, server_side=True)
    except ssl.SSLError as error:
        self.logger.exception('Failed to init SSL socket')
        raise TelegramError(str(error))
notify.py 文件源码 项目:workflows.kyoyue 作者: wizyoung 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def convert_image(inpath, outpath, size):
    """Convert an image file using `sips`.

    Runs ``sips -z <size> <size> <inpath> --out <outpath>`` and discards
    everything the tool prints (stdout and stderr go to the null device).

    Args:
        inpath (str): Path of source file.
        outpath (str): Path to destination file.
        size (int): Width and height of destination image in pixels.

    Raises:
        RuntimeError: Raised if `sips` exits with non-zero status.
    """
    # ``bytes`` has no ``format()`` method on Python 3, so format as text
    # first and encode afterwards; on Python 2 the result is the same
    # byte string the original expression produced.
    dimension = '{0}'.format(size).encode('ascii')
    cmd = [
        b'sips',
        b'-z', dimension, dimension,
        inpath,
        b'--out', outpath]
    # log().debug(cmd)
    with open(os.devnull, 'w') as pipe:
        retcode = subprocess.call(cmd, stdout=pipe, stderr=subprocess.STDOUT)

    if retcode != 0:
        raise RuntimeError('sips exited with {0}'.format(retcode))
notify.py 文件源码 项目:alphy 作者: maximepeschard 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def convert_image(inpath, outpath, size):
    """Convert an image file using `sips`.

    Runs ``sips -z <size> <size> <inpath> --out <outpath>`` and discards
    everything the tool prints (stdout and stderr go to the null device).

    Args:
        inpath (str): Path of source file.
        outpath (str): Path to destination file.
        size (int): Width and height of destination image in pixels.

    Raises:
        RuntimeError: Raised if `sips` exits with non-zero status.
    """
    # ``bytes`` has no ``format()`` method on Python 3, so format as text
    # first and encode afterwards; on Python 2 the result is the same
    # byte string the original expression produced.
    dimension = '{0}'.format(size).encode('ascii')
    cmd = [
        b'sips',
        b'-z', dimension, dimension,
        inpath,
        b'--out', outpath]
    # log().debug(cmd)
    with open(os.devnull, 'w') as pipe:
        retcode = subprocess.call(cmd, stdout=pipe, stderr=subprocess.STDOUT)

    if retcode != 0:
        raise RuntimeError('sips exited with {0}'.format(retcode))
load_utils.py 文件源码 项目:pheweb 作者: statgen 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def run_script(script):
    """Run ``script`` under ``bash -c`` in strict mode and return its output.

    The script is prefixed with ``set -euo pipefail``, stdin is connected
    to the null device, and stderr is merged into stdout. The combined
    output is decoded as UTF-8.

    Raises:
        PheWebError: If the script exits with a non-zero status; the
            message contains the status and the captured output.
    """
    strict_script = 'set -euo pipefail\n' + script
    status = 0
    try:
        # Reading from the null device keeps the script from waiting on stdin.
        with open(os.devnull) as null_in:
            raw = subprocess.check_output(
                ['bash', '-c', strict_script],
                stderr=subprocess.STDOUT,
                stdin=null_in)
    except subprocess.CalledProcessError as failure:
        raw = failure.output
        status = failure.returncode

    text = raw.decode('utf8')
    if status == 0:
        return text
    raise PheWebError(
        'FAILED with status {}\n'.format(status) +
        'output was:\n' +
        text)
capture.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def __init__(self, targetfd, tmpfile=None):
        """Set up capturing state for the file descriptor ``targetfd``.

        A duplicate of ``targetfd`` is saved with ``os.dup``. If that fails
        with ``OSError``, ``start`` and ``done`` are replaced by no-op
        callables and nothing else is initialised.

        Otherwise ``self.syscapture``, ``self.tmpfile`` and
        ``self.tmpfile_fd`` are set:

        * for ``targetfd == 0`` no ``tmpfile`` may be passed; the null
          device is opened for reading and used as ``tmpfile``;
        * for any other descriptor, a missing ``tmpfile`` is created from a
          ``TemporaryFile`` via ``safe_text_dupfile``.
        """
        self.targetfd = targetfd
        try:
            # Keep a copy of the descriptor (presumably so it can be
            # restored later -- the restore code is not part of this block).
            self.targetfd_save = os.dup(self.targetfd)
        except OSError:
            # The descriptor cannot be duplicated: make start()/done()
            # do nothing on this instance.
            self.start = lambda: None
            self.done = lambda: None
        else:
            if targetfd == 0:
                assert not tmpfile, "cannot set tmpfile with stdin"
                tmpfile = open(os.devnull, "r")
                self.syscapture = SysCapture(targetfd)
            else:
                if tmpfile is None:
                    f = TemporaryFile()
                    # ``f`` is closed when the with-block exits; only the
                    # object returned by safe_text_dupfile is kept.
                    with f:
                        tmpfile = safe_text_dupfile(f, mode="wb+")
                # Descriptors listed in patchsysdict get a SysCapture bound
                # to tmpfile; all others get a NoCapture placeholder.
                if targetfd in patchsysdict:
                    self.syscapture = SysCapture(targetfd, tmpfile)
                else:
                    self.syscapture = NoCapture()
            self.tmpfile = tmpfile
            self.tmpfile_fd = tmpfile.fileno()
Functions.py 文件源码 项目:d-tailor 作者: jcg 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def analyze_structure(seq,filename,ensemble=False):
    """Run UNAFold on ``seq`` and move the resulting ``.ct`` files away.

    Changes into ``project_dir``, writes the sequence to ``<filename>.seq``
    and runs either ``UNAFold.pl`` (``ensemble=True``) or ``hybrid-ss-min``
    on it with all tool output discarded. If ``<filename>.ct`` was produced,
    every ``<filename>*.ct`` file is moved to ``tmp/structures/``; all
    remaining ``<filename>*`` files are then removed.

    Args:
        seq: Sequence to analyse; converted with ``str()``.
        filename (str): Prefix of the temporary files. NOTE(review): it is
            interpolated into shell commands unquoted, as is ``seq`` -- both
            must not contain shell metacharacters.
        ensemble (bool): Select ``UNAFold.pl`` instead of ``hybrid-ss-min``.

    Returns:
        int: Always 1.
    """
    chdir(project_dir)

    system("echo '" + str(seq) + "' > " + filename + ".seq")

    if ensemble:
        tool = "./3rdParty/unafold/UNAFold.pl"
    else:
        tool = "./3rdParty/unafold/hybrid-ss-min"

    # Redirecting to the null device is necessary to omit the output
    # generated by UNAFOLD. The with-block closes the handle even when the
    # call raises.
    with open(devnull, 'w') as fnull:
        call(tool + " -n RNA " + filename + ".seq", shell = True, stdout = fnull, stderr = fnull)

    if os.path.isfile(filename+".ct"):
        system("mv %s*.ct tmp/structures/" % filename)

    # remove tmp files
    system("rm %s*" % filename)
    #system("mv " + filename + "* tmp/unafold_files/")

    return 1
DartModules.py 文件源码 项目:dartqc 作者: esteinig 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def _run_cdhit(self, fasta_path, identity=0.95, word_size=5, description_length=0, cdhit_path=None):
    """Run CDHIT-EST for sequences, install with sudo apt install cd-hit on Ubuntu

    The tool writes to ``<tmp_path>/<project>_IdentityClusters_<identity>``
    with its stdout discarded; the path of the matching ``.clstr`` file is
    returned. The tool's exit status is not checked.
    """
    self.messages.get_cdhit_message(identity)

    executable = "cd-hit-est" if cdhit_path is None else cdhit_path
    base_name = self.project + "_IdentityClusters_" + str(identity)
    output_prefix = os.path.join(self.tmp_path, base_name)

    command = [
        executable,
        "-i", fasta_path,
        "-o", output_prefix,
        "-c", str(identity),
        "-n", str(word_size),
        "-d", str(description_length),
    ]
    with open(os.devnull, "w") as sink:
        call(command, stdout=sink)

    return os.path.join(self.tmp_path, base_name + '.clstr')
sample.py 文件源码 项目:polichombr 作者: ANSSI-FR 项目源码 文件源码 阅读 50 收藏 0 点赞 0 评论 0
def disassemble_it(filename, address=None):
    """
        Wrapper for the ruby disassembler script.

        Runs ``analysis_tools/disassfunc.rb`` on ``filename`` with all of
        its standard streams attached to the null device and waits for it
        to finish. The script's exit status is not checked.

        Args:
            filename (str): File to disassemble.
            address (int, optional): Passed to the script in hexadecimal
                form when given.

        Returns:
            str: ``<filename>_disass_<hex address>``, or
            ``<filename>_disass_None`` when no address is given. This is
            the value handed to the script's ``-o`` option.
    """
    address_tag = hex(address) if address is not None else "None"
    outfile = filename + "_disass_" + address_tag

    args = ['ruby', 'analysis_tools/disassfunc.rb',
            "-graph", "-svg", "-o", outfile, filename]
    if address is not None:
        args.append(address_tag)

    # The with-block closes the null-device handle even when the script
    # cannot be started or waiting for it is interrupted.
    with open(os.devnull, 'w') as FNULL:
        proc = Popen(args, stdin=FNULL, stdout=FNULL, stderr=FNULL)
        proc.wait()

    app.logger.debug("Disassembly just finished!")
    return outfile
task_analyzeitrb.py 文件源码 项目:polichombr 作者: ANSSI-FR 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def analyze_it(self):
    """
    Wrapper for the ruby analysis script.
    Executes and get results from files.

    Runs ``analysis_tools/AnalyzeIt.rb`` on ``self.storage_file`` with all
    standard streams attached to the null device. If the script left a
    ``<storage_file>.txt`` report, its content is stored (with every
    non-ASCII byte removed) in ``self.txt_report``.

    Returns:
        bool: Always True.
    """
    args = ['ruby', 'analysis_tools/AnalyzeIt.rb', self.storage_file]
    # The with-block closes the null-device handle even if the script
    # cannot be started.
    with open(os.devnull, 'w') as FNULL:
        proc = Popen(args, stdin=FNULL, stdout=FNULL, stderr=FNULL)
        proc.wait()

    # TEXT report, just UTF-8 decode/parsing
    fname = self.storage_file + '.txt'
    if os.path.exists(fname):
        with open(fname, 'rb') as report:
            data = report.read()
        # The file is read as bytes, so the pattern and replacement must be
        # bytes too (a text pattern raises TypeError on Python 3).
        self.txt_report = re.sub(br'[^\x00-\x7F]', b'', data).decode('utf-8')

    return True
test_cluster_features.py 文件源码 项目:q2-vsearch 作者: qiime2 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def test_no_clustering(self):
    # Clustering at perc_identity=1.0 must hand back the input table
    # unchanged and all four input sequences.
    with redirected_stdio(stderr=os.devnull):
        obs_table, obs_sequences = cluster_features_de_novo(
            sequences=self.input_sequences,
            table=self.input_table,
            perc_identity=1.0)

    # order of identifiers is important for biom.Table equality
    expected_order = self.input_table.ids(axis='observation')
    obs_table = obs_table.sort_order(expected_order, axis='observation')
    self.assertEqual(obs_table, self.input_table)

    obs_seqs = _read_seqs(obs_sequences)

    # sequences are reverse-sorted by abundance in output
    exp_seqs = [self.input_sequences_list[index] for index in (0, 3, 2, 1)]
    self.assertEqual(obs_seqs, exp_seqs)
test_cluster_features.py 文件源码 项目:q2-vsearch 作者: qiime2 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def test_99_percent_clustering(self):
    # Clustering at perc_identity=0.99 is expected to leave three features
    # (feature1, feature2, feature4) with the counts below.
    expected_counts = np.array([[104, 106, 109],
                                [1, 1, 2],
                                [7, 8, 9]])
    exp_table = biom.Table(expected_counts,
                           ['feature1', 'feature2',
                            'feature4'],
                           ['sample1', 'sample2', 'sample3'])

    with redirected_stdio(stderr=os.devnull):
        obs_table, obs_sequences = cluster_features_de_novo(
            sequences=self.input_sequences,
            table=self.input_table,
            perc_identity=0.99)

    # order of identifiers is important for biom.Table equality
    expected_order = exp_table.ids(axis='observation')
    obs_table = obs_table.sort_order(expected_order, axis='observation')
    self.assertEqual(obs_table, exp_table)

    # sequences are reverse-sorted by abundance in output
    obs_seqs = _read_seqs(obs_sequences)
    exp_seqs = [self.input_sequences_list[index] for index in (0, 3, 1)]
    self.assertEqual(obs_seqs, exp_seqs)
test_cluster_features.py 文件源码 项目:q2-vsearch 作者: qiime2 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_97_percent_clustering_feature1_most_abundant(self):
    # Clustering at perc_identity=0.97 is expected to leave two features
    # (feature1, feature2) with the counts below.
    expected_counts = np.array([[111, 114, 118],
                                [1, 1, 2]])
    exp_table = biom.Table(expected_counts,
                           ['feature1', 'feature2'],
                           ['sample1', 'sample2', 'sample3'])

    with redirected_stdio(stderr=os.devnull):
        obs_table, obs_sequences = cluster_features_de_novo(
            sequences=self.input_sequences,
            table=self.input_table,
            perc_identity=0.97)

    # order of identifiers is important for biom.Table equality
    expected_order = exp_table.ids(axis='observation')
    obs_table = obs_table.sort_order(expected_order, axis='observation')
    self.assertEqual(obs_table, exp_table)

    # sequences are reverse-sorted by abundance in output
    obs_seqs = _read_seqs(obs_sequences)
    exp_seqs = [self.input_sequences_list[index] for index in (0, 1)]
    self.assertEqual(obs_seqs, exp_seqs)
test_chimera.py 文件源码 项目:q2-vsearch 作者: qiime2 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_uchime_denovo(self):
    # uchime_denovo must flag the fourth input sequence as chimeric, keep
    # the other three, and report one stats line per feature.
    with redirected_stdio(stderr=os.devnull):
        chime, nonchime, stats = uchime_denovo(
            sequences=self.input_sequences, table=self.input_table)

    obs_chime = _read_seqs(chime)
    exp_chime = [self.input_sequences_list[3]]
    self.assertEqual(obs_chime, exp_chime)

    # sequences are reverse-sorted by abundance in output
    obs_nonchime = _read_seqs(nonchime)
    exp_nonchime = [self.input_sequences_list[index] for index in (0, 1, 2)]
    self.assertEqual(obs_nonchime, exp_nonchime)

    with stats.open() as stats_fh:
        stats_text = stats_fh.read()
    for feature_id in ('feature1', 'feature2', 'feature3', 'feature4'):
        self.assertTrue(feature_id in stats_text)
    stats_lines = [line for line in stats_text.split('\n') if line]
    self.assertEqual(len(stats_lines), 4)


问题


面经


文章

微信
公众号

扫码关注公众号