python类AlignedSegment()的实例源码

preprocess_test.py 文件源码 项目:mixemt 作者: svohr 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_process_reads_read_obs_paired_end(self):
        aln1b = pysam.AlignedSegment()
        aln1b.reference_start = 30
        aln1b.query_name = 'read1'
        aln1b.mapping_quality = 30
        aln1b.query_sequence = "AAAAACAAAACAAAAT"
        aln1b.query_qualities = [30] * 16
        aln1b.cigarstring = '16M'
        self.alns.append(aln1b)

        var_pos = [15, 20, 25, 35, 40]

        res = preprocess.process_reads(self.alns, var_pos, 20, 10)
        exp = {'read1':{15:'T', 20:'T', 25:'T', 35:'C', 40:'C'},
               'read2':{15:'G', 20:'G', 25:'G'}}
        self.assertEqual(res, exp)
preprocess_test.py 文件源码 项目:mixemt 作者: svohr 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_process_reads_read_obs_paired_end_overlap(self):
        aln1b = pysam.AlignedSegment()
        aln1b.reference_start = 20
        aln1b.query_name = 'read1'
        aln1b.mapping_quality = 20
        aln1b.query_sequence = "AAAAATAAAACAAAAT"
        aln1b.query_qualities = [30] * 16
        aln1b.cigarstring = '16M'
        self.alns.append(aln1b)

        var_pos = [15, 20, 25, 35]

        res = preprocess.process_reads(self.alns, var_pos, 20, 10)
        exp = {'read1':{15:'T', 25:'T', 35:'T'},
               'read2':{15:'G', 20:'G', 25:'G'}}
        self.assertEqual(res, exp)
preprocess_test.py 文件源码 项目:mixemt 作者: svohr 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_process_reads_read_obs_paired_end_overlap_1bad_base_qual(self):
        aln1b = pysam.AlignedSegment()
        aln1b.reference_start = 20
        aln1b.query_name = 'read1'
        aln1b.mapping_quality = 20
        aln1b.query_sequence = "AAAAATAAAACAAAAC"
        qqual = [30] * 16
        qqual[0] = 5
        aln1b.query_qualities = qqual
        aln1b.cigarstring = '16M'
        self.alns.append(aln1b)

        var_pos = [15, 20, 25, 35]

        res = preprocess.process_reads(self.alns, var_pos, 20, 10)
        exp = {'read1':{15:'T', 20:'T', 25:'T', 35:'C'},
               'read2':{15:'G', 20:'G', 25:'G'}}
        self.assertEqual(res, exp)
redwood.py 文件源码 项目:pauvre 作者: conchoecia 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def cigar_parse(self, tuples):
        """
        arguments:
         <tuples> a CIGAR string tuple list in pysam format

        purpose:
         This function uses the pysam cigarstring tuples format and returns
         a list of tuples in the internal format, [(20, 'M'), (5, "I")], et
         cetera. The zeroth element of each tuple is the number of bases for the
         CIGAR string feature. The first element of each tuple is the CIGAR
         string feature type.

        There are several feature types in SAM/BAM files. See below:
         'M' - match
         'I' - insertion relative to reference
         'D' - deletion relative to reference
         'N' - skipped region from the reference
         'S' - soft clip, not aligned but still in sam file
         'H' - hard clip, not aligned and not in sam file
         'P' - padding (silent deletion from padded reference)
         '=' - sequence match
         'X' - sequence mismatch
         'B' - BAM_CBACK (I don't actually know what this is)

        """
        # I used the map values from http://pysam.readthedocs.io/en/latest/api.html#pysam.AlignedSegment
        psam_to_char = {0: 'M', 1: 'I', 2: 'D', 3: 'N', 4: 'S',
                        5: 'H', 6: 'P', 7: '=', 8: 'X', 9: 'B'}
        return [(value, psam_to_char[feature]) for feature, value in tuples]
