Going Parallel with multiprocessing
For a current project I need to debarcode MiSeq-sequenced amplicons that were generated by two rounds of PCR, each of which used a barcoded primer. I will use Qiime’s split_libraries.py for the second set of primers, but I wanted a script that can iterate through the reads, detect some conserved bits, and chop out the left part of my sequences. I came across Brad Chapman’s post about such a script and wanted to try it out. It was great but slow. Can I make it faster?
The simple (only?) thing to do is to refactor the main script to allow parallelization. Brad has already switched from Biopython’s SeqIO.parse() to the FastqGeneralIterator(), which avoids the overhead of object creation and only parses the fastq records as strings. At each step of the iteration I should be able to shell out the work to an individual process, allowing me to use the multiprocessing module. After some failed efforts I got the following to work:
import multiprocessing
from functools import partial

import click
from Bio.SeqIO.QualityIO import FastqGeneralIterator


@click.command()
@click.option('--in_file', type=click.STRING)
@click.option('--out_file', type=click.STRING)
@click.option('--adaptor_seq', type=click.STRING)
@click.option('--num_errors', type=click.INT, default=1)
@click.option('--min_size', type=click.INT, default=1)
@click.option('--max_size', type=click.INT, default=None)
@click.option('--right_side', default=False)
@click.option('--ncpus', default=4)
def main(in_file, out_file, adaptor_seq, num_errors, min_size, max_size, right_side, ncpus):
    num_errors = int(num_errors)
    min_size = int(min_size)
    max_size = int(max_size) if max_size else None
    # create a pool of processing nodes
    p = multiprocessing.Pool(ncpus)
    # use partial to create a function needing only one argument
    fastqfunc = partial(process_fastq, adaptor_seq=adaptor_seq, num_errors=num_errors,
                        min_size=min_size, max_size=max_size, right_side=right_side)
    # open and process the files by shelling out each fastq record to the pool
    with open(in_file) as in_handle:
        with open(out_file, "w") as out_handle:
            # fastqs is an iterator over each fastq record in the file
            fastqs = (rec for rec in FastqGeneralIterator(in_handle))
            # results is an iterated map of fastqfunc against each fastq
            results = p.imap(fastqfunc, fastqs)
            # iterate through the results and write them to the output file
            for result in results:
                out_handle.write(str(result))


if __name__ == '__main__':
    main()
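One piece not shown above is process_fastq, the per-record worker adapted from Brad’s script. As a rough, hypothetical sketch of what such a worker could look like, here is a version that assumes each record arrives as the (title, seq, qual) string tuple yielded by FastqGeneralIterator, uses exact string matching instead of the num_errors-aware fuzzy matching in the real script, and guesses at the meaning of right_side:

# A hypothetical stand-in for the real process_fastq in the gist; it ignores
# num_errors and only finds exact adaptor matches, to keep the sketch short.
def process_fastq(fastq, adaptor_seq, num_errors, min_size, max_size, right_side):
    title, seq, qual = fastq  # FastqGeneralIterator yields plain string tuples
    pos = seq.find(adaptor_seq)
    if pos == -1:
        return ""  # adaptor not found; write nothing for this record
    if right_side:
        # keep the sequence to the left of the adaptor
        new_seq, new_qual = seq[:pos], qual[:pos]
    else:
        # keep the sequence to the right of the adaptor
        start = pos + len(adaptor_seq)
        new_seq, new_qual = seq[start:], qual[start:]
    if len(new_seq) < min_size or (max_size and len(new_seq) > max_size):
        return ""  # trimmed read falls outside the requested size range
    # return a ready-to-write fastq record
    return "@%s\n%s\n+\n%s\n" % (title, new_seq, new_qual)

Because the worker is defined at module level and handles each record independently, it pickles cleanly and can be shipped out to the pool’s processes.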
There are basically three things in this script that are different from Brad’s original:
- I create a pool of processes using multiprocessing.Pool that I will use to process individual fastq records. The pool is sized by the ncpus argument, so it scales to however many cpus you have. It’s now trivially easy to scale (on your local computer, at least).
- I have a multi-arity function called process_fastq that handles the work. However, it requires six arguments and I can only pass a single-arity function to pool.imap. To get around this problem, I used the partial function from the functools library to make a new function, fastqfunc, to which I have passed all the arguments except the data. Now I can use p.imap to map fastqfunc to each fastq using all the processes in the pool (a toy example of this pattern follows the list).
- I added click for handling the input args. This library is simple and sweet, and I am now using it for all my python programs that are destined for the command line. click makes it easy to put all of your control logic at the site where you call the function. In this example, all of the logic of iterating and pooling is done in one place and the processing of individual fastqs is done elsewhere. To me this makes it straightforward to understand what’s going on and allows reuse/modularity down the line. Have a different process that needs to be performed? Swap out the processing function and modify the CLI args if necessary.
This works nicely so far, and I can run it on our cluster and easily process my data in parallel. It’s available as a gist here. In this particular case I need to check for sequences on the ends of my sequences, but for fast, simple manipulation of fastq files I came across a few nice toolkits: fastq-tools, fastx-toolkit, and seqtk.