# Shared imports assumed by the snippets collected below; the snippets come
# from different projects, so helpers and constants referenced later
# (PATH_KEYS, GB_FILE, printlog, split_ngrams, SMCOG_PATTERN, ...) are
# expected to be defined in their respective projects.
import gzip
import logging
import os
import re
import sys
import warnings
from collections import Counter

import numpy as np
import psycopg2
import yaml
from Bio import SeqIO
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord


def check_if_list_or_folder(folder_or_list):
    list_files = []
    # check if given a list of genome paths or a folder to create a schema from
    try:
        # if the argument opens as a file, treat it as a list-of-paths file
        f = open(folder_or_list, 'r')
        f.close()
        list_files = folder_or_list
    except IOError:
        # otherwise treat it as a folder and keep every parseable FASTA file
        for gene in os.listdir(folder_or_list):
            try:
                genepath = os.path.join(folder_or_list, gene)
                for allele in SeqIO.parse(genepath, "fasta"):
                    break
                list_files.append(os.path.abspath(genepath))
            except Exception as e:
                print(e)
    return list_files
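# A usage sketch; the paths are hypothetical. Passing a readable file returns
# the path itself (a file listing genome paths), while passing a folder
# returns the absolute paths of the parseable FASTA files inside it:
#
#     files = check_if_list_or_folder("listGenomes.txt")    # file of paths
#     files = check_if_list_or_folder("/data/schema_seed")  # folder of .fasta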
def resolve_config(config, workdir=None):
    """
    Parameters
    ----------
    config : str, dict
        If str, assume it's a YAML file and parse it; otherwise pass through.
    workdir : str
        Optional location used to resolve relative paths in `config`.
    """
    if isinstance(config, str):
        with open(config) as fh:
            config = yaml.safe_load(fh)

    def rel(pth):
        # leave absolute paths alone; prefix relative ones with workdir
        if workdir is None or os.path.isabs(pth):
            return pth
        return os.path.join(workdir, pth)

    for key in PATH_KEYS:
        if key in config:
            config[key] = rel(config[key])
    return config
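# A minimal usage sketch, assuming PATH_KEYS is the module-level list of
# config keys that hold file paths (defined elsewhere in the project); the
# file name "config.yaml" is hypothetical:
#
#     PATH_KEYS = ['reference', 'annotation']
#     cfg = resolve_config('config.yaml', workdir='/data/run1')
#     # a relative "reference: genome.fa" now resolves to /data/run1/genome.fa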
def fastq_info(path):
    """Collect basic read statistics from a gzipped FASTQ file.
    Found some info about how to ignore warnings in code blocks here:
        - http://stackoverflow.com/questions/14463277/how-to-disable-python-warnings
    """
    numBases = 0
    numReads = 0
    readLengths = Counter()
    GCTot = 0
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        with gzip.open(path, "rt") as handle:
            for record in SeqIO.parse(handle, "fastq"):
                numBases += len(record)
                numReads += 1
                readLengths[len(record)] += 1
                # 'S'/'s' is the IUPAC ambiguity code for "strong" (G or C)
                GCTot += sum(record.seq.count(x) for x in ['G', 'C', 'g', 'c', 'S', 's'])
    GCPer = GCTot / numBases
    avgReadLen = sum(length * count for length, count in readLengths.items()) / numReads
    return {"numBases": numBases,
            "numReads": numReads,
            "numGCBases": GCTot,
            "portionGC": GCPer,
            "avgReadLen": avgReadLen}
def parse_gene_id_and_name():
    """Return a mapping of gene ids to gene names/descriptions as a dictionary,
    deliberately only looking at protein-coding genes.
    """
    gene_id_to_name = dict()
    # modelled after http://genome.crg.es/~lpryszcz/scripts/gb2gtf.py
    #allowed_types = set(['gene', 'CDS', 'tRNA', 'tmRNA', 'rRNA', 'ncRNA'])
    allowed_types = set(['CDS'])
    #wanted_qualifiers = set(['product', 'locus_tag'])
    with open(GB_FILE) as fh:
        for gb in SeqIO.parse(fh, 'gb'):
            for f in gb.features:
                if f.type not in allowed_types:
                    continue
                qualifiers = dict(f.qualifiers)
                # each CDS is expected to carry exactly one locus_tag and one product
                assert len(qualifiers['locus_tag']) == 1 and len(qualifiers['product']) == 1
                gene_id = qualifiers['locus_tag'][0]
                gene_name = qualifiers['product'][0]
                gene_id_to_name[gene_id] = gene_name
    return gene_id_to_name
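# A usage sketch, assuming GB_FILE is a module-level path to a GenBank file
# (defined elsewhere in the project):
#
#     GB_FILE = "genome.gb"  # hypothetical
#     id_to_name = parse_gene_id_and_name()
#     for locus_tag, product in id_to_name.items():
#         print(locus_tag, product)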
def generate_corpusfile(corpus_fname, n, out):
    '''
    Args:
        corpus_fname: corpus FASTA file name
        n: the n-gram size, i.e. the "n" in "n-gram"
        out: output corpus file path
    Description:
        ProtVec uses word2vec internally, and word2vec needs a corpus file of
        space-separated "words" to train on; this function generates one.
    '''
    with open(out, "w") as f:
        for r in SeqIO.parse(corpus_fname, "fasta"):
            ngram_patterns = split_ngrams(r.seq, n)
            for ngram_pattern in ngram_patterns:
                f.write(" ".join(ngram_pattern) + "\n")
                # progress indicator: one dot per line written
                sys.stdout.write(".")
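# split_ngrams is defined elsewhere in the project; a minimal sketch of the
# behavior generate_corpusfile assumes (n staggered lists of non-overlapping
# n-grams, as in the ProtVec paper), with hypothetical paths:
#
#     def split_ngrams(seq, n):
#         s = str(seq)
#         return [[s[i:i + n] for i in range(offset, len(s) - n + 1, n)]
#                 for offset in range(n)]
#
#     generate_corpusfile("proteins.fasta", 3, "corpus.txt")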
def get_Short(genesList):
    for gene in genesList:
        pathtoDir = os.path.join(os.path.dirname(gene), "short")
        if not os.path.exists(pathtoDir):
            os.makedirs(pathtoDir)
        shortgene = os.path.join(pathtoDir, os.path.basename(gene))
        shortgene = shortgene.replace(".fasta", "_short.fasta")
        # write only the first allele of each gene file to the "short" copy
        allele = next(SeqIO.parse(gene, "fasta"))
        with open(shortgene, 'w') as fG:
            fG.write('>' + str(allele.id) + '\n' + str(allele.seq.upper()) + '\n')
    return True
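# A usage sketch; the paths are hypothetical. For each gene file this writes a
# single-record copy under a "short/" subfolder next to the original:
#
#     get_Short(["/schema/gene1.fasta", "/schema/gene2.fasta"])
#     # -> /schema/short/gene1_short.fasta, /schema/short/gene2_short.fasta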
def calculateN_50(input_file, ratio):
    lengths = []
    sums = 0
    for seq_record in SeqIO.parse(input_file, "fasta"):
        lengths.append(len(seq_record.seq))
    lengths = sorted(lengths, reverse=True)
    # count contigs of at least 10 kb and their cumulative length
    contigsMore10000 = 0
    totalLengthMore10000 = 0
    for elem in lengths:
        if elem >= 10000:
            contigsMore10000 += 1
            totalLengthMore10000 += int(elem)
    # N50 (with ratio=0.5): the length of the contig at which the running sum
    # of the sorted lengths first reaches ratio * total assembly length
    N_half = sum(lengths) * ratio
    for i in range(0, len(lengths)):
        sums = sums + lengths[i]
        if sums >= N_half:
            return lengths[i], len(lengths), N_half / ratio, contigsMore10000, totalLengthMore10000
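# A usage sketch; "assembly.fasta" is hypothetical. With ratio=0.5 the first
# value returned is the N50, and N_half/ratio recovers the total length:
#
#     n50, n_contigs, total_len, n_10kb, len_10kb = calculateN_50("assembly.fasta", 0.5)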
def mmff_from_file(fasta_file, morphs):
    '''Compute metamorphic test files from an input FASTA file.
    Arguments:
        fasta_file: an open file object for the FASTA file
        morphs: list of morph names to apply (keys of morphs_dict)
    Result:
        tbc
    '''
    morphs_dict = {
        'reverse': mmff_reverse,
        'passthrough': mmff_passthrough
    }
    morphed_sequences = []
    for seq in SeqIO.parse(fasta_file, "fasta"):
        for morph in morphs:
            #print("morph "+str(morphs_dict[morph])+" on "+seq.id)
            morphed_sequences.append(morphs_dict[morph](seq))
    return morphed_sequences
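# mmff_reverse and mmff_passthrough are defined elsewhere in the project; a
# minimal sketch of the shape this function assumes (each morph maps one
# SeqRecord to a new SeqRecord), with a hypothetical input file:
#
#     def mmff_passthrough(record):
#         return record
#
#     def mmff_reverse(record):
#         return SeqRecord(record.seq[::-1], id=record.id,
#                          description=record.description)
#
#     with open("input.fasta") as fh:
#         morphed = mmff_from_file(fh, ['reverse', 'passthrough'])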
def mmff_from_file(fasta_file, morphs):
    '''Compute metamorphic test files from an input FASTA file.
    Arguments:
        fasta_file: an open file object for the FASTA file
        morphs: list of morph names to apply (keys of morphs_dict)
    Result:
        tbc
    '''
    morphs_dict = {
        'reverse': mmff_reverse,
        'passthrough': mmff_passthrough,
        'numeric_header': mmff_numeric_header
    }
    morphed_sequences = []
    for seq in SeqIO.parse(fasta_file, "fasta"):
        for morph in morphs:
            #print("morph "+str(morphs_dict[morph])+" on "+seq.id)
            morphed_sequences.append(morphs_dict[morph](seq))
    return morphed_sequences
def mmff_from_file(fasta_file, morphs):
    '''Compute metamorphic test files from an input FASTA file.
    Arguments:
        fasta_file: an open file object for the FASTA file
        morphs: list of morph names to apply (keys of morphs_dict)
    Result:
        tbc
    '''
    morphs_dict = {
        'reverse': mmff_reverse,
        'passthrough': mmff_passthrough
    }
    morphed_sequences = []
    for seq in SeqIO.parse(fasta_file, "fasta"):
        for morph in morphs:
            print("morph " + str(morphs_dict[morph]) + " on " + seq.id)
            morphed_sequences.append(morphs_dict[morph](seq))
    return morphed_sequences
def main(args):
    for record in SeqIO.parse(args.infile, 'fasta'):
        # skip records whose id matches any of the discard patterns
        if args.discard:
            if any(re.match(rx, record.id) for rx in args.discard):
                continue
        subseqcounter = 0
        printlog(args.debug, "DEBUG: convert to upper case", record.id)
        sequence = str(record.seq).upper()
        printlog(args.debug, "DEBUG: split seq by Ns", record.id)
        # split on runs of non-ACGT characters and keep chunks above minlength
        subseqs = [ss for ss in re.split('[^ACGT]+', sequence) if len(ss) > args.minlength]
        printlog(args.debug, "DEBUG: print subseqs", record.id)
        for subseq in subseqs:
            subseqcounter += 1
            subid = '{:s}_chunk_{:d}'.format(record.id, subseqcounter)
            subrecord = SeqRecord(Seq(subseq), subid, '', '')
            SeqIO.write(subrecord, args.outfile, 'fasta')
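# A sketch of the argparse wiring this main() assumes; the argument names are
# inferred from the attribute accesses above, so treat them as assumptions:
#
#     import argparse
#     parser = argparse.ArgumentParser(description='split records at runs of Ns')
#     parser.add_argument('--discard', nargs='+', help='regexes of ids to skip')
#     parser.add_argument('--minlength', type=int, default=0)
#     parser.add_argument('--debug', action='store_true')
#     parser.add_argument('infile', type=argparse.FileType('r'))
#     parser.add_argument('outfile', type=argparse.FileType('w'))
#     main(parser.parse_args())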
def READ_FASTA_ENTRY(file_name):
    fasta_sequences = []
    sequence_name = []
    full_names_dict = {}
    sequences_dict = {}
    if os.stat(file_name).st_size != 0:  # not empty
        with open(file_name, "r") as fh:
            for record in SeqIO.parse(fh, "fasta"):
                # index by the first whitespace-separated token of the header
                short_name = str(record.id).split(' ')[0]
                sequence_name.append(short_name)
                full_names_dict[short_name] = str(record.id)
                fasta_sequences.append(str(record.seq))
                sequences_dict[short_name] = str(record.seq)
    return sequences_dict, fasta_sequences, sequence_name, full_names_dict
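# A usage sketch; "proteins.fasta" is hypothetical:
#
#     seqs, sequences, names, full_names = READ_FASTA_ENTRY("proteins.fasta")
#     first = names[0]
#     print(first, full_names[first], len(seqs[first]))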
def READ_FASTA_ENTRY(file_name):
    # slimmer variant of the function above: only the short-name -> sequence map
    sequences_dict = {}
    if os.stat(file_name).st_size != 0:  # not empty
        with open(file_name, "r") as fh:
            for record in SeqIO.parse(fh, "fasta"):
                short_name = str(record.id).split(' ')[0]
                sequences_dict[short_name] = str(record.seq)
    return sequences_dict
#------------------------------------------------------------------------------------------
def test_malign(self):
    self.setUpPhylotyper(aa=False)
    tmpfiles = [os.path.join(self.test_dir, 'tmp1.fasta'),
                os.path.join(self.test_dir, 'tmp2.fasta')]
    tmpfile = os.path.join(self.test_dir, 'tmp_saln.fasta')
    aln = SeqAligner(self.configObj)
    aln.malign(self.subtype_options['input'], tmpfiles, tmpfile)
    lengths = []
    tmpfiles.append(tmpfile)
    for f in tmpfiles:
        with open(f, 'r') as fh:
            seqs = SeqIO.parse(fh, 'fasta')
            # .next() was Python 2 only; use the next() builtin instead
            lengths.append(len(str(next(seqs).seq)))
    # the super-alignment should be the two input alignments concatenated
    self.assertEqual(lengths[0] + lengths[1], lengths[2])
## TODO Add test for new multi amino acid/dna
def remove_duplicates(self):
    """Remove duplicate genbank records
    There are multiple methods to download genes. Each download method appends
    to the genbank file, so this method is needed to remove duplicates.
    Args:
        None
    """
    # "rU" mode is deprecated; plain "r" is fine on Python 3
    with open(self._tmpfile, "r") as input_handle:
        unique_seq_iterator = self.unique(SeqIO.parse(input_handle, "genbank"))
        with open(self._gbfile, "w") as output_handle:
            SeqIO.write(unique_seq_iterator, output_handle, "genbank")
    os.remove(self._tmpfile)
def get_lengths(fastq):
'''
Loop over the fastq file, extract length of sequences
'''
return np.array([len(record) for record in SeqIO.parse(fastq, "fastq")])
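# A usage sketch; "reads.fastq" is hypothetical. Returning a numpy array makes
# summary statistics cheap:
#
#     lengths = get_lengths("reads.fastq")
#     print(lengths.mean(), np.median(lengths), lengths.max())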
def get_bin(fq, sizeRange):
    '''
    Loop over the fastq file and collect a (nucleotide list, quality score
    list) tuple for each read whose length falls within the size range.
    '''
logging.info("Extracting nucleotides and quality scores of selected bin.")
return [(list(rec.seq), list(rec.letter_annotations["phred_quality"]))
for rec in SeqIO.parse(fq, "fastq") if sizeRange[0] < len(rec) < sizeRange[1]]
def main():
'''Run the import'''
if len(sys.argv) < 2:
print('Usage: {} <gbk file>'.format(sys.argv[0]), file=sys.stderr)
sys.exit(2)
connection = psycopg2.connect(DB_CONNECTION)
recs = SeqIO.parse(sys.argv[1], 'genbank')
with connection:
with connection.cursor() as cursor:
for rec in recs:
if rec.name in BLACKLIST:
print('Skipping blacklisted record {!r}'.format(rec.name), file=sys.stderr)
continue
load_record(rec, cursor)
connection.close()
def parse_smcog(feature):
'''Parse the smCOG feature qualifier'''
if 'note' not in feature.qualifiers:
raise ValueError('No note qualifier in {}'.format(feature))
for entry in feature.qualifiers['note']:
if not entry.startswith('smCOG:'):
continue
match = SMCOG_PATTERN.search(entry)
if match is None:
print(entry)
raise ValueError('Failed to parse smCOG line {!r}'.format(entry))
return match.groups()
raise ValueError('No smcog qualifier in {}'.format(feature))
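# SMCOG_PATTERN is a module-level regex defined elsewhere in the project; a
# sketch of the kind of note qualifier this parser expects and a pattern that
# would match it (both are assumptions, not the project's actual values):
#
#     # note: "smCOG: SMCOG1000: example protein (Score: 100.5; E-value: 1e-30);"
#     SMCOG_PATTERN = re.compile(
#         r'smCOG: (SMCOG\d+):\s*(.+?)\s*\(Score: ([\d.]+); E-value: ([\de.+-]+)\);')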