def get_number_reads(bam_filename):
return reduce(lambda x, y: x + y, [ eval('+'.join(l.rstrip('\n').split('\t')[2:]) ) for l in pysam.idxstats(bam_filename) ])
# NOTE!! pgs argument used to be called pg and take a single pg item
# When resolving to tenkit:master, clients that inject PG tags must be updated to pass a list
python类idxstats()的实例源码
def test_merge_and_switch():
test_bam1 = bam.BamFile(
os.path.join(dir, "chr19_window.bam"), "samtools", no_initial_index=True)
test_bam2 = bam.BamFile(
os.path.join(dir, "chrX_window1.bam"), "samtools", no_initial_index=True)
test_bam3 = bam.BamFile(
os.path.join(dir, "chrX_window2.bam"), "samtools", no_initial_index=True)
merged = bam.samtools_merge(
"samtools", [test_bam1.filepath, test_bam2.filepath],
os.path.join(dir, "merged1"), 1)
merged = bam.BamFile(
os.path.join(dir, "merged1.merged.bam"), "samtools", no_initial_index=True)
a = pysam.idxstats(test_bam1.filepath)
test1_reads = sum([
int(k[2]) + int(k[3]) for k in [
x.split("\t") for x in a.split("\n")] if len(k) > 3])
a = pysam.idxstats(test_bam2.filepath)
test2_reads = sum([
int(k[2]) + int(k[3]) for k in [
x.split("\t") for x in a.split("\n")] if len(k) > 3])
a = pysam.idxstats(test_bam3.filepath)
test3_reads = sum([
int(k[2]) + int(k[3]) for k in [
x.split("\t") for x in a.split("\n")] if len(k) > 3])
a = pysam.idxstats(merged.filepath)
merged1_reads = sum([
int(k[2]) + int(k[3]) for k in [
x.split("\t") for x in a.split("\n")] if len(k) > 3])
assert merged1_reads == test1_reads + test2_reads
swapped = bam.switch_sex_chromosomes_sambamba(
"samtools", "sambamba", merged.filepath, test_bam3.filepath,
"chrX", dir, "swapped", 1, {"CL": ["foo"], "ID": "xyalign"})
swapped = bam.BamFile(
os.path.join(dir, "swapped.merged.bam"), "samtools", no_initial_index=True)
a = pysam.idxstats(swapped.filepath)
swapped_reads = sum([
int(k[2]) + int(k[3]) for k in [
x.split("\t") for x in a.split("\n")] if len(k) > 3])
assert swapped_reads == test1_reads + test3_reads
header = read_bed(os.path.join(dir, "swapped.header.sam"))
assert ["@PG", "ID:xyalign", "CL:foo"] in header
def test_bwa_mem_mapping_sambamba():
# Test single end, no rg
toy_fasta = reftools.RefFasta(os.path.join(dir, "toy.fasta"))
test_bam = assemble.bwa_mem_mapping_sambamba(
"bwa", "samtools", "sambamba", toy_fasta,
os.path.join(dir, "assemble_test"), [os.path.join(dir, "toy_1.fastq")], 1,
"None", [""], cram=False)
assert os.path.exists(os.path.join(dir, "assemble_test.sorted.bam"))
assert pysam.idxstats(
os.path.join(
dir, "assemble_test.sorted.bam")) == 'seq1\t20\t0\t0\nseq2\t40\t0\t0\n*\t0\t0\t3\n'
# Test single end, with rg
toy_fasta = reftools.RefFasta(os.path.join(dir, "toy.fasta"))
test_bam = assemble.bwa_mem_mapping_sambamba(
"bwa", "samtools", "sambamba", toy_fasta,
os.path.join(dir, "test_assemble.rg"), [os.path.join(dir, "toy_1.fastq")], 1,
"@RG\tID:{}".format("test"), [""], cram=False)
assert os.path.exists(os.path.join(dir, "test_assemble.rg.sorted.bam"))
assert pysam.idxstats(
os.path.join(
dir, "test_assemble.rg.sorted.bam")) == 'seq1\t20\t0\t0\nseq2\t40\t0\t0\n*\t0\t0\t3\n'
header = pysam.view("-H", os.path.join(dir, "test_assemble.rg.sorted.bam"))
assert header.find("@RG\\tID:test") != -1
# Test raising error for inaccessible reference
with pytest.raises(RuntimeError):
fake_fasta = reftools.RefFasta(os.path.join(
dir, "DOES_NOT_EXIST.fasta"), no_initial_index=True)
test_bam = assemble.bwa_mem_mapping_sambamba(
"bwa", "samtools", "sambamba", fake_fasta,
os.path.join(dir, "test_assemble"),
[os.path.join(dir, "toy_1.fastq")], 1,
"None", [""], cram=False)
# Test raising error for no fastqs
toy_fasta = reftools.RefFasta(os.path.join(dir, "toy.fasta"))
with pytest.raises(RuntimeError):
test_bam = assemble.bwa_mem_mapping_sambamba(
"bwa", "samtools", "sambamba", toy_fasta,
os.path.join(dir, "test_assemble"), [], 1,
"None", [""], cram=False)
# Test raising error for inaccessible fastq
with pytest.raises(RuntimeError):
toy_fasta = reftools.RefFasta(os.path.join(dir, "toy.fasta"))
test_bam = assemble.bwa_mem_mapping_sambamba(
"bwa", "samtools", "sambamba", toy_fasta,
os.path.join(dir, "test_assemble"),
[os.path.join(dir, "toy_FOO_fake.fastq")], 1,
"None", [""], cram=False)