Using MySQL for Load Balancing and Job Control under Slurm

Like most things these days, modern atmospheric science is all about big data. Whether it's an instrument flying in an aircraft taking sets of images several times a second and producing three quarters of a terabyte of data per flight day over a two-week campaign or a satellite instrument producing hundreds of gigs of spectral data daily over a 10–15 year lifetime, data volume is enormous. Simply analyzing a day's worth of data to keep track of basic instrument stability is CPU-intensive. Fully processing a day to retrieve the state of the atmosphere or looking at trends across a decade's worth of data is exponentially so.

High-performance parallel cluster computing is the name of the game. For years I've done this on a very basic level by kicking off a handful of copies of my processing scripts on a couple computers around the lab, but after a recent move into a new lab, I got my first chance to work on a real cluster system, processing data from a satellite-borne hyperspectral sounder called AIRS (see Resources). AIRS is one of the instruments onboard NASA's AQUA satellite that was launched in late 2002 and has been in continuous operation since. Data from AIRS and similar instruments is used to map out vertical profiles of atmospheric temperature and trace gases globally, but we have to be able to process it first.

The cluster computing game here is strictly to get a whole lot of computers doing the same thing to a whole lot of data so that we can process it faster than we collect it (much faster would be preferable). Since I was new to this game just a few months ago, I've had much to learn about cluster computing and how to design algorithms and processing software to take advantage of multiple CPUs for processing. This was my first experience where I had hundreds of CPUs at my disposal, and it really has changed how I process data in general. I started this article to describe how I was shown to parallelize this type of data processing and a method I put together that makes the process much cleaner.

Basic Slurm

The cluster system here consists of 240 compute nodes, each with dual, 8-core processors and 64GB of main memory running Red Hat Enterprise Linux. Cluster jobs are scheduled to run through the Slurm workload manager (see Resources). In a nutshell, Slurm is a suite of programs that works to allocate computer resources among users and compute jobs and enforce sharing rules to make sure everyone gets a chance to get their work in. The two most important programs in the suite for actually working on the system are sbatch and srun.

sbatch is the entry point to the Slurm scheduler and reads a high-level Bash control script that specifies job parameters (number of nodes needed, memory per process, expected run times and so on) and spawns the requested number of identical jobs via calls to srun.

Listing 1. Top-level Slurm/sbatch script. This scripts specifies the request for CPUs, memory and time on the cluster and sets up the program that will be run on each processing node.

#!/bin/bash
# sbatch options
#SBATCH --job-name=RUN_CALFLAG
#SBATCH --partition=batch
#SBATCH --qos=short
#SBATCH --ntasks=20
#SBATCH --mem-per-cpu=18000
#SBATCH --cpus-per-task 1
#SBATCH --time=00:20:00

# matlab options
MATLAB=/usr/bin/matlab
MATOPT=' -nojvm -nodisplay -nosplash'

srun --output=~/run_calflag.log 
 ↪$MATLAB $MATOPT -r "run_calflag; exit;"

The script in Listing 1 asks Slurm to allocate 20 processors (ntasks/cpus-per-task), allocate 18GB of memory (mem-per-cpu) to each and run a job (srun ...) on each that will take around 20 minutes (time) to run. The partition and qos directives help the system manage its resources and set rules for the number of processors a user is allowed, CPU time limits and so on. The job-name directive puts a name to your task to help you separate your jobs in an squeue list of the system queue.

The srun request shown here is to run an instance of MATLAB on each of the allocated nodes and, in each instance, run the script run_calflag and exit. Any message output is sent to the file specified by the output parameter. run_calflag could be a simple "hello world" script, or it could be a loop to process a thousand files. It also doesn't have to be MATLAB. MATLAB is our tool of choice here, and examples in this article use it in a background sort of way. There is no need to understand MATLAB to keep reading.

As long as this request doesn't violate any cluster good behavior rules by hogging processors, hogging memory and so on, Slurm queues the request until such time as processors are available to run it. When the resources are available, Slurm starts the processing by grabbing 20 CPUs and, then, starting a copy of matlab/run_calflag on each one. Once the control scripts are in order, this request is submitted to Slurm through the sbatch command:


sbatch run_calflag_batch.sh

Slurm also manages a set of environment variables that can be used to pass some job parameters into the processing scripts.

Basic Data Chunking

Listing 1 already shows enough to see one issue with parallelizing this kind of data processing: how does one chunk up the data to pass to each instance of the run_calflag started by srun? If I want to process one year, should I ask for 365 processors and do one day on each? 52 processors and one per week? One per month? How do I handle leap years? The cluster resource allocation rules prevent me from doing the 365 processor idea, but other than that, there is no clear, single answer.