assignVariantToSpecies.py 文件源码 项目:pdxBlacklist 作者: MaxSalm 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def assignReadRefAlt(x, bam):
    '''
    Given a VCF record and a bam, return the read names that support either the REF or ALT alleles

    :param x: VCF record python object
    :param bam: BAM file handle
    :return: dictionary containing two lists of strings, ref/alt.
    '''
    iter = input_bam.pileup(x.contig, x.start,
                            x.stop)  # Iterable mpileup, covering a much wider genomic range, from start of left-most read
    output_dict = {'ref': None, 'alt': None, 'nomatch': 0, 'coverage': None}  # Initialise output object
    for pileupcolumn in iter:
        ref_reads = []
        alt_reads = []
        if pileupcolumn.pos == x.start:
            for pileupread in pileupcolumn.pileups:
                if not pileupread.is_del and not pileupread.is_refskip:
                    a_read = pileupread.alignment  # pysam.AlignedSegment
                    base_at_pos = a_read.query_sequence[pileupread.query_position]
                    #print base_at_pos, x.alleles, x.info["AF"]
                    if base_at_pos == x.ref[0]:
                        ref_reads.append(a_read.query_name)  # Read matches ref at pos
                    elif base_at_pos == x.alts[0]:
                        alt_reads.append(a_read.query_name)  # Read matches alt at pos
                    else:
                        output_dict['nomatch'] += 1  ## Allele matches neither ref/alt
            output_dict['ref'] = ref_reads
            output_dict['alt'] = alt_reads
            output_dict['coverage'] = float(pileupcolumn.n)
    return output_dict
mock_read_data.py 文件源码 项目:isovar 作者: hammerlab 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def make_read(seq, cigar, mdtag=None, name="dummy", mapq=10, baseq=30):
    read = pysam.AlignedSegment()
    read.seq = seq
    read.cigarstring = cigar
    if mdtag:
        read.set_tag("MD", mdtag)
    read.qname = name
    read.mapq = mapq
    qualities_string = pysam.qualities_to_qualitystring([baseq] * len(seq))
    qualities_bytes = qualities_string.encode("ascii")
    read.qual = qualities_bytes
    return read
Opossum.py 文件源码 项目:Opossum 作者: BSGOxford 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def CreateReadObject(read, newseq, newqual, newcigar, startread, basetag=[]) :

    a = pysam.AlignedSegment()
    a.query_name = read.query_name
    a.query_sequence = newseq
    a.query_qualities = pysam.qualitystring_to_array(newqual)
    a.cigar = newcigar
    a.reference_start = startread

    # If (Star) mapper has assigned a value of 255 to mapping quality,
    # change it to 50
    mapqual = read.mapping_quality 
    if mapqual == 255 :
        a.mapping_quality = 50
    else :
        a.mapping_quality = mapqual

    a.reference_id = read.reference_id

    # If read has RG read group tag, keep it
    try :
        r_RG = read.get_tag('RG')
        a.tags = ()
        a.set_tag('RG', r_RG)
    except :
        a.tags = ()

    a.next_reference_id = -1
    a.next_reference_start = -1
    a.template_length = 0
    a.flag = UpdateFlag(read.flag)

    return a


# Goes through the given cigar list and reports how many times cigarType is equal to 3
# Optional field is finalpos, which is cutoff position for counting the splits (relative to start pos)
# Default value for finalpos is something very large
hg19util.py 文件源码 项目:AmpliconArchitect 作者: virajbdeshpande 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self, line, start=-1, end=-1, strand=1,
        file_format='', bamfile=None, info=''):
        self.info = ""
        self.file_format = file_format
        if type(line) == pysam.AlignedRead or type(line) == pysam.AlignedSegment:
            self.load_pysamread(line, bamfile)
        elif start == -1:
            self.load_line(line, file_format)
        elif end == -1:
            self.load_pos(line, start, start, strand)
        else:
            self.load_pos(line, start, end, strand)
        if len(info) > 0:
            self.info = info
observe_test.py 文件源码 项目:mixemt 作者: svohr 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def setUp(self):
        self.mq = 30
        self.bq = 30
        aln1 = pysam.AlignedSegment()
        aln1.reference_start = 10
        aln1.query_name = 'read1'
        aln1.mapping_quality = 30
        aln1.query_sequence = "AAAAATAAAATAAAAT"
        aln1.query_qualities = [30] * 16
        aln1.cigarstring = '16M'

        aln2 = pysam.AlignedSegment()
        aln2.reference_start = 12
        aln2.query_name = 'read2'
        aln2.mapping_quality = 20
        aln2.query_sequence = "AAAGAAGAAAAG"
        qqual = [33] * 12
        qqual[3] = 20
        aln2.query_qualities = qqual
        aln2.cigarstring = '5M2D7M'

        aln3 = pysam.AlignedSegment()
        aln3.mapping_quality = 0
        aln3.query_name = 'read3'

        self.alns = [aln1, aln2, aln3]
