def seq(self):
"""Seq: Get the Seq object from the sequence file, metadata file, or in memory"""
if self.sequence_file:
log.debug('{}: reading sequence from sequence file {}'.format(self.id, self.sequence_path))
tmp_sr = SeqIO.read(self.sequence_path, 'fasta')
return tmp_sr.seq
elif self.metadata_file:
log.debug('{}: reading sequence from metadata file {}'.format(self.id, self.metadata_path))
tmp_sr = SeqIO.read(self.metadata_path, 'uniprot-xml')
return tmp_sr.seq
else:
if not self._seq:
log.debug('{}: no sequence stored in memory'.format(self.id))
else:
log.debug('{}: reading sequence from memory'.format(self.id))
return self._seq
python类read()的实例源码
def features(self):
"""list: Get the features from the feature file, metadata file, or in memory"""
if self.feature_file:
log.debug('{}: reading features from feature file {}'.format(self.id, self.feature_path))
with open(self.feature_path) as handle:
feats = list(GFF.parse(handle))
if len(feats) > 1:
log.warning('Too many sequences in GFF')
else:
return feats[0].features
elif self.metadata_file:
log.debug('{}: reading features from metadata file {}'.format(self.id, self.metadata_path))
tmp_sr = SeqIO.read(self.metadata_path, 'uniprot-xml')
return tmp_sr.features
else:
return self._features
def fasta_files_equal(seq_file1, seq_file2):
"""Check equality of a FASTA file to another FASTA file
Args:
seq_file1: Path to a FASTA file
seq_file2: Path to another FASTA file
Returns:
bool: If the sequences are the same
"""
# Load already set representative sequence
seq1 = SeqIO.read(open(seq_file1), 'fasta')
# Load kegg sequence
seq2 = SeqIO.read(open(seq_file2), 'fasta')
# Test equality
if str(seq1.seq) == str(seq2.seq):
return True
else:
return False
def seq(self):
"""Seq: Dynamically loaded Seq object from the sequence file"""
if self.sequence_file:
file_to_load = copy(self.sequence_path)
log.debug('{}: reading sequence from sequence file {}'.format(self.id, file_to_load))
tmp_sr = SeqIO.read(file_to_load, 'fasta')
return tmp_sr.seq
else:
if not self._seq:
log.debug('{}: no sequence stored in memory'.format(self.id))
else:
log.debug('{}: reading sequence from memory'.format(self.id))
return self._seq
def download_mibig(outputdir, version='1.3'):
""" Download and extract MIBiG files into outputdir
"""
assert version in ['1.0', '1.1', '1.2', '1.3'], \
"Invalid version of MIBiG"
server = 'http://mibig.secondarymetabolites.org'
filename = "mibig_gbk_%s.tar.gz" % version
url = urlparse.urljoin(server, filename)
with tempfile.NamedTemporaryFile(delete=True) as f:
u = urllib2.urlopen(url)
f.write(u.read())
f.file.flush()
tar = tarfile.open(f.name)
tar.extractall(path=outputdir)
tar.close()
# MIBiG was packed with strange files ._*gbk. Let's remove it
for f in [f for f in os.listdir(outputdir) if f[:2] == '._']:
os.remove(os.path.join(outputdir, f))
#def gbk2tablegen(gb_file, strain_id=None):
#def cds_from_gbk(gb_file, strain_id=None):
def get_hits(filename, criteria='cum_BLAST_score'):
"""
Reproduces original Tiago's code: table_1_extender.py
In the future allow different criteria. Right now it takes
from the very first block, which has the highest Cumulative
BLAST.
"""
with open(filename) as f:
df = antiSMASH_to_dataFrame(f.read())
df.dropna(subset=['query_gene'], inplace=True)
df.sort_values(by=criteria, ascending=False, na_position='last',
inplace=True)
return df.groupby('query_gene', as_index=False).first()
def show_weights(sample_ids):
# sample_weights = [SeqIO.read('tests/bam/test_cns/' + id + '.bam.cns.fa', 'fasta') for id in sample_ids]
# print(sample_weights)
alignments = []
for sample_id in sample_ids:
alignments.append(kindel.parse_alignment('tests/bam/' + sample_id + '.bam'))
for id, alignment in zip(sample_ids, alignments):
print(id)
for i, w in enumerate(alignment.weights):
coverage = sum({nt:w[nt] for nt in list('ACGT')}.values())
consensus = kindel.consensus(w)
print(i+1,
coverage,
consensus[0],
consensus[1],
consensus[2],
str(alignment.clip_starts[i]) + 'clip' + str(alignment.clip_ends[i]),
'TIE' if consensus[3] else '',
'DIVERGENT' if consensus[2] and consensus[2] <= 0.75 else '',
w)
BiopythonTranslator.py 文件源码
项目:DnaFeaturesViewer
作者: Edinburgh-Genome-Foundry
项目源码
文件源码
阅读 17
收藏 0
点赞 0
评论 0
def translate_record(self, record, grecord_class=None):
"""Create a new GraphicRecord from a BioPython Record object.
Parameters
----------
record
A BioPython Record object or the path to a Genbank file.
grecord_class
The graphic record class to use, e.g. GraphicRecord (default) or
CircularGraphicRecord.
"""
if isinstance(record, str):
record = SeqIO.read(record, "genbank")
if grecord_class is None:
grecord_class = GraphicRecord
return grecord_class(sequence_length=len(record.seq), features=[
self.translate_feature(feature)
for feature in self.compute_filtered_features(record.features)
if feature.location is not None
], **self.graphic_record_parameters)
def sbol_to_list(self):
"""
Take an sbol file and return a linear list of components
"""
# Read in SBOL file
# Look for component def with components
# Build a list of SBOL componetns from this
# If multiple do something fancy?
elements = []
self.document.read(self.file_data)
for c in self.document.list_components():
if len(c.components) > 0:
comp_elems = []
for cl in self.document.get_components(c.identity):
role_uri = cl.definition.roles[0]
comp_elems.append({'name': cl.display_id,
'role': self.INVERT_ROLES[role_uri].replace(' ', '-')})
elements.append(comp_elems)
return elements
def metadata_path(self, m_path):
"""Provide pointers to the paths of the metadata file
Args:
m_path: Path to metadata file
"""
if not m_path:
self.metadata_dir = None
self.metadata_file = None
else:
if not op.exists(m_path):
raise OSError('{}: file does not exist!'.format(m_path))
if not op.dirname(m_path):
self.metadata_dir = '.'
else:
self.metadata_dir = op.dirname(m_path)
self.metadata_file = op.basename(m_path)
# Parse the metadata file using Biopython's built in SeqRecord parser
# Just updating IDs and stuff
tmp_sr = SeqIO.read(self.metadata_path, 'uniprot-xml')
parsed = parse_uniprot_xml_metadata(tmp_sr)
self.update(parsed, overwrite=True)
def seq_record_loaded_from_file_example(fasta_path):
"""Original SeqRecord loaded from sequence file"""
return SeqIO.read(fasta_path, "fasta")
def seq_record_loaded_from_file_example(fasta_path):
"""Original SeqRecord loaded from sequence file"""
return SeqIO.read(fasta_path, "fasta")
def seq_record_loaded_from_file_example(sequence_path):
"""Original SeqRecord loaded from sequence file"""
return SeqIO.read(sequence_path, "fasta")
def sequence_path(self, fasta_path):
"""Provide pointers to the paths of the FASTA file
Args:
fasta_path: Path to FASTA file
"""
if not fasta_path:
self.sequence_dir = None
self.sequence_file = None
else:
if not op.exists(fasta_path):
raise OSError('{}: file does not exist'.format(fasta_path))
if not op.dirname(fasta_path):
self.sequence_dir = '.'
else:
self.sequence_dir = op.dirname(fasta_path)
self.sequence_file = op.basename(fasta_path)
tmp_sr = SeqIO.read(fasta_path, 'fasta')
if self.name == '<unknown name>':
self.name = tmp_sr.name
if self.description == '<unknown description>':
self.description = tmp_sr.description
if not self.dbxrefs:
self.dbxrefs = tmp_sr.dbxrefs
if not self.features:
self.features = tmp_sr.features
if not self.annotations:
self.annotations = tmp_sr.annotations
if not self.letter_annotations:
self.letter_annotations = tmp_sr.letter_annotations
def load(self, filename):
self.data = {}
with open(filename, 'r') as f:
parsed = parse_antiSMASH(f.read())
for v in parsed:
self.data[v] = parsed[v]
def efetch_hit(term, seq_start, seq_stop):
""" Fetch the relevant part of a hit
"""
db = "nucleotide"
maxtry = 3
ntry = -1
downloaded = False
while ~downloaded and (ntry <= maxtry):
ntry += 1
try:
handle = Entrez.esearch(db=db, term=term)
record = Entrez.read(handle)
assert len(record['IdList']) == 1, \
"Sorry, I'm not ready to handle more than one record"
handle = Entrez.efetch(db=db, rettype="gb", retmode="text",
id=record['IdList'][0],
seq_start=seq_start, seq_stop=seq_stop)
content = handle.read()
downloaded = True
except:
nap = ntry*3
print "Fail to download (term). I'll take a nap of %s seconds ", \
" and try again."
time.sleep(ntry*3)
return content
def cds_from_gbk(gb_file):
gb_record = SeqIO.read(open(gb_file,"rU"), "genbank")
#if strain_id is not None:
# gb_record.id = strain_id
output = pd.DataFrame()
sign = lambda x: '+' if x > 0 else '-'
for feature in gb_record.features:
if feature.type == "CDS":
tmp = {}
tmp = {'BGC': gb_record.id,
'locus_tag': feature.qualifiers['locus_tag'][0],
'start': feature.location.start.position,
'stop': feature.location.end.position,
'strand': sign(feature.location.strand) }
if 'note' in feature.qualifiers:
for note in feature.qualifiers['note']:
product = re.search( r"""smCOG: \s (?P<product>.*?) \s+ \(Score: \s* (?P<score>.*); \s* E-value: \s (?P<e_value>.*?)\);""", note, re.VERBOSE)
if product is not None:
product = product.groupdict()
product['score'] = float(product['score'])
product['e_value'] = float(product['e_value'])
for p in product:
tmp[p] = product[p]
output = output.append(pd.Series(tmp), ignore_index=True)
return output
test_basics.py 文件源码
项目:DnaFeaturesViewer
作者: Edinburgh-Genome-Foundry
项目源码
文件源码
阅读 31
收藏 0
点赞 0
评论 0
def test_plot_with_gc_content(tmpdir):
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(8, 4), sharex=True)
# Parse the genbank file, plot annotations
record = SeqIO.read(example_genbank, "genbank")
graphic_record = BiopythonTranslator().translate_record(record)
ax, levels = graphic_record.plot()
graphic_record.plot(ax=ax1, with_ruler=False)
# Plot the local GC content
def plot_local_gc_content(record, window_size, ax):
def gc_content(seq):
return 100.0*len([c for c in seq if c in "GC"]) / len(seq)
yy = [gc_content(record.seq[i:i+window_size])
for i in range(len(record.seq)-window_size)]
xx = np.arange(len(record.seq)-window_size)+25
ax.fill_between(xx, yy, alpha=0.3)
ax.set_ylabel("GC(%)")
plot_local_gc_content(record, window_size=50, ax=ax2)
# Resize the figure to the right height
target_file = os.path.join(str(tmpdir), "with_plot.png")
fig.tight_layout()
fig.savefig(target_file)
def make_sbol_xml(self):
"""
Take an SBOL definition and turn into RDF/XML
"""
xml_file = BytesIO()
self.document.write(xml_file)
xml_file.seek(0)
return xml_file.read()
def get_sbol_from_xml(self, source_data):
"""
Read in SBOL XML from source data and set document
"""
filename = '/tmp/' + str(uuid.uuid4()) + '.xml'
with open(filename, 'w+') as sf:
sf.write(self.file_data.read())
self.document.read(filename)
os.remove(filename)
def parse_gb(self):
"""
Take a genbank file and parse to items/SBOL
"""
items = []
elements = OrderedDict()
sbol = None
try:
record = SeqIO.read(self.file_data, 'genbank')
features = record.features # sorted(record.features, key=attrgetter('location.start'))
for feat in features:
# The file sometimes has lowercase and sometimes uppercase
# types so normalise to lowercase.
if feat.type.lower() in self.GENBANK_FEATURE_TYPES:
name = ''
# Look for the label key. Other keys can be set but
# most software simply sets the label key and nothing
# else.
for key, value in feat.qualifiers.items():
if key == 'label':
name = value[0]
if name:
feature_type = feat.type.lower()
if feature_type == 'rbs':
feature_type = 'ribosome entry site'
elif feature_type == 'primer_bind':
feature_type = 'primer binding site'
seq = str(feat.extract(record.seq))
elements[name] = self.genbank_to_sbol_component(name, seq, feature_type)
item = self.get_inventory_item(name)
if item:
items.append(item)
except Exception as e:
print(e)
pass
else:
if len(elements.values()) > 0:
self.make_sbol_construct(list(elements.values()))
sbol = self.make_sbol_xml()
return items, sbol
def need_to_update_release():
ftp = FTP('ftp.ncbi.nlm.nih.gov')
ftp.login()
ftp.cwd('genbank')
"""
Check whether the user currently has the latest GB release (in which case
we only need to download the daily updates) or whether they have an old
release or no release, in which case we need to download everything)
"""
try:
current_release_number = int(open('GB_Release_Number').read())
logging.info(
'You currently have GB release number {}'
.format(current_release_number)
)
except IOError:
logging.info('No current release, downloading the files')
return True
latest_release_file = StringIO()
ftp.retrlines('RETR GB_Release_Number', latest_release_file.write)
latest_release_number = int(latest_release_file.getvalue())
logging.info('Latest release number is {}'.format(latest_release_number))
if current_release_number == latest_release_number:
logging.info('You have the latest release, getting daily updates')
return False
else:
logging.info('New release available, downloading the files')
return True
def delimited(file, delimiter='\n', bufsize=4096):
buf = ''
while True:
newbuf = file.read(bufsize)
if not newbuf:
yield buf
return
buf += newbuf
lines = buf.split(delimiter)
for line in lines[:-1]:
yield line
buf = lines[-1]
def process_record(record):
if 'gaattc' in record.lower(): # quick check, might be false positive
real_record = SeqIO.read(StringIO(record), format='gb')
if 'gaattc' in str(real_record.seq).lower():
output_file.write(record)
def read_fasta_file(fasta):
# read it in
# assume there is only one
try:
fasta_record = SeqIO.read(open(fasta), "fasta")
except ValueError:
log.error("Fasta file %s has more than one sequence -- Exiting",
fasta)
raise
return fasta_record
def optparser():
print("Parse command line options")
# Usage and version strings
program_name = "pyssw"
program_version = 0.1
version_string = "{}\t{}".format(program_name, program_version)
usage_string = "{}.py -s subject.fasta -q fastq (or fasta) [Facultative options]".format(program_name)
optparser = optparse.OptionParser(usage = usage_string, version = version_string)
# Define optparser options
hstr = "Path of the fasta file containing the subject genome sequence. Can be gziped. [REQUIRED] "
optparser.add_option( '-s', '--subject', dest="subject", help=hstr)
hstr = "Path of the fastq or fasta file containing the short read to be aligned. Can be gziped. [REQUIRED]"
optparser.add_option( '-q', '--query', dest="query", help=hstr)
hstr = "Type of the query file = fastq or fasta. [default: fastq]"
optparser.add_option( '-t', '--qtype', dest="qtype", default="fastq", help=hstr)
hstr = "Positive integer for weight match in genome sequence alignment. [default: 2]"
optparser.add_option( '-m', '--match', dest="match",default=2, help=hstr)
hstr = "Positive integer. The negative value will be used as weight mismatch in genome sequence alignment. [default: 2]"
optparser.add_option( '-x', '--mismatch', dest="mismatch", default=2, help=hstr)
hstr = "Positive integer. The negative value will be used as weight for the gap opening. [default: 3]"
optparser.add_option( '-o', '--gap_open', dest="gap_open", default=3, help=hstr)
hstr = "Positive integer. The negative value will be used as weight for the gap opening. [default: 1]"
optparser.add_option( '-e', '--gap_extend', dest="gap_extend", default=1, help=hstr)
hstr = "Integer. Consider alignments having a score <= as not aligned. [default: 0]"
optparser.add_option( '-f', '--min_score', dest="min_score", default=0, help=hstr)
hstr = "Integer. Consider alignments having a length <= as not aligned. [default: 0]"
optparser.add_option( '-l', '--min_len', dest="min_len", default=0, help=hstr)
hstr = "Flag. Align query in forward and reverse orientation and choose the best alignment. [Set by default]"
optparser.add_option( '-r', '--reverse', dest="reverse", action="store_true", default=True, help=hstr)
hstr = "Flag. Write unaligned reads in sam output [Unset by default]"
optparser.add_option( '-u', '--unaligned', dest="unaligned", action="store_true", default=False, help=hstr)
# Parse arg and return a dictionnary_like object of options
opt, args = optparser.parse_args()
if not opt.subject:
print ("\nERROR: a subject fasta file has to be provided (-s option)\n")
optparser.print_help()
sys.exit()
if not opt.query:
print ("\nERROR: a query fasta or fastq file has to be provided (-q option)\n")
optparser.print_help()
sys.exit()
return opt
#~~~~~~~TOP LEVEL INSTRUCTIONS~~~~~~~#
def load_reference(self, path, fmts, metadata, include=2, genes=False):
"""Assume it's genbank."""
try:
self.reference = SeqIO.read(path, 'genbank')
except Exception as e:
self.log.fatal("Problem reading reference {}. Error: {}".format(path, e))
## some checks
try:
assert("strain" in metadata)
if include > 0:
assert("date" in metadata)
except AssertionError as e:
self.log.fatal("Poorly defined reference. Error:".format(e))
if genes:
# we used to make these FeatureLocation objects here, but that won't go to JSON
# so just do it in the Process part instead. For reference:
# FeatureLocation(start=f.location.start, end=f.location.end, strand=1)
self.reference.genes = {
sequence_set.get_gene_name(f.qualifiers['gene'][0], genes): {"start": int(f.location.start), "end": int(f.location.end), "strand": 1}
for f in self.reference.features
if 'gene' in f.qualifiers and f.qualifiers['gene'][0] in genes
}
else:
self.reference.genes = {}
# use the supplied metadata dict to define attributes
seq_attr_keys = self.seqs.values()[0].attributes.keys()
self.reference.attributes = {k:fix_names(v) for k,v in metadata.items() if k in seq_attr_keys}
self.reference.name = self.reference.attributes["strain"]
self.reference.id = self.reference.attributes["strain"]
# is there any possibility that the reference will be added to the sequences?
self.reference.include = include; # flag {0,1,2}
if self.reference.name in self.seqs:
self.log.notify("Segment {} reference already in dataset".format(self.segmentName))
if include == 0:
self.log.notify("Removing reference from pool of sequences to analyse")
del self.seqs[self.reference.name]
elif include > 0:
## add to sequences (tidy up attributes first)
self._parse_date_per_seq(self.reference, fmts)
self.seqs[self.reference.name] = self.reference
missing_attrs = set(seq_attr_keys) - set(self.reference.attributes.keys()) - set(["date", "num_date"])
if len(missing_attrs) > 0:
self.log.notify("Including reference in segment {} but the following attributes are missing: {}".format(self.segmentName, " & ".join(missing_attrs)))