Sunday, 5 July 2020

SMI to 2D MOL: Parallel processing with the CDK from Jython

At some point in a computational chemistry workflow, if you started with SMILES you need to transition to the world of 3D. A slight niggle in the works is that some tools may not correctly read SMILES, or indeed not read them at all. For this reason, it is useful to convert to 2D MOL files. In this respect, the CDK has established itself as one of the go-to tools thanks to efforts by John Mayfield (see for example, CDK depict and his RDKit talk on the topic).

The best way to use the CDK is from Java. However, I find the energy barrier to writing a Java problem to be high, and so here I'll use Jython. Once installed, you just add the cdk-2.3.jar to the environment variable CLASSPATH and you are good to go.

Serial approach
I'll be writing about timings for reading 100K SMILES strings from a CSV file and converting them to an SDF. The baseline is the serial implementation, which takes about 60s.
# Python
import csv
import time

# Java
import java
import org.openscience.cdk as cdk

sp = cdk.smiles.SmilesParser(cdk.silent.SilentChemObjectBuilder.getInstance())
TITLE = cdk.CDKConstants.TITLE

def calculate(smi, title):
    # Read SMILES
    mol = sp.parseSmiles(smi)

    # Set the title
    mol.setProperty(TITLE, title)

    # Do the SDG
    sdg = cdk.layout.StructureDiagramGenerator()
    sdg.generateCoordinates(mol)

    # Write SDF file
    writer = java.io.StringWriter()
    molwriter = cdk.io.SDFWriter(writer)
    molwriter.write(mol)
    molwriter.close() # flushes

    return writer.toString()

if __name__ == "__main__":
    INPUTFILE = "100000.csv"
    OUTPUTFILE = "out.sdf"

    t = time.time()
    with open(OUTPUTFILE, "w") as out:
        with open(INPUTFILE) as inp:
            reader = csv.reader(inp)
            for smi, _, title in reader:
                out.write(calculate(smi, title))
    print(time.time() - t)
If we have millions of SMILES strings, a parallel approach can help. Unfortunately, Jython does not provide an implementation of the multiprocessing library so we need to do this the Java way...

Approach 1 - Using streams
The script below reads in SMILES strings from a CSV file as a stream and passes them one-at-a-time to multiple threads running in parallel to be converted to an SDF entry. The API doesn't allow any access (as far as I can tell) to control the number of threads. The SDF entries are written to the output file in the original order if ".forEachOrdered" is used versus ".forEach". There was a 4.5X speed-up, from 60s to 13s. This was on a machine with 12 physical cores (24 logical, due to the hyperthreading). Timings for forEach() instead of forEachOrdered() were about the same (surprisingly).
# Python
import csv

# Java
import java
import org.openscience.cdk as cdk
from java.nio.file import Files, Paths
from java.util.function import Function, Consumer

sp = cdk.smiles.SmilesParser(cdk.silent.SilentChemObjectBuilder.getInstance())
TITLE = cdk.CDKConstants.TITLE

def smi2sdf(line):
    smi, _, title = next(csv.reader([line]))

    # Read SMILES
    mol = sp.parseSmiles(smi)

    # Set the title
    mol.setProperty(TITLE, title)

    # Do the SDG
    sdg = cdk.layout.StructureDiagramGenerator()
    sdg.generateCoordinates(mol)

    # Write SDF file
    writer = java.io.StringWriter()
    molwriter = cdk.io.SDFWriter(writer)
    molwriter.write(mol)
    molwriter.close() # flushes

    return writer.toString()

class Calculate(Function):
    def apply(self, text):
        return smi2sdf(text)

class Write(Consumer):
    def __init__(self, filename):
        self.mfile = open(filename, "w")
    def accept(self, text):
        self.mfile.write(text)
    def __del__(self):
        self.mfile.close()

if __name__ == "__main__":
    INPUTFILE = "100000.csv"
    OUTPUTFILE = "out.sdf"

    calculate = Calculate()
    write = Write(OUTPUTFILE)

    Files.lines(Paths.get(INPUTFILE)).parallel().map(calculate).forEach(write)
Approach 2 - Using a ThreadPool
Daniel Lowe suggested using a ThreadPool and provided example Java code showing that it ran faster that the streams approach. This was also the case in Jython, where a timing of 9.6s was obtained for 12 threads, a 6X speedup over the serial implementation. The upside of using a ThreadPool is that the number of threads can be controlled explicitly, and it's worth noting that using 24 actually slowed things down to 10.2s - a reminder that "hyperthreading" is marketing BS. A potential downside is that there's no possibility (with this implementation at least) to order the output.
# Python
import csv

# Java
import java
import org.openscience.cdk as cdk
import java.util.concurrent as conc

sp = cdk.smiles.SmilesParser(cdk.silent.SilentChemObjectBuilder.getInstance())
TITLE = cdk.CDKConstants.TITLE

