Parallelizing analysis¶
As we approach the exascale barrier, researchers are handling increasingly large volumes of molecular dynamics (MD) data. While MDAnalysis is a flexible and relatively fast framework for complex analysis tasks in MD simulations, running the analysis in parallel plays a pivotal role in reducing the time to solution for such large datasets.
This document illustrates how you can run your own analysis scripts in parallel with MDAnalysis.
Last executed: Sep 23, 2020 with MDAnalysis 2.0.0-dev0
Last updated: August 2020
Minimum version of MDAnalysis: 2.0.0
Packages required:
MDAnalysisData
dask (https://dask.org/)
dask.distributed (https://distributed.dask.org/en/latest/)
[1]:
import MDAnalysis as mda
from MDAnalysisData.adk_equilibrium import fetch_adk_equilibrium
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline
Background¶
In MDAnalysis, most of the implemented analysis methods are based on AnalysisBase, which provides a generic API for users to write their own trajectory analysis. However, this framework only uses a single core: it iterates through the trajectory and runs the analysis frame by frame. Below we first explore some simple ways to parallelise this kind of per-frame analysis, including the multiprocessing and dask libraries, and discuss which acceleration approach to consider, ranging from your own multi-core laptop or desktop to a distributed cluster.
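For context, a user-defined analysis built on AnalysisBase typically looks like the sketch below. This is a minimal illustration written for this document (the class and attribute names are ours, not from the MDAnalysis source); the point is that AnalysisBase.run() drives a serial, frame-by-frame loop that calls _single_frame() on a single core.

from MDAnalysis.analysis.base import AnalysisBase

class RadiusOfGyration(AnalysisBase):
    # AnalysisBase drives the frame loop; we only fill in the hooks
    def __init__(self, atomgroup, **kwargs):
        super().__init__(atomgroup.universe.trajectory, **kwargs)
        self.atomgroup = atomgroup

    def _prepare(self):
        # container for one value per analysed frame
        self.results.rgyr = []

    def _single_frame(self):
        # called once per frame, one frame after another (single core)
        self.results.rgyr.append(self.atomgroup.radius_of_gyration())

# usage: rog = RadiusOfGyration(some_atomgroup).run()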
Loading files¶
The test files we will be working with here feature adenylate kinase (AdK), a phosphotransferase enzyme ([BDPW09]). The trajectory has 4187 frames, so the analysis will take quite some time with the conventional serial (single-core) approach.
Note: downloading these datasets from MDAnalysisData may take some time.
[2]:
adk = fetch_adk_equilibrium()
u = mda.Universe(adk.topology, adk.trajectory)
protein = u.select_atoms('protein')
print(len(u.trajectory))
4187
Radius of gyration¶
For a detailed description of this analysis, read Writing your own trajectory analysis.
Here is a common form of single-frame method that we normally see inside AnalysisBase. It may contain both dynamic parts that change with time, either implicitly or explicitly (e.g. an AtomGroup), and static parts (e.g. a reference frame).
[3]:
def radgyr(atomgroup, masses, total_mass=None):
    # coordinates change for each frame
    coordinates = atomgroup.positions
    center_of_mass = atomgroup.center_of_mass()
    # squared distance of each atom from the center of mass
    ri_sq = (coordinates-center_of_mass)**2
    # sum the squared deviations over all axes, and over pairs of axes
    sq = np.sum(ri_sq, axis=1)
    sq_x = np.sum(ri_sq[:,[1,2]], axis=1) # sum over y and z
    sq_y = np.sum(ri_sq[:,[0,2]], axis=1) # sum over x and z
    sq_z = np.sum(ri_sq[:,[0,1]], axis=1) # sum over x and y
    # make into array
    sq_rs = np.array([sq, sq_x, sq_y, sq_z])
    # mass-weight and normalise by the total mass
    rog_sq = np.sum(masses*sq_rs, axis=1)/total_mass
    # square root and return
    return np.sqrt(rog_sq)
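As a quick sanity check (not part of the original notebook), the first component returned by radgyr should agree with the built-in AtomGroup.radius_of_gyration() for the current frame:

rg_all, rg_x, rg_y, rg_z = radgyr(protein, protein.masses,
                                  total_mass=np.sum(protein.masses))
# both values are the mass-weighted radius of gyration at the current frame
print(rg_all, protein.radius_of_gyration())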
Serial Analysis¶
Below is the serial version of the analysis that we normally use.
[4]:
result = []
for frame in u.trajectory:
    result.append(radgyr(atomgroup=protein,
                         masses=protein.masses,
                         total_mass=np.sum(protein.masses)))
[5]:
result = np.asarray(result)
labels = ['all', 'x-axis', 'y-axis', 'z-axis']
serial = pd.DataFrame(result, columns=labels)
serial["Frame"] = serial.index
serial = serial.melt(id_vars="Frame", var_name="Axis",
                     value_name="Radius of gyration (Å)")
sns.lineplot(data=serial, x="Frame", y="Radius of gyration (Å)",
             hue="Axis")
[5]:
<matplotlib.axes._subplots.AxesSubplot at 0x7fee546fc6d0>
Parallelization in a simple per-frame fashion¶
Frame-wise form of the function¶
The coordinates of the atomgroup we analyse change with each frame of the trajectory. We need to explicitly point the analysis function at the frame to be analysed by indexing the trajectory with a frame_index, i.e. atomgroup.universe.trajectory[frame_index], so that the positions (and any other dynamic per-frame information) are updated appropriately. Therefore, the first step to making the radgyr function parallelisable is to add a frame_index argument.
[6]:
def radgyr_per_frame(frame_index, atomgroup, masses, total_mass=None):
    # index the trajectory to set it to the frame_index frame
    atomgroup.universe.trajectory[frame_index]
    # coordinates change for each frame
    coordinates = atomgroup.positions
    center_of_mass = atomgroup.center_of_mass()
    # squared distance of each atom from the center of mass
    ri_sq = (coordinates-center_of_mass)**2
    # sum the squared deviations over all axes, and over pairs of axes
    sq = np.sum(ri_sq, axis=1)
    sq_x = np.sum(ri_sq[:,[1,2]], axis=1) # sum over y and z
    sq_y = np.sum(ri_sq[:,[0,2]], axis=1) # sum over x and z
    sq_z = np.sum(ri_sq[:,[0,1]], axis=1) # sum over x and y
    # make into array
    sq_rs = np.array([sq, sq_x, sq_y, sq_z])
    # mass-weight and normalise by the total mass
    rog_sq = np.sum(masses*sq_rs, axis=1)/total_mass
    # square root and return
    return np.sqrt(rog_sq)
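As a quick check (again, not in the original notebook), the function can now be pointed at any frame independently of the trajectory's current position, for example frame 100:

# seeks the trajectory to frame 100 inside the function, then computes Rgyr
print(radgyr_per_frame(100, protein, protein.masses,
                       total_mass=np.sum(protein.masses)))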
Parallelization with dask¶
Dask is a flexible library for parallel computing in Python. It provides advanced parallelism for analytics and has been adopted by many scientific software packages. It can scale from a single computer to a cluster of computers inside an HPC center.
Dask has a dynamic task scheduling system with synchronous (single-threaded), threaded, multiprocessing and distributed schedulers. Its wrapping function, dask.delayed, mimics for loops and wraps Python code into a Dask graph. This code can then easily be run in parallel, and visualised with dask.visualize() to examine whether the tasks are well distributed. The code inside dask.delayed is not run immediately on execution, but is pushed into a job queue waiting for submission. You can read more on the dask website.
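To make the lazy behaviour concrete, here is a toy example (not part of the original notebook) showing that dask.delayed only builds the task graph, and that the work is done when dask.compute() is called:

import dask

@dask.delayed
def square(x):
    # nothing is executed when square(i) is called; it only adds a task
    return x ** 2

tasks = [square(i) for i in range(4)]   # a list of Delayed objects
# tasks[0].visualize()                  # optional: draw the task graph (needs graphviz)
print(dask.compute(*tasks))             # runs the graph and prints (0, 1, 4, 9)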
Compared to Dask, the downside of multiprocessing is that it is mostly focused on single-machine multicore parallelism (without an extra manager); it is hard to use across multiple machines. Below are two simple examples of using Dask to achieve the same task that multiprocessing would.

The API of dask is similar to that of multiprocessing: it also creates a pool of workers on your single machine with the given resources.
Note: the threaded scheduler in Dask (similar to threading in Python) should not be used, as it will interfere with the state (timestep) of the trajectory.
[7]:
import dask
import dask.multiprocessing
dask.config.set(scheduler='processes')
[7]:
<dask.config.set at 0x7fee5107a810>
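If you want to cap the size of the worker pool (for example on a shared machine), the number of workers can be set as a scheduler option. This is a hedged sketch, not part of the original notebook, and the exact option may depend on your dask version:

import dask

# assumption: the local 'processes' scheduler; by default it uses one worker
# per core, and num_workers caps the pool size
dask.config.set(scheduler='processes', num_workers=4)
# the same option can also be passed per call, e.g.
# result = dask.compute(job_list, num_workers=4)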
Below is how you can use the dask.distributed module to build a local cluster.

Note: this is not really needed for a laptop or desktop. Using dask.distributed may even slow down performance, but it provides a diagnostic dashboard that can give valuable insight into performance and progress.

See its limitations here: https://distributed.dask.org/en/latest/limitations.html
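If you want more control than the bare Client() call below provides, the local cluster can also be built explicitly. The worker counts here are illustrative assumptions, not values from the original notebook:

from dask.distributed import Client, LocalCluster

# four single-threaded worker processes, mirroring the process-based
# scheduler configured above; adjust to your machine
cluster = LocalCluster(n_workers=4, threads_per_worker=1)
client = Client(cluster)
print(client.dashboard_link)   # address of the diagnostic dashboard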
[8]:
from dask.distributed import Client
client = Client()
client
[8]:
(Client and Cluster summary table)
First we have to create a list of jobs and transform them with dask.delayed() so they can be processed by Dask.
[9]:
job_list = []
for frame_index in range(u.trajectory.n_frames):
    # dask.delayed(function)(...) builds a lazy task; nothing is computed yet
    job_list.append(dask.delayed(radgyr_per_frame)(frame_index,
                                                   atomgroup=protein,
                                                   masses=protein.masses,
                                                   total_mass=np.sum(protein.masses)))
Then we simply use dask.compute() to get a list of ordered results.
[10]:
result = dask.compute(job_list)
[11]:
daskproc = pd.DataFrame(result[0], columns=labels)
daskproc["Frame"] = daskproc.index
daskproc = daskproc.melt(id_vars="Frame", var_name="Axis",
                         value_name="Radius of gyration (Å)")
sns.lineplot(data=daskproc, x="Frame", y="Radius of gyration (Å)",
             hue="Axis")
[11]:
<matplotlib.axes._subplots.AxesSubplot at 0x7fee59ffacd0>
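As a sanity check (not in the original notebook), the parallel results should match the serial reference computed earlier:

# compares the melted dataframes column-wise; prints True if identical
print(np.allclose(serial["Radius of gyration (Å)"],
                  daskproc["Radius of gyration (Å)"]))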
We can also use the original radgyr function, because dask is flexible about the input arguments.

Note: the timestep of protein changes as the trajectory is iterated over, so each job is built with the protein (e.g. its coordinates) at the corresponding frame.
[12]:
job_list = []
for frame in u.trajectory:
    job_list.append(dask.delayed(radgyr(atomgroup=protein,
                                        masses=protein.masses,
                                        total_mass=np.sum(protein.masses))))
Then we again use dask.compute() to get a list of ordered results.
[13]:
result = dask.compute(job_list)
[14]:
daskproc2 = pd.DataFrame(result[0], columns=labels)
daskproc2["Frame"] = daskproc2.index
daskproc2 = daskproc2.melt(id_vars="Frame", var_name="Axis",
                           value_name="Radius of gyration (Å)")
sns.lineplot(data=daskproc2, x="Frame", y="Radius of gyration (Å)",
             hue="Axis")
[14]:
<matplotlib.axes._subplots.AxesSubplot at 0x7fee59dd65d0>
We can also use the Dask dashboard (available with dask.distributed.Client) to examine how the jobs are distributed among the workers. Each green bar in the dashboard's task stream represents one job, i.e. running radgyr on one frame of the trajectory.
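When the analysis is finished, it is good practice to shut down the client and the local cluster it started; a small housekeeping step that is not part of the original notebook:

client.close()   # disconnects from (and shuts down) the local cluster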
See Also¶
The parallel version of MDAnalysis is still under development. For existing solutions and some implementations of parallel analysis, see PMDA. PMDA ([SFPG+19]) applies a split-apply-combine scheme with Dask: the trajectory is split into blocks, the analysis is applied to each block in parallel, and the per-block results are combined. In the future, it may provide a framework that consolidates all the parallelisation schemes described in this tutorial.
References¶
[BDPW09] Oliver Beckstein, Elizabeth J. Denning, Juan R. Perilla, and Thomas B. Woolf. Zipping and Unzipping of Adenylate Kinase: Atomistic Insights into the Ensemble of Open↔Closed Transitions. Journal of Molecular Biology, 394(1):160–176, November 2009. URL: https://linkinghub.elsevier.com/retrieve/pii/S0022283609011164, doi:10.1016/j.jmb.2009.09.009.
[GLB+16] Richard J. Gowers, Max Linke, Jonathan Barnoud, Tyler J. E. Reddy, Manuel N. Melo, Sean L. Seyler, Jan Domański, David L. Dotson, Sébastien Buchoux, Ian M. Kenney, and Oliver Beckstein. MDAnalysis: A Python Package for the Rapid Analysis of Molecular Dynamics Simulations. Proceedings of the 15th Python in Science Conference, pages 98–105, 2016. URL: https://conference.scipy.org/proceedings/scipy2016/oliver_beckstein.html, doi:10.25080/Majora-629e541a-00e.
[MADWB11] Naveen Michaud-Agrawal, Elizabeth J. Denning, Thomas B. Woolf, and Oliver Beckstein. MDAnalysis: A toolkit for the analysis of molecular dynamics simulations. Journal of Computational Chemistry, 32(10):2319–2327, July 2011. URL: http://doi.wiley.com/10.1002/jcc.21787, doi:10.1002/jcc.21787.
[SFPG+19] Shujie Fan, Max Linke, Ioannis Paraskevakos, Richard J. Gowers, Michael Gecht, and Oliver Beckstein. PMDA – Parallel Molecular Dynamics Analysis. In Chris Calloway, David Lippa, Dillon Niederhut, and David Shupe, editors, Proceedings of the 18th Python in Science Conference, pages 134–142, 2019. doi:10.25080/Majora-7ddc1dd1-013.