python类write()的实例源码

test_feature_conversion.py 文件源码 项目:goodbye-genbank 作者: biosustain 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_genbank_record_fixing(self):
        with open(os.path.join(FILES_PATH, 'sample.gb')) as f:
            first_record = next(SeqIO.parse(f, 'genbank'))

        first_record.features = [unconvert_feature(convert_feature(f)) for f in first_record.features]

        output = StringIO()
        SeqIO.write(first_record, output, "genbank")
        with open(os.path.join(FILES_PATH, 'sample.fixed.gb')) as f:
            self.assertEqual(output.getvalue(), f.read())
        output.close()
gempro.py 文件源码 项目:ssbio 作者: SBRG 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def write_representative_sequences_file(self, outname, outdir=None, set_ids_from_model=True):
        """Write all the model's sequences as a single FASTA file. By default, sets IDs to model gene IDs.

        Args:
            outname (str): Name of the output FASTA file without the extension
            outdir (str): Path to output directory of downloaded files, must be set if GEM-PRO directories
                were not created initially
            set_ids_from_model (bool): If the gene ID source should be the model gene IDs, not the original sequence ID

        """

        if not outdir:
            outdir = self.data_dir
            if not outdir:
                raise ValueError('Output directory must be specified')

        outfile = op.join(outdir, outname + '.faa')

        tmp = []
        for x in self.genes_with_a_representative_sequence:
            repseq = x.protein.representative_sequence
            copied_seq_record = copy(repseq)
            if set_ids_from_model:
                copied_seq_record.id = x.id
            tmp.append(copied_seq_record)

        SeqIO.write(tmp, outfile, "fasta")

        log.info('{}: wrote all representative sequences to file'.format(outfile))
        self.genome_path = outfile
        return self.genome_path
fasta.py 文件源码 项目:ssbio 作者: SBRG 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def write_fasta_file_from_dict(indict, outname, outdir=None, outext='.faa', force_rerun=False):
    """Write a FASTA file for a dictionary of IDs and their sequence strings.

    Args:
        indict: Input dictionary with keys as IDs and values as sequence strings
        outname: Name of the output file which will have outext appended to it
        outdir: Path to directory to output sequences to
        outext: Extension of FASTA file, default ".faa"
        force_rerun: If file should be overwritten if it exists

    Returns:
        str: Path to output FASTA file.

    """

    if not outdir:
        outdir = ''
    outfile = ssbio.utils.outfile_maker(inname='', outname=outname, outdir=outdir, outext=outext)

    if ssbio.utils.force_rerun(flag=force_rerun, outfile=outfile):
        seqs = []
        for i, s in indict.items():
            seq = ssbio.protein.sequence.utils.cast_to_seq_record(s, id=i)
            seqs.append(seq)
        SeqIO.write(seqs, outfile, "fasta")

    return outfile
seqprop.py 文件源码 项目:ssbio 作者: SBRG 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def write_fasta_file(self, outfile, force_rerun=False):
        """Write a FASTA file for the protein sequence, ``seq`` will now load directly from this file.

        Args:
            outfile (str): Path to new FASTA file to be written to
            force_rerun (bool): If an existing file should be overwritten

        """
        if ssbio.utils.force_rerun(flag=force_rerun, outfile=outfile):
            SeqIO.write(self, outfile, "fasta")

        # The Seq as it will now be dynamically loaded from the file
        self.sequence_path = outfile
seqprop.py 文件源码 项目:ssbio 作者: SBRG 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def write_gff_file(self, outfile, force_rerun=False):
        """Write a GFF file for the protein features, ``features`` will now load directly from this file.

        Args:
            outfile (str): Path to new FASTA file to be written to
            force_rerun (bool): If an existing file should be overwritten

        """
        if ssbio.utils.force_rerun(outfile=outfile, flag=force_rerun):
            with open(outfile, "w") as out_handle:
                GFF.write([self], out_handle)

        self.feature_path = outfile
kraken_trawl.py 文件源码 项目:kraken-trawl 作者: andersgs 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def download_gbk(assemb_tab, cmd, outdir = '.'):
    '''
    This should take a list of accessions and paths, and produce a
    source/destination pairs file.

    It is then possible to use the --file-pair-file to download all at once,
    and we can add the following flags to the ascp command:

    --overwrite=diff -k2

    the -k2 means files are compared with sparse checksums.
    '''

    fo = open("aspera_assemblies_src_dest.txt", 'w')
    assembs_dic_list = [parse_aseemb_rows(row) for row in assemb_tab.itertuples()]
    for assembly in assembs_dic_list:
        fo.write("{}\n{}\n".format(assembly['source'], assembly['dest']))
    fo.close()

    cmd = cmd.format(outdir, 'aspera_assemblies_src_dest.txt', outdir)
    print("Running the aspera cmd: {}".format(cmd), file = sys.stderr)
    p = subprocess.Popen( shlex.split(cmd))
    p.communicate()
    files_to_check = [a['gbk_file'] for a in assembs_dic_list]
    was_updated = parse_aspera_manifest_file(outdir, files_to_check, "aspera_assemblies_manifest.txt")
    print("Finished downloading all genomes.", file = sys.stderr)
    return assembs_dic_list

