Concurrency with Resque (hacking our way through)

In the past year, our team at FindHotel developed a rather sophisticated automated campaign building system for Google AdWords. This means that we have lots of long-running jobs, which at the end will save some (quite large) campaigns into our MongoDB cluster. Our stack consists of a distributed Ruby on Rails application and we use Resque for background job processing.

TL;DR

You can use Resque in a non-trivial way (shown here) to add parallel execution and speed up your complex, long-running background jobs (and not just by a little bit).

The long version

Resque is a great background job system, perhaps the best, for MRI at least. It has many awesome plugins, some of which we use ourselves (Resque Retry, Resque Scheduler and Resque Status, to name a few).

For some use cases, this setup works perfectly fine. Eventually, though, there comes a time when sequential execution becomes the bottleneck, not to mention that you want to make use of all those Xeon cores you have in the cloud anyway.

That is probably the point when you wish you had chosen JRuby from the very beginning; but you are a true hacker, you want real concurrency, and you want it now. If you are interested in one way of doing this, then keep on reading.

Before getting to it, here is a public repo that contains the sample framework I'm demoing here.

Here comes the code

I’ve implemented a simple service (called SampleService - how smart 😉 ) to demonstrate the advantage of concurrent execution. The service does nothing more than insert documents into the DB.

The chosen DB is MongoDB 3.0.6 (configured to use WiredTiger as its storage engine), with Mongoid 5.0.0 as the ODM.

require 'securerandom'

class SampleService
  include ResqueConcurrencyAdapter

  MONGODB_BATCH_SIZE = 5000

  def initialize(nr_of_documents)
    @nr_of_documents = nr_of_documents
  end

  # Generates and inserts all documents in the current process.
  def run_sequentially
    self.class.generate_documents(@nr_of_documents)
    nil
  end

  # Splits the work into jobs of at most MONGODB_BATCH_SIZE documents each
  # and blocks until all of them have finished.
  def run_concurrently
    total_documents = @nr_of_documents
    documents_per_job = MONGODB_BATCH_SIZE
    job_count = total_documents / documents_per_job

    arguments_array = job_count.times.map { documents_per_job }
    # The last job picks up whatever does not fill a whole batch.
    if total_documents % documents_per_job != 0
      arguments_array << total_documents % documents_per_job
    end
    execute_concurrently('SampleService', 'generate_documents', arguments_array)
    nil
  end

  # Bulk-inserts `count` documents with random text, one batch at a time.
  def self.generate_documents(count)
    (0...count).each_slice(MONGODB_BATCH_SIZE) do |slice|
      SampleDocument.collection.insert_many(slice.map { { text: SecureRandom.base64(50) } })
    end
  end
end

As you can see we have two methods: one for generating and inserting the documents sequentially, another one for concurrent generation and insert. Of course, we use bulk inserts.
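
To make the splitting in run_concurrently concrete, here is a minimal standalone sketch of the same arithmetic. The helper name split_into_batches is hypothetical (it is not part of the sample repo); it only mirrors the logic shown above.

```ruby
# Hypothetical helper (not in the sample repo) mirroring the batch-splitting
# arithmetic of SampleService#run_concurrently.
def split_into_batches(total_documents, documents_per_job)
  full_jobs, remainder = total_documents.divmod(documents_per_job)
  batches = Array.new(full_jobs, documents_per_job)
  # The leftover documents, if any, go into one smaller final job.
  batches << remainder unless remainder.zero?
  batches
end

p split_into_batches(12_000, 5000) # => [5000, 5000, 2000]
p split_into_batches(10_000, 5000) # => [5000, 5000]
p split_into_batches(3_000, 5000)  # => [3000]
```

Each element of the resulting array becomes the argument of one enqueued job.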

The sequential execution is straightforward; the concurrent one is where the magic happens. It uses the ResqueConcurrencyAdapter module’s execute_concurrently method to achieve parallelism. Let’s have a look at that.

What this adapter does is quite simple: it enqueues as many (JobProcessor type) jobs as needed, tells them which class method to call, with which arguments, and then blocks the execution until they all finish.

There is a JobSupervisor object that tracks the progress and the status of all the spawned jobs. When the process is done, it hands control back to the caller, returning the supervisor object. With the supervisor in hand, the calling service can check the status of the jobs and their results (which are saved on JobInfo objects).

The sample doesn't do this, but a look at the JobSupervisor#iterate_job_infos method will give you the idea.

module ResqueConcurrencyAdapter

  def execute_concurrently(klass, method, arguments_array)
    supervisor = JobSupervisor.create

    arguments_array.each_with_index do |arguments, i|
      Resque.enqueue(JobProcessor, {
        klass: klass,
        method: method,
        method_arguments: arguments,
        job_id: i,
        supervisor_id: supervisor.id.to_s
      })
      supervisor.increase_enqueued_jobs_count
    end

    supervisor.wait_for_completion

    supervisor
  end
end
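
For orientation, the JobProcessor jobs enqueued above might look roughly like the sketch below. This is an assumption, not the class from the sample repo: it leaves out all the JobSupervisor/JobInfo bookkeeping, the queue name is made up, and EchoService is a stand-in so the snippet runs without Resque or MongoDB.

```ruby
# A minimal sketch of a JobProcessor-style worker (assumed shape, not the
# implementation from the sample repo).
class JobProcessor
  @queue = :job_processor # hypothetical queue name

  # Resque calls self.perform with the enqueued arguments. The hash has been
  # serialized to JSON in between, so its keys arrive as strings.
  def self.perform(options)
    klass = Object.const_get(options.fetch('klass'))
    klass.public_send(options.fetch('method'), *Array(options['method_arguments']))
  end
end

# Stand-in for SampleService, used only to make this sketch runnable.
class EchoService
  def self.generate_documents(count)
    "would insert #{count} documents"
  end
end

puts JobProcessor.perform(
  'klass' => 'EchoService',
  'method' => 'generate_documents',
  'method_arguments' => 5000
)
```

Passing the class and method as strings keeps the enqueued payload JSON-friendly, which is why execute_concurrently takes 'SampleService' and 'generate_documents' rather than the constants themselves.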

I’ve written some benchmarking code:

require 'benchmark'

nr_of_documents_values = [5000, 10_000, 100_000, 1_000_000, 10_000_000]

nr_of_documents_values.each do |nr_of_documents|
  Benchmark.bm do |x|
    # Start each measurement from an empty collection.
    SampleDocument.delete_all
    x.report("sequential #{nr_of_documents}") { SampleService.new(nr_of_documents).run_sequentially }

    SampleDocument.delete_all
    x.report("parallel #{nr_of_documents}") { SampleService.new(nr_of_documents).run_concurrently }
  end
end

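Of course, this is just a dummy service, where the only thing we do is insert into the database, but the results show that the approach is worth using. These are the numbers on a MacBook Pro (Quad i7 2.2GHz).

For small batches the parallel run is actually slower: it never finishes in much under a second, presumably because the supervisor polls once per second. It pulls ahead at 100K documents, starts getting interesting around 1M, and is a clear winner when we get to 10M: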

$ ruby main.rb
                          user     system      total        real
sequential 5000       0.070000   0.000000   0.070000 (  0.148114)
parallel   5000       0.010000   0.000000   0.010000 (  1.011989)
sequential 10000      0.150000   0.000000   0.150000 (  0.306701)
parallel   10000      0.010000   0.000000   0.010000 (  1.011901)
sequential 100000     1.570000   0.020000   1.590000 (  3.072587)
parallel   100000     0.040000   0.010000   0.050000 (  2.044006)
sequential 1000000   15.600000   0.200000  15.800000 ( 31.412441)
parallel   1000000    0.340000   0.030000   0.370000 ( 23.699169)
sequential 10000000 164.340000   2.280000 166.620000 (332.095931)
parallel   10000000   4.770000   0.430000   5.200000 (207.892764)

I also ran the 10M test on a desktop PC with better specs (Ubuntu 14.04 - Quad i7 3.2GHz):

sequential 124.950000   0.560000 125.510000 (165.600020)
parallel     2.300000   0.180000   2.480000 ( 79.669728)
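
To put the numbers in perspective, the speedup is simply the sequential wall-clock time divided by the parallel one. The snippet below does that division for the "real" column of the runs above (the labels are mine); for 10M documents it comes out at roughly 1.6x on the laptop and 2.1x on the desktop.

```ruby
# Wall-clock ("real") seconds copied from the benchmark output above,
# as [sequential, parallel] pairs.
timings = {
  '100K (MacBook Pro)' => [3.072587, 2.044006],
  '1M (MacBook Pro)'   => [31.412441, 23.699169],
  '10M (MacBook Pro)'  => [332.095931, 207.892764],
  '10M (desktop PC)'   => [165.600020, 79.669728]
}

# Speedup = sequential time / parallel time, rounded to two decimals.
speedups = timings.transform_values do |sequential, parallel|
  (sequential / parallel).round(2)
end

speedups.each { |label, speedup| puts format('%-20s %.2fx', label, speedup) }
```

Note that the user and system columns are not comparable between the two modes: in the parallel run most of the CPU time is spent in the Resque worker processes, which the benchmark in the parent process does not see.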

Robustness concerns

"So, what happens if a job fails?" you might ask. And it’s a legitimate question. For those situations, I’d recommend using the Resque Retry plugin (not included in the example). Retrying lets a job recover from transient failures, although it cannot guarantee that every job eventually succeeds.

Also, the blocking condition of the supervisor (see JobSupervisor#jobs_in_progress) can be improved, for example by waiting until every job status is "SUCCESS".

Another improvement would be a timeout to handle the really messed up situations, for example a job that has not terminated within, say, 2 hours. In that case we want to abort it and make sure a cleanup is performed afterwards.
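
The timeout idea could be sketched as a polling loop with a deadline. Everything here is hypothetical and not part of the sample repo: the wait_until helper, the SupervisorTimeout error and the fake counter that stands in for the supervisor's "jobs in progress" check. The real abort and cleanup logic would depend on your jobs.

```ruby
# Hypothetical error type; it does not exist in the sample repo.
class SupervisorTimeout < StandardError; end

# Polls the given block until it returns a truthy value, or raises
# SupervisorTimeout once `timeout` seconds have elapsed.
def wait_until(timeout:, poll_interval: 1)
  # A monotonic clock is not affected by system clock adjustments.
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  until yield
    if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
      raise SupervisorTimeout, "jobs still running after #{timeout}s"
    end
    sleep poll_interval
  end
  true
end

# A fake counter stands in for the supervisor's "jobs in progress" check.
remaining = 3
wait_until(timeout: 1, poll_interval: 0.01) { (remaining -= 1) <= 0 }

# A condition that never becomes true triggers the timeout path.
begin
  wait_until(timeout: 0.05, poll_interval: 0.01) { false }
rescue SupervisorTimeout => e
  puts "aborting and cleaning up: #{e.message}"
end
```

In a real setup the rescue branch is where the remaining jobs would be dequeued or flagged as aborted before the parent job gives up.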

How we use it

The way this concept is embedded in our system is very simple. We have long-running background jobs, which every now and then have to do some work that can (and should) be parallelized.

We just include the ResqueConcurrencyAdapter, and call the execute_concurrently method whenever it’s needed. So we have one “parent” job and a lot of “children” JobProcessor jobs.

Further ideas

You probably noticed that the JobSupervisor polls the DB every second, and you started wondering how good that is. You are right: all the job tracking objects (JobSupervisor, JobInfo) could live in a place like Redis. The reason we store them in the database is that we want the data to be persistent, because in complex cases there’s more information to be stored than just a status and a message. For simple situations, a Redis-based solution would be a better (and faster) approach.

If MongoDB is removed from the dependencies, then this small framework could be wrapped into a gem and bundled easily within any application. Maybe this will be the subject of a future blog post.