John Mahoney

cluster tips

How to use the cluster

Notes for me on how to use the Daemon cluster stolen with permission from Ryan by way of Dmitry.

Make an account here. Please note that you must be MURI-affiliated and pre-approved to use these computing resources.

Get the dispatcher.py script.

git clone https://gist.github.com/9a4997dc67efed08366c.git
#!/usr/bin/env python

"""
You might need to run the following before running this script:

pip install --upgrade --user click clusterlib decorator logbook minibar pathlib sarge

Usage:

dispatcher.py ./script.py dispatch arg1 arg2 arg3

"""

from __future__ import division, print_function
from imp import load_source
from sys import argv, executable as python, stdout
import click
from clusterlib.scheduler import queued_or_running_jobs, submit
from clusterlib.storage import sqlite3_dumps, sqlite3_loads
from decorator import decorator
from logbook import Logger, StreamHandler
from minibar import bar
from pathlib import Path
from sarge import get_both


VALID_PARTITIONS = ['parallel', 'bigmem', 'serial']

BAR_TEMPLATE = "Dispatching tasks: {i}/{total} {bar:fill}"

LOG = Logger('Dispatcher')
LOG_STREAM = StreamHandler(stdout)


def autotype(func):
    """
    Wrap *func* so that string arguments are coerced to numbers where
    possible before the call.

    Coercion rules: a value that parses as a whole-number float becomes an
    int (e.g. "3.0" -> 3), other parseable values become floats, and
    anything that cannot be parsed is passed through unchanged.

    Implemented with stdlib ``functools.wraps`` instead of the third-party
    ``decorator`` package; the call interface (``autotype(func)`` returns a
    wrapped callable) is unchanged.
    """
    from functools import wraps

    def parse(arg):
        try:
            value = float(arg)
        except (TypeError, ValueError):
            # Not numeric (ValueError) or not even string-like (TypeError,
            # e.g. None) -- pass it through untouched.
            return arg
        return int(value) if value.is_integer() else value

    @wraps(func)
    def wrapper(*args, **kwargs):
        coerced_args = [parse(arg) for arg in args]
        coerced_kwargs = {key: parse(val) for key, val in kwargs.items()}
        return func(*coerced_args, **coerced_kwargs)

    return wrapper


@click.group()
@click.pass_context
@click.argument('script')
def cli(ctx, script):
    """
    Command group: loads the target script as a module and derives the path
    of the sqlite3 "done jobs" database from its NAME, sharing both with the
    subcommands through the click context object.
    """
    module = load_source('script', script)
    db_file = Path('.') / "{}.sqlite3".format(module.NAME)
    ctx.obj['script'] = module
    ctx.obj['script_name'] = script
    ctx.obj['NOSQL_PATH'] = str(db_file.absolute())


@cli.command()
@click.pass_context
@click.argument('args', nargs=-1)
@click.option('--mem')
@click.option('--partition', default="parallel", type=click.Choice(VALID_PARTITIONS))
def dispatch(ctx, args, mem, partition):
    """
    The primary command. It iterates over all parameter sets produced by the
    script's params() generator and schedules every set that is not already
    running, queued, or finished (i.e. has not yet been run, or was killed).
    """
    # Write out a file with the exact command that called this function,
    # so the run can be reproduced later.
    with open('run_command.txt', 'w') as cmd_file:
        cmd_file.write(' '.join(argv))

    # Collect queued, running, and finished jobs so they are not resubmitted.
    scheduled_jobs = set(queued_or_running_jobs())
    done_jobs = sqlite3_loads(ctx.obj['NOSQL_PATH'])

    jobs = []

    # Iterate over all parameter sets.
    for param in autotype(ctx.obj['script'].params)(*args):

        # Normalize scalar parameters to 1-tuples. Catch only the specific
        # errors a non-subscriptable or empty value raises; the original
        # bare `except:` also swallowed KeyboardInterrupt/SystemExit.
        try:
            param[0]
        except (TypeError, IndexError):
            param = (param,)
        param_string = ' '.join(str(_) for _ in param)
        job_cmd = "{} {} {} main {}".format(python, __file__, ctx.obj['script_name'], param_string)
        job_name = "{}_{}".format(ctx.obj['script'].NAME, param_string.replace(' ', '_'))

        if job_name not in scheduled_jobs and job_name not in done_jobs:
            jobs.append((job_cmd, job_name))

    # Dispatch all jobs that need dispatching, with a progress bar.
    for job_cmd, job_name in bar(jobs, template=BAR_TEMPLATE):

        # NOTE(review): time="00:00:00" presumably means "no time limit" on
        # this SLURM setup -- confirm against the cluster's configuration.
        script = submit(job_command=job_cmd,
                        job_name=job_name,
                        time="00:00:00",
                        memory=(mem if mem else (2000 if partition == "serial" else 4000)),
                        backend='slurm')

        # clusterlib's submit() does not expose the partition option,
        # so append it to the generated sbatch command by hand.
        script += " --partition={}".format(partition)

        get_both(script)