### LOADING GENOMES TO THE KRAKEN STAGGING AREA ################################
kraken_trawl.py 文件源码 项目:kraken-trawl 作者: andersgs 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def add_isolates(dic_list, db_name):
    for assembly in dic_list:
        print("Adding {} to kraken stagging area.".format(assembly['organism']), file = sys.stderr)
        genbank_zip_file = assembly['dest']
        fi = gzip.open( genbank_zip_file, 'rt')
        seqs = list(SeqIO.parse( fi, 'genbank'))
        new_seqs = []
        for s in seqs:
            tmp = SeqIO.SeqRecord(s.seq)
            tmp.id = 'gi|{}'.format(s.annotations['gi'])
            tmp.description = s.description
            tmp.name = s.name
            new_seqs.append(tmp)
        fi.close()
        fa_file = os.path.join(os.getcwd(), os.path.basename(genbank_zip_file).strip('gbff.gz') + ".fa")
        tmpf = open(fa_file, 'wt')
        SeqIO.write(new_seqs, tmpf, 'fasta')
        tmpf.close()
        kraken_add(db_name, fa_file)
        # cmd = 'kraken-build --add-to-library {} --db {}'.format(fa_file, db_name)
        # print(cmd, file = sys.stderr)
        # cmd = shlex.split(cmd)
        # p = subprocess.check_output(cmd)
        # os.remove(fa_file)
    print("Added all {} assemblies to kraken stagging area. DB is ready to build".format(len(dic_list)), file = sys.stderr)

### ACTUALLY BUILD THE DATABASE ################################################
kraken_trawl.py 文件源码 项目:kraken-trawl 作者: andersgs 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def generate_log(dic_list):
    print("Generating a log of added genomes.", file = sys.stderr)
    fo = open("log", 'w')
    header = ['Organism', 'Accession', 'RefSeq', 'Assembly Level', 'Source', 'Destination']
    fo.write('\t'.join(header) + '\n')
    for assembly in dic_list:
        row = '\t'.join( [assembly['organism'], assembly['asm_name'], assembly['ref_status'], assembly['asm_level'],  assembly['source'], assembly['dest'] ]) + '\n'
        fo.write(row)
    fo.close()
DartModules.py 文件源码 项目:dartqc 作者: esteinig 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def filter_data(self, mind=0.2, recalculate=True):

        """
        Re-write with Pandas
        """

        if mind is None:
            stamp("Returning data without filtering.")
            return self.data, self.attributes

        stamp("Filtering samples with missing data >", mind)
        stamp("Missing data calculated over", len(self.data), "SNPs")

        mind_prop = self._calculate_mind()

        to_remove = mind_prop[mind_prop > mind].index.tolist()

        filtered_data = {}
        for snp, data in self.data.items():
            data["calls"] = [snp_call for i, snp_call in self._iterate_call_indices(data["calls"])
                             if i not in to_remove]
            filtered_data[snp] = data

        attributes = self._adjust_attributes(self.attributes, mind, to_remove)

        percent_removed = format((len(to_remove) / attributes["sample_size"])*100, ".2f")

        stamp("Removed {r} samples out of {t} samples ({p}%)"
              .format(r=len(to_remove), t=attributes["sample_size"], p=percent_removed))

        # Recalculating SNP parameters:

        if recalculate:
            stamp("Recalculating MAF, CALL RATE and HWE for SNPs")
            marker = SNPModule(filtered_data, attributes)
            filtered_data, attributes = marker.get_data(threshold=None)

        return filtered_data, attributes
DartModules.py 文件源码 项目:dartqc 作者: esteinig 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _write_fasta(self, target="allele_seq_ref"):

        """ Write fasta file of sequences with SNP IDs for CD-HIT. """

        file_name = os.path.join(self.tmp_path, self.project + "_Seqs")

        seqs = [SeqRecord(Seq(data[target], IUPAC.unambiguous_dna), id=snp_id, name="", description="")
                for snp_id, data in self.data.items()]

        file_name += ".fasta"

        with open(file_name, "w") as fasta_file:
            SeqIO.write(seqs, fasta_file, "fasta")

        return file_name
