Linux Clustering with Ruby Queue: Small Is Beautiful

Using Ruby and SQLite to create Linux clusters that take advantage of idle nodes and bypass expensive software solutions.

My friend Dave Clements always is game for a brainstorming session, especially if I'm buying the coffee. Recently, we met at the usual place and I explained my problem to him over the first cup. My office had a bunch of Linux nodes sitting idle and a stack of work lined up for them, but we had no way to distribute the work to them. Plus, the deadline for project completion loomed over us.

Over the second cup of coffee, I related how I had evaluated several packages, such as openMosix and Sun's Grid Engine, but ultimately had decided against them. It all came down to this: I wanted something leaner than everything I'd seen, something fast and easy, not a giant software system that would require weeks of work to install and configure.

After the third cup of coffee, we had it: Why not simply create an NFS-mounted priority queue and let nodes pull jobs from it as fast as they could? No scheduler, no process migration, no central controller, no kernel mods--simply a collection of compute nodes working as fast as possible to complete a list of tasks. But there was one big question: could an NFS-mounted queue be accessed concurrently from many nodes safely? Armed with my favorite development tools--a brilliant IDE named Vim and the Ruby programming language--I aimed to find out.

History

I work for the National Geophysical Data Center's (NGDC) Solar-Terrestrial Physics Division (STP), in the Defense Meteorological Satellite Program (DMSP) group. My boss, Chris Elvidge, and the other scientists in our group study the night-time lights of Earth from space. The data we receive helps researchers understand changes in human population and the movement of forest fires, among other things. The infrastructure required to do this kind of work is astounding. The following image, showing the average intensity of night-time lights over part of North America, required over 100 gigabytes of input data and 142 terabytes of intermediate files to produce. Over 50,000 separate processes spread across 18 compute nodes and a week of clock time went into its production.

Linux clusters have become the new supercomputers. The economics of teraflop performance built on commodity hardware is impossible to ignore in the current climate of dwindling research funding. However, one critical aspect of cluster-building, namely orchestration, frequently is overlooked by the people doing the buying. The problem facing a developer with clustered systems is analogous to the one facing a home buyer who can afford only a lot and some bricks--he's got a lot of building to do.

Building a Small Brick House on a Shoestring

Yukihiro Matsumoto, aka Matz, has said that "The purpose of Ruby is to maximize programming pleasure", and experience has taught me that enjoying the creative process leads to faster development and higher quality code. Ruby features powerful object-oriented abstraction techniques, extreme dynamism, ease of extensibility and an armada of useful libraries. It is a veritable Swiss Army machete, precisely the sort of tool one should bring into uncharted territory such as the NFS-mounted priority queue I was seeking to build.

The first task I faced when creating Ruby Queue (rq) was to work out the issues with concurrent access to NFS shared storage, and the first bridge I had to cross was how to accomplish NFS-safe locking from within Ruby. Ruby has an fcntl interface similar to Perl's and, like Perl's, the interface requires you to pack a buffer with the struct arguments. This is perfectly safe but, unfortunately, non-portable. I've wondered about this oversight before and decided to address it by writing a little C extension, posixlock, which extends Ruby's built-in File class with a method to apply fcntl, or POSIX-style, advisory locks to a File object. Here is a majority of the code from posixlock.c:


static int
posixlock (fd, operation)
     int fd;
     int operation;
{
  struct flock lock;
  switch (operation & ~LOCK_NB)
    {
    case LOCK_SH:
      lock.l_type = F_RDLCK;
      break;
    case LOCK_EX:
      lock.l_type = F_WRLCK;
      break;
    case LOCK_UN:
      lock.l_type = F_UNLCK;
      break;
    default:
      errno = EINVAL;
      return -1;
    }
  lock.l_whence = SEEK_SET;
  lock.l_start = lock.l_len = 0L;
  return fcntl (fd,
		(operation & LOCK_NB) ? F_SETLK :
		F_SETLKW, &lock);
}

static VALUE
rb_file_posixlock (obj, operation)
     VALUE obj;
     VALUE operation;
{
  OpenFile *fptr;
  int ret;
  rb_secure (2);
  GetOpenFile (obj, fptr);
  if (fptr->mode & FMODE_WRITABLE)
    {
      fflush (GetWriteFile (fptr));
    }
retry:
  TRAP_BEG;
  ret =
    posixlock (fileno (fptr->f),
	       NUM2INT (operation));
  TRAP_END;
  if (ret < 0)
    {
      switch (errno)
	{
	case EAGAIN:
	case EACCES:
#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
	case EWOULDBLOCK:
#endif
	  return Qfalse;
	case EINTR:
#if defined(ERESTART)
	case ERESTART:
#endif
	  goto retry;
	}
      rb_sys_fail (fptr->path);
    }
  /* on success, mirror File#flock and return 0 (assumed; the article
     shows only a majority of the code) */
  return INT2FIX (0);
}

void
Init_posixlock ()
{
  rb_define_method (rb_cFile, "posixlock",
		    rb_file_posixlock, 1);
}

Granted, it's a bit ugly, but C code almost always is. One of the things that's really impressive about Ruby, however, is that the code for the interpreter itself is quite readable. The source includes array.c, hash.c and object.c--files even I can make some sense of. In fact, I was able to steal about 98% of the above code from Ruby's File.flock implementation, defined in file.c.

Along with posixlock.c, I needed to write an extconf.rb (extension configure) file, which Ruby automagically turns into a Makefile. Here is the complete extconf.rb file used for the posixlock extension:


require 'mkmf' and create_makefile 'posixlock'

Usage of the extension mirrors Ruby's own File.flock call, but this is safe for NFS-mounted files. The example below can be run simultaneously from several NFS clients:


require 'socket'
require 'posixlock'

host = Socket::gethostname
puts "test running on host #{ host }"

File::open('nfs/fcntl_locking.test','a+') do |file|
  file.sync = true
  loop do
    file.posixlock File::LOCK_EX
    file.puts "host : #{ host }"
    file.puts "locked : #{ Time::now }"
    file.posixlock File::LOCK_UN
    sleep 0.42
  end
end

A tail -f of the NFS-mounted file fcntl_locking.test shows the file is being accessed concurrently in a safe fashion. Notice the lack of error checking: Ruby is an exception-based language, so any method that does not succeed raises an error, and a detailed stack trace is printed on standard error.
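
For completeness, the extension also supports the non-blocking flavor of the call. The following is a minimal sketch, assuming the posixlock extension shown above: combining LOCK_NB with LOCK_EX makes posixlock return false instead of blocking when another node already holds the lock--this is the EWOULDBLOCK path in the C code.


require 'posixlock'

# attempt a non-blocking exclusive lock; returns false if another
# node already holds the lock instead of waiting for it
File::open('nfs/fcntl_locking.test','a+') do |file|
  if file.posixlock(File::LOCK_EX | File::LOCK_NB)
    puts 'got the lock'
    file.posixlock File::LOCK_UN
  else
    puts 'another node holds the lock -- skipping this round'
  end
end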

One of the things to note about this extension is that I actually was able to add a method to Ruby's built-in File class. Ruby's classes are open--you can extend any class at any time, even the built-in ones. Obviously, extending the built-in classes should be done with great care, but there is a time and a place for it, and Ruby does not prevent you from doing so where it makes sense. The point here is not that you have to extend Ruby but that you can. And it is not difficult to do.
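
As a small illustration of open classes--a sketch, not part of posixlock--the built-in String class can be reopened to add a method, and every string in the program gains it:


# reopening a built-in class; every String instance gains this method
class String
  def shout
    upcase + '!'
  end
end

'ruby queue'.shout   # => "RUBY QUEUE!"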

Having resolved my locking dilemma, the next design choice I had to make was what format to use to store the queue. Ruby has the ability to serialize any object to disk, and it includes a transactionally based, file-backed object storage class, PStore, that I have used successfully as a mini database for many CGI programs. I began by implementing a wrapper on this class that used the posixlock module to ensure NFS-safe transactions and that supported methods such as insert_job, delete_job and find_job. Right away, I started to feel like I was writing a little database.
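
Here is a rough sketch of that first approach--not the actual rq code, and the LockedStore class name is hypothetical. The idea is simply to wrap every PStore transaction in an exclusive posixlock on a companion lock file so that only one node touches the store at a time:


require 'pstore'
require 'posixlock'

# hypothetical wrapper: an NFS-safe, transactional mini database
class LockedStore
  def initialize path
    @store = PStore::new "#{ path }.db"
    @lock  = "#{ path }.lock"
  end
  def transaction
    File::open(@lock, File::CREAT | File::RDWR) do |f|
      f.posixlock File::LOCK_EX            # serialize access across nodes
      @store.transaction{ |db| yield db }
      f.posixlock File::LOCK_UN
    end
  end
end

queue = LockedStore::new 'nfs/queue'
queue.transaction{ |db| (db[:jobs] ||= []) << { 'command' => 'echo 42' } }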

Not being one to reinvent the wheel (at least not too often), I decided to utilize the SQLite embedded database and the excellent Ruby bindings for it written by Jamis Buck as a storage backend. This really helped get the project moving, as I was freed from writing a lot of database-like functionality.

Many database APIs have made the choice of returning either a hash or an array to represent a database tuple (row). With tuples represented as hashes, you can write code that can be read easily, such as this:


ssn = tuple['ssn']

Yet, you are unable to write natural code, such as:


sql = 
  "insert into jobs values ( #{ tuple.join ',' } )"

or


primary_key, *rest = tuple

And with an array representation, you end up with indecipherable code, such as this:


field = tuple[7]

Now, what was field 7 again?

When I first started using the SQLite bindings for Ruby, all of the tuples were returned as hashes, and I had a lot of slightly verbose code that converted tuples from hash to array and back again. Anyone who's spent much time working with Ruby can tell you that Ruby's elegance inspires you to make your own code more elegant. All this converting was not only inelegant but inefficient too. What I wanted was a tuple class that was an array, but one that allowed keyword field access for readability and elegance.

For Ruby, this task was no problem. I wrote a pure Ruby module, ArrayFields, that allowed any array to do exactly this. In Ruby, a module is not only a namespace; it also can be mixed in to other classes to impart functionality. The effect is similar to, but less confusing than, multiple inheritance. In fact, not only can Ruby classes be extended in this way, but individual Ruby objects can be extended dynamically with the functionality of a module--leaving other instances of the same class untouched. Here's an example using Ruby's Observable module, which implements the Publish/Subscribe design pattern:


require 'observer'

class Publisher; end            # an ordinary class with no mixins of its own

publisher = Publisher::new
publisher.extend Observable     # extend just this instance

In this example, only this specific instance of the Publisher class is extended with Observable's methods.
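
Carrying the sketch one step further, an observer object can register with that one extended instance and receive callbacks when it publishes; the Subscriber class here is only for illustration:


# a subscriber is any object with an update method
class Subscriber
  def update message
    puts "received: #{ message }"
  end
end

publisher.add_observer Subscriber::new
publisher.changed                        # mark the publisher's state as changed
publisher.notify_observers 'new data'    # => prints "received: new data"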

Jamis was more than happy to work with me to add ArrayFields support to his SQLite package. The way it works is simple: if the ArrayFields module is detected at runtime, then the tuples returned by a query are extended dynamically to support named field access. No other array objects in memory are touched; only those arrays returned as tuples are extended with ArrayFields.
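
The following is a minimal sketch of the idea, not the ArrayFields library's actual implementation or API; the NamedFields module is hypothetical. A module defining a keyed [] can be mixed in to a single Array instance, giving that one tuple named field access while every other array in the program stays an ordinary Array:


# hypothetical stand-in for ArrayFields: keyword access on one Array instance
module NamedFields
  attr_accessor :field_names
  def [] key
    key.is_a?(String) ? super(field_names.index(key)) : super
  end
end

tuple = [42, 'render_image.sh']       # one row returned by a query
tuple.extend NamedFields              # only this instance is extended
tuple.field_names = %w( jid command )

tuple['command']                      # => "render_image.sh"
tuple.first                           # => 42 -- still a plain Array underneath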

Finally, I was able to write readable code that looked like this:


require 'arrayfields'
require 'sqlite'

...

query = 'select * from jobs order by submitted asc'

tuples = db.execute query 

tuples.each do |job|
  jid, command = job['jid'], job['command']

  run command

  job['state'] = 'finished'

  # quoted list of the job's fields
  values = job.map{ |val| "'#{ val }'" }.join ','
  sql = "insert into done values( #{ values } )"

  db.execute sql
end

and elegant code, such as this:


jobs.sort_by{ |job| job['submitted'] }

This extension offers more than mere convenience; using arrays over hashes is faster, requires about 30% less memory and makes many operations on tuples more natural to code. Allowing keyword access to the arrays makes the code more readable and frees the developer from remembering field positions or, worse, having to update code if a change to the database schema should change the order of fields. Finally, a reduction in total lines of code almost always aids both development and maintenance.

______________________

Comments


many questions

esnebraska's picture

I have many questions about "Ruby Queue", can I email you directly?

sorry for late reply...

a's picture

sure!

NFS share a single point of failure

Anonymous's picture

'rq' has no central brain, no communication between nodes, and no scheduler

This sounds like a distributed approach (like P2P); however, there is still a central server that exports the NFS share and hence a single point of failure, right? (Just trying to understand the idea better.)

[RE] NFS share a single point of failure

-a's picture

yes - exactly right. however, at least in many cases, this is not a drawback per se. the reason is that we already have a strong dependency on NFS; our scripts and binaries reside there, our config files live there, many static data files live there, and even input/output to programs lives there (though we always work on local copies for performance). we are totally dead in the water without NFS. one of the goals of rq was not to ADD a point of failure. we considered using an RDBMS, for example, in which to store the queue, but this adds a point of failure unless you do the (huge) task of setting up an HA db. in essence rq leverages our existing single point of failure. also, as far as single points of failure go, NFS is a good one: if mounts are 'hard', processing simply hangs while the server reboots. this applies, of course, to ALL file access, including that of the db for rq. because of this we can reboot our NFS server even if 30 nodes are currently using the queue - this behaviour, while it can be coded, is much harder to achieve with a TCP connection to a database. we have tested this many times, including a run where we simply pressed the power button on the NFS server and all its nodes. although i'm sure this could potentially cause problems, we've experienced zero through our tests and several real power-strip failures. sqlite is not perfect but does a VERY good job at maintaining ACID properties within the confines of the filesystem's abilities.

kind regards.

-a

A great tool...

David's picture

This tool is really great! I have downloaded all the binaries and I have tested it. All works correctly except when I try to start a second "feeder" computer... I obtain the following message:
process <18182> is already feeding from this queue
What's wrong? Do you have any idea?

a great tool

-a's picture

hmmm. this should not happen UNLESS you are trying to start more than one feeding process from a single host. are you attempting to do this on separate hosts and seeing this? i've never seen that but bugs are always possible. contact me offline and we can work out the problem and post the answer back here.

kind regards.

-a

a great tool

-a's picture

so - turns out this is a little bugette resulting from two hosts using the same pidfile when (and only when) the home dir itself is NFS mounted. i actually have support to work around this in the code base, but the command-line switch was taken out for other reasons. i'll add a small fix and make a release later today. the latest rq also has support for automatic job restart if a node reboots and the ability to submit jobs to a specific host (quite useful for debugging). look for release 2.0.0 on the download site this afternoon (MDT).

kind regards.
-a

a great tool

-a's picture

the bugette is fixed and a new version (2.0.0) is available for download.

cheers.

-a

why not the maildir solution?

Anonymous's picture

I read the article quickly, it's quite interesting.

To my eyes this looks like a replay of the mbox vs maildir debate, with the current article's solution being, "add more complication to the mbox."

Could you add a little blurb as to why one file containing all the jobs data and requiring complex locking is better than one job per file?

one-job-per-file AFAICT would require much, much simpler locking (with a good filehandling protocol/sequence/scheme perhaps no locking).

I hope I've not badly misunderstood the requirements.

mbox vs. maildir approach

-a's picture

i actually considered that approach. the vsdb project uses that idea for nfs-safe transactions. the problem with that idea was in implementing operations like

deleting: will give ESTALE on remote client nfs box if it's using the job when it's deleted.

searching: requires managing a read lock on each file while iterating

updating: requires managing a write lock on each file while updating

having something as powerful as sqlite under the hood made writing this code at LEAST 50 times easier than it would have been without. it's true you could code a basic job running scheme this way, but there are many problems:

who takes which jobs?

how do you coordinate atomically 'taking' a job to run?

i think you'll see that, as soon as you implement useful features on a system like this, you end up either

a) writing nfs transactions yourself (tricky)

b) having a central brain that 'decides' which jobs go where (naming conventions). realize that 'rq' has no central brain, no communication between nodes, and no scheduler. each host simply works as fast as possible to finish the list of jobs. this is possible because taking a job from the queue and starting to run it is an atomic action.

in any case i think you have understood a part of the problem well and i hope this sheds some light.

tuplespaces

Anonymous's picture

> who takes which jobs?

> how do you coordinate atomically 'taking' a job to run?

TupleSpaces can be used as the basis for this kind of "pull-driven"
set up --- clients pull tuples (jobs) from the tuplespace and leave
behind 'pending' tuples, later they pull the pending tuple and write
back their finished tuple. An admin program hooks up to add new jobs
(tuples), or to read all tuples (or particular kinds of tuples) to
provide status, or to collect finished job-tuples.

tuplespaces

-a's picture

yes - a great idea. this was definitely on my initial list of design ideas. the problem, for us, is that the current security environment on government machines makes ANY sort of networked programming extremely laden with red tape. any tuplespace requires a client/server-type architecture which, of course, requires networking. 'rq' is in fact essentially a tuplespace -- it's a single database table containing jobs as tuples ;-)... clients simply pull jobs from it as you suggest. the difference? the networking is handled via NFS - not on top of TCP/UDP etc. in any case, i agree with you that a tuplespace can be a good solution for this sort of problem domain, but it would not fly in our shop. the red tape for a 30 node cluster would mean months of time wasted; the NFS model allows a scientist to set up a 30 node cluster SANS sysad in under 30 minutes.

one last thing - if one WERE designing a tuplespace to contain, say, 100000 jobs, one would certainly layer it on top of some sort of persistent and transactionally based storage (i hope), and sqlite is a good fit for that. the hitch is, once you've layered your tuplespace server on top of sqlite you don't really need it anymore unless you don't want to go the route of NFS (a possibility). and, of course, if you layer it on top of a network-available RDBMS (postgresql for example) you also then don't need a tuplespace any longer.

tuplespaces ARE very attractive for heterogeneous environments and i think a product using that technology (perhaps with sqlite as a backend) would be successful if written. it would share one of the features of rq in that it also would 'auto load-balance' as each client simply took jobs from the queue as fast as possible.

kind regards.

-a

continuing...

-a's picture

sorry to follow up my own post, but i sent prematurely...

in summary:

maildir solves a 'multiple writer single reader' problem - rq solves a (very different) 'multiple writer multiple reader' problem.

cheers.

-a

Great article

gavin's picture

Great article, Ara. I only understood 50% of it, but the picture sure is perty.

Easy, but powerful 8-)

Anonymous's picture

Hi A.

This looks easy, like all great ideas. I mean, a computer cannot go faster than it was built to go. So you just queue up the tasks - and when a working machine is ready, it gets the next one.

So when you are running out of proc-time - you just buy another bunch of machines 8-)))))


Starting jobs at reboot

chris2's picture

"In this way, an ordinary user can set up a process that is running at all times, even after a machine reboot."

Most modern cron(1) implementations also support @reboot, which is run just after cron starts.

@reboot

Anonymous's picture

on second thought, the @reboot approach is not quite the same: the crontab/lockfile approach i use creates an 'immortal' daemon, e.g. the daemon is restarted even if it died (bug) or was killed (accident). using the @reboot method does not ensure the daemon is ALWAYS running. one could argue that's a GOOD thing. regardless, they are not quite the same.

cheers.

you learn something everyday

-a's picture

that's a great tip. i'll take it!

cheers.

ruby

Anonymous's picture

Let's declare this "Ruby Queuesday"

Just a small remark - I'm

Anonymous's picture

Just a small remark - I'm using the rq-3.4.0 gem and had to change this command:
rq queue feed --daemon -l=~/rq.log
to:
rq queue feed --daemon -l ~/rq.log
I.e., I had to remove the "=" sign.