def calculate(smi, title):

    # Read SMILES
    mol = sp.parseSmiles(smi)

    # Set the title
    mol.setProperty(TITLE, title)

    # Do the SDG
    sdg = cdk.layout.StructureDiagramGenerator()
    sdg.generateCoordinates(mol)

    # Write SDF file
    writer = java.io.StringWriter()
    molwriter = cdk.io.SDFWriter(writer)
    molwriter.write(mol)
    molwriter.close() # flushes

    return writer.toString()

class SmiToMol(java.lang.Runnable):

    def __init__(self, smi, title, writer):
        self.smi = smi
        self. title = title
        self.writer = writer

    def run(self):
        self.writer.write(calculate(self.smi, self.title))

class LimitedQueue(conc.LinkedBlockingQueue):
    serialVersionIUD = 1

    def __init__(self, maxSize):
        conc.LinkedBlockingQueue.__init__(self, maxSize)

    def offer(self, e):
        # convert offer to 'put' to make it blocking
        try:
            self.put(e)
            return True
        except InterruptedException as ie:
            Thread.currentThread().interrupt()
        return False

if __name__ == "__main__":
    INPUTFILE = "100000.csv"
    OUTPUTFILE = "out.threadpool.sdf"

    THREADS = 12
    executor = conc.ThreadPoolExecutor(THREADS, THREADS, 0, conc.TimeUnit.MILLISECONDS, LimitedQueue(THREADS * 2))
    with open(OUTPUTFILE, "w") as out:
        with open(INPUTFILE) as inp:
            reader = csv.reader(inp)
            for smi, _, title in reader:
                executor.execute(SmiToMol(smi, title, out))
        executor.shutdown()
        executor.awaitTermination(10000, conc.TimeUnit.SECONDS)
Credits
Thanks to Daniel Lowe and John Mayfield for an interesting discussion about various approaches and what's going on under-the-hood.

Sunday, 31 May 2020

Python patterns for processing large SDF files

Running large SDF files through a processing pipeline is not an uncommon task in cheminformatics/computational chemistry. Here are some patterns I have been using to manage this task while making maximum use of available processing power.

An SDF iterator
Cheminformatics toolkits are great, but have you ever not used one? If you are handing off an entry in an SDF file to third-party software for processing, then there is no need to make your processor do the busy-work of reading the SDF file, building a molecule, and then writing it out again. Here's a lazy iterator over a file-like object that returns SDF entries one-at-a-time.
def sdf_iterator(fileobj):
    data = []
    for line in fileobj:
        data.append(line)
        if line.startswith("$$$$"):
            yield "".join(data)
            data = []

Handle gzipped files
As all chemists know, an atom is mostly empty space. It is fitting therefore that SDF files mirror this property and exhibit excellent compressibility. In practice, one ends up having to deal with a mixture of compressed and uncompressed SDF files, and it's tedious adapting code to handle them both. Here's a helper function one could use for this:
def myopen(filename):
    return gzip.open(filename, "rt") if filename.endswith(".gz") else open(filename)

if __name__ == "__main__":
    # assign inputfile as either an .sdf or .sdf.gz
    with myopen(inputfile) as inp:
        for sdfentry in sdf_iterator(inp):
            # do something with sdfentry