@cli.command()
@click.pass_context
@click.argument('args', nargs=-1)
def main(ctx, args):
    """
    Entry point executed on each compute node: run the script's compute()
    on one parameter set, then record completion in the sqlite3 store.
    """
    script = ctx.obj['script']
    job_name = "{}_{}".format(script.NAME, '_'.join(args))
    LOG.info("Arguments: {}".format(args))
    autotype(script.compute)(*args)
    LOG.info("Finished.")
    # Mark the job done so a later `dispatch` skips this parameter set.
    sqlite3_dumps({job_name: "JOB DONE"}, ctx.obj['NOSQL_PATH'])


if __name__ == '__main__':
    # Route all logbook records to stdout for the duration of the CLI run.
    # (Indentation restored: as pasted, this block was a SyntaxError.)
    with LOG_STREAM.applicationbound():
        cli(obj={})

Install all the dependencies for that.

pip install --upgrade --user click clusterlib decorator logbook minibar pathlib sarge

Make ~/.local/bin.

cd ~/
mkdir .local
cd .local/
mkdir bin
cd ~/

Move dispatcher into ~/.local/bin.

mv dispatcher.py ~/.local/bin

Make it executable.

chmod +x ~/.local/bin/dispatcher.py

Link it.

ln -s dispatcher.py dispatcher

Add ~/.local/bin to PATH.

PATH=$HOME/.local/bin:$PATH

Install all your necessary libs: e.g. networkx, pydot.

pip install --user networkx pydot --force

Move your ssh keys from your current computer to the cluster.

scp -r .ssh/ jmahoney@demon.cse.ucdavis.edu:/home/jmahoney/

Install cmpy.

git clone git@vcs.cse.ucdavis.edu:cmpy.git
cd cmpy/
pip install --user .

Write a script for testing. Or get one here https://gist.github.com/Autoplectic/2b154ccc8883cdd919f9.

nano example.py

NAME = "crypticOrderReversibilityTest"

import cmpy
from cmpy.machines import build_eM
from cmpy.orderlygen import ICDFAGenerator
from cmpy.orderlygen.pyicdfa import machine_to_int

import yaml
from itertools import islice

def params(nstate, nsym, nnode):
    """Yield one (nstate, nsym, nnode, shard) tuple per worker shard."""
    base = (nstate, nsym, nnode)
    for shard in range(nnode):
        yield base + (shard,)

def compute(nstate, nsym, nnode, i):
    """
    Worker shard i of nnode: walk every nnode-th machine (starting at i) in
    the ICDFA library and record whether each machine's cryptic order equals
    that of its reverse, dumping the results to a per-shard YAML file.
    """
    library = ICDFAGenerator(nstate, nsym)
    results = []
    for machine in islice(library, i, None, nnode):
        order = machine.cryptic_order()
        reversed_machine = build_eM(machine.reverse(), transients=False)
        reversed_order = reversed_machine.cryptic_order()
        results.append((machine_to_int(machine, 0), order == reversed_order))

    with open("cryptic_order_{}.data".format(i), "w") as datafile:
        datafile.write(yaml.dump(results))

def aggregate():
    """
    Collect the per-shard *.data files in the current directory into one
    list of (machine_id, reversibility_matches) pairs and return it.
    """
    # pathlib was never imported at module level in this example, so the
    # original raised NameError; import it locally here.
    import pathlib

    here = pathlib.Path(".")
    results = []
    for data_file in here.glob("*.data"):
        # `with` closes the handle the original leaked via f.open().
        with data_file.open() as fh:
            # yaml.dump emits python-specific tags for tuples, so the full
            # (unsafe) loader is required here -- only load trusted files.
            results.extend(yaml.load(fh))
    # The original built the list but never returned it.
    return results

Run it.

dispatcher.py ./example.py dispatch 4 2 1000

Other commands.

squeue # to see current queued jobs
sinfo # to see the status of the clusters
squeue | grep _username_ | less # to see a user’s jobs
scancel -u jmahoney # to cancel jobs