Back when I started my Master’s studies in Bioinformatics, I wrote computationally intensive Python scripts. Unfortunately, my code was horribly slow, grinding through millions of loop iterations. Of course, this was partly due to bad coding style. But even when you follow the basic guidance on “writing better Python code”, you will hit a dead end called “single-core load”. We all have modern CPUs with several cores, but Python does not use them by default. You have to write code that runs concurrently.

This is actually not as complicated as you might think. In this tutorial I will explain how you can easily achieve concurrency and when it comes in handy.

First, take a look at the design of your program: if you can split it into the following structure, this tutorial is what you are looking for. Usually you can divide a program into three main parts (a quick preview in plain Python follows the list):

  1. An initiating part reading in a file and creating “jobs” to work on
  2. A worker part doing a computationally intensive calculation
  3. A collector part formatting the results of your program
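
To make this shape concrete before we turn to PyCSP, here is a minimal preview of the same three-part split using only Python’s standard multiprocessing module. This is my own comparison sketch, not the tutorial’s code, and the “calculation” is a trivial stand-in:

from multiprocessing import Process, Queue

N_WORKERS = 4

def file_reader(jobs):
    # 1) initiating part: create "jobs" to work on
    for job in [('ACGT', 'ACCT'), ('ACGT', 'AGGT')]:
        jobs.put(job)
    for _ in range(N_WORKERS):
        jobs.put(None)                 # one stop signal per worker

def worker(jobs, results):
    # 2) worker part: stand-in for the heavy calculation
    while True:
        job = jobs.get()
        if job is None:
            break
        s1, s2 = job
        results.put((job, sum(a != b for a, b in zip(s1, s2))))
    results.put(None)                  # tell the collector this worker is done

def collector(results):
    # 3) collector part: format the results
    finished = 0
    while finished < N_WORKERS:
        item = results.get()
        if item is None:
            finished += 1
        else:
            print item

if __name__ == '__main__':
    jobs, results = Queue(), Queue()
    procs = [Process(target=file_reader, args=(jobs,)),
             Process(target=collector, args=(results,))]
    procs += [Process(target=worker, args=(jobs, results)) for _ in range(N_WORKERS)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()

As you can see, most of the code is bookkeeping around queues and stop signals. PyCSP gives us the same pattern with much less plumbing, as the rest of this tutorial shows.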

The main idea is now to split these parts into separate processes and have multiple worker processes doing the calculations concurrently. There is a nice Python package called PyCSP which brings CSP (Communicating Sequential Processes) to Python. Please install it to follow this tutorial:

  • download the latest version of PyCSP from here
  • unpack your download and run the following command within the unpacked folder:
    python setup.py install
    
  • use the Python shell to test the installation:
    >>> import pycsp
    >>> print pycsp.version
    (0, 7, 1, 'threads')
    

In this tutorial we will calculate the Levenshtein distance, which is the edit distance between two strings: the minimum number of single-character edits (insertions, deletions, substitutions) needed to transform one string into the other. As input we use a FASTA file with DNA sequences and calculate the edit distance for all combinations of sequences within the file. We will use dynamic programming as described here at Wikipedia.
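
If you have never implemented this before, here is a minimal single-core sketch of the dynamic-programming idea (my own illustration, kept separate from the concurrent script below):

def levenshtein(s1, s2):
    # previous[j] = edit distance between the current prefix of s1 and s2[:j]
    previous = range(len(s2) + 1)              # row for the empty prefix of s1
    for i, c1 in enumerate(s1):
        current = [i + 1]                      # first column: i+1 deletions
        for j, c2 in enumerate(s2):
            current.append(min(previous[j + 1] + 1,        # insertion
                               current[j] + 1,             # deletion
                               previous[j] + (c1 != c2)))  # substitution or match
        previous = current
    return previous[-1]

print levenshtein('kitten', 'sitting')   # prints 3

The concurrent version below computes exactly this, only with NumPy and spread over several worker processes.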

Our program will have the following design, where boxes represent processes running concurrently and the lines are communication channels connecting the processes. Each channel has a name and one or more channel ends.

I) The main construct of the script

This section shows the structure of the script. First we have to import PyCSP and some other packages we will need for our program. Next, we define three functions representing our processes. All we need to do to turn them into PyCSP processes is to decorate them with @process.

from pycsp import *
import sys
import numpy  # for the distMatrix
from Bio import SeqIO # for the fasta parsing

@process
def fileReader(jobOut, inputFile):
    pass  # code doing stuff, see section II)

@process
def worker(jobIn, resultOut):
    pass  # code doing stuff, see section III)

@process
def resultCollector(resultsIn):
    pass  # code doing stuff, see section IV)

if __name__ == "__main__":
    try:
        networkSize = int(sys.argv[1])
        inputFile = sys.argv[2]
    except (IndexError, ValueError):
        raise IOError('program call: python levishtein.py number_of_threads in_file')
    try:
        fileHandle = open(inputFile, 'r')
    except IOError:
        raise IOError('can not open/find specified input file: %s' % inputFile)

    job = Channel()   # <----- 1)
    levDist = Channel()     #  <----- 1)

    Parallel(    # <----- 2)
        fileReader(job.writer(), fileHandle),
        networkSize * worker(job.reader(), levDist.writer()),   # <----- 3)
        resultCollector(levDist.reader())
    )

In the main part of our script I highlighted the important PyCSP pieces. At # 1) you see how we create the channels for communication. At # 2) you see how we start our network of processes with a single Parallel() call. At # 3) you see how we create the channel ends as reading ends and writing ends. You can also see how to create several identical processes simply by multiplying a process with a number: networkSize * worker().
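
If you would like to try these primitives in isolation first, here is a minimal toy network (my own example, built from the same PyCSP API used above: Channel(), @process, Parallel(), retire()) with one producer and two consumers competing for values on a single channel:

from pycsp import *

@process
def producer(cout):
    for i in range(5):
        cout(i)              # send a value down the channel
    retire(cout)             # no more data: retire the writing end

@process
def consumer(cin, name):
    try:
        while True:
            print '%s received %s' % (name, cin())
    except ChannelPoisonException:
        retire(cin)

if __name__ == "__main__":
    chan = Channel()
    Parallel(
        producer(chan.writer()),
        consumer(chan.reader(), 'A'),
        consumer(chan.reader(), 'B')
    )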

II) The fileReader process

@process
def fileReader(jobOut, inputFile):
    records = SeqIO.to_dict(SeqIO.parse(inputFile, "fasta"))
    IDs = records.keys()
    for i in IDs:
        for j in IDs:
            jobOut( ([i, str(records[i].seq)], [j, str(records[j].seq)]) )

    retire(jobOut)  # no more strings to process -> retire channel

This process uses Biopython to parse the FASTA file and writes to the jobOut() channel end. It sends a tuple of two lists, each containing the ID and the sequence of a FASTA entry as a string. In short, this process does nothing more than parse the input file, iterate over all possible combinations of entries, and send each pair of sequences down the channel so their edit distance can be calculated. The last command, retire(), is important. When all your processes run concurrently, you need a way to shut down the network at the end so the Python script terminates together with all its sub-processes. The process creating the jobs for the workers retires its channel end after it has sent all possible jobs down the channel. This poisons the channel, and you will see later how this is used to terminate the network.
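
One small design note: the Levenshtein distance is symmetric, so the nested loops above compute every distance twice. If your input file is large, you could send each unordered pair only once, for example with itertools (a sketch of an alternative, not the original code):

from itertools import combinations_with_replacement

# inside fileReader, replacing the two nested loops:
for i, j in combinations_with_replacement(IDs, 2):
    jobOut( ([i, str(records[i].seq)], [j, str(records[j].seq)]) )

For clarity the tutorial keeps the simple nested loops; whether the duplicated work matters depends on your input size.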

III) The worker process

The worker process fills a matrix using dynamic programming to find the minimal edit distance. Writing distMatrix[i, j] for the distance between the first i characters of string 1 and the first j characters of string 2, each cell follows this recurrence:

distMatrix[i+1, j+1] = min(distMatrix[i, j+1] + 1,   # insertion
                           distMatrix[i+1, j] + 1,   # deletion
                           distMatrix[i, j] + cost)  # cost is 0 on a match, 1 on a substitution
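
As a quick sanity check (my own worked example, not from the original post), here is the fully filled matrix for the strings AC and ABC:

        ""  A  B  C
    ""   0  1  2  3
    A    1  0  1  2
    C    2  1  1  1

The bottom-right cell holds the result, 1: a single insertion of B turns AC into ABC. With that in mind, here is the worker process: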

@process
def worker(jobIn, resultOut):
    try:
        while True:
            (string1, string2) = jobIn()
            len1 = len(string1[1])
            len2 = len(string2[1])

            # for the initial line in the distMatrix len+1
            distMatrix = numpy.zeros((len1 + 1, len2 + 1))
            distMatrix[0, :] = range(len2 + 1)
            distMatrix[:, 0] = range(len1 + 1)
            for i in range(len1):
                for j in range(len2):
                    # insertion and deletion cost +1; if the letters match,
                    # int(False) = 0, so a match is free and a mismatch costs 1
                    distMatrix[i+1, j+1] = min(
                        distMatrix[i, j+1] + 1,   # insertion
                        distMatrix[i+1, j] + 1,   # deletion
                        distMatrix[i, j] + int(string1[1][i] != string2[1][j]))  # substitution or match
            resultOut((string1[0], string2[0], distMatrix[-1, -1]))

    except ChannelPoisonException:
        retire(jobIn, resultOut)

This code calculates the distance of the two strings it received through the incoming channel and sends the result down the result channel connecting the workers to the resultCollector process. The important construct to recognize is the following:

try:
    while True:
        a = incomingChannel()
        # some code doing stuff

except ChannelPoisonException:
    retire(outgoingChannels)

This construct loops over the channel input until the channel gets poisoned by the fileReader process. The poison is caught by the except clause, and the process then retires its own outgoing channels, passing the shutdown signal further down the network.
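
As a side note, PyCSP also offers poison() as the non-graceful sibling of retire(): instead of waiting until every end of a channel has retired, it shuts the channel down immediately. A sketch of the same construct using poison (my own variant; I am assuming poison() accepts channel ends just like retire() does):

try:
    while True:
        a = incomingChannel()
        # some code doing stuff

except ChannelPoisonException:
    poison(outgoingChannels)   # kill downstream channels immediately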

IV) The resultsCollector process

Finally we have a process collecting the results of the different worker processes.

@process
def resultCollector(resultsIn):
    try:
        while True:
            (ID1, ID2, leviDist) = resultsIn()
            print '%s - %s: %i' % (ID1, ID2, leviDist)

    except ChannelPoisonException:
        retire(resultsIn)

As you noticed, it is the same construct again. We loop over the input from the channel until all channel ends have been retired and the channel is poisoned. As soon as the resultCollector process retires the last open channel end, the entire network terminates cleanly and the Python script exits. That is already all the magic needed for a concurrent Python script using PyCSP. Finally, you call your script with: python levishtein.py number_of_workers fasta_file.
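
For example, with a small made-up FASTA file (hypothetical file name and IDs; the exact output order may vary because the workers run concurrently and SeqIO.to_dict() does not preserve order):

$ cat test.fasta
>seq1
ACGT
>seq2
ACCT

$ python levishtein.py 4 test.fasta
seq1 - seq1: 0
seq1 - seq2: 1
seq2 - seq1: 1
seq2 - seq2: 0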

Download the full script as discussed in this tutorial: GitHub

Threads vs. Processes (Update)

PyCSP uses threads by default, and you may have noticed that you do not see the speed improvement you hoped for. Python has a global interpreter lock (GIL), which means that only one thread at a time is allowed to use the Python interpreter. This sounds as if you gain nothing by using multiple threads in Python, but it depends on your code: NumPy, for example, releases the GIL during its computations. If your code uses many array operations such as numpy.dot(), you will see an improvement with threads, because other threads get a chance to run while NumPy is busy. In contrast, if you use processes, every process comes with its own Python interpreter and its own GIL. Threads, however, can be created, destroyed, and switched between faster. If you want to use processes instead of threads with PyCSP, you do not have to change your code at all. Just import PyCSP using processes:

from pycsp.processes import *

Read this article about the different implementations and look at the graphs showing communication overhead and speed-up of different scenarios.

Changes in version 0.9

There have been major changes in version 0.9: processes and threads have been merged into pycsp.parallel and can now be mixed.

from pycsp.parallel import *

@process
def some_thread_process():
    pass  # runs as a thread-based process

@multiprocess
def some_OS_process():
    pass  # runs as a separate OS process

if __name__ == "__main__":
    some_channel = Channel()
    Parallel( some_thread_process(), some_OS_process() )
    shutdown()

To terminate your program you now need the shutdown() call at the end of your script. Since PyCSP is now based on a transparent network layer, I ran into some strange network socket errors; in my case I had to change the hostname of my computer to localhost.