def samfile_from_args(args):
return AlignmentFile(args.bam)
python类AlignmentFile()的实例源码
def load_bam(bam_path):
return AlignmentFile(data_path(bam_path))
def __init__(self, filename):
"""
:param str filename: input BAM file
"""
self.filename = filename
self.data = pysam.AlignmentFile(filename, check_sq=False)
self._N = None
self._df = None
self._nb_pass = None
def reset(self):
self.data.close()
self.data = pysam.AlignmentFile(self.filename, check_sq=False)
def random_selection(self, output_filename, nreads=None,
expected_coverage=None, reference_length=None):
"""Select random reads
:param nreads: number of reads to select randomly. Must be less than
number of available reads in the orignal file.
:param expected_coverage:
:param reference_length:
of expected_coverage and reference_length provided, nreads is replaced
automatically.
"""
assert output_filename != self.filename, \
"output filename should be different from the input filename"
self.reset()
if expected_coverage and reference_length:
mu = self.stats['mean']
nreads = int(expected_coverage * reference_length / mu)
assert nreads < len(self), "nreads parameter larger than actual Number of reads"
selector = random.sample(range(len(self)), nreads)
logger.info("Creating a pacbio BAM file with {} reads".format(nreads))
with pysam.AlignmentFile(output_filename,"wb", template=self.data) as fh:
for i, read in enumerate(self.data):
if i in selector:
fh.write(read)
def get_header(bam):
"""
Return the BAM header.
"""
return pysam.AlignmentFile(bam,'rb').header
def findReadsAndSegments(samF, mappedReadsDict, idNameDict,chain):
samFile = pysam.AlignmentFile(samF,'r')
readsIter = samFile.fetch(until_eof = True)
for read in readsIter:
if read.is_unmapped == False:
seg = samFile.getrname(read.reference_id)
if seg in idNameDict:
if idNameDict[seg].find(chain) != -1:
readName = read.query_name
if readName not in mappedReadsDict:
mappedReadsDict[readName] = []
if seg not in mappedReadsDict[readName]:
mappedReadsDict[readName].append(seg)
samFile.close()
return mappedReadsDict
def findCountsInRegion(bamF, start, end, tcr):
readsArr = []
mappedFile = pysam.AlignmentFile(bamF,"rb")
readsIter = mappedFile.fetch(tcr, start, end)
for read in readsIter:
if read.is_read1 :
newName = read.query_name + '_1'
else:
newName = read.query_name + '_2'
if newName not in readsArr:
readsArr.append(newName)
mappedFile.close()
counts = len(readsArr)
return counts
def __init__(self, bam:str, reference:str, columns:int=20000, cache:int=None, minor_gaps:bool=False):
"""Interface to samtools tview functionality.
:param bam: path to bam file of reads aligned to a common
reference.
:param reference: path to fasta file of reference to which reads
are aligned.
:param columns: default number of tview columns to read in one go.
:param cache: Currently not implemented.
:param minor_gaps: encoded gaps in non-reference positions distinctly.
"""
self.logger = logging.getLogger('TView')
self._bam = bam
self._reference = reference
self._columns = columns
self._cache = cache
self._minor_gaps = minor_gaps
if self._minor_gaps:
raise NotImplementedError
self._columns_per_base = 3 # reasonable estimate
self._column_pad = 1.1 # multiplier when _columns_per_base is updated
# crosscheck the inputs
with pysam.AlignmentFile(bam, 'rb') as b:
brefs = b.references
blens = b.lengths
rrefs = set(pysam.FastaFile(reference).references)
sbrefs = set(brefs)
if not sbrefs.issuperset(rrefs):
raise ValueError("input bam and reference do not appear consistent: {} vs {}.".format(sbrefs, rrefs))
self._refs = set(brefs)
self._reflens = dict(zip(brefs, blens))
self._pileup = None # buffered `Pileup` object
def generate_pileup_chunks(bams, ref_fasta, ref_name, start=0, end=None,
chunk_len=50000, overlap=1000):
"""Yield chunks of pileup(s).
:param bams: iterable of input .bam files.
:param ref_fasta: input .fasta file.
:param ref_name: name of reference within .bam to parse.
:param start: start reference coordinate.
:param end: end reference coordinate. If `None` the full extent of
the reference will be parsed.
:param chunk_len: int, chunk length in reference coordinates.
:param overlap: int, overlap of adjacent chunks in reference coordinates.
:yields: (tuple `Pileup` objects, None)
..note:: the None in the yielded values is for compatibility with an
old API and will be removed.
"""
tview = MultiTView(bams, ref_fasta, columns=chunk_len)
if end is None:
align_fhs = (pysam.AlignmentFile(bam) for bam in bams)
end = min([dict(zip(p.references, p.lengths))[ref_name] for p in align_fhs])
logger.info("Chunking {}: {}-{} in chunks of {} overlapping by {}".format(ref_name, start, end, chunk_len, overlap))
#TODO: we could change how this is done since the class could cache
# everything in the entire region of interest.
for chunk_start, chunk_end in segment_limits(start, end, segment_len=chunk_len, overlap_len=overlap):
try:
pileups, ref_seq = tview.pileup(ref_name, chunk_start, chunk_end)
except TViewException:
logger.info("Skipping region {}-{} as TView did not find any reads".format(chunk_start, chunk_end))
else:
yield LabelledPileup(pileups=pileups, labels=None, ref_seq=ref_seq)
def get_chrom_length(self, chrom):
"""
Extract chromosome length from BAM header.
Parameters
----------
chrom : str
The name of the chromosome or scaffold.
Returns
-------
length : int
The length (integer) of the chromsome/scaffold
Raises
------
RuntimeError
If chromosome name not present in bam header
"""
bamfile = pysam.AlignmentFile(self.filepath, "rb")
lengths = dict(zip(bamfile.references, bamfile.lengths))
try:
lens = lengths[chrom]
bamfile.close()
return lens
except:
self.logger.error(
"{} not present in bam header for {}. Exiting.".format(
chrom, self.filepath))
logging.shutdown()
raise RuntimeError(
"Chromosome name not present in bam header. Exiting")
def chromosome_lengths(self):
"""
Returns
-------
tuple
chromosome lengths ordered by sequence order in bam header
"""
bamfile = pysam.AlignmentFile(self.filepath, "rb")
lengths = bamfile.lengths
bamfile.close()
return lengths
def chromosome_names(self):
"""
Returns
-------
tuple
chromosome names ordered by sequence order in bam header
"""
bamfile = pysam.AlignmentFile(self.filepath, "rb")
names = bamfile.references
bamfile.close()
return names
def chrom_counts(self):
"""
Get read counts per chrom from a bamfile
"""
self.logger.info(
"Using index of {} to get read counts per chromosome".format(
self.filepath))
analyze_start = time.time()
samfile = pysam.AlignmentFile(self.filepath, "rb")
idx_out = samfile.get_index_statistics()
samfile.close()
self.logger.info("Index analysis complete. Elapsed time: {} seconds".format(
time.time() - analyze_start))
return idx_out
def reads_to_sam(reads,bam,bp1,bp2,dirout,name):
'''
For testing read assignemnts.
Takes reads from array, matches them to bam
file reads by query name and outputs them to Sam
'''
bamf = pysam.AlignmentFile(bam, "rb")
loc1 = '%s:%d:%d' % (bp1['chrom'], bp1['start'], bp1['end'])
loc2 = '%s:%d:%d' % (bp2['chrom'], bp2['start'], bp2['end'])
iter_loc1 = bamf.fetch(region=loc1,until_eof=True)
iter_loc2 = bamf.fetch(region=loc2,until_eof=True)
loc1 = '%s-%d' % (bp1['chrom'], (bp1['start']+bp1['end'])/2)
loc2 = '%s-%d' % (bp2['chrom'], (bp1['start']+bp1['end'])/2)
sam_name = '%s_%s-%s' % (name,loc1,loc2)
if not os.path.exists(dirout):
os.makedirouts(dirout)
bam_out = pysam.AlignmentFile('%s/%s.sam'%(dirout,sam_name), "w", header=bamf.header)
for x in iter_loc1:
if len(reads)==0:
break
if x.query_name in reads:
bam_out.write(x)
bam_out.write(bamf.mate(x))
idx = int(np.where(reads==x.query_name)[0])
reads = np.delete(reads,idx)
for x in iter_loc2:
if len(reads)==0:
break
if x.query_name in reads:
bam_out.write(x)
bam_out.write(bamf.mate(x))
idx = int(np.where(reads==x.query_name)[0])
reads = np.delete(reads,idx)
bamf.close()
bam_out.close()
def recount_anomalous_reads(bam,outname,anom_reads,max_dp,max_ins):
print('Recounting anomalous reads')
anom_reads = np.unique(anom_reads['query_name'])
sv_proc = np.genfromtxt(outname,delimiter='\t',names=True,dtype=dtypes.sv_out_dtype,invalid_raise=False)
for idx,row in enumerate(sv_proc):
sv_id, chr1_field, pos1_field, dir1_field, \
chr2_field, pos2_field, \
dir2_field, sv_class, \
oid_field, opos1_field, opos2_field = [h[0] for h in dtypes.sv_dtype]
bp1 = np.array((row[chr1_field],row[pos1_field]-max_ins,
row[pos1_field]+max_ins,row[dir1_field]),dtype=dtypes.bp_dtype)
bp2 = np.array((row[chr2_field],row[pos2_field]-max_ins,
row[pos2_field]+max_ins,row[dir2_field]),dtype=dtypes.bp_dtype)
bamf = pysam.AlignmentFile(bam, "rb")
loc1_reads, err_code1 = get_loc_reads(bp1,bamf,max_dp)
loc2_reads, err_code2 = get_loc_reads(bp2,bamf,max_dp)
bamf.close()
if err_code1==0 and err_code2==0:
anom1 = [ x['query_name'] for x in loc1_reads if x['query_name'] in anom_reads]
anom2 = [ x['query_name'] for x in loc2_reads if x['query_name'] in anom_reads]
anom = np.concatenate([anom1,anom2])
anom_count = len(np.unique(anom))
sv_proc[idx]['anomalous'] = anom_count
print('found %d anomalous reads at %s:%d|%s:%d' % (anom_count,row[chr1_field],row[pos1_field],row[chr2_field],row[pos2_field]))
with open(outname,'w') as outf:
header_out = [h[0] for idx,h in enumerate(dtypes.sv_out_dtype)]
writer = csv.writer(outf,delimiter='\t',quoting=csv.QUOTE_NONE)
writer.writerow(header_out)
for row in sv_proc:
writer = csv.writer(outf,delimiter='\t',quoting=csv.QUOTE_NONE)
writer.writerow(row)
def getNumberOfAlignments(bamfile):
'''
return number of alignments in bamfile.
'''
samfile = pysam.AlignmentFile(bamfile)
return samfile.mapped
def __init__(self,fh=None,Ped=None,gen=None):
self.id = None
self.chr_flag=False # True == 'chrN'; False == 'N'
self.sex=None
self.char='b'
self.refs=OrderedDict()
self.fh=fh
self.Snv=None
self.snv_index=None
sm=[]
bam = pysam.AlignmentFile(fh)
if bam.is_cram==True: self.char='c'
header = bam.header
if header.get('RG')==None: sys.stderr.write('WARNING: {} lacks Read Group (@RG) entry in the header. Skipping ...\n'.format(fh))
else:
for entry in header['RG']:
if entry.get('SM')!=None: sm.append(entry['SM'])
sm = list(set(sm))
if len(sm)>1: sys.stderr.write('WARNING: {} contains two sample entries ({}) in the header. Skipping {} ...\n'.format(fh,sm,fh))
else: self.id=sm[0]
if self.id!=None:
if Ped.sex.get(self.id)==None: sys.stderr.write('WARNING: {} is not found in the PED file. Skipping {} ...\n'.format(self.id,fh))
else:
self.sex=Ped.sex[self.id]
if str(bam.references[0]).startswith('chr'): self.chr_flag=True
chroms = accepted_chrom(self.chr_flag,gen)
for i in range(len(bam.references)):
chrom,leng = str(bam.references[i]),bam.lengths[i]
if chrom in chroms: self.refs[chrom]=int(leng)
bam.close()
def bam_init(args=None,Ped=None,Snv=None,gen=None):
bams=[]
for f in args:
errFH(f)
try: pysam.AlignmentFile(f).check_index()
except ValueError:
sys.stderr.write('WARNING: {} is not indexed with samtools index. Skipping ...\n'.format(f))
continue
except AttributeError:
sys.stderr.write('WARNING: {} appears to be in SAM format. Convert to BAM with `samtools view -bh {}` and index with samtools index\n'.format(f,f))
continue
bam = Bam(f,Ped,gen)
if len(Snv)<1:
print 'FATAL ERROR: SNV file(s) were not formatted correctly. See https://github.com/dantaki/SV2/wiki/input#snv-vcf for details'
sys.stderr.write('FATAL ERROR: SNV file(s) were not formatted correctly. See https://github.com/dantaki/SV2/wiki/input#snv-vcf for details\n')
sys.exit(1)
for snv in Snv:
if snv.id.get(bam.id)!=None:
bam.Snv,bam.snv_index=snv,snv.id[bam.id]
if bam.id!=None and bam.Snv==None: sys.stderr.write('WARNING: BAM file {} sample name (@RG SM:<sample_id>):{} not found in SNV VCFs. Skipping {} ...\n'.format(f,bam.id,f))
if bam.id==None: sys.stderr.write('WARNING: Skipping BAM file {}. No sample name (@RG SM:<sample_id>). See https://github.com/dantaki/SV2/wiki/input#bam for details')
if bam.id!=None and bam.Snv!=None: bams.append(bam)
if len(bams)<1:
print 'FATAL ERROR: BAM file(s) were not formatted correctly. See https://github.com/dantaki/SV2/wiki/input#bam for details'
sys.stderr.write('FATAL ERROR: BAM file(s) were not formatted correctly. See https://github.com/dantaki/SV2/wiki/input#bam for details\n')
sys.exit(1)
return bams
def open_asg(in_fn, in_format):
# try to detect kraken and histo formats automatically
if in_format is None:
if '.' in in_fn:
f_ext = in_fn.split('.')[-1]
if f_ext in IN_FMTS:
in_format = f_ext
else:
with open(in_fn, 'rb') as f:
f_start = f.read(2)
if f_start == b'C\t' or f_start == b'U\t':
in_format = 'kraken'
elif f_start == b'#O':
in_format = 'histo'
if in_format == 'sam':
in_f = pysam.AlignmentFile(in_fn, "r")
elif in_format == 'bam':
in_f = pysam.AlignmentFile(in_fn, "rb")
elif in_format == 'cram':
in_f = pysam.AlignmentFile(in_fn, "rc")
elif in_format == 'uncompressed_bam':
in_f = pysam.AlignmentFile(in_fn, "ru")
elif in_format == 'kraken':
in_f = open(in_fn, 'r')
# no need to load assignments if input is a histogram -> go to load_histo
elif in_format == 'histo':
in_f = None
# let pysam assess the format
else:
in_f = pysam.AlignmentFile(in_fn)
return in_f, in_format