Tutorial

An introduction to scriptable sockets with ZeroMQ


Lourens Naudé explains how ZeroMQ allows scalable, extendable and less painful distributed systems for all.

Most software these days
is social software – moving pieces in a larger ecosystem, from
mobile phones to data centres. Exposing and maintaining these
integration points is still too expensive. Code refactoring
is a common practice at most shops, but how often do we find
stories for reworking communications infrastructure in sprint
backlogs? A myriad of projects continue to reinvent the wheel with
network-specific code that often requires years of experience and
knowledge of specs to get right. There’s a better way. But
first…

What’s wrong with sockets?

Not everyone can write good network services. A strong developer
profile with knowledge of I/O multiplexing, buffer management and
protocols is required. APIs, socket options and error codes also
aren’t portable, although high-level languages tend to handle them
very well.

Most sockets are locked into two communication patterns:
bi-directional or multicast, with lockstep connection setup. This
does not fit the scalable data and work distribution patterns
required by modern software and business needs.

Instant messaging for Apps

What is ZeroMQ?

  • Zero broker – there’s no central “message queue” or
    persistence, thus no Single Point of Failure (SPOF)
  • Zero admin – nothing to administer, other than your apps of
    course
  • Zero cost – BSD sockets API, easy to learn patterns and
    abstractions
  • Zero waste – light footprint, minimal allocations and system
    calls
  • Zero latency (almost) – designed from the ground up for
    minimal latency in the Financial Services industry
  • Zero price tag

It’s not a server or a message queue, but an embeddable library
that can be either of them, or anything you want. It’s perfect for
a small team where distributed software can be rolled out and
maintained by developers, without Ops intervention. It occupies the
sweet spot between complex messaging infrastructure and low-level
transport, with the ability to still use messaging patterns for
communication. A huge bindings ecosystem (30+ languages) supports
building out language agnostic components, with uniform messaging
abstractions and a BSD style API. Interesting fact: Sergey
Aleynikov, the infamous Goldman Sachs programmer, wrote the first
Erlang binding.

It’s asynchronous by design and part concurrency, part
communications framework. We’ll look at concurrency specifics later
on in this article.

A little history

A few years ago iMatix Corporation worked on the AMQP protocol
specification along with some giants in the financial services
industry. ØMQ started as a back-of-the-napkin diagram during
discussions between Martin Sustrik and Pieter Hintjens, which
resulted in collaboration between iMatix Corporation and a large
community of contributors. Its design evolved from best practices,
lessons learnt from the AMQP effort (brokerless) and fierce
optimization. Eventually the project had a major breakthrough by
adding message queue semantics to BSD sockets.

Due to its low latency and high throughput characteristics,
ZeroMQ penetrated diverse environments such as AT&T, Twitter,
Github, many financial institutions and even the Samsung Galaxy
S3. Today it’s making its way into several start-ups as well.

The zguide was written to “sell” this game-changing technology to
a community and became a documentation poster child for other
projects. A lightweight contribution process with concise patterns
and multiple languages (source code as well) helped this
documentation become the basis of a book.

You may also come across the Crossroads I/O project, which is a
fork of ZeroMQ by some of the original developers (Martin Sustrik
and Martin Lucina).

What is a ZeroMQ socket?

This question is very common with newcomers and I’ll try my best
to demystify it here. ZeroMQ sockets are an API model – they
provide small embeddable servers that work over various transports
and speak to your apps using a socket metaphor. A familiar BSD
socket API (bind, connect, send and receive) exposes connection
management and a messaging interface. These sockets send or receive
length-specified opaque blobs (messages) shuffled into a framed
wire protocol, using any of the supported transports: tcp, ipc or
inproc (more on this later). There are no byte streams; only
messages go on the wire.

Supported socket pairs:

  • REQ/REP – client/server (RPC)
  • PUB/SUB – data distribution
  • PUSH/PULL – task distribution
  • PAIR – exclusively for inter-thread communication and
    others

Common socket pairs

Each socket has behaviour specific to its role in a messaging
pattern. SUB sockets for example don’t know how to send data
(they’re unidirectional), while REQ sockets can send and receive
data. A router (fair-queued, round-robin, fan-out, depending on the
socket’s role in a given pattern) and an I/O multiplexor complete
these “scriptable” sockets.

Here’s a classic “apart from” scenario by Pieter Hintjens, dreamt
up on the ZeroMQ wiki. Enjoy!

Rowan: APART FROM portability, message framing,
super fast asynchronous I/O, support for every bloody language
anyone cares about, huge community, price tag of zero, mind-blowing
performance, protection from memory overflows, loads of internal
consistency checks, patterns like pub/sub and request/reply,
batching, and seamless support for inter-thread transport as well
as TCP and multicast, ZEROMQ IS JUST SOCKETS!!!

Amari: Sure!

Kim: You forgot zero-copy and automatic TCP
reconnection…

Touching the water

In this example we have a parallel work distribution pipeline.
The goal is to move jobs from the ventilator and collect results in
the sink as fast as possible. The workers tier is composed of
simple devices – frontend and backend socket pairs (see the code
below). Now, imagine that any tier or any worker can be deployed in
any language. To scale out worker capacity, just add more to the
tier. Need to remove the SPOF at the edges? Just spawn more. Is
the results sink not able to keep up? Spawn more.

Parallel task distribution and
collection.

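#include <czmq.h>

zctx_t *ctx = zctx_new ();
void *frontend = zsocket_new (ctx, ZMQ_PULL);
void *backend = zsocket_new (ctx, ZMQ_PUSH);
// connect to the ventilator
zsocket_connect (frontend, "tcp://localhost:5001");
// connect to the results sink
zsocket_connect (backend, "tcp://localhost:5002");

while (true) {
    zmsg_t *msg = zmsg_recv (frontend);
    if (!msg)
        break;  // interrupted
    // process() is application code (not shown here); it is assumed
    // to consume the job and return a new result message
    zmsg_t *result = process (msg);
    zmsg_send (&result, backend);
}
zctx_destroy (&ctx);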

Common use cases

This is by no means a complete set of patterns; I’ve just
picked the most appropriate to cover in this article. Please
consult the zguide for more high level patterns.

RPC (REQ/REP)

The pattern connects a set of clients to a set of services and
is useful for remote procedure calls (RPC) and task distribution.
Either end supports multiplexed I/O and can thus handle several
thousand connections.

1:1 req / reply – the client/server control socket style we’re all
familiar with.

REQ socket can connect to multiple REP sockets for failover
(round-robin).

Bind, ready to serve several thousand clients.

This is the only lockstep (must send before recv, always in
order) socket pair. Despite this “limitation”, REQ/REP pairs can
implement custom routing between a client and a frontend, which
then load balances work out to a set of backends. The other
patterns covered here are downstream only. The bi-directional
REQ/REP pattern is also useful for flow control, by signalling
upstream providers in downstream topologies such as pipeline and
pub-sub.
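
The lockstep rule can be pictured as a two-state machine. What follows is a plain C model and not ZeroMQ code: the req_model_t type and the req_* functions are hypothetical names introduced for illustration. A real REQ socket enforces the same alternation and fails an out-of-order call with the EFSM error.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical model of a REQ socket: it alternates strictly between
   sending a request and receiving the matching reply. */
typedef enum { REQ_READY_TO_SEND, REQ_AWAITING_REPLY } req_state_t;

typedef struct {
    req_state_t state;
} req_model_t;

void req_init(req_model_t *req)
{
    assert(req != NULL);
    req->state = REQ_READY_TO_SEND;
}

/* Returns 0 if the request may go out, or -1 if a reply is still
   outstanding (a real socket would report EFSM here). */
int req_send(req_model_t *req)
{
    if (req->state != REQ_READY_TO_SEND)
        return -1;
    req->state = REQ_AWAITING_REPLY;
    return 0;
}

/* Returns 0 if a reply was expected, or -1 if nothing was requested. */
int req_recv(req_model_t *req)
{
    if (req->state != REQ_AWAITING_REPLY)
        return -1;
    req->state = REQ_READY_TO_SEND;
    return 0;
}
```

Calling req_send twice in a row, or req_recv before any request, returns -1 in this model; only the send, recv, send, recv sequence succeeds.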

Data Distribution (PUB/SUB)

This pattern connects a set of publishers to a set of
subscribers and is useful for data distribution requirements such
as game state, market data and other feeds. ZeroMQ can saturate
most network pipes and is suitable for high volume replication.

Standard pub-sub model.

Inverted pub/sub for
logging.

In ZeroMQ 2.2, PUB sockets distribute while SUB sockets filter.
This removes a lot of processing overhead from the PUB socket, but
can also cause severe network congestion when there are a large
number of subscribers whose filters don’t match the messages sent.
Subscription forwarding in 3.2 changes this by moving filtering to
the PUB socket.

A protocol extension passes subscriptions from SUB sockets to
upstream peers (PUB sockets) in order to conserve bandwidth in such
scenarios. This protocol change is currently not backwards
compatible with ZeroMQ 2.x PUB/SUB sockets, but it’s being
addressed.
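
ZeroMQ subscriptions are byte prefixes: a message matches when it starts with the subscribed bytes, and an empty subscription matches everything. The sketch below models that check in plain C to show what publisher-side filtering saves. The function names and the sample topics are assumptions made for illustration, not part of the ZeroMQ API.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Returns 1 if the message starts with the subscription prefix.
   An empty prefix matches every message. */
int subscription_matches(const void *prefix, size_t prefix_len,
                         const void *msg, size_t msg_len)
{
    assert(prefix != NULL || prefix_len == 0);
    if (prefix_len > msg_len)
        return 0;
    return prefix_len == 0 || memcmp(msg, prefix, prefix_len) == 0;
}

/* Counts how many of the given messages would cross the network for
   one subscriber when the filter runs at the publisher. */
size_t count_forwarded(const char *prefix,
                       const char *const *msgs, size_t count)
{
    size_t forwarded = 0;
    for (size_t i = 0; i < count; i++) {
        if (subscription_matches(prefix, strlen(prefix),
                                 msgs[i], strlen(msgs[i])))
            forwarded++;
    }
    return forwarded;
}
```

With subscriber-side filtering every message is sent to every subscriber and discarded there if it does not match. With publisher-side filtering only the count returned by count_forwarded goes on the wire.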

Work Distribution (PUSH/PULL)

The pipeline pattern connects nodes in a fan-out / fan-in
topology that can have multiple steps. Its primary use case is
parallel task distribution and collection.

Pipeline – one producer, many
consumers.

Parallel task
distribution.

The pipeline is usually infinitely extendable, much like UNIX
pipes. Tasks are distributed in round-robin fashion to PULL
sockets. This pattern is suitable for workloads with variable
processing overhead such as image processing or video
transcoding.
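
Round-robin distribution is simple enough to model in a few lines. This is a sketch of the idea only, not how the library implements it: push_model_t, push_init, push_task and MAX_WORKERS are hypothetical names, and the model ignores workers that connect or disconnect mid-stream.

```c
#include <assert.h>
#include <stddef.h>

#define MAX_WORKERS 16

/* Hypothetical model of a PUSH socket with several connected PULL peers. */
typedef struct {
    size_t workers;               /* number of connected PULL peers */
    size_t next;                  /* round-robin cursor */
    size_t assigned[MAX_WORKERS]; /* tasks handed to each worker so far */
} push_model_t;

void push_init(push_model_t *push, size_t workers)
{
    assert(workers > 0 && workers <= MAX_WORKERS);
    push->workers = workers;
    push->next = 0;
    for (size_t i = 0; i < MAX_WORKERS; i++)
        push->assigned[i] = 0;
}

/* Hands one task to the next worker in turn and returns its index. */
size_t push_task(push_model_t *push)
{
    size_t worker = push->next;
    push->assigned[worker]++;
    push->next = (push->next + 1) % push->workers;
    return worker;
}
```

Nine tasks pushed to three workers leave each worker with three tasks, regardless of how long each task takes to process.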

Messages

There’s no message format imposed – ZeroMQ implements a framed
wire protocol that treats message payloads as blobs. This removes
any proprietary serialization requirements and is well suited for
distributing high data volumes as messages. As long as there’s
memory available at both sender and receiver and the network has
ample capacity, nothing would stop you from sending 5GB
messages.

Atomic delivery guarantees you’d either receive the whole
message, or none of it. Buffer management and reconstruction are
entirely delegated to ZeroMQ – there’s never a need to touch the
wire. Multipart messages (envelopes) are also supported for routing
and more complex systems where distinct header, body and footer
semantics make the most sense.
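
To make framing and atomic delivery concrete, here is a minimal sketch of a length-prefixed frame codec in plain C. It is a simplified layout invented for this illustration (one flag byte plus a four-byte big-endian length) and is not the actual ZeroMQ wire format. The frame_encode and frame_decode names and the FRAME_MORE flag are likewise assumptions.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define FRAME_HEADER_SIZE 5  /* 1 flag byte + 4 length bytes */
#define FRAME_MORE 0x01      /* another frame of the same message follows */

/* Writes one frame into out. Returns the bytes written, or 0 if out
   is too small to hold the whole frame. */
size_t frame_encode(uint8_t *out, size_t out_size,
                    const void *body, uint32_t body_len, int more)
{
    assert(out != NULL);
    if (out_size < FRAME_HEADER_SIZE
        || out_size - FRAME_HEADER_SIZE < body_len)
        return 0;
    out[0] = more ? FRAME_MORE : 0;
    out[1] = (uint8_t) (body_len >> 24);
    out[2] = (uint8_t) (body_len >> 16);
    out[3] = (uint8_t) (body_len >> 8);
    out[4] = (uint8_t) body_len;
    if (body_len > 0)
        memcpy(out + FRAME_HEADER_SIZE, body, body_len);
    return FRAME_HEADER_SIZE + (size_t) body_len;
}

/* Reads one frame from in. Returns the bytes consumed, or 0 if the
   frame is not complete yet, in which case nothing is delivered. */
size_t frame_decode(const uint8_t *in, size_t in_len,
                    const uint8_t **body, uint32_t *body_len, int *more)
{
    if (in_len < FRAME_HEADER_SIZE)
        return 0;
    uint32_t len = ((uint32_t) in[1] << 24) | ((uint32_t) in[2] << 16)
                 | ((uint32_t) in[3] << 8) | (uint32_t) in[4];
    if (in_len - FRAME_HEADER_SIZE < len)
        return 0;
    *more = (in[0] & FRAME_MORE) != 0;
    *body = in + FRAME_HEADER_SIZE;
    *body_len = len;
    return FRAME_HEADER_SIZE + (size_t) len;
}
```

Because the decoder refuses to hand over a frame until all of its bytes have arrived, the application sees a whole blob or nothing. The flag byte lets a header frame and a body frame travel as one multipart message.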

Transports

ZeroMQ implements transport agnostic messaging, which means you
can easily swap out transports as your infrastructure grows.
Validate a new service with threads in a process. Then scale up to
processes on a box and eventually boxes on a network. All with a
uniform API – just change the protocol segment of the connection
URI.

Supported transports


  • inproc:// – threads in a process, never traverses the
    network stack

  • ipc:// – processes on a box, slower than inproc://, but
    useful for splitting concerns on a given host

  • tcp:// – nodes in a network, highest latency and most error
    prone, but also the most scalable

Devices

Devices are lightweight and programmable switches. You can see
an example in the code below. They are like proxies, with a
frontend and backend socket pair, and reduce complexity when
connecting pieces in your network. They are similar to the
intermediaries in brick and mortar economies, which are typically
called wholesalers, distributors, managers, etc. An ideal network
topology should have
an intelligent core and dumb edges, with a “smart endpoint, dumb
network” consumer perspective.

A device that injects itself
into an existing topology to satisfy new persistence
requirements.

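zctx_t *ctx = zctx_new ();
void *subscriber = zsocket_new (ctx, ZMQ_SUB);
void *publisher = zsocket_new (ctx, ZMQ_PUB);
// a SUB socket receives nothing until it subscribes; "" matches all
// (this call is named zsockopt_set_subscribe in CZMQ 1.x)
zsocket_set_subscribe (subscriber, "");
// connect to the upstream publisher
zsocket_connect (subscriber, "tcp://localhost:5001");
// bind on all interfaces so downstream subscribers can connect
zsocket_bind (publisher, "tcp://*:5002");

while (true) {
   zmsg_t *msg = zmsg_recv (subscriber);
   if (!msg)
       break;  // interrupted
   // persist() is application code (not shown here)
   zmsg_t *result = persist (msg);
   zmsg_send (&result, publisher);
}
zctx_destroy (&ctx);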

Common device use cases

  • Proxies for authentication or encryption
  • Translation between network segments with different message
    payloads
  • Custom or complex routing requirements
  • Filtering / de-dup during synchronization
  • Persistence for audit trails and snapshots

Scalability

The Interjection Principle states that:

Inserting an intermediate node or device into a topology should
not change behaviour at the endpoints. Thus growing a topology out
with devices should always be transparent to network edges. For
example, if an image transformation PUSH/PULL pipeline requires
additional optimizations or conversions, we can easily add devices
with those responsibilities transparent to the ventilator or the
results sink.

ZeroMQ message queues (per socket pair) can be bounded through a
High Water Mark (HWM) socket option to avoid excess memory growth
and as a means of throttling throughput for components. Each socket
pair has its own semantics for when a HWM value is reached. Some
block, others drop messages until the queue has been cleared. The
PUB/SUB pair for example drops messages for both sockets. Typically
in such a topology the PUB socket “drives” the downstream topology
and it throttles by dropping messages it can’t distribute.
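
The two behaviours at the High Water Mark, dropping and blocking, can be sketched with a small bounded queue. This is a plain C model under stated assumptions, not the library’s implementation: hwm_queue_t and the hwm_* functions are hypothetical, messages are reduced to integers, and “blocking” is represented by a return code instead of an actual wait.

```c
#include <assert.h>
#include <stddef.h>

#define HWM_CAPACITY 64  /* upper bound on the model's storage */

typedef enum { HWM_DROP, HWM_BLOCK } hwm_policy_t;
typedef enum { SEND_QUEUED, SEND_DROPPED, SEND_WOULD_BLOCK } send_result_t;

/* Hypothetical model of one socket's outbound queue. */
typedef struct {
    int items[HWM_CAPACITY];
    size_t head, count, hwm;
    hwm_policy_t policy;
    size_t dropped;  /* messages discarded at the high water mark */
} hwm_queue_t;

void hwm_init(hwm_queue_t *q, size_t hwm, hwm_policy_t policy)
{
    assert(hwm > 0 && hwm <= HWM_CAPACITY);
    q->head = q->count = q->dropped = 0;
    q->hwm = hwm;
    q->policy = policy;
}

/* Queues a message, or applies the policy once the queue is full. */
send_result_t hwm_send(hwm_queue_t *q, int msg)
{
    if (q->count == q->hwm) {
        if (q->policy == HWM_DROP) {
            q->dropped++;
            return SEND_DROPPED;
        }
        return SEND_WOULD_BLOCK;
    }
    q->items[(q->head + q->count) % q->hwm] = msg;
    q->count++;
    return SEND_QUEUED;
}

/* Stores the oldest message in *msg and returns 1, or returns 0 if
   the queue is empty. */
int hwm_recv(hwm_queue_t *q, int *msg)
{
    if (q->count == 0)
        return 0;
    *msg = q->items[q->head];
    q->head = (q->head + 1) % q->hwm;
    q->count--;
    return 1;
}
```

A queue created with HWM_DROP mimics a PUB socket, which discards what it cannot distribute. One created with HWM_BLOCK mimics a socket that makes the sender wait until the consumer catches up.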

As a good rule of thumb, let components that connect be unstable
and others that bind be stable, much like a traditional network.
Multiple transports and multiplexed asynchronous I/O allow a
logical component to then grow as large, or small, as it needs to
be. The ability to scale infrastructure down is very often
overlooked and there are millions to be saved by doing that in The
Cloud.

Resiliency and reliability

By having external dependencies such as connection management
and message delivery asynchronous, ZeroMQ is designed with
“unstable” components in mind. There are no guarantees on any
network, but the framework attempts to handle common errors with
reconnects etc. One of the big wins of this design is that it’s
possible for clients to connect to multiple endpoints
simultaneously – you cannot do this with standard sockets. Servers
also don’t need to be up before clients, which is great for
flexibility and avoids having to orchestrate upgrades and
deployments.

99% of deployments don’t need enterprise reliability. Back in
the seventies, transactions were costly, network topologies were
small and were managed by large Ops teams, and in general the
system had very little churn. Nowadays transaction costs are
negligible, networks are large, Ops teams are small and it all
changes very rapidly.

With this in mind, it’s recommended that ZeroMQ messages are
treated as in flight without guarantees, and that reliability is
addressed with messaging patterns. Consult the zguide for the
Majordomo, Titanic and Binary Star patterns, which are very well
documented and have implementations for most languages. Pieter
Hintjens started to experiment with implementing these as C
libraries with a very thin API, effectively hiding implementation
details, and then having languages expose these as native
bindings.

In general, don’t drink reliability Kool-Aid – it’s best solved
collectively, not by a single component.

Performance

ZeroMQ was designed with low latency and high throughput goals.
It was conceived for the financial services industry with much of
the initial development cycles focused on optimization. Latency is
always measured per message – it cannot be averaged. How long does
it take to deliver message X from Y to Z? Throughput however, is
measured at specific points, such as at the receiver, the sender or
a given device. How many messages are delivered over time to X?

A dedicated I/O thread handles all message delivery to and from
the network. This is great for scripting languages as the contract
between the language and sockets is about getting messages in to or
out of a socket’s outbound or inbound pipes. Dispatch happens in
the I/O thread.

ZeroMQ trades slightly higher latency than raw TCP for higher
throughput. It disables Nagle’s algorithm where possible and
batches instead, generally attempting to send all buffered
messages at once so that fewer trips through the network stack
are needed. This works very well for anything from a single
message to an unbounded stream of messages.

It can sustain millions of messages per second on a multi-core
box over 10 Gb Ethernet.

Concurrency

Ever worked with a multithreaded application where no locks and
mutexes are required? ZeroMQ has excellent support for writing
highly concurrent applications. The secret sauce is not to share
any state, but to pass it as messages to other sockets “living” in
other application threads instead. The distinct I/O thread and
lockless data structures allow developers to take advantage of OS
native threads and thus scale out to multiple cores with ease.

Case study – Mongrel 2

Mongrel 2 is a multi-protocol
web server that can speak to clients using HTTP, WebSockets or
Flash XML / JSON sockets. It’s one of the early Open Source
adopters and uses ZeroMQ in combination with a simple wire protocol
for backend requests. Backend servers are thus language agnostic by
tapping into the existing binding ecosystem, allowing servers to
write adapters in their native language.

Mongrel2 request
distribution.

Passing thoughts

I hope this has been a worthwhile introduction that sparks
enough interest to explore the framework and patterns further in
your language of choice. Follow a few simple rules (and unlearn a
few things) and distributed systems become a whole lot easier. I
also encourage you to look into custom protocols that suit your
problem domain by scanning through Keywords for Unprotocols, and
don’t forget to read the zguide :-)

Lourens Naudé is a cub at Bear Metal OÜ. You can contact him at lourens@bearmetal.eu.
