def readBAM(BAMFile, contigs):
"""
Parses an aligned BAM file for coverage.
Args:
BAMFile: The BAM file to parse.
contigs: List of sidr.common.Contigs taken from input FASTA.
Returns:
contigs: Input contigs updated with coverage, measured as an
average over the whole contig.
"""
alignment = pysam.AlignmentFile(BAMFile, "rb")
click.echo("Reading BAM file")
with click.progressbar(contigs) as ci:
for contig in ci:
covArray = [] # coverage over contig = sum(coverage per base)/number of bases
for pile in alignment.pileup(region=str(contig)):
covArray.append(pile.nsegments)
try:
contigs[contig].variables["Coverage"] = (sum(covArray) / len(covArray))
except ZeroDivisionError: # should only occur if 0 coverage recorded
contigs[contig].variables["Coverage"] = 0
return contigs
python类AlignmentFile()的实例源码
def __init__(self, filename, doubled):
self.filename = filename
self.doubled = doubled
#determine if the file is bam or sam
self.filetype = os.path.splitext(self.filename)[1]
#throw an error if the file is not bam or sam
if self.filetype not in ['.bam']:
raise Exception("""You have provided a file with an extension other than
'.bam', please check your command-line arguments""")
#now make sure there is an index file for the bam file
if not os.path.exists("{}.bai".format(self.filename)):
raise Exception("""Your .bam file is there, but it isn't indexed and
there isn't a .bai file to go with it. Use
'samtools index <yourfile>.bam' to fix it.""")
#now open the file and just call it a sambam file
filetype_dict = {'.sam': '', '.bam': 'b'}
self.sambam = pysam.AlignmentFile(self.filename, "r{}".format(filetype_dict[self.filetype]))
self.seqlength = self.sambam.lengths[0]
self.true_seqlength = self.seqlength if not self.doubled else int(self.seqlength/2)
self.raw_depthmap = self.get_depthmap()
self.features = self.parse()
self.features.sort_values(by=['POS','MAPLEN'], ascending=[True, False] ,inplace=True)
self.features.reset_index(inplace=True)
self.features.drop('index', 1, inplace=True)
def parse_fusion_bam(bam_f):
fusions = {}
bam = pysam.AlignmentFile(bam_f, 'rb')
for read in bam:
if read.is_secondary: # not the primary alignment
continue
if not read.has_tag('XF'): # not fusion junctions
continue
chr1, chr2 = read.get_tag('XF').split()[1].split('-')
if chr1 != chr2: # not on the same chromosome
continue
strand = '+' if not read.is_reverse else '-'
if read.query_name not in fusions: # first fragment
fusions[read.query_name] = [chr1, strand, read.reference_start,
read.reference_end]
else: # second fragment
if chr1 == fusions[read.query_name][0] \
and strand == fusions[read.query_name][1]:
yield [chr1, strand, read.reference_start, read.reference_end]
yield fusions[read.query_name]
bam.close()
def test_init_2(tmpdir):
"it should open sam file if provided"
make_bam(tmpdir.strpath, """
123456789_123456789_
r1 + ...........
r1 - ......*....
r2 + .........*.
r2 - .....*.......
""")
o = Namespace(query="test.vcf", cfdna=tmpdir.join("test.bam").strpath, gdna=None, output=None)
init(o)
assert isinstance(o.cfdna, AlignmentFile)
assert o.gdna == None
def test_get_reads_1(tmpdir):
"it should get all but only the reads that covers the given position"
make_bam(tmpdir.strpath, """
123456789_123456789_12
r1 + ...........
r1 - ......*....
r2 + .........*.
r2 - .....*.......
r3 + ...........
r3 - ....*......
r4 + ...........
r4 - ...........
123456789_123456789_12
""")
o = Namespace(verbos=False, mismatch_limit=-1)
sam = AlignmentFile(tmpdir.join("test.bam").strpath)
assert sum( 1 for _ in get_reads(o, sam, 'ref', '4') ) == 2
assert sum( 1 for _ in get_reads(o, sam, 'ref', '12') ) == 7
assert sum( 1 for _ in get_reads(o, sam, 'ref', '20') ) == 2
def test_get_reads_2(tmpdir):
"it should read properties correctly"
make_bam(tmpdir.strpath, """
123456789_123
r1 + ...*.......
r1 - .*.........
""")
o = Namespace(verbos=False, mismatch_limit=-1)
sam = AlignmentFile(tmpdir.join("test.bam").strpath)
r = next(get_reads(o, sam, 'ref', '4'))
assert r[0] == "r1" # name
assert r[3] == 0 # 0-based pos
assert r[4] == 11 # length
assert r[5] == -1 # mismatch, not caculated
assert r[6] == 2 # mate pos
assert r[7] == 13 # template length
assert r[8] == False # is_reverse
assert r[9] == True # paired and mapped
def test_pad_softclip_3(tmpdir):
"it should pad softclipped bases"
make_bam(tmpdir.strpath, """
123456789_123
r1 + __.*.......
r1 - .*.........
r2 - ...*.......
r2 + .*.......__
""")
o = Namespace(verbos=False, mismatch_limit=-1)
sam = AlignmentFile(tmpdir.join("test.bam").strpath)
adjusted_pos = pad_softclip(sam)
assert adjusted_pos["r1"] == (0, 13) # 0-based position
assert adjusted_pos["r2"] == (0, 13)
def init(o):
if o.cfdna == None and o.gdna == None:
raise Exception("At least one of --cfdna and --gdna should be specified")
if o.cfdna != None:
o.cfdna = AlignmentFile(o.cfdna, "rb")
if not o.cfdna.has_index():
raise Exception("Index not found, use `samtools index` to generate")
if o.gdna != None:
o.gdna = AlignmentFile(o.gdna, "rb")
if not o.gdna.has_index():
raise Exception("Index not found, use `samtools index` to generate")
if o.output == None:
basename, extname = splitext(o.query)
o.output = basename + "_MrBam" + extname
def go(args):
bed = read_bed_file(args.bedfile)
infile = pysam.AlignmentFile(args.alignment, "rb")
for s in infile:
#print s.get_aligned_pairs()
#print ">%s\n%s" % (s.query_name, s.query_alignment_sequence)
p1 = find_primer(bed, s.reference_start, '+')
p2 = find_primer(bed, s.reference_end, '-')
primer_start = p1[2]['start']
# start is the 5'
primer_end = p2[2]['start']
query_align_start = find_query_pos(s, primer_start)
query_align_end = find_query_pos(s, primer_end)
print >> sys.stderr, "%s\t%s\t%s\t%s" % (primer_start, primer_end, primer_end - primer_start, s.query_length)
startpos = max(0, query_align_start - 40)
endpos = min(query_align_end+40, s.query_length)
print ">%s\n%s" % (s.query_name, s.query_sequence[startpos:endpos])
#query_align_end + 30])
def stride(self, output_filename, stride=10, shift=0, random=False):
"""Write a subset of reads to BAM output
:param str output_filename: name of output file
:param int stride: optionnal, number of reads to read to output one read
:param int shift: number of reads to ignore at the begining of input file
:param bool random: if True, at each step the read to output is randomly selected
"""
assert output_filename != self.filename, \
"output filename should be different from the input filename"
self.reset()
with pysam.AlignmentFile(output_filename,"wb", template=self.data) as fh:
if random:
shift = np.random.randint(stride)
for i, read in enumerate(self.data):
if (i + shift) % stride == 0:
fh.write(read)
if random:
shift = np.random.randint(stride)
def filter_length(self, output_filename, threshold_min=0,
threshold_max=np.inf):
"""Select and Write reads within a given range
:param str output_filename: name of output file
:param int threshold_min: minimum length of the reads to keep
:param int threshold_max: maximum length of the reads to keep
"""
assert threshold_min < threshold_max
assert output_filename != self.filename, \
"output filename should be different from the input filename"
self.reset()
with pysam.AlignmentFile(output_filename, "wb", template=self.data) as fh:
for read in self.data:
if ((read.query_length > threshold_min) & (read.query_length < threshold_max)):
fh.write(read)
def filter_bool(self, output_filename, mask):
"""Select and Write reads using a mask
:param str output_filename: name of output file
:param list list_bool: True to write read to output, False to ignore it
"""
assert output_filename != self.filename, \
"output filename should be different from the input filename"
assert len(mask) == self._N, \
"list of bool must be the same size as BAM file"
self.reset()
with pysam.AlignmentFile(output_filename, "wb", template=self.data) as fh:
for read, keep in zip(self.data, mask):
if keep:
fh.write(read)
def get_bam_pairs(bams):
"""
Function returns pairs of bam files because sample ID relies
on samples being encoded with a '.'
@bams A List of Bam files.
"""
### TODO Use the read group to merge the reads, far smarter!
bam_list = {}
for bam in bams:
sam = pysam.AlignmentFile(bam,'rb')
sample_id = (sam.header['RG'][0]['SM'])
try:
bam_list[sample_id].append(bam)
except KeyError:
bam_list[sample_id] = [bam]
return bam_list
def merge_reads(bams, merged_output_dir):
"""
Merge bam files, can take odd numbers of reads.
"""
try:
os.mkdir(merged_output_dir)
except OSError:
pass
bam_list = get_bam_pairs(bams)
for sample_id, bams in bam_list.items():
header = get_header(bams[0])
bam_out = os.path.join(merged_output_dir, os.path.basename(sample_id)) + '.bam'
out = pysam.AlignmentFile(bam_out, 'wb', header=header)
for bam in bams:
sam = pysam.AlignmentFile(bam, 'rb')
for read in sam:
out.write(read)
out.close()
pysam.sort(bam_out, 'tmp')
os.rename('tmp.bam',bam_out)
pysam.index(bam_out)
def getUnDictRatio(bamF, start, end, tcr, unDict):
unMappedCount = 0
usedArr = []
mappedFile = pysam.AlignmentFile(bamF,"rb")
readsIter = mappedFile.fetch(tcr, start, end)
for read in readsIter:
if read.is_read1 :
newName = read.query_name + '_1'
else:
newName = read.query_name + '_2'
if newName not in usedArr:
usedArr.append(newName)
if newName in unDict:
unMappedCount += 1
mappedFile.close()
return (float(float(unMappedCount)/len(unDict)), unMappedCount)
def bam_to_alignments(truth_bam, ref_name, start=None, end=None):
"""Create list of TruthAlignment objects from a bam of Truth aligned to ref.
:param truth_bam: (sorted indexed) bam with true sequence aligned to reference
:param ref: name of reference to process
:param start: starting position within reference
:param end: ending position within reference
(all alignments with any overlap with the interval start:end will be retrieved)
:returns: tuple(positions, encoded_label_array)
- positions: numpy structured array with 'ref_major'
(reference position index) and 'ref_minor'
(trailing insertion index) fields.
- feature_array: 1D numpy array of encoded labels
"""
with pysam.AlignmentFile(truth_bam, 'rb') as bamfile:
aln_reads = bamfile.fetch(reference=ref_name, start=start, end=end)
alignments = [TruthAlignment(r) for r in aln_reads if not (r.is_unmapped or r.is_secondary)]
alignments.sort(key=attrgetter('start'))
return alignments
def get_ref_len_from_bam(bam_path, target_contig):
"""
Fetch the length of a given reference sequence from a :py:class:`pysam.AlignmentFile`.
Parameters
----------
bam_path : str
Path to the BAM alignment
target_contig : str
The name of the contig for which to recover haplotypes.
Returns
-------
end_pos : int
The 1-indexed genomic position at which to stop considering variants.
"""
bam = pysam.AlignmentFile(bam_path)
end = bam.lengths[bam.get_tid(target_contig)]
bam.close()
return end
def write_anomalous_read_to_bam(bam,split_reads,span_reads,anom_reads,out):
print('Writing anom reads to file')
split_reads = np.unique(split_reads['query_name'])
span_reads = np.unique(span_reads['query_name'])
anom_reads = np.unique(anom_reads['query_name'])
# need to filter out any reads that were at any point marked as valid supporting reads
anom_reads = np.array([x for x in anom_reads if x not in split_reads])
anom_reads = np.array([x for x in anom_reads if x not in span_reads])
bamf = pysam.AlignmentFile(bam, "rb")
index = pysam.IndexedReads(bamf)
index.build()
anom_bam = pysam.AlignmentFile("%s_anom_reads.bam" % out, "wb", template=bamf)
for read_name in anom_reads:
for read in index.find(read_name):
anom_bam.write(read)
anom_bam.close()
def isPaired(bamfile, alignments=1000):
'''check if a *bamfile* contains paired end data
The method reads at most the first *alignments* and returns
True if any of the alignments are paired.
'''
samfile = pysam.AlignmentFile(bamfile)
n = 0
for read in samfile:
if read.is_paired:
break
n += 1
if n == alignments:
break
samfile.close()
return n != alignments
def estimateInsertSizeDistribution(bamfile,
alignments=50000):
'''estimate insert size from first alignments in bam file.
returns mean and stddev of insert sizes.
'''
assert isPaired(bamfile), \
'can only estimate insert size from' \
'paired bam files'
samfile = pysam.AlignmentFile(bamfile)
# only get positive to avoid double counting
inserts = numpy.array(
[read.tlen for read in samfile.head(alignments)
if read.is_proper_pair and read.tlen > 0])
insert_mean, insert_std = numpy.mean(inserts), numpy.std(inserts)
print('Insert mean of %f, with standard deviation of %f inferred' %
(insert_mean, insert_std))
if insert_mean > 10000 or insert_std > 1000 or \
insert_mean < 1 or insert_std < 1:
print('''WARNING: anomalous insert sizes detected. Please
double check or consider setting values manually.''')
return insert_mean, insert_std
def retrieve_loc_reads(sv, bam, max_dep, threshold):
bp_dtype = [('chrom', 'S20'), ('start', int), ('end', int), ('dir', 'S1')]
sv_id, chr1, pos1, dir1, chr2, pos2, dir2, \
sv_class, orig_id, orig_pos1, orig_pos2 = [h[0] for h in dtypes.sv_dtype]
bp1 = np.array((sv[chr1], sv[pos1]-(threshold*2), \
sv[pos1]+(threshold*2), sv[dir1]), dtype=bp_dtype)
bp2 = np.array((sv[chr2], sv[pos2]-(threshold*2), \
sv[pos2]+(threshold*2), sv[dir2]), dtype=bp_dtype)
bamf = pysam.AlignmentFile(bam, "rb")
loc1_reads, err_code1 = count.get_loc_reads(bp1, bamf, max_dep)
loc2_reads, err_code2 = count.get_loc_reads(bp2, bamf, max_dep)
bamf.close()
sv_class = str(sv['classification'])
if err_code1 == 1 or err_code2 == 1:
sv['classification'] = 'HIDEP' if sv_class == '' else sv_class+';HIDEP'
elif err_code1 == 2 or err_code2 == 2:
sv['classification'] = 'READ_FETCH_FAILED' if sv_class == '' \
else sv_class+';READ_FETCH_FAILED'
return sv, loc1_reads, loc2_reads, err_code1, err_code2
def pysam_open(alignment_file, in_format='BAM'):
"""Open SAM/BAM file using pysam.
:param alignment_file: Input file.
:param in_format: Format (SAM or BAM).
:returns: pysam.AlignmentFile
:rtype: pysam.AlignmentFile
"""
if in_format == 'BAM':
mode = "rb"
elif in_format == 'SAM':
mode = "r"
else:
raise Exception("Invalid format: {}".format(in_format))
aln_iter = pysam.AlignmentFile(alignment_file, mode)
return aln_iter
def count_ref_reads(bampath, reference, chrom, start, end):
ref_counts = numpy.zeros(end-start)
non_ref_counts = numpy.zeros(end-start)
bam = pysam.AlignmentFile(bampath)
# stepper = "all" skips dupe, unmapped, secondary, and qcfail reads
start = max(0, start)
for col in bam.pileup(chrom, start, end, truncate=True, stepper="all"):
refnuc = reference.fasta[chrom][col.reference_pos].upper()
nuc_counts = collections.Counter()
for read in col.pileups:
if read.query_position is None:
# nuc_counts["indel"] += 1
pass
else:
nuc_counts[read.alignment.query_sequence[read.query_position]] += 1
ref_counts[col.reference_pos-start] = nuc_counts[refnuc]
non_ref_counts[col.reference_pos-start] = sum(nuc_counts.values()) - nuc_counts[refnuc]
return ref_counts, non_ref_counts
def get_bam_coverages(options, sample, dataset):
window_skip = int(1e6)
window_size = 1000
skip_at_ends = int(1e6)
bam = pysam.AlignmentFile(dataset.bam)
print "getting coverages"
coverages = []
count = 0
for chrom, start, end in get_search_regions(options.reference,
window_skip, window_size, skip_at_ends):
coverages.append(bam.count(chrom, start, end))
if count % 1000 == 0:
print count
count += 1
return coverages
def dump(bamstream, refrseqs=None, upint=50000, logstream=sys.stderr):
"""
Parse read alignments in BAM/SAM format.
- bamstream: open file handle to the BAM/SAM file input
- refrseqs: dictionary of reference sequences, indexed by sequence ID; if
provided, perfect matches to the reference sequence will be discarded
- upint: update interval for progress indicator
- logstream: file handle do which progress indicator will write output
"""
bam = pysam.AlignmentFile(bamstream, 'rb')
for i, record in enumerate(bam, 1):
if i % upint == 0: # pragma: no cover
print('...processed', i, 'records', file=logstream)
if record.is_secondary or record.is_supplementary:
continue
if refrseqs and perfectmatch(record, bam, refrseqs):
continue
rn = readname(record)
yield screed.Record(name=rn, sequence=record.seq, quality=record.qual)
def bam_reader(bam_file, xs_tag):
global merge_data
bam = pysam.AlignmentFile(bam_file, "rb")
filtered_data = []
chromosomes = bam.references
for chrom in chromosomes:
data = bam.fetch(chrom, multiple_iterators=True)
for entry in data:
try:
chrom_bam = bam.get_reference_name(entry.reference_id)
except:
continue
if (entry.is_unmapped == False and chrom_bam == chrom):
if xs_tag and entry.has_tag("XS"):
continue
else:
filtered_data = get_bam_filter(entry, chrom_bam)
chromosome_info(filtered_data)
print_results(merge_data)
merge_data = []
bam.close()
def _prepare_inputs(ma_fn, bam_file, out_dir):
"""
Convert to fastq with counts
"""
fixed_fa = os.path.join(out_dir, "file_reads.fa")
count_name =dict()
with file_transaction(fixed_fa) as out_tx:
with open(out_tx, 'w') as out_handle:
with open(ma_fn) as in_handle:
h = in_handle.next()
for line in in_handle:
cols = line.split("\t")
name_with_counts = "%s_x%s" % (cols[0], sum(map(int, cols[2:])))
count_name[cols[0]] = name_with_counts
print >>out_handle, ">%s\n%s" % (name_with_counts, cols[1])
fixed_bam = os.path.join(out_dir, "align.bam")
bam_handle = pysam.AlignmentFile(bam_file, "rb")
with pysam.AlignmentFile(fixed_bam, "wb", template=bam_handle) as out_handle:
for read in bam_handle.fetch():
read.query_name = count_name[read.query_name]
out_handle.write(read)
return fixed_fa, fixed_bam
def get_read_refs_from_SAM( self ):
read_refs = {}
read_lens = {}
samfile = pysam.AlignmentFile(self.opts.sam, "rb")
for k,aln in enumerate(samfile.fetch()):
if aln.mapq == 254:
read = "/".join(aln.query_name.split("/")[:2])
read = "_".join(read.split("_")[1:])
# ref = aln.reference_name.split("|")[0]
ref = slugify(aln.reference_name)
read_refs[read] = ref
read_lens[read] = len(aln.seq)
# Write the read_refs file to match the readnames file
readnames = np.loadtxt(os.path.join( self.opts.tmp, self.fns["read_names"]), dtype="str")
f_names = open(os.path.join( self.opts.tmp, self.fns["read_refs"]), "wb")
f_lens = open(os.path.join( self.opts.tmp, self.fns["read_lengths"]), "wb")
for readname in readnames:
try:
f_names.write("%s\n" % read_refs[readname])
except KeyError:
f_names.write("unknown\n")
try:
f_lens.write("%s\n" % read_lens[readname])
except KeyError:
f_lens.write("0\n")
f_names.close()
f_lens.close()
def test_pad_softclip_1(tmpdir):
"it should memorize the result"
make_bam(tmpdir.strpath, """
r1 + __.*.......
r1 - .*.......__
""")
o = Namespace(verbos=False, mismatch_limit=-1)
sam = AlignmentFile(tmpdir.join("test.bam").strpath)
a = pad_softclip(sam)
b = pad_softclip(sam)
assert a is b
def BAM2FASTQ(input_file = bam_in):
'''
Extract paired FASTQ records from a BAM.
:return:
'''
import pysam
samfile = pysam.AlignmentFile(bam_in, "rb") # Open file handle
# Iterate through all reads