Sunday 31 May 2020

Python patterns for processing large SDF files

Running large SDF files through a processing pipeline is not an uncommon task in cheminformatics/computational chemistry. Here are some patterns I have been using to manage this task while making maximum use of available processing power.

An SDF iterator
Cheminformatics toolkits are great, but have you ever not used one? If you are handing off an entry in an SDF file to third-party software for processing, then there is no need to make your processor do the busy-work of reading the SDF file, building a molecule, and then writing it out again. Here's a lazy iterator over a file-like object that returns SDF entries one-at-a-time.
def sdf_iterator(fileobj):
    """Lazily yield each entry in an SDF file as a string (including the terminating $$$$ line)."""
    data = []
    for line in fileobj:
        data.append(line)
        if line.startswith("$$$$"):
            yield "".join(data)
            data = []
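For example, a quick count of the entries in a file (just a sketch; "input.sdf" is a placeholder, and any per-entry processing would go in the loop body):
if __name__ == "__main__":
    with open("input.sdf") as inp:
        nentries = sum(1 for sdfentry in sdf_iterator(inp))
    print(nentries, "entries")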

Handle gzipped files
As all chemists know, an atom is mostly empty space. It is fitting therefore that SDF files mirror this property and exhibit excellent compressibility. In practice, one ends up having to deal with a mixture of compressed and uncompressed SDF files, and it's tedious adapting code to handle them both. Here's a helper function one could use for this:
import gzip

def myopen(filename):
    return gzip.open(filename, "rt") if filename.endswith(".gz") else open(filename)

if __name__ == "__main__":
    # assign inputfile as either an .sdf or .sdf.gz
    with myopen(inputfile) as inp:
        for sdfentry in sdf_iterator(inp):
            ...  # do something with sdfentry

Calling a command-line application from Python to process an SDF entry
For some reason, not everything is wrapped up nicely as a Python library, and often it's necessary to pass an SDF file to a command-line application for processing and get the result back. For example, the result could be another SDF file containing docked poses and results, or protonated molecules, or shape-matched hits. In the best-case scenario, the command-line application can read from standard input and write to standard output. This is the situation I describe below; it's the most efficient way of communicating, as disk access will slow things down. The alternative is to use temporary files - this can be trickier than you'd think, and is best avoided if at all possible.
import subprocess

def calculate(sdf):
    proc = subprocess.Popen(["noelina", "-option1", "option1value", "-option2", "option2value"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, encoding="utf-8")
    stdout, stderr = proc.communicate(sdf)
    return stdout

if __name__ == "__main__":
    with open("outputfile.sdf", "w") as out:
        with myopen("inputfile.sdf") as inp:
            for sdfentry in sdf_iterator(inp):
                output = calculate(sdfentry)
                out.write(output)
Note that you may wish to inspect and handle error messages. My advice is to pass any errors back to the caller (as a string that is, not an Exception), and let it handle them (e.g. write them to a logfile). This separates the processing of the results from their generation. This is particularly important when we move to parallel processing (see below), where we want the processing of results to be handled by a single process, the primary process.
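For example, here is a minimal sketch of what that could look like, assuming the command-line application writes its complaints to standard error (the tuple return and the errors.log filename are just illustrative; the parallel examples below keep the simpler single-string return):
def calculate(sdf):
    proc = subprocess.Popen(["noelina", "-option1", "option1value", "-option2", "option2value"],
                            stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                            encoding="utf-8")
    stdout, stderr = proc.communicate(sdf)
    return stdout, stderr  # pass any error text back as a plain string, not an Exception

if __name__ == "__main__":
    with open("outputfile.sdf", "w") as out, open("errors.log", "w") as log:
        with myopen("inputfile.sdf") as inp:
            for sdfentry in sdf_iterator(inp):
                output, errors = calculate(sdfentry)
                out.write(output)
                if errors:
                    log.write(errors)  # the caller decides what to do with errors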

Parallel processing I
It's all very well doing this from Python, but it would have been faster and simpler to call the command-line application directly from the shell. The advantage of using Python comes into play when we consider parallel processing.

There are several approaches you can use here. One is to split the SDF file into N chunks, one for each processor, and then run the command-line application N times in the background. You then wait and watch as all of the chunks finish, except for that last one, which just keeps going seemingly forever. To avoid this problem, a more sophisticated approach is to split into M chunks, where M is several times larger than N, and use GNU parallel to manage a queue over N processors.

I prefer using Python for this as it feels much tidier. Everything is contained in a single fairly simple script. It avoids the need for manual splitting and collation. The script finishes when everything is finished (so I don't need to keep checking 'top'...but I do anyway :-) ), which means I can queue things up at a higher level. The magic is handled by the multiprocessing library (see my earlier post for more details):
import multiprocessing

if __name__ == "__main__":
    INPUTFILE = "myinput.sdf"
    OUTPUTFILE = "myoutput.sdf"

    POOLSIZE = 12 # the number of CPUs to use
    CHUNKSIZE = 250

    pool = multiprocessing.Pool(POOLSIZE)
    with open(OUTPUTFILE, "w") as out:
        with myopen(INPUTFILE) as inp:
            miter = sdf_iterator(inp)
            for data in pool.imap_unordered(calculate, miter, CHUNKSIZE):
                out.write(data)

You may question whether 'calculate' and 'data' are appropriate function/variable names in this case. The thing is, I use this pattern over and over again for different applications, and I just keep the same variable names. Regarding CHUNKSIZE, I try to choose a value that keeps each process running for 10 seconds or so (check 'top' immediately after starting the script); this tends to ensure that all processes are running at 100% instead of waiting to communicate their results to the primary process.

Chunked parallel processing II
The code above will work fine, but it's not as efficient as I would like for this use-case. The command-line application is being called with a single SDF entry each time, and there is a price to pay in terms of process overhead. What about the CHUNKSIZE in the code above? Well, that's solving a different but similar problem; minimising the communication between the N worker processes and the primary process. Each worker is given a chunk of 250 SDF entries, and these are passed one-at-a-time to the calculate function. What we'd really like to do is give the command-line application the whole chunk of 250 in one go. Enter the chunked_sdf_iterator(), which lazily returns chunks of N molecules at a time:
def chunked_sdf_iterator(fileobj, num_mols):
    """Lazily yield chunks of up to num_mols SDF entries, joined into a single string."""
    miter = sdf_iterator(fileobj)
    tmp = []
    i = 0
    for sdf in miter:
        if i==num_mols:
            yield "".join(tmp)
            tmp = []
            i = 0
        i += 1
        tmp.append(sdf)
    yield "".join(tmp) # NOTE!!

Note the final line of the function. It's quite easy to accidentally omit this final yield when writing iterators like this, so always test by checking that the output includes the final entries.
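As a quick sanity check, something along these lines (using io.StringIO to fake a tiny SDF file; the three entries and chunk size of 2 are arbitrary) confirms the final chunk comes through:
import io

# three fake entries, each terminated by $$$$
fake_sdf = "".join(f"mol{i}\n$$$$\n" for i in range(3))
chunks = list(chunked_sdf_iterator(io.StringIO(fake_sdf), 2))
assert len(chunks) == 2        # two entries in the first chunk, one in the last...
assert "mol2" in chunks[-1]    # ...and the final entry is not silently dropped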

Given this function, only small changes are needed to __main__ to use it:
if __name__ == "__main__":
    INPUTFILE = "myinput.sdf"
    OUTPUTFILE = "myoutput.sdf"

    CHUNK_MOLS = 250
    POOLSIZE = 12 # the number of CPUs to use
    CHUNKSIZE = 1

    pool = multiprocessing.Pool(POOLSIZE)

    with open(OUTPUTFILE, "w") as out:
        with myopen(INPUTFILE) as inp:
            miter = chunked_sdf_iterator(inp, CHUNK_MOLS)
            for data in pool.imap_unordered(calculate, miter, CHUNKSIZE):
                out.write(data)

Note that it's possible to keep using a value of CHUNKSIZE > 1, but there's no real benefit and it's simpler this way. Just ensure, as before, that each task runs for around 10 seconds by increasing CHUNK_MOLS.
