Background Job Processing Using Ractor (Ruby 3)
Ruby 3 is coming out at the end of 2020 🎉 and one of its most anticipated features is a new approach to parallel execution called Ractor (Ruby’s actor-like concurrent abstraction).
In this post, we will build a simple Sidekiq-like worker pool using Ractor.
Keep in mind that Ruby 3 hasn’t reached a stable version yet, so the interface may still change.
Why Ractor?
As ko1 explained, Ractor achieves the following goals:
- Parallel execution in a Ruby interpreter process;
- Avoid thread-safety issues (especially race issues) by limiting the object sharing;
- Communication via copying and moving.
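To get a feel for the last two points, here is a tiny example (run against a Ruby 3 preview; as noted above, the method names may still change). The message is deep-copied, so the sender’s object is never shared:
r = Ractor.new do
  msg = Ractor.recv # blocks until a message is pushed in
  msg.upcase!       # mutates the Ractor-local copy only
  msg
end

greeting = 'hello'
r << greeting # greeting is copied into the Ractor
puts r.take   # => "HELLO"
puts greeting # => "hello" (the original object is untouched)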
The project described in this post is inspired by the worker pool example specified here. It depends on Redis. Here is the repository.
The Loops
The program basically works by running three loops with the following responsibilities:
- Loop #1: gets a job from the queue and sends it to the pipe;
- Loop #2: (the pipe) receives the job and provides it to the workers;
- Loop #3: (a worker) takes the job and processes it.
This is the first loop:
loop do
  # Block for up to 1 second waiting for a job on the Redis list.
  job = RedisConn.conn.blpop(['queue:ractor-example'], 1)
  next if job.nil?

  payload = JSON.parse(job[1]) # blpop returns [queue_name, value]
  FileLogger.log(payload.to_s)
  pipe << payload
end
RedisConn connects to a Redis instance and pops a job with the blpop method. The message (a parsed JSON hash) is then sent to the pipe object.
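Neither RedisConn nor FileLogger is shown in the post. Based purely on how they are used in the snippets, minimal versions could look like this (both implementations below are assumptions, including the log file name):
require 'redis'

# Assumed: a tiny wrapper memoizing a single connection from the redis gem.
module RedisConn
  def self.conn
    @conn ||= Redis.new(url: ENV.fetch('REDIS_URL', 'redis://localhost:6379'))
  end
end

# Assumed: appends timestamped lines to a file, matching the
# "19:20:10-..." prefix visible in the log output later in the post.
module FileLogger
  def self.log(message)
    File.open('ractor-example.log', 'a') do |f|
      f.puts "#{Time.now.strftime('%H:%M:%S')}-#{message}"
    end
  end
end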
The pipe is a Ractor instance:
pipe = Ractor.new do
  loop do
    # Wait for a pushed message, then offer it to whichever worker calls #take.
    Ractor.yield Ractor.recv
  end
end
Its responsibility is to receive the job (a copy of it) using Ractor.recv and provide it to the workers using Ractor.yield.
We can already see the complete push-type messaging communication with Ractor: the first loop pushes a message to the Ractor by calling pipe <<, and the Ractor receives the message with Ractor.recv inside its block.
Looking into what happens in the workers, we can see the second type of communication (pull-type):
workers = (1..WORKERS).map do |worker_id|
  Ractor.new(worker_id, pipe) do |worker_id, pipe|
    loop do
      job = pipe.take # blocks until the pipe yields a job
      Performer.new.perform(worker_id, job)
    end
  end
end
The workers receive a worker_id (used only for logging) and the pipe instance as arguments. At the beginning of their loop, they take a job from the pipe (using #take), which is provided by Ractor.yield. Then the job is processed; I’ll explain how that works later.
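In isolation, the pull-type side works like this: Ractor.yield blocks the producer until some consumer pulls the value with #take.
producer = Ractor.new do
  3.times { |i| Ractor.yield(i) } # each yield waits for a matching #take
end

3.times { print producer.take, ' ' } # => 0 1 2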
So this is how the job passes from one Ractor to the next (a self-contained sketch follows this list):
- The job is sent to the pipe using << (an alias for #send);
- The pipe receives the job with Ractor.recv;
- The pipe provides the job with Ractor.yield;
- A worker Ractor takes the job with #take.
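To see all four steps together, here is a self-contained sketch of the same pipeline with Redis taken out of the picture (method names follow the preview API used in this post):
pipe = Ractor.new do
  loop { Ractor.yield Ractor.recv } # steps 2 and 3 in a single line
end

workers = (1..3).map do |id|
  Ractor.new(id, pipe) do |id, pipe|
    loop { puts "worker #{id} took job #{pipe.take}" } # step 4
  end
end

(1..5).each { |n| pipe << n } # step 1
sleep 1 # crude: give the workers time to drain the pipe before the script exits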
The Worker
In this section, I’ll briefly explain the implementation of the enqueuing logic and the worker classes.
We have a module called Worker that adds the perform_async class method to any class that includes it (more on how that wiring works below). This method pushes a job to the Redis queue, recording its parameters and the class responsible for handling it.
def perform_async(data)
RedisConn.conn.rpush(
'queue:ractor-example',
{ 'klass' => self.name, 'data' => data }.to_json
)
end
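Note that perform_async is called as a class method (FibWorker.perform_async below), so self.name must resolve to the including class. One common way to wire that up is an included hook; this is a sketch, and the repository may do it differently:
module Worker
  # Extending the including class turns perform_async into a class method.
  def self.included(base)
    base.extend(self)
  end

  def perform_async(data)
    RedisConn.conn.rpush(
      'queue:ractor-example',
      { 'klass' => self.name, 'data' => data }.to_json
    )
  end
end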
Then we have a Fibonacci solver that includes this module, and we’ll use it later for tests:
class FibWorker
  include Worker

  def perform(data)
    fib(data['n'])
  end

  def fib(n)
    if n <= 1
      n
    else
      fib(n - 1) + fib(n - 2)
    end
  end
end
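For a quick sanity check, the solver can be exercised directly, without the queue:
FibWorker.new.perform({ 'n' => 10 }) # => 55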
And finally, the Performer instantiates the related class and performs the job:
result = Object.const_get(klass).new.perform(data)
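The rest of the Performer isn’t shown; based on the line above and the log format below, a minimal version might look like this (the logging labels are inferred from the output, not taken from the repository):
class Performer
  def perform(worker_id, job)
    klass = job['klass']
    data  = job['data']

    FileLogger.log("performer-#{worker_id}:#{klass}:#{data}:start")
    result = Object.const_get(klass).new.perform(data)
    FileLogger.log("performer-#{worker_id}:#{klass}:#{data}:#{result}")
  end
end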
Testing!
As you can see in the code snippets above, I added some logging to make it easier to follow what’s going on at runtime.
This is what we’ll see in the log file:
- A print when a job is popped from Redis;
- A print when the pipe Ractor receives a job;
- A print when the FibWorker starts to solve a job (with the worker ID);
- A print when the FibWorker finishes a job (with the result).
In my test, I started loop.rb (which contains the loop that watches the queue and instantiates the pipe and the workers), and then I pushed some jobs:
(34..44).each do |n|
FibWorker.perform_async({ 'n' => n })
end
So it enqueues 11 inputs (n = 34 through 44).
This is the result log:
19:20:10-redis-blpop-{"klass":"FibWorker","data":{"n":34}}
19:20:10-pipe-recv-{"klass"=>"FibWorker", "data"=>{"n"=>34}}
19:20:10-redis-blpop-{"klass":"FibWorker","data":{"n":35}}
19:20:10-performer-1:FibWorker:{"n"=>34}:start
19:20:10-pipe-recv-{"klass"=>"FibWorker", "data"=>{"n"=>35}}
19:20:10-redis-blpop-{"klass":"FibWorker","data":{"n":36}}
19:20:10-performer-2:FibWorker:{"n"=>35}:start
19:20:10-pipe-recv-{"klass"=>"FibWorker", "data"=>{"n"=>36}}
19:20:10-redis-blpop-{"klass":"FibWorker","data":{"n":37}}
19:20:10-performer-3:FibWorker:{"n"=>36}:start
19:20:10-pipe-recv-{"klass"=>"FibWorker", "data"=>{"n"=>37}}
19:20:10-redis-blpop-{"klass":"FibWorker","data":{"n":38}}
19:20:10-redis-blpop-{"klass":"FibWorker","data":{"n":39}}
19:20:10-redis-blpop-{"klass":"FibWorker","data":{"n":40}}
19:20:10-redis-blpop-{"klass":"FibWorker","data":{"n":41}}
19:20:10-redis-blpop-{"klass":"FibWorker","data":{"n":42}}
19:20:10-redis-blpop-{"klass":"FibWorker","data":{"n":43}}
19:20:10-redis-blpop-{"klass":"FibWorker","data":{"n":44}}
19:20:11-performer-1:FibWorker:{"n"=>34}:5702887
19:20:11-performer-1:FibWorker:{"n"=>37}:start
19:20:11-pipe-recv-{"klass"=>"FibWorker", "data"=>{"n"=>38}}
19:20:11-performer-2:FibWorker:{"n"=>35}:9227465
19:20:11-performer-2:FibWorker:{"n"=>38}:start
19:20:11-pipe-recv-{"klass"=>"FibWorker", "data"=>{"n"=>39}}
19:20:11-performer-3:FibWorker:{"n"=>36}:14930352
19:20:11-performer-3:FibWorker:{"n"=>39}:start
19:20:11-pipe-recv-{"klass"=>"FibWorker", "data"=>{"n"=>40}}
19:20:13-performer-1:FibWorker:{"n"=>37}:24157817
19:20:13-performer-1:FibWorker:{"n"=>40}:start
19:20:13-pipe-recv-{"klass"=>"FibWorker", "data"=>{"n"=>41}}
19:20:14-performer-2:FibWorker:{"n"=>38}:39088169
19:20:14-performer-2:FibWorker:{"n"=>41}:start
19:20:14-pipe-recv-{"klass"=>"FibWorker", "data"=>{"n"=>42}}
19:20:17-performer-3:FibWorker:{"n"=>39}:63245986
19:20:17-performer-3:FibWorker:{"n"=>42}:start
19:20:17-pipe-recv-{"klass"=>"FibWorker", "data"=>{"n"=>43}}
19:20:22-performer-1:FibWorker:{"n"=>40}:102334155
19:20:22-performer-1:FibWorker:{"n"=>43}:start
19:20:22-pipe-recv-{"klass"=>"FibWorker", "data"=>{"n"=>44}}
19:20:30-performer-2:FibWorker:{"n"=>41}:165580141
19:20:30-performer-2:FibWorker:{"n"=>44}:start
19:20:47-performer-3:FibWorker:{"n"=>42}:267914296
19:21:11-performer-1:FibWorker:{"n"=>43}:433494437
19:21:41-performer-2:FibWorker:{"n"=>44}:701408733
As you can see, in the first second the loop that pops jobs from Redis fetches all 11 of them. Then, as the workers complete their tasks, the pipe delivers the next one.
Follow input number 40 as an example: it is popped from Redis at 19:20:10, received by the pipe at 19:20:11, started by worker 1 at 19:20:13, and finished (102334155) at 19:20:22.
In another test, I ran more jobs with 8 workers, and the operating system used all CPUs when needed.
Conclusions
It was fun to implement parallelism in Ruby using Ractor. Still, it requires a significant shift in mindset because of the restrictions on sharing objects.
I’m curious to see how it will impact the major gems of the Ruby community.