I want to implement a fan-in/fan-out pipeline pattern like in ØMQ. I was looking at NNG, its spiritual successor, but its pipeline example does not accept multiple clients. I went to their Discord and quickly got an answer: you need to listen on the push socket and dial from the clients. I decided to write a quick post and release the code.

The why

I am currently working on designing a distributed system doing pathfinding. We decided to use Spark, as it has seen some success in the field. In a nutshell, Spark operates by loading data on a driver, splitting the workload into partitions, and distributing these partitions among workers (usually separate machines).

One of our main sources of overhead is loading the static data (>100MB) the algorithm needs. The library I use is written in C++; Spark is in Java/Scala; and I wrote the bindings in Python. If we use Spark’s broadcast variables, passing the data between Python and C++ means copying it every time. If we load the file from disk, the sequential read is still too slow.

Currently, I am using named pipes (FIFOs) to interface Spark partitions with the pathfinding code: I load a resident process ahead of time, then pass it queries. My issue is that we are not really using Spark any longer; it only serves as a cluster manager.
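
For context, a minimal sketch of the named-pipe side in C; the paths and the query format are made up for illustration, and the real setup goes through the Python bindings:

#include <fcntl.h>
#include <stdio.h>
#include <sys/stat.h>
#include <unistd.h>

int main(void) {
    // The resident process reads queries from one FIFO and answers on another.
    // Both paths and the query format are illustrative.
    mkfifo("/tmp/query_in", 0600);
    mkfifo("/tmp/query_out", 0600);

    int in = open("/tmp/query_in", O_WRONLY);  // blocks until the reader attaches
    dprintf(in, "path 42 17\n");               // one pathfinding query
    close(in);

    int out = open("/tmp/query_out", O_RDONLY);
    char buf[256];
    ssize_t n = read(out, buf, sizeof(buf) - 1);
    if (n > 0) {
        buf[n] = '\0';
        printf("answer: %s", buf);
    }
    close(out);
    return 0;
}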

So, obviously, I started playing with other ideas. One possible avenue is some form of “message passing”: keep a structure similar to Spark (which is itself based on MapReduce), but only pass messages to threads already running on the workers. Further down the line, the idea would be to have “warthog-as-a-service.” But first, I had to find a messaging library.

The problem

Nanomsg-next-generation (NNG) is the successor (of the successor) of ZeroMQ (ØMQ). It looks appealing as it is lightweight, has a nicer license, and is younger (bleeding edge ftw). Now, what first drew me to ØMQ was its fan-in/fan-out pipeline pattern, as it looks exactly like what I want. NNG also has a pipeline example, great!

By default, their example is a one-way, two-node system. But upgrading to one push server, multiple clients, and one pull server should be easy, right? I will spare you the misstep of trying to port the ØMQ example to NNG as a first attempt. Given the code in the example, it is only possible to connect one (pull) client to one (push) server. This stumped me for a little while, until I decided to hop on their Discord.

The solution

One of the promises of NNG is to do away with ØMQ’s context. (And, obviously, my first idea was to try to get a context up and running.) It turns out that, with NNG, the server needs to listen for incoming connections and the clients need to dial in. (Thanks cody for the help.)

Once these two modifications are done, we can connect multiple clients. To move from 1-to-n to 1-to-n-to-1, we need to double up the connections in the workers; to do so, I literally just followed the ØMQ example (a sketch of that part follows the excerpts below). So, here you go: a fan-in/fan-out pipeline using NNG; the code is online.

The important bits are the following:

Ventilator

nng_socket sock;
int rv;

if ((rv = nng_push0_open(&sock)) != 0) {
    fatal("nng_push0_open", rv);
}
// The push side (server) listens for incoming connections.
if ((rv = nng_listen(sock, url, NULL, 0)) != 0) {
    fatal("nng_listen", rv);
}
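
Once the socket is listening, handing tasks out is just a loop of nng_send calls. A minimal sketch; the payload (a workload in msec, following the ØMQ example) and the workload_msec variable are illustrative:

// Hand one task to whichever worker is next in line; push/pull
// round-robins across all connected pullers.
char msg[16];
snprintf(msg, sizeof(msg), "%d", workload_msec);
if ((rv = nng_send(sock, msg, strlen(msg) + 1, 0)) != 0) {
    fatal("nng_send", rv);
}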

Worker

int rv;
nng_socket receiver;

if ((rv = nng_pull0_open(&receiver)) != 0) {
    fatal("nng_pull0_open", rv);
}
// Each worker (client) dials into the ventilator.
if ((rv = nng_dial(receiver, url, NULL, 0)) != 0) {
    fatal("nng_dial", rv);
}
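
For completeness, here is how the doubling-up plays out, a sketch based on the ØMQ example rather than a verbatim excerpt (url2 is my stand-in for the sink’s address): the worker opens a second, push socket and dials the sink, and the sink is the one listening on its pull socket.

// Worker: second socket, pushing results downstream to the sink.
nng_socket sender;
if ((rv = nng_push0_open(&sender)) != 0) {
    fatal("nng_push0_open", rv);
}
if ((rv = nng_dial(sender, url2, NULL, 0)) != 0) {
    fatal("nng_dial", rv);
}

// Sink: mirrors the ventilator, listening on the pull side.
nng_socket sock;
if ((rv = nng_pull0_open(&sock)) != 0) {
    fatal("nng_pull0_open", rv);
}
if ((rv = nng_listen(sock, url2, NULL, 0)) != 0) {
    fatal("nng_listen", rv);
}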

Some results

Compiling the code should be straightforward, assuming you have NNG installed:

gcc -o ventilator ventilator.c -pthread -lnng
gcc -o worker worker.c -pthread -lnng
gcc -o sink sink.c -pthread -lnng

To use the code, you want to:

  1. Start the ventilator.
  2. Start the sink.
  3. Start a number of workers.
  4. Press enter in the ventilator’s console.

# 1 worker
Total elapsed time: 4706 / 4695 msec
# 2 workers
Total elapsed time: 2505 / 4691 msec
# 4 workers
Total elapsed time: 1297 / 4472 msec

Future work

What happened was I assumed that a “cooked” pipeline would handle multiple connections by default. It seems sockets are closer to 1-to-1 communication by default (without listen/dial). Here is some more information from cody, who was kind enough to explain a few things in more detail:

Not exactly. It depends on the protocol. Different protocols abstract to a different style; pub/sub is 1-to-n, push/pull is 1-to-1 but round-robins to different ones each time.

1-to-n could mean a single message goes to a bunch of other sockets at the same time, but for push/pull that isn’t the case; the pusher will only send 1 message to 1 puller, but it rotates through all the attached pullers. pub/sub is a more traditional 1-to-n.
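
To make the contrast concrete, here is a minimal pub/sub sketch (error handling elided; an empty topic subscribes to everything). Unlike push/pull, one send reaches every connected subscriber:

#include <string.h>
#include <nng/nng.h>
#include <nng/protocol/pubsub0/pub.h>
#include <nng/protocol/pubsub0/sub.h>

// Publisher listens, subscribers dial in, as with the pipeline above.
nng_socket pub;
nng_pub0_open(&pub);
nng_listen(pub, url, NULL, 0);

// Each subscriber registers a topic; "" means "everything".
nng_socket sub;
nng_sub0_open(&sub);
nng_setopt(sub, NNG_OPT_SUB_SUBSCRIBE, "", 0);
nng_dial(sub, url, NULL, 0);

// This one message is delivered to every connected subscriber,
// not round-robined like push/pull.
nng_send(pub, "hello", strlen("hello") + 1, 0);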

Now that I have the pattern down, I will try to benchmark NNG against Spark for my use case. It will also be possible to change the topology from “read data, process in parallel on all threads, write data” to “receive data, process, send data,” all in a single thread. I hope using messages will decrease my overhead, as I am passing small messages and doing expensive computation. On the other hand, I have a feeling that, when sending one message per query, networking may still be the largest cost. To be continued.