Tag Archives: Distributed

Parallel Elixir Run

Last time we’ve managed to access a worker from multiple processes. This time, we’ll try to distribute worker’s load across several processes. The problem we’re trying to solve is to calculate occurrence of letters in given list of texts. There is no need for shared state, we just want to distribute the load. Hence, the Agent is not really needed. Task supports exactly what we need, the async call.

It works like this:

  • invoke a job, asynchronously
  • wait for job to finish and return results

And you have the corresponding methods:

Or, taken from the documentation:

task = Task.async(fn -> do_some_work() end)
res  = do_some_other_work()
res + Task.await(task)

Here, the important thing to notice is that you need to await for the result, and that both processes are linked so any crashes on either side will cause the other to fail as well. This suits our purpose quite nicely, so no worries there.

The above example shows how to spawn and await a single process. But, we’d like to spawn more, and distribute an unknown load evenly across those processes / workers, later collecting all of their results. And, funny enough, Enum.map/2 to the rescue!

@spec frequency([String.t], pos_integer) :: Dict.t
def frequency(texts, workers) do
  texts
    |> group_by(workers)
    |> Enum.map(fn(texts_for_a_worker) ->
      Task.async(fn -> count_frequencies(texts_for_a_worker) end)
    end)
    |> Enum.map(&(Task.await(&1, 10000)))
    |> merge_results
end

So, the actual magic happens with first Enum.map/2, which spawns a new worker and delivers it the texts to process and in the second Enum.map/2, which awaits each worker, mapping it’s results. The complete example is a bit more elaborate and can be found at Github.

So how much speed we’ve gained? To get a rough estimate, I’ve created a list of ten thousand short poems (through List.duplicate/2 on some national anthems, I’m not that eager to write poems yet!) and put it through the loops, while changing the number of workers. Here are some very much informal results:

  • 1 worker – 12 seconds
  • 4 workers – 5 seconds
  • 16 workers – 5 seconds
  • 32 workers – 5 seconds
  • 128 workers – 5 seconds

I didn’t go so far as to inspect the reasons behind the top off, that might be an interesting exercise for later time. The take away is that we do have some significant gain compared to a single process, with little more code than usual, so it’s a win in my book 🙂

Tagged , , ,

Elixir, Agent Elixir

So far, the exercism.io problems were fun to solve, but were contained within single process domain. The Bank Account problem opens up a new dimension 🙂

The task is rather simple: create a simplistic bank account manager that can open or close a bank account, query it’s balance and deposit to / redraw from it. The catch is, it needs to support access from multiple processes. Finally some distributed action!

The main issues here are:

  • how to communicate with other processes
  • how to keep state in sync

There are a number of techniques at our disposal, like ETS (Erlang Term Storage), GenServer or we can wire our own as described in docs about state in processes. The main thing to understand is that we actually only need message passing between processes to complete this task, with a little help from the Agent (sorry Neo :-)).

What we want to do is:

  • spawn a new Agent
  • remember it’s pid
  • perform all account management operations using that pid
  • stop Agent when done

The Agent’s pid serves as our account number for this simple exercise.
The simplest solution could look like this:

defmodule BankAccount do
  @opaque account :: pid

  @spec open_bank() :: account
  def open_bank() do
    Agent.start_link(fn -> 0 end) |> elem(1)
  end

  @spec close_bank(account) :: none
  def close_bank(account) do
    Agent.stop(account)
  end

  @spec balance(account) :: integer
  def balance(account) do
    Agent.get(account, &(&1))
  end

  @spec update(account, integer) :: any
  def update(account, amount) do
    Agent.update(account, &(&1 + amount))
  end
end

What goes on is, when we open the bank account, Agent is started and linked to current process and we just need to store it’s pid, that acts as our account number, for future account operations.

Spawning an Agent requires only a single function, one that initializes the state. Agent then exposes get/3 and update/3 which can be used to change the state accordingly. All those state manipulating functions expect earlier stored pid, so they can access the related Agent, plus a function that operates on the state.

Very very simple and straight forward. It hides the underlying stuff like processes, message passing and message receiving in a nice manner and really allows us to concentrate on the business side of things. Much appreciated, and kind of justifies the hipe 🙂

And testing this isn’t much of a problem either!

setup do
  account = BankAccount.open_bank()
  { :ok, [ account: account ] }
end

test "incrementing balance from another process then checking it from test process", context do
  assert BankAccount.balance(context[:account]) == 0
  this = self()
  spawn(fn ->
    BankAccount.update(context[:account], 20)
    send(this, :continue)
  end)
  receive do
    :continue -> :ok
  after
    1000 -> flunk("Timeout")
  end
  assert BankAccount.balance(context[:account]) == 20
end

Basically, it just spawns a new process that makes a deposit to our bank account and then notifies of job completion (line 11 in the above code). The main test process is set to receive a message of type :continue (line 14) after which it can verify that the deposit has been made indeed.

This message passing between main test process and spawned one is the key to all distributed computing in Elixir / Erlang. Even without Agent’s help, it looks and works quite nicely.

The entire code can be found at Github.

Tagged , , , ,