Using MySQL for Load Balancing and Job Control under Slurm

Job Control Stack

What is really needed is some sort of job scheduling stack where one could store whatever parameters are required for each atomic processing step. Whatever information would be needed to find the right data to process could be pushed onto this list, and the various scripts could then pop the next unprocessed item whenever a node becomes free.

Let's say the data we need to process is archived one file per day and we want to process several years of it. The job control stack idea is that we can make a list of the files we want to process. This may be as simple as running ls or find. We then can push their details onto the stack and start processing them in parallel. We ask Slurm to provide us with some processors and start MATLAB on each along with our second-tier processing script.

Each instance is a loop not unlike before, but instead of processing some ad hoc chunk of the data, each iteration of the loop queries the stack for the next available unprocessed data. If there is data to process, it gets passed to the low-level processing script. If not, the loop exits, and MATLAB terminates. The system doesn't care if we have 365 days to process or 2,431, or whether we want to spread those across two or 200 processors. This means that we don't have to care either.
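The pop-until-empty worker loop described above can be sketched in a few lines. This is only an illustration of the control flow, using Python's thread-safe Queue in place of the database-backed stack; the file names and the "processing" step are stand-ins.

```python
# Sketch of the pop-until-empty worker loop: each worker grabs the next
# unprocessed file until the stack is empty, then exits. No ad hoc chunking,
# and fast workers simply take more work. Names here are illustrative.
import queue
import threading

def worker(jobs, results):
    while True:
        try:
            datapath = jobs.get_nowait()   # pop next unprocessed item
        except queue.Empty:
            return                          # nothing left: loop exits
        results.append(datapath.upper())    # stand-in for real processing

jobs = queue.Queue()
for day in ("2014-01-01.dat", "2014-01-02.dat", "2014-01-03.dat"):
    jobs.put(day)

results = []
threads = [threading.Thread(target=worker, args=(jobs, results))
           for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sorted(results))
```

Whether we feed this loop three files or three thousand, and whether two workers or 200 drain it, the scripts never change.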

When the main job is done, and we find that some number of days didn't process properly, we just make a list of the failed files, push it into the job stack and run it again (after fixing the reason for the initial failure, of course). There should be very little reason to edit any of the scripts to do this.

This idea seems to solve all the big issues:

  • No ad hoc chunking of data: one process, one file. Repeat as necessary.

  • Node resource utilization is higher and more even: if a processor hits a series of days that process quickly, it just grabs more days to process until there are no more.

  • If a node dies—either because of a system error or because of some other inadequately trapped error (missing files, short files/array indexing problems)—the processing balances over the remaining nodes. You lose, and need to reprocess, one day's worth of data, but the rest of the data gets done elsewhere.

MySQL Implementation

It would be nice to be able to do this with a flat file for simplicity, but this approach led to some serious issues with collisions between different processors grabbing the same data to process. The need to be able to lock data once a processor has grabbed it led me to implementing this job stack with a MySQL database table.

Listing 2. SQL CREATE TABLE Script to Define the Basic Job Control Table


CREATE TABLE `JobManagement` (
  `task_id` int(11) NOT NULL,
  `entry_id` int(11) NOT NULL AUTO_INCREMENT,
  `node_id` int(11) DEFAULT NULL,
  `node_start` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
  `node_end` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
  `datapath` varchar(256) DEFAULT NULL,
  `task_name` varchar(128) NOT NULL,
  PRIMARY KEY (`task_id`,`entry_id`)
) ENGINE=MyISAM AUTO_INCREMENT=2 DEFAULT CHARSET=latin1;

Listing 2 is a MySQL CREATE TABLE command to create the job control table as it is currently implemented. Most of the values in this table are populated when runs are pushed onto the stack. node_start and node_id are populated when the job is popped from the stack and begins active processing. node_end is filled in at the completion of a run. node_start, node_end and node_id aren't particularly necessary, but they collect useful statistics about runtime performance (although you can get the same information from Slurm's sacct command).
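To show the sort of runtime statistic that node_start and node_end enable, here is a minimal stand-in for the table using Python's built-in sqlite3 module (so it runs anywhere; the sqlite dialect drops MySQL's AUTO_INCREMENT and ENGINE clauses). The inserted row and its timestamps are invented for illustration.

```python
# sqlite3 stand-in for the JobManagement table of Listing 2, demonstrating
# a per-entry runtime query against node_start/node_end.
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("""CREATE TABLE JobManagement (
    task_id    INTEGER NOT NULL,
    entry_id   INTEGER NOT NULL,
    node_id    INTEGER,
    node_start TIMESTAMP,
    node_end   TIMESTAMP,
    datapath   VARCHAR(256),
    task_name  VARCHAR(128) NOT NULL,
    PRIMARY KEY (task_id, entry_id))""")

# one completed entry (values are made up for the example)
db.execute("INSERT INTO JobManagement VALUES (7, 1, 3, "
           "'2014-06-01 10:00:00', '2014-06-01 10:12:00', "
           "'/data/d001.dat', 'demo')")

# runtime in seconds for each completed entry
row = db.execute("""SELECT node_id,
    CAST(strftime('%s', node_end) AS INTEGER) -
    CAST(strftime('%s', node_start) AS INTEGER) AS secs
    FROM JobManagement""").fetchone()
print(row)  # (3, 720)
```

In MySQL the equivalent query would use TIMESTAMPDIFF(SECOND, node_start, node_end) instead of the strftime arithmetic.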

Listing 3. Job control stack push function implemented in MATLAB. sDataPath is the absolute path to the data file to process. This gets pushed onto the job control table to make it available for processing.


function push_job_table(iTaskID, sJobName, sDataPath)
% PUSH_JOB_TABLE  Push one data file onto the job control table.

sMYSQL = 'mysql -h myhost -u myuser -p<password> myDB';

sSQL = sprintf(['echo "insert into JobManagement (task_id, datapath, ' ...
                'task_name) values (%d, \\"%s\\", \\"%s\\");"'], ...
               iTaskID, sDataPath, sJobName);

[status, cmdout] = system([sSQL ' | ' sMYSQL]);

Jobs to be run are pushed to the stack by a routine like that shown in Listing 3. When a job is pushed to the stack, task_id takes on the process ID Slurm assigns to the overall processing run, and entry_id gets assigned a counter that starts at 1. This counter increments with each processing run added under that task_id. Taken together, entry_id and task_id are the primary key for the table and, thus, are unique for every record in the table. The actual information needed to retrieve data to process is stored in datapath. datapath specifies an absolute path to the primary data file that needs to be processed. In most cases, it can be populated directly from ls or find, but it also can come from routines with more sophisticated logic if we need to match up multiple input files, check that files exist first and so on. When a processor queries the database for a new job, as in Listing 4, the value of datapath is the primary information that gets returned.
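The push step amounts to turning a directory scan into one row per file. The sketch below does this against the same sqlite3 stand-in table (the task ID, task name and paths are invented); with MySQL, the INSERT is the same, except entry_id would be supplied by AUTO_INCREMENT rather than enumerate().

```python
# Sketch of the push step: build the work list and insert one row per file.
import sqlite3

def push_jobs(db, task_id, task_name, datapaths):
    # one row per data file; entry_id is a simple counter starting at 1
    for entry_id, path in enumerate(datapaths, start=1):
        db.execute("INSERT INTO JobManagement "
                   "(task_id, entry_id, datapath, task_name) "
                   "VALUES (?, ?, ?, ?)",
                   (task_id, entry_id, path, task_name))

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE JobManagement (task_id INTEGER, entry_id INTEGER, "
           "node_id INTEGER, datapath TEXT, task_name TEXT, "
           "PRIMARY KEY (task_id, entry_id))")

# in practice the path list would come from ls/find; hard-coded here
push_jobs(db, 42, "reprocess_2014",
          ["/data/2014/d%03d.dat" % d for d in (1, 2, 3)])
count = db.execute("SELECT count(*) FROM JobManagement "
                   "WHERE task_id = 42").fetchone()[0]
print(count)  # 3
```

Re-running failed days is then just another call to the push routine with the shorter list.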

Listing 4. Job control stack pop function implemented in MATLAB. Queries the job control table for the next available data file to process. If data is available, the path to it is returned. If none is available, it returns an empty string, causing further processing to end.


function stJobEntry = pop_job_table(taskid, entryid)

% determine which node we are on for table update (for performance
% tracking)
iNodeID = str2num(getenv('SLURM_PROCID'));

sMYSQL = 'mysql -h myhost -u myuser -p<password> myDB';

% select one entry from the job management table that hasn't been
% done yet. Immediately update with this processor's node_id to try
% to lock the row out from further requests.
sPOPSQL = sprintf(['echo "set @B=%d; set @C=%d; ' ...
                   'select @A:=entry_id as entry_id, datapath from ' ...
                   'JobManagement where task_id = @B and entry_id = @C ' ...
                   'and node_id is null limit 1 for update; ' ...
                   'update JobManagement set node_id = %d, ' ...
                   'node_start = now() where task_id = @B and ' ...
                   'entry_id = @A; ' ...
                   'set @A=0;"'], taskid, entryid, iNodeID);

% execute the table pop and get a day that is not being processed
[status, cmdout] = system([sPOPSQL ' | ' sMYSQL ' | head -2 | tail -1']);

stJobEntry = struct('entry',0, 'datapath','');

if ~isempty(cmdout)
    % parse the tab-separated "entry_id<TAB>datapath" output
    cTokens = strsplit(strtrim(cmdout));
    stJobEntry.entry = str2double(cTokens{1});
    stJobEntry.datapath = cTokens{2};
end

Listing 4 gives us the pop function for our stack. The idea in the pop is to select the next available row in the database and to lock it for update. The record is fully locked out by updating it with the node_id for the processor grabbing it and also the start time for processing. This is done in one command to minimize the window in which another processor could grab this same record. Once the node_id has been set, the record never will come into consideration again.
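The same claim-by-writing-node_id idea can also be expressed as a conditional UPDATE: a processor only "wins" an entry if its UPDATE actually changes a row that still has node_id NULL. The sketch below shows that pattern against the sqlite3 stand-in table (node IDs and paths are invented); it is one common race-free variant of the claim step, not a transcription of Listing 4.

```python
# Claim step as a compare-and-set: select a candidate, then UPDATE it only
# if node_id is still NULL. If another processor got there first, rowcount
# is 0 and we retry with the next candidate.
import sqlite3

def pop_job(db, task_id, node_id):
    while True:
        row = db.execute("SELECT entry_id, datapath FROM JobManagement "
                         "WHERE task_id = ? AND node_id IS NULL "
                         "ORDER BY entry_id LIMIT 1", (task_id,)).fetchone()
        if row is None:
            return None            # stack empty: caller's loop exits
        entry_id, datapath = row
        cur = db.execute("UPDATE JobManagement SET node_id = ? "
                         "WHERE task_id = ? AND entry_id = ? "
                         "AND node_id IS NULL",
                         (node_id, task_id, entry_id))
        if cur.rowcount == 1:      # we won the race for this entry
            return entry_id, datapath

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE JobManagement (task_id INTEGER, entry_id INTEGER, "
           "node_id INTEGER, datapath TEXT)")
db.executemany("INSERT INTO JobManagement VALUES (1, ?, NULL, ?)",
               [(1, "/data/a.dat"), (2, "/data/b.dat")])

first = pop_job(db, 1, 10)
second = pop_job(db, 1, 11)
third = pop_job(db, 1, 12)
print(first, second, third)
```

Because the UPDATE's WHERE clause re-checks node_id IS NULL, two processors can never claim the same entry, even without explicit row locks.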

The record retrieved or, really, the datapath stored within it, is passed out to a MATLAB structure that gets returned to the calling function where it will be used to start processing. push and pop are all that is really necessary, but because we went to the effort to include node_end to track runtime, a routine to close out the job table entry is needed. Listing 5 shows one version of this close-out that simply updates the record to add the end time in the node_end field. No explicit locking is needed here.

Listing 5. Once processing for a job table entry is complete, update the table with the time processing ended for basic runtime statistics.


function close_job_table_entry(iTaskID, iEntryID)

sMYSQL = 'mysql -h myhost -u myuser -p<password> myDB';

% close out day by adding processing end time to db table
sSQL = sprintf(['echo "update JobManagement set node_end=now() ' ...
                'where task_id = %d and entry_id = %d;"'], ...
               iTaskID, iEntryID);

[status, cmdout] = system([sSQL ' | ' sMYSQL]);

Some readers probably are wondering why I am doing database work through shell commands and system() escapes instead of some native database access. There are really three reasons. First, I'm lazy, and I couldn't get any sort of MATLAB-native database connectivity to work. Second, this way I easily can develop and test at the command line and then paste into my code. Finally, I find this approach more easily portable. If we ever decide to become an IDL or Python shop, I can have these routines ported as quickly as I can look up their respective system calls. System shell escapes may be slower, but in this use, they add only minutes onto runs that take tens of hours. There is not much point in chasing down those few minutes.

Wrap Up

Hopefully you now are convinced that a job control table is the best way to run jobs under Slurm. This approach has improved processing times significantly on several things we process routinely. Runs that used to take days now finish in one day, and I am not spending any time at all rewriting scripts for every run, so it definitely has worked for me.

Resources

AIRS: http://airs.jpl.nasa.gov

Slurm: http://slurm.schedmd.com/slurm.html

______________________

As a chronic sufferer of Multiple Avocation Disorder, Steven lives life in a desperate search for days with more hours in which to stuff new activities.