def cast_to_str(obj):
"""Return a string representation of a Seq or SeqRecord.
Args:
obj (str, Seq, SeqRecord): Biopython Seq or SeqRecord
Returns:
str: String representation of the sequence
"""
if isinstance(obj, str):
return obj
if isinstance(obj, Seq):
return str(obj)
if isinstance(obj, SeqRecord):
return str(obj.seq)
else:
raise ValueError('Must provide a string, Seq, or SeqRecord object.')
python类Seq()的实例源码
def cast_to_seq(obj, alphabet=IUPAC.extended_protein):
"""Return a Seq representation of a string or SeqRecord object.
Args:
obj (str, Seq, SeqRecord): Sequence string or Biopython SeqRecord object
alphabet: See Biopython SeqRecord docs
Returns:
Seq: Seq representation of the sequence
"""
if isinstance(obj, Seq):
return obj
if isinstance(obj, SeqRecord):
return obj.seq
if isinstance(obj, str):
obj = obj.upper()
return Seq(obj, alphabet)
else:
raise ValueError('Must provide a string, Seq, or SeqRecord object.')
def seq(self):
"""Seq: Dynamically loaded Seq object from the sequence file"""
if self.sequence_file:
file_to_load = copy(self.sequence_path)
log.debug('{}: reading sequence from sequence file {}'.format(self.id, file_to_load))
tmp_sr = SeqIO.read(file_to_load, 'fasta')
return tmp_sr.seq
else:
if not self._seq:
log.debug('{}: no sequence stored in memory'.format(self.id))
else:
log.debug('{}: reading sequence from memory'.format(self.id))
return self._seq
GraphicRecord.py 文件源码
项目:DnaFeaturesViewer
作者: Edinburgh-Genome-Foundry
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def to_biopython_record(self):
"""
Example
-------
from Bio import SeqIO
gr_record = GraphicRecord(features=features, sequence_length=len(seq),
sequence=seq)
bio_record = gr_record.to_biopython_record()
with open("example.gb", "w+") as f:
SeqIO.write(record, f, "genbank")
"""
if not BIOPYTHON_AVAILABLE:
raise ImportError(".to_biopython_record requires Biopython")
features = [
SeqFeature(FeatureLocation(f.start, f.end, f.strand),
type=f.feature_type, qualifiers={"label": f.label})
for f in self.features
]
sequence = Seq(self.data["sequence"], alphabet=DNAAlphabet())
return SeqRecord(sequence=sequence, features=features)
def _sequence(v, deparse=False):
if not deparse:
try:
return Seq(v, IUPAC.ambiguous_dna)
except:
return None
else:
try:
return str(v)
except:
return ''
# Initializer
#
# Arguments: row = dictionary of {field:value} data
# genotyped = if True assign v_call from genotyped field
# Returns: IgRecord
def getSeqField(self, field):
if field in IgRecord._field_map:
v = getattr(self, IgRecord._field_map[field])
elif field in self.annotations:
v = self.annotations[field]
else:
return None
if isinstance(v, Seq):
return v
elif isinstance(v, str):
return Seq(v, IUPAC.ambiguous_dna)
else:
return None
# Returns: dictionary of the namespace
def load_dataframe(db_file, dialect='excel-tab'):
df = pd.read_csv(db_file, dialect=dialect)
df = df.rename(columns=dict(zip(df.columns, df.columns.str.lower())))
# df = df[df['functional'] == 'F']
# parse v and j genes to speed up computation later
df['v_gene_set'] = [set(
parseAllele(x, gene_regex, 'set')) for x in df.v_call]
df['v_gene_set_str'] = [str(set(
parseAllele(x, gene_regex, 'set'))) for x in df.v_call]
df['j_gene_set'] = [set(
parseAllele(x, gene_regex, 'set')) for x in df.j_call]
df['junc'] = [junction_re(x) for x in df.junction]
df['aa_junc'] = [str(Seq(x, generic_dna).translate()) for x in df.junc]
df['aa_junction_length'] = [len(x) for x in df.aa_junc]
return df
def _parse_gff(file, file_name):
records = []
with open(file, "r") as infile:
for i, rec in enumerate(GFF.parse(infile)):
# Enumerates the contigs (can be chromosome, plasmid and unidentified)
# based on total number of contigs (not type)
rec_id = rec.id + "_" + str(i + 1)
if len(rec_id) > 15:
rec_id = "contig_" + "_" + str(i + 1)
seq_record = SeqRecord(Seq(str(rec.seq), IUPAC.unambiguous_dna), id=rec_id,
description=os.path.basename(file_name),
features=rec.features)
records.append(seq_record)
return records
def get_cds_sequence(rna,c_df,chrom_seq):
pr_df = c_df[c_df['rna_id'].values==rna]
strand = list(set(pr_df[6].tolist()))
if len(strand) == 2:
assert False, rna+' has both strands'
# seqeunce merge
chr_seq = Seq('')
for start,end in zip(pr_df[3],pr_df[4]):
if strand == ['-']:
chr_seq += chrom_seq[start-1:end].reverse_complement()
else:
chr_seq += chrom_seq[start-1:end]
# consider the frame information in 7th column
frame = int(pr_df[7].tolist()[0])
rna_seq = chr_seq[frame:]
return str(rna_seq.translate())
def translation(self):
"""
Returns the amino acid sequence of self
B = "Asx"; Aspartic acid (R) or Asparagine (N)
X = "Xxx"; Unknown or 'other' amino acid
Z = "Glx"; Glutamic acid (E) or Glutamine (Q)
J = "Xle"; Leucine (L) or Isoleucine (I), used in mass-spec (NMR)
U = "Sec"; Selenocysteine
O = "Pyl"; Pyrrolysine
"""
codon_table = CodonTable.ambiguous_dna_by_id[self.transl_table]
seq = Seq(str(self.sequence()),IUPACAmbiguousDNA())
translated_seq = seq.translate(codon_table).tostring().replace('B','X').replace('Z','X').replace('J','X')
if '*' in translated_seq[:-1]: # check if premature stop codon in the translation
logging.error('Stop codon found within the CDS. It will rise an error submiting the data to ENA. Please fix your gff file.')
# remove the stop character. It's not accepted by embl
if translated_seq[-1:] == "*":
translated_seq = translated_seq[:-1]
return translated_seq
def main(args):
for record in SeqIO.parse(args.infile, 'fasta'):
if args.discard:
if sum([1 for rx in args.discard if re.match(rx, record.id)]) > 0:
continue
subseqcounter = 0
printlog(args.debug, "DEBUG: convert to upper case", record.id)
sequence = str(record.seq).upper()
printlog(args.debug, "DEBUG: split seq by Ns", record.id)
subseqs = [ss for ss in re.split('[^ACGT]+', sequence) if len(ss) > args.minlength]
printlog(args.debug, "DEBUG: print subseqs", record.id)
for subseq in subseqs:
subseqcounter += 1
subid = '{:s}_chunk_{:d}'.format(record.id, subseqcounter)
subrecord = SeqRecord(Seq(subseq), subid, '', '')
SeqIO.write(subrecord, args.outfile, 'fasta')
def get_reconstructed_alignment(self):
"""
Get the multiple sequence alignment including reconstructed sequences for
the internal nodes.
"""
from Bio.Align import MultipleSeqAlignment
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
self.logger("TreeAnc.get_reconstructed_alignment ...",2)
if not hasattr(self.tree.root, 'sequence'):
self.logger("TreeAnc.reconstructed_alignment... reconstruction not yet done",3)
self.reconstruct_anc('ml')
new_aln = MultipleSeqAlignment([SeqRecord(id=n.name, seq=Seq("".join(n.sequence)), description="")
for n in self.tree.find_clades()])
return new_aln
def make_RefCmap(fasta_file, enz=None, min_len=20, min_nsite=5, path=None):
name = fasta_file.rsplit('.',1)[0].split('/')[-1]
index = 0
enzymes = {'BspQI':'GCTCTTC',
'BbvCI':'CCTCAGC',
'Bsml':'GAATGC',
'BsrDI':'GCAATG',
'bseCI':'ATCGAT',
'BssSI':'CACGAG'}
try:
cmap_file='%s/%s_%s.cmap'%(path,name,enz)
forwards = enzymes[enz]
reverse = str(Seq(forwards).reverse_complement())
with open (cmap_file,'a') as ref_cmap:
ref_cmap.write('# CMAP File Version:\t0.1\n')
ref_cmap.write('# Label Channels:\t1\n')
ref_cmap.write('# Nickase Recognition Site 1:\t%s\n'%forwards)
ref_cmap.write('# Enzyme1:\tNt.%s\n'%enz)
ref_cmap.write('# Number of Consensus Nanomaps:\tN/A\n')
ref_cmap.write('#h CMapId\tContigLength\tNumSites\tSiteID\tLabelChannel\tPosition\tStdDev\tCoverage\tOccurrence\n')
ref_cmap.write('#f int\tfloat\tint\tint\tint\tfloat\tfloat\tint\tint\n')
for seqs in SeqIO.parse(fasta_file,'fasta'):
seq = str(seqs.seq.upper())
seq_len = len(seq)
index+=1
if seq_len >= min_len*1000:
nsites = len(re.findall('%s|%s'%(forwards,reverse),seq))
if nsites >=min_nsite:
j=1
for o in re.finditer('%s|%s'%(forwards,reverse),seq):
ref_cmap.write('%s\t%.1f\t%d\t%d\t1\t%.1f\t1.0\t1\t1\n'%(index,seq_len,nsites,j,o.start()+1))
j+=1
ref_cmap.write('%s\t%.1f\t%d\t%d\t0\t%.1f\t0.0\t1\t0\n'%(index,seq_len,nsites,j,seq_len))
except:
pass
def generateSeqHandles(anIndexCfg):
"""
The YAML config file to parse is like:
handles:
prefix: "TTAGTCTCCGACGGCAGGCTTCAAT"
postfix: "ACGCACCCACCGGGACTCAG"
indexes: [
"ACAGTC",
"TGATGC",
"TCTCAG"
]
There is a handle at one end of each sequence which is as follows:
TTAGTCTCCGACGGCAGGCTTCAAT-ACAGTC-ACGCACCCACCGGGACTCAG
prefix -index - postfix
"""
forwardIdx= [] # the result array to collect handle sequence strings
handlePrefix = anIndexCfg["handles"]["prefix"]
handlePostfix = anIndexCfg["handles"]["postfix"]
for index in anIndexCfg["indexes"]:
forwardIdx.append(handlePrefix + index + handlePostfix)
reverseIdx = [] # to collect reverse complements
for handle in forwardIdx:
seq = Seq(handle)
rc = str(seq.reverse_complement())
reverseIdx.append(rc)
return (forwardIdx,reverseIdx)
def seq_record_example():
"""Dummy SeqRecord to load"""
return SeqRecord(Seq("MKQHKAMIVALIVICITAVVAALVTRKDLCEVHIRTGQTEVAVF",
IUPAC.protein),
id="YP_025292.1", name="HokC",
description="toxic membrane protein, small",
annotations={'hello':'world'})
def test_set_seq_with_str(self, seqprop_with_i, seq_str_example):
"""Test setting the seq attribute with a sequence string"""
seqprop_with_i.seq = seq_str_example
assert type(seqprop_with_i.seq) == Seq
assert str(seqprop_with_i.seq) == seq_str_example
def test_set_seq_with_seqrecord(self, seqprop_with_i, seq_record_example):
"""Test setting the seq attribute with a SeqRecord"""
seqprop_with_i.seq = seq_record_example
assert type(seqprop_with_i.seq) == Seq
assert seqprop_with_i.seq == seq_record_example.seq
assert seqprop_with_i.name == seq_record_example.name
assert seqprop_with_i.description == seq_record_example.description
assert seqprop_with_i.annotations == seq_record_example.annotations
def test_write_fasta_file(self, seqprop_with_i, tmpdir, test_files_outputs, seq_record_example):
"""Test that everything behaves properly when writing the SeqProp to a FASTA file"""
# Add dummy annotations to the SeqProp - check to see if they stay in the SeqProp even after Seq is written
seqprop_with_i.letter_annotations.update({'test_la_key': 'X' * len(seqprop_with_i.seq)})
seqprop_with_i.features.append(SeqFeature(FeatureLocation(1, 3)))
# Write the Seq to a FASTA file
outpath = tmpdir.join('test_seqprop_with_i_write_fasta_file.fasta').strpath
seqprop_with_i.write_fasta_file(outfile=outpath, force_rerun=True)
# Test that the file was written
assert op.exists(outpath)
assert op.getsize(outpath) > 0
# Test that file paths are correct
assert seqprop_with_i.sequence_path == outpath
assert seqprop_with_i.sequence_file == 'test_seqprop_with_i_write_fasta_file.fasta'
assert seqprop_with_i.sequence_dir == tmpdir
# Once a file is written, the annotations should not be lost, even though the sequence now
# loads from the written file as a Seq
assert seqprop_with_i.description == seq_record_example.description
assert seqprop_with_i.annotations == seq_record_example.annotations
assert seqprop_with_i.letter_annotations == {'test_la_key': 'X' * len(seq_record_example.seq)}
assert len(seqprop_with_i.features) == 1
# Test that sequence cannot be changed
with pytest.raises(ValueError):
seqprop_with_i.seq = 'THISWILLNOTBETHESEQ'
assert seqprop_with_i.seq == seq_record_example.seq
def cast_to_seq_record(obj, alphabet=IUPAC.extended_protein, id="<unknown id>", name="<unknown name>",
description="<unknown description>", dbxrefs=None,
features=None, annotations=None,
letter_annotations=None):
"""Return a SeqRecord representation of a string or Seq object.
Args:
obj (str, Seq, SeqRecord): Sequence string or Biopython Seq object
alphabet: See Biopython SeqRecord docs
id: See Biopython SeqRecord docs
name: See Biopython SeqRecord docs
description: See Biopython SeqRecord docs
dbxrefs: See Biopython SeqRecord docs
features: See Biopython SeqRecord docs
annotations: See Biopython SeqRecord docs
letter_annotations: See Biopython SeqRecord docs
Returns:
SeqRecord: SeqRecord representation of the sequence
"""
if isinstance(obj, SeqRecord):
return obj
if isinstance(obj, Seq):
return SeqRecord(obj, id, name, description, dbxrefs, features, annotations, letter_annotations)
if isinstance(obj, str):
obj = obj.upper()
return SeqRecord(Seq(obj, alphabet), id, name, description, dbxrefs, features, annotations, letter_annotations)
else:
raise ValueError('Must provide a string, Seq, or SeqRecord object.')
def write_fasta_file(self, outfile, force_rerun=False):
"""Write a FASTA file for the protein sequence, ``seq`` will now load directly from this file.
Args:
outfile (str): Path to new FASTA file to be written to
force_rerun (bool): If an existing file should be overwritten
"""
if ssbio.utils.force_rerun(flag=force_rerun, outfile=outfile):
SeqIO.write(self, outfile, "fasta")
# The Seq as it will now be dynamically loaded from the file
self.sequence_path = outfile
def createRCs(inputFile, outNameVal):
"""Creates a .bed file with the reverse complements of the given set of
sequences."""
# Determine the stem of the input filename.
fileName = str(inputFile).split('.')[0]
# Open input file for reading.
with open(inputFile, 'r') as f:
file_read = [line.strip() for line in f]
# Create list to hold output.
outList = []
# Parse out probe info, flip sequence to RC, and write to output list.
for i in range(0, len(file_read), 1):
chrom = file_read[i].split('\t')[0]
start = file_read[i].split('\t')[1]
stop = file_read[i].split('\t')[2]
probeSeq = file_read[i].split('\t')[3]
RevSeq = Seq(probeSeq, IUPAC.unambiguous_dna).reverse_complement()
Tm = file_read[i].split('\t')[4]
outList.append('%s\t%s\t%s\t%s\t%s' % (chrom, start, stop, RevSeq, Tm))
# Determine the name of the output file.
if outNameVal is None:
outName = '%s_RC' % fileName
else:
outName = outNameVal
# Create the output file.
output = open('%s.bed' % outName, 'w')
# Write the output file
output.write('\n'.join(outList))
output.close()
def _write_fasta(self, target="allele_seq_ref"):
""" Write fasta file of sequences with SNP IDs for CD-HIT. """
file_name = os.path.join(self.tmp_path, self.project + "_Seqs")
seqs = [SeqRecord(Seq(data[target], IUPAC.unambiguous_dna), id=snp_id, name="", description="")
for snp_id, data in self.data.items()]
file_name += ".fasta"
with open(file_name, "w") as fasta_file:
SeqIO.write(seqs, fasta_file, "fasta")
return file_name
def reverse_complement(seq):
return Seq(seq).reverse_complement()
def consensus_seqrecord(consensus, ref_id):
return SeqRecord(Seq(consensus), id=ref_id + '_cns', description='')
def addSegmentToJunctionFileSE(vSeg,jSeg,cSeg,out,fastaDict, bases, idNameDict):
vSeq = fastaDict[vSeg]
if jSeg != 'NA':
jName = idNameDict[jSeg]
jSeq = fastaDict[jSeg]
else:
jSeq = ''
jName = 'NoJ'
if cSeg != 'NA':
cName = idNameDict[cSeg]
cSeq = fastaDict[cSeg]
else:
cName = 'NoC'
cSeq = ''
jcSeq = jSeq + cSeq
lenSeg = min(len(vSeq),len(jcSeq))
if bases != -10:
if lenSeg < bases:
sys.stdout.write(str(datetime.datetime.now()) + ' Bases parameter is bigger than the length of the V or J segment, taking the length' \
'of the V/J segment instead, which is: ' + str(lenSeg) + '\n')
sys.stdout.flush()
else:
lenSeg = bases
jTrim = jcSeq[:lenSeg]
vTrim = vSeq[-1*lenSeg:]
junc = vTrim + jTrim
recordName = vSeg + '.' + jSeg + '.' + cSeg + '(' + idNameDict[vSeg] + '-' + jName + '-' + cName + ')'
record = SeqRecord(Seq(junc,IUPAC.ambiguous_dna), id = recordName, description = '')
SeqIO.write(record,out,'fasta')
def findCDR3(fasta, aaDict, vdjFaDict):
f = open(fasta, 'rU')
fDict = dict()
for record in SeqIO.parse(f, 'fasta'):
if record.id in fDict:
sys.stderr.write(str(datetime.datetime.now()) + ' Error! same name for two fasta entries %s\n' % record.id)
sys.stderr.flush()
else:
idArr = record.id.split('.')
vSeg = idArr[0]
jSeg = idArr[1]
if ((vSeg in aaDict) & (jSeg in aaDict)):
currDict = findVandJaaMap(aaDict[vSeg],aaDict[jSeg],record.seq)
else:
if vSeg in aaDict:
newVseg = aaDict[vSeg]
else:
vId = idArr[3]
currSeq = vdjFaDict[vId]
newVseg = getBestVaa(Seq(currSeq))
if jSeg in aaDict:
newJseg = aaDict[jSeg]
else:
jId = idArr[4]
currSeq= vdjFaDict[jId]
newJseg = getBestJaa(Seq(currSeq))
currDict = findVandJaaMap(newVseg,newJseg,record.seq)
fDict[record.id] = currDict
f.close()
return fDict
def extract_genes(output_folder, refs_folder, gff_folder, di):
tp = 'gene'
refs_files = sorted([join(refs_folder, f) for f in listdir(refs_folder) if isfile(join(refs_folder, f))])
gff_files = sorted([join(gff_folder, f) for f in listdir(gff_folder) if isfile(join(gff_folder, f))])
limit_info = dict( gff_type =['gene'])
sequences = []
for ind in range(len(refs_files)):
f_fasta = refs_files[ind]
f_gff = gff_files[ind]
nm = f_gff.split("/")[-1][:-4]
record = SeqIO.to_dict(SeqIO.parse(f_fasta, "fasta"))
new_record = {}
p = re.compile("N\w_\w+\.\d")
for it in record:
new_id = p.findall(it)[0]
new_record[new_id] = record[it]
record = new_record
in_handle = open(f_gff)
for rec in GFF.parse(in_handle, limit_info=limit_info, base_dict=record):
for f in rec.features:
if f.qualifiers['ID'][0] in di.get(f_gff, []):
subrecord = SeqRecord.SeqRecord(Seq.Seq(re.sub(r'[^ATGC]', 'A', str(f.extract(rec.seq)))),
id=nm + "_" + rec.id + "_" + str(f.location),
name=nm + "_" + str(f.location)+ "_" + tp,
description= tp)
sequences.append(subrecord)
in_handle.close()
with open(output_folder+'sc_genes_seq.fasta', "w") as output_handle:
SeqIO.write(sequences, output_handle, "fasta")
def partial_genes_extract(contigs_path, outdir):
contigs = SeqIO.to_dict(SeqIO.parse(contigs_path, "fasta"))
partial_genes = []
with open(outdir+'prodigal_output.gff') as f:
for rec in GFF.parse(f, base_dict=contigs):
for feature in rec.features:
extract_seq = str(feature.extract(rec.seq))
if feature.qualifiers['partial'][0] in ['11', '01', '10'] and len(extract_seq) > 110:
subrecord = SeqRecord.SeqRecord(Seq.Seq(extract_seq),
id='gene_'+str(len(partial_genes))+'_len_'+str(len(extract_seq)) +
'_conf_'+str(feature.qualifiers['conf'][0]),
description='patrial gene')
partial_genes.append(subrecord)
return partial_genes
def write_orfs(self, outdir):
"""
Write ORFs from self.orfs and create files ORFs.[fasta, path] with sequences and paths
:param outdir: saves path (with "/" in the end)
"""
with open(outdir + 'ORFs.fasta', 'w') as f, open(outdir + 'ORFs.path', 'w') as path:
counter = 0
for o in self.orfs:
SeqIO.write(SeqRecord(Seq(o), id='ORF_{0}'.format(counter), description=self.orfs[o][1]), f, 'fasta')
path.write('ORF_{0} '.format(counter) + 'max_edge_len: {0}\n'.format(self.max_edge_len(o)) +
str(self.orfs[o][0][0]) + ',' + ''.join([i[0] + i[1] + ',' for i in self.orfs[o][0][1:-1]]) +
str(self.orfs[o][0][-1]) + '\n')
counter += 1
def getVSeq(self):
# _seq_vdj = self.getField('SEQUENCE_VDJ')
_seq_vdj = self.getField('SEQUENCE_IMGT')
# _idx_v = int(self.getField('V_SEQ_LENGTH'))
return _seq_vdj[:312] # 312: V length without cdr3
# Get a field value converted to a Seq object by column name
#
# Arguments: field = column name
# Returns: value in the field as a Seq object