python类parse()的实例源码

chewBBACA.py 文件源码 项目:chewBBACA_deprecated 作者: mickaelsilva 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def check_if_list_or_folder(folder_or_list):
    list_files = []
    # check if given a list of genomes paths or a folder to create schema
    try:
        f = open(folder_or_list, 'r')
        f.close()
        list_files = folder_or_list
    except IOError:

        for gene in os.listdir(folder_or_list):
            try:
                genepath = os.path.join(folder_or_list, gene)
                for allele in SeqIO.parse(genepath, "fasta", generic_dna):
                    break
                list_files.append(os.path.abspath(genepath))
            except Exception as e:
                print e
                pass

    return list_files
common.py 文件源码 项目:lcdb-wf 作者: lcdb 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def resolve_config(config, workdir=None):
    """
    Parameters
    ----------
    config : str, dict
        If str, assume it's a YAML file and parse it; otherwise pass through

    workdir : str
        Optional location to specify relative location of all paths in `config`
    """
    if isinstance(config, str):
        config = yaml.load(open(config))

    def rel(pth):
        if workdir is None or os.path.isabs(pth):
            return pth
        return os.path.join(workdir, pth)
    for key in PATH_KEYS:
        if key in config:
            config[key] = rel(config[key])
    return config
utils.py 文件源码 项目:gloTK 作者: conchoecia 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def fastq_info(path):
    """ Found some info about how to ignore warnings in code blocks here:
      - http://stackoverflow.com/questions/14463277/how-to-disable-python-warnings
    """
    numBases = 0
    numReads = 0
    readLengths = Counter()
    GCTot = 0
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        handle = gzip.open(path, "rt")
        for record in SeqIO.parse(handle, "fastq"):
            numBases += len(record)
            numReads += 1
            readLengths[len(record)] += 1
            GCTot += sum(record.seq.count(x) for x in ['G', 'C', 'g', 'c', 'S', 's'])
        handle.close()
    GCPer = (GCTot/numBases)
    avgReadLen = (sum(value*count for value,count in readLengths.items())/numReads)
    return {"numBases": numBases,
            "numReads": numReads,
            "numGCBases": GCTot,
            "portionGC": GCPer,
            "avgReadLen": avgReadLen}
essential_genes_from_tables.py 文件源码 项目:pipelines 作者: gis-rpd 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def parse_gene_id_and_name():
    """return a mapping of genes ids to gene names/description as dictionary

    deliberately only looking at protein coding genes
    """

    gene_id_to_name = dict()

    # modelled after http://genome.crg.es/~lpryszcz/scripts/gb2gtf.py
    #allowed_types = set(['gene', 'CDS', 'tRNA', 'tmRNA', 'rRNA', 'ncRNA'])
    allowed_types = set(['CDS'])
    #wanted_qualifiers = set(['product', 'locus_tag'])
    with open(GB_FILE) as fh:
        for gb in SeqIO.parse(fh, 'gb'):
            for f in gb.features:
                if f.type not in allowed_types:
                    continue
                qualifiers = dict(f.qualifiers)
                assert len(qualifiers['locus_tag'])==1 and len(qualifiers['product'])==1
                gene_id = qualifiers['locus_tag'][0]
                gene_name = qualifiers['product'][0]
                assert len(qualifiers['locus_tag']) == 1, (qualifiers['locus_tag'])
                gene_id_to_name[gene_id] = gene_name
    return gene_id_to_name
models.py 文件源码 项目:biovec 作者: kyu999 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def generate_corpusfile(corpus_fname, n, out):
    '''
    Args:
        corpus_fname: corpus file name
        n: the number of chunks to split. In other words, "n" for "n-gram"
        out: output corpus file path
    Description:
        Protvec uses word2vec inside, and it requires to load corpus file
        to generate corpus.
    '''
    f = open(out, "w")
    for r in SeqIO.parse(corpus_fname, "fasta"):
        ngram_patterns = split_ngrams(r.seq, n)
        for ngram_pattern in ngram_patterns:
            f.write(" ".join(ngram_pattern) + "\n")
        sys.stdout.write(".")

    f.close()
init_schema_4_bbaca.py 文件源码 项目:chewBBACA 作者: B-UMMI 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def get_Short(genesList):
    for gene in genesList:
        # gene = gene.rstrip('\n')
        pathtoDir = os.path.join(os.path.dirname(gene), "short")
        if not os.path.exists(pathtoDir):
            os.makedirs(pathtoDir)
        shortgene = os.path.join(os.path.dirname(gene), "short", os.path.basename(gene))
        shortgene = shortgene.replace(".fasta", "_short.fasta")

        #gene_fp2 = HTSeq.FastaReader(gene)
        for allele in SeqIO.parse(gene, "fasta", generic_dna):
            fG = open(shortgene, 'w')
            fG.write('>' + str(allele.id) + '\n' + str(allele.seq.upper()) + '\n')
            fG.close()
            break

    return True