split-clusters.py 文件源码 项目:zika-pipeline 作者: zibraproject 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def main():
        records = SeqIO.to_dict(SeqIO.parse(open(sys.argv[1]), 'fasta'))

    reader = csv.DictReader(sys.stdin, dialect="excel-tab")
    clusters = list(reader)

    groups = set([c['group'] for c in clusters])

    for group in groups:
        print "cluster%s\t%s-cluster%s" % (group, sys.argv[1], group)
        with open('%s-cluster%s' %(sys.argv[1], group), 'w') as fout:
            SeqIO.write([records[i['node']] for i in clusters if i['group'] == group], fout, 'fasta')
reconstitute.py 文件源码 项目:zika-pipeline 作者: zibraproject 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def go(args):
    if args.trimmed.endswith('.gz'):
        fh = gzip.open(args.trimmed)
    else:
        fh = open(args.trimmed)

    ids = [rec.id for rec in SeqIO.parse(fh, "fasta")]
    SeqIO.write((seq for seq in SeqIO.parse(args.fasta, "fasta") if seq.id in ids), sys.stdout, "fasta")
__init__.py 文件源码 项目:enasearch 作者: bebatut 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def format_seq_content(seq_str, out_format):
    """Format a string with sequences into a list of BioPython sequence objects (SeqRecord)

    :param seq_str: string with sequences to format
    :param out_format: fasta or fastq

    :return: a list of SeqRecord objects with the sequences in the input string
    """
    sequences = []
    with tempfile.TemporaryFile(mode='w+') as fp:
        fp.write(seq_str)
        fp.seek(0)
        for record in SeqIO.parse(fp, out_format):
            sequences.append(record)
    return sequences
__init__.py 文件源码 项目:enasearch 作者: bebatut 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def request_url(url, display, file=None):
    """Run the URL request and return content or status

    This function tooks an URL built to query or extract data from ENA and apply
    this URL. If a filepath is given, the function puts the result into the
    file and returns the status of the request. Otherwise, the results of the
    request is returned by the function in different format depending of the
    display format

    :param url: URL to request on ENA
    :param display: display option
    :param length: number of records to retrieve
    :param file: filepath to save the content of the search

    :return: status of the request or the result of the request (in different format)
    """
    if file is not None:
        r = requests.get(url, stream=True)
        r.raise_for_status()
        with open(file, "wb") as fd:
            for chunk in r.iter_content(chunk_size=128):
                fd.write(chunk)
        return r.raise_for_status()
    else:
        r = requests.get(url)
        r.raise_for_status()
        if display == "xml":
            return xmltodict.parse(r.text)
        elif display == "fasta" or display == "fastq":
            return format_seq_content(r.text, display)
        else:
            return r.text
to_fasta.py 文件源码 项目:metlab 作者: norling 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def to_fasta(filename, outfile = None):

    try:
        file_in = gzip.open(filename) if filename.endswith(".gz") else open(filename)
    except Exception as e:
        sys.stderr.write("ERROR: Couldn't open input file %s\n" % filename)
        sys.stderr.write(str(e) + "\n")
        sys.exit(1)

    ext = filename.rstrip('.gz').split('.')[-1]
    ext = "fastq" if ext in ["fq"] else ext

    if not outfile:
        outfile = filename.split("/")[-1].rstrip('.gz')
        outfile = ".".join( outfile.split(".")[:-1] ) + ".fasta"
        outfile += ".gz" if filename.endswith(".gz") else ""
    file_out = gzip.open(outfile, "w") if outfile.endswith(".gz") else open(outfile, "w")

    print "%s -> %s" % (filename, outfile)
    try:
        for record in SeqIO.parse(file_in, ext):
            SeqIO.write(record, file_out, "fasta")
    except Exception as e:
        sys.stderr.write(str(e) + "\n")
        os.remove(outfile)
        sys.exit(1)
