Using MySQL for Load Balancing and Job Control under Slurm
Like most things these days, modern atmospheric science is all about big data. Whether it's an instrument flying in an aircraft taking sets of images several times a second and producing three quarters of a terabyte of data per flight day over a two-week campaign or a satellite instrument producing hundreds of gigs of spectral data daily over a 10–15 year lifetime, data volume is enormous. Simply analyzing a day's worth of data to keep track of basic instrument stability is CPU-intensive. Fully processing a day to retrieve the state of the atmosphere or looking at trends across a decade's worth of data is exponentially so.
High-performance parallel cluster computing is the name of the game. For years I've done this on a very basic level by kicking off a handful of copies of my processing scripts on a couple computers around the lab, but after a recent move into a new lab, I got my first chance to work on a real cluster system, processing data from a satellite-borne hyperspectral sounder called AIRS (see Resources). AIRS is one of the instruments onboard NASA's AQUA satellite that was launched in late 2002 and has been in continuous operation since. Data from AIRS and similar instruments is used to map out vertical profiles of atmospheric temperature and trace gases globally, but we have to be able to process it first.
The cluster computing game here is strictly to get a whole lot of computers doing the same thing to a whole lot of data so that we can process it faster than we collect it (much faster would be preferable). Since I was new to this game just a few months ago, I've had much to learn about cluster computing and how to design algorithms and processing software to take advantage of multiple CPUs for processing. This was my first experience where I had hundreds of CPUs at my disposal, and it really has changed how I process data in general. I started this article to describe how I was shown to parallelize this type of data processing and a method I put together that makes the process much cleaner.
The cluster system here consists of 240 compute nodes, each with
dual, 8-core processors and 64GB of main memory running Red Hat
Enterprise Linux. Cluster jobs are scheduled to run through the
Slurm workload manager (see Resources). In a nutshell, Slurm is a suite of programs
that works to allocate computer resources among users and compute
jobs and enforce sharing rules to make sure everyone gets a chance
to get their work in. The two most important programs in the suite
for actually working on the system are
sbatch is the entry point to the Slurm scheduler and reads a
high-level Bash control script that specifies job parameters (number
of nodes needed, memory per process, expected run times and so on) and
spawns the requested number of identical jobs via calls to srun.
Listing 1. Top-level Slurm/sbatch script. This scripts specifies the request for CPUs, memory and time on the cluster and sets up the program that will be run on each processing node.
#!/bin/bash # sbatch options #SBATCH --job-name=RUN_CALFLAG #SBATCH --partition=batch #SBATCH --qos=short #SBATCH --ntasks=20 #SBATCH --mem-per-cpu=18000 #SBATCH --cpus-per-task 1 #SBATCH --time=00:20:00 # matlab options MATLAB=/usr/bin/matlab MATOPT=' -nojvm -nodisplay -nosplash' srun --output=~/run_calflag.log ↪$MATLAB $MATOPT -r "run_calflag; exit;"
The script in Listing 1 asks Slurm to allocate 20 processors
allocate 18GB of memory (
mem-per-cpu) to each
and run a job (
srun ...) on each that will take around 20
time) to run. The
qos directives help the
system manage its resources and set rules for the number of processors a
user is allowed, CPU time limits and so on. The
directive puts a name to your task to help you separate your jobs in
squeue list of the system queue.
srun request shown here is to run an instance of MATLAB on each
of the allocated nodes and, in each instance, run the script
run_calflag and exit. Any message output is sent to the file
specified by the
run_calflag could be a simple
"hello world" script, or it could be a loop to process a thousand
files. It also doesn't have to be MATLAB. MATLAB is our
tool of choice here, and examples in this article use it in a
background sort of way. There is no need to understand MATLAB to
As long as this request doesn't violate any cluster good behavior
rules by hogging processors, hogging memory and so on, Slurm queues the
request until such time as processors are available to run
it. When the resources are available, Slurm starts the processing by
grabbing 20 CPUs and, then, starting a copy of
run_calflag on each
one. Once the control scripts are in order, this request is
submitted to Slurm through the
Slurm also manages a set of environment variables that can be used to pass some job parameters into the processing scripts.
Basic Data Chunking
Listing 1 already shows enough to see one issue with parallelizing
this kind of data processing: how does one chunk up the data to pass
to each instance of the
run_calflag started by
srun? If I want
to process one year, should I ask for 365 processors and do one day on
each? 52 processors and one per week? One per month? How do I handle
leap years? The cluster resource allocation rules prevent me from
doing the 365 processor idea, but other than that, there is no clear,
Being new to cluster computing, I looked at what my colleagues have done to set up their processing runs. Most have adopted a three-tier system to run these jobs:
A bash script with
sruncalls to kick off everything.
A MATLAB script (called by
srunin the previous script) that is run on each compute node that uses some node ID in the Slurm environment (usually
SLURM_PROC_ID) to index processing into the range of years/days/files to process. The most common approaches are to request 12 CPUs and assign one month per CPU or request 52 and assign a week per CPU. This script then loops over the years/days/files "assigned" to this node. Within this loop, calls are made to a final MATLAB script that does the actual processing for each year/day/file in the sequence.
This approach certainly can work, but it has some significant issues:
Ad hoc chunking of data: in general, how does one split "x" things to process over "y" nodes? In practice, this seems to mean you have to edit run scripts and tailor them to just about every run you wish to do. (It is almost guaranteed that you will do multiple runs in this business: once to try to process a contiguous period of data and a second time to re-run the now non-contiguous set of days that failed for one reason or another.)
Does not utilize allocated system resources well: parallelizing by month, the node processing February is pretty much guaranteed to sit idle for 8–10% of the total run time simply because it's 2–3 days shorter than other months. Or, if the lower-level processing fails and takes out the entire process running on that CPU, the rest of that chunk will need to be reprocessed later and the processor sits idle while the rest of the processors finish.
Does not utilize my time very well: any time I need to change either the number of processing jobs or the number of nodes to spread them over, I have to recalculate manually how to spread out the job and, likely, edit my control scripts. I absolutely hate having to keep a dozen scripts lying around all to do the same thing but each for some special edge case.
Okay, then, how do we get around this?
Job Control Stack
What is really needed is some sort of job scheduling stack where one
could store whatever parameters are required for each atomic
processing step. Whatever information would be needed to find the
right data to process could be pushed onto this list, and the various
scripts could then pop the next unprocessed item whenever a node
becomes free. Let's say the data we need to process is archived one
file per day and we want to process several years of it. The job
control stack idea is that we can make a list of the files we want to
process. This may be as simple as running
find. We then
can push their details onto the stack and start processing them in
parallel. We ask Slurm to provide us with some processors and start
MATLAB on each along with our second-tier processing script. Each
instance is a loop not unlike before, but instead of processing some
ad hoc chunk of the data, each iteration of the loop queries the stack
for the next available unprocessed data. If there is data to process,
it gets passed to the low-level processing script. If not, the loop
exits, and MATLAB terminates. The system doesn't care if we have 365
days to process or 2,431, or whether we want to spread those across two or
200 processors. This means that we don't have to care either.
When the main job is done, and we find that some number of days didn't process properly, we just make a list of the failed files, push it into the job stack and run it again (after fixing the reason for the initial failure, of course). There should be very little reason to edit any of the scripts to do this.
This idea seems to solve all the big issues:
No ad hoc chunking of data: one process, one file. Repeat as necessary.
Node resource utilization is higher and more even: if a processor hits a series of days that process quickly, it just grabs more days to process until there are no more.
If a node dies—either because of a system error or because of some other inadequately trapped error (missing files, short files/array indexing problems)—the processing balances over the remaining nodes. You lose and need to reprocess one day's worth of data, but the rest of the data gets done elsewhere.
It would be nice to be able to do this with a flat file for simplicity, but this approach led to some serious issues with collisions between different processors grabbing the same data to process. The need to be able to lock data once a processor has grabbed it led me to implementing this job stack with a MySQL database table.
Listing 2. SQL
CREATE TABLE Script to Define the Basic Job Control
CREATE TABLE `JobManagement` ( `task_id` int(11) NOT ↪NULL, `entry_id` int(11) NOT NULL AUTO_INCREMENT, ↪`node_id` int(11) DEFAULT NULL, `node_start` timestamp ↪NOT NULL DEFAULT '0000-00-00 00:00:00', `node_end` ↪timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', ↪`datapath` varchar(256) DEFAULT NULL, `task_name` ↪varchar(128) NOT NULL, PRIMARY KEY (`task_id`,`entry_id`) ) ↪ENGINE=MyISAM AUTO_INCREMENT=2 DEFAULT CHARSET=latin1;
Listing 2 is a MySQL
CREATE TABLE command to create the job control
table as it is currently implemented. Most of the values in this table
are populated when runs are pushed onto the stack.
node_id are populated when the job is popped from it and begins
node_end is filled in at the completion of a
node_id aren't particularly necessary,
but they collect useful statistics about runtime performance (although
you can get the same information from Slurm's
Listing 3. Job control stack
push function implemented in
sDataPath is the absolute path to the data file to
process. This gets pushed onto the job control table to make it
available for processing.
function push_job_table(iTaskID, sJobName, sDataPath) sMYSQL = 'mysql -h myhost -u myuser -p<password> myDB'; sSQL = sprintf(['echo "insert into JobManagement (task_id, year, ' ... 'doy, task_name) values (%d, \\"%s\\", \\"%s\\");"'], iTaskID, ... sDataPath, sJobName); [status, cmdout] = system([sSQL ' | ' sMYSQL]);
Jobs to be run are pushed to the stack by a routine like that shown in
Listing 3. When a job is pushed to the stack,
task_id takes on the
process ID Slurm assigns to the overall processing run, and
gets assigned a counter that starts at 1. This counter increments
with each processing run added under that
task_id. Taken together,
task_id are the primary key for the table and, thus, are
unique for every record in the table. The actual information needed to
retrieve data to process is stored in
an absolute path to the primary data file that needs to be processed
and can be populated directly from
find in most cases but can
also come from routines with more sophisticated logic if we need to
match up multiple input files or check that files exist first and so on.
When a processor queries the database for a new job, as in Listing 4, the
datapath is the primary information that gets returned.
Listing 4. Job control stack
pop function implemented in
MATLAB. Queries job control table for next available data file to
process. If data is available, the path to it is returned. If none is
available, returns an empty string causing further processing to end.11846l4.qrk
function stJobEntry = pop_job_table(taskid, entryid) % determine which node we are on for table update (for performance % tracking) iNodeID = str2num(getenv('SLURM_PROCID')); sMYSQL = 'mysql -h myhost -u myuser -p<password> myDB'; % select one entry from the job management table that hasn't been % done yet. Immediately update with this processor's node_id to try % to lock the row out from further requests. sPOPSQL = sprintf(['echo "set @B=%d; set @C=%d;' ... 'select @A:=entry_id as entry_id, datapath from ' ... 'JobManagement where task_id = @B and entry_id = @C' ... 'and node_id ' ... 'is null limit 1 for update;' ... 'update JobManagement set node_id = %d,' ... 'node_start = now() where task_id = @B and ' ... 'entry_id = @A;' ... 'set @A=0;"'], taskid, entryid, iNodeID); % execute the table pop and get a day that is not being processed [status, cmdout] = system([sPOPSQL ' | ' sMYSQL ' | head -2 | tail ' ... '-1']); stJobEntry = struct('entry',0, 'datapath',''); if length(cmdout) > 0 % parse cmdout to entry_id, datapath iTokens = str2num(cmdout); stJobEntry.entry = iTokens(1); stJobEntry.datapath = iTokens(2); end
Listing 4 gives us the
pop function for our stack.
The idea in the
is to select the next available row in the database and to lock it for
update. The record is fully locked out by updating it with the
for the processor grabbing it and also the start time for
processing. This is done in one command to minimize the CPU time in
which another processor can grab this same record. Once the
has been set, the record never will come into consideration again.
The record retrieved or, really, the datapath stored within it, is
passed out to a MATLAB structure that gets returned to the calling
function where it will be used to start processing.
all that is really necessary, but because we went to the effort to
node_stop to track runtime, a routine to close out the job
table entry is needed. Listing 5 shows one version of this close-out
that simply updates the record to add the end time in the
field. No explicit locking is needed here.
Listing 5. Once processing for a job table entry is complete, update the table with the time processing ended for basic runtime statistics.
function close_job_table_entry(iTaskID, iEntryID) sMYSQL = 'mysql -h myhost -u myuser -p<password> myDB'; % close out day by adding processing end time to db table sSQL = sprintf(['echo "update JobManagement set node_end=now() ' ... 'where task_id = %d and entry_id = %d;"'], ... iTaskID, iEntryID); [status, cmdout] = system([sSQL ' | ' sMYSQL]);
Some readers probably are wondering why I am doing database work
through shell commands and
system() escapes instead of some native
database access. It's for three reasons really. First, I'm lazy, and I couldn't
get any sort of MATLAB-native database connectivity to work. Second,
this way I easily can develop and test at the command line and then
paste into my code, and finally, I find this approach more easily
portable. If we ever decide to become an IDL or Python shop, I can
have these routines ported as quickly as I can look up their
respective system calls. System shell escapes may be the slower, but
in this use, they add only minutes onto runs that take tens of
hours. There is not much point in chasing down those few
Hopefully you now are convinced that a job control table is the best way to run jobs under Slurm. This approach has improved processing times significantly on several things we process routinely. Runs that used to take days now finish in one day, and I am not spending any time at all rewriting scripts for every run, so it definitely has worked for me.