assemblyStats.py 文件源码 项目:chewBBACA 作者: B-UMMI 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def calculateN_50(input_file,ratio):

    lengths=[]
    sums=0

    for seq_record in SeqIO.parse(input_file, "fasta"):
        lengths.append(len(seq_record.seq))

    lengths=sorted(lengths, reverse=True)

    contigsMore1000=0
    totalLengthMore1000=0

    for elem in lengths:
        if elem >=10000:
            contigsMore1000+=1
            totalLengthMore1000+=int(elem)
    N_half=sum(lengths)*ratio

    for i in range(0, len(lengths)):
        sums=sums+lengths[i]
        if sums >= N_half:
            return lengths[i],len(lengths),N_half/ratio,contigsMore1000,totalLengthMore1000
uniprot_find.py 文件源码 项目:chewBBACA 作者: B-UMMI 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def check_if_list_or_folder(folder_or_list):
    list_files = []
    # check if given a list of genomes paths or a folder to create schema
    try:
        f = open(folder_or_list, 'r')
        f.close()
        list_files = folder_or_list
    except IOError:

        for gene in os.listdir(folder_or_list):
            try:
                genepath = os.path.join(folder_or_list, gene)
                for allele in SeqIO.parse(genepath, "fasta", generic_dna):
                    break
                list_files.append(os.path.abspath(genepath))
            except Exception as e:
                print (e)
                pass

    return list_files
mmff.py 文件源码 项目:Mighty-Morphin-FASTA-Files 作者: lonsbio 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def mmff_from_file(fasta_file, morphs):
    '''Compute a metamorphic tests files from an input FASTA file.
    Arguments:
       fasta_file: an open file object for the FASTA file
       morphs: list of morph functions to apply
    Result:
       tbc
    '''

    morphs_dict={
    'reverse':mmff_reverse,
    "passthrough": mmff_passthrough
    }
    morphed_sequences= []

    for seq in SeqIO.parse(fasta_file, "fasta"):
        for morph in morphs:
            #print("morph "+str(morphs_dict[morph])+" on "+seq.id)
            morphed_sequences.append(morphs_dict[morph](seq))
    return morphed_sequences
mmff_wooey.py 文件源码 项目:Mighty-Morphin-FASTA-Files 作者: lonsbio 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def mmff_from_file(fasta_file, morphs):
    '''Compute a metamorphic tests files from an input FASTA file.
    Arguments:
       fasta_file: an open file object for the FASTA file
       morphs: list of morph functions to apply
    Result:
       tbc
    '''

    morphs_dict={
    'reverse':mmff_reverse,
    "passthrough": mmff_passthrough,
    'numeric_header': mmff_numeric_header
    }
    morphed_sequences= []

    for seq in SeqIO.parse(fasta_file, "fasta"):
        for morph in morphs:
            #print("morph "+str(morphs_dict[morph])+" on "+seq.id)
            morphed_sequences.append(morphs_dict[morph](seq))
    return morphed_sequences
mmff.py 文件源码 项目:Mighty-Morphin-FASTA-Files 作者: lonsbio 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def mmff_from_file(fasta_file, morphs):
    '''Compute a FastaStats object from an input FASTA file.
    Arguments:
       fasta_file: an open file object for the FASTA file
       morphs: list of morph functions to apply
    Result:
       tbc
    '''

    morphs_dict={
    'reverse':mmff_reverse,
    "passthrough": mmff_passthrough
    }
    morphed_sequences= []

    for seq in SeqIO.parse(fasta_file, "fasta"):
        for morph in morphs:
            print("morph "+str(morphs_dict[morph])+" on "+seq.id)
            morphed_sequences.append(morphs_dict[morph](seq))
    return morphed_sequences
prep-genome.py 文件源码 项目:kevlar 作者: dib-lab 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def main(args):
    for record in SeqIO.parse(args.infile, 'fasta'):
        if args.discard:
            if sum([1 for rx in args.discard if re.match(rx, record.id)]) > 0:
                continue

        subseqcounter = 0
        printlog(args.debug, "DEBUG: convert to upper case", record.id)
        sequence = str(record.seq).upper()
        printlog(args.debug, "DEBUG: split seq by Ns", record.id)
        subseqs = [ss for ss in re.split('[^ACGT]+', sequence) if len(ss) > args.minlength]
        printlog(args.debug, "DEBUG: print subseqs", record.id)
        for subseq in subseqs:
            subseqcounter += 1
            subid = '{:s}_chunk_{:d}'.format(record.id, subseqcounter)
            subrecord = SeqRecord(Seq(subseq), subid, '', '')
            SeqIO.write(subrecord, args.outfile, 'fasta')
