
An introduction to scriptable sockets with ZeroMQ


Lourens Naudé explains how ZeroMQ allows scalable, extendable and less painful distributed systems for all.

Most software these days is social software: moving pieces in a larger ecosystem, from mobile phones to data centres. The cost of exposing and maintaining these integration points is still too high. Code refactoring is common practice at most shops, but how often do we find stories for reworking communications infrastructure in sprint backlogs? A myriad of projects continue to reinvent the wheel with network-specific code that often takes years of experience and knowledge of specs to get right. There's a better way. But first…

What’s wrong with sockets?

Not everyone can write good network services: it takes a strong developer with knowledge of I/O multiplexing, buffer management and protocols. APIs, socket options and error codes also aren't portable across platforms, although high-level languages tend to smooth over these differences well.

Plain sockets also lock you into two communication patterns, point-to-point (bidirectional) or multicast, with lockstep connection setup. Neither fits the scalable data and work distribution patterns that modern software and business needs require.

Instant messaging for Apps

What is ZeroMQ?

The "zero" in ZeroMQ covers several things:

  • Zero broker – there's no central "message queue" or persistence, thus no single point of failure (SPOF)
  • Zero admin – nothing to administer, other than your apps of course
  • Zero cost – a BSD sockets API, with easy-to-learn patterns and abstractions
  • Zero waste – a light footprint, with minimal allocations and system calls
  • Zero latency (almost) – designed from the ground up for minimal latency in the financial services industry
  • Zero price tag

It's not a server or a message queue, but an embeddable library that can be either of them, or anything you want. It's perfect for a small team, where distributed software can be rolled out and maintained by developers without Ops intervention. It hits the sweet spot between complex messaging infrastructure and low-level transports, while still letting you communicate through messaging patterns. A huge bindings ecosystem (30+ languages) supports building language-agnostic components with uniform messaging abstractions and a BSD-style API. Interesting fact: Sergey Aleynikov, the infamous Goldman Sachs programmer, wrote the first Erlang binding.

It’s asynchronous by design and part concurrency, part communications framework. We’ll look at concurrency specifics later on in this article.

A little history

A few years ago iMatix Corporation worked on the AMQP protocol specification along with some giants in the financial services industry. ØMQ started as a back-of-the-napkin diagram during discussions between Martin Sustrik and Pieter Hintjens, which led to a collaboration between iMatix Corporation and a large community of contributors. Its design drew on best practices, on lessons learnt from the AMQP effort (hence brokerless) and on fierce optimization. The project's major breakthrough was adding message queue semantics to BSD sockets.

Due to its low latency and high throughput, ZeroMQ has spread into diverse environments such as AT&T, Twitter, GitHub, many financial institutions and even the Samsung Galaxy S3. Today it's making its way into several start-ups as well.

The zguide was written to "sell" this game-changing technology to a community, and it became a documentation poster child for other projects. A lightweight contribution process, concise patterns and example code in multiple languages helped it become the basis of a book.

You may also come across the Crossroads I/O project, which is a fork of ZeroMQ by some of the original developers (Martin Sustrik and Martin Lucina).

What is a ZeroMQ socket?

This question is very common among newcomers, and I'll try my best to demystify it here. A ZeroMQ socket is an API model: it provides small embeddable servers that work over various transports and talk to your apps using a socket metaphor. A familiar BSD-style API (bind, connect, send and receive) exposes connection management and the messaging interface. These sockets send and receive length-specified opaque blobs (messages), framed into a wire protocol and carried over any of the supported transports: tcp, ipc or inproc (more on this later). There are no byte streams; only whole messages go on the wire.
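To make "length-specified opaque blobs" concrete, here is a toy framing scheme in plain C. Each message is written as a one-byte length followed by the payload, so the reader always knows where one message ends and the next begins. This is a simplification for illustration only and not ZeroMQ's actual wire protocol (ZMTP), which also carries flags and supports longer lengths. The function names `frame_encode` and `frame_decode` are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

// Toy framing (NOT the real ZMTP wire format): [len:1 byte][payload:len bytes].
// Writes one frame into 'out' and returns the number of bytes written.
size_t frame_encode(const void *payload, size_t len, unsigned char *out)
{
    assert(len <= 255);               // a single length byte limits payload size
    out[0] = (unsigned char)len;
    memcpy(out + 1, payload, len);
    return len + 1;
}

// Decodes the frame starting at 'in', with 'avail' bytes available.
// Returns the total frame size consumed, or 0 if the frame is incomplete.
size_t frame_decode(const unsigned char *in, size_t avail,
                    const unsigned char **payload, size_t *len)
{
    if (avail < 1 || avail < (size_t)in[0] + 1)
        return 0;                     // not a whole message yet
    *len = in[0];
    *payload = in + 1;
    return *len + 1;
}
```

Two messages written back to back stay distinct on the wire, because the receiver always reads a length before reading a payload.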

Supported socket pairs:

  • REQ/REP – client/server (RPC)
  • PUB/SUB – data distribution
  • PUSH/PULL – task distribution
  • PAIR – an exclusive one-to-one pair, mainly for inter-thread communication

Common socket pairs

Each socket has behaviour specific to its role in a messaging pattern. SUB sockets, for example, can't send data (they're unidirectional), while REQ sockets both send and receive. A routing strategy (fair-queued, round-robin or fan-out, depending on the socket's role in a given pattern) and an I/O multiplexer complete these "scriptable" sockets.
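As one example of a routing strategy, here is a toy model of fair queuing, which is how a PULL socket reads from several upstream peers. Each pipe is reduced to a count of pending messages, and the reader resumes scanning just after the last pipe it served, so one busy peer cannot starve the others. The function and its parameters are hypothetical and not libzmq internals.

```c
#include <assert.h>
#include <stddef.h>

// Toy fair-queued input: pending[i] is the number of messages waiting on
// upstream pipe i. Starting after the pipe served last, take one message
// from the next non-empty pipe. Returns that pipe's index, or -1 if all
// pipes are empty.
int fair_queue_next(int *pending, size_t npipes, size_t *last)
{
    assert(npipes > 0);
    for (size_t step = 1; step <= npipes; step++) {
        size_t i = (*last + step) % npipes;
        if (pending[i] > 0) {
            pending[i]--;              // "receive" one message from pipe i
            *last = i;
            return (int)i;
        }
    }
    return -1;                         // every pipe is empty
}
```

With pipes holding 3, 0 and 1 messages, the reader alternates between pipes 0 and 2 until pipe 2 runs dry, instead of draining pipe 0 first.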

Here’s a classic “apart from” scenario by Pieter Hintjens, dreamt up on the ZeroMQ wiki. Enjoy!

Rowan: APART FROM portability, message framing, super fast asynchronous I/O, support for every bloody language anyone cares about, huge community, price tag of zero, mind-blowing performance, protection from memory overflows, loads of internal consistency checks, patterns like pub/sub and request/reply, batching, and seamless support for inter-thread transport as well as TCP and multicast, ZEROMQ IS JUST SOCKETS!!!

Amari: Sure!

Kim: You forgot zero-copy and automatic TCP reconnection…

Testing the water

In this example we have a parallel work distribution pipeline. The goal is to move jobs from the ventilator through the workers and collect results in the sink as fast as possible. The worker tier is composed of simple devices: frontend and backend socket pairs (see the code below). Now imagine that any tier, or any worker, can be written in any language. To scale out worker capacity, just add more workers to the tier. Need to remove the SPOF at the edges? Spawn more ventilators or sinks. Is the results sink unable to keep up? Spawn more sinks.

Parallel task distribution and collection.

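#include <czmq.h>

// Hypothetical application hook: turns a job into a result.
// Takes ownership of 'job' and returns a new message.
zmsg_t *process (zmsg_t *job);

int main (void)
{
    zctx_t *ctx = zctx_new ();
    void *frontend = zsocket_new (ctx, ZMQ_PULL);
    void *backend = zsocket_new (ctx, ZMQ_PUSH);
    // connect to the ventilator
    zsocket_connect (frontend, "tcp://localhost:5001");
    // connect to the results sink
    zsocket_connect (backend, "tcp://localhost:5002");

    while (true) {
        zmsg_t *msg = zmsg_recv (frontend);
        if (!msg) break;                // interrupted, e.g. by Ctrl-C
        zmsg_t *result = process (msg);
        zmsg_send (&result, backend);   // sends and destroys 'result'
    }
    zctx_destroy (&ctx);
    return 0;
}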

Common use cases

These are by no means a complete set of patterns; I've just picked the most appropriate ones to cover in this article. Please consult the zguide for more high-level patterns.

RPC (REQ/REP)

The pattern connects a set of clients to a set of services and is useful for remote procedure calls (RPC) and task distribution. Either end supports multiplexed I/O and can thus handle several thousand connections.

 1:1 req / reply – the client/server control socket style we’re all familiar with.

 REQ socket can connect to multiple REP sockets for failover (round-robin).

 Bind, ready to serve several thousand clients.

This is the only lockstep socket pair: you must send before you receive, always in order. Despite this "limitation", REQ/REP pairs can implement custom routing between a client and a frontend, which then load-balances work across a set of backends. The other patterns covered here flow downstream only. Because REQ/REP is bidirectional, it is also useful for flow control: downstream nodes in topologies such as pipeline and pub-sub can use it to signal upstream providers.
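The lockstep rule is essentially a two-state machine. libzmq reports an out-of-order call on a REQ socket as an error with errno set to EFSM. The sketch below models only the state transitions and simply returns -1; the type and function names are hypothetical.

```c
#include <assert.h>

// Toy model of the REQ socket's lockstep rule: send, recv, send, recv...
typedef enum { REQ_READY_TO_SEND, REQ_AWAITING_REPLY } req_state_t;

int req_send(req_state_t *s)
{
    assert(s != 0);
    if (*s != REQ_READY_TO_SEND)
        return -1;                     // must receive the pending reply first
    *s = REQ_AWAITING_REPLY;
    return 0;
}

int req_recv(req_state_t *s)
{
    assert(s != 0);
    if (*s != REQ_AWAITING_REPLY)
        return -1;                     // nothing has been requested yet
    *s = REQ_READY_TO_SEND;
    return 0;
}
```

Receiving before sending fails, and so does sending twice in a row. Only strict alternation succeeds.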

Data Distribution (PUB/SUB)

This pattern connects a set of publishers to a set of subscribers and is useful for data distribution requirements such as game state, market data and other feeds. ZeroMQ can saturate most network pipes and is suitable for high volume replication.

Standard pub-sub model.

Inverted pub/sub for logging.

In ZeroMQ 2.2, PUB sockets distribute every message and SUB sockets do the filtering. This removes a lot of processing overhead from the PUB socket, but it can also cause severe network congestion when there are many subscribers whose filters don't match most of the messages sent. Subscription forwarding in 3.2 changes this by moving filtering to the PUB socket.

A protocol extension passes subscriptions from SUB sockets to upstream peers (PUB sockets) in order to conserve bandwidth in such scenarios. This protocol change is currently not backwards compatible with ZeroMQ 2.x PUB/SUB sockets, but it’s being addressed.
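Wherever the filter runs, the matching rule is the same. A subscription matches a message when the subscription is a byte prefix of the message, and the empty subscription matches everything. The sketch below is our own illustration of that rule. The function name and the array-of-strings representation of subscriptions are assumptions, not part of the ZeroMQ API.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

// Toy subscription filter: a message passes if any subscription is a byte
// prefix of it. The empty subscription "" matches every message.
bool subscription_matches(const char *const *subs, size_t nsubs,
                          const void *msg, size_t msg_len)
{
    assert(subs != NULL || nsubs == 0);
    for (size_t i = 0; i < nsubs; i++) {
        size_t n = strlen(subs[i]);
        if (n <= msg_len && memcmp(subs[i], msg, n) == 0)
            return true;               // first matching prefix wins
    }
    return false;                      // no subscription matched: drop it
}
```

In a 2.x topology, this check effectively runs once per message on every subscriber. With subscription forwarding, the publisher runs it once per subscriber before anything is sent.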

Work Distribution (PUSH/PULL)

The pipeline pattern connects nodes in a fan-out/fan-in topology that can have multiple steps. Its primary use case is parallel task distribution and collection.

Pipeline – one producer, many consumers.

Parallel task distribution.

Like UNIX pipes, a pipeline can be extended with as many stages as you need. Tasks are distributed to the connected PULL sockets in round-robin fashion. This pattern suits workloads with variable processing overhead, such as image processing or video transcoding.
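Round-robin distribution on the PUSH side can be sketched as a cursor that advances over the connected workers and skips any worker that can't currently accept a task, for instance one that has hit its high-water mark. This is a simplified model under that assumption, not libzmq's actual load-balancing code. The function name and the `available` flags are hypothetical.

```c
#include <assert.h>
#include <stddef.h>

// Toy round-robin dispatcher: starting at *cursor, return the first worker
// whose available[] flag is set, and advance the cursor past every worker
// examined. Returns -1 if no worker can take the task right now.
int round_robin_next(const int *available, size_t nworkers, size_t *cursor)
{
    assert(nworkers > 0);
    for (size_t tries = 0; tries < nworkers; tries++) {
        size_t w = *cursor;
        *cursor = (*cursor + 1) % nworkers;
        if (available[w])
            return (int)w;
    }
    return -1;
}
```

When every worker is available, task i simply goes to worker i modulo the number of workers.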

Messages

There's no imposed message format: ZeroMQ implements a framed wire protocol that treats message payloads as opaque blobs. This avoids any proprietary serialization requirements and suits distributing high data volumes as messages. As long as there's memory available at both sender and receiver and the network has ample capacity, nothing stops you from sending 5GB messages.

Atomic delivery guarantees that you receive either the whole message or none of it. Buffer management and reassembly are entirely delegated to ZeroMQ, so there's never a need to touch the wire. Multipart messages (envelopes) are also supported, for routing and for more complex systems where distinct header, body and footer semantics make the most sense.
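Here is a sketch of the reassembly work that atomic delivery hides from you. Bytes arrive in arbitrary chunks, as they would from a TCP stream, and only complete messages are ever handed to the application, while partial ones stay buffered. The framing (a one-byte length prefix), the fixed buffer size and all names are assumptions made for illustration. This is not ZeroMQ's implementation.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

// Toy reassembler for [len:1 byte][payload] framing over a byte stream.
typedef struct {
    unsigned char buf[512];
    size_t used;
} reassembler_t;

// Appends a chunk of raw bytes; returns 0 on success, -1 if it doesn't fit.
int reasm_feed(reassembler_t *r, const void *chunk, size_t n)
{
    if (n > sizeof r->buf - r->used)
        return -1;
    memcpy(r->buf + r->used, chunk, n);
    r->used += n;
    return 0;
}

// Copies the next complete message into 'out' (room for 255 bytes) and
// returns its length, or -1 if no whole message has arrived yet.
int reasm_next(reassembler_t *r, unsigned char *out)
{
    assert(r->used <= sizeof r->buf);
    if (r->used < 1 || r->used < (size_t)r->buf[0] + 1)
        return -1;                     // partial message stays buffered
    size_t len = r->buf[0];
    memcpy(out, r->buf + 1, len);
    // Shift any remaining bytes to the front of the buffer.
    memmove(r->buf, r->buf + 1 + len, r->used - 1 - len);
    r->used -= 1 + len;
    return (int)len;
}
```

Feeding only the first half of a message yields nothing. Once the rest arrives, the whole message comes out in one piece.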

Transports

ZeroMQ implements transport-agnostic messaging, which means you can easily swap out transports as your infrastructure grows. Validate a new service with threads in a process, then scale up to processes on a box and eventually to boxes on a network, all with a uniform API: just change the protocol segment of the endpoint URI.

Supported transports

  • inproc:// – threads in a process, never traverses the network stack
  • ipc:// – processes on a box, slower than inproc://, but useful for splitting concerns on a given host
  • tcp:// – nodes in a network, with the highest latency and the most error-prone, but also the most scalable
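Since only the scheme of the endpoint string selects the transport, swapping transports is a one-string change. The hypothetical helper below (not part of the ZeroMQ API) shows that the scheme prefix alone carries the decision, while the rest of the application code stays untouched.

```c
#include <assert.h>
#include <string.h>

typedef enum {
    TRANSPORT_UNKNOWN, TRANSPORT_INPROC, TRANSPORT_IPC, TRANSPORT_TCP
} transport_t;

// Maps an endpoint string to its transport by its scheme prefix.
transport_t endpoint_transport(const char *endpoint)
{
    static const struct { const char *scheme; transport_t t; } table[] = {
        {"inproc://", TRANSPORT_INPROC},   // threads in a process
        {"ipc://", TRANSPORT_IPC},         // processes on a box
        {"tcp://", TRANSPORT_TCP},         // boxes on a network
    };
    assert(endpoint != NULL);
    for (size_t i = 0; i < sizeof table / sizeof table[0]; i++)
        if (strncmp(endpoint, table[i].scheme, strlen(table[i].scheme)) == 0)
            return table[i].t;
    return TRANSPORT_UNKNOWN;
}
```

Moving a worker from a thread to another host changes an endpoint such as "inproc://workers" to "tcp://host:5001", and nothing else.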

Devices

Devices are lightweight, programmable switches; you can see an example in the code below. Like proxies, they have a frontend and backend socket pair, and they reduce complexity when connecting the pieces of your network. Their counterparts in brick-and-mortar economies are intermediaries, typically called wholesalers, distributors, managers and so on. An ideal network topology should have an intelligent core and dumb edges, with a "smart endpoint, dumb network" consumer perspective.

A device that injects itself into an existing topology to satisfy new persistence requirements.

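#include <czmq.h>

// Hypothetical hook: persists 'msg' and returns the message to forward.
zmsg_t *persist (zmsg_t *msg);

int main (void)
{
    zctx_t *ctx = zctx_new ();
    void *subscriber = zsocket_new (ctx, ZMQ_SUB);
    void *publisher = zsocket_new (ctx, ZMQ_PUB);
    zsocket_set_subscribe (subscriber, "");   // subscribe to everything
    zsocket_connect (subscriber, "tcp://localhost:5001");
    zsocket_bind (publisher, "tcp://*:5002");
    while (true) {
        zmsg_t *msg = zmsg_recv (subscriber);
        if (!msg) break;                        // interrupted
        zmsg_t *result = persist (msg);
        zmsg_send (&result, publisher);
    }
    zctx_destroy (&ctx);
    return 0;
}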

Common device use cases

  • Proxies for authentication or encryption
  • Translation between network segments with different message payloads
  • Custom or complex routing requirements
  • Filtering / de-dup during synchronization
  • Persistence for audit trails and snapshots
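As an example of the filtering/de-dup use case, a device could remember the IDs of recently forwarded messages and drop repeats. The sketch below assumes that each message carries a numeric ID, for example a sequence number in its first frame. That assumption, the window size and the names are all hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

// Toy de-duplication stage: remembers the last DEDUP_WINDOW message IDs
// in a ring buffer and rejects any ID seen within that window.
#define DEDUP_WINDOW 4

typedef struct {
    uint64_t seen[DEDUP_WINDOW];
    size_t count, next;
} dedup_t;

// Returns true if the message should be forwarded, false if it's a repeat.
bool dedup_accept(dedup_t *d, uint64_t id)
{
    assert(d->count <= DEDUP_WINDOW);
    for (size_t i = 0; i < d->count; i++)
        if (d->seen[i] == id)
            return false;
    d->seen[d->next] = id;             // overwrites the oldest ID when full
    d->next = (d->next + 1) % DEDUP_WINDOW;
    if (d->count < DEDUP_WINDOW)
        d->count++;
    return true;
}
```

The bounded window keeps memory constant. The trade-off is that a duplicate arriving after more than `DEDUP_WINDOW` newer messages slips through.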

Scalability

The Interjection Principle states that:

Inserting an intermediate node or device into a topology should not change behaviour at the endpoints. Growing a topology out with devices should therefore always be transparent to the network edges. For example, if an image transformation PUSH/PULL pipeline requires additional optimizations or conversions, we can add devices with those responsibilities without the ventilator or the results sink noticing.

ZeroMQ message queues (per socket pair) can be bounded with the High Water Mark (HWM) socket option, both to avoid excess memory growth and as a way of throttling throughput for components. Each socket type has its own behaviour when the HWM is reached: some block, others drop messages until the queue has drained. PUB and SUB sockets, for example, both drop messages. In such a topology the PUB socket typically "drives" everything downstream, and it throttles by dropping the messages it can't distribute.
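The two HWM behaviours can be modelled with a bounded queue and a per-socket policy. Under a "drop" policy (as with PUB), a message that arrives when the queue is full is discarded. Under a "block" policy (as with PUSH), the caller is told to wait and retry. This is an illustrative sketch, not libzmq's pipe implementation. The fixed `HWM` of 3 and all names are assumptions.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define HWM 3                          // toy high-water mark

typedef enum { HWM_DROP, HWM_BLOCK } hwm_policy_t;
typedef enum { Q_QUEUED, Q_DROPPED, Q_WOULD_BLOCK } q_result_t;

typedef struct {
    int items[HWM];                    // ring buffer of queued messages
    size_t head, len;
    hwm_policy_t policy;
    size_t dropped;                    // messages discarded at the HWM
} bounded_queue_t;

q_result_t queue_push(bounded_queue_t *q, int msg)
{
    assert(q->len <= HWM);
    if (q->len == HWM) {
        if (q->policy == HWM_DROP) {
            q->dropped++;
            return Q_DROPPED;          // PUB-style: lose the new message
        }
        return Q_WOULD_BLOCK;          // PUSH-style: caller must wait
    }
    q->items[(q->head + q->len) % HWM] = msg;
    q->len++;
    return Q_QUEUED;
}

bool queue_pop(bounded_queue_t *q, int *msg)
{
    if (q->len == 0)
        return false;
    *msg = q->items[q->head];
    q->head = (q->head + 1) % HWM;
    q->len--;
    return true;
}
```

Either way, memory stays bounded. The policy only decides whether back-pressure becomes data loss or a stalled sender.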

As a good rule of thumb, let components that connect be unstable and components that bind be stable, much like a traditional network. Multiple transports and multiplexed asynchronous I/O then allow a logical component to grow as large, or as small, as it needs to be. The ability to scale infrastructure down is very often overlooked, and there are millions to be saved by doing so in the cloud.

Resiliency and reliability

By making connection management and message delivery asynchronous, ZeroMQ is designed with "unstable" components in mind. There are no guarantees on any network, but the framework attempts to handle common errors, for example by reconnecting automatically. One of the big wins of this design is that a client can connect to multiple endpoints simultaneously, which you cannot do with a standard socket. Servers also don't need to be up before clients, which is great for flexibility and spares you from orchestrating upgrades and deployments.
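Automatic reconnection is usually paced so that a missing server isn't hammered with connection attempts. The sketch below shows a capped exponential backoff schedule. It is loosely in the spirit of libzmq's `ZMQ_RECONNECT_IVL` and `ZMQ_RECONNECT_IVL_MAX` socket options, but the exact schedule libzmq uses varies by version and is not reproduced here. The function is hypothetical.

```c
#include <assert.h>

// Toy reconnect schedule: start at base_ms, double after each failed
// attempt, and never exceed max_ms.
long reconnect_delay_ms(long base_ms, long max_ms, unsigned attempt)
{
    assert(base_ms > 0 && max_ms >= base_ms);
    long delay = base_ms;
    for (unsigned i = 0; i < attempt && delay < max_ms; i++)
        delay *= 2;
    return delay < max_ms ? delay : max_ms;
}
```

With a 100 ms base and a 5 s cap, retries wait 100, 200, 400, 800 ms and so on, until they settle at 5000 ms.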

99% of deployments don't need enterprise reliability. Back in the seventies, transactions were costly, network topologies were small and managed by large Ops teams, and in general systems had very little churn. Nowadays transaction costs are negligible, networks are large, Ops teams are small and everything changes very rapidly.

With this in mind, it's recommended that you treat ZeroMQ messages as in flight without guarantees, and address reliability with messaging patterns. Consult the zguide for the Majordomo, Titanic and Binary Star patterns, which are very well documented and have implementations for most languages. Pieter Hintjens has started experimenting with implementing these as C libraries with a very thin API that hides the implementation details, which languages can then expose as native bindings.

In general, don’t drink reliability Kool-Aid – it’s best solved collectively, not by a single component.

Performance

ZeroMQ was designed with low latency and high throughput as goals. It was conceived for the financial services industry, and much of its early development focused on optimization. Latency is always measured per message and cannot be averaged: how long does it take to deliver message X from Y to Z? Throughput, however, is measured at a specific point, such as the receiver, the sender or a given device: how many messages are delivered to X over time?
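The distinction between the two metrics is easiest to see with numbers. The sketch computes one latency value per message from send and receive timestamps, then computes throughput at a single point, the receiver, from the arrival times alone. The timestamps are made-up illustration data in microseconds, and the function names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>

// Latency is per message: receive time minus send time for message i.
void per_message_latency(const double *sent_us, const double *recv_us,
                         size_t n, double *latency_us)
{
    for (size_t i = 0; i < n; i++)
        latency_us[i] = recv_us[i] - sent_us[i];
}

// Throughput is observed at one point (here the receiver): the arrival rate
// in messages per second, from n - 1 intervals between first and last arrival.
double receiver_throughput(const double *recv_us, size_t n)
{
    assert(n >= 2 && recv_us[n - 1] > recv_us[0]);
    return (double)(n - 1) / ((recv_us[n - 1] - recv_us[0]) / 1e6);
}
```

Messages sent at 0, 10, 20 and 30 µs that arrive at 50, 55, 70 and 100 µs have latencies of 50, 45, 50 and 70 µs. The receiver sees 3 intervals in 50 µs, which is 60,000 messages per second. A single average latency would hide the 70 µs outlier.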

A dedicated I/O thread handles all message delivery to and from the network. This is great for scripting languages, because the contract between the language and the sockets is only about getting messages into or out of a socket's outbound or inbound pipes. Dispatch happens in the I/O thread.

Compared with raw TCP, ZeroMQ adds a little latency per message but delivers higher throughput. It disables Nagle's algorithm where possible, so a lone message goes out immediately. When messages are queued, it attempts to send all buffered messages at once, which cuts the number of trips through the network stack. This adapts well to anything from a single message to a sustained stream.
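Opportunistic batching can be sketched as packing every queued message that fits into one output buffer, so that a single write carries all of them and a lone message is just a batch of one. Messages here are framed with a one-byte length prefix, which is a simplification and not ZMTP. The function and buffer sizes are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

// Toy opportunistic batching: copy as many queued messages as fit into 'out'
// (capacity 'cap'), each framed as [len:1 byte][payload]. Returns the number
// of bytes to hand to a single write() and reports how many messages fit.
size_t batch_messages(const char *const *msgs, size_t nmsgs,
                      unsigned char *out, size_t cap, size_t *nbatched)
{
    size_t used = 0;
    *nbatched = 0;
    for (size_t i = 0; i < nmsgs; i++) {
        size_t len = strlen(msgs[i]);
        assert(len <= 255);
        if (used + 1 + len > cap)
            break;                     // the rest waits for the next write
        out[used] = (unsigned char)len;
        memcpy(out + used + 1, msgs[i], len);
        used += 1 + len;
        (*nbatched)++;
    }
    return used;
}
```

With an 8-byte buffer and the queue "a", "bb", "ccc", the first two messages (5 bytes) go out in one write and "ccc" waits for the next one.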

It can sustain millions of messages per second on a multi-core box over 10 Gb Ethernet.

Concurrency

Ever worked on a multithreaded application where no locks or mutexes are required? ZeroMQ has excellent support for writing highly concurrent applications. The secret sauce is to share no state at all, and instead to pass it as messages to other sockets "living" in other application threads. The dedicated I/O thread and lockless data structures let developers take advantage of native OS threads and scale out to multiple cores with ease.
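The "lockless" part can be illustrated with a single-producer, single-consumer ring buffer built on C11 atomics. One thread only pushes and another only pops, and neither ever takes a lock. This is a minimal sketch of the idea, not libzmq's own pipe implementation (its lock-free `ypipe` is considerably more elaborate). All names are hypothetical.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

#define RING_SIZE 4                    // holds at most RING_SIZE - 1 messages
static_assert(RING_SIZE >= 2, "ring needs at least two slots");

typedef struct {
    void *slots[RING_SIZE];
    atomic_size_t head;                // next slot to read (consumer-owned)
    atomic_size_t tail;                // next slot to write (producer-owned)
} spsc_ring_t;

void ring_init(spsc_ring_t *r)
{
    atomic_init(&r->head, 0);
    atomic_init(&r->tail, 0);
}

bool ring_push(spsc_ring_t *r, void *msg)      // producer thread only
{
    size_t tail = atomic_load_explicit(&r->tail, memory_order_relaxed);
    size_t next = (tail + 1) % RING_SIZE;
    if (next == atomic_load_explicit(&r->head, memory_order_acquire))
        return false;                  // full
    r->slots[tail] = msg;
    // Release: the slot write becomes visible before the new tail does.
    atomic_store_explicit(&r->tail, next, memory_order_release);
    return true;
}

bool ring_pop(spsc_ring_t *r, void **msg)      // consumer thread only
{
    size_t head = atomic_load_explicit(&r->head, memory_order_relaxed);
    if (head == atomic_load_explicit(&r->tail, memory_order_acquire))
        return false;                  // empty
    *msg = r->slots[head];
    atomic_store_explicit(&r->head, (head + 1) % RING_SIZE,
                          memory_order_release);
    return true;
}
```

Ownership of each message moves with the pointer. The producer never touches a message after pushing it, so there is no shared mutable state to protect.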

Case study – Mongrel 2

Mongrel2 is a multi-protocol web server that can speak to clients using HTTP, WebSockets or Flash XML/JSON sockets. It was one of the early open source adopters, and it uses ZeroMQ in combination with a simple wire protocol for backend requests. Backend handlers are thus language-agnostic: by tapping into the existing bindings ecosystem, they can be written in the native language of each team.
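A simple wire protocol can be parsed in a few lines of any language, and that is what makes backends language-agnostic. As far as we understand Mongrel2's handler protocol, it frames the headers and body of a request as netstrings (`LEN:DATA,`); treat that as an assumption and check the Mongrel2 manual. The parser below is our own sketch of netstring decoding, not Mongrel2 code.

```c
#include <assert.h>
#include <stddef.h>

// Parses one netstring "LEN:DATA," at 'in' (n bytes available).
// On success sets *data and *len and returns the number of bytes consumed;
// returns 0 for malformed or incomplete input.
size_t netstring_parse(const char *in, size_t n, const char **data, size_t *len)
{
    assert(in != NULL && data != NULL && len != NULL);
    size_t i = 0, value = 0;
    while (i < n && in[i] >= '0' && in[i] <= '9') {
        value = value * 10 + (size_t)(in[i] - '0');
        i++;
    }
    if (i == 0 || i >= n || in[i] != ':')
        return 0;                      // missing length digits or ':'
    i++;                               // skip ':'
    if (value > n - i || n - i - value < 1 || in[i + value] != ',')
        return 0;                      // payload truncated or missing ','
    *data = in + i;
    *len = value;
    return i + value + 1;
}
```

Calling the parser repeatedly on "5:hello,3:abc," yields "hello" and then "abc". A truncated input such as "5:hel" is rejected rather than partially consumed.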

Mongrel2 request distribution.

Passing thoughts

I hope this has been a worthwhile introduction that sparks enough interest to explore the framework and its patterns further in your language of choice. Follow a few simple rules (and unlearn a few things) and distributed systems become a whole lot easier. I also encourage you to look into custom protocols that suit your problem domain by scanning through Keywords for Unprotocols and, of course, don't forget to read the zguide :-)

Lourens Naudé is a cub at Bear Metal OÜ. You can contact him at lourens@bearmetal.eu.
