Show HN: Broadway, RabbitMQ, and the Rise of Elixir


In this two part series we'll talk about what a message queue is and whether you should use one in your application. From there we'll dive into the Broadway library maintained by Plataformatec and see how it can be leveraged from within
an application for data processing. Finally, we'll build a sample project that uses RabbitMQ+Broadway to fetch
comments from HackerNews so we can measure the popularity of Elixir over time. Without further ado, let's dive right
into things!

Funny enough, a message queue is exactly what it sounds like…a queue of messages (a rarity indeed to have a technical
term be self explanatory :)). On one side, you have messages being published to the queue by what are known as producers,
and on the other side you have messages being consumed from the queue by what are known as consumers. The messages that the
producers publish to the queue are dispatched to the consumers in a FIFO (first in first out) fashion. Regardless of
your message queue implementation, this is the general flow:

Message producer -> Message queue -> Message consumer
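As a quick illustration of those FIFO semantics (not tied to any particular broker), Erlang's built-in `:queue` module models the same producer/consumer behavior in a few lines:

```elixir
# Purely illustrative: :queue gives the same first-in-first-out ordering
# a message broker provides, minus durability and delivery guarantees.
queue = :queue.new()

# Producers enqueue messages at the rear of the queue
queue = :queue.in("message 1", queue)
queue = :queue.in("message 2", queue)

# Consumers dequeue from the front, in the order the messages arrived
{{:value, first}, queue} = :queue.out(queue)
{{:value, second}, _queue} = :queue.out(queue)

IO.inspect({first, second}) # => {"message 1", "message 2"}
```

A real broker adds persistence, acknowledgements, and fan-out on top of this basic ordering, but the core contract is the same.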

While the previous description seems rather rudimentary, the impact that a message queue can have on your
architecture is quite profound. Primarily, it allows your architecture to shift from being synchronous to being
asynchronous. As an example, let's consider a user registration flow in a synchronous world and an asynchronous world.
Our user registration flow has the following requirements:

  1. Must persist user data to our DB
  2. Add the user to our mailing list
  3. Upload their avatar to S3 after doing some image manipulation

In a non-message queue system:

                Persist user to DB
                Add user to mailing list
POST /user  ->  Resize image with ImageMagick  ->  Return 201
                Upload image to S3

In a message queue system:

                Persist user to DB
POST /user  ->  Publish new email to RabbitMQ  ->  Return 201
                Publish new image to RabbitMQ

---------------------------------------------------------------------------------

New email RabbitMQ queue  ->  Message consumer  ->  Add user to mailing list

---------------------------------------------------------------------------------

                                                   Resize image with ImageMagick
New avatar RabbitMQ queue  ->  Message consumer  ->  Upload image to S3
                                                   Update DB with S3 location

When performing the create user action in a synchronous fashion, all operations must complete before returning a
response to the requester. In other words, the manipulation of the avatar via ImageMagick + upload to S3 and adding the
user to your MailChimp mailing list must complete before you can respond to the request. If you need to perform retries
or there is network latency, this will delay responding to the request. On the other hand, when performing the user
creation in an asynchronous fashion, you delegate the external resource calls to the message consumers and only perform
the minimal set of changes to the DB. This in turn results in a much faster turn around to the user, while the
additional operations happen in the background afterwards. Doing this in an asynchronous fashion is not without its
tradeoffs though. If there are issues, for example, with the avatar that the user uploaded, there is no clean way to
roll back and fail the user creation action, since the user has already been created and you only learn about the problem after
the fact.

The big thing to note here is that you are dispersing your side effects across several entities, and rolling them
back can be difficult at best or impossible in some scenarios. Planning out some failure points and their repercussions
will go a long way here and may save you from designing yourself into a corner.

Enjoying the blog post so far? Follow me on Twitter @akoutmos


Sign up for the mailing list and get notified by email when new blog posts come out.

In short, Broadway is a library that allows you to create multi-stage data processing pipelines. Via the facilities
provided to you in Broadway, you can define how you would like data to be processed by the pipeline and with what
level of concurrency. In order to power these data pipelines, Broadway requires a message queue and currently
officially supports 3: Amazon SQS, Google Pub/Sub and RabbitMQ. For the purposes of this tutorial we will be using
RabbitMQ, given that we can host it ourselves on our local machine. Some other important features that Broadway provides out
of the box are back-pressure so the pipeline is not constantly flooded, batching so that messages can be grouped if needed, and
graceful shutdowns which ensure in-flight items are processed before the BEAM is terminated. A more complete list
of features can be found here: https://hexdocs.pm/broadway/Broadway.html#module-built-in-features
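To make the shape of such a pipeline concrete before we build our own in Part 2, here is a minimal sketch of a Broadway module against the 0.4 API used in this tutorial. The module name, queue name, and stage counts are placeholder assumptions, not values from this project:

```elixir
defmodule MySketchPipeline do
  use Broadway

  alias Broadway.Message

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      # Broadway 0.4 uses the plural `producers` key (renamed in later versions)
      producers: [
        default: [
          module: {BroadwayRabbitMQ.Producer, queue: "some_queue"},
          stages: 1
        ]
      ],
      processors: [default: [stages: 10]],
      batchers: [default: [stages: 2, batch_size: 50]]
    )
  end

  @impl true
  def handle_message(_processor, %Message{} = message, _context) do
    # Transform each message here; returning it routes it to the default batcher
    message
  end

  @impl true
  def handle_batch(_batcher, messages, _batch_info, _context) do
    # Operate on the grouped messages, e.g. perform a bulk DB insert
    messages
  end
end
```

The `stages` options are where the concurrency tuning mentioned above happens: each processor and batcher stage is its own BEAM process, and back-pressure flows from the batchers up to the RabbitMQ producer.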

In order to showcase the capabilities of Broadway and RabbitMQ, we will set up an application which leverages both to
ingest and process data from the HackerNews API. Specifically, we'll be taking a trip down memory lane and
processing all HackerNews items since Elixir was first announced by José Valim in 2011
(https://news.ycombinator.com/item?id=2306006). The goal is to see how the frequency of the terms Elixir and Erlang
has changed over time in various HackerNews entries. Since I'm a firm believer in monitoring my applications, I'll be
adding Prometheus+Grafana to this project but will not be covering them in depth, as I have other posts that cover monitoring and
Prometheus specifically (https://akoutmos.com/post/prometheus-postgis-and-phoenix/). In addition, this tutorial assumes
you have Docker and Docker Compose up and running locally.

Step 1: Create a new Elixir project with required dependencies – commit

To begin, we'll start off by creating a vanilla Elixir project with a supervisor. To create your application, run the
following in your terminal:

$ mix new elixir_popularity --sup

Once that is done, open mix.exs and add the following to the deps function:

defp deps do
  [
    {:jason, "~> 1.1.2"},
    {:httpoison, "~> 1.6.1"},
    {:broadway, "~> 0.4.0"},
    {:broadway_rabbitmq, "~> 0.4.0"},
    {:ecto_sql, "~> 3.0"},
    {:postgrex, ">= 0.0.0"},
    {:gen_rmq, "~> 2.3.0"}
  ]
end

With your dependencies in place, run mix deps.get from the terminal and you should be good to go!

Step 2: Set up sample Docker Compose stack – commit

For our sample stack, we'll be running a few services. Some are required and others are optional depending on
whether you want to have monitoring in place for your application:

  • RabbitMQ (REQUIRED) – We will be using RabbitMQ as our message queue to process our HackerNews items. We will be writing 2 Broadway pipelines which will read from different queues in order to perform their required work.
  • Postgres (REQUIRED) – After we have fetched our items from the HackerNews API and they have gone through the necessary processing steps in Broadway, we will persist our findings in Postgres for later querying.
  • Postgres Exporter (OPTIONAL) – We will use this to get performance statistics out of Postgres so we can see how it behaves under load.
  • Grafana (OPTIONAL) – Grafana will read time series data out of Prometheus and display it.
  • Prometheus (OPTIONAL) – Prometheus will pull statistics from the various data sources (cAdvisor, Postgres Exporter, and RabbitMQ).
  • cAdvisor (OPTIONAL) – cAdvisor will collect metrics on the host machine running all of the containers.

You may have noticed that our Elixir application is nowhere to be seen in this list of services. The reason for that
is that we'll be interacting with our application via an IEx session on our local machine, and only require the
supporting services to be running in containers. In a production-esque environment, running this process in an IEx
session is probably not what you want…but this is a tutorial and I'll take some shortcuts :). Below is a copy of
the docker-compose.yml file that should be in the root of your project (feel free to remove any services that you do
not require):

version: '3.7'

services:
  cadvisor:
    image: google/cadvisor:v0.33.0
    ports:
      - '8080:8080'
    volumes:
      - /:/rootfs:ro
      - /var/run:/var/run:ro
      - /sys:/sys:ro
      - /var/lib/docker/:/var/lib/docker:ro

  rabbitmq:
    image: rabbitmq:3.8
    ports:
      - '5672:5672'
      - '15672:15672'
      - '15692:15692'
    volumes:
      - ./docker/rabbitmq/plugins:/etc/rabbitmq/enabled_plugins
      - ./docker/rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf:ro
      - rabbitmq-data:/var/lib/rabbitmq

  postgres:
    image: postgres:12.0
    ports:
      - '5432:5432'
    volumes:
      - postgres-data:/var/lib/postgresql/data
    environment:
      POSTGRES_PASSWORD: postgres
      POSTGRES_USER: postgres

  postgres_exporter:
    image: wrouesnel/postgres_exporter:v0.7.0
    ports:
      - '9187:9187'
    depends_on:
      - postgres
    environment:
      DATA_SOURCE_USER: postgres
      DATA_SOURCE_PASS: postgres
      DATA_SOURCE_URI: postgres:5432/?sslmode=disable

  grafana:
    image: grafana/grafana:6.4.4
    depends_on:
      - prometheus
    ports:
      - '3000:3000'
    volumes:
      - grafana-data:/var/lib/grafana
      - ./docker/grafana/:/etc/grafana/provisioning/
    env_file:
      - ./docker/grafana/.env

  prometheus:
    image: prom/prometheus:v2.13.0
    ports:
      - '9090:9090'
    volumes:
      - ./docker/prometheus/:/etc/prometheus/
      - prometheus-data:/prometheus
    command:
      - '--config.file=/etc/prometheus/config.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--web.console.libraries=/usr/share/prometheus/console_libraries'
      - '--web.console.templates=/usr/share/prometheus/consoles'

volumes:
  postgres-data: {}
  rabbitmq-data: {}
  prometheus-data: {}
  grafana-data: {}

With your docker-compose.yml file in place, you will also need to add a config/config.exs file to allow your
application to communicate with your Postgres instance. In config/config.exs add the following:

use Mix.Config

config :elixir_popularity, ecto_repos: [ElixirPopularity.Repo]

config :elixir_popularity, ElixirPopularity.Repo,
  database: "elixir_popularity_repo",
  username: "postgres",
  password: "postgres",
  hostname: "localhost",
  log: false

config :logger, :console, format: "[$level] $message\n"

In order for this configuration to work, you'll also need to create the ElixirPopularity.Repo module. Create
a file at lib/elixir_popularity/repo.ex with the following contents:

defmodule ElixirPopularity.Repo do
  use Ecto.Repo,
    otp_app: :elixir_popularity,
    adapter: Ecto.Adapters.Postgres
end

You'll also need to start the ElixirPopularity.Repo under your application supervisor in lib/application.ex:

defmodule ElixirPopularity.Application do
  use Application

  def start(_type, _args) do
    children = [
      ElixirPopularity.Repo
    ]

    opts = [strategy: :one_for_one, name: ElixirPopularity.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

The last thing that you will need to do is to grab a copy of the docker directory from the GitHub repo associated
with this tutorial (https://github.com/akoutmos/elixir_popularity). I won't deep dive into what exactly is in there, but
in short it is the Grafana+Prometheus+RabbitMQ configuration files needed for the tutorial.

Now that everything is updated on the Elixir side and all of the configuration files are in place, feel free to start
the Docker Compose stack by running:

$ docker-compose up

Once Docker Compose is up and running, you should be able to run mix ecto.create && iex -S mix without any DB
connection errors.

Step 3: Add HackerNews API module and supporting struct – commit

In order to fetch data from the HackerNews API, we'll need to create a helper module to actually make the HTTP calls to
the API and a struct which will house all of the relevant data that was fetched. Let's start by creating our struct module.
Create a file lib/hackernews_item.ex with the following contents:

defmodule ElixirPopularity.HackernewsItem do
  alias __MODULE__

  defstruct text: nil, type: nil, title: nil, time: nil

  def create(text, type, title, unix_time) do
    %HackernewsItem{
      text: text,
      type: type,
      title: title,
      time: get_date_time(unix_time)
    }
  end

  defp get_date_time(nil), do: nil

  defp get_date_time(unix_time) do
    {:ok, date_time} = DateTime.from_unix(unix_time)

    DateTime.to_date(date_time)
  end
end
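As a quick sketch of how the struct behaves, create/4 converts the raw unix timestamp into a Date (the timestamp below is an arbitrary example, not a real HackerNews item):

```elixir
# 86_400 seconds after the unix epoch is 1970-01-02 00:00:00 UTC,
# which get_date_time/1 truncates down to a Date
item = ElixirPopularity.HackernewsItem.create("some text", "story", "A title", 86_400)

item.time # => ~D[1970-01-02]
item.type # => "story"
```

Note that a nil timestamp is also handled, yielding a struct with time: nil, which keeps the API module simple when HackerNews items are missing fields.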

Once that is complete, we can create our HackerNews API module. Our module will reside in the file
lib/hackernews_api.ex. Before diving into how it works, here is the module in its entirety:

defmodule ElixirPopularity.HackernewsApi do
  @moduledoc """
  Fetches data from the Hackernews API, extracts the necessary fields,
  and massages some data
  """

  require Logger

  alias ElixirPopularity.HackernewsItem

  def get_hn_item(resource_id) do
    get_hn_item(resource_id, 4)
  end

  defp get_hn_item(resource_id, retry) do
    response =
      resource_id
      |> gen_api_url()
      |> HTTPoison.get([], hackney: [pool: :hn_id_pool])

    with {_, {:ok, body}} <- {"hn_api", handle_response(response)},
         {_, {:ok, decoded_payload}} <- {"payload_decode", Jason.decode(body)},
         {_, {:ok, data}} <- {"massage_data", massage_data(decoded_payload)} do
      data
    else
      {stage, error} ->
        Logger.warn(
          "Failed attempt #{5 - retry} at stage \"#{stage}\" with Hackernews ID of #{resource_id}. Error details: #{
            inspect(error)
          }"
        )

        if retry > 0 do
          get_hn_item(resource_id, retry - 1)
        else
          Logger.warn("Failed to retrieve Hackernews ID of #{resource_id}.")
          :error
        end
    end
  end

  defp handle_response({:ok, %HTTPoison.Response{status_code: 200, body: body}}) do
    {:ok, body}
  end

  defp handle_response({_, invalid_response}) do
    {:error, invalid_response}
  end

  defp gen_api_url(resource_id) do
    "https://hacker-news.firebaseio.com/v0/item/#{resource_id}.json"
  end

  defp massage_data(data) do
    {:ok,
     HackernewsItem.create(
       data["text"],
       data["type"],
       data["title"],
       data["time"]
     )}
  end
end

The entry point to this module is get_hn_item/1, which sets a max retries limit and, via the magic of tail
call recursion, attempts to retrieve the given HackerNews resource. If it is able to successfully fetch the HackerNews
resource, it will decode the JSON payload and massage it into the HackernewsItem struct in the massage_data/1
function. Failures are logged for ease of debugging, and an :error atom is returned if all retry
attempts are exhausted. You may have also noticed that our HTTPoison.get call is referencing a hackney pool. The
reason for that is that we'll be making a large number of concurrent requests to HackerNews, and for performance
reasons it is best to have a pool of connections open to the same host. In order to create the pool, we'll need to
add the following to our lib/application.ex file:

def start(_type, _args) do
  children = [
    :hackney_pool.child_spec(:hn_id_pool, timeout: 15000, max_connections: 100),
    ElixirPopularity.Repo
  ]

  opts = [strategy: :one_for_one]
  Supervisor.start_link(children, opts)
end

To try all of this out, run iex -S mix to open up an Elixir REPL and run
ElixirPopularity.HackernewsApi.get_hn_item(2306006). You should see the following output:

iex(1) ▶ ElixirPopularity.HackernewsApi.get_hn_item(2306006)
%ElixirPopularity.HackernewsItem{
  text: "",
  time: ~D[2011-03-09],
  title: "Elixir: Simple Object Orientation and charming syntax on top of Erlang",
  type: "story"
}

Things are coming together! We now have our HackerNews API module in place and can fetch data from HackerNews. Next
we'll be creating a module which will help us interface with RabbitMQ for use with Broadway.

Step 4: Create GenRMQ module to communicate with RabbitMQ – commit

For the last step in this part of the tutorial, we'll be creating a module which will be used to publish messages to RabbitMQ. Since
we'll be using Broadway to consume from RabbitMQ, we need something to actually publish messages to RabbitMQ. For
this purpose we'll be using gen_rmq, as it abstracts away all the boilerplate associated with creating and managing a healthy
open channel to RabbitMQ. As you may recall, we added gen_rmq to our dependencies earlier on and already pulled it down,
so no changes are needed to our mix.exs file. Create a file at lib/rmq_publisher.ex and add the following:

defmodule ElixirPopularity.RMQPublisher do
  @behaviour GenRMQ.Publisher

  @rmq_uri "amqp://rabbitmq:rabbitmq@localhost:5672"
  @hn_exchange "hn_analytics"
  @hn_item_ids_queue "item_ids"
  @hn_bulk_item_data_queue "bulk_item_data"
  @publish_options [persistent: false]

  def init do
    create_rmq_resources()

    [
      uri: @rmq_uri,
      exchange: @hn_exchange
    ]
  end

  def start_link do
    GenRMQ.Publisher.start_link(__MODULE__, name: __MODULE__)
  end

  def hn_id_queue_size do
    GenRMQ.Publisher.message_count(__MODULE__, @hn_item_ids_queue)
  end

  def publish_hn_id(hn_id) do
    GenRMQ.Publisher.publish(__MODULE__, hn_id, @hn_item_ids_queue, @publish_options)
  end

  def publish_hn_items(items) do
    GenRMQ.Publisher.publish(__MODULE__, items, @hn_bulk_item_data_queue, @publish_options)
  end

  def item_id_queue_name, do: @hn_item_ids_queue

  def bulk_item_data_queue_name, do: @hn_bulk_item_data_queue

  defp create_rmq_resources do
    # Set up RabbitMQ connection
    {:ok, connection} = AMQP.Connection.open(@rmq_uri)
    {:ok, channel} = AMQP.Channel.open(connection)

    # Create exchange
    AMQP.Exchange.declare(channel, @hn_exchange, :topic, durable: true)

    # Create queues
    AMQP.Queue.declare(channel, @hn_item_ids_queue, durable: true)
    AMQP.Queue.declare(channel, @hn_bulk_item_data_queue, durable: true)

    # Bind queues to exchange
    AMQP.Queue.bind(channel, @hn_item_ids_queue, @hn_exchange, routing_key: @hn_item_ids_queue)
    AMQP.Queue.bind(channel, @hn_bulk_item_data_queue, @hn_exchange, routing_key: @hn_bulk_item_data_queue)

    # Close the channel as it is no longer needed
    # GenRMQ will set up its own channel
    AMQP.Channel.close(channel)
  end
end

It may seem like there is a lot going on here, so let's dive in and break it down. We are
implementing the GenRMQ.Publisher behaviour, which requires us to have an init/0 function that returns some options
for gen_rmq to create the open channel to RabbitMQ. Specifically, these options are the exchange on which gen_rmq
will operate and the URI of the RabbitMQ server. Before returning these options though, we make a private call to
create_rmq_resources/0, which leverages the AMQP library (gen_rmq is built on top of the AMQP library) to create some
queues and bind them to our desired exchange. The rest of the functions in the module can then be leveraged to
perform various actions against RabbitMQ, such as publishing messages and getting queue sizes.

In order to have our gen_rmq module start inside a process, we'll need to add it as a child in our application module
lib/application.ex:

def start(_type, _args) do
  children = [
    :hackney_pool.child_spec(:hn_id_pool, timeout: 15000, max_connections: 100),
    ElixirPopularity.Repo,
    %{
      id: ElixirPopularity.RMQPublisher,
      start: {ElixirPopularity.RMQPublisher, :start_link, []}
    }
  ]

  opts = [strategy: :one_for_one]
  Supervisor.start_link(children, opts)
end

With all this in place, we can run a quick sanity check (to make sure everything is working) by opening up our Elixir
REPL and running a few commands:

iex(1) ▶ ElixirPopularity.RMQPublisher.hn_id_queue_size()
0
iex(2) ▶ Enum.each(1..15, &ElixirPopularity.RMQPublisher.publish_hn_id("#{&1}"))
:ok
iex(3) ▶ ElixirPopularity.RMQPublisher.hn_id_queue_size()
15

As an added bonus, when setting up our Docker Compose stack, we enabled the RabbitMQ management dashboard! To
see some statistics on RabbitMQ, open http://localhost:15672 in your browser, and enter the super secret
credentials of Username: rabbitmq | Password: rabbitmq to enter the dashboard. Once there you can look around at your
leisure or navigate to http://localhost:15672/#/queues/%2F/item_ids to see the state of the queue after we published
those 15 messages:

You can also purge the queue of all messages, since we don't currently have any consumers for our queue. There is a button
at the bottom of the item_ids queue page called Purge Messages. Clicking that will empty the queue of all messages.
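If you prefer to stay in the REPL, the same cleanup can be sketched with the AMQP library directly. This assumes the local rabbitmq/rabbitmq credentials used above and a running broker:

```elixir
# Sketch: purge the item_ids queue programmatically via the AMQP library,
# using the same local connection our publisher module uses.
{:ok, connection} = AMQP.Connection.open("amqp://rabbitmq:rabbitmq@localhost:5672")
{:ok, channel} = AMQP.Channel.open(connection)

# Returns {:ok, %{message_count: n}} with the number of messages removed
{:ok, %{message_count: purged}} = AMQP.Queue.purge(channel, "item_ids")
IO.puts("Purged #{purged} messages")

AMQP.Connection.close(connection)
```

Either way works; the dashboard button is just the point-and-click version of this call.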

Well done and thanks for sticking with me to the end! We covered quite a lot of ground and hopefully you picked up a
few cool tips and tricks along the way. To recap, in Part 1 of our RabbitMQ+Broadway series we examined the
tradeoffs that have to be weighed when deciding whether a message queue is the right tool for the job. We also talked
about what exactly Broadway is and how it leverages RabbitMQ to provide a powerful data processing pipeline. Finally, we
laid down the foundation for our HackerNews data mining tool. In Part 2 we'll finish off the implementation of our
HackerNews data miner by implementing the necessary Broadway modules and writing our results to Postgres.

Feel free to leave comments or feedback, or let me know what you would like to see in the next tutorial. Until next time!

Below are some additional resources if you would like to deep dive into any of the topics covered in this post.

