def test_genbank_record_fixing(self):
with open(os.path.join(FILES_PATH, 'sample.gb')) as f:
first_record = next(SeqIO.parse(f, 'genbank'))
first_record.features = [unconvert_feature(convert_feature(f)) for f in first_record.features]
output = StringIO()
SeqIO.write(first_record, output, "genbank")
with open(os.path.join(FILES_PATH, 'sample.fixed.gb')) as f:
self.assertEqual(output.getvalue(), f.read())
output.close()
python类write()的实例源码
def write_representative_sequences_file(self, outname, outdir=None, set_ids_from_model=True):
"""Write all the model's sequences as a single FASTA file. By default, sets IDs to model gene IDs.
Args:
outname (str): Name of the output FASTA file without the extension
outdir (str): Path to output directory of downloaded files, must be set if GEM-PRO directories
were not created initially
set_ids_from_model (bool): If the gene ID source should be the model gene IDs, not the original sequence ID
"""
if not outdir:
outdir = self.data_dir
if not outdir:
raise ValueError('Output directory must be specified')
outfile = op.join(outdir, outname + '.faa')
tmp = []
for x in self.genes_with_a_representative_sequence:
repseq = x.protein.representative_sequence
copied_seq_record = copy(repseq)
if set_ids_from_model:
copied_seq_record.id = x.id
tmp.append(copied_seq_record)
SeqIO.write(tmp, outfile, "fasta")
log.info('{}: wrote all representative sequences to file'.format(outfile))
self.genome_path = outfile
return self.genome_path
def write_fasta_file_from_dict(indict, outname, outdir=None, outext='.faa', force_rerun=False):
"""Write a FASTA file for a dictionary of IDs and their sequence strings.
Args:
indict: Input dictionary with keys as IDs and values as sequence strings
outname: Name of the output file which will have outext appended to it
outdir: Path to directory to output sequences to
outext: Extension of FASTA file, default ".faa"
force_rerun: If file should be overwritten if it exists
Returns:
str: Path to output FASTA file.
"""
if not outdir:
outdir = ''
outfile = ssbio.utils.outfile_maker(inname='', outname=outname, outdir=outdir, outext=outext)
if ssbio.utils.force_rerun(flag=force_rerun, outfile=outfile):
seqs = []
for i, s in indict.items():
seq = ssbio.protein.sequence.utils.cast_to_seq_record(s, id=i)
seqs.append(seq)
SeqIO.write(seqs, outfile, "fasta")
return outfile
def write_fasta_file(self, outfile, force_rerun=False):
"""Write a FASTA file for the protein sequence, ``seq`` will now load directly from this file.
Args:
outfile (str): Path to new FASTA file to be written to
force_rerun (bool): If an existing file should be overwritten
"""
if ssbio.utils.force_rerun(flag=force_rerun, outfile=outfile):
SeqIO.write(self, outfile, "fasta")
# The Seq as it will now be dynamically loaded from the file
self.sequence_path = outfile
def write_gff_file(self, outfile, force_rerun=False):
"""Write a GFF file for the protein features, ``features`` will now load directly from this file.
Args:
outfile (str): Path to new FASTA file to be written to
force_rerun (bool): If an existing file should be overwritten
"""
if ssbio.utils.force_rerun(outfile=outfile, flag=force_rerun):
with open(outfile, "w") as out_handle:
GFF.write([self], out_handle)
self.feature_path = outfile
def download_gbk(assemb_tab, cmd, outdir = '.'):
'''
This should take a list of accessions and paths, and produce a
source/destination pairs file.
It is then possible to use the --file-pair-file to download all at once,
and we can add the following flags to the ascp command:
--overwrite=diff -k2
the -k2 means files are compared with sparse checksums.
'''
fo = open("aspera_assemblies_src_dest.txt", 'w')
assembs_dic_list = [parse_aseemb_rows(row) for row in assemb_tab.itertuples()]
for assembly in assembs_dic_list:
fo.write("{}\n{}\n".format(assembly['source'], assembly['dest']))
fo.close()
cmd = cmd.format(outdir, 'aspera_assemblies_src_dest.txt', outdir)
print("Running the aspera cmd: {}".format(cmd), file = sys.stderr)
p = subprocess.Popen( shlex.split(cmd))
p.communicate()
files_to_check = [a['gbk_file'] for a in assembs_dic_list]
was_updated = parse_aspera_manifest_file(outdir, files_to_check, "aspera_assemblies_manifest.txt")
print("Finished downloading all genomes.", file = sys.stderr)
return assembs_dic_list
### LOADING GENOMES TO THE KRAKEN STAGGING AREA ################################
def add_isolates(dic_list, db_name):
for assembly in dic_list:
print("Adding {} to kraken stagging area.".format(assembly['organism']), file = sys.stderr)
genbank_zip_file = assembly['dest']
fi = gzip.open( genbank_zip_file, 'rt')
seqs = list(SeqIO.parse( fi, 'genbank'))
new_seqs = []
for s in seqs:
tmp = SeqIO.SeqRecord(s.seq)
tmp.id = 'gi|{}'.format(s.annotations['gi'])
tmp.description = s.description
tmp.name = s.name
new_seqs.append(tmp)
fi.close()
fa_file = os.path.join(os.getcwd(), os.path.basename(genbank_zip_file).strip('gbff.gz') + ".fa")
tmpf = open(fa_file, 'wt')
SeqIO.write(new_seqs, tmpf, 'fasta')
tmpf.close()
kraken_add(db_name, fa_file)
# cmd = 'kraken-build --add-to-library {} --db {}'.format(fa_file, db_name)
# print(cmd, file = sys.stderr)
# cmd = shlex.split(cmd)
# p = subprocess.check_output(cmd)
# os.remove(fa_file)
print("Added all {} assemblies to kraken stagging area. DB is ready to build".format(len(dic_list)), file = sys.stderr)
### ACTUALLY BUILD THE DATABASE ################################################
def generate_log(dic_list):
print("Generating a log of added genomes.", file = sys.stderr)
fo = open("log", 'w')
header = ['Organism', 'Accession', 'RefSeq', 'Assembly Level', 'Source', 'Destination']
fo.write('\t'.join(header) + '\n')
for assembly in dic_list:
row = '\t'.join( [assembly['organism'], assembly['asm_name'], assembly['ref_status'], assembly['asm_level'], assembly['source'], assembly['dest'] ]) + '\n'
fo.write(row)
fo.close()
def filter_data(self, mind=0.2, recalculate=True):
"""
Re-write with Pandas
"""
if mind is None:
stamp("Returning data without filtering.")
return self.data, self.attributes
stamp("Filtering samples with missing data >", mind)
stamp("Missing data calculated over", len(self.data), "SNPs")
mind_prop = self._calculate_mind()
to_remove = mind_prop[mind_prop > mind].index.tolist()
filtered_data = {}
for snp, data in self.data.items():
data["calls"] = [snp_call for i, snp_call in self._iterate_call_indices(data["calls"])
if i not in to_remove]
filtered_data[snp] = data
attributes = self._adjust_attributes(self.attributes, mind, to_remove)
percent_removed = format((len(to_remove) / attributes["sample_size"])*100, ".2f")
stamp("Removed {r} samples out of {t} samples ({p}%)"
.format(r=len(to_remove), t=attributes["sample_size"], p=percent_removed))
# Recalculating SNP parameters:
if recalculate:
stamp("Recalculating MAF, CALL RATE and HWE for SNPs")
marker = SNPModule(filtered_data, attributes)
filtered_data, attributes = marker.get_data(threshold=None)
return filtered_data, attributes
def _write_fasta(self, target="allele_seq_ref"):
""" Write fasta file of sequences with SNP IDs for CD-HIT. """
file_name = os.path.join(self.tmp_path, self.project + "_Seqs")
seqs = [SeqRecord(Seq(data[target], IUPAC.unambiguous_dna), id=snp_id, name="", description="")
for snp_id, data in self.data.items()]
file_name += ".fasta"
with open(file_name, "w") as fasta_file:
SeqIO.write(seqs, fasta_file, "fasta")
return file_name
def main():
records = SeqIO.to_dict(SeqIO.parse(open(sys.argv[1]), 'fasta'))
reader = csv.DictReader(sys.stdin, dialect="excel-tab")
clusters = list(reader)
groups = set([c['group'] for c in clusters])
for group in groups:
print "cluster%s\t%s-cluster%s" % (group, sys.argv[1], group)
with open('%s-cluster%s' %(sys.argv[1], group), 'w') as fout:
SeqIO.write([records[i['node']] for i in clusters if i['group'] == group], fout, 'fasta')
def go(args):
if args.trimmed.endswith('.gz'):
fh = gzip.open(args.trimmed)
else:
fh = open(args.trimmed)
ids = [rec.id for rec in SeqIO.parse(fh, "fasta")]
SeqIO.write((seq for seq in SeqIO.parse(args.fasta, "fasta") if seq.id in ids), sys.stdout, "fasta")
def format_seq_content(seq_str, out_format):
"""Format a string with sequences into a list of BioPython sequence objects (SeqRecord)
:param seq_str: string with sequences to format
:param out_format: fasta or fastq
:return: a list of SeqRecord objects with the sequences in the input string
"""
sequences = []
with tempfile.TemporaryFile(mode='w+') as fp:
fp.write(seq_str)
fp.seek(0)
for record in SeqIO.parse(fp, out_format):
sequences.append(record)
return sequences
def request_url(url, display, file=None):
"""Run the URL request and return content or status
This function tooks an URL built to query or extract data from ENA and apply
this URL. If a filepath is given, the function puts the result into the
file and returns the status of the request. Otherwise, the results of the
request is returned by the function in different format depending of the
display format
:param url: URL to request on ENA
:param display: display option
:param length: number of records to retrieve
:param file: filepath to save the content of the search
:return: status of the request or the result of the request (in different format)
"""
if file is not None:
r = requests.get(url, stream=True)
r.raise_for_status()
with open(file, "wb") as fd:
for chunk in r.iter_content(chunk_size=128):
fd.write(chunk)
return r.raise_for_status()
else:
r = requests.get(url)
r.raise_for_status()
if display == "xml":
return xmltodict.parse(r.text)
elif display == "fasta" or display == "fastq":
return format_seq_content(r.text, display)
else:
return r.text
def to_fasta(filename, outfile = None):
try:
file_in = gzip.open(filename) if filename.endswith(".gz") else open(filename)
except Exception as e:
sys.stderr.write("ERROR: Couldn't open input file %s\n" % filename)
sys.stderr.write(str(e) + "\n")
sys.exit(1)
ext = filename.rstrip('.gz').split('.')[-1]
ext = "fastq" if ext in ["fq"] else ext
if not outfile:
outfile = filename.split("/")[-1].rstrip('.gz')
outfile = ".".join( outfile.split(".")[:-1] ) + ".fasta"
outfile += ".gz" if filename.endswith(".gz") else ""
file_out = gzip.open(outfile, "w") if outfile.endswith(".gz") else open(outfile, "w")
print "%s -> %s" % (filename, outfile)
try:
for record in SeqIO.parse(file_in, ext):
SeqIO.write(record, file_out, "fasta")
except Exception as e:
sys.stderr.write(str(e) + "\n")
os.remove(outfile)
sys.exit(1)
def addSegmentToJunctionFileSE(vSeg,jSeg,cSeg,out,fastaDict, bases, idNameDict):
vSeq = fastaDict[vSeg]
if jSeg != 'NA':
jName = idNameDict[jSeg]
jSeq = fastaDict[jSeg]
else:
jSeq = ''
jName = 'NoJ'
if cSeg != 'NA':
cName = idNameDict[cSeg]
cSeq = fastaDict[cSeg]
else:
cName = 'NoC'
cSeq = ''
jcSeq = jSeq + cSeq
lenSeg = min(len(vSeq),len(jcSeq))
if bases != -10:
if lenSeg < bases:
sys.stdout.write(str(datetime.datetime.now()) + ' Bases parameter is bigger than the length of the V or J segment, taking the length' \
'of the V/J segment instead, which is: ' + str(lenSeg) + '\n')
sys.stdout.flush()
else:
lenSeg = bases
jTrim = jcSeq[:lenSeg]
vTrim = vSeq[-1*lenSeg:]
junc = vTrim + jTrim
recordName = vSeg + '.' + jSeg + '.' + cSeg + '(' + idNameDict[vSeg] + '-' + jName + '-' + cName + ')'
record = SeqRecord(Seq(junc,IUPAC.ambiguous_dna), id = recordName, description = '')
SeqIO.write(record,out,'fasta')
def findCDR3(fasta, aaDict, vdjFaDict):
f = open(fasta, 'rU')
fDict = dict()
for record in SeqIO.parse(f, 'fasta'):
if record.id in fDict:
sys.stderr.write(str(datetime.datetime.now()) + ' Error! same name for two fasta entries %s\n' % record.id)
sys.stderr.flush()
else:
idArr = record.id.split('.')
vSeg = idArr[0]
jSeg = idArr[1]
if ((vSeg in aaDict) & (jSeg in aaDict)):
currDict = findVandJaaMap(aaDict[vSeg],aaDict[jSeg],record.seq)
else:
if vSeg in aaDict:
newVseg = aaDict[vSeg]
else:
vId = idArr[3]
currSeq = vdjFaDict[vId]
newVseg = getBestVaa(Seq(currSeq))
if jSeg in aaDict:
newJseg = aaDict[jSeg]
else:
jId = idArr[4]
currSeq= vdjFaDict[jId]
newJseg = getBestJaa(Seq(currSeq))
currDict = findVandJaaMap(newVseg,newJseg,record.seq)
fDict[record.id] = currDict
f.close()
return fDict
def makeAADict(aaF):
fDict = dict()
f = open(aaF,'r')
l = f.readline()
while l != '':
lArr = l.strip('\n').split('\t')
if lArr[0] in fDict:
sys.stderr.write(str(datetime.datetime.now()) + ' Warning! %s appear twice in AA file\n' % lArr[0])
sys.stderr.flush()
fDict[lArr[0]] = lArr[1]
l = f.readline()
f.close()
return fDict
def toTakePair(segType, strand, readStrand):
if strand == 'none':
return True
if ((readStrand != 'minus') & (readStrand != 'plus')):
sys.stderr.write(str(datetime.datetime.now()) + ' Error! Read strand should be plus or minus only\n')
sys.stderr.flush()
return True
if ((segType == 'C') | (segType == 'J')):
if strand == 'minus':
if readStrand == 'minus':
return True
else:
return False
else:
if readStrand == 'minus':
return False
else:
return True
else:
if strand == 'minus':
if readStrand == 'minus':
return False
else:
return True
else:
if readStrand == 'minus':
return True
else:
return False
return True
def makeVDJBedDict(bed,idNameDict):
fDict = {'Alpha':{'C':[],'V':[],'J':[]}, 'Beta':{'C':[],'V':[],'J':[]}}
f = open(bed, 'r')
l = f.readline()
while l != '':
lArr = l.strip('\n').split('\t')
gID = lArr[3]
gName = idNameDict[gID]
chain = ''
if (gName.startswith('TRA')):
chain = 'Alpha'
elif (gName.startswith('TRB')):
chain = 'Beta'
else:
sys.stderr.write(str(datetime.datetime.now()) + ' Error! %s name is not alpha or beta chain, ignoring it\n' % gName)
sys.stderr.flush()
if gName.find('C') != -1:
fDict[chain]['C'].append(l)
elif gName.find('V') != -1:
fDict[chain]['V'].append(l)
elif gName.find('J') != -1:
fDict[chain]['J'].append(l)
l = f.readline()
f.close()
return fDict
# Creates a dictionary of ENSEMBL ID -> fasta sequence