trapes.py 文件源码 项目:TRAPeS 作者: YosefLab 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def addSegmentToJunctionFileSE(vSeg,jSeg,cSeg,out,fastaDict, bases, idNameDict):
    vSeq = fastaDict[vSeg]
    if jSeg != 'NA':
        jName = idNameDict[jSeg]
        jSeq = fastaDict[jSeg]
    else:
        jSeq = ''
        jName = 'NoJ'
    if cSeg != 'NA':
        cName = idNameDict[cSeg]
        cSeq = fastaDict[cSeg]
    else:
        cName = 'NoC'
        cSeq = ''
    jcSeq = jSeq + cSeq
    lenSeg = min(len(vSeq),len(jcSeq))
    if bases != -10:
        if lenSeg < bases:
            sys.stdout.write(str(datetime.datetime.now()) + ' Bases parameter is bigger than the length of the V or J segment, taking the length' \
                    'of the V/J segment instead, which is: ' + str(lenSeg) + '\n')
            sys.stdout.flush()
        else:
            lenSeg = bases
    jTrim = jcSeq[:lenSeg]
    vTrim = vSeq[-1*lenSeg:]
    junc = vTrim + jTrim
    recordName = vSeg + '.' + jSeg + '.' + cSeg + '(' + idNameDict[vSeg] + '-' + jName + '-' + cName + ')'
    record = SeqRecord(Seq(junc,IUPAC.ambiguous_dna), id = recordName, description = '')
    SeqIO.write(record,out,'fasta')
trapes.py 文件源码 项目:TRAPeS 作者: YosefLab 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def findCDR3(fasta, aaDict, vdjFaDict):
    f = open(fasta, 'rU')
    fDict = dict()
    for record in SeqIO.parse(f, 'fasta'):
        if record.id in fDict:
            sys.stderr.write(str(datetime.datetime.now()) + ' Error! same name for two fasta entries %s\n' % record.id)
            sys.stderr.flush()
        else:
            idArr = record.id.split('.')
            vSeg = idArr[0]
            jSeg = idArr[1]
            if ((vSeg in aaDict) & (jSeg in aaDict)):
                currDict = findVandJaaMap(aaDict[vSeg],aaDict[jSeg],record.seq)
            else:
                if vSeg in aaDict:
                    newVseg = aaDict[vSeg]
                else:
                    vId = idArr[3]
                    currSeq = vdjFaDict[vId]
                    newVseg = getBestVaa(Seq(currSeq))
                if jSeg in aaDict:
                    newJseg = aaDict[jSeg]
                else:
                    jId = idArr[4]
                    currSeq= vdjFaDict[jId]
                    newJseg = getBestJaa(Seq(currSeq))
                currDict = findVandJaaMap(newVseg,newJseg,record.seq)
            fDict[record.id] = currDict
    f.close()
    return fDict
trapes.py 文件源码 项目:TRAPeS 作者: YosefLab 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def makeAADict(aaF):
    fDict = dict()
    f = open(aaF,'r')
    l = f.readline()
    while l != '':
        lArr = l.strip('\n').split('\t')
        if lArr[0] in fDict:
            sys.stderr.write(str(datetime.datetime.now()) + ' Warning! %s appear twice in AA file\n' % lArr[0])
            sys.stderr.flush()
        fDict[lArr[0]] = lArr[1]
        l = f.readline()
    f.close()
    return fDict
trapes.py 文件源码 项目:TRAPeS 作者: YosefLab 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def toTakePair(segType, strand, readStrand):
    if strand == 'none':
        return True
    if ((readStrand != 'minus') & (readStrand != 'plus')):
        sys.stderr.write(str(datetime.datetime.now()) + ' Error! Read strand should be plus or minus only\n')
        sys.stderr.flush()
        return True
    if ((segType == 'C') | (segType == 'J')):
        if strand == 'minus':
            if readStrand == 'minus':
                return True
            else:
                return False
        else:
            if readStrand == 'minus':
                return False
            else:
                return True
    else:
        if strand == 'minus':
            if readStrand == 'minus':
                return False
            else:
                return True
        else:
            if readStrand == 'minus':
                return True
            else:
                return False
    return True
trapes.py 文件源码 项目:TRAPeS 作者: YosefLab 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def makeVDJBedDict(bed,idNameDict):
    fDict = {'Alpha':{'C':[],'V':[],'J':[]}, 'Beta':{'C':[],'V':[],'J':[]}}
    f = open(bed, 'r')
    l = f.readline()
    while l != '':
        lArr = l.strip('\n').split('\t')
        gID = lArr[3]
        gName = idNameDict[gID]
        chain = ''
        if (gName.startswith('TRA')):
            chain = 'Alpha'
        elif (gName.startswith('TRB')):
            chain = 'Beta'
        else:
            sys.stderr.write(str(datetime.datetime.now()) + ' Error! %s name is not alpha or beta chain, ignoring it\n' % gName)
            sys.stderr.flush()
        if gName.find('C') != -1:
            fDict[chain]['C'].append(l)
        elif gName.find('V') != -1:
            fDict[chain]['V'].append(l)
        elif gName.find('J') != -1:
            fDict[chain]['J'].append(l)
        l = f.readline()
    f.close()
    return fDict





# Creates a dictionary of ENSEMBL ID -> fasta sequence


问题


面经


文章

微信
公众号

扫码关注公众号