Milhouse on software, engineering, and Emacs.

Simple Thread Pool in Ruby

TL;DR: In this post I will show how one could implement a simple Thread Pool in Ruby. On top of this thread pool, we will then build a parallel-ish task runner. Of course this implementation is not “industrial strength”, but it is a fun exercise nonetheless.

What the heck is a thread pool?

A Thread Pool is an abstraction that you can give a job to, and the job will be executed by one of the many threads contained in the pool. The main motivation for using thread pools is that creating and destroying threads has a considerable cost. Creating a pool of threads once and repeatedly re-using them to execute jobs asynchronously can bring substantial performance benefits to a long-running application.

In this post I will show a little toy implementation that I hope will give you a feel of how they work.

Our home-baked thread pool implementation

The workhorse of our ThreadPool implementation is Ruby’s Thread::Queue. This class is a thread-safe implementation of a queue, and can be straightforwardly used to implement a job queue. By job, I mean anything that responds to the call method (i.e., quacks like a lambda).
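To make the "quacks like a lambda" part concrete, here is a minimal sketch. Greeter, callable_jobs and run_all are names I made up for illustration; the only thing borrowed from the post is the idea of pushing callables onto a Queue and calling whatever comes out.

```ruby
# A hypothetical job class: anything with a #call method qualifies.
class Greeter
  def call(name)
    "hello, #{name}"
  end
end

# Four different kinds of object that all respond to #call.
def callable_jobs
  [
    ->(name) { "lambda says hi to #{name}" },
    proc { |name| "proc says hi to #{name}" },
    "shout: ".method(:+),
    Greeter.new
  ]
end

# Push every job onto a queue, then pop and call them in FIFO order.
def run_all(jobs, arg)
  queue = Queue.new
  jobs.each { |job| queue << job }
  Array.new(jobs.size) { queue.pop.call(arg) }
end
```

Calling run_all(callable_jobs, "bob") runs all four jobs in the order they were pushed, without the queue caring what kind of object each one is.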

The ThreadPool implementation is shown below:

require 'thread'

class ThreadPool
  def initialize(size)
    @size = size
    @queue = Queue.new
    @pool = (1..size).map { Thread.new(&pop_job_loop) }
  end

  def schedule(*args, &blk)
    queue << [blk, args]
  end

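  # Queues one "kill" job per worker, then waits for every thread to exit.
  # Jobs scheduled earlier are popped first, so they still run before the kills.
  def shutdown
    size.times { schedule { throw :kill } }
    pool.map(&:join)
  end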

  protected

  attr_reader :size, :queue, :pool

  private

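  # The body of every worker thread: run jobs forever until one of them
  # throws :kill, which unwinds out of the loop and ends the thread.
  def pop_job_loop
    -> { catch(:kill) { loop { rescue_nil(&run_job) } } }
  end

  # Keeps a failing job from killing its worker. Despite the name, it returns
  # the rescued exception (which the caller ignores). Only StandardError and
  # its subclasses are rescued; throw :kill passes straight through.
  def rescue_nil
    yield
  rescue => e
    e
  end

  # Blocks until a [job, args] pair is available, then calls the job.
  def run_job
    -> { (job, args = queue.pop) && job.call(*args) }
  end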
end

In the constructor of the class, we create size threads and keep them in an array. Each thread runs an infinite loop that tries to pop the job queue. The magic is that Thread::Queue#pop blocks while the queue is empty, and only returns once some other thread pushes a job with Thread::Queue#push.

That means that all the threads we created in ThreadPool#initialize are immediately ready and blocked, waiting for Thread::Queue#pop to return. Since Thread::Queue is thread-safe, there is no danger of two threads executing the same job. In addition to that, the jobs are popped in the order they were pushed onto the queue.
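Here is a small sketch of those two guarantees (each job is handed to exactly one worker, and jobs leave the queue in FIFO order). It does not use the ThreadPool class; run_with_workers and the STOP_SIGNAL sentinel are hypothetical names, and the sentinel is a simpler stand-in for the throw :kill trick used in the post.

```ruby
# Hypothetical sentinel object that tells a worker to exit its loop.
STOP_SIGNAL = Object.new

# Starts worker_count threads that pop job ids from a shared queue and
# record [worker_id, job_id] pairs. Returns the recorded pairs.
def run_with_workers(worker_count, job_ids)
  jobs = Queue.new
  done = Queue.new

  workers = Array.new(worker_count) do |worker_id|
    Thread.new do
      # Queue#pop blocks while the queue is empty.
      while (job_id = jobs.pop) != STOP_SIGNAL
        done << [worker_id, job_id]
      end
    end
  end

  job_ids.each { |id| jobs << id }
  worker_count.times { jobs << STOP_SIGNAL } # one sentinel per worker
  workers.each(&:join)

  Array.new(done.size) { done.pop }
end
```

With four workers and ids 1..100, every id shows up exactly once in the result, although which worker handled which id varies from run to run. With a single worker, the ids come back in exactly the order they were pushed.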

The ThreadPool#schedule method receives a block and the arguments to be passed to it. The &blk parameter captures the block as a Proc, and schedule pushes that Proc together with its arguments onto the job queue (via Thread::Queue#<<, an alias of #push). The job is then executed in one of the pool's threads.
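If you are curious what a &blk parameter actually hands over, here is a quick check. capture and describe_job are hypothetical helpers; capture only mirrors the signature of schedule and is not part of the pool.

```ruby
# Same signature as ThreadPool#schedule, but it just returns what it received.
def capture(*args, &blk)
  [blk, args]
end

# Tells a lambda apart from an ordinary proc.
def describe_job(job)
  job.lambda? ? "lambda" : "plain proc"
end

# A literal block arrives as an ordinary (non-lambda) Proc.
from_block, block_args = capture(1, 2) { |a, b| a + b }

# A lambda passed with & keeps its lambda-ness.
from_lambda, = capture(&->(x) { x })
```

Here describe_job(from_block) gives "plain proc" and describe_job(from_lambda) gives "lambda". The difference matters mostly for argument checking: a lambda raises ArgumentError on the wrong number of arguments, while a plain proc quietly adapts.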

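One little bit of ugliness is the #rescue_nil method. It is there because if a job raises an exception that nobody rescues, the thread executing that job dies. After enough failing jobs the pool would have no workers left, and scheduled jobs would sit in the queue forever.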
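The sketch below shows the problem in isolation. start_worker and run_jobs are made-up helpers, and the :stop symbol is just a simple way to end the loop in this demo; none of it is taken from the ThreadPool class.

```ruby
# Silence the per-thread error report on stderr for this demo.
Thread.report_on_exception = false

# A worker that pops jobs and records their return values.
# With rescue_errors: false, a failing job kills the worker thread.
def start_worker(queue, processed, rescue_errors:)
  Thread.new do
    loop do
      job = queue.pop
      break if job == :stop
      begin
        processed << job.call
      rescue StandardError
        raise unless rescue_errors # re-raise to simulate "no rescue at all"
      end
    end
  end
end

# Feeds one failing job and one good job to a single worker and
# returns how many jobs completed.
def run_jobs(rescue_errors:)
  queue = Queue.new
  processed = Queue.new
  worker = start_worker(queue, processed, rescue_errors: rescue_errors)
  queue << -> { raise "boom" }
  queue << -> { :ok }
  queue << :stop
  begin
    worker.join
  rescue RuntimeError
    # Thread#join re-raises the exception that killed the thread.
  end
  processed.size
end
```

run_jobs(rescue_errors: false) completes zero jobs, because the worker dies on the first one and never reaches the second. run_jobs(rescue_errors: true) completes one job, since the worker survives the failure.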

Also, when you want to shut down the ThreadPool while waiting for all its jobs to finish, you can call ThreadPool#shutdown. That’s the first time I could figure out some legitimate use of Ruby’s throw/catch mechanism!
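Here is a stripped-down sketch of why throw/catch fits here: a rescue clause does not intercept a throw, so the kill job unwinds straight through the error handling and out of the loop. demo_throw_shutdown is a hypothetical name, and the worker is a simplified single-thread version of the loop in the pool.

```ruby
# Runs one worker through a normal job, a failing job and a kill job,
# and returns the log of what happened.
def demo_throw_shutdown
  queue = Queue.new
  log = Queue.new

  worker = Thread.new do
    catch(:kill) do
      loop do
        job = queue.pop
        begin
          job.call
        rescue StandardError => e
          log << "rescued #{e.class}" # ordinary errors stop here
        end
      end
    end
    log << "worker exited" # reached only after throw :kill
  end

  queue << -> { log << "job ran" }
  queue << -> { raise ArgumentError, "bad input" }
  queue << -> { throw :kill }
  queue << -> { log << "never runs" } # queued after the kill job

  worker.join
  Array.new(log.size) { log.pop }
end
```

The log ends up with "job ran", "rescued ArgumentError" and "worker exited", in that order. The job queued after the kill job is never executed.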

Now, we are going to look at a parallel-ish task runner implementation.

Task Runner

Our ThreadPool implementation offers no mechanism for retrieving the return value of a job. In this section I will show how you can harness the power of Ruby lambdas to work around this issue.

Have a look at my solution below:

class TaskRunner
  def initialize(tasks, pool)
    @tasks = tasks
    @pool = pool
    @result_queue = Queue.new
  end

  def run!
    tasks
      .map(&wrap_with_notify)
      .map(&schedule)
      .map(&await)
  end

  protected

  attr_reader :tasks, :result_queue, :pool

  private

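  # Turns a task into a job that pushes the task's return value to result_queue.
  def wrap_with_notify
    -> (task) { -> (*) { result_queue << task.call } }
  end

  # Hands a wrapped job to the pool; the job takes no arguments.
  def schedule
    -> (task) { pool.schedule(&task) }
  end

  # Blocks until some job has pushed a result, whichever finishes first.
  def await
    -> (*) { result_queue.pop }
  end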
end

(I couldn’t find a better name for TaskRunner#wrap_with_notify. If you think you have a better name for it, please, let me know!)

The TaskRunner receives a list of jobs/tasks in its constructor. Those jobs can be executed by calling the #call method on them. Now, in the TaskRunner#run! method we wrap those jobs as new jobs that will push the return value of the original job onto a result_queue (phew! probably pretty confusing if you’re not used to higher order functions).
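If the lambda-returning-a-lambda part is hard to follow, it may help to see the wrapping step on its own, with no threads involved. wrap_with_result_queue is a hypothetical standalone version of the idea that takes the queue as an explicit argument.

```ruby
# Takes a task and returns a new job. Calling the new job runs the
# original task and pushes its return value onto the results queue.
def wrap_with_result_queue(task, results)
  ->(*) { results << task.call }
end

results = Queue.new
wrapped = wrap_with_result_queue(-> { 6 * 7 }, results)

wrapped.call        # nothing is computed until the wrapped job is called
answer = results.pop # the original task's return value
```

The (*) parameter only means that the wrapped job accepts and ignores any arguments, so it does not matter how the pool chooses to call it.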

The wrapped jobs are returned by tasks.map(&wrap_with_notify). We then take those wrapped jobs and schedule each of them with #map(&schedule). Now, we can retrieve the return value of those jobs by popping the result_queue once for each scheduled job. This is done in #map(&await). The careful reader should notice that the return values come back in the order the jobs finish, which is not necessarily the order in which the tasks were given.
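If the original order matters, one possible workaround is to tag every result with the index of its task and sort once everything is in. This is only a sketch: run_in_order is a made-up helper, and it starts one plain thread per task instead of going through the pool to keep the example short.

```ruby
# Runs every task in its own thread and returns the results in the same
# order as the tasks, no matter which one finishes first.
def run_in_order(tasks)
  results = Queue.new

  threads = tasks.each_with_index.map do |task, index|
    Thread.new { results << [index, task.call] }
  end
  threads.each(&:join)

  Array.new(tasks.size) { results.pop }
    .sort_by(&:first) # restore submission order
    .map(&:last)      # drop the index tags
end

slow_then_fast = [-> { sleep 0.05; :slow }, -> { :fast }]
ordered = run_in_order(slow_then_fast)
```

Even though the second task finishes first, ordered still lists the slow task's result first.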

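As you can see, this approach does not deal well with failure. If a task raises an exception, the exception propagates out of the wrapped job before anything is pushed onto the result_queue, and the dreaded ThreadPool#rescue_nil method then swallows it silently. No result ever arrives for that task, so TaskRunner#run! is left waiting on a result_queue.pop that never returns (once every thread is blocked, Ruby will typically abort with a deadlock error). In order to deal with such cases in a more elegant way, we should use a higher level abstraction like a future.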
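To give a rough idea of what a future buys you, here is a toy version. TinyFuture is a name I invented for this sketch; it is not the API of concurrent-ruby or of any other library, and it makes no attempt to be safe when several threads call #value at the same time.

```ruby
# A toy future: it records either the task's return value or the
# exception it raised, and hands that back to whoever asks for the value.
class TinyFuture
  def initialize(&task)
    @task = task
    @outcome = Queue.new
    @resolved = nil
  end

  # Meant to be called from a worker thread.
  def run
    @outcome << [:ok, @task.call]
  rescue StandardError => e
    @outcome << [:error, e] # a failure is still an outcome
  end

  # Blocks until the task has run, then returns its value or re-raises
  # the exception in the calling thread.
  def value
    @resolved ||= @outcome.pop
    status, payload = @resolved
    raise payload if status == :error
    payload
  end
end

good = TinyFuture.new { 21 * 2 }
bad = TinyFuture.new { Integer("nope") }
[good, bad].each { |future| Thread.new { future.run } }
```

good.value returns the result of the block, while bad.value raises the ArgumentError in the caller's thread. Either way the caller gets an answer instead of waiting forever, and each result stays attached to the task that produced it.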

That’s it.

(1) If you’re looking for a production-ready library for dealing with concurrency in Ruby you should definitely check the concurrent-ruby gem (from which I actually stole much of the inspiration for this post).
