Going Parallel with multiprocessing
Fri Dec 12, 2014
For a current project I need to debarcode MiSeq-sequenced amplicons that were generated by two rounds of PCR, each of which used a barcoded primer. I will use Qiime’s split_libraries.py for the second set of primers but I wanted a script that can iterate through the sequences, detect some conserved bits and chop out the left part of my sequences. I came across Brad Chapman’s post about such a script and wanted to try it out. It was great but slow. Can I make it faster?
The simple (only?) thing to do is to refactor the main script to allow parallelization. Brad has already switched from Biopythons’s
SeqIO.parse() to the
FastqGeneralIterator() which avoids the overhead of object creation and only parses the Fastq as a string. At each step in the iteration I should be able to shell out the work to an individual process, allowing me to use the
multiprocessing module. After some failed efforts I got the following to work:
@click.command() @click.option('--in_file', type=click.STRING) @click.option('--out_file', type=click.STRING) @click.option('--adaptor_seq', type=click.STRING) @click.option('--num_errors', type=click.INT, default=1) @click.option('--min_size', type=click.INT, default=1) @click.option('--max_size', type=click.INT, default=None) @click.option('--right_side', default=False) @click.option('--ncpus', default=4) def main(in_file, out_file, adaptor_seq, num_errors, min_size, max_size, right_side, ncpus): num_errors = int(num_errors) min_size = int(min_size) max_size = int(max_size) if max_size else None # create a pool of processing nodes p = multiprocessing.Pool(ncpus) # use partial to create a function needing only one argument fastqfunc = partial(process_fastq, adaptor_seq=adaptor_seq, num_errors=num_errors, min_size=min_size, max_size=max_size,right_side=right_side) #open and process the files by shelling out each fastq record to the pool with open(in_file) as in_handle: with open(out_file, "w") as out_handle: # fastqs is an iterator over each fastq record in the file fastqs = (rec for rec in FastqGeneralIterator(in_handle)) # results are an iterated map of the fastqfunc against each fastq results = p.imap(fastqfunc, fastqs) # iterate through the resutls and write them to the same file for result in results: out_handle.write(str(result))
There are basically three things in this script that are different from Brad’s original:
- I create a pool of processes using
multiprocessing.Poolthat I will use to process individual fastq data. This can be scaled by the
ncpusargument to scale to however many cpus you have. Its now trivially easy to scale (on your local computer, at least).
- I have a multi-arity function called
process_fastqthat handles the work. However, it requires six arguments and I can only pass a single-arity function to
pool.imap. To get around this problem, I used the
partialfunction from the
functoolslibrary to make a new function,
fastfunc, to which I have passed all the arguments except the data. Now I can use
fastfuncto each fastq using all the processes in the pool.
- I added
clickfor handling the input args. This library is simple and sweet. I am now using it for all my python programs that are destined for the command line.
clickmakes it easy to put all of your control logic at the site where you call the function. In this example, all of logic of iterating and pooling is done in one place and the processing of individual fastqs is done elsewhere. To me this makes it straightforward to understand whats going on and to allow reuse/modularity down the line. If I have a different process that needs to be performed? Swap out the processing function and modify the cli-args if neccessary.
This works nicely so far, and I can run data on our cluster and easily achieve parallel processing for my data. Its available as a gist here. In this particlar case I need to check for sequences on the ends of my files but for fast, simple manipulation of fastq files I came across a few nice toolkits for fastq processing: fastq-tools, fastx-toolkit, and seqtk.