def test_process_reads_read_obs_paired_end(self):
aln1b = pysam.AlignedSegment()
aln1b.reference_start = 30
aln1b.query_name = 'read1'
aln1b.mapping_quality = 30
aln1b.query_sequence = "AAAAACAAAACAAAAT"
aln1b.query_qualities = [30] * 16
aln1b.cigarstring = '16M'
self.alns.append(aln1b)
var_pos = [15, 20, 25, 35, 40]
res = preprocess.process_reads(self.alns, var_pos, 20, 10)
exp = {'read1':{15:'T', 20:'T', 25:'T', 35:'C', 40:'C'},
'read2':{15:'G', 20:'G', 25:'G'}}
self.assertEqual(res, exp)
python类AlignedSegment()的实例源码
def test_process_reads_read_obs_paired_end_overlap(self):
aln1b = pysam.AlignedSegment()
aln1b.reference_start = 20
aln1b.query_name = 'read1'
aln1b.mapping_quality = 20
aln1b.query_sequence = "AAAAATAAAACAAAAT"
aln1b.query_qualities = [30] * 16
aln1b.cigarstring = '16M'
self.alns.append(aln1b)
var_pos = [15, 20, 25, 35]
res = preprocess.process_reads(self.alns, var_pos, 20, 10)
exp = {'read1':{15:'T', 25:'T', 35:'T'},
'read2':{15:'G', 20:'G', 25:'G'}}
self.assertEqual(res, exp)
def test_process_reads_read_obs_paired_end_overlap_1bad_base_qual(self):
aln1b = pysam.AlignedSegment()
aln1b.reference_start = 20
aln1b.query_name = 'read1'
aln1b.mapping_quality = 20
aln1b.query_sequence = "AAAAATAAAACAAAAC"
qqual = [30] * 16
qqual[0] = 5
aln1b.query_qualities = qqual
aln1b.cigarstring = '16M'
self.alns.append(aln1b)
var_pos = [15, 20, 25, 35]
res = preprocess.process_reads(self.alns, var_pos, 20, 10)
exp = {'read1':{15:'T', 20:'T', 25:'T', 35:'C'},
'read2':{15:'G', 20:'G', 25:'G'}}
self.assertEqual(res, exp)
def cigar_parse(self, tuples):
"""
arguments:
<tuples> a CIGAR string tuple list in pysam format
purpose:
This function uses the pysam cigarstring tuples format and returns
a list of tuples in the internal format, [(20, 'M'), (5, "I")], et
cetera. The zeroth element of each tuple is the number of bases for the
CIGAR string feature. The first element of each tuple is the CIGAR
string feature type.
There are several feature types in SAM/BAM files. See below:
'M' - match
'I' - insertion relative to reference
'D' - deletion relative to reference
'N' - skipped region from the reference
'S' - soft clip, not aligned but still in sam file
'H' - hard clip, not aligned and not in sam file
'P' - padding (silent deletion from padded reference)
'=' - sequence match
'X' - sequence mismatch
'B' - BAM_CBACK (I don't actually know what this is)
"""
# I used the map values from http://pysam.readthedocs.io/en/latest/api.html#pysam.AlignedSegment
psam_to_char = {0: 'M', 1: 'I', 2: 'D', 3: 'N', 4: 'S',
5: 'H', 6: 'P', 7: '=', 8: 'X', 9: 'B'}
return [(value, psam_to_char[feature]) for feature, value in tuples]
def assignReadRefAlt(x, bam):
'''
Given a VCF record and a bam, return the read names that support either the REF or ALT alleles
:param x: VCF record python object
:param bam: BAM file handle
:return: dictionary containing two lists of strings, ref/alt.
'''
iter = input_bam.pileup(x.contig, x.start,
x.stop) # Iterable mpileup, covering a much wider genomic range, from start of left-most read
output_dict = {'ref': None, 'alt': None, 'nomatch': 0, 'coverage': None} # Initialise output object
for pileupcolumn in iter:
ref_reads = []
alt_reads = []
if pileupcolumn.pos == x.start:
for pileupread in pileupcolumn.pileups:
if not pileupread.is_del and not pileupread.is_refskip:
a_read = pileupread.alignment # pysam.AlignedSegment
base_at_pos = a_read.query_sequence[pileupread.query_position]
#print base_at_pos, x.alleles, x.info["AF"]
if base_at_pos == x.ref[0]:
ref_reads.append(a_read.query_name) # Read matches ref at pos
elif base_at_pos == x.alts[0]:
alt_reads.append(a_read.query_name) # Read matches alt at pos
else:
output_dict['nomatch'] += 1 ## Allele matches neither ref/alt
output_dict['ref'] = ref_reads
output_dict['alt'] = alt_reads
output_dict['coverage'] = float(pileupcolumn.n)
return output_dict
def make_read(seq, cigar, mdtag=None, name="dummy", mapq=10, baseq=30):
read = pysam.AlignedSegment()
read.seq = seq
read.cigarstring = cigar
if mdtag:
read.set_tag("MD", mdtag)
read.qname = name
read.mapq = mapq
qualities_string = pysam.qualities_to_qualitystring([baseq] * len(seq))
qualities_bytes = qualities_string.encode("ascii")
read.qual = qualities_bytes
return read
def CreateReadObject(read, newseq, newqual, newcigar, startread, basetag=[]) :
a = pysam.AlignedSegment()
a.query_name = read.query_name
a.query_sequence = newseq
a.query_qualities = pysam.qualitystring_to_array(newqual)
a.cigar = newcigar
a.reference_start = startread
# If (Star) mapper has assigned a value of 255 to mapping quality,
# change it to 50
mapqual = read.mapping_quality
if mapqual == 255 :
a.mapping_quality = 50
else :
a.mapping_quality = mapqual
a.reference_id = read.reference_id
# If read has RG read group tag, keep it
try :
r_RG = read.get_tag('RG')
a.tags = ()
a.set_tag('RG', r_RG)
except :
a.tags = ()
a.next_reference_id = -1
a.next_reference_start = -1
a.template_length = 0
a.flag = UpdateFlag(read.flag)
return a
# Goes through the given cigar list and reports how many times cigarType is equal to 3
# Optional field is finalpos, which is cutoff position for counting the splits (relative to start pos)
# Default value for finalpos is something very large
def __init__(self, line, start=-1, end=-1, strand=1,
file_format='', bamfile=None, info=''):
self.info = ""
self.file_format = file_format
if type(line) == pysam.AlignedRead or type(line) == pysam.AlignedSegment:
self.load_pysamread(line, bamfile)
elif start == -1:
self.load_line(line, file_format)
elif end == -1:
self.load_pos(line, start, start, strand)
else:
self.load_pos(line, start, end, strand)
if len(info) > 0:
self.info = info
def setUp(self):
self.mq = 30
self.bq = 30
aln1 = pysam.AlignedSegment()
aln1.reference_start = 10
aln1.query_name = 'read1'
aln1.mapping_quality = 30
aln1.query_sequence = "AAAAATAAAATAAAAT"
aln1.query_qualities = [30] * 16
aln1.cigarstring = '16M'
aln2 = pysam.AlignedSegment()
aln2.reference_start = 12
aln2.query_name = 'read2'
aln2.mapping_quality = 20
aln2.query_sequence = "AAAGAAGAAAAG"
qqual = [33] * 12
qqual[3] = 20
aln2.query_qualities = qqual
aln2.cigarstring = '5M2D7M'
aln3 = pysam.AlignedSegment()
aln3.mapping_quality = 0
aln3.query_name = 'read3'
self.alns = [aln1, aln2, aln3]
def setUp(self):
self.mq = 30
self.bq = 30
aln1 = pysam.AlignedSegment()
aln1.reference_start = 10
aln1.query_name = 'read1'
aln1.mapping_quality = 30
aln1.query_sequence = "AAAAATAAAATAAAAT"
aln1.query_qualities = [30] * 16
aln1.cigarstring = '16M'
aln2 = pysam.AlignedSegment()
aln2.reference_start = 12
aln2.query_name = 'read2'
aln2.mapping_quality = 20
aln2.query_sequence = "AAAGAAGAAAAG"
qqual = [33] * 12
qqual[3] = 20
aln2.query_qualities = qqual
aln2.cigarstring = '5M2D7M'
aln3 = pysam.AlignedSegment()
aln3.mapping_quality = 0
aln3.query_name = 'read3'
self.alns = [aln1, aln2, aln3]
def construct_subgraph(mbam, read_qname, mread_dict, processed_mreads, chr_list, winsize=50, unstranded=False):
"""DOCSTRING
Args:
Returns:
"""
# record of processed alignments only need kept on within-subgraph level
processed_mread_alignments = set()
counter = 0
# a list of `pysam.AlignedSegment` objects
# note that all taggers are already stored in `pysam.AlignedSegment.opt('RT')`
read_aln_list = [x for x in mread_dict[read_qname]]
processed_mreads.add(read_qname)
read_to_locations = defaultdict(dict) # read_qname -> {node_name1:segment1, node_name2:segment2}
# enumerate all connected components
while True:
counter+=1; print "%i: %i"%(counter, len(read_aln_list))
next_read_aln_list = []
gen = read_aln_list if len(read_aln_list)<200 else tqdm(read_aln_list)
for alignment in gen:
## build a node for this mread alignment
## (if not already processed, i.e. built before)
if alignment in processed_mread_alignments:
continue
genomic_cluster, this_mread_dict, discarded_mread_list = \
build_read_cluster(alignment, chr_list, mbam, unstranded=unstranded, winsize=winsize)
_ = map(processed_mread_alignments.add, discarded_mread_list)
if genomic_cluster is None: # this cluster is invald (only double-mappers)
continue
## update loc2read, read2loc
node_name = ':'.join([str(x) for x in genomic_cluster])
#if node_name in subgraph:
# logger.debug("I revisited '%s' at read '%s'."%(node_name, read_qname))
# break
#subgraph.add(node_name)
for x_qname in this_mread_dict:
read_to_locations[x_qname].update({node_name : this_mread_dict[x_qname]})
## then add new alignments(edges) to generate connected nodes
## in the next iteration
_ = map(processed_mread_alignments.add, this_mread_dict.values())
for read_x_qname in this_mread_dict:
if read_x_qname in processed_mreads:
continue
x_aln_list = [aln for aln in mread_dict[read_x_qname] if not aln in processed_mread_alignments]
next_read_aln_list.extend(x_aln_list)
## .. and record to processed reads since we have generated
## the nodes for them
_ = map(processed_mreads.add, this_mread_dict.keys())
# if no more connected nodes can be found, break loop
if len(next_read_aln_list)==0:
break
read_aln_list = next_read_aln_list
return read_to_locations, processed_mreads
def stats_from_aligned_read(read):
"""Create summary information for an aligned read
:param read: :class:`pysam.AlignedSegment` object
"""
tags = dict(read.tags)
try:
tags.get('NM')
except:
raise IOError("Read is missing required 'NM' tag. Try running 'samtools fillmd -S - ref.fa'.")
name = read.qname
if read.flag == 4:
return None
counts = defaultdict(int)
for (i, j) in read.cigar:
counts[i] += j
match = counts[0]
ins = counts[1]
delt = counts[2]
# NM is edit distance: NM = INS + DEL + SUB
sub = tags['NM'] - ins - delt
length = match + ins + delt
iden = 100*float(match - sub)/match
acc = 100 - 100*float(tags['NM'])/length
read_length = read.infer_read_length()
coverage = 100*float(read.query_alignment_length) / read_length
direction = '-' if read.is_reverse else '+'
results = {
"name":name,
"coverage":coverage,
"direction":direction,
"length":length,
"read_length":read_length,
"ins":ins,
"del":delt,
"sub":sub,
"iden":iden,
"acc":acc
}
return results