general.py 文件源码 项目:NucDiff 作者: uio-cels 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def READ_FASTA_ENTRY(file_name):
    fasta_sequences=[]
    sequence_name=[]
    full_names_dict={}

    sequences_dict={}


    if os.stat(file_name)[6]!=0: #not empty

        fh = open(file_name, "r")
        for record in SeqIO.parse(fh, "fasta"):
            short_name=str(record.id).split(' ')[0]
            sequence_name.append(short_name)
            full_names_dict[short_name]=str(record.id)

            fasta_sequences.append(str(record.seq))
            sequences_dict[short_name]=str(record.seq)


    return sequences_dict,fasta_sequences, sequence_name, full_names_dict
ref_coord.py 文件源码 项目:NucDiff 作者: uio-cels 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def READ_FASTA_ENTRY(file_name):
    fasta_sequences=[]
    sequence_name=[]
    full_names_dict={}

    sequences_dict={}


    if os.stat(file_name)[6]!=0: #not empty

        fh = open(file_name, "r")
        for record in SeqIO.parse(fh, "fasta"):
            short_name=str(record.id).split(' ')[0]
            sequences_dict[short_name]=str(record.seq)


    return sequences_dict



#------------------------------------------------------------------------------------------
test_new.py 文件源码 项目:insilico-subtyping 作者: superphy 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_malign(self):

        self.setUpPhylotyper(aa=False)

        tmpfiles = [ os.path.join(self.test_dir, 'tmp1.fasta'),
            os.path.join(self.test_dir, 'tmp2.fasta')
        ]

        tmpfile = os.path.join(self.test_dir, 'tmp_saln.fasta')

        aln = SeqAligner(self.configObj)
        aln.malign(self.subtype_options['input'], tmpfiles, tmpfile)

        lengths = []
        tmpfiles.append(tmpfile)
        for f in tmpfiles:
            seqs = SeqIO.parse(open(f, 'r'),'fasta')
            lengths.append(len(str(seqs.next().seq)))

        self.assertEqual(lengths[0]+lengths[1], lengths[2])


    ## TODO Add test for new multi amino acid/dna
utils.py 文件源码 项目:insilico-subtyping 作者: superphy 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def remove_duplicates(self):
        """Remove duplicate genbank records

        There are multiple methods to download genes. Each download method appends
        to the genbank file, so method this is needed to remove duplicates.

            Args:
                None

        """

        input_seq_iterator = SeqIO.parse(open(self._tmpfile, "rU"), "genbank")
        unique_seq_iterator = self.unique(input_seq_iterator)

        output_handle = open(self._gbfile, "w")
        SeqIO.write(unique_seq_iterator, output_handle, "genbank")
        output_handle.close()

        os.remove(self._tmpfile)
nanoQC.py 文件源码 项目:nanoQC 作者: wdecoster 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def get_lengths(fastq):
    '''
    Loop over the fastq file, extract length of sequences
    '''
    return np.array([len(record) for record in SeqIO.parse(fastq, "fastq")])
nanoQC.py 文件源码 项目:nanoQC 作者: wdecoster 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def get_bin(fq, sizeRange):
    '''
    Loop over the fastq file
    Extract list of nucleotides and list of quality scores in tuples in list
    Only select those reads of which the length is within the size range
    '''
    logging.info("Extracting nucleotides and quality scores of selected bin.")
    return [(list(rec.seq), list(rec.letter_annotations["phred_quality"]))
            for rec in SeqIO.parse(fq, "fastq") if sizeRange[0] < len(rec) < sizeRange[1]]
import_genbank.py 文件源码 项目:db-import 作者: antismash 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def main():
    '''Run the import'''
    if len(sys.argv) < 2:
        print('Usage: {} <gbk file>'.format(sys.argv[0]), file=sys.stderr)
        sys.exit(2)

    connection = psycopg2.connect(DB_CONNECTION)

    recs = SeqIO.parse(sys.argv[1], 'genbank')
    with connection:
        with connection.cursor() as cursor:
            for rec in recs:
                if rec.name in BLACKLIST:
                    print('Skipping blacklisted record {!r}'.format(rec.name), file=sys.stderr)
                    continue
                load_record(rec, cursor)

    connection.close()
import_genbank.py 文件源码 项目:db-import 作者: antismash 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def parse_smcog(feature):
    '''Parse the smCOG feature qualifier'''
    if 'note' not in feature.qualifiers:
        raise ValueError('No note qualifier in {}'.format(feature))

    for entry in feature.qualifiers['note']:
        if not entry.startswith('smCOG:'):
            continue
        match = SMCOG_PATTERN.search(entry)
        if match is None:
            print(entry)
            raise ValueError('Failed to parse smCOG line {!r}'.format(entry))
        return match.groups()

    raise ValueError('No smcog qualifier in {}'.format(feature))


问题


面经


文章

微信
公众号

扫码关注公众号