preprocess_test.py 文件源码 项目:mixemt 作者: svohr 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def setUp(self):
        self.mq = 30
        self.bq = 30
        aln1 = pysam.AlignedSegment()
        aln1.reference_start = 10
        aln1.query_name = 'read1'
        aln1.mapping_quality = 30
        aln1.query_sequence = "AAAAATAAAATAAAAT"
        aln1.query_qualities = [30] * 16
        aln1.cigarstring = '16M'

        aln2 = pysam.AlignedSegment()
        aln2.reference_start = 12
        aln2.query_name = 'read2'
        aln2.mapping_quality = 20
        aln2.query_sequence = "AAAGAAGAAAAG"
        qqual = [33] * 12
        qqual[3] = 20
        aln2.query_qualities = qqual
        aln2.cigarstring = '5M2D7M'

        aln3 = pysam.AlignedSegment()
        aln3.mapping_quality = 0
        aln3.query_name = 'read3'

        self.alns = [aln1, aln2, aln3]
realigner.bak.py 文件源码 项目:CLAM 作者: Xinglab 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def construct_subgraph(mbam, read_qname, mread_dict, processed_mreads, chr_list, winsize=50, unstranded=False):
    """DOCSTRING
    Args:
    Returns:
    """
    # record of processed alignments only need kept on within-subgraph level
    processed_mread_alignments = set()
    counter = 0
    # a list of `pysam.AlignedSegment` objects
    # note that all taggers are already stored in `pysam.AlignedSegment.opt('RT')`
    read_aln_list = [x for x in mread_dict[read_qname]] 
    processed_mreads.add(read_qname)
    read_to_locations = defaultdict(dict) # read_qname -> {node_name1:segment1, node_name2:segment2}

    # enumerate all connected components
    while True:
        counter+=1; print "%i: %i"%(counter, len(read_aln_list))
        next_read_aln_list = []

        gen = read_aln_list if len(read_aln_list)<200 else tqdm(read_aln_list)
        for alignment in gen:
            ## build a node for this mread alignment 
            ## (if not already processed, i.e. built before)
            if alignment in processed_mread_alignments:
                continue

            genomic_cluster, this_mread_dict, discarded_mread_list = \
                build_read_cluster(alignment, chr_list, mbam, unstranded=unstranded, winsize=winsize)
            _ = map(processed_mread_alignments.add, discarded_mread_list)
            if genomic_cluster is None:  # this cluster is invald (only double-mappers)
                continue

            ## update loc2read, read2loc
            node_name = ':'.join([str(x) for x in genomic_cluster])
            #if node_name in subgraph:
            #   logger.debug("I revisited '%s' at read '%s'."%(node_name, read_qname))
            #   break
            #subgraph.add(node_name)
            for x_qname in this_mread_dict:
                read_to_locations[x_qname].update({node_name :  this_mread_dict[x_qname]})

            ## then add new alignments(edges) to generate connected nodes
            ## in the next iteration
            _ = map(processed_mread_alignments.add, this_mread_dict.values())
            for read_x_qname in this_mread_dict:
                if read_x_qname in processed_mreads:
                    continue
                x_aln_list = [aln for aln in mread_dict[read_x_qname] if not aln in processed_mread_alignments]
                next_read_aln_list.extend(x_aln_list)

            ## .. and record to processed reads since we have generated
            ## the nodes for them
            _ = map(processed_mreads.add, this_mread_dict.keys())

        # if no more connected nodes can be found, break loop 
        if len(next_read_aln_list)==0:
            break
        read_aln_list = next_read_aln_list      
    return read_to_locations, processed_mreads
stats_from_bam.py 文件源码 项目:pomoxis 作者: nanoporetech 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def stats_from_aligned_read(read):
    """Create summary information for an aligned read

    :param read: :class:`pysam.AlignedSegment` object
    """

    tags = dict(read.tags)
    try:
        tags.get('NM')
    except:
        raise IOError("Read is missing required 'NM' tag. Try running 'samtools fillmd -S - ref.fa'.")

    name = read.qname
    if read.flag == 4:
        return None
    counts = defaultdict(int)
    for (i, j) in read.cigar:
        counts[i] += j
    match = counts[0]
    ins = counts[1]
    delt = counts[2]
    # NM is edit distance: NM = INS + DEL + SUB
    sub = tags['NM'] - ins - delt
    length = match + ins + delt
    iden = 100*float(match - sub)/match
    acc = 100 - 100*float(tags['NM'])/length

    read_length = read.infer_read_length()
    coverage = 100*float(read.query_alignment_length) / read_length
    direction = '-' if read.is_reverse else '+'

    results = {
        "name":name,
        "coverage":coverage,
        "direction":direction,
        "length":length,
        "read_length":read_length,
        "ins":ins,
        "del":delt,
        "sub":sub,
        "iden":iden,
        "acc":acc
    }

    return results


问题


面经


文章

微信
公众号

扫码关注公众号