Calling a command-line application from Python to process an SDF entry
For some reason, not everything is wrapped up nicely as a Python library, and often it's necessary to pass an SDF file to a command-line application for processing and get the result back. For example, the result could be another SDF file containing docked poses and results, or protonated molecules, or shape-matched hits. In the best-case scenario, the command-line application can read from standard input and write to standard out. This is the situation I describe below; it's the most efficient way of communicating as disk-access will slow things down. The alternative is to use temporary files - this can be more tricky than you'd think, and is best avoided if at all possible.
def calculate(sdf):
    proc = subprocess.Popen(["noelina", "-option1", "option1value", "-option2", "option2value"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, encoding="utf-8")
    stdout, stderr = proc.communicate(sdf)
    return stdout

if __name__ == "__main__":
    with open("outputfile.sdf") as out:
        with myopen("inputfile.sdf") as inp:
            for sdfentry in sdf_iterator(inp):
                output = calculate(sdfentry)
                out.write(output)
Note that you may wish to inspect and handle error messages. My advice is to pass any errors back to the caller (as a string that is, not an Exception), and let it handle them (e.g. write them to a logfile). This separates the processing of the results from their generation. This is particularly important when we move to parallel processing (see below), where we want the processing of results to be handled by a single process, the primary process.

Parallel processing I
It's all very well doing this from Python, but it would have been faster and simpler to call the command-line application directly using the command-line. The advantage of using Python comes into play when we consider parallel processing.

There are several approaches you can use here. One is to split the SDF file into N chunks, one for each processor, and then run the command-line application N times in the background. And you then wait as you watch all of the chunks finish, except for that last one, which just keeps going seemingly for ever. To avoid this problem, a more sophisticated approach is to split into M chunks where M is a few multiples larger than N, and use GNU parallel to manage a queue over N processors.

I prefer using Python for this as it feels much tidier. Everything is contained in a single fairly-simple script. It avoids the need for manual splitting and collation. The script finishes when everything is finished (so I don't need to keep checking 'top'...but I do anyway :-) ), which means I can queue things up at a higher-level. The magic is handled by the multiprocessing library (see my earlier post for more details):
if __name__ == "__main__":
    INPUTFILE = "myinput.sdf"
    OUTPUTFILE = "myoutput.sdf"

    POOLSIZE = 12 # the number of CPUs to use
    CHUNKSIZE = 250

    pool = multiprocessing.Pool(POOLSIZE)
    with open(OUTPUTFILE, "w") as out:
        with myopen(INPUTFILE) as inp:
            miter = sdf_iterator(inp)
            for data in pool.imap_unordered(calculate, miter, CHUNKSIZE):
                out.write(data)

You may question whether 'calculate' and 'data' are appropriate function/variable names in this case. The thing is, I use this pattern over and over again for different applications, and I just keep the same variable names. Regarding CHUNKSIZE, I try to choose a value that keeps each process running for 10 seconds or so (check 'top' immediately after starting the script); this tends to ensure that all processes are running at 100% instead of waiting to communicate their results to the primary process.

Chunked parallel processing II
The code above will work fine, but it's not as efficient as I would like for this use-case. The command-line application is being called with a single SDF entry each time, and there is a price to pay in terms of process overhead. What about the CHUNKSIZE in the code above? Well, that's solving a different but similar problem; minimising the communication between the N worker processes and the primary process. Each worker is given a chunk of 250 SDF entries, and these are passed one-at-a-time to the calculate function. What we'd really like to do is give the command-line application the whole chunk of 250 in one go. Enter the chunked_sdf_iterator(), which lazily returns chunks of N molecules at a time:
def chunked_sdf_iterator(fileobj, num_mols):
    miter = sdf_iterator(fileobj)
    tmp = []
    i = 0
    for sdf in miter:
        if i==num_mols:
            yield "".join(tmp)
            tmp = []
            i = 0
        i += 1
        tmp.append(sdf)
    yield "".join(tmp) # NOTE!!

Note the final line of the function. It's quite easy to accidentally omit the final instance when writing iterators like this, so always test by checking that the output includes the final case.

Given this function, only small changes are needed to __main__ to use it:
if __name__ == "__main__":
    INPUTFILE = "myinput.sdf"
    OUTPUTFILE = "myoutput.sdf"

    CHUNK_MOLS = 250
    POOLSIZE = 12 # the number of CPUs to use
    CHUNKSIZE = 1

    pool = multiprocessing.Pool(POOLSIZE)

    with open(OUTPUTFILE, "w") as out:
        with myopen(INPUTFILE) as inp:
            miter = chunked_sdf_iterator(inp, CHUNK_MOLS)
            for data in pool.imap_unordered(calculate, miter, CHUNKSIZE):
                out.write(data)

Note that it's possible to keep using a value of CHUNKSIZE > 1 but there's no real benefit and it's simpler this way. Just ensure that the process takes around 10 seconds as before by increasing CHUNK_MOLS.

Tuesday, 18 February 2020

Reflecting on stereochemistry

Stereochemistry is a tricky concept. And writing programs to handle it is equally tricky. Take tetrahedral stereochemistry for example; it's 'interesting' to note that none of the following markers of parity use the same system: R/S in IUPAC names, D/L in biochemical nomenclature, @/@@ in SMILES, atom parity flag 1/2 in MOL files, or InChI's +/-. Which is why a sane cheminformatics library abstracts the representation of stereochemistry away from any one format to one which can be more easily reasoned about and interconverted to all.

Here's the introduction to a description I wrote up last week of how stereochemistry is handled in Open Babel. In particular, it tries to make clear how the different aspects of stereochemistry information (e.g. coordinates versus stereochemical configurations perceived from those coordinates) interact with each other from the point of view of the toolkit.

Open Babel stores stereochemistry as the relative arrangement of a set of atoms in space. For example, for a tetrahedral stereocenter, we store information like “looking from atom 2, atoms 4, 5 and 6 are arranged clockwise around atom 3”. This section describes how a user can work with or manipulate this information. This might be useful to invert a particular center, replace a substituent at a stereocenter, enumerate stereoisomers or determine the number of unspecified stereocenters.

Although Open Babel has data structures to support a variety of forms of stereochemistry, currently little use is made of any stereochemistry other than tetrahedral and cis/trans (and square planar to a certain degree).

We will look first of all at how stereochemistry information is stored, accessed, and modified. Then we describe how this information is deduced from the chemical structure. This chapter should be read in combination with the API documentation (see the Stereochemistry overview page found under “Modules”)...

The full version can be found in the development docs. Thanks to Geoff Hutchison and Stefano Forli for feedback on an early draft.

Image credit: Monument to Paul Walden by Anita Gould (licensed CC-BY-NC)