ACME

Flexible Scalability for Research Software

Stefan Fürtinger

Ernst Strüngmann Institute (ESI) Frankfurt

Katharine Shapcott

Ernst Strüngmann Institute (ESI) Frankfurt

Joscha Tapani Schmiedt

Brain Research Institute, Universität Bremen

August 7, 2023

Outline

I Big Data Is Great

II Why ACME?

III An Offer You Can Refuse

I Big Data Is Great

Big Data: A Topical Example

II Why ACME?

Get SLURMed

import numpy as np
from nilearn import datasets, plotting
from nilearn.maskers import NiftiSpheresMasker
from sklearn.covariance import GraphicalLassoCV

# Which subject do we want to analyze?
subIdx = 0

# Take stock of data on disk
data = datasets.fetch_development_fmri(age_group="adult")
atlas = datasets.fetch_coords_power_2011()
atlasCoords = np.vstack((atlas.rois["x"], atlas.rois["y"], atlas.rois["z"])).T

# Extract fMRI time-series averaged within spheres @ atlas coords
masker = NiftiSpheresMasker(seeds=atlasCoords)
timeseries = masker.fit_transform(data.func[subIdx],
                                  confounds=data.confounds[subIdx])

# Compute functional connectivity b/w brain regions
estimator = GraphicalLassoCV()
estimator.fit(timeseries)

# Inspect results
plotting.plot_connectome(estimator.covariance_, atlasCoords)

Get SLURMed

connectome.py
import sys
import numpy as np
from nilearn import datasets
from nilearn.maskers import NiftiSpheresMasker
from sklearn.covariance import GraphicalLassoCV

def compute_connectome(subIdx):

    # Take stock of data on disk
    data = datasets.fetch_development_fmri(age_group="adult")
    atlas = datasets.fetch_coords_power_2011()
    atlasCoords = np.vstack((atlas.rois["x"], atlas.rois["y"], atlas.rois["z"])).T

    # Extract fMRI time-series averaged within spheres @ atlas coords
    masker = NiftiSpheresMasker(seeds=atlasCoords)
    timeseries = masker.fit_transform(data.func[subIdx],
                                      confounds=data.confounds[subIdx])

    # Compute functional connectivity b/w brain regions
    estimator = GraphicalLassoCV()
    estimator.fit(timeseries)
    return estimator.covariance_

if __name__ == "__main__":

    # Compute functional connectivity of one subject and save the result
    con = compute_connectome(int(sys.argv[1]))
    np.save("con_{}".format(sys.argv[1]), con)

Get SLURMed

connectome.py
run_connectome.sh
#!/bin/bash
#
# SLURM script for computing per-subject connectomes
#
#SBATCH -J connectome_batch # Common name for the job-array
#SBATCH -p myPartition      # Partition
#SBATCH -c 2                # Use two cores
#SBATCH -t 0-2:00           # Max run-time of 2 hours
#SBATCH --mem 4000          # Request 4 GB of RAM
#SBATCH -o con_%A_%a.out    # Redirect stdout/stderr to file
#SBATCH --array=0-32        # Define job-array (data.func uses 0-based indices)

source /path/to/conda/etc/profile.d/conda.sh
conda activate myenv
srun python connectome.py "$SLURM_ARRAY_TASK_ID"

Get SLURMed

connectome.py
run_connectome.sh
sbatch run_connectome.sh
Submitted batch job 21607933

squeue --me
ACCOUNT JOBID PARTITION NODELIST PRIORITY TIME STATE
fuertingers 21607933_3 8GBXS esi-svhpc46 25228545 0:11 RUNNING
fuertingers 21607933_4 8GBXS esi-svhpc46 25228545 0:11 RUNNING
fuertingers 21607933_5 8GBXS esi-svhpc29 25228545 0:11 RUNNING
fuertingers 21607933_2 8GBXS esi-svhpc24 25228545 0:15 RUNNING
fuertingers 21607933_1 8GBXS esi-svhpc24 25228545 0:16 RUNNING
...

It All Started With A gist…

Asynchronous Computing Made ESI

  • accelerates “simple”, i.e., embarrassingly parallel, workloads
  • wraps sequential code and maps it onto parallel computing hardware
  • DIY parallelization via the context manager ParallelMap in Python
  • built on top of dask and dask-jobqueue to integrate with HPC clusters
pip install esi-acme
conda install -c conda-forge esi-acme

Using ACME

def func(arg1, arg2, …, kwarg1=val1, kwarg2=val2, …):
    …
    do_this()
    …
    do_that()
    …
    return something

from acme import ParallelMap

with ParallelMap(func,
                 [arg11, arg12, …],
                 [arg21, arg22, …],
                 …,
                 kwarg1=[val11, val12, …],
                 kwarg2=[val21, val22, …],
                 …,
                 kw1=v1,
                 kw2=v2,
                 …) as pmap:
    pmap.compute()

Arguments supplied as length-N sequences yield N parallel calls of func; single values are reused in every call.

Now, on to actually using ACME

An Impractical Example

def f(x, y, z=3):
    return (x + y) * z

Objective: Evaluate f for four different values of x and y = 4

f(2, 4, z=3) = 18

f(4, 4, z=3) = 24

f(6, 4, z=3) = 30

f(8, 4, z=3) = 36

An Impractical Example

from acme import ParallelMap

def f(x, y, z=3):
    return (x + y) * z

with ParallelMap(f, [2, 4, 6, 8], 4) as pmap:
    pmap.compute()
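
For comparison, the same computation as a plain sequential loop, no ACME involved:

# Sequential equivalent of the ParallelMap call above
results = [f(x, 4) for x in [2, 4, 6, 8]]   # [18, 24, 30, 36]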

Demo Time!

Back To SLURM Connectomes…

connectome.py
from acme import ParallelMap

def compute_connectome(subIdx):
    ...                                      # function body unchanged, see above

if __name__ == "__main__":

    # Compute functional connectivity of all subjects in parallel
    with ParallelMap(compute_connectome, range(21)) as pmap:
        results = pmap.compute()

or

Back To SLURM Connectomes…

…in Jupyter

[1]: import sys
     from connectome import compute_connectome, subjectList
     from acme import ParallelMap
     from dask_jobqueue import SLURMCluster
[2]: cluster = SLURMCluster(queue="8GBXS", cores=1, memory="8GB", processes=1,
                            local_directory="/path/to/somewhere", python=sys.executable,
                            job_directives_skip=["-t", "--mem"])
     cluster.scale(10)
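
With a dask client attached to these manually started workers, ParallelMap picks up the active client instead of spawning its own; a minimal sketch using dask.distributed:

[3]: from dask.distributed import Client
     client = Client(cluster)   # subsequent ParallelMap calls attach to this client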

Back To SLURM Connectomes…

…in Jupyter

[1]: from connectome import compute_connectome, subjectList
     from acme import ParallelMap, esi_cluster_setup
[2]: myClient = esi_cluster_setup(n_workers=10, partition="8GBXS")

Demo Time!

Back To SLURM Connectomes…

…in Jupyter

[1]: from connectome import compute_connectome, subjectList
     from acme import ParallelMap, esi_cluster_setup
[2]: myClient = esi_cluster_setup(n_workers=10, partition="8GBXS")

<slurm_cluster_setup> SLURM workers ready: 10/10 [elapsed time 00:13 | timeout at 01:00]
ACME - INFO - Parallel computing client ready, dashboard accessible at http://10.100.32.17:37981/status

[3]: with ParallelMap(compute_connectome, subjectList, result_shape=(264, 264, None)) as pmap:
         results = pmap.compute()

ACME > ANNOUNCE < This is ACME v. 2023.4
ACME - INFO - Attaching to parallel computing client <Client: ‘tcp://10.100.32.17:32851’ processes=10 threads=10, memory=74.50 GiB>
ACME - INFO - Preparing 21 parallel calls of compute_connectome using 10 workers
100% |██████████| 21/21 [02:00<00:00]
ACME > ANNOUNCE < SUCCESS!
ACME - INFO - Results have been saved to /cs/home/fuertingers/ACME_20230720-122528-965199/compute_connectome.h5

…what?

Back To SLURM Connectomes…

…in Jupyter

[4]: pmap.results_container
'/cs/home/fuertingers/ACME_20230720-122528-965199/compute_connectome.h5'
[5]: results
['/cs/home/fuertingers/ACME_20230720-122528-965199/compute_connectome_payload/compute_connectome_0.h5',
 '/cs/home/fuertingers/ACME_20230720-122528-965199/compute_connectome_payload/compute_connectome_1.h5',
 '/cs/home/fuertingers/ACME_20230720-122528-965199/compute_connectome_payload/compute_connectome_2.h5',
 '/cs/home/fuertingers/ACME_20230720-122528-965199/compute_connectome_payload/compute_connectome_3.h5',
 '/cs/home/fuertingers/ACME_20230720-122528-965199/compute_connectome_payload/compute_connectome_4.h5']
  • the single aggregate results file compute_connectome.h5 only points to the data written by the workers
  • each worker saves its results independently to a file in the *_payload directory
  • this leverages HDF5 Virtual Datasets (see the sketch below)
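
A minimal sketch of inspecting such a container with h5py; the dataset layout below is an assumption, so list the file's keys first:

import h5py
import numpy as np

# Open the aggregate file; its HDF5 Virtual Dataset transparently
# stitches together the per-worker payload files
with h5py.File("compute_connectome.h5", "r") as f:
    print(list(f.keys()))             # discover the dataset name(s)
    name = list(f.keys())[0]
    connectomes = np.array(f[name])   # reads data from the payload files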

Communication via Filesystem

III An Offer You Can Refuse

Why Python?

Home to modern ML/AI libraries

import numpy as np
from torch.optim import Adam
from torch.nn import Sequential
from acme import ParallelMap

def hyper_par_tuning(learning_rate, epochs=10):
  model = Sequential(...)
  optimizer = Adam(model.parameters(), lr=learning_rate, betas=(0.5, 0.999))
  ...
  for epoch in range(epochs):
    optimizer.zero_grad()      # reset gradients from the previous step
    classifications = model(inputs)
    loss = criterion(classifications, labels)
    loss.backward()
    optimizer.step()
    ...
with ParallelMap(hyper_par_tuning, np.linspace(1e-3, 1e-6, 100)) as pmap:
  pmap.compute()

Why Python?

Interfaces

…with R

some_code.R
sample_stats <- function(d_mu, d_sd) {
    samp <- rnorm(10^6, d_mu, d_sd)
    c(s_mu = mean(samp), s_sd = sd(samp))
}
import numpy as np
from acme import ParallelMap
from rpy2.robjects.packages import STAP

def sample_stats(mu, sd):
  # Parse the R source and call the R function from Python
  with open("some_code.R", "r") as Rfile:
    Rcode = Rfile.read()
  some_code = STAP(Rcode, "sample_stats")
  return np.array(some_code.sample_stats(mu, sd))

with ParallelMap(sample_stats, [2, 4, 6, 8], [3, 5, 7, 9]) as pmap:
  pmap.compute()

Why Python?

Interfaces

…with system binaries

import subprocess
import numpy as np
from acme import ParallelMap

def freesurf_preproc(subIdx):
  cmd = "recon-all -s sub-{subj:d} -i sub-{subj:d}_ses-BL_T1w.nii.gz -all"
  subprocess.run(cmd.format(subj=subIdx), text=True, shell=True, check=True)

with ParallelMap(freesurf_preproc, np.arange(101, 151)) as pmap:
  pmap.compute()

…and FORTRAN, Java, Octave, …
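
The same subprocess pattern extends to Octave and friends; a hypothetical sketch (octave_eval and the --eval expression are illustrative, not part of ACME):

import subprocess
from acme import ParallelMap

def octave_eval(x):
  # Hypothetical helper: evaluate an Octave expression, parse stdout
  out = subprocess.run(["octave", "--eval", "disp(sin({}))".format(x)],
                       capture_output=True, text=True, check=True)
  return float(out.stdout)

with ParallelMap(octave_eval, [0.1, 0.2, 0.3]) as pmap:
  pmap.compute()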

ACME

ACME is not

  • a general-purpose parallelization framework
  • suited for tasks requiring inter-worker communication
  • built for processing shared distributed data structures
  • the best way to tackle CFD problems/PDE discretizations
  • tuned for bleeding edge performance
  • another sbatch wrapper

ACME is

  • made for easy-to-use parallelization of code not written with concurrency in mind

  • application-agnostic: has been used for mathematical optimization, neural networks, fMRI analysis, and is the parallelization engine of Syncopy

  • based on existing robust parallel computing libraries (dask, dask-jobqueue)

  • small network footprint by shifting concurrency to the filesystem

  • fully open source: hosted on GitHub, PRs welcome!

  • open to include more HPC centers (see the sketch below):

    <your-institution-here>_cluster_setup
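
A site-specific setup function would essentially wrap the dask-jobqueue recipe shown earlier; a hypothetical sketch (function name, partition, and resources are illustrative assumptions):

from dask.distributed import Client
from dask_jobqueue import SLURMCluster

def my_institution_cluster_setup(n_workers=10, partition="somePartition"):
    # Hypothetical helper mirroring esi_cluster_setup: start SLURM workers
    # and return a client that ACME's ParallelMap can attach to
    cluster = SLURMCluster(queue=partition, cores=1, memory="8GB", processes=1)
    cluster.scale(n_workers)
    return Client(cluster)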

Thank You For Your Attention!

GitHub

Stefan @pantaray

Katharine @KatharineShapcott

Joscha @joschaschmiedt

https://github.com/esi-neuroscience/acme

Contact: stefan.fuertinger@esi-frankfurt.de
