def parse_specificity(feature, params):
    '''parse the feature's specificity entries'''
    if 'specificity' in feature.qualifiers:
        for spec in feature.qualifiers['specificity']:
            if spec.startswith('KR activity: '):
                params['kr_activity'] = not spec.endswith('inactive')
            elif spec.startswith('KR stereochemistry: '):
                params['kr_stereochemistry'] = spec.split(':')[-1].strip()
            elif spec.startswith('NRPSpredictor2 SVM: '):
                params['nrps_predictor'] = spec.split(':')[-1].strip()
            elif spec.startswith('Stachelhaus code: '):
                params['stachelhaus'] = spec.split(':')[-1].strip()
            elif spec.startswith('Minowa: '):
                params['minowa'] = spec.split(':')[-1].strip()
            elif spec.startswith('PKS signature: '):
                params['pks_signature'] = spec.split(':')[-1].strip()
            elif spec.startswith('consensus: '):
                params['consensus'] = spec.split(':')[-1].strip()
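A minimal usage sketch, assuming a Biopython SeqFeature whose qualifiers carry antiSMASH-style specificity strings; the qualifier values below are illustrative, not from a real record:

from Bio.SeqFeature import SeqFeature

feature = SeqFeature(type="aSDomain")
feature.qualifiers['specificity'] = [
    'KR activity: active',      # illustrative values
    'KR stereochemistry: B1',
    'consensus: mal',
]
params = {}
parse_specificity(feature, params)
print(params)  # {'kr_activity': True, 'kr_stereochemistry': 'B1', 'consensus': 'mal'}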
def parse_fasta(self):
    self.ref_id = dict()
    self.ref_inf = dict()
    i = 1
    N = 0
    ref_inf = np.empty(shape=[0, 3])
    for seqs in SeqIO.parse(self.ref, 'fasta'):
        seq_id = seqs.id
        self.ref_id[i] = seq_id
        seq = str(seqs.seq.upper())
        seq_len = len(seq)
        self.ref_inf[seq_id] = seq_len
        N += seq.count('N')  # tally ambiguous bases across the reference
        ref_inf = np.append(ref_inf, [[i, seq_id, seq_len]], axis=0)
        i += 1
    self.ref_detail = pd.DataFrame(ref_inf, columns=['Index', 'Contig', 'Length(bp)'])
    self.N = N
Source: makeShorterClassII.py (project: icing, author: NationalGenomicsInfrastructure)
def printShortenedFASTA(fixed, locus):
    """
    Prints only genomic FASTA sequences with ID, keeping sequence from 200 bases before exon 2 onward
    """
    for seq_record in SeqIO.parse(fixed, "imgt"):
        if seq_record.description.startswith(locus) and len(seq_record.seq) > 1:
            # the new record we are extracting
            newSeq = ""
            extracted = False  # we will not be able to extract all parts, as many sequences have missing introns
            for f in seq_record.features:  # we hope the features are in order
                if f.type == "intron" and f.qualifiers['number'] == ['1']:
                    newSeq = seq_record.seq[f.location.end - 200:]
                    extracted = True
                    break  # we have the whole sequence now
            if extracted:
                sys.stdout.write(SeqRecord(newSeq, id=seq_record.id, description=seq_record.description).format("fasta"))
def getUniqueRedundSets(fileName, speciesName):
    '''Run through genbank file fileName, get set of unique genes (with no
    redundancies), and set with redundancies.'''
    f = open(fileName, 'r')
    geneL = []
    for record in SeqIO.parse(f, "genbank"):
        # iterate through the genes on the chromosome
        for feature in record.features:
            # choose only the features that are protein coding genes
            if feature.type == "CDS" and 'protein_id' in feature.qualifiers:
                geneName = speciesName + '-' + feature.qualifiers['protein_id'][0]
                geneL.append(geneName)
    f.close()
    # now figure out which ones are unique
    uniqueS = set()
    redundS = set()
    for gene in geneL:
        if geneL.count(gene) > 1:
            redundS.add(gene)
        else:
            uniqueS.add(gene)
    return uniqueS, redundS
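A hedged usage sketch; the GenBank path and species label below are hypothetical:

uniqueS, redundS = getUniqueRedundSets('genome.gbk', 'E_coli')
print('%d unique, %d redundant protein-coding genes' % (len(uniqueS), len(redundS)))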
def dump_cluster(c):
    if os.path.exists("%s_dist_%s_aligned_short.fasta-cluster%d.png" % (prefix, dist, c)):
        print("""
## Subcluster %d
""" % (c))
        if c == 0:
            print("Cluster 0 represents isolates that do not cluster with any other isolates within the distance cut-off, i.e. singleton sequences. The sequences presented are unrelated.")
        # embed the per-cluster tree image (markdown); the path mirrors the existence check above
        print("""
![Subcluster %d tree](%s_dist_%s_aligned_short.fasta-cluster%d.png)
""" % (c, prefix, dist, c))
    else:
        print("""
## Subcluster %s
(Tree not shown for clusters with <5 isolates)
Isolates:
""" % (c))
        for rec in SeqIO.parse(open("%s_dist_%s_aligned_short.fasta-cluster%d" % (prefix, dist, c)), "fasta"):
            print(" - %s" % (rec.id))
def parse_barcodes(barcode_file):
    barcode_list = list()
    barcode_list.append("unclassified")
    barcode_dict = dict()
    barcode_sequences = SeqIO.parse(open(barcode_file), 'fasta')
    for barcode in barcode_sequences:
        name, sequence = barcode.id, str(barcode.seq)
        barcode_dict[name] = sequence
        barcode_list.append(name)
        barcode_dict[name + "_rev"] = str(barcode.reverse_complement().seq)
    return barcode_dict, barcode_list
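A small usage sketch, assuming a FASTA file of barcode sequences (the path is hypothetical):

barcode_dict, barcode_list = parse_barcodes('barcodes.fasta')
# barcode_dict maps each barcode name (and '<name>_rev' for its reverse complement) to a sequence;
# barcode_list starts with the catch-all 'unclassified' bin.
for name in barcode_list:
    print(name, barcode_dict.get(name, '<no sequence>'))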
def filter_using_summary(fq, args):
    """Use quality scores from albacore summary file for filtering
    Use the summary file from albacore for a more accurate quality estimate
    Get the dataframe from nanoget, convert to dictionary
    """
    data = {entry[0]: entry[1] for entry in process_summary(
        summaryfile=args.summary,
        threads="NA",
        readtype=args.readtype,
        barcoded=False)[
        ["readIDs", "quals"]].itertuples(index=False)}
    try:
        for record in SeqIO.parse(fq, "fastq"):
            if data[record.id] > args.quality and len(record) > args.length:
                print(record[args.headcrop:args.tailcrop].format("fastq"), end="")
    except KeyError:
        logging.error("mismatch between summary and fastq: "
                      "{} was not found in the summary file.".format(record.id))
        sys.exit('\nERROR: mismatch between sequencing_summary and fastq file: '
                 '{} was not found in the summary file.\nQuitting.'.format(record.id))
def filter_fastq(input_file, output_file, downweight_number, ctot, gtoa):
    """
    Takes a Fastq file as input and weights down the quality of the bases
    at the start and the end of the reads.
    """
    in_iterator = SeqIO.parse(input_file, 'fastq')
    input_records = list(in_iterator)
    for i, record in enumerate(input_records):
        change_bases_c = None
        change_bases_t = None
        temp_qual = record.letter_annotations['phred_quality']
        if ctot:
            # flag C->T candidates within the first downweight_number bases
            change_bases_c = [check_c_2_t(nuc) and j < downweight_number for j, nuc in enumerate(record.seq)]
        if gtoa:
            # flag G->A candidates within the last downweight_number bases
            change_bases_t = [check_g_2_a(nuc) and (len(record.seq) - j) <= downweight_number for j, nuc in enumerate(record.seq)]
        new_qual = downweight_quality(temp_qual, change_bases_c, change_bases_t)
        input_records[i].letter_annotations['phred_quality'] = new_qual
    with open(output_file, 'wt') as handle:
        SeqIO.write(input_records, handle, "fastq")
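A hedged invocation sketch; check_c_2_t, check_g_2_a, and downweight_quality are helper functions assumed to be defined elsewhere in the same module, and the file names are hypothetical:

# down-weight C->T candidates in the first 2 bases and G->A candidates in the last 2
filter_fastq('in.fastq', 'out.fastq', downweight_number=2, ctot=True, gtoa=True)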
def match_both_pairs_filter(self):
    self.logger.warning("Extracting read names from FASTQ file where both reads must match")
    regex = re.compile(r'/[12]$')
    base_read_names = {}
    with open(self.fastq_file, "r") as fastq_file_input:
        for record in SeqIO.parse(fastq_file_input, "fastq"):
            base_read_name = regex.sub('', record.id)
            if base_read_name in base_read_names:
                # second read of the pair seen: remember the full read name
                base_read_names[base_read_name] = record.id
            else:
                # first sighting: mark with the sentinel value 1
                base_read_names[base_read_name] = 1
    with open(self.output_readnames_file, "w") as readnames_output:
        for base_name, read_name in base_read_names.items():
            if read_name == 1:  # unpaired read, skip
                continue
            readnames_output.write(read_name + '\n')
def remove_small_large_contigs(self, input_file, output_file):
    with open(input_file, "r") as spades_input_file, open(output_file, "w") as spades_output_file:
        sequences = []
        for record in SeqIO.parse(spades_input_file, "fasta"):
            coverage = self.sequence_coverage(record.id)
            if self.min_spades_contig_coverage != 0 and coverage < self.min_spades_contig_coverage:
                self.logger.warning("Excluding contig with low coverage of " + str(coverage) + " from " + self.spades_assembly_file())
                continue
            if self.max_spades_contig_coverage != 0 and coverage > self.max_spades_contig_coverage:
                self.logger.warning("Excluding contig with high coverage of " + str(coverage) + " from " + self.spades_assembly_file())
                continue
            if len(record.seq) > self.minimum_length:
                sequences.append(record)
            else:
                self.logger.warning("Excluding contig of length " + str(len(record.seq)) + " from " + self.spades_assembly_file())
        SeqIO.write(sequences, spades_output_file, "fasta")
def read_id2idx(reads_fn):
    # infer the format from the file name: '...q' -> fastq, otherwise fasta
    fmt = 'fastq' if reads_fn[-1] == 'q' else 'fasta'
    reads_idx = {}
    num_read = 0
    with open(reads_fn, "r") as reads_fh:
        for record in SeqIO.parse(reads_fh, fmt):
            reads_idx[record.id] = num_read
            num_read += 1
    return reads_idx
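Usage sketch with a hypothetical reads file:

idx = read_id2idx('reads.fastq')
# idx maps each read id to its 0-based position in the file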
def writeReadsFileSE(mappedReadsDict, outReads, fastq):
    if fastq.endswith('.gz'):
        subprocess.call(['gunzip', fastq])
        newFq = fastq.replace('.gz', '')
    else:
        newFq = fastq
    out = open(outReads, 'w')
    fqF = open(newFq, 'r')
    for record in SeqIO.parse(fqF, 'fastq'):
        if record.id in mappedReadsDict:
            newRec = SeqRecord(record.seq, id=record.id, description='')
            SeqIO.write(newRec, out, 'fasta')
    out.close()
    fqF.close()
    if fastq.endswith('.gz'):
        subprocess.call(['gzip', newFq])
def writeFailedReconstructions(outF, chain, writtenArr, output, idNameDict, fastaDict):
    recF = output + '.reconstructed.junctions.' + chain + '.fa'
    if os.path.isfile(recF):
        f = open(recF, 'r')
        segDict = dict()
        for tcrRecord in SeqIO.parse(f, 'fasta'):
            tcrSeq = str(tcrRecord.seq)
            # a long run of Ns marks a reconstruction that hit the iteration cap
            if tcrSeq.find('NNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNN') != -1:
                status = 'Failed reconstruction - reached maximum number of iterations'
                segDict = addSegmentsToDict(segDict, status, writtenArr, tcrRecord, idNameDict, fastaDict)
            # a short NNNN spacer marks V and J segments that never overlapped
            elif tcrSeq.find('NNNN') != -1:
                status = 'Failed reconstruction - V and J segment do not overlap'
                segDict = addSegmentsToDict(segDict, status, writtenArr, tcrRecord, idNameDict, fastaDict)
        f.close()
        if len(segDict) > 0:
            writeSegDict(segDict, outF, chain)
def _read_reference(self, reference):
    """Read in the reference from the file into a dictionary.

    Parameters
    ----------
    reference: str
        Path to the reference.

    Returns
    -------
    reference: dict
        Dictionary with chromosome - sequence mapping.
    """
    if reference is None:
        self._reference = None
    else:
        self._reference = {}
        with open(reference) as reference_fp:
            for record in SeqIO.parse(reference_fp, "fasta"):
                self._reference[record.id] = list(record.seq)
def from_orfs_files(cls, seq_path, paths_path, graph_path):
    """
    Create object from graph, ORFs files
    :param seq_path: path to .fasta file with ORFs sequences
    :param paths_path: path to .path file with ORF's paths
    :param graph_path: path to graph
    :return: LinkageCluster
    """
    from Bio import SeqIO
    orfs = collections.OrderedDict()
    with open(seq_path) as seq_file, open(paths_path) as path_file:
        paths_dict = {}
        orfid = ''
        for ind, line in enumerate(path_file.read().split('\n')):
            if ind % 2 == 0:
                try:
                    orfid = re.findall(r'(ORF_\d+)', line)[0]
                except IndexError:  # line without an ORF id
                    continue
            else:
                paths_dict[orfid] = [int(re.findall(r'^(\d+),', line)[0])] + re.findall(r'(\d+)([+-]),', line) + \
                                    [int(re.findall(r',(\d+)$', line)[0])]
        for rec in SeqIO.parse(seq_file, 'fasta'):
            orfs[str(rec.seq)] = (paths_dict[rec.id], rec.id)
    return cls(GFAgraph().load_graph(graph_path), orfs)
def single_fasta(ref, wd):
    """
    From a fasta file make single files with each sequence
    :param ref: path to the multi-fasta reference
    :param wd: working directory
    :return: list of the single-sequence fasta paths
    """
    wd_split = wd + '/split/'
    logistic.check_create_dir(wd_split)
    single_fasta_list = []
    with open(ref, 'r') as fastaFile:
        for record in SeqIO.parse(fastaFile, "fasta"):
            fasta_name = wd_split + record.id + '.fasta'
            single_fasta_list.append(fasta_name)
            with open(fasta_name, "w") as output_handle:
                SeqIO.write(record, output_handle, "fasta")
    return single_fasta_list
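Usage sketch with hypothetical paths; logistic.check_create_dir is the project's own directory helper, assumed importable:

split_files = single_fasta('reference.fasta', '/tmp/annotation_run')
# one file per record, e.g. /tmp/annotation_run/split/contig_1.fasta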
def mga_summary(orf_fp, contigs_fp, sample_id):
    """Summarizes results from MetaGene Annotator."""
    genes = []
    gene_no = 0
    contigs = SeqIO.parse(contigs_fp, 'fasta')
    contig_seqs = {r.description: r.seq for r in contigs}
    annotations = MetaGeneAnnotation.parse(open(orf_fp))
    for anno in annotations:
        contig_seq = contig_seqs[anno.id]
        # Gather putative genes and create SeqRecords for them
        for pgene in anno.genes:
            gene_seq = contig_seq[pgene.start:pgene.end]
            if pgene.strand == '-':
                gene_seq = gene_seq.reverse_complement()
            gene = SeqRecord(
                gene_seq,
                id=anno.id,
                description="{}:{}".format(sample_id, gene_no))
            genes.append(gene)
            gene_no += 1
    return genes
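A hedged call sketch; the MetaGene Annotator output path and the sample id are hypothetical:

gene_records = mga_summary('sample.mga', 'contigs.fasta', 'sample01')
SeqIO.write(gene_records, 'sample01_genes.fasta', 'fasta')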
def FastaToFDB(self, fastafile, username):
    fdb_registers = []
    content = open(fastafile, "r")
    sequences = parse(content, 'fasta')
    for sequence in sequences:
        fdb_register = FDBRegister()
        fdb_register.description = sequence.description
        fdb_register.gene = str(sequence.seq)
        fdb_register.geneinfo = sequence.annotations
        fdb_register.filename = fastafile
        fdb_register.date = date.today()
        fdb_register.user = username
        fdb_registers.append(fdb_register)
    content.close()
    return self.mount_fdb_file(fdb_registers)
def is_sequence_file(self, file_name, file_format, show_error=True):
    """
    Try to open a sequence file using Biopython. Returns 'True' if the file is a valid file of the given format.
    """
    valid_file = False
    file_handler = None
    try:
        file_handler = open(file_name, "r")
        r = list(SeqIO.parse(file_handler, file_format))
        valid_file = len(r) > 0
    except Exception:
        valid_file = False
    if file_handler is not None:
        file_handler.close()
    if not valid_file:
        title = "File Type Error"
        message = "The selected File is not a valid %s." % (pmdt.supported_sequence_file_types[file_format])
        if show_error:
            self.show_error_message(title, message)
    return valid_file
def get_psi_blast_record(self, result_handle):
    """
    Convert the result to a list, because Biopython's .parse() returns a generator.
    """
    records = list(NCBIXML.parse(result_handle))
    return records[0]
###############################################################################################
# ALIGNMENT BUILDING. #
###############################################################################################
#################################################################
# Step 1/4 for performing an alignment from the main menu. #
# Methods to launch an alignment program and check if it can be #
# used (for example, if it is installed on the user's machine). #
#################################################################
def get_fasta_in_kraken_format(outfile_fasta='sequences.fa'):
    output = open(outfile_fasta, 'w')
    for file_name in os.listdir(cwd):
        if file_name.endswith('.gbff'):
            records = SeqIO.parse(file_name, "genbank")
            for seq_record in records:
                seq_id = seq_record.id
                seq = seq_record.seq
                for feature in seq_record.features:
                    if 'source' in feature.type:
                        # rewrite the taxon db_xref into kraken's 'kraken:taxid|<id>' header format
                        taxid = ''.join(feature.qualifiers['db_xref'])
                        taxid = re.sub(r'.*taxon:', 'kraken:taxid|', taxid)
                        outseq = ">" + seq_id + "|" + taxid + "\n" + str(seq) + "\n"
                        output.write(outseq)
            os.remove(file_name)
    output.close()
    return
def read(self, input_file, trim=None):
    # load file into a parser
    fasta_sequences = SeqIO.parse(open(input_file), 'fasta')
    if trim is None:
        # nothing to trim: store each record's name and
        # sequence string as-is
        for fasta in fasta_sequences:
            name, sequence = fasta.id, str(fasta.seq)
            self._sequence_dictionary[name] = sequence
    else:
        # something to trim: parse sequences as they are, but trim
        # the dictionary key as suggested (note: str.strip() removes
        # any of the given characters from both ends, not a substring)
        for fasta in fasta_sequences:
            name, sequence = fasta.id, str(fasta.seq)
            self._sequence_dictionary[name.strip(str(trim))] = sequence

# method that returns the sequence dictionary
def run_cnvnator(input_file, output_file):
    root = output_file[:-3] + 'root'
    # 1 - extract mapped reads from the BAM file into the root file
    cnv_extract_bam(input_file, root, others)
    # 2 - write one fasta per reference contig for the histogram step
    chr_path = file_path + '/cnv/scaffold'
    if not os.path.exists(chr_path):
        os.mkdir(chr_path)
    for record in SeqIO.parse(ref_fa, 'fasta'):
        SeqIO.write(record, chr_path + '/' + record.id + '.fa', 'fasta')
    cnv_generate_hist(root, chr_path, bin_win, others)
    # 3 - compute statistics on the read-depth signal
    cnv_statistics(root, bin_win, others)
    # 4 - partition the read-depth signal
    cnv_partitioning(root, bin_win, others)
    # 5 - call CNVs
    cnv_call(root, output_file, bin_win, others)
def count_records(input_object, format='fasta'):
    """Count SeqRecord objects from a file in the specified format.

    :param input_object: A file object or a file name.
    :param format: Input format (fasta by default).
    :returns: Number of records in input file.
    :rtype: int
    """
    handle = input_object
    if isinstance(handle, str):
        handle = open(handle, "r")
    counter = 0
    for _ in SeqIO.parse(handle, format):
        counter += 1
    return counter
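A short usage sketch (file names hypothetical); both call styles from the docstring are shown:

n = count_records('reads.fasta')
with open('reads.fastq') as fh:
    m = count_records(fh, format='fastq')
print(n, m)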
def get_fasta_seq_dictonary(fa_file):
    """Return a dictionary keyed by record id with [sequence, length, GC-content bin] values."""
    dict_fa = {}
    for seq_record in SeqIO.parse(fa_file, "fasta"):
        fa_id = seq_record.id
        faseq = seq_record.seq
        gc_count = GC(faseq)
        seq_len = len(faseq)
        # bin the GC percentage down to the nearest 10 (10-wide bins)
        gc_content_decimal_distribution = math.floor(gc_count / 10) * 10
        dict_fa[fa_id] = [faseq, seq_len, gc_content_decimal_distribution]
    return dict_fa
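Usage sketch with a hypothetical path; each value unpacks to the sequence, its length, and its 10-wide GC bin:

fa_info = get_fasta_seq_dictonary('contigs.fasta')
for contig_id, (seq, length, gc_bin) in fa_info.items():
    print(contig_id, length, gc_bin)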
def get_Short(genesList):
    for gene in genesList:
        pathtoDir = os.path.join(os.path.dirname(gene), "short")
        if not os.path.exists(pathtoDir):
            os.makedirs(pathtoDir)
        shortgene = os.path.join(os.path.dirname(gene), "short", os.path.basename(gene))
        shortgene = shortgene.replace(".fasta", "_short.fasta")
        # write only the first allele of each gene to the 'short' file
        # (the old generic_dna alphabet argument was removed in Biopython 1.78)
        for allele in SeqIO.parse(gene, "fasta"):
            fG = open(shortgene, 'w')
            fG.write('>' + str(allele.id) + '\n' + str(allele.seq) + '\n')
            fG.close()
            break
    return True
def check_if_list_or_folder(folder_or_list):
    list_files = []
    # check if given a list of genome paths or a folder from which to create the schema
    try:
        # if the argument opens as a file, treat it as the list file itself
        f = open(folder_or_list, 'r')
        f.close()
        list_files = folder_or_list
    except IOError:
        for gene in os.listdir(folder_or_list):
            try:
                genepath = os.path.join(folder_or_list, gene)
                # probe the file: parsing the first record is enough to validate it
                for allele in SeqIO.parse(genepath, "fasta"):
                    break
                list_files.append(os.path.abspath(genepath))
            except Exception as e:
                print(e)
    return list_files
# ================================================ MAIN ================================================ #
def calculateN_50(input_file, ratio):
    lengths = []
    sums = 0
    for seq_record in SeqIO.parse(input_file, "fasta"):
        lengths.append(len(seq_record.seq))
    lengths = sorted(lengths, reverse=True)
    contigsMore10000 = 0
    totalLengthMore10000 = 0
    for elem in lengths:
        if elem >= 10000:
            contigsMore10000 += 1
            totalLengthMore10000 += int(elem)
    N_half = sum(lengths) * ratio
    for i in range(0, len(lengths)):
        sums = sums + lengths[i]
        if sums >= N_half:
            # return (Nxx length, contig count, total assembly length,
            #         contigs >= 10 kb, total length of contigs >= 10 kb)
            return lengths[i], len(lengths), N_half / ratio, contigsMore10000, totalLengthMore10000
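A hedged usage sketch; with ratio=0.5 the first returned value is the assembly's N50 (the path is hypothetical):

n50, n_contigs, total_len, n_10kb, len_10kb = calculateN_50('assembly.fasta', 0.5)
print('N50=%d over %d contigs (total %d bp)' % (n50, n_contigs, total_len))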
Source: sort_allele_orientation.py (project: chewBBACA_deprecated, author: mickaelsilva)
def get_Short(gene):
    newFasta = ''
    for allele in SeqIO.parse(gene, "fasta"):
        try:
            translatedSequence, sequence = translateSeq(allele.seq)
            newFasta += ">" + allele.id + "\n" + str(sequence) + "\n"
        except Exception as e:
            print(e)
    # rewrite the file in text mode ('wb' would fail here, since newFasta is a str)
    with open(gene, "w") as f:
        f.write(newFasta)
    return True