def merge_reads(bams, merged_output_dir):
"""
Merge bam files, can take odd numbers of reads.
"""
try:
os.mkdir(merged_output_dir)
except OSError:
pass
bam_list = get_bam_pairs(bams)
for sample_id, bams in bam_list.items():
header = get_header(bams[0])
bam_out = os.path.join(merged_output_dir, os.path.basename(sample_id)) + '.bam'
out = pysam.AlignmentFile(bam_out, 'wb', header=header)
for bam in bams:
sam = pysam.AlignmentFile(bam, 'rb')
for read in sam:
out.write(read)
out.close()
pysam.sort(bam_out, 'tmp')
os.rename('tmp.bam',bam_out)
pysam.index(bam_out)
python类index()的实例源码
def CigarIndex(cigar, pos) :
position = 0
k = 0 # cigar index
addpos = pos # extra positions
for cigartype, cigarlength in cigar :
position += cigarlength
if position < pos :
k += 1
addpos -= cigarlength
else : return k, addpos
# Converts string of cigar symbol characters into the format used by Pysam
def index(file_name):
pysam.index(str(file_name))
if not os.path.isfile(file_name + ".bai"):
raise RuntimeError("samtools index failed, likely bam is not sorted")
def sort_and_index(file_name, sorted_prefix=None):
""" Sorts and indexes the bam file given by file_name.
"""
if sorted_prefix is None:
sorted_prefix = file_name.replace('.bam', '') + '_sorted'
sorted_name = sorted_prefix + '.bam'
subprocess.check_call(['samtools','sort', '-o', sorted_name, file_name])
pysam.index(sorted_name)
def load_key_index(self):
f = open(self.index_file_name, 'r')
in_index = csv.reader(f, delimiter='\t')
self.keys = []
for row in in_index:
if len(row) != 2:
raise Exception("BAM index file %s has incorrect format" % self.index_file_name)
key, pos = row
if not pos.isdigit():
raise Exception("BAM index file %s has incorrect format" % self.index_file_name)
self.keys.append((key, long(pos)))
f.close()
def collapse_stack(stack, collapse_dict, max_tags):
"""DOCSTRING
Args
Returns
"""
new_alignment_list = []
new_alignment_dict = defaultdict(list)
for aln in stack:
new_alignment_dict[aln.query_sequence].append(aln)
# TODO 2017.10.21:
# further collapse `new_alignment_dict`
# based on degeneracy and/or read tags
for seq in new_alignment_dict:
this_alignment_qname_list = [x.qname for x in new_alignment_dict[seq] ]
is_collapsed = [True if x in collapse_dict else False for x in this_alignment_qname_list]
## if any of the alignment is collapsed before,
## we require all of them to be collapsed
if any(is_collapsed):
assert all(is_collapsed)
target_alignment_qname = collapse_dict[this_alignment_qname_list[0]][0:max_tags]
assert len(collapse_dict[this_alignment_qname_list[0]]) <= max_tags
target_alignment = [new_alignment_dict[seq][this_alignment_qname_list.index(x)] for x in target_alignment_qname]
else:
target_alignment = new_alignment_dict[seq][0:max_tags]
for aln_qname in this_alignment_qname_list:
collapse_dict[aln_qname] = [x.qname for x in target_alignment]
new_alignment_list.extend( target_alignment )
return new_alignment_list, collapse_dict
def _second_start(read, poss, strand, chrom, segmentation, holesize_th):
"""
Return the coordinate of second start.
If read is not split or we wish algorithm
to think of read as linear, second_start equals to 0.
"""
holes = [j - i - 1 for i, j in zip(poss, poss[1:])]
# Get the size of the biggest hole:
biggest_hole_size = max(holes) if holes else 0
second_start = 0
is_strange = False
if not segmentation:
# Effectively this means, that read is considered as it has no holes.
if biggest_hole_size > holesize_th:
# Still, read is not treated as on with distinct second_start.
# However, it is reported as starnge:
is_strange = True
else:
biggest_hole_size_index = holes.index(biggest_hole_size)
# Take right border of hole on "+" and left border on "-" strand:
if strand == '+':
second_start = poss[biggest_hole_size_index + 1]
else:
second_start = poss[biggest_hole_size_index]
# Read is strange if:
# it is not intersecting with segmentation AND
# if there actually is a hole
if not _intersects_with_annotaton(second_start, segmentation, chrom, strand) and \
biggest_hole_size != 0:
is_strange = True
return second_start, is_strange
def MatchingBases(md, cigar, FromBeginning) :
if not FromBeginning :
md = md[::-1]
cigar = cigar[::-1]
i = 0 # md index
number = ''
mlen = len(md)
while i < mlen and md[i] in '0123456789' :
number += md[i]
i += 1
if not number : number = '0'
if FromBeginning : number = int(number)
else : number = int(number[::-1])
# Go through cigar to find possible inserts
clen = 0
for cigartype, cigarlength in cigar :
if cigartype in [1,3] :
break
elif cigartype == 5 :
pass
else :
clen += cigarlength
if number < clen :
number += AddClips(cigar, True) # cigar has been reversed here!
return min(number, clen)
# Takes flag as input and returns whether alignment is primary (True) or not (False).
# If additional input parameter is set to True, then also check whether read is properly
# paired (definition of properly paired depends on mapper).
def UpdateFlag(flag) :
if (flag & 16) == 16 : return 16
else : return 0
# Returns the cigar length when cigar is expressed base by base,
# up to the index k of cigarlist (default is a very large index)
def ReadCigarLength(cigar, k=100) :
if k < len(cigar) :
cigarlist = cigar[:(k+1)]
else :
cigarlist = cigar
l = sum([c[1] for c in cigarlist])
return l
# Input cigar and pos denoting index within a cigar transformed into a string sequence.
# Return what is the index in the original cigar and how many extra positions from the beginning
# of this index is the current position.
def generate_index_if_needed(filepath):
index_file = os.path.abspath(filepath) + '.bai'
if not os.path.isfile(index_file):
# Index file doesn't exist; generate it
pysam.index(filepath, index_file)
return True
# end .generate_index_if_needed()
def find_roi_bam(chromosome_event):
chr,event = chromosome_event .split("_")
roi,sortbyname,sortbyCoord, hetsnp = init_file_names(chr, event, tmpbams_path, haplotype_path)
exonsinroibed = "/".join([haplotype_path, event + "_exons_in_roi_"+ chr +'.bed'])
success = True
try:
if not terminating.is_set():
roisort = sub('.bam$', '.sorted', roi)
if(os.path.isfile(exonsinroibed)):
cmd=" ".join(["sort -u", exonsinroibed, "-o", exonsinroibed]); runCommand(cmd)
extractPairedReadfromROI(sortbyname, exonsinroibed, roi)
removeIfEmpty(tmpbams_path,ntpath.basename(roi))
pysam.sort(roi ,roisort )
pysam.index(roisort+'.bam')
os.remove(roi)
except (KeyboardInterrupt):
logger.error('Exception Crtl+C pressed in the child process in find_roi_bam for chr ' +chr + event)
terminating.set()
success=False
return
except Exception as e:
logger.exception("Exception in find_roi_bam %s" ,e )
terminating.set()
success=False
return
if(success):
logger.debug("find_roi_bam complete successfully for "+chr + event)
return
def removeReadsOverlappingHetRegion(inbamfn, bedfn,outbamfn,path):
print "___ removing reads overlapping heterozygous region ___"
inbamsorted = sub('.bam$','.sorted',inbamfn)
pysam.sort(inbamfn, inbamsorted)
pysam.index(inbamsorted+'.bam')
alignmentfile = pysam.AlignmentFile(inbamsorted+'.bam', "rb" )
outbam = pysam.Samfile(outbamfn, 'wb', template=alignmentfile )
bedfile = open(bedfn, 'r')
for bedline in bedfile:
c = bedline.strip().split()
if (len(c) == 3 ):
chr2 = c[0]
chr = c[0].strip("chr")
start = int(c[1])
end = int(c[2])
else :
continue
try:
readmappings = alignmentfile.fetch(chr2, start, end)
except ValueError as e:
print("problem fetching the read ")
for shortread in readmappings:
try:
outbam.write(shortread)
except ValueError as e:
print ("problem removing read :" + shortread.qname)
outbamsorted = sub('.bam$','.sorted',outbamfn)
pysam.sort(outbamfn, outbamsorted)
bamDiff(inbamsorted+'.bam', outbamsorted +'.bam', path )
outbam.close()
def find_roi_amp(chromosome_event):
chr, event = chromosome_event.split("_")
roi, sortbyname, sortbyCoord, hetsnp = init_file_names(chr, event, tmpbams_path, haplotype_path)
exonsinroibed = "/".join([haplotype_path, event + "_exons_in_roi_" + chr + '.bed'])
success = True
try:
if not terminating.is_set():
roisort = sub('.bam$', '.sorted', roi)
if (os.path.isfile(exonsinroibed)):
cmd = " ".join(["sort -u", exonsinroibed, "-o", exonsinroibed]);
runCommand(cmd)
extractPairedReadfromROI(sortbyname, exonsinroibed, roi)
removeIfEmpty(tmpbams_path, ntpath.basename(roi))
pysam.sort(roi, roisort)
pysam.index(roisort + '.bam')
os.remove(roi)
except (KeyboardInterrupt):
logger.error('Exception Crtl+C pressed in the child process in find_roi_bam for chr ' + chr + event)
terminating.set()
success = False
return
except Exception as e:
logger.exception("Exception in find_roi_bam %s", e)
terminating.set()
success = False
return
if (success):
logger.debug("find_roi_bam complete successfully for " + chr + event)
return
def index_bam(in_bam):
if not os.path.isfile(in_bam + ".bai") or os.stat(in_bam).st_mtime >= os.stat(in_bam + ".bai").st_mtime:
pysam.index(in_bam, catch_stdout=False)
def _sort_index(unsorted, output_bam):
pysam.sort("-o", output_bam, "-m", PYSAM_SORT_MEM, unsorted)
os.unlink(unsorted)
pysam.index(output_bam)
def qname_cmp_func(qname1, qname2):
def update_char(qname, index):
index += 1
if index < len(qname):
c = qname[index]
else:
c = ''
return c, index
def get_ord(c):
if c:
return ord(c)
return 0
i, j = 0, 0
c1 = qname1[0] if qname1 else None
c2 = qname2[0] if qname2 else None
while c1 and c2:
if c1.isdigit() and c2.isdigit():
while c1 == '0':
c1, i = update_char(qname1, i)
while c2 == '0':
c2, j = update_char(qname2, j)
while c1.isdigit() and c2.isdigit() and c1 == c2:
c1, i = update_char(qname1, i)
c2, j = update_char(qname2, j)
if c1.isdigit() and c2.isdigit():
k, l = i, j
while c1.isdigit() and c2.isdigit():
c1, k = update_char(qname1, k)
c2, l = update_char(qname2, l)
return 1 if c1.isdigit() else (-1 if c2.isdigit() else get_ord(qname1[i]) - get_ord(qname2[j]))
elif c1.isdigit():
return 1
elif c2.isdigit():
return -1
elif i != j:
return 1 if i < j else -1
else:
if c1 != c2:
return get_ord(c1) - get_ord(c2)
c1, i = update_char(qname1, i)
c2, j = update_char(qname2, j)
return 1 if c1 else (-1 if c2 else 0)
def realigner(out_dir, tmp_dir, winsize=50, unstranded=False):
"""DOCSTRING
Args:
Returns:
"""
# file handlers
mbam = pysam.Samfile(os.path.join(tmp_dir, 'multi.sorted.bam'),'rb')
ubam = pysam.Samfile(os.path.join(tmp_dir, 'unique.sorted.bam'),'rb')
obam = pysam.Samfile(os.path.join(out_dir, 'realigned.bam'), 'wb', template = mbam)
chr_list=[x['SN'] for x in ubam.header['SQ']]
# construct the mread_dict; this will be needed throughout
mread_dict = defaultdict(list)
for alignment in mbam:
mread_dict[alignment.qname].append(alignment)
# keep a record of processed reads
processed_mreads = set()
# iterate through all mreads
for read_qname in mread_dict:
if read_qname in processed_mreads:
continue
## construct the fully-connected subgraph for each read
read_to_locations, processed_mreads = \
construct_subgraph(mbam, read_qname, mread_dict, processed_mreads, chr_list, winsize=winsize, unstranded=unstranded)
subgraph = set()
for read in read_to_locations:
_ = map(subgraph.add, read_to_locations[read].keys())
subgraph = list(subgraph)
## build the BIT tracks
node_track, multi_reads_weights = \
construct_BIT_track(subgraph, read_to_locations, ubam, unstranded)
## run EM
multi_reads_weights = \
run_EM(node_track, multi_reads_weights, w=winsize)
## write to obam
for read in multi_reads_weights:
for node in multi_reads_weights[read]:
alignment = read_to_locations[read][node]
score = round(multi_reads_weights[read][node][0], 3)
alignment.set_tag('AS', score)
alignment.set_tag('PG', 'CLAM')
obam.write(alignment)
# sort the final output
logger.info('sorting output')
obam.close()
ubam.close()
mbam.close()
obam_sorted_fn = os.path.join(out_dir, 'realigned.sorted.bam')
pysam.sort('-o', obam_sorted_fn, os.path.join(out_dir, 'realigned.bam'))
pysam.index(obam_sorted_fn)
os.remove(os.path.join(out_dir, 'realigned.bam'))
return
def filter_bam_maxtags(obam_fn, ibam_fn, max_tags=1):
"""DOCSTRING
Args
Returns
"""
assert max_tags>0
# prepare files
ibam = pysam.Samfile(ibam_fn, 'rb')
obam = pysam.Samfile(obam_fn, 'wb', template=ibam)
# init
collapse_dict = defaultdict(list)
chr_list=[x['SN'] for x in ibam.header['SQ']]
input_counter = 0
output_counter = 0
for chr in chr_list:
# empty stack for each new chromosome
stack = []
last_pos = -1
for read in ibam.fetch(chr):
input_counter += 1
if not (input_counter % (5*(10**6)) ):
logger.debug('collapsed %i alignments'%input_counter)
if read.positions[0] > last_pos:
new_alignment_list, collapse_dict = collapse_stack(stack, collapse_dict, max_tags)
output_counter += len(new_alignment_list)
last_pos = read.positions[0]
stack = [read]
for new_alignment in new_alignment_list:
new_alignment.query_sequence = '*'
new_alignment.query_qualities = '0'
_ = obam.write(new_alignment)
else:
stack.append(read)
new_alignment_list, collapse_dict = collapse_stack(stack, collapse_dict, max_tags)
output_counter += len(new_alignment_list)
last_pos = read.positions[0]
for new_alignment in new_alignment_list:
new_alignment.query_sequence = '*'
new_alignment.query_qualities = '0'
_ = obam.write(new_alignment)
ibam.close()
obam.close()
#os.rename(obam_fn, ibam_fn)
#pysam.sort(obam_fn)
pysam.index(obam_fn)
logger.info('Input = %s; Output = %s; Redundancy = %.2f'%(input_counter,output_counter, 1-float(output_counter)/input_counter))
return
def TrimCigar(cigar) :
clippings = [1 for cigartype, cigarlength in cigar if cigartype in [4,5]]
if not clippings :
return cigar, 0, None
start = 0
end = 0
# soft and hard clippings can either be in the beginning or end of a read
for cigartype, cigarlength in cigar[:] :
if cigartype == 4 : # soft clipping
start = cigarlength
cigar.remove((cigartype, cigarlength))
elif cigartype == 5 : # hard clipping
cigar.remove((cigartype, cigarlength))
else :
break
for cigartype, cigarlength in reversed(cigar[:]) :
if cigartype == 4 : # soft clipping
end -= cigarlength
cigar.remove((cigartype, cigarlength))
elif cigartype == 5 : # hard clipping
cigar.remove((cigartype, cigarlength))
else :
break
return cigar, start, end if not end == 0 else None
# Give as input the quality string, md from Tags field, cigar, and mincut and maxcut,
# and ipos (starting index of current read in terms of quality sequence)
# and adjustbase (only needed if two reads
# are being merged, and second read starts later than first read).
# Md tells whether there are any base-changes lying closer or equal to mincut from
# read start edge, or at least maxcut away from same edge. These base-changes will be
# discarded, that is, their corresponding base
# quality will be set to 0 (or ! in ascii format).
# Returns updated quality string.
def pandas_parallel(df, func, nthreads, mp_type, split, *opts):
'''wrapper to run pandas apply function with multiprocessing. *opts will take any number of optional arguments to be passed
into the function. Note these will be passed to the function as a tuple and need to be parsed. '''
def init_worker():
signal.signal(signal.SIGINT, signal.SIG_IGN)
logger.info("Begin multiprocessing of function %s in a pool of %s workers using %s protocol" % (str(func.__name__), nthreads, mp_type))
if split == '':
logger.debug("*The dataframe will be split evenly across the %s workers" % nthreads)
df_split = np.array_split(df, min(nthreads, len(df.index)))
else:
logger.debug('*The dataframe will be split on the column %s' % split)
df_split = (df.loc[df[split] == sub, :] for sub in df[split].unique())
# Pack params for processing
if not opts:
params = df_split
elif len(opts) == 1:
params = zip(df_split, repeat(opts[0]))
elif len(opts) > 1:
params = zip(df_split, repeat(opts))
init = time.time()
try:
logger.debug("*Initializing a %s pool with %s workers" % (mp_type, nthreads))
pool = mp.Pool(nthreads, init_worker) # Initialize the pool
pool_func = getattr(pool, mp_type) # set the function (map, imap, etc)
if mp_type == "map_async":
these_res = pool_func(func, params).get()
out = pd.concat(these_res, axis=0)
elif mp_type in ["map", "imap", "imap_unordered"]:
these_res = pool_func(func, params)
out = pd.concat(these_res, axis=0) # this works even for iterables
pool.close()
except KeyboardInterrupt as e:
logger.error("Error: Keyboard interrupt")
pool.terminate()
raise e
except Exception as e:
logger.error("Exception: " + str(e))
pool.terminate()
traceback = sys.exc_info()[2]
raise_(ValueError, e, traceback)
finally:
pool.join()
logger.debug("*Time to run pandas_parallel on " + str(func.__name__) + " took %g seconds" % (time.time() - init))
return(out)