Going Parallel with multiprocessing

For a current project I need to debarcode MiSeq-sequenced amplicons that were generated by two rounds of PCR, each of which used a barcoded primer. I will use Qiime’s split_libraries.py for the second set of primers but I wanted a script that can iterate through the sequences, detect some conserved bits and chop out the left part of my sequences. I came across Brad Chapman’s post about such a script and wanted to try it out. It was great but slow. Can I make it faster?

The simple (only?) thing to do is to refactor the main script to allow parallelization. Brad has already switched from Biopythons’s SeqIO.parse() to the FastqGeneralIterator() which avoids the overhead of object creation and only parses the Fastq as a string. At each step in the iteration I should be able to shell out the work to an individual process, allowing me to use the multiprocessing module. After some failed efforts I got the following to work:

@click.option('--in_file', type=click.STRING)
@click.option('--out_file', type=click.STRING)
@click.option('--adaptor_seq', type=click.STRING)
@click.option('--num_errors', type=click.INT, default=1)
@click.option('--min_size', type=click.INT, default=1)
@click.option('--max_size', type=click.INT, default=None)
@click.option('--right_side', default=False)
@click.option('--ncpus', default=4)
def main(in_file, out_file, adaptor_seq, num_errors, min_size, max_size, right_side, ncpus):
    num_errors = int(num_errors)
    min_size = int(min_size)
    max_size = int(max_size) if max_size else None

    # create a pool of processing nodes
    p = multiprocessing.Pool(ncpus)
    # use partial to create a function needing only one argument
    fastqfunc = partial(process_fastq, adaptor_seq=adaptor_seq, num_errors=num_errors,
						min_size=min_size, max_size=max_size,right_side=right_side)

    #open and process the files by shelling out each fastq record to the pool
    with open(in_file) as in_handle:
        with open(out_file, "w") as out_handle:
		    # fastqs is an iterator over each fastq record in the file
            fastqs = (rec for rec in FastqGeneralIterator(in_handle))
			# results are an iterated map of the fastqfunc against each fastq
            results = p.imap(fastqfunc, fastqs)
			# iterate through the resutls and write them to the same file
            for result in results:

There are basically three things in this script that are different from Brad’s original:

  1. I create a pool of processes using multiprocessing.Pool that I will use to process individual fastq data. This can be scaled by the ncpus argument to scale to however many cpus you have. Its now trivially easy to scale (on your local computer, at least).
  2. I have a multi-arity function called process_fastq that handles the work. However, it requires six arguments and I can only pass a single-arity function to pool.imap. To get around this problem, I used the partial function from the functools library to make a new function, fastfunc, to which I have passed all the arguments except the data. Now I can use p.imap to map fastfunc to each fastq using all the processes in the pool.
  3. I added click for handling the input args. This library is simple and sweet. I am now using it for all my python programs that are destined for the command line. click makes it easy to put all of your control logic at the site where you call the function. In this example, all of logic of iterating and pooling is done in one place and the processing of individual fastqs is done elsewhere. To me this makes it straightforward to understand whats going on and to allow reuse/modularity down the line. If I have a different process that needs to be performed? Swap out the processing function and modify the cli-args if neccessary.

This works nicely so far, and I can run data on our cluster and easily achieve parallel processing for my data. Its available as a gist here. In this particlar case I need to check for sequences on the ends of my files but for fast, simple manipulation of fastq files I came across a few nice toolkits for fastq processing: fastq-tools, fastx-toolkit, and seqtk.