Background Job Processing Using Ractor (Ruby 3)

Ruby 3 is coming out at the end of 2020 🎉 and one of the most anticipated features is its new approach to parallel execution: Ractor (Ruby’s actor-like concurrent abstraction).

In this post, we will build a simple Sidekiq-like worker pool using Ractor.

Remember that at this moment, Ruby 3 isn’t stable yet, so the interface may change.

As ko1 explained, Ractor achieves the following goals:

  • Parallel execution in a Ruby interpreter process;
  • Avoid thread-safety issues (especially race issues) by limiting the object sharing;
  • Communication via copying and moving.

The project described in this post is inspired by the worker pool example specified here. It depends on Redis. Here is the repository.

The program basically runs three loops with the following responsibilities:

  • Loop #1: Gets a job from the queue and sends it to the pipe;
  • Loop #2: (the pipe) Receives the job and provides it to the workers;
  • Loop #3: (a worker) Takes the job and processes it.

This is the first loop:

loop do
  job = RedisConn.conn.blpop(['queue:ractor-example'], 1)
  next if job.nil?

  parsed = JSON.parse(job[1])
  FileLogger.log(parsed.to_s)
  pipe << parsed
end

RedisConn connects to a Redis instance, and we pop a job with the blpop method. Then, the loop sends the message (the parsed JSON) to the pipe object.

The pipe is a Ractor instance:

pipe = Ractor.new do
  loop do
    Ractor.yield Ractor.recv
  end
end

Its responsibility is to receive the job (a copy of it) using Ractor.recv and provide it to the workers using Ractor.yield.

We can already see the complete push-type messaging communication with Ractor. The first loop pushes a message to the Ractor by calling pipe <<, and the Ractor receives the message with Ractor.recv inside its block.
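To see push-type messaging in isolation, here is a tiny, self-contained sketch. Note that in the released Ruby 3.0 the method was renamed from Ractor.recv to Ractor.receive, so the sketch uses the newer name:

```ruby
# Push-type messaging: the sender pushes with #send (aliased as <<),
# and the Ractor blocks on Ractor.receive until a message arrives.
echo = Ractor.new do
  msg = Ractor.receive   # blocks until someone pushes a message
  "got: #{msg}"          # the block's final value can be read with #take
end

echo << 'hello'          # push-type: the sender decides when to send
result = echo.take       # => "got: hello"
```

The key property is that the sender never blocks waiting for the receiver to be ready; the message is copied into the Ractor's incoming queue.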

Looking into what happens in the workers, we can see the second type of communication (pull-type):

workers = (1..WORKERS).map do |worker_id|
  Ractor.new(worker_id, pipe) do |worker_id, pipe|
    loop do
      job = pipe.take
      Performer.new.perform(worker_id, job)
    end
  end
end

The workers receive a worker_id (used only for logging) and the pipe instance as parameters. Inside their block, at the top of the loop, they take a job from the pipe (with #take), which is provided by Ractor.yield. Then the job is processed; I’ll explain how that works later.

So this is how the job passes from one Ractor to the next:

  1. The job is sent to the pipe using << (which is an alias for #send);
  2. The pipe receives the job with Ractor.recv;
  3. The pipe provides the job with Ractor.yield;
  4. A worker Ractor takes the job with #take.
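The whole hand-off can be reproduced in miniature without Redis. This is a hypothetical, self-contained sketch (again using Ractor.receive, the post-release name for Ractor.recv), with two workers that square numbers instead of running a Performer:

```ruby
# A miniature of the pipeline: the pipe receives (push-type) and yields
# (pull-type); two workers compete to take jobs and yield results back.
pipe = Ractor.new do
  loop { Ractor.yield Ractor.receive }   # steps 2 and 3: receive, then yield
end

workers = (1..2).map do |worker_id|
  Ractor.new(worker_id, pipe) do |id, p|
    loop do
      n = p.take                         # step 4: pull a job from the pipe
      Ractor.yield [id, n * n]           # report the result back
    end
  end
end

pipe << 7                                # step 1: push a job into the pipe
_worker, (_id, result) = Ractor.select(*workers)
result                                   # => 49
```

Ractor.select blocks until any of the given Ractors yields a value, which is how a supervisor can collect results from whichever worker finishes first.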

In this section, I’ll briefly explain the implementation of the enqueue and the worker classes.

We have a module called Worker that extends the including class with the perform_async class method. This method pushes a job to the Redis queue, specifying its parameters and the class responsible for handling it.

def perform_async(data)
  RedisConn.conn.rpush(
    'queue:ractor-example',
    { 'klass' => self.name, 'data' => data }.to_json
  )
end
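For example, enqueueing FibWorker.perform_async({ 'n' => 34 }) builds this JSON string, which is exactly what shows up in the redis-blpop lines of the log further down:

```ruby
require 'json'

# The payload perform_async builds for FibWorker with { 'n' => 34 }
payload = { 'klass' => 'FibWorker', 'data' => { 'n' => 34 } }.to_json
payload  # => {"klass":"FibWorker","data":{"n":34}}
```

The worker class name travels inside the payload, so the consuming side only needs the string to reconstruct which class should handle the job.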

Then we have a Fibonacci solver that includes this module, and we’ll use it later for tests:

class FibWorker
  include Worker

  def perform(data)
    fib(data['n'])
  end

  def fib(n)
    if n <= 1
      n
    else
      fib(n - 1) + fib(n - 2)
    end
  end
end

And finally, the Performer instantiates the related class and performs the job:

result = Object.const_get(klass).new.perform(data)
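The full Performer class isn’t shown in the post; here is a minimal sketch of what it could look like, with logging omitted and a tiny inline worker so it runs on its own (the shape of the class and the SquareWorker are assumptions, not the original code):

```ruby
# Hypothetical minimal Performer: resolves the worker class by name
# from the job payload and runs it.
class Performer
  def perform(worker_id, job)
    # worker_id would only be used for logging in the real implementation
    klass = job['klass']
    data  = job['data']
    Object.const_get(klass).new.perform(data)
  end
end

# A tiny worker so the sketch is self-contained
class SquareWorker
  def perform(data)
    data['n'] * data['n']
  end
end

Performer.new.perform(1, { 'klass' => 'SquareWorker', 'data' => { 'n' => 6 } })
# => 36
```

Resolving the class with Object.const_get is the same trick Sidekiq-style libraries use: the queue only ever stores strings, so any class name in the payload can be turned back into a constant on the consuming side.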

As you can see in the code snippets above, I inserted some logging for a better understanding of what’s going on at runtime.

That’s what we’ll see in the log file:

  • A print when a job is popped from Redis;
  • A print when the pipe Ractor receives a job;
  • A print when the FibWorker starts solving a job (with the worker ID);
  • A print when the FibWorker finishes a job (with the result).
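The FileLogger itself isn’t shown in the post either; a minimal sketch could look like the following (the LOG_PATH, the timestamp format, and the open-append-close pattern are assumptions on my part):

```ruby
# Hypothetical FileLogger: each call opens, appends, and closes the file,
# so no mutable file object ever needs to be shared between Ractors.
module FileLogger
  LOG_PATH = 'ractor-example.log'.freeze  # frozen, so it is Ractor-shareable

  def self.log(message)
    File.open(LOG_PATH, 'a') do |f|
      f.puts("#{Time.now.strftime('%H:%M:%S')}-#{message}")
    end
  end
end

FileLogger.log('redis-blpop-{"klass":"FibWorker","data":{"n":34}}')
```

Opening the file on every call is slower than keeping a handle around, but it sidesteps Ractor’s object-sharing restrictions: only the frozen path constant crosses Ractor boundaries.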

In my test, I initialized loop.rb (which contains the loop that watches the queue and instantiates the pipe and the workers), and then I pushed some jobs:

(34..44).each do |n|
  FibWorker.perform_async({ 'n' => n })
end

So it’s basically enqueueing 11 inputs.

This is the result log:

19:20:10-redis-blpop-{"klass":"FibWorker","data":{"n":34}}
19:20:10-pipe-recv-{"klass"=>"FibWorker", "data"=>{"n"=>34}}
19:20:10-redis-blpop-{"klass":"FibWorker","data":{"n":35}}
19:20:10-performer-1:FibWorker:{"n"=>34}:start
19:20:10-pipe-recv-{"klass"=>"FibWorker", "data"=>{"n"=>35}}
19:20:10-redis-blpop-{"klass":"FibWorker","data":{"n":36}}
19:20:10-performer-2:FibWorker:{"n"=>35}:start
19:20:10-pipe-recv-{"klass"=>"FibWorker", "data"=>{"n"=>36}}
19:20:10-redis-blpop-{"klass":"FibWorker","data":{"n":37}}
19:20:10-performer-3:FibWorker:{"n"=>36}:start
19:20:10-pipe-recv-{"klass"=>"FibWorker", "data"=>{"n"=>37}}
19:20:10-redis-blpop-{"klass":"FibWorker","data":{"n":38}}
19:20:10-redis-blpop-{"klass":"FibWorker","data":{"n":39}}
19:20:10-redis-blpop-{"klass":"FibWorker","data":{"n":40}}
19:20:10-redis-blpop-{"klass":"FibWorker","data":{"n":41}}
19:20:10-redis-blpop-{"klass":"FibWorker","data":{"n":42}}
19:20:10-redis-blpop-{"klass":"FibWorker","data":{"n":43}}
19:20:10-redis-blpop-{"klass":"FibWorker","data":{"n":44}}
19:20:11-performer-1:FibWorker:{"n"=>34}:5702887
19:20:11-performer-1:FibWorker:{"n"=>37}:start
19:20:11-pipe-recv-{"klass"=>"FibWorker", "data"=>{"n"=>38}}
19:20:11-performer-2:FibWorker:{"n"=>35}:9227465
19:20:11-performer-2:FibWorker:{"n"=>38}:start
19:20:11-pipe-recv-{"klass"=>"FibWorker", "data"=>{"n"=>39}}
19:20:11-performer-3:FibWorker:{"n"=>36}:14930352
19:20:11-performer-3:FibWorker:{"n"=>39}:start
19:20:11-pipe-recv-{"klass"=>"FibWorker", "data"=>{"n"=>40}}
19:20:13-performer-1:FibWorker:{"n"=>37}:24157817
19:20:13-performer-1:FibWorker:{"n"=>40}:start
19:20:13-pipe-recv-{"klass"=>"FibWorker", "data"=>{"n"=>41}}
19:20:14-performer-2:FibWorker:{"n"=>38}:39088169
19:20:14-performer-2:FibWorker:{"n"=>41}:start
19:20:14-pipe-recv-{"klass"=>"FibWorker", "data"=>{"n"=>42}}
19:20:17-performer-3:FibWorker:{"n"=>39}:63245986
19:20:17-performer-3:FibWorker:{"n"=>42}:start
19:20:17-pipe-recv-{"klass"=>"FibWorker", "data"=>{"n"=>43}}
19:20:22-performer-1:FibWorker:{"n"=>40}:102334155
19:20:22-performer-1:FibWorker:{"n"=>43}:start
19:20:22-pipe-recv-{"klass"=>"FibWorker", "data"=>{"n"=>44}}
19:20:30-performer-2:FibWorker:{"n"=>41}:165580141
19:20:30-performer-2:FibWorker:{"n"=>44}:start
19:20:47-performer-3:FibWorker:{"n"=>42}:267914296
19:21:11-performer-1:FibWorker:{"n"=>43}:433494437
19:21:41-performer-2:FibWorker:{"n"=>44}:701408733

As you can see, in the first second the loop popping jobs from Redis fetches all 11 of them. Then, as the workers complete their tasks, the pipe delivers the next one.

As an example, you can follow the steps of input number 40 through the log.

Here is another test running more jobs with 8 workers. The operating system uses all CPUs when needed:

Running with 8 workers and sending dozens of jobs

It was fun to implement parallelism in Ruby using Ractor. Still, it demands a big shift in mindset because of the restrictions on object sharing.

I’m curious to see how it will impact the major gems of the Ruby community.