Being new to cluster computing, I looked at what my colleagues have done to set up their processing runs. Most have adopted a three-tier system to run these jobs:

  • A bash script with sbatch directives and srun calls to kick off everything.

  • A MATLAB script (called by srun in the previous script) that is run on each compute node that uses some node ID in the Slurm environment (usually SLURM_PROC_ID) to index processing into the range of years/days/files to process. The most common approaches are to request 12 CPUs and assign one month per CPU or request 52 and assign a week per CPU. This script then loops over the years/days/files "assigned" to this node. Within this loop, calls are made to a final MATLAB script that does the actual processing for each year/day/file in the sequence.

This approach certainly can work, but it has some significant issues:

  • Ad hoc chunking of data: in general, how does one split "x" things to process over "y" nodes? In practice, this seems to mean you have to edit run scripts and tailor them to just about every run you wish to do. (It is almost guaranteed that you will do multiple runs in this business: once to try to process a contiguous period of data and a second time to re-run the now non-contiguous set of days that failed for one reason or another.)

  • Does not utilize allocated system resources well: parallelizing by month, the node processing February is pretty much guaranteed to sit idle for 8–10% of the total run time simply because it's 2–3 days shorter than other months. Or, if the lower-level processing fails and takes out the entire process running on that CPU, the rest of that chunk will need to be reprocessed later and the processor sits idle while the rest of the processors finish.

  • Does not utilize my time very well: any time I need to change either the number of processing jobs or the number of nodes to spread them over, I have to recalculate manually how to spread out the job and, likely, edit my control scripts. I absolutely hate having to keep a dozen scripts lying around all to do the same thing but each for some special edge case.

Okay, then, how do we get around this?

Job Control Stack

What is really needed is some sort of job scheduling stack where one could store whatever parameters are required for each atomic processing step. Whatever information would be needed to find the right data to process could be pushed onto this list, and the various scripts could then pop the next unprocessed item whenever a node becomes free. Let's say the data we need to process is archived one file per day and we want to process several years of it. The job control stack idea is that we can make a list of the files we want to process. This may be as simple as running ls or find. We then can push their details onto the stack and start processing them in parallel. We ask Slurm to provide us with some processors and start MATLAB on each along with our second-tier processing script. Each instance is a loop not unlike before, but instead of processing some ad hoc chunk of the data, each iteration of the loop queries the stack for the next available unprocessed data. If there is data to process, it gets passed to the low-level processing script. If not, the loop exits, and MATLAB terminates. The system doesn't care if we have 365 days to process or 2,431, or whether we want to spread those across two or 200 processors. This means that we don't have to care either.

When the main job is done, and we find that some number of days didn't process properly, we just make a list of the failed files, push it into the job stack and run it again (after fixing the reason for the initial failure, of course). There should be very little reason to edit any of the scripts to do this.

This idea seems to solve all the big issues:

  • No ad hoc chunking of data: one process, one file. Repeat as necessary.

  • Node resource utilization is higher and more even: if a processor hits a series of days that process quickly, it just grabs more days to process until there are no more.

  • If a node dies—either because of a system error or because of some other inadequately trapped error (missing files, short files/array indexing problems)—the processing balances over the remaining nodes. You lose and need to reprocess one day's worth of data, but the rest of the data gets done elsewhere.

MySQL Implementation

It would be nice to be able to do this with a flat file for simplicity, but this approach led to some serious issues with collisions between different processors grabbing the same data to process. The need to be able to lock data once a processor has grabbed it led me to implementing this job stack with a MySQL database table.

Listing 2. SQL CREATE TABLE Script to Define the Basic Job Control Table

CREATE TABLE `JobManagement` ( `task_id` int(11) NOT 
 ↪NULL, `entry_id` int(11) NOT NULL AUTO_INCREMENT, 
 ↪`node_id` int(11) DEFAULT NULL, `node_start` timestamp 
 ↪NOT NULL DEFAULT '0000-00-00 00:00:00', `node_end` 
 ↪timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', 
 ↪`datapath` varchar(256) DEFAULT NULL, `task_name` 
 ↪varchar(128) NOT NULL, PRIMARY KEY (`task_id`,`entry_id`) ) 
 ↪ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=latin1;

Listing 2 is a MySQL CREATE TABLE command to create the job control table as it is currently implemented. Most of the values in this table are populated when runs are pushed onto the stack. node_start and node_id are populated when the job is popped from it and begins active processing. node_end is filled in at the completion of a run. node_start, node_end and node_id aren't particularly necessary, but they collect useful statistics about runtime performance (although you can get the same information from Slurm's sacct command).

Listing 3. Job control stack push function implemented in MATLAB. sDataPath is the absolute path to the data file to process. This gets pushed onto the job control table to make it available for processing.

function push_job_table(iTaskID, sJobName, sDataPath)

