In this two part series, we'll be talking about what a message queue is and when you might want to use one in your application.
From there we'll dive into the Broadway library maintained by Plataformatec and see how it can be leveraged from within
an application for data processing. Finally, we'll build a sample project that uses RabbitMQ+Broadway to scrape
comments from HackerNews so we can measure the popularity of Elixir over time. Without further ado, let's dive right
into things!
Funny enough, a message queue is exactly what it sounds like...a queue of messages (a rarity indeed to have a technical
term be self-explanatory :)). On one side, you have messages being published to the queue by what are known as producers,
and on the other side you have messages being consumed from the queue by what are known as consumers. The messages that the
producers publish to the queue are dispatched to the consumers in a FIFO (first in first out) fashion. Regardless of
your message queue implementation, this is the general flow:
Message producer -> Message queue -> Message consumer
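The FIFO behavior at the heart of this flow is easy to demonstrate with Erlang's built-in :queue module (used here purely as an in-memory illustration; a real message queue like RabbitMQ adds durability, routing, and acknowledgements on top of this basic idea):

```elixir
# FIFO in miniature: the first message in is the first message out
queue = :queue.new()

# A "producer" enqueues messages in order
queue = :queue.in("message 1", queue)
queue = :queue.in("message 2", queue)

# A "consumer" dequeues them in the same order they were published
{{:value, first}, queue} = :queue.out(queue)
{{:value, second}, _queue} = :queue.out(queue)

IO.inspect(first)  # => "message 1"
IO.inspect(second) # => "message 2"
```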
While the previous description seems rather rudimentary, the impact that a message queue can have on your
architecture is quite profound. Fundamentally, it allows your architecture to shift from being synchronous to being
asynchronous. As an example, let's consider a user registration flow in a synchronous world and an asynchronous world.
Our user registration flow has the following requirements:
- Must persist the user data to our DB
- Add the user to our mailing list
- Upload their avatar to S3 after performing some image manipulations
In a non-message queue system:

              Persist user to DB
POST /user -> Add user to mailing list          -> Return 201
              Resize image with ImageMagick
              Upload image to S3
In a message queue system:

              Persist user to DB
POST /user -> Publish new email message to RabbitMQ  -> Return 201
              Publish new image message to RabbitMQ
---------------------------------------------------------------------------------
New email RabbitMQ queue  -> Message consumer -> Add user to mailing list
---------------------------------------------------------------------------------
                                                 Resize image with ImageMagick
New avatar RabbitMQ queue -> Message consumer -> Upload image to S3
                                                 Update DB with S3 location
When performing the create user action in a synchronous fashion, all of the operations must complete before returning a
response to the requester. In other words, the manipulation of the avatar with ImageMagick + the upload to S3, and the
adding of the user to your MailChimp mailing list, must all complete before you can respond to the request. If you have
retries in place or there is network latency, this can further delay responding to the request. On the other hand, when
performing the user creation in an asynchronous fashion, you delegate the external resource calls to the message
consumers and only perform the minimal number of changes to the DB. This in turn results in a much faster turnaround to
the user, with the additional operations occurring in the background afterwards. Doing things in an asynchronous
fashion is not without its tradeoffs though. If there are issues, for example, with the avatar that the user uploaded,
there is no clean way to roll back and fail the user creation action, since the user has already been created and you
only learn about the issue after the fact.
The important thing to note here is that you are dispersing your side-effects across several entities, and rolling them
back can be difficult at best or impossible in some scenarios. Planning out your failure points and their repercussions
will go a long way here and can keep you from designing yourself into a corner.
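To make that tradeoff concrete, here is a hedged sketch (module and function names are hypothetical, with stubbed side effects) of an avatar-queue consumer that cannot roll the user creation back, and instead records the failure after the fact:

```elixir
defmodule MyApp.AvatarConsumer do
  # Hypothetical consumer for the avatar queue. By the time this message is
  # processed, the 201 has already been returned and the user already exists,
  # so a bad avatar can only be recorded and remediated, never rolled back.
  def handle_message(%{user_id: user_id, image: image}) do
    case resize_and_upload(image) do
      {:ok, s3_url} -> {:avatar_ready, user_id, s3_url}
      {:error, reason} -> {:avatar_failed, user_id, reason}
    end
  end

  # Stubbed side effects purely for illustration
  defp resize_and_upload(:corrupt_image), do: {:error, :unsupported_format}
  defp resize_and_upload(_image), do: {:ok, "https://s3.example.com/avatars/some-image.png"}
end
```

In a real system, the `:avatar_failed` branch might flag the account and email the user, which is exactly the "learn about the issue after the fact" scenario described above.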
In short, Broadway is a library that allows you to create multi-stage data processing pipelines. Via the facilities
provided by Broadway, you can define how you would like data to be processed by the pipeline and with what
level of concurrency. In order to power these data pipelines, Broadway requires a message queue and currently
officially supports 3: Amazon SQS, Google Pub/Sub, and RabbitMQ. For the purposes of this tutorial we will be using
RabbitMQ, given that we can host it ourselves on our local machine. Some other important features that Broadway provides out
of the box are back-pressure so the pipeline isn't constantly flooded, batching so that messages can be grouped if needed, and
graceful shutdowns which ensure that in-flight messages are processed before the BEAM is terminated. A more complete list
of features can be found here: https://hexdocs.pm/broadway/Broadway.html#module-built-in-features
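We'll write the real pipelines in Part 2, but to give a feel for the shape of a Broadway pipeline now, a minimal module targeting the 0.4.x API we pinned in our deps might look roughly like this (module name, queue name, and stage counts are made up for illustration, and this sketch needs a running RabbitMQ instance to actually start):

```elixir
defmodule MyApp.ExamplePipeline do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      # Broadway 0.4.x option names; later versions renamed
      # :producers to :producer and :stages to :concurrency
      producers: [
        default: [
          module: {BroadwayRabbitMQ.Producer, queue: "item_ids"},
          stages: 1
        ]
      ],
      processors: [default: [stages: 10]],
      batchers: [default: [stages: 2, batch_size: 100, batch_timeout: 1_000]]
    )
  end

  # Called concurrently, once per message
  def handle_message(_processor, message, _context), do: message

  # Called with groups of up to batch_size messages
  def handle_batch(:default, messages, _batch_info, _context), do: messages
end
```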
In order to showcase the capabilities of Broadway and RabbitMQ, we will set up an application which leverages both to
ingest and process data from the HackerNews API. Specifically, we'll be taking a trip down memory lane and
processing all HackerNews items since Elixir was first announced by José Valim in 2011
(https://news.ycombinator.com/item?id=2306006). The goal is to see how the frequency of the terms Elixir and Erlang
has changed over time across various HackerNews entries. Since I'm a firm believer in monitoring my applications, I'll be
adding Prometheus+Grafana to this project, but will not be covering them in depth as I have other posts that cover monitoring and
Prometheus specifically (https://akoutmos.com/post/prometheus-postgis-and-phoenix/). In addition, this tutorial assumes
you have Docker and Docker Compose up and running locally.
Step 1: Create a new Elixir project with required dependencies – commit
To begin, we'll create a vanilla Elixir project with a supervisor. To create your application, run the
following in your terminal:
$ mix new elixir_popularity --sup
After that is done, open up mix.exs and add the following to the deps function:
defp deps do
  [
    {:jason, "~> 1.1.2"},
    {:httpoison, "~> 1.6.1"},
    {:broadway, "~> 0.4.0"},
    {:broadway_rabbitmq, "~> 0.4.0"},
    {:ecto_sql, "~> 3.0"},
    {:postgrex, ">= 0.0.0"},
    {:gen_rmq, "~> 2.3.0"}
  ]
end
With your dependencies in place, run mix deps.get from the terminal and you'll be good to go!
Step 2: Set up the sample Docker Compose stack – commit
For our sample stack, we'll be running a few services. Some are required and others are optional depending on
whether you want to have monitoring in place for your application:
- RabbitMQ (REQUIRED) – We will be using RabbitMQ as our message queue to process our HackerNews items. We will be writing 2 Broadway pipelines which will read from separate queues in order to perform their required work.
- Postgres (REQUIRED) – After we have fetched our items from the HackerNews API and they have gone through the necessary processing steps in Broadway, we will persist our findings in Postgres for later querying.
- Postgres Exporter (OPTIONAL) – We will use this to get performance statistics out of Postgres so we can see how it behaves under load.
- Grafana (OPTIONAL) – Grafana will read the time series data out of Prometheus and display it.
- Prometheus (OPTIONAL) – Prometheus will pull statistics from the various data sources (cAdvisor, Postgres Exporter, and RabbitMQ).
- cAdvisor (OPTIONAL) – cAdvisor will collect metrics on the host machine running all of the containers.
You may have noticed that our Elixir application is nowhere to be seen in this list of services. The reason being
that we'll be interacting with our application via an IEx session on our local machine, and only require the
supporting services to be running in containers. In a production-esque environment, running this process in an IEx
session is probably not what you want...but this is a tutorial and I'll take some shortcuts :). Below is a copy of
the docker-compose.yml file that should be in the root of your project (feel free to remove any services that you do
not require):
version: '3.7'

services:
  cadvisor:
    image: google/cadvisor:v0.33.0
    ports:
      - '8080:8080'
    volumes:
      - /:/rootfs:ro
      - /var/run:/var/run:ro
      - /sys:/sys:ro
      - /var/lib/docker/:/var/lib/docker:ro

  rabbitmq:
    image: rabbitmq:3.8
    ports:
      - '5672:5672'
      - '15672:15672'
      - '15692:15692'
    volumes:
      - ./docker/rabbitmq/plugins:/etc/rabbitmq/enabled_plugins
      - ./docker/rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf:ro
      - rabbitmq-data:/var/lib/rabbitmq

  postgres:
    image: postgres:12.0
    ports:
      - '5432:5432'
    volumes:
      - postgres-data:/var/lib/postgresql/data
    environment:
      POSTGRES_PASSWORD: postgres
      POSTGRES_USER: postgres

  postgres_exporter:
    image: wrouesnel/postgres_exporter:v0.7.0
    ports:
      - '9187:9187'
    depends_on:
      - postgres
    environment:
      DATA_SOURCE_USER: postgres
      DATA_SOURCE_PASS: postgres
      DATA_SOURCE_URI: postgres:5432/?sslmode=disable

  grafana:
    image: grafana/grafana:6.4.4
    depends_on:
      - prometheus
    ports:
      - '3000:3000'
    volumes:
      - grafana-data:/var/lib/grafana
      - ./docker/grafana/:/etc/grafana/provisioning/
    env_file:
      - ./docker/grafana/.env

  prometheus:
    image: prom/prometheus:v2.13.0
    ports:
      - '9090:9090'
    volumes:
      - ./docker/prometheus/:/etc/prometheus/
      - prometheus-data:/prometheus
    command:
      - '--config.file=/etc/prometheus/config.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--web.console.libraries=/usr/share/prometheus/console_libraries'
      - '--web.console.templates=/usr/share/prometheus/consoles'

volumes:
  postgres-data: {}
  rabbitmq-data: {}
  prometheus-data: {}
  grafana-data: {}
With your docker-compose.yml file in place, you will also need to add a config/config.exs file to allow your
application to communicate with your Postgres instance. In config/config.exs add the following:
use Mix.Config

config :elixir_popularity, ecto_repos: [ElixirPopularity.Repo]

config :elixir_popularity, ElixirPopularity.Repo,
  database: "elixir_popularity_repo",
  username: "postgres",
  password: "postgres",
  hostname: "localhost",
  log: false

config :logger, :console, format: "[$level] $message\n"
In order for this configuration to work, you'll also need to create the ElixirPopularity.Repo module. Create
a file at lib/elixir_popularity/repo.ex with the following contents:
defmodule ElixirPopularity.Repo do
  use Ecto.Repo,
    otp_app: :elixir_popularity,
    adapter: Ecto.Adapters.Postgres
end
You'll also need to start the ElixirPopularity.Repo under your application supervisor in lib/application.ex:
defmodule ElixirPopularity.Application do
  use Application

  def start(_type, _args) do
    children = [
      ElixirPopularity.Repo
    ]

    opts = [strategy: :one_for_one, name: ElixirPopularity.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
The last thing that you will need to do is to grab a copy of the docker directory from the GitHub repo associated
with this tutorial (https://github.com/akoutmos/elixir_popularity). I won't deep dive into what exactly is in there, but
in short it contains the Grafana+Prometheus+RabbitMQ configuration files needed for this tutorial.
Now that everything is up to date on the Elixir side and all of the configuration files are in place, feel free to start
the Docker Compose stack by running:
$ docker-compose up
Once Docker Compose is up and running, you should be able to run mix ecto.create && iex -S mix without any DB
connection errors.
Step 3: Add HackerNews API module and supporting struct – commit
In order to fetch data from the HackerNews API, we'll need to create a helper module to actually make the HTTP calls to
the API, and a struct that will house all of the relevant data that gets fetched. Let's start by creating our struct module.
Create a file lib/hackernews_item.ex with the following contents:
defmodule ElixirPopularity.HackernewsItem do
  alias __MODULE__

  defstruct text: nil, type: nil, title: nil, time: nil

  def create(text, type, title, unix_time) do
    %HackernewsItem{
      text: text,
      type: type,
      title: title,
      time: get_date_time(unix_time)
    }
  end

  defp get_date_time(nil), do: nil

  defp get_date_time(unix_time) do
    {:ok, date_time} = DateTime.from_unix(unix_time)
    DateTime.to_date(date_time)
  end
end
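As a quick sanity check of the struct, you can try something like the following in an iex session (the arguments are made up for illustration; the Unix timestamp is an arbitrary time on March 9th, 2011, the day of the Elixir announcement):

```elixir
iex(1) ▶ ElixirPopularity.HackernewsItem.create("some text", "story", "Some title", 1299672000)
%ElixirPopularity.HackernewsItem{
  text: "some text",
  time: ~D[2011-03-09],
  title: "Some title",
  type: "story"
}
```

Note how the private get_date_time/1 helper truncates the Unix timestamp down to a Date, which is all the granularity we need for tracking term frequency over time.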
Once that's complete, we can create our HackerNews API module. Our module will reside in the file
lib/hackernews_api.ex. Before diving into how it works, here is the module in its entirety:
defmodule ElixirPopularity.HackernewsApi do
  @moduledoc """
  Fetches data from the Hackernews API, extracts the necessary fields,
  and massages some of the data
  """

  require Logger

  alias ElixirPopularity.HackernewsItem

  def get_hn_item(resource_id) do
    get_hn_item(resource_id, 4)
  end

  defp get_hn_item(resource_id, retry) do
    response =
      resource_id
      |> gen_api_url()
      |> HTTPoison.get([], hackney: [pool: :hn_id_pool])

    with {_, {:ok, body}} <- {"hn_api", handle_response(response)},
         {_, {:ok, decoded_payload}} <- {"payload_decode", Jason.decode(body)},
         {_, {:ok, data}} <- {"massage_data", massage_data(decoded_payload)} do
      data
    else
      {stage, error} ->
        Logger.warn(
          "Failed attempt #{5 - retry} at stage \"#{stage}\" with Hackernews ID of #{resource_id}. Error details: #{
            inspect(error)
          }"
        )

        if retry > 0 do
          get_hn_item(resource_id, retry - 1)
        else
          Logger.warn("Failed to retrieve Hackernews ID of #{resource_id}.")
          :error
        end
    end
  end

  defp handle_response({:ok, %HTTPoison.Response{status_code: 200, body: body}}) do
    {:ok, body}
  end

  defp handle_response({_, invalid_response}) do
    {:error, invalid_response}
  end

  defp gen_api_url(resource_id) do
    "https://hacker-news.firebaseio.com/v0/item/#{resource_id}.json"
  end

  defp massage_data(data) do
    {:ok,
     HackernewsItem.create(
       data["text"],
       data["type"],
       data["title"],
       data["time"]
     )}
  end
end
The entry point into this module is get_hn_item/1, which sets a max retry limit and, through the magic of tail
call recursion, attempts to retrieve the given HackerNews resource. If it is able to successfully fetch the HackerNews
resource, it will then decode the JSON payload and massage it into the HackernewsItem struct via the massage_data/1
function. Failures are logged as appropriate for ease of debugging, and an :error atom is returned if all retry
attempts are exhausted. You may have also noticed that our HTTPoison.get call references a hackney pool. The
reason being that we'll be making a large number of concurrent requests to HackerNews, and for performance
reasons it is best to have a pool of connections open to the same host. In order to create the pool, we'll need to
add the following to our lib/application.ex file:
def start(_type, _args) do
  children = [
    :hackney_pool.child_spec(:hn_id_pool, timeout: 15000, max_connections: 100),
    ElixirPopularity.Repo
  ]

  opts = [strategy: :one_for_one]
  Supervisor.start_link(children, opts)
end
To try all of this out, run iex -S mix to open up an Elixir REPL, and run
ElixirPopularity.HackernewsApi.get_hn_item(2306006). You should see the following output:
iex(1) ▶ ElixirPopularity.HackernewsApi.get_hn_item(2306006)
%ElixirPopularity.HackernewsItem{
  text: "",
  time: ~D[2011-03-09],
  title: "Elixir: Simple Object Orientation and charming syntax on top of Erlang",
  type: "story"
}
Things are coming together! We now have our HackerNews API module in place and can fetch data from HackerNews. Next
we'll create a module that will help us interface with RabbitMQ for use with Broadway.
Step 4: Create GenRMQ module to communicate with RabbitMQ – commit
For the last step in this part of the tutorial, we'll be creating a module that will be used to publish messages to RabbitMQ. Since
we'll be using Broadway to consume from RabbitMQ, we need something to actually publish messages to RabbitMQ. For
this purpose we'll be using gen_rmq, as it abstracts away all of the boilerplate associated with creating and managing a valid
open channel to RabbitMQ. If you recall, we added gen_rmq to our dependencies earlier on and already pulled it down,
so no changes are needed to our mix.exs file. Create a file at lib/rmq_publisher.ex and add the following:
defmodule ElixirPopularity.RMQPublisher do
  @behaviour GenRMQ.Publisher

  @rmq_uri "amqp://rabbitmq:rabbitmq@localhost:5672"
  @hn_exchange "hn_analytics"
  @hn_item_ids_queue "item_ids"
  @hn_bulk_item_data_queue "bulk_item_data"
  @publish_options [persistent: false]

  def init do
    create_rmq_resources()

    [
      uri: @rmq_uri,
      exchange: @hn_exchange
    ]
  end

  def start_link do
    GenRMQ.Publisher.start_link(__MODULE__, name: __MODULE__)
  end

  def hn_id_queue_size do
    GenRMQ.Publisher.message_count(__MODULE__, @hn_item_ids_queue)
  end

  def publish_hn_id(hn_id) do
    GenRMQ.Publisher.publish(__MODULE__, hn_id, @hn_item_ids_queue, @publish_options)
  end

  def publish_hn_items(items) do
    GenRMQ.Publisher.publish(__MODULE__, items, @hn_bulk_item_data_queue, @publish_options)
  end

  def item_id_queue_name, do: @hn_item_ids_queue

  def bulk_item_data_queue_name, do: @hn_bulk_item_data_queue

  defp create_rmq_resources do
    # Setup RabbitMQ connection
    {:ok, connection} = AMQP.Connection.open(@rmq_uri)
    {:ok, channel} = AMQP.Channel.open(connection)

    # Create exchange
    AMQP.Exchange.declare(channel, @hn_exchange, :topic, durable: true)

    # Create queues
    AMQP.Queue.declare(channel, @hn_item_ids_queue, durable: true)
    AMQP.Queue.declare(channel, @hn_bulk_item_data_queue, durable: true)

    # Bind queues to exchange
    AMQP.Queue.bind(channel, @hn_item_ids_queue, @hn_exchange, routing_key: @hn_item_ids_queue)
    AMQP.Queue.bind(channel, @hn_bulk_item_data_queue, @hn_exchange, routing_key: @hn_bulk_item_data_queue)

    # Close the channel as it is no longer needed
    # GenRMQ will set up its own channel
    AMQP.Channel.close(channel)
  end
end
It may seem like there is quite a lot going on here, so let's dive in and break it down. We are
implementing the GenRMQ.Publisher behaviour, which requires us to have an init/0 function that returns some options
for gen_rmq to create the open channel to RabbitMQ. Specifically, these options are the exchange on which gen_rmq
will operate and the URI of the RabbitMQ server. Before returning these options though, we make a private call to
create_rmq_resources/0, which leverages the AMQP library (gen_rmq is built on top of the AMQP library) to create some
queues and bind them to our desired exchange. The rest of the functions in the module can then be leveraged to
perform various actions against RabbitMQ, such as publishing messages and getting queue sizes.
In order to have our gen_rmq module start inside a process, we'll need to add it as a child in our application module
lib/application.ex:
def start(_type, _args) do
  children = [
    :hackney_pool.child_spec(:hn_id_pool, timeout: 15000, max_connections: 100),
    ElixirPopularity.Repo,
    %{
      id: ElixirPopularity.RMQPublisher,
      start: {ElixirPopularity.RMQPublisher, :start_link, []}
    }
  ]

  opts = [strategy: :one_for_one]
  Supervisor.start_link(children, opts)
end
With all of this in place, we can run a quick sanity check (to make sure everything is working) by opening up our Elixir
REPL and running a few commands:
iex(1) ▶ ElixirPopularity.RMQPublisher.hn_id_queue_size()
0
iex(2) ▶ Enum.each(1..15, &ElixirPopularity.RMQPublisher.publish_hn_id("#{&1}"))
:ok
iex(3) ▶ ElixirPopularity.RMQPublisher.hn_id_queue_size()
15
As an added bonus, when setting up our Docker Compose stack, we enabled the RabbitMQ management dashboard! To
see some statistics on RabbitMQ, open up http://localhost:15672 in your browser and enter the super secret
credentials of Username: rabbitmq | Password: rabbitmq to enter the dashboard. Once there you can look around at your
leisure, or navigate to http://localhost:15672/#/queues/%2F/item_ids to see the state of the queue after we published
those 15 messages:
You can also purge the queue of all messages, since we currently don't have any consumers on the queue. There is a button
at the bottom of the item_ids queue page called Purge Messages. Clicking that will empty the queue of all messages.
Well done and thanks for sticking with me to the end! We covered quite a bit of ground, and hopefully you picked up a
few cool tips and tricks along the way. To recap, in Part 1 of our RabbitMQ+Broadway series we examined the
tradeoffs that have to be weighed when deciding whether a message queue is the right tool for the job. We also talked
about what exactly Broadway is and how it leverages RabbitMQ to provide a powerful data processing pipeline. Finally, we
laid down the foundation for our HackerNews data mining tool. In Part 2 we'll finish off the implementation of our
HackerNews data miner by implementing the necessary Broadway modules and writing our results to Postgres.
Feel free to leave comments or feedback, or let me know what you would like to see in the next tutorial. Until next time!
Below are some additional resources if you would like to deep dive into any of the topics covered in this post.
- https://github.com/plataformatec/broadway
- https://github.com/plataformatec/broadway_rabbitmq
- https://www.rabbitmq.com/tutorials/tutorial-one-elixir.html
- https://www.rabbitmq.com/prometheus.html
- https://hexdocs.pm/gen_rmq/2.3.0/readme.html
- https://www.cloudamqp.com/blog/2014-12-03-what-is-message-queuing.html