Parallel Elixir Run

Last time, we managed to access a worker from multiple processes. This time, we’ll try to distribute a worker’s load across several processes. The problem we’re trying to solve is counting the occurrences of letters in a given list of texts. There is no need for shared state; we just want to distribute the load. Hence, the Agent is not really needed. Task supports exactly what we need: the async call.

It works like this:

  • invoke a job, asynchronously
  • wait for job to finish and return results

And there are the corresponding functions, Task.async/1 and Task.await/2.

Or, as shown in the documentation:

task = Task.async(fn -> do_some_work() end)
res  = do_some_other_work()
res + Task.await(task)

Here, the important thing to notice is that you need to await the result, and that both processes are linked, so a crash on either side will cause the other to fail as well. This suits our purpose quite nicely, so no worries there.

The above example shows how to spawn and await a single process. But, we’d like to spawn more, and distribute an unknown load evenly across those processes / workers, later collecting all of their results. And, funny enough, Enum.map/2 to the rescue!

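@spec frequency([String.t], pos_integer) :: map
def frequency(texts, workers) do
  texts
    # split the texts into (at most) `workers` groups
    |> group_by(workers)
    # start one Task per group; each counts the letters in its own texts
    |> Enum.map(fn(texts_for_a_worker) ->
      Task.async(fn -> count_frequencies(texts_for_a_worker) end)
    end)
    # wait (up to 10 seconds) for each Task's result
    |> Enum.map(&(Task.await(&1, 10000)))
    # combine the per-worker counts into a single map
    |> merge_results()
end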

So, the actual magic happens in the first Enum.map/2, which spawns a new worker and hands it its share of the texts, and in the second Enum.map/2, which awaits each worker and collects its results. The complete example is a bit more elaborate and can be found at Github.
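The post doesn’t show group_by/2, count_frequencies/1 or merge_results/1, so here is a self-contained sketch with my own guesses at their bodies (assumptions, not the original code): texts are dealt round-robin to the workers, only letters are counted (case-insensitively), and the per-worker maps are summed with Map.merge/3. Enum.frequencies/1 needs Elixir 1.10 or later.

```elixir
defmodule Frequency do
  @spec frequency([String.t()], pos_integer) :: %{optional(String.t()) => pos_integer}
  def frequency(texts, workers) do
    texts
    |> group_by(workers)
    # one Task per group of texts
    |> Enum.map(fn texts_for_a_worker ->
      Task.async(fn -> count_frequencies(texts_for_a_worker) end)
    end)
    # collect every worker's partial counts
    |> Enum.map(&Task.await(&1, 10_000))
    |> merge_results()
  end

  # Assumed helper: deal the texts round-robin into at most `workers` groups.
  defp group_by(texts, workers) do
    texts
    |> Enum.with_index()
    |> Enum.group_by(fn {_text, i} -> rem(i, workers) end, fn {text, _i} -> text end)
    |> Map.values()
  end

  # Assumed helper: count lowercase letters (ignoring digits, spaces, punctuation).
  defp count_frequencies(texts) do
    texts
    |> Enum.flat_map(&String.graphemes(String.downcase(&1)))
    |> Enum.filter(&String.match?(&1, ~r/^\p{L}$/u))
    |> Enum.frequencies()
  end

  # Assumed helper: add up the counts from all workers.
  defp merge_results(maps) do
    Enum.reduce(maps, %{}, fn counts, acc ->
      Map.merge(acc, counts, fn _letter, a, b -> a + b end)
    end)
  end
end
```

Because each worker returns its own map and merging is just addition, the result shouldn’t depend on the number of workers, which is also a handy sanity check when benchmarking.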

So how much speed have we gained? To get a rough estimate, I created a list of ten thousand short poems (through List.duplicate/2 on some national anthems; I’m not that eager to write poems yet!) and ran it through the code while changing the number of workers. Here are some very informal results:

  • 1 worker – 12 seconds
  • 4 workers – 5 seconds
  • 16 workers – 5 seconds
  • 32 workers – 5 seconds
  • 128 workers – 5 seconds

I didn’t go so far as to investigate why the gain levels off from four workers on; that might be an interesting exercise for a later time. The takeaway is that we get a significant gain compared to a single process, with little more code than usual, so it’s a win in my book :-)


Elixir, Agent Elixir

So far, the exercism.io problems were fun to solve, but they were contained within a single process. The Bank Account problem opens up a new dimension :-)

The task is rather simple: create a simplistic bank account manager that can open or close a bank account, query its balance, and deposit to / withdraw from it. The catch is, it needs to support access from multiple processes. Finally some distributed action!

The main issues here are:

  • how to communicate with other processes
  • how to keep state in sync

There are a number of techniques at our disposal, like ETS (Erlang Term Storage) and GenServer, or we can wire our own, as described in the docs about state in processes. The main thing to understand is that we actually only need message passing between processes to complete this task, with a little help from the Agent (sorry Neo :-)).

What we want to do is:

  • spawn a new Agent
  • remember its pid
  • perform all account management operations using that pid
  • stop the Agent when done

The Agent’s pid serves as our account number for this simple exercise.
The simplest solution could look like this:

defmodule BankAccount do
  @opaque account :: pid

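  @spec open_bank() :: account
  def open_bank() do
    # start an Agent holding the balance (initially 0); its pid is the account
    {:ok, account} = Agent.start_link(fn -> 0 end)
    account
  end

  @spec close_bank(account) :: :ok
  def close_bank(account) do
    Agent.stop(account)
  end

  @spec balance(account) :: integer
  def balance(account) do
    # read the state without changing it
    Agent.get(account, &(&1))
  end

  @spec update(account, integer) :: :ok
  def update(account, amount) do
    # add a deposit (positive) or a withdrawal (negative) to the state
    Agent.update(account, &(&1 + amount))
  end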
end

When we open the bank account, an Agent is started and linked to the current process, and we just need to store its pid, which acts as our account number, for future account operations.

Spawning an Agent requires only a single function, one that initializes the state. Agent then exposes get/3 and update/3, which can be used to read and change the state, respectively. All those state-manipulating functions expect the previously stored pid, so they can reach the related Agent, plus a function that operates on the state.

Very simple and straightforward. It hides the underlying machinery, like processes, message passing and message receiving, in a nice manner and really allows us to concentrate on the business side of things. Much appreciated, and it kind of justifies the hype :-)
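To see the Agent keeping state in sync, here is a small sketch (AgentDemo and concurrent_deposits/2 are hypothetical names, not part of the exercise) that fires many deposits at one Agent from concurrently running tasks. Agent.update/2 waits for the Agent to apply the change, and the Agent handles its messages one at a time, so no deposit should get lost.

```elixir
defmodule AgentDemo do
  # Deposit `amount` into a fresh Agent from `n` concurrently running tasks,
  # then read the final balance back and stop the Agent.
  @spec concurrent_deposits(pos_integer, integer) :: integer
  def concurrent_deposits(n, amount) do
    {:ok, account} = Agent.start_link(fn -> 0 end)

    1..n
    |> Enum.map(fn _ -> Task.async(fn -> Agent.update(account, &(&1 + amount)) end) end)
    |> Enum.each(&Task.await/1)

    balance = Agent.get(account, & &1)
    :ok = Agent.stop(account)
    balance
  end
end
```

With 100 tasks depositing 10 each, the final balance comes out as 1000.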

And testing this isn’t much of a problem either!

setup do
  account = BankAccount.open_bank()
  { :ok, [ account: account ] }
end

test "incrementing balance from another process then checking it from test process", context do
  assert BankAccount.balance(context[:account]) == 0
  this = self()
  spawn(fn ->
    BankAccount.update(context[:account], 20)
    send(this, :continue)
  end)
  receive do
    :continue -> :ok
  after
    1000 -> flunk("Timeout")
  end
  assert BankAccount.balance(context[:account]) == 20
end

Basically, the test spawns a new process that makes a deposit to our bank account and then notifies the test process of job completion (the send/2 call in the above code). The main test process waits to receive the :continue message (the receive block), after which it can verify that the deposit has indeed been made.

This message passing between the main test process and the spawned one is the key to all distributed computing in Elixir / Erlang. Even without the Agent’s help, it looks and works quite nicely.
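For comparison, the “wire our own” option mentioned earlier can be sketched with just spawn_link/1, send/2 and receive. ManualAccount and its message shapes are my own, hypothetical choices, loosely in the spirit of the docs about state in processes; the Agent gives us a more polished version of this kind of loop for free.

```elixir
defmodule ManualAccount do
  # The account is a process whose loop/1 argument holds the balance.
  def open, do: spawn_link(fn -> loop(0) end)

  # Ask for the balance and wait for the reply tagged with the account's pid.
  def balance(account) do
    send(account, {:balance, self()})

    receive do
      {:balance, ^account, amount} -> amount
    after
      1000 -> exit(:timeout)
    end
  end

  # Fire-and-forget: the account process applies the change when it gets to it.
  def update(account, amount) do
    send(account, {:update, amount})
    :ok
  end

  def close(account) do
    send(account, :stop)
    :ok
  end

  defp loop(balance) do
    receive do
      {:balance, caller} ->
        send(caller, {:balance, self(), balance})
        loop(balance)

      {:update, amount} ->
        loop(balance + amount)

      :stop ->
        :ok
    end
  end
end
```

Since messages between two processes arrive in the order they were sent, a balance/1 call made after update/2 from the same process sees the deposit, even though update/2 itself doesn’t wait for a reply.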

The entire code can be found at Github.


Unfolding Elixir

WARNING: not for allergy-prone people! :-)

The last Elixir problem from exercism.io was about detecting what a person is allergic to, based on her allergy index / flags.

For example, given these allergens:

@allergens %{
  1 => "eggs",
  2 => "peanuts",
  4 => "shellfish",
  8 => "strawberries",
  16 => "tomatoes",
  32 => "chocolate",
  64 => "pollen",
  128 => "cats"
}

A person with allergy index 5 would be allergic to eggs and shellfish.

Since index flags are powers of two, a simple solution is:

import Bitwise

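def allergic_to(flags) do
  @allergens
  # keep the allergens whose bit is set in the flags
  |> Enum.filter(fn({flag, _}) -> (flags &&& flag) != 0 end)
  # and return just their names
  |> Enum.map(fn({_flag, allergen}) -> allergen end)
end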

This uses the Bitwise.&&&/2 bitwise AND operator, and it works :D
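Here is the same bitwise idea wrapped in a self-contained module, so it can be pasted into iex. The Allergies name is an assumption, and the Enum.sort/1 call is my addition: it sorts the {flag, allergen} pairs by flag so the result doesn’t rely on map iteration order.

```elixir
defmodule Allergies do
  import Bitwise

  @allergens %{
    1 => "eggs",
    2 => "peanuts",
    4 => "shellfish",
    8 => "strawberries",
    16 => "tomatoes",
    32 => "chocolate",
    64 => "pollen",
    128 => "cats"
  }

  # An allergen is included when its bit is set in the flags.
  @spec allergic_to(non_neg_integer) :: [String.t()]
  def allergic_to(flags) do
    @allergens
    |> Enum.filter(fn {flag, _allergen} -> (flags &&& flag) != 0 end)
    |> Enum.sort()
    |> Enum.map(fn {_flag, allergen} -> allergen end)
  end
end
```

Allergies.allergic_to(5) gives ["eggs", "shellfish"], and for 509 every allergen except peanuts comes back (the 256 bit has no allergen and is simply ignored).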

But, what originally came to my mind was something like this (pseudo code):

  • take the first (largest) allergen whose value is less than or equal to the person’s allergy index / flags
  • deduct the allergen’s value from the allergy index and add the allergen to the list
  • repeat until remaining value reaches zero

So, for the sake of exercise (this is exercism.io after all), I wanted to see how this could be implemented. Enum.reduce/3 was the initial thought, but it can’t really work since collection elements are visited only once. Here’s the code that works for most cases:

def allergic_to(flags) do
  @allergens |> Enum.reverse |> Enum.reduce({[], flags}, fn({flag, allergen}, {allergies, remaining}) ->
    if flag <= remaining do
      {[allergen | allergies], remaining - flag}
    else
      {allergies, remaining}
    end
  end) |> elem(0)
end

Unfortunately, it gives the wrong result for some inputs, for example allergy index 509.

Namely, it traverses allergens 128, 64, etc. once each, in sequence, so peanuts get included as well. But the idea was to reuse a specific allergen as long as its value is less than or equal to the remaining allergy index.

So, the steps should actually be: 509 - 128 = 381, 381 - 128 = 253, 253 - 128 = 125, 125 - 64 = 61, 61 - 32 = 29, 29 - 16 = 13, 13 - 8 = 5, 5 - 4 = 1, 1 - 1 = 0. And the allergens would be 128, 64, 32, 16, 8, 4 and 1. Allergen 2, peanuts, would be excluded.

So, I needed a way to turn the allergy index and allergens into a loop that could reuse allergens as needed.

Stream.unfold/2 to the rescue! Basically, it can turn anything into a stream: you define a starting accumulator and a function that, given the current accumulator, returns the next element together with the next accumulator. As long as the function doesn’t return nil, the stream continues. And because you write that function yourself, it is as flexible as you want it to be.
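A minimal sketch of Stream.unfold/2 on its own (UnfoldDemo is a hypothetical module): the accumulator is the current number, each step returns {element_to_emit, next_accumulator}, and returning nil ends the stream.

```elixir
defmodule UnfoldDemo do
  # Counts down from n: emit the current value and pass n - 1 on;
  # returning nil at 0 ends the stream.
  def countdown(n) do
    Stream.unfold(n, fn
      0 -> nil
      k -> {k, k - 1}
    end)
  end

  # Never returns nil, so the stream is infinite; take only what you need.
  def powers_of_two do
    Stream.unfold(1, fn p -> {p, p * 2} end)
  end
end
```

For example, Enum.to_list(UnfoldDemo.countdown(5)) gives [5, 4, 3, 2, 1], and Enum.take(UnfoldDemo.powers_of_two(), 8) gives exactly the allergen flags from above, [1, 2, 4, 8, 16, 32, 64, 128].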

With that in mind, here’s what the Stream.unfold/2 solution looks like:

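def allergic_to(flags) do
  # start with the full index and the largest flag (128)
  Stream.unfold({flags, @allergens |> Enum.max |> elem(0)}, fn
    {_, 0} -> nil  # all flags tried: stop the stream
    {remaining, flag} ->
      if flag <= remaining do
        # emit the allergen, keep the flag so it can be reused
        {@allergens[flag], {remaining - flag, flag}}
      else
        # emit a placeholder and move on to the next smaller flag
        {nil, {remaining, Bitwise.bsr(flag, 1)}}
      end
  end)
  |> Enum.reject(&is_nil/1)
  |> Enum.uniq()  # a reused flag (e.g. 128 for 509) shows up more than once
end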

It’s not as beautiful or as easy to read as the Bitwise.&&&/2 solution, but, as mentioned before, this was just an exercise and a nice way to see how Stream.unfold/2 works.

As always, you can find the entire code at Github.


Elixir Function Headers

While playing with the new exercism.io problem, the prime factorization of a number, I came across an interesting feature in Elixir.

I tried defining a couple of clauses of a pattern-matched function, each with default values, like this:

defp decompose(1, divisor \\ 2, acc \\ []) do
  ...
end

defp decompose(number, divisor \\ 2, acc \\ []) do
  ...
end

But the Elixir compiler did not like that. It seems you need to add a bodiless clause, a sort of function header, as explained in this Github issue:

# function header / empty clause
defp decompose(number, divisor \\ 2, acc \\ [])

defp decompose(1, divisor, acc) do
  ...
end

defp decompose(number, divisor, acc) do
  ...
end

Then only that bodiless header declares the default values, and the clauses with bodies don’t repeat them. Nice to know!
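For completeness, here is a sketch of how the header fits into a whole prime factorization by trial division. The PrimeFactors module, the public factors_for/1 entry point and the clause bodies are my assumptions, not the original solution.

```elixir
defmodule PrimeFactors do
  @spec factors_for(pos_integer) :: [pos_integer]
  def factors_for(number), do: decompose(number)

  # Function header: the only place where the default values are declared.
  defp decompose(number, divisor \\ 2, acc \\ [])

  # Nothing left to divide: the factors were collected in reverse order.
  defp decompose(1, _divisor, acc), do: Enum.reverse(acc)

  # The divisor divides the number: record it and try the same divisor again.
  defp decompose(number, divisor, acc) when rem(number, divisor) == 0 do
    decompose(div(number, divisor), divisor, [divisor | acc])
  end

  # Otherwise move on to the next candidate divisor.
  defp decompose(number, divisor, acc) do
    decompose(number, divisor + 1, acc)
  end
end
```

PrimeFactors.factors_for(60) returns [2, 2, 3, 5], and factors_for(1) returns an empty list.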


Elixir fun continues

These last few days I’ve continued my Elixir exercise journey on exercism.io.

Nothing fancy there, but I still picked up a few tricks. So, in no particular order :-)

Unlike with lists of tuples (such as keyword lists), a map does not maintain the ordering of its keys.
Also, there is a way to pipe into an anonymous function! Behold:

defmodule Roman do
  @roman_values [
    {1000, "M"},
    {900, "CM"},
    {500, "D"},
    {400, "CD"},
    {100, "C"},
    {90, "XC"},
    {50, "L"},
    {40, "XL"},
    {10, "X"},
    {9, "IX"},
    {5, "V"},
    {4, "IV"},
    {1, "I"}
  ] # using a list of tuples instead of a map, because ordering is important

  @doc """
  Convert the number to a roman number.
  """
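  @spec numerals(non_neg_integer) :: String.t
  def numerals(0) do
    ""
  end
  def numerals(number) do
    # find the largest value that fits, then pipe it into an anonymous function;
    # wrapping fn ... end in parentheses is the usual way to write the .() call
    @roman_values |> Enum.find(fn({arabic, _roman}) -> number >= arabic end) |> (fn({arabic, roman}) ->
      roman <> numerals(number - arabic)
    end).()
  end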
end
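On Elixir 1.12 or later, Kernel.then/2 is made for exactly this “pipe into an anonymous function” situation. A sketch of the same Roman module using it (requires then/2, so not for older Elixir versions):

```elixir
defmodule Roman do
  # A list of tuples keeps the order: largest values first.
  @roman_values [
    {1000, "M"}, {900, "CM"}, {500, "D"}, {400, "CD"},
    {100, "C"}, {90, "XC"}, {50, "L"}, {40, "XL"},
    {10, "X"}, {9, "IX"}, {5, "V"}, {4, "IV"}, {1, "I"}
  ]

  @spec numerals(non_neg_integer) :: String.t()
  def numerals(0), do: ""

  def numerals(number) do
    @roman_values
    |> Enum.find(fn {arabic, _roman} -> number >= arabic end)
    # then/2 passes the found {arabic, roman} tuple to the anonymous function
    |> then(fn {arabic, roman} -> roman <> numerals(number - arabic) end)
  end
end
```

For instance, Roman.numerals(1994) gives "MCMXCIV".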

The function head doesn’t have to destructure its argument to match the structure in the @spec; a plain variable will do:

defmodule Gigasecond do
  @doc """
  Calculate a date one billion seconds after an input date.
  """
  @spec from({{pos_integer, pos_integer, pos_integer}, {pos_integer, pos_integer, pos_integer}}) :: :calendar.datetime
  def from(datetime) do
    datetime
      |> :calendar.datetime_to_gregorian_seconds
      |> Kernel.+(1_000_000_000)
      |> :calendar.gregorian_seconds_to_datetime
  end
end

Short but sweet and still very much fun :-)


Elixir on the run

Continuing with Elixir problems from exercism.io, this time something completely different :D
It’s a variation of the chess board and grains of wheat story: basically, you need to create a function that returns the total number of grains on the chess board, along with one that returns the number of grains on a given square. Not much to it; just return 2 to the power of (square - 1) and that’s it:

def square(number) do
  :math.pow(2, number - 1) |> round
end

So this works, but there are a few issues. And one of those is nicely mentioned in tests:

NOTE: :math.pow/2 doesn’t do what you’d expect:
`:math.pow(2, 64) == :math.pow(2, 64) - 1` is true.
It’s best to avoid functions operating on floating point numbers for very large numbers.

Also, this solution throws “(ArithmeticError) bad argument in arithmetic expression” for large numbers, e.g. 32768. So, off to seek another solution. Enum.reduce/2 to the rescue:

def square(number) do
  # starts from 1 (the first element) and doubles once per remaining element
  1..number |> Enum.reduce(fn(_x, acc) -> acc * 2 end)
end

It returns correct results, but for very large numbers it is rather slow. E.g. 1.2 seconds for square(262144). So, Jim Menard offered a nice solution:

import Bitwise

def square(number) do
  bsl(1, number - 1)
end

It’s easy to forget that multiplying by 2 is the same as a left bit shift. And, of course, there is the Bitwise.bsl/2 function that does just that. Now square(262144) runs as fast as square(1). Who says Elixir/Erlang isn’t fast! :D
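For the other half of the exercise, the total, the same trick works: the squares hold 2^0 + 2^1 + ... + 2^63 grains, a geometric series that sums to 2^64 - 1, i.e. a 1 shifted left 64 times, minus one. A sketch, where total/0 and the Integer.pow/2 variant (Elixir 1.12+) are my additions:

```elixir
defmodule Grains do
  import Bitwise

  # Square n holds 2^(n - 1) grains: a 1 shifted left n - 1 times.
  @spec square(pos_integer) :: pos_integer
  def square(number) when number >= 1, do: bsl(1, number - 1)

  # 2^0 + 2^1 + ... + 2^63 = 2^64 - 1
  @spec total() :: pos_integer
  def total, do: bsl(1, 64) - 1

  # Integer.pow/2 also stays in exact integer arithmetic, unlike :math.pow/2.
  @spec square_pow(pos_integer) :: pos_integer
  def square_pow(number) when number >= 1, do: Integer.pow(2, number - 1)
end
```

Grains.total() returns 18446744073709551615, the same number as adding up all 64 squares one by one.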


Wishful types

In the last exercise, there was some redundancy in the code I wished to eliminate.
For example, there is a type definition that defines possible week days:

@type weekday ::
  :monday |
  :tuesday |
  :wednesday |
  :thursday |
  :friday |
  :saturday |
  :sunday

So, someone invoking the meetup date calculation would have to use one of the valid atoms for the desired day of the week.

Now, to match those against Erlang’s :calendar.day_of_the_week/1, I needed a way to map, say, :wednesday to 3, since day_of_the_week returns the day number, not an atom from the type above.

An easy solution:

@weekdays %{
  :monday => 1,
  :tuesday => 2,
  :wednesday => 3,
  :thursday => 4,
  :friday => 5,
  :saturday => 6,
  :sunday => 7
}

No magic there, just repeat the atoms, map values from day_of_the_week on them, and off we go.

But, it would be much nicer if I could just do:

@weekdays weekday |> Enum.with_index(1) |> Enum.into(%{})

That would produce the exact same map, just in a DRY way.

Another thing that could benefit from the same pattern was the schedule policy filtering.
So, say that we have a few functions we would like to map to a constant in the module and use like this:

@type schedule ::
  :first |
  :second |
  :third |
  :fourth |
  :last |
  :teenth

@schedules %{
  :first => &first/1,
  :second => &second/1,
  :third => &third/1,
  :fourth => &fourth/1,
  :last => &last/1,
  :teenth => &teenth/1
}

defp filter_by_schedule(dates, schedule) do
  @schedules[schedule] |> apply([dates])
end

defp first(dates) do
  dates |> hd
end

defp second(dates) do
  dates |> Enum.at(1)
end

defp third(dates) do
  dates |> Enum.at(2)
end

defp fourth(dates) do
  dates |> Enum.at(3)
end

defp last(dates) do
  dates |> Enum.reverse |> hd
end

defp teenth(dates) do
  dates |> Enum.find(fn({_, _, day}) ->
    day >= 13 && day <= 19
  end)
end

That way, the @schedules map would be a constant, and the code would be much cleaner. It would also be nice if I could do:

 @schedules schedule |> Enum.map(fn(sch) -> %{sch => __info__(:functions)[sch]} end) |> Enum.into(%{})

Unfortunately, I couldn’t find a way to accomplish this. So, if you have any suggestions, I’m all ears!
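One possible workaround, a sketch under my own assumptions rather than a definitive answer: a type is not a value you can pipe into Enum, but a module attribute can be computed from another attribute at compile time. So the weekday atoms can be written once as a list and the map derived from it. For the schedules, the sketch sidesteps function captures altogether and stores plain data (positions for Enum.at/2, which accepts -1 for the last element), keeping :teenth as its own clause. MeetupSketch and its function names are hypothetical.

```elixir
defmodule MeetupSketch do
  # The weekday atoms are written down exactly once...
  @weekday_names [:monday, :tuesday, :wednesday, :thursday, :friday, :saturday, :sunday]
  # ...and the atom => day-number map is derived from them at compile time.
  @weekdays Map.new(Enum.with_index(@weekday_names, 1))

  # Positional schedules as plain data; Enum.at/2 treats -1 as "last".
  @positions %{first: 0, second: 1, third: 2, fourth: 3, last: -1}

  @spec weekday_number(atom) :: 1..7
  def weekday_number(weekday), do: Map.fetch!(@weekdays, weekday)

  @spec filter_by_schedule([:calendar.date()], atom) :: :calendar.date() | nil
  def filter_by_schedule(dates, :teenth) do
    Enum.find(dates, fn {_year, _month, day} -> day in 13..19 end)
  end

  def filter_by_schedule(dates, schedule) do
    Enum.at(dates, Map.fetch!(@positions, schedule))
  end
end
```

MeetupSketch.weekday_number(:wednesday) returns 3, which lines up with the day numbers from :calendar.day_of_the_week/1.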


Elixir/Erlang interop games

Today’s exercise was about calculating the date of the next imaginary meetup. The rules for the date might be something along the lines of “every first Friday”, “last Saturday”, etc.

It seems that Elixir decided to leave all date handling to Erlang, so today I learned how to use Erlang from Elixir. Nice! For this exercise, Erlang’s calendar module offered all the needed functions. And this is how it is used:

defp matches_weekday?(date, weekday) do
  weekdays = %{
    :monday => 1,
    :tuesday => 2,
    :wednesday => 3,
    :thursday => 4,
    :friday => 5,
    :saturday => 6,
    :sunday => 7
  }

  :calendar.day_of_the_week(date) == weekdays[weekday]
end

So, you just need to prefix the desired Erlang module with “:” and off you go. Very nice! I still have to find out how to include custom Erlang libraries, but that was not needed here, so until next time.
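Here is a small sketch of the same interop used to build the candidate dates for a meetup: list every date in a month and keep the ones falling on a given weekday number. MeetupDates and weekdays_in_month/3 are hypothetical names; :calendar.last_day_of_the_month/2 and :calendar.day_of_the_week/1 are the Erlang functions doing the work.

```elixir
defmodule MeetupDates do
  # All dates of the month whose :calendar.day_of_the_week/1 equals
  # `day_number` (1 = Monday ... 7 = Sunday).
  @spec weekdays_in_month(integer, 1..12, 1..7) :: [:calendar.date()]
  def weekdays_in_month(year, month, day_number) do
    last_day = :calendar.last_day_of_the_month(year, month)

    1..last_day
    |> Enum.map(fn day -> {year, month, day} end)
    |> Enum.filter(&(:calendar.day_of_the_week(&1) == day_number))
  end
end
```

January 1st, 2000 was a Saturday (day number 6), so MeetupDates.weekdays_in_month(2000, 1, 6) lists the 1st, 8th, 15th, 22nd and 29th.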

Another thing I learned is how to invoke an arbitrary function at run time. For this exercise, I had a set of policies to filter the matched weekdays against. E.g. for “meetup falls on the 2nd Tuesday” I’d first select all Tuesdays in the given month, and then fetch the 2nd date from the list. For another rule, e.g. “last Saturday”, I’d select the Saturdays and take the last one. And I hated the idea of some giant case expression, so I decided to:

  • store those functions in a map; let’s call them filters or rule matching policies
  • fetch the policy appropriate to the given schedule rule
  • apply the policy to the matching weekdays

And, no surprise there, there is the Kernel.apply/2 function that does just that: it applies a function to a list of arguments. Let’s see it in action:

defp filter_by_schedule(dates, schedule) do
  schedule_filter = %{
    :first => &first/1,
    :second => &second/1,
    :third => &third/1,
    :fourth => &fourth/1,
    :last => &last/1,
    :teenth => &teenth/1
  }[schedule]

  schedule_filter |> apply([dates])
end

defp first(dates) do
  dates |> hd
end

defp second(dates) do
  dates |> Enum.at(1)
end

defp third(dates) do
  dates |> Enum.at(2)
end

defp fourth(dates) do
  dates |> Enum.at(3)
end

defp last(dates) do
  dates |> Enum.reverse |> hd
end

defp teenth(dates) do
  dates |> Enum.find(fn({_, _, day}) ->
    day >= 13 && day <= 19
  end)
end

So basically, the schedule filter policy is looked up in the map that maps each schedule type to the appropriate filter function. And the rest is easy :-)
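A short sketch of the variants side by side (ApplyDemo is a hypothetical module): Kernel.apply/2 takes a function and a list of arguments, which is equivalent to calling the function with the .() syntax, while Kernel.apply/3 takes a module, a function name and the arguments. apply/3 can only reach public functions, which is one reason to keep captures of private functions in a map instead.

```elixir
defmodule ApplyDemo do
  @dates [{2000, 1, 1}, {2000, 1, 8}, {2000, 1, 15}]

  def examples do
    last = &List.last/1

    {
      # Kernel.apply/2: a function value plus a list of arguments
      apply(last, [@dates]),
      # the same call with the .() syntax
      last.(@dates),
      # Kernel.apply/3: module, function name (an atom) and arguments
      apply(Enum, :at, [@dates, 1])
    }
  end
end
```

ApplyDemo.examples() returns {{2000, 1, 15}, {2000, 1, 15}, {2000, 1, 8}}.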

You can find the complete solution at exercism.io or Github.


Transform Elixir

In “Grading (with) Elixir” I complained that using Enum.reduce/3 was a bit cumbersome for a simple transform of map values. Funny enough, the last exercism.io problem I’ve been playing with, the Transform part of ETL, kind of hints at why Enum.reduce/3 is just the right tool for the job after all. Let’s see why.

The exercise describes a problem in which you need to take the old Scrabble scoring system and convert it to the new one. The old system groups letters by their value (points), while the new one requires letters to be the keys and the points their respective values, to make scoring a more enjoyable process.

So, basically, we need to take old scoring data and transform it to the new one. Let’s see how:

def transform(input) do
  input |> Enum.reduce(%{}, fn({value, keys}, acc) ->
    keys |> Enum.map(&({String.downcase(&1), value})) |> Enum.into(acc)
  end)
end

So this time, we take the input map and, for each score, turn its list of letters into {letter, score} tuples (downcasing each letter), and pour all of them into a single map, the accumulator.

And voilà, the reduce now makes much more sense :-)
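For comparison, the same transformation can also be written as a for comprehension with the :into option, which builds the map directly. A sketch (the ETL module name and the example input are assumptions):

```elixir
defmodule ETL do
  # One {letter, score} pair per letter, collected straight into a map.
  @spec transform(%{optional(integer) => [String.t()]}) :: %{optional(String.t()) => integer}
  def transform(input) do
    for {value, letters} <- input, letter <- letters, into: %{} do
      {String.downcase(letter), value}
    end
  end
end
```

ETL.transform(%{1 => ["A", "E"], 2 => ["D", "G"]}) gives %{"a" => 1, "d" => 2, "e" => 1, "g" => 2}.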

As always, you can find the complete example at Github.

And as a parting quote, here’s a “deja vu” situation described from that same exercise:

Extract-Transform-Load (ETL) is a fancy way of saying, “We have some crufty, legacy data over in this system, and now we need it in this shiny new system over here, so we’re going to migrate this.”

(Typically, this is followed by, “We’re only going to need to run this once.” That’s then typically followed by much forehead slapping and moaning about how stupid we could possibly be.)

If I had a dime for every time it actually happened! :-)

 


Grading (with) Elixir

The last Elixir exercise from exercism.io was short but sweet. It involved working with maps and an interesting problem of transforming one map to another, while changing the inner structure of map value(s).

To make things concrete, the task was to, starting with people grouped by grades, build the same grade grouped list of people, but this time sort them alphabetically. So, one would start with:

%{4 => ["Christopher","Jennifer", "Bart"], 6 => ["Kareem", "Anna"]}

and expect the sorted map:

%{4 => ["Bart", "Christopher","Jennifer"], 6 => ["Anna", "Kareem"]}

Now, this isn’t much of an issue code wise, but I expected there to be a method like Enum.map/2 that would “remap” the values easily, but there isn’t. So, the only viable solution I could find was to actually use Enum.reduce/3:

@spec sort(map) :: map
def sort(db) do
  db |> Enum.reduce(%{}, fn({grade, names}, acc) ->
    Map.put(acc, grade, Enum.sort(names))
  end)
end

This essentially introduces an accumulator map that gets filled with same keys and transformed values. Nice, but kind of disappointed it didn’t support direct mapping.

EDIT: Per Martin’s hint, there is a way to use Enum.map/2, but one needs to use it in combination with Enum.into/2, like this:

@spec sort(map) :: map
def sort(db) do
  db |> Enum.map(fn({grade, names}) -> {grade, Enum.sort(names)} end) |> Enum.into(%{})
end
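Another option is Map.new/2, which maps each {key, value} pair and builds the new map in one step. A sketch (the School module name is an assumption):

```elixir
defmodule School do
  # Map.new/2 transforms every {grade, names} pair while building the new map.
  @spec sort(%{optional(pos_integer) => [String.t()]}) :: %{optional(pos_integer) => [String.t()]}
  def sort(db) do
    Map.new(db, fn {grade, names} -> {grade, Enum.sort(names)} end)
  end
end
```

With the example data from above, School.sort/1 returns %{4 => ["Bart", "Christopher", "Jennifer"], 6 => ["Anna", "Kareem"]}.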

Another thing that became obvious today is that if, Elixir’s stand-in for the ternary operator, is a macro that can be called like a function:

if(condition, do: x, else: y)

or

if condition, do: x, else: y

A subtle difference, but it can help with readability. E.g. the last form is going to need () around it in some complex expressions.

And finally, another nice thing (coming from JavaScript and Ruby, I really like it) is that you can use “||” like this:

@spec grade(map, pos_integer) :: [String.t]
def grade(db, grade) do
  db[grade] || []
end

So, if there is a key in the map, you get its value; otherwise you get a default value, an empty list in this case.
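One subtlety: || falls back whenever db[grade] is nil or false, while Map.get/3 falls back only when the key is missing. A sketch showing both (SchoolLookup is a hypothetical module):

```elixir
defmodule SchoolLookup do
  # The post's version: falls back when the value is nil (or the key is missing) or false.
  def grade_or(db, grade), do: db[grade] || []

  # Map.get/3: falls back only when the key is missing.
  def grade_get(db, grade), do: Map.get(db, grade, [])
end
```

For a map like %{4 => ["Bart"], 5 => nil}, both return [] for a missing grade, but for grade 5 grade_or/2 returns [] while grade_get/2 returns nil. With plain name lists as values, as in this exercise, the two behave the same.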

And, as always, you can find the entire code sample at Github project.
