def write_fasta_file(seq_records, outname, outdir=None, outext='.faa', force_rerun=False):
"""Write a FASTA file for a SeqRecord or a list of SeqRecord objects.
Args:
seq_records (SeqRecord, list): SeqRecord or a list of SeqRecord objects
outname (str): Name of the output file; outext will be appended to it
outdir (str): Path to the directory to write the sequences to (defaults to the current directory)
outext (str): Extension of the FASTA file, default ".faa"
force_rerun (bool): If True, overwrite the output file if it already exists
Returns:
str: Path to output FASTA file.
"""
if not outdir:
outdir = ''
outfile = ssbio.utils.outfile_maker(inname='', outname=outname, outdir=outdir, outext=outext)
if ssbio.utils.force_rerun(flag=force_rerun, outfile=outfile):
SeqIO.write(seq_records, outfile, "fasta")
return outfile
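A minimal usage sketch (not from the original project): it assumes Biopython and the ssbio.utils helpers referenced above are importable and that write_fasta_file is defined as shown.
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord

records = [
    SeqRecord(Seq("MKTAYIAKQR"), id="protein_1", description=""),
    SeqRecord(Seq("MSLLTEVETY"), id="protein_2", description=""),
]
# Writes protein_seqs.faa into /tmp and returns its path; with force_rerun=False
# an existing file of the same name is left untouched.
fasta_path = write_fasta_file(records, outname="protein_seqs", outdir="/tmp")
print(fasta_path)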
Python write() usage examples (source code)
def filter_fastq(input_file, output_file, downweight_number, ctot, gtoa):
"""
Takes a FASTQ file as input and down-weights base qualities at the
start and the end of the reads.
"""
in_iterator = SeqIO.parse(input_file,'fastq')
input_records=list(in_iterator)
for record in input_records:
change_bases_c = None
change_bases_t = None
temp_qual = record.letter_annotations['phred_quality']
if(ctot):
change_bases_c = [check_c_2_t(nuc) and i < downweight_number for i, nuc in enumerate(record.seq)]
if(gtoa):
change_bases_t = [check_g_2_a(nuc) and (len(record.seq)-i) <= downweight_number for i, nuc in enumerate(record.seq)]
new_qual = downweight_quality(temp_qual, change_bases_c, change_bases_t)
record.letter_annotations['phred_quality'] = new_qual
handle = open(output_file, 'w')
SeqIO.write(input_records, handle, "fastq")
handle.close()
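For context, a hedged example call; check_c_2_t, check_g_2_a and downweight_quality are helper functions of the original project that are not shown in this snippet, and the file names are placeholders.
# Down-weight qualities of C->T candidates in the first 2 positions and
# G->A candidates in the last 2 positions of every read.
filter_fastq("reads.fastq", "reads.downweighted.fastq",
             downweight_number=2, ctot=True, gtoa=True)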
def filter_bam(input_file, output_file, downweight_number, ctot, gtoa):
"""
Takes a bam file as input and weights the quality of the reads down.
Need to ensure we write the header out :)
Investigate pysam and look for a header,
this should really help us understand how to get this bam filter working
and writing the bam files directly back out to the terminal.
"""
bam = pysam.AlignmentFile(input_file, 'rb')
bam_out = pysam.AlignmentFile(output_file, 'wb', template=bam)
for line in bam:
change_bases_c = None
change_bases_t = None
seq = line.seq
qual = line.qual
if(ctot):
change_bases_c = [check_c_2_t(nuc) and i < downweight_number for i, nuc in enumerate(seq)]
if(gtoa):
change_bases_t = [check_g_2_a(nuc) and (len(seq)-i) <= downweight_number for i, nuc in enumerate(seq)]
new_qual = downweight_quality(qual,change_bases_c, change_bases_t)
line.qual = new_qual
bam_out.write(line)
def remove_small_large_contigs(self,input_file, output_file):
with open(input_file, "r") as spades_input_file, open(output_file, "w") as spades_output_file:
sequences = []
for record in SeqIO.parse(spades_input_file, "fasta"):
if self.min_spades_contig_coverage != 0 and self.sequence_coverage(record.id) < self.min_spades_contig_coverage:
self.logger.warning("Excluding contig with low coverage of "+ str(self.sequence_coverage(record.id))+ " from "+ self.spades_assembly_file())
continue
if self.max_spades_contig_coverage != 0 and self.sequence_coverage(record.id) > self.max_spades_contig_coverage:
self.logger.warning("Excluding contig with high coverage of "+ str(self.sequence_coverage(record.id))+ " from "+ self.spades_assembly_file())
continue
if len(record.seq) > self.minimum_length:
sequences.append(record)
else:
self.logger.warning("Excluding contig of length "+ str(len(record.seq))+ " from "+ self.spades_assembly_file() )
SeqIO.write(sequences, spades_output_file, "fasta")
def consensus(bam_path: 'path to SAM/BAM file',
realign: 'attempt to reconstruct reference around soft-clip boundaries'=False,
min_depth: 'substitute Ns at coverage depths beneath this value'=2,
min_overlap: 'match length required to close soft-clipped gaps'=7,
clip_decay_threshold: 'read depth fraction at which to cease clip extension'=0.1,
mask_ends: 'ignore clip dominant positions within n positions of termini'=50,
trim_ends: 'trim ambiguous nucleotides (Ns) from sequence ends'=False,
uppercase: 'close gaps using uppercase alphabet'=False):
'''Infer consensus sequence(s) from alignment in SAM/BAM format'''
result = kindel.bam_to_consensus(bam_path,
realign,
min_depth,
min_overlap,
clip_decay_threshold,
mask_ends,
trim_ends,
uppercase)
print(result.report, file=sys.stderr)
SeqIO.write(result.consensuses, sys.stdout,'fasta')
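A hedged example of calling the function directly; it is normally exposed as the kindel command-line entry point, requires the kindel package, and the BAM file name here is a placeholder.
# Prints a report to stderr and writes the consensus FASTA to stdout.
consensus("alignment.bam", min_depth=5, trim_ends=True)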
def addCellToTCRsum(cellFolder, noutput, opened, tcrFout):
if os.path.isfile(noutput + '.summary.txt'):
currOut = open(noutput + '.summary.txt','r')
if not opened:
opened = True
head = currOut.readline()
head = 'cell\t' + head
tcrFout.write(head)
else:
currOut.readline()
l = currOut.readline()
while l != '':
newL = cellFolder + '\t' + l
tcrFout.write(newL)
l = currOut.readline()
currOut.close()
return opened
def writeReadsFileSE(mappedReadsDict, outReads, fastq):
if fastq.endswith('.gz'):
subprocess.call(['gunzip', fastq])
newFq = fastq.replace('.gz','')
else:
newFq = fastq
out = open(outReads, 'w')
fqF = open(newFq, 'r')
for record in SeqIO.parse(fqF, 'fastq'):
if record.id in mappedReadsDict:
newRec = SeqRecord(record.seq, id = record.id, description = '')
SeqIO.write(newRec,out,'fasta')
out.close()
fqF.close()
if fastq.endswith('.gz'):
subprocess.call(['gzip',newFq])
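A small usage sketch (assumption: mappedReadsDict only needs to support `record.id in mappedReadsDict`, so a plain set of read identifiers works; file names are placeholders).
mapped_ids = {"read_1", "read_7", "read_42"}
# Extracts the mapped reads from the FASTQ file and writes them out as FASTA.
writeReadsFileSE(mapped_ids, "mapped_reads.fasta", "sample_R1.fastq")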
def writeSegDict(segDict, outF, chain):
for seg in segDict:
currDict = segDict[seg]
pairs = ''
for pair in currDict['pairs']:
pairs += pair + '.'
pairs = pairs[:-1]
if currDict['len'] > 0:
fLine = chain + '\t' + currDict['status'] + '\t'
rank = findCurrRank(segDict, seg, currDict['len'])
fLine += str(rank) + '\t'
if currDict['type'] == 'V':
fLine += currDict['name'] + '\t' + 'paired with: ' + pairs + '\t'
else:
fLine += 'paired with: ' + pairs + '\t' + currDict['name'] + '\t'
fLine += 'NA\t' + currDict['seq'] + '\tNA\tNA\tNA\t'
fLine += 'NA\tNA\tNA\tNA\tNA\tNA\tNA\t'
if currDict['type'] == 'V':
fLine += seg + '\tNA\tNA\n'
else:
fLine += 'NA\t' + seg + '\tNA\n'
#print fLine
outF.write(str(fLine))
def makeIdNameDict(mapping):
f = open(mapping, 'r')
fDict = dict()
linesArr = [line for line in f.read().split('\n') if line.strip() != '']  # skip blank lines (a trailing newline would otherwise cause an IndexError)
f.close()
for line in linesArr:
lineArr = line.split('\t')
id = lineArr[0]
name = lineArr[1]
ind = name.find('Gene:')
if ind != -1:
name = name[ind+len('Gene:'):]
if id in fDict:
sys.stderr.write(str(datetime.datetime.now()) + ' Error! %s appear twice in mapping file\n' % id)
sys.stderr.flush()
fDict[id] = name
return fDict
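An illustration of the mapping file format this function expects, inferred from the parsing above (IDs and names are made up).
# mapping.txt (tab-separated):
#   ENSG00000139618    Gene:BRCA2
#   ENSG00000141510    Gene:TP53
id_to_name = makeIdNameDict("mapping.txt")
# -> {'ENSG00000139618': 'BRCA2', 'ENSG00000141510': 'TP53'}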
def _reverse_orf_read(s):
"""
Reads the given string backwards, codon by codon, until a stop codon is encountered.
Also records the last (leftmost) start codon seen along the way.
:param s: string
:return t, t_part_orf, fl, is_stop:
t - position at which reading ended
t_part_orf - start position of the current partial ORF
fl - True if a start codon was read
is_stop - True if reading was interrupted by a stop codon
"""
t = len(s)
t_part_orf = t
fl = False
while True: # do..while
if s[t - 3:t] in ORFsInGraph.starts:
t_part_orf = t - 3
fl = True
t -= 3
if (s[t - 3:t] in ORFsInGraph.stops) or t < 3:
break
return t, t_part_orf, fl, s[t - 3:t] in ORFsInGraph.stops
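A worked example of the backwards codon scan. ORFsInGraph is not shown in this snippet, so a stand-in with the usual start/stop codon sets is defined here; this only works as written if the stand-in lives in the same module as _reverse_orf_read.
class ORFsInGraph:
    starts = {"ATG"}
    stops = {"TAA", "TAG", "TGA"}

s = "TAAATGAAACCC"  # a stop codon, then a start codon, then two ordinary codons
print(_reverse_orf_read(s))
# -> (3, 3, True, True): reading stopped at position 3, the partial ORF starts at 3,
#    a start codon was seen, and the scan was interrupted by the TAA stop codon.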
def pbdagcon(m5, t):
script_dir = os.path.dirname(os.path.realpath(__file__))
cmd = ("%s/pbdagcon -t %d -c 1 -m 1 %s" % (script_dir, t, m5)).split()
## hard coded threshold to prevent trimming too many bases
if(t > 100):
return None
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
## if pbdagcon does not finish within 5 seconds, trim 1 more base and rerun recursively
poll_seconds = 0.25
deadline = time.time() + 5
while time.time() < deadline and proc.poll() is None:
time.sleep(poll_seconds)
if proc.poll() is None:
proc.terminate()
sys.stderr.write("Warning: PBDAGCON timeout! Trimming %d base(s).\n" %(t+1))
stdout, stderr = proc.communicate()
if proc.returncode != 0:
stdout = pbdagcon(m5, t+1)
return stdout
GraphicRecord.py (from the DnaFeaturesViewer project by Edinburgh-Genome-Foundry)
def to_biopython_record(self):
"""
Example
-------
from Bio import SeqIO
gr_record = GraphicRecord(features=features, sequence_length=len(seq),
sequence=seq)
bio_record = gr_record.to_biopython_record()
with open("example.gb", "w+") as f:
SeqIO.write(bio_record, f, "genbank")
"""
if not BIOPYTHON_AVAILABLE:
raise ImportError(".to_biopython_record requires Biopython")
features = [
SeqFeature(FeatureLocation(f.start, f.end, f.strand),
type=f.feature_type, qualifiers={"label": f.label})
for f in self.features
]
sequence = Seq(self.data["sequence"], alphabet=DNAAlphabet())
return SeqRecord(seq=sequence, features=features)
def maskRibosomalSequence(seqRecord, dict16S, dict23S, dict5S, dict_t_rna):
passed = False
seq = seqRecord.seq
id = seqRecord.id
dicts = [dict16S, dict23S, dict5S, dict_t_rna]
for d in dicts:
if id in d:
start_pos = d[id][1]
end_pos = d[id][2]
seq_length = d[id][0]
logging.debug("identifier: " + id)
logging.debug("Start : " + str(start_pos))
logging.debug("End : " + str(end_pos))
logging.debug("Length: " + str(seq_length))
logging.debug("original sequence: " + seq)
# mask the RNA regions
seq = seq[:int(start_pos) - 1] + "N" * (int(end_pos) - int(start_pos) + 1) + seq[int(end_pos):int(seq_length)]
logging.debug("masked sequence : " + seq)
# keep the record only if enough unmasked sequence remains (> 60 bp)
if len(str(seq).replace("N", "")) > 60:
seqRecord.seq = seq
passed = True
return seqRecord, passed
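A small sketch of the expected input, inferred from the indexing above: each dictionary maps a record id to (length, start, end) with 1-based, inclusive coordinates. The values are made up, and the logging module is assumed to be imported where the function is defined.
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord

dict16S = {"contig_1": (80, 11, 20)}
rec = SeqRecord(Seq("ACGT" * 20), id="contig_1")
masked, passed = maskRibosomalSequence(rec, dict16S, {}, {}, {})
print(passed)           # True: 70 unmasked bases remain, which is > 60
print(masked.seq[:24])  # ACGTACGTACNNNNNNNNNNACGT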
def single_fasta(ref, wd):
"""
From a fasta file make single files with each sequence
:param ref: path to the reference multi-FASTA file
:param wd: working directory in which a split/ subdirectory is created
:return: list of paths to the single-sequence FASTA files
"""
wd_split = wd + '/split/'
logistic.check_create_dir(wd_split)
fastaFile = open(ref, 'r')
single_fasta_list = []
for record in SeqIO.parse(fastaFile, "fasta"):
fasta_name = wd_split + record.id + '.fasta'
single_fasta_list.append(fasta_name)
output_handle = open(fasta_name, "w")
SeqIO.write(record, output_handle, "fasta")
output_handle.close()
return single_fasta_list
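A hedged example call; logistic.check_create_dir is a helper of the original project that simply creates the directory if needed, and the paths are placeholders.
per_sequence_fastas = single_fasta("genome.fasta", "/tmp/annotation_run")
print(per_sequence_fastas[:3])  # e.g. ['/tmp/annotation_run/split/chr1.fasta', ...]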
def augustus_multi(threads, species, single_fasta_list, wd, verbose):
'''handles the assembly process and parsing in a multithreaded way'''
if int(threads) < 1:
threads = 1
all_augustus = []
augustus = [wd, species, verbose]
for record in single_fasta_list:
single_command = augustus + [record]
all_augustus.append(single_command)
sys.stdout.write("\t###RUNNING AUGUSTUS ###\n")
with Pool(processes=int(threads), maxtasksperchild=1000) as pool:
pool.map(augustus_call, all_augustus)
parseAugustus(wd)
return
def dump(self):
'''
write the current state to file
'''
self.log.warn("unsure if dump() works")
from pickle import dump
from Bio import Phylo
for attr_name, fname in self.file_dumps.items():
if hasattr(self,attr_name):
print("dumping",attr_name)
#if attr_name=='seqs': self.seqs.all_seqs = None
with myopen(fname, 'wb') as ofile:
if attr_name=='nodes':
continue
elif attr_name=='tree':
#biopython trees don't pickle well, write as newick + node info
self.tree.dump(fname, self.file_dumps['nodes'])
else:
dump(getattr(self,attr_name), ofile, -1)
def clock_filter(self):
if self.config["clock_filter"] == False:
return
self.tree.tt.clock_filter(reroot='best', n_iqd=self.config["clock_filter"]["n_iqd"], plot=self.config["clock_filter"]["plot"])
leaves = [x for x in self.tree.tree.get_terminals()]
for n in leaves:
if n.bad_branch:
self.tree.tt.tree.prune(n)
print('pruning leaf ', n.name)
if self.config["clock_filter"]["remove_deep_splits"]:
self.tree.tt.tree.ladderize()
current_root = self.tree.tt.tree.root
if sum([x.branch_length for x in current_root])>0.1 \
and sum([x.count_terminals() for x in current_root.clades[:-1]])<5:
new_root = current_root.clades[-1]
new_root.up=False
self.tree.tt.tree.root = new_root
with open(self.output_path+"_outliers.txt", 'a') as ofile:
for x in current_root.clades[:-1]:
ofile.write("\n".join([leaf.name for leaf in x.get_terminals()])+'\n')
self.tree.tt.prepare_tree()
def export_diversity(self, fname = 'entropy.json', indent=None):
'''
write the alignment entropy of each alignment (nucleotide and translations) to file
'''
if not hasattr(self, "entropy"):
self.diversity_statistics()
entropy_json = {}
for feat in self.entropy:
S = [max(0,round(x,4)) for x in self.entropy[feat]]
n = len(S)
if feat=='nuc':
entropy_json[feat] = {'pos':list(range(0,n)), 'codon':[x//3 for x in range(0,n)], 'val':S}
else:
entropy_json[feat] = {'pos':[x for x in self.proteins[feat]][::3],
'codon':[(x-self.proteins[feat].start)//3 for x in self.proteins[feat]][::3], 'val':S}
write_json(entropy_json, fname, indent=indent)
def writeMutationLog():
if sort_log_by == 1:
startSort = sorted(mutation_log, key=itemgetter(0))
else:
startSort = sorted(mutation_log, key=itemgetter(5))
print "\tWriting mutation log file: " + mut_log_outfile
try:
outfile = open(mut_log_outfile,"w")
outfile.write("START\tEND\tTYPE\tBEFORE\tAFTER\n")
for line in startSort:
outfile.write(str(line[0]) + "\t" + str(line[1]) + "\t" + line[2] + "\t" + line[3] + "\t" + line[4] + "\n")
except Exception as e:
print("Error writing mutation log. ")
print(e)
sys.exit()
finally:
outfile.close()
##############
# writeGenome: Writes the provided annotation file and genome file.
# The isMut parameter is a flag to modify the filename for mutated variants of the simulated genome.
##############
def filter_evm_gff(evm_path):
os.chdir(evm_path)
ds = [f for f in os.listdir(evm_path) if os.path.isdir(f)]
out_h = open('evm.evidence.txt','w')
for d in ds:
fPath = d + '/evm.out'
size = os.path.getsize(fPath)
if size > 0:
blocks = open(fPath).read().strip().split('#')[1:]
for block in blocks:
coords = []
evidence = []
for line in block.strip().split('\n')[1:]:
if line.strip() != '' and line[0] != '!':
meta = line.strip().split('\t')
coords.append(int(meta[0]))
coords.append(int(meta[1]))
coords.sort()
evidence.extend([tuple(x[1:-1].split(';')) for x in meta[-1].split(',')])
evidence = set(evidence)
sources = set([x[1] for x in evidence])
out_h.write(d + '\t' + str(coords[0]) + '\t' + str(coords[-1]) + '\t' + ','.join([x[0] for x in evidence]) + '\t' + ','.join(sources) + '\n')
out_h.close()
def fa2embl(fa,embl,gff,path):
if not os.path.exists(path): os.mkdir(path)
os.chdir(path)
df = pd.read_csv(gff,sep='\t',header=None,comment='#',usecols=[0,2])
df = df[df[2].values=='gene']
chroms = list(set(df[0].tolist()))
dic = SeqIO.index(fa,'fasta')
for s in chroms:
SeqIO.write(dic[s], 'fa', 'fasta')  # passing a filename lets SeqIO close the file before seqret reads it
sarge.run('grep \'{s}\' {gff} > gff'.format(s=s,gff=gff))
sarge.run('/home/shangzhong/Installation/EMBOSS-6.6.0/bin/seqret \
-sequence fa -feature -fformat gff -fopenfile1 gff -osformat2 embl \
-auto -outseq {s}.embl'.format(s=s))
fns = glob.glob('*.embl')
sarge.run('cat {files} > {embl}'.format(files=' '.join(fns),embl=embl))
# for f in fns:
# os.remove(f)
# fa2embl('/data/genome/hamster/ncbi_refseq/hamster.fa','hamster.embl','/data/genome/hamster/ncbi_refseq/hamster.gff','/data/shangzhong/Picr_assembly/Annotation/RATT/embl')
def run_cnvnator(input_file,output_file):
root = output_file[:-3] + 'root'
# 1
cnv_extract_bam(input_file,root,others)
# 2
chr_path = file_path + '/cnv/scaffold'
path = chr_path
if not os.path.exists(path):
os.mkdir(path)
for record in SeqIO.parse(ref_fa,'fasta'):
SeqIO.write(record,path+'/'+record.id+'.fa','fasta')
cnv_generate_hist(root,chr_path,bin_win,others)
# 3
cnv_statistics(root,bin_win,others)
# 4
cnv_partitioning(root,bin_win,others)
# 5
cnv_call(root,output_file,bin_win,others)
def base_complement(k):
""" Return complement of base.
Performs the substitutions: A<=>T, C<=>G, X=>X for both upper and lower
case. The return value is identical to the argument for all other values.
:param k: A base.
:returns: Complement of base.
:rtype: str
"""
try:
return comp[k]
except KeyError:
sys.stderr.write(
"WARNING: No reverse complement for {} found, returning argument.\n".format(k))
return k
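The function relies on a module-level lookup table named comp that is not shown in this snippet; a plausible definition (an assumption, not the original) would be:
# Hypothetical complement table assumed by base_complement() above.
comp = {
    "A": "T", "T": "A", "C": "G", "G": "C",
    "a": "t", "t": "a", "c": "g", "g": "c",
    "X": "X", "x": "x",
}

print(base_complement("A"), base_complement("g"))  # T c
print(base_complement("N"))                        # N, after a warning on stderr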
def gfa1_to_exons(gfa_in_fn, fasta_out_fn, soft_mask_overlaps=False, hard_mask_overlaps=False):
gfa1 = read_gfa1(gfa_in_fn)
exon_dict = _segments_to_exon_dict(gfa1["segments"])
coordinate_dict = _containments_to_coordinate_dict(gfa1["containments"])
overlap_dict = _links_to_overlap_dict(gfa1["links"])
path_dict = _paths_to_path_dict(gfa1["paths"])
# Mask if necessary
_mask(exon_dict, overlap_dict, soft_mask_overlaps, hard_mask_overlaps)
# Add coordinate information to description
for exon_id, exon_record in exon_dict.items():
exon_record.description = " ".join(coordinate_dict[exon_id])
# Write to fasta
SeqIO.write(format="fasta", handle=fasta_out_fn, sequences=exon_dict.values())
def gfa1_to_gapped_transcript(
gfa_in, fasta_out, number_of_ns=100, soft_mask_overlaps=False, hard_mask_overlaps=False):
if soft_mask_overlaps and hard_mask_overlaps:
raise Exception("I can't soft mask and hard mask at the same time, dude!")
# Process
gfa1 = read_gfa1(gfa_in)
exon_dict = _segments_to_exon_dict(gfa1["segments"])
coordinate_dict = _containments_to_coordinate_dict(gfa1["containments"])
overlap_dict = _links_to_overlap_dict(gfa1["links"])
path_dict = _paths_to_path_dict(gfa1["paths"])
# Mask if necessary
_mask(exon_dict, overlap_dict, soft_mask_overlaps, hard_mask_overlaps)
composed_paths = _compose_paths(exon_dict, path_dict, number_of_ns)
# Write
SeqIO.write(format="fasta", sequences=composed_paths, handle=fasta_out)
def main(args):
for record in SeqIO.parse(args.infile, 'fasta'):
if args.discard:
if any(re.match(rx, record.id) for rx in args.discard):
continue
subseqcounter = 0
printlog(args.debug, "DEBUG: convert to upper case", record.id)
sequence = str(record.seq).upper()
printlog(args.debug, "DEBUG: split seq by Ns", record.id)
subseqs = [ss for ss in re.split('[^ACGT]+', sequence) if len(ss) > args.minlength]
printlog(args.debug, "DEBUG: print subseqs", record.id)
for subseq in subseqs:
subseqcounter += 1
subid = '{:s}_chunk_{:d}'.format(record.id, subseqcounter)
subrecord = SeqRecord(Seq(subseq), subid, '', '')
SeqIO.write(subrecord, args.outfile, 'fasta')
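For context, a hypothetical argparse setup that would provide the attributes main() reads (args.infile, args.discard, args.debug, args.minlength, args.outfile); the option names are guesses, not the original project's CLI, and printlog is a helper of that project.
import argparse
import sys

parser = argparse.ArgumentParser(description="Split records into chunks at runs of non-ACGT characters")
parser.add_argument("--discard", nargs="+", default=None, help="regexes of record IDs to skip")
parser.add_argument("--minlength", type=int, default=0, help="minimum chunk length to keep")
parser.add_argument("--debug", action="store_true")
parser.add_argument("--outfile", type=argparse.FileType("w"), default=sys.stdout)
parser.add_argument("infile")
main(parser.parse_args())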
def remove_duplicates(self):
"""Remove duplicate genbank records
There are multiple methods to download genes. Each download method appends
to the genbank file, so this method is needed to remove duplicates.
Args:
None
"""
input_seq_iterator = SeqIO.parse(open(self._tmpfile, "r"), "genbank")
unique_seq_iterator = self.unique(input_seq_iterator)
output_handle = open(self._gbfile, "w")
SeqIO.write(unique_seq_iterator, output_handle, "genbank")
output_handle.close()
os.remove(self._tmpfile)
def separateFasta(fasta,prefix):
"""Utility to separate a multi-FASTA to separate FASTA files"""
for seq_record in SeqIO.parse(fasta, "fasta"):
# we are changing the IDs from "HLA:HLA..." to "HLA...". This makes IGV and other tools much happier
# would be nice to know what led to this idiocy in naming conventions
seq_record.id = seq_record.id.replace("HLA:","")
seq_record.id = prefix + seq_record.id
print("writing " + seq_record.id)
SeqIO.write(seq_record,seq_record.id + ".fasta", "fasta")
def writeFASTQRecord(aRecord,aFASTQFile):
readId = aRecord.id
seqStr = str(aRecord.seq)
quals = aRecord.letter_annotations['phred_quality']
qualsStr = ""
for c in quals:
qualsStr += chr(c+33)
#import pdb;pdb.set_trace()
aFASTQFile.write("@" + readId+" "+ str(len(seqStr))+ " "+ str(len(qualsStr)) +"\n")
aFASTQFile.write(seqStr+"\n")
aFASTQFile.write("+\n")
aFASTQFile.write(qualsStr+"\n")
aFASTQFile.flush()
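A small usage sketch of the manual FASTQ writer; the record content is made up.
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord

rec = SeqRecord(Seq("ACGTACGT"), id="read_1")
rec.letter_annotations["phred_quality"] = [30, 30, 28, 28, 35, 35, 20, 20]
with open("demo.fastq", "w") as fq:
    writeFASTQRecord(rec, fq)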
def demultiplexFastqByBestMatch(aFASTQFile,aHandleList,aMismatch,isForward=True):
# we are exporting each handle into a different file
# this dictionary has the sequence handles as keys and the files as values
exportFiles = fileFactory(aHandleList)
sw = swFactory()
fh = open(aFASTQFile, 'r')
for idx, record in enumerate(SeqIO.parse(fh, "fastq")):
seqStr = str(record.seq)
# if we are looking for reverse sequence, get it from the end
if isForward:
seqStr = seqStr[0:100]
else:
#import pdb; pdb.set_trace()
seqStr = seqStr[len(seqStr)-99:]
# bestMatches is a list of handles having the same alignment score
# if there is only one, save it, else ignore ambiguities
bestMatches = getBestMatches(seqStr, aHandleList, sw, aMismatch) # get the best matches for all the handles
if len(bestMatches) == 1: # there is a single best match: store it
# unfortunately FASTQ export for very long reads looks to be buggy.
# So we have to export records ourselves
#SeqIO.write(record,exportFiles[bestMatches[0]],"fastq")
writeFASTQRecord(record,exportFiles[bestMatches[0]])
print "Wrote record " +str(idx)+" "+ record.id + " to " + (exportFiles[bestMatches[0]]).name
fh.close()
# be nice and close the exported files
for seq in aHandleList:
exportFiles[seq].close()
print "ready "