Run bcftools mpileup in parallel with Python

March 18 2020

BAM files and pileups

When short reads are aligned to a reference genome, the result is stored as a BAM file. Each position in the reference is covered by the set of reads aligning to that region. Often the purpose of the alignment is to call variants in the sample against the reference. To do this you first need to essentially ‘pile up’ all the bases from any reads covering a position (a single column of the alignment) and check them against the reference genotype. In most cases they will agree, since you’re dealing with very similar genomes. Where there is a SNP there will be some difference, and the majority of reads might show a mutation from A->T, for example. Such changes can also be due to sequencing errors, but with good coverage you can rule those out. Quality scores are assigned to the calls and used to filter them. These basic steps can be done with bcftools in a few commands: essentially pileup, then call variants.

There are various software tools to do this; one is bcftools. The two steps are as follows:

bcftools mpileup -O b -o <out> -f <ref> <bam files>
bcftools call --ploidy 1 -m -v -o <calls> <out>

The pileup (bcftools mpileup) step is time consuming and is not multithreaded. The solution is to split the genome by region or chromosome, run each piece separately and then join the results together. This can be done with GNU parallel on Linux. Another method, shown here, is to call the commands from Python using subprocess. This approach works for microbial genomes with a single chromosome: the genome is split into regions, each run is launched in parallel, and bcftools concat joins the resulting bcf files. The idea is sketched below.
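For reference, a rough GNU parallel equivalent might look like the following (the chromosome name, coordinates and file names here are just placeholders; {#} is the job number):

parallel 'bcftools mpileup -r {} -O b -o tmp/{#}.bcf -f ref.fa sample.bam' \
      ::: chr:1-2000000 chr:2000001-4000000
bcftools concat tmp/1.bcf tmp/2.bcf -O b -o raw.bcf

The Python version below does the same thing but computes the regions automatically from the reference length.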

This requires that bcftools is installed. On Linux this is easily done through the package manager.
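For example, on Debian or Ubuntu:

sudo apt install bcftools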

Imports

import os, subprocess
import numpy as np
from Bio import SeqIO
from pyfaidx import Fasta
import multiprocessing as mp

Utility methods

def get_chrom(filename):
    """Get the name of the first sequence in a fasta file"""

    rec = list(SeqIO.parse(filename, 'fasta'))[0]
    return rec.id

def get_fasta_length(filename):
    """Get length of reference sequence"""

    refseq = Fasta(filename)
    key = list(refseq.keys())[0]
    l = len(refseq[key])
    return l

Code

This method implements mpileup in parallel using the Python multiprocessing module. You need to create a worker method that will be called by pool.apply_async. In our case each worker calls bcftools mpileup for a specific region; the regions must not overlap. We take the entire genome length and split the coordinates into equally sized blocks, depending on the number of threads we want to use. Each worker writes its own bcf file. These are then joined when finished using bcftools concat. Note this works for any number of bam files, but the samples should be the same for all workers. You could probably also use the threading module for this, but I am less familiar with it.

def worker(region, out, bam_files, ref):
    """Run bcftools mpileup for a single region and index the result."""

    cmd = 'bcftools mpileup -r {reg} -O b -o {o} -f {r} {b}'.format(r=ref, reg=region, b=bam_files, o=out)
    #print (cmd)
    subprocess.check_output(cmd, shell=True)
    #index the region bcf file
    cmd = 'bcftools index {o}'.format(o=out)
    subprocess.check_output(cmd, shell=True)
    return

def mpileup_parallel(bam_files, ref, outpath, threads=4):
    """Run mpileup in parallel over multiple regions, then concat the bcf files.
    Assumes alignment to a bacterial reference with a single chromosome."""

    bam_files = ' '.join(bam_files)
    os.makedirs(outpath, exist_ok=True)
    rawbcf = os.path.join(outpath,'raw.bcf')
    tmpdir = 'tmp'
    os.makedirs(tmpdir, exist_ok=True)
    chrom = get_chrom(ref)
    length = get_fasta_length(ref)

    #split the coordinates into equally sized, non-overlapping blocks, one per thread
    x = np.linspace(1, length, threads+1, dtype=int)
    blocks = []
    for i in range(len(x)-1):
        blocks.append((x[i], x[i+1]-1))
    #extend the last block to cover the end of the genome
    blocks[-1] = (blocks[-1][0], length)

    pool = mp.Pool(threads)
    outfiles = []

    for start,end in blocks:
        print (start, end)
        region = '{c}:{s}-{e}'.format(c=chrom,s=start,e=end)
        out = '{o}/{s}.bcf'.format(o=tmpdir,s=start)
        pool.apply_async(worker, [region,out,bam_files,ref])
        outfiles.append(out)

    pool.close()
    pool.join()

    #concat the region files in genomic order
    cmd = 'bcftools concat {i} -O b -o {o}'.format(i=' '.join(outfiles),o=rawbcf)
    subprocess.check_output(cmd, shell=True)
    #remove temporary bcf and index files
    for f in outfiles:
        os.remove(f)
        os.remove(f+'.csi')
    return rawbcf

You can call the function like this, then run the usual call command on the resulting bcf file.

rawbcf = mpileup_parallel(bam_files, ref, outpath, threads=10)
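The call step can be run the same way via subprocess, for example (the output name calls.bcf here is just an illustration):

cmd = 'bcftools call --ploidy 1 -m -v -O b -o calls.bcf {i}'.format(i=rawbcf)
subprocess.check_output(cmd, shell=True)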