In the past year, our team at FindHotel developed a rather sophisticated automated campaign building system for Google AdWords. This means we have lots of long-running jobs, which eventually save some (quite large) campaigns into our MongoDB cluster. Our stack consists of a distributed Ruby on Rails application, and we use Resque for background job processing.
TL;DR
You can use Resque in a non-trivial way (shown here) to add parallel execution and speed up your complex, long-running background jobs (and not just by a little bit).
The long version
Resque is a great background job system, perhaps the best, at least for MRI. It has many awesome plugins, some of which we use ourselves (Resque Retry, Resque Scheduler, and Resque Status, to name a few).
For some use cases, this setup works perfectly fine; eventually, though, there comes a time when sequential execution becomes the bottleneck, not to mention that you want to make use of all those Xeon cores you have in the cloud anyway.
That is probably the point when you wish you had chosen JRuby from the very beginning; but you are a true hacker, you want real concurrency, and you want it now. If you are interested in one way of doing this, keep on reading.
Before getting to it, here is a public repo containing the sample framework demoed in this post.
Here comes the code
I’ve implemented a simple service (called SampleService - how smart 😉) to demonstrate the advantage of concurrent execution. The service does nothing more than insert documents into the DB.
The chosen DB is MongoDB 3.0.6 (configured to use WiredTiger as the storage engine), with Mongoid 5.0.0 as the ODM.
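For reference, the SampleDocument model behind it can be as simple as this (a minimal sketch; the actual model lives in the sample repo):

class SampleDocument
  include Mongoid::Document

  field :text, type: String
end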
class SampleService
  include ResqueConcurrencyAdapter

  MONGODB_BATCH_SIZE = 5000

  def initialize(nr_of_documents)
    @nr_of_documents = nr_of_documents
  end

  def run_sequentially
    self.class.generate_documents(@nr_of_documents)
    nil
  end

  def run_concurrently
    total_documents = @nr_of_documents
    documents_per_job = MONGODB_BATCH_SIZE

    # One job per full batch, plus one extra job for the remainder (if any)
    job_count = total_documents / documents_per_job
    arguments_array = job_count.times.map { documents_per_job }
    if total_documents % documents_per_job != 0
      arguments_array << total_documents % documents_per_job
    end

    execute_concurrently('SampleService', 'generate_documents', arguments_array)
    nil
  end

  def self.generate_documents(count)
    (0...count).each_slice(MONGODB_BATCH_SIZE) do |slice|
      SampleDocument.collection.insert_many(slice.map { { text: SecureRandom.base64(50) } })
    end
  end
end
As you can see, we have two methods: one generates and inserts the documents sequentially, the other does so concurrently. Of course, we use bulk inserts in both cases.
The sequential execution is straightforward; the concurrent one is where the magic happens. It uses the ResqueConcurrencyAdapter module’s execute_concurrently method to achieve parallelism. Let’s have a look at that.
What this adapter does is quite simple: it enqueues as many (JobProcessor-type) jobs as needed, tells them which class method to call and with which arguments, and then blocks execution until they all finish.
There is a JobSupervisor object that tracks the progress and status of all the spawned jobs. When everything is done, it gives control back to the caller, returning the supervisor object. With the supervisor at hand, the caller service can check the status of the jobs and their results (which are saved on JobInfo objects).
This is not part of the sample, but if you have a look at the JobSupervisor#iterate_job_infos method, you will get the idea.
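A rough sketch of what such an iterator might look like (my approximation, not the repo’s actual code; the field names are assumptions):

class JobSupervisor
  # Yield every JobInfo belonging to this supervisor, so the caller can
  # inspect each job's status and result. Field names assumed for illustration.
  def iterate_job_infos
    JobInfo.where(supervisor_id: id.to_s).each do |job_info|
      yield job_info
    end
  end
end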
module ResqueConcurrencyAdapter
  def execute_concurrently(klass, method, arguments_array)
    supervisor = JobSupervisor.create

    arguments_array.each_with_index do |arguments, i|
      Resque.enqueue(JobProcessor, {
        klass: klass,
        method: method,
        method_arguments: arguments,
        job_id: i,
        supervisor_id: supervisor.id.to_s
      })
      supervisor.increase_enqueued_jobs_count
    end

    supervisor.wait_for_completion
    supervisor
  end
end
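For context, here is a rough sketch of how I picture the two pieces the adapter relies on: the JobProcessor job that does the actual work, and the supervisor’s blocking wait. The real implementations are in the repo; field names like enqueued_jobs_count and the JobInfo attributes are assumptions here.

class JobProcessor
  @queue = :job_processor

  # Resque serializes arguments to JSON, hence the string keys.
  # constantize comes from ActiveSupport (we are in a Rails app).
  def self.perform(args)
    result = args['klass'].constantize
                          .public_send(args['method'], args['method_arguments'])
    JobInfo.create!(
      supervisor_id: args['supervisor_id'],
      job_id: args['job_id'],
      status: 'SUCCESS',
      result: result
    )
  end
end

class JobSupervisor
  # Poll MongoDB once per second until every enqueued job has reported back.
  def wait_for_completion
    sleep 1 while jobs_in_progress
  end

  def jobs_in_progress
    JobInfo.where(supervisor_id: id.to_s).count < enqueued_jobs_count
  end
end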
I’ve written some benchmarking code:
nr_of_documents_values = [5000, 10_000, 100_000, 1_000_000, 10_000_000]

nr_of_documents_values.each do |nr_of_documents|
  Benchmark.bm do |x|
    SampleDocument.delete_all
    x.report("sequential #{nr_of_documents}") { SampleService.new(nr_of_documents).run_sequentially }

    SampleDocument.delete_all
    x.report("parallel #{nr_of_documents}") { SampleService.new(nr_of_documents).run_concurrently }
  end
end
Of course, this is just a dummy service where the only thing we do is insert into the database, but the results show that this approach is worth it. These are the numbers on a MacBook Pro (quad-core i7, 2.2GHz).
It starts getting interesting around 1M, and it is a clear winner when we get to 10M:
$ ruby main.rb
user system total real
sequential 5000 0.070000 0.000000 0.070000 ( 0.148114)
parallel 5000 0.010000 0.000000 0.010000 ( 1.011989)
user system total real
sequential 10000 0.150000 0.000000 0.150000 ( 0.306701)
parallel 10000 0.010000 0.000000 0.010000 ( 1.011901)
user system total real
sequential 100000 1.570000 0.020000 1.590000 ( 3.072587)
parallel 100000 0.040000 0.010000 0.050000 ( 2.044006)
user system total real
sequential 1000000 15.600000 0.200000 15.800000 ( 31.412441)
parallel 1000000 0.340000 0.030000 0.370000 ( 23.699169)
user system total real
sequential 10000000 164.340000 2.280000 166.620000 (332.095931)
parallel 10000000 4.770000 0.430000 5.200000 (207.892764)
I also ran the 10M test on a desktop PC with better specs (Ubuntu 14.04 - Quad i7 3.2GHz):
sequential 124.950000 0.560000 125.510000 (165.600020)
parallel 2.300000 0.180000 2.480000 ( 79.669728)
Robustness concerns
"So, what happens if a job fails?" you might ask. And it’s a legitimate question. For those situations, I’d recommend using the Resque Retry plugin (not included in the example). This would make sure that a job always finishes successfully.
Also, the blocking condition of the supervisor (see JobSupervisor#jobs_in_progress) can be improved (e.g. wait until all the job statuses are in state "SUCCESS").
Another improvement might be the use of a timeout, to handle those really messed-up situations - for example, a job not terminating within, say, 2 hours - in which case we want to abort it and make sure a cleanup is performed afterwards.
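A sketch of that idea, assuming a hypothetical abort_remaining_jobs cleanup helper on the supervisor:

require 'timeout'

JOB_TIMEOUT = 2 * 60 * 60 # 2 hours, in seconds

begin
  Timeout.timeout(JOB_TIMEOUT) { supervisor.wait_for_completion }
rescue Timeout::Error
  supervisor.abort_remaining_jobs # hypothetical cleanup helper
  raise
end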
How we use it
The way this concept is embedded in our system is very simple. We have long-running background jobs, which every now and then have to do some work that can (and should) be parallelized.
We just include the ResqueConcurrencyAdapter and call the execute_concurrently method whenever it’s needed. So we end up with one “parent” job and many “child” JobProcessor jobs.
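In code, such a parent job might look roughly like this (CampaignBuilderJob and its method names are made up for illustration):

class CampaignBuilderJob
  extend ResqueConcurrencyAdapter # exposes execute_concurrently at the class level

  @queue = :campaign_builder

  def self.perform(campaign_ids)
    # ... sequential, long-running work ...

    # Blocks here until every child JobProcessor job has finished.
    execute_concurrently('CampaignBuilder', 'build_campaign', campaign_ids)

    # ... continue with work that depends on the children's output ...
  end
end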
Further ideas
You probably noticed that the JobSupervisor polls the DB every second, and you may have started wondering how good an idea that is. You are right: all the job tracking objects (JobSupervisor, JobInfo) could live in a place like Redis. The reason we store them in the database is that we want the data to be persistent, because in complex cases there is more information to store than just a status and a message. For simple situations, a Redis-based solution would be a better (and faster) approach.
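Such a Redis-based variant could track progress with a single counter, for example via Resque’s own Redis connection (a sketch; supervisor_id and job_count are assumed to be in scope):

redis = Resque.redis

# Parent, right after enqueueing the child jobs:
redis.set("supervisor:#{supervisor_id}:pending", job_count)

# Each JobProcessor, once its work is done:
redis.decr("supervisor:#{supervisor_id}:pending")

# Parent, instead of polling MongoDB:
sleep 1 while redis.get("supervisor:#{supervisor_id}:pending").to_i > 0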
If MongoDB were removed from the dependencies, this small framework could be wrapped into a gem and bundled easily within any application. Maybe that will be the subject of a future blog post.