sMYSQL = 'mysql -h myhost -u myuser -p<password> myDB';

sSQL = sprintf(['echo "insert into JobManagement (task_id, year, ' ...
                'doy, task_name) values (%d, \\"%s\\", \\"%s\\");"'],
iTaskID, ...
               sDataPath, sJobName);

[status, cmdout] = system([sSQL ' | ' sMYSQL]);

Jobs to be run are pushed to the stack by a routine like that shown in Listing 3. When a job is pushed to the stack, task_id takes on the process ID Slurm assigns to the overall processing run, and entry_id gets assigned a counter that starts at 1. This counter increments with each processing run added under that task_id. Taken together, entry_id and task_id are the primary key for the table and, thus, are unique for every record in the table. The actual information needed to retrieve data to process is stored in datapath. datapath specifies an absolute path to the primary data file that needs to be processed and can be populated directly from ls or find in most cases but can also come from routines with more sophisticated logic if we need to match up multiple input files or check that files exist first and so on. When a processor queries the database for a new job, as in Listing 4, the value of datapath is the primary information that gets returned.

Listing 4. Job control stack pop function implemented in MATLAB. Queries job control table for next available data file to process. If data is available, the path to it is returned. If none is available, returns an empty string causing further processing to end.11846l4.qrk

function stJobEntry = pop_job_table(taskid, entryid)

% determine which node we are on for table update (for performance
% tracking)
iNodeID = str2num(getenv('SLURM_PROCID'));

sMYSQL = 'mysql -h myhost -u myuser -p<password> myDB';

% select one entry from the job management table that hasn't been
% done yet. Immediately update with this processor's node_id to try
% to lock the row out from further requests.
sPOPSQL = sprintf(['echo "set @B=%d; set @C=%d;' ...
                   'select @A:=entry_id as entry_id, datapath from ' ...
                   'JobManagement where task_id = @B and entry_id = @C' ...
                   'and node_id ' ...
                   'is null limit 1 for update;' ...
                   'update JobManagement set node_id = %d,' ...
                   'node_start = now() where task_id = @B and ' ...
                   'entry_id = @A;' ...
                   'set @A=0;"'], taskid, entryid, iNodeID);

% execute the table pop and get a day that is not being processed
[status, cmdout] = system([sPOPSQL ' | ' sMYSQL ' | head -2 | tail ' ...
                    '-1']);

stJobEntry = struct('entry',0, 'datapath','');

if length(cmdout) > 0
    % parse cmdout to entry_id, datapath
    iTokens = str2num(cmdout);
    stJobEntry.entry = iTokens(1);
    stJobEntry.datapath = iTokens(2);
end

Listing 4 gives us the pop function for our stack. The idea in the pop is to select the next available row in the database and to lock it for update. The record is fully locked out by updating it with the node_id for the processor grabbing it and also the start time for processing. This is done in one command to minimize the CPU time in which another processor can grab this same record. Once the node_id has been set, the record never will come into consideration again.

The record retrieved or, really, the datapath stored within it, is passed out to a MATLAB structure that gets returned to the calling function where it will be used to start processing. push and pop are all that is really necessary, but because we went to the effort to include node_stop to track runtime, a routine to close out the job table entry is needed. Listing 5 shows one version of this close-out that simply updates the record to add the end time in the node_stop field. No explicit locking is needed here.

Listing 5. Once processing for a job table entry is complete, update the table with the time processing ended for basic runtime statistics.

function close_job_table_entry(iTaskID, iEntryID)

sMYSQL = 'mysql -h myhost -u myuser -p<password> myDB';

% close out day by adding processing end time to db table
sSQL = sprintf(['echo "update JobManagement set node_end=now() ' ...
                'where task_id = %d and entry_id = %d;"'], ...
               iTaskID, iEntryID);

[status, cmdout] = system([sSQL ' | ' sMYSQL]);

Some readers probably are wondering why I am doing database work through shell commands and system() escapes instead of some native database access. It's for three reasons really. First, I'm lazy, and I couldn't get any sort of MATLAB-native database connectivity to work. Second, this way I easily can develop and test at the command line and then paste into my code, and finally, I find this approach more easily portable. If we ever decide to become an IDL or Python shop, I can have these routines ported as quickly as I can look up their respective system calls. System shell escapes may be the slower, but in this use, they add only minutes onto runs that take tens of hours. There is not much point in chasing down those few minutes.

Wrap Up

Hopefully you now are convinced that a job control table is the best way to run jobs under Slurm. This approach has improved processing times significantly on several things we process routinely. Runs that used to take days now finish in one day, and I am not spending any time at all rewriting scripts for every run, so it definitely has worked for me.

Resources

AIRS: http://airs.jpl.nasa.gov

Slurm: http://slurm.schedmd.com/slurm.html

Load Disqus comments