Flexible Scalability for Research Software
Ernst Strüngmann Institute (ESI) Frankfurt
Ernst Strüngmann Institute (ESI) Frankfurt
Brain Research Institute Universität Bremen
August 7, 2023
Outline
I Big Data Is Great
II Why ACME?
III An Offer You Can Refuse
from nilearn import datasets, plotting
from nilearn.maskers import NiftiSpheresMasker
from sklearn.covariance import GraphicalLassoCV
import numpy as np

# Which subject do we want to analyze?
subIdx = 0

# Take stock of data on disk
data = datasets.fetch_development_fmri(age_group="adult")
atlasCoords = datasets.fetch_coords_power_2011()
# Assemble (x, y, z) seed coordinates from the atlas
seeds = np.vstack((atlasCoords.rois["x"], atlasCoords.rois["y"], atlasCoords.rois["z"])).T

# Extract fMRI time-series averaged within spheres @ atlas coords
masker = NiftiSpheresMasker(seeds=seeds)
timeseries = masker.fit_transform(data.func[subIdx],
                                  confounds=data.confounds[subIdx])

# Compute functional connectivity b/w brain regions
estimator = GraphicalLassoCV()
estimator.fit(timeseries)

# Inspect results
plotting.plot_connectome(estimator.covariance_, seeds)
connectome.py
import sys
import numpy as np
from nilearn import datasets
from nilearn.maskers import NiftiSpheresMasker
from sklearn.covariance import GraphicalLassoCV

def compute_connectome(subIdx):
    # Take stock of data on disk
    data = datasets.fetch_development_fmri(age_group="adult")
    atlasCoords = datasets.fetch_coords_power_2011()
    # Assemble (x, y, z) seed coordinates from the atlas
    seeds = np.vstack((atlasCoords.rois["x"], atlasCoords.rois["y"], atlasCoords.rois["z"])).T

    # Extract fMRI time-series averaged within spheres @ atlas coords
    masker = NiftiSpheresMasker(seeds=seeds)
    timeseries = masker.fit_transform(data.func[subIdx],
                                      confounds=data.confounds[subIdx])

    # Compute functional connectivity b/w brain regions
    estimator = GraphicalLassoCV()
    estimator.fit(timeseries)
    return estimator.covariance_

if __name__ == "__main__":
    # Compute functional connectivity of one subject and save the result
    con = compute_connectome(int(sys.argv[1]))
    np.save("con_{}".format(sys.argv[1]), con)
connectome.py
run_connectome.sh
#!/bin/bash
#
# SLURM script for computing per-subject connectomes
#
#SBATCH -J connectome_batch # Common name for the job-array
#SBATCH -p myPartition # Partition
#SBATCH -c 2 # Use two cores
#SBATCH -t 0-2:00 # Max run-time of 2 hours
#SBATCH --mem 4000 # Request 4 GB of RAM
#SBATCH -o con_%A_%a.out # Redirect stdout/stderr to file
#SBATCH --array=1-33 # Define job-array
source /path/to/conda/etc/profile.d/conda.sh
conda activate myenv
srun python connectome.py "$SLURM_ARRAY_TASK_ID"
connectome.py
run_connectome.sh
sbatch run_connectome.sh
Submitted batch job 21607933
squeue --me
ACCOUNT JOBID PARTITION NODELIST PRIORITY TIME STATE
fuertingers 21607933_3 8GBXS esi-svhpc46 25228545 0:11 RUNNING
fuertingers 21607933_4 8GBXS esi-svhpc46 25228545 0:11 RUNNING
fuertingers 21607933_5 8GBXS esi-svhpc29 25228545 0:11 RUNNING
fuertingers 21607933_2 8GBXS esi-svhpc24 25228545 0:15 RUNNING
fuertingers 21607933_1 8GBXS esi-svhpc24 25228545 0:16 RUNNING
...
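Once all array tasks have finished, the per-subject files written by connectome.py still have to be collected by hand, for instance (a sketch; file names follow the np.save pattern above, task IDs 1-33 as in the job array):

import numpy as np

# stack the per-subject connectomes produced by the job array into a single array
connectomes = np.stack([np.load("con_{}.npy".format(taskid)) for taskid in range(1, 34)])
print(connectomes.shape)  # (n_subjects, n_regions, n_regions)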
ParallelMap in Python

def func(arg1, arg2, …, kwarg1=val1, kwarg2=val2, …):
    …
    do_this()
    …
    do_that()
    …
    return something
from acme import ParallelMap

with ParallelMap(
        func,
        [arg11, arg12, …],
        [arg21, arg22, …],
        …,
        kwarg1=[val11, val12, …],
        kwarg2=[val21, val22, …],
        …,
        kw1=v1,
        kw2=v2,
        …) as pmap:
    pmap.compute()
Now, on to actually using ACME
Objective: Evaluate f for four different values of x and y = 4

f(2, 4, z=3) = 18
f(4, 4, z=3) = 24
f(6, 4, z=3) = 30
f(8, 4, z=3) = 36
from acme import ParallelMap

def f(x, y, z=3):
    return (x + y) * z

with ParallelMap(f, [2, 4, 6, 8], 4) as pmap:
    pmap.compute()
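The second positional argument can be swept as well: following the [arg11, arg12, …], [arg21, arg22, …] pattern above, lists of equal length are paired element-wise, one pair per parallel call (a sketch reusing f and ParallelMap from the snippet above):

with ParallelMap(f, [2, 4, 6, 8], [4, 5, 6, 7]) as pmap:
    pmap.compute()  # evaluates f(2, 4), f(4, 5), f(6, 6), f(8, 7), each with z=3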
Demo Time!
connectome.py

import numpy as np
from acme import ParallelMap
from nilearn import datasets
from nilearn.maskers import NiftiSpheresMasker
from sklearn.covariance import GraphicalLassoCV

def compute_connectome(subIdx):
    # Take stock of data on disk
    data = datasets.fetch_development_fmri(age_group="adult")
    atlasCoords = datasets.fetch_coords_power_2011()
    # Assemble (x, y, z) seed coordinates from the atlas
    seeds = np.vstack((atlasCoords.rois["x"], atlasCoords.rois["y"], atlasCoords.rois["z"])).T

    # Extract fMRI time-series averaged within spheres @ atlas coords
    masker = NiftiSpheresMasker(seeds=seeds)
    timeseries = masker.fit_transform(data.func[subIdx],
                                      confounds=data.confounds[subIdx])

    # Compute functional connectivity b/w brain regions
    estimator = GraphicalLassoCV()
    estimator.fit(timeseries)
    return estimator.covariance_

if __name__ == "__main__":
    # Compute functional connectivity of all subjects in parallel
    with ParallelMap(compute_connectome, range(21)) as pmap:
        results = pmap.compute()
…or
Demo Time!
[1]: from connectome import compute_connectome, subjectList
from acme import ParallelMap, esi_cluster_setup
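# Hypothetical cell [2], not reproduced above: the ten SLURM workers reported
# below would be requested via esi_cluster_setup roughly like this; the keyword
# names are an assumption and may differ between ACME versions.
[2]: client = esi_cluster_setup(partition="8GBXS", n_jobs=10)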
<slurm_cluster_setup>
SLURM workers ready: 10/10 [elapsed time 00:13 | timeout at 01:00]
ACME - INFO - Parallel computing client ready, dashboard accessible at http://10.100.32.17:37981/status
[3]: with ParallelMap(compute_connectome, subjectList, result_shape=(264, 264, None)) as pmap:
results = pmap.compute()
ACME > ANNOUNCE < This is ACME v. 2023.4
ACME - INFO - Attaching to parallel computing client <Client: 'tcp://10.100.32.17:32851' processes=10 threads=10, memory=74.50 GiB>
ACME - INFO - Preparing 21 parallel calls of compute_connectome using 10 workers
100% |██████████| 21/21 [02:00<00:00]
ACME > ANNOUNCE < SUCCESS!
ACME - INFO - Results have been saved to /cs/home/fuertingers/ACME_20230720-122528-965199/compute_connectome.h5
…what?
'/cs/home/fuertingers/ACME_20230720-122528-965199/compute_connectome.h5'
['/cs/home/fuertingers/ACME_20230720-122528-965199/compute_connectome_payload/compute_connectome_0.h5',
'/cs/home/fuertingers/ACME_20230720-122528-965199/compute_connectome_payload/compute_connectome_1.h5',
'/cs/home/fuertingers/ACME_20230720-122528-965199/compute_connectome_payload/compute_connectome_2.h5',
'/cs/home/fuertingers/ACME_20230720-122528-965199/compute_connectome_payload/compute_connectome_3.h5',
'/cs/home/fuertingers/ACME_20230720-122528-965199/compute_connectome_payload/compute_connectome_4.h5']
compute_connectome.h5 only points to the data written by the workers in the *_payload directory
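Both layers are ordinary HDF5 files and can be inspected with h5py, for instance (a sketch with shortened paths; the dataset names are whatever ACME chose, so we simply list them):

import h5py

# the top-level container merely links to the per-worker payload files
with h5py.File("compute_connectome.h5", "r") as h5f:
    print(list(h5f.keys()))

# each worker's actual output lives in the payload directory
with h5py.File("compute_connectome_payload/compute_connectome_0.h5", "r") as h5f:
    print(list(h5f.keys()))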
Outline
I Big Data Is Great
II Why ACME?
III An Offer You Can Refuse
Home to modern ML/AI libraries

import numpy as np
from torch.optim import Adam
from torch.nn import Sequential
from acme import ParallelMap

def hyper_par_tuning(learning_rate, epochs=10):
    model = Sequential(...)
    optimizer = Adam(model.parameters(), lr=learning_rate, betas=(0.5, 0.999))
    ...
    for epoch in range(epochs):
        classifications = model(inputs)
        loss = criterion(classifications, labels)
        loss.backward()
        optimizer.step()
        ...

with ParallelMap(hyper_par_tuning, np.linspace(1e-3, 1e-6, 100)) as pmap:
    pmap.compute()
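Keyword arguments follow the same rule: per the generic kwarg1=[val11, val12, …] form shown earlier, epochs can be swept alongside the learning rate (a sketch with made-up values, reusing hyper_par_tuning from above; one list entry per parallel call):

lrates = np.linspace(1e-3, 1e-6, 100)
epoch_counts = [10] * 50 + [20] * 50  # same length as lrates
with ParallelMap(hyper_par_tuning, lrates, epochs=epoch_counts) as pmap:
    pmap.compute()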
Interfaces…
…with R
some_code.R
import numpy as np
from acme import ParallelMap
from rpy2.robjects.packages import STAP

def sample_stats(mu, sd):
    # Parse the R source and call its sample_stats() function from Python
    with open("some_code.R", "r") as Rfile:
        Rcode = Rfile.read()
    some_code = STAP(Rcode, "sample_stats")
    return np.array(some_code.sample_stats(mu, sd))

with ParallelMap(sample_stats, [2, 4, 6, 8], [3, 5, 7, 9]) as pmap:
    pmap.compute()
Interfaces…
…with system binaries
import subprocess
import numpy as np
from acme import ParallelMap

def freesurf_preproc(subIdx):
    # Run FreeSurfer's recon-all for one subject as an external process
    cmd = "recon-all -s sub-{subj:d} -i sub-{subj:d}_ses-BL_T1w.nii.gz -all"
    subprocess.run(cmd.format(subj=subIdx), text=True, shell=True, check=True)

with ParallelMap(freesurf_preproc, np.arange(101, 151)) as pmap:
    pmap.compute()
…and FORTRAN, Java, Octave, …
an sbatch wrapper made for easy-to-use parallelization of code not written with concurrency in mind
application-agnostic: has been used for mathematical optimization, neural networks, and fMRI analysis, and is the parallelization engine of Syncopy
based on existing, robust parallel computing libraries (dask, dask-jobqueue)
small network footprint by shifting concurrency to the filesystem
fully open-source on GitHub, PRs welcome!
open to including more HPC centers: <your-institution-here>_cluster_setup
Thank You For Your Attention!
GitHub
Stefan @pantaray
Katharine @KatharineShapcott
Joscha @joschaschmiedt
https://github.com/esi-neuroscience/acme
Contact: stefan.fuertinger@esi-frankfurt.de