Introduction to part two
This is part two of a two-part series on how a Wallaroo system reacts to workload demands that exceed Wallaroo’s capacity, i.e., how Wallaroo reacts to overload. Part one, which defines what overload is and summarizes overload mitigation techniques, can be found here: “Some Common Mitigation Techniques for Overload in Queueing Networks”.
Wallaroo uses several back-pressure techniques to limit message queue sizes of all actors within the system. Together, these techniques form an end-to-end mechanism that protects Wallaroo from high-volume data sources. This article describes how these back-pressure mechanisms are implemented by and integrated with Wallaroo. Also, we take a peek at a big change to Wallaroo’s internals that we hope will change Wallaroo in several good ways.
Where Does the Term Back-Pressure Come From?
The term “back-pressure” originally described the resistance to forward flow of a gas or liquid in a confined space, such as an air duct or pipe. The gas (or liquid) experiences friction along the sides of the duct (or pipe). It is easy to create an experiment to experience different amounts of back-pressure by a gas. For example, using your lungs and mouth, try to blow as much air as you can – as quickly as possible – through the following tubes for comparison:
- A small cocktail straw with an opening only 1 or 2 millimeters in diameter
- A regular drink straw, for example, a straw from the soda fountain at McDonald’s
- The cardboard tube from a roll of paper towels or toilet paper
TCP’s Back-Pressure Mechanism: Sliding Window Flow Control
The TCP protocol uses a flow control mechanism called a “sliding window” to prevent the sender from using too much of the receiver’s memory and CPU resources. Wallaroo integrates TCP’s flow control into its back-pressure system. This isn’t the time & place to describe TCP’s sliding window flow control in detail. However, a picture is worth a thousand words (Figure 25.7 below). Let’s take a look.
First, let’s make some assumptions about a hypothetical TCP connection.
- The TCP connection has already been established.
- The sender application is Wallaroo.
- Wallaroo is attempting to send 50,000 bytes of data to the receiver as quickly as possible.
- The receiver is a TCP data sink application that stores Wallaroo’s computation output in 1,000 byte chunks.
- The receiver’s disk drive is very old and slow. Writing a single 1,000 byte block can cause the application to pause for several seconds.
Figure 25.7 tracks one of the two advertise windows of the sliding window protocol for the connection. We look at one direction only; TCP maintains a separate sliding window for data sent in the other direction.
From the receiver’s point of view:
The advertise window starts at 2,500 bytes.
Packets start arriving from the sender. The app has read 2,500 bytes quickly but then does not read any more network data for a while: the app is put to sleep by the kernel due to very slow file I/O speeds.
The disk finally finished, so the app wakes up, reads 2,000 more bytes from the TCP connection, writes the new data to disk, then goes to sleep a second time, waiting for disk I/O to finish.
The app wakes up and then reads another 1,000 bytes from the TCP connection.
The advertise window falls to zero twice in this example. When the advertise window is zero, the Wallaroo sender must stop transmitting and wait for an “ack” message with a non-zero advertise window.
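The bookkeeping behind that behavior can be mimicked with a toy model. This is only a sketch, not TCP itself: the ReceiveWindow class and the simulate function are hypothetical names, the advertise window is assumed to be simply the free space in a 2,500-byte receive buffer, and segments, acks, and timing are ignored. It borrows the 2,500-byte window and the 2,000- and 1,000-byte reads from the example.

```python
class ReceiveWindow:
    """Toy model: the advertise window is the free space in the receive buffer."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.buffered = 0  # bytes held by the kernel, not yet read by the app

    @property
    def advertised(self):
        return self.capacity - self.buffered

    def deliver(self, nbytes):
        """The sender offers nbytes; only what fits in the window is accepted."""
        accepted = min(nbytes, self.advertised)
        self.buffered += accepted
        return accepted

    def app_read(self, nbytes):
        """The receiving app drains up to nbytes, which reopens the window."""
        taken = min(nbytes, self.buffered)
        self.buffered -= taken
        return taken


def simulate(total_bytes, capacity, app_reads):
    """Return (window sizes after each event, bytes the sender still holds)."""
    window = ReceiveWindow(capacity)
    remaining = total_bytes
    trace = [window.advertised]
    for nbytes in app_reads:
        remaining -= window.deliver(remaining)  # sender fills the window, then waits
        trace.append(window.advertised)
        window.app_read(nbytes)                 # the slow app finally reads some data
        trace.append(window.advertised)
    return trace, remaining


if __name__ == "__main__":
    trace, remaining = simulate(50_000, 2_500, [2_000, 1_000])
    print(trace, remaining)
```

In this model the window reaches zero each time the sender fills the buffer, and it reopens only by the number of bytes that the slow application reads.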
How does Wallaroo know the receiver’s advertise window size? That level of detail is not available through the POSIX network socket API.
A POSIX system can tell a user application when the advertise window size is exactly zero. If the socket has been configured in non-blocking I/O mode, then a writev(2) system call to the socket will fail with an errno value of EWOULDBLOCK or EAGAIN, depending on the operating system. (I will assume the EWOULDBLOCK value for this article.) EWOULDBLOCK tells the application that the kernel was not immediately able to send any data on the socket.
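Here is a small sketch of what that looks like from an application’s point of view. It is written in Python rather than Pony, it uses send() instead of writev(2), and the helper names make_tcp_pair and send_until_blocked are hypothetical, so treat it as an illustration of the POSIX behavior and not as Wallaroo’s code. The receiver never reads, so the kernel buffers fill and the non-blocking write eventually fails.

```python
import errno
import socket


def make_tcp_pair():
    """Return a connected (sender, receiver) pair of loopback TCP sockets."""
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
    listener.listen(1)
    sender = socket.create_connection(listener.getsockname())
    receiver, _addr = listener.accept()
    listener.close()
    return sender, receiver


def send_until_blocked(sender, chunk=b"x" * 65536, max_chunks=100_000):
    """Write to a non-blocking socket until the kernel refuses more data.

    Returns (bytes_accepted, blocked).
    """
    sender.setblocking(False)
    total = 0
    for _ in range(max_chunks):
        try:
            total += sender.send(chunk)
        except BlockingIOError as err:
            # Python raises BlockingIOError for EAGAIN and EWOULDBLOCK.
            if err.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                return total, True
            raise
    return total, False


if __name__ == "__main__":
    sender, receiver = make_tcp_pair()
    total, blocked = send_until_blocked(sender)  # the receiver never reads
    print(f"kernel accepted {total} bytes; blocked={blocked}")
    sender.close()
    receiver.close()
```

The number of bytes accepted before the failure depends on the operating system’s socket buffer sizes, which is why the sketch reports it instead of assuming a value.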
When Wallaroo sends data to a sink with the writev(2) system call, and the call fails with EWOULDBLOCK, then we know that the sink is slow. We don’t know why the receiving sink is slow. The EWOULDBLOCK status can also be caused by other conditions, such as lack of buffer space in the sender’s kernel. However, we do know that Wallaroo is sending data faster than the TCP sink can read it and/or faster than the sender’s OS can handle it. We know that we have a flow control problem, and we need to create back-pressure to fix it.
Wallaroo needs to spread information about the flow control problem to other parts of itself. Let’s look at that problem in the next section.
Wallaroo is a distributed system of actors in a single OS process
Wallaroo is an application written in the Pony language. For more on this topic, check out our article: “Why We Used Pony to Write Wallaroo.”
Pony implements the Actor Model of concurrency, which means that a Pony program is a collection of independent actor objects that perform computations upon their own private state. Actors communicate with other actors only using message passing.
Figure 26: Partial view of a Wallaroo application, with actors
In our example, there is an actor called TCPSink that is responsible for writing data to a TCP socket that is connected to a Wallaroo data sink. When that actor experiences an EWOULDBLOCK event when sending data, it is the only actor that is aware of the sink’s speed problem. Pony does not allow global variables. We cannot simply set a global variable, as we could (very naively!) in C (e.g., hey_everyone_slow_down = 1), to cause the rest of Wallaroo to slow down.
In an Actor Model system, actors communicate with each other by sending messages. Therefore, to tell the other actors about the TCP flow control problem, TCPSink needs to send messages to other Wallaroo actors. If all actors in the system are told to stop, then Wallaroo as a whole can act as if a big finger reached into the system and pressed a “PAUSE” button.
When that metaphorical “PAUSE” button is pressed, the system no longer creates data to send to the sink. Wallaroo’s internal queues become frozen as producing actors stop sending messages. When paused, the message mailbox queues for each actor are effectively limited in size. As long as the pause happens quickly enough, we shouldn’t have to worry about uncontrollable memory use by the application as a whole.
Wallaroo today: The mute protocol
Inside Wallaroo today, a custom protocol provides back-pressure by stopping and restarting computation in the data stream pipeline. We don’t really want to broadcast to all actors: many Wallaroo actors don’t need to know about back-pressure or flow control. But we do need a scheme to determine which actors really do need to participate in the protocol.
The protocol is informally called the mute protocol, based on the name of one of the messages it sends, called mute. The word “mute” means to be silent or to cause a speaker to become silent. That’s what we want for back-pressure: to cause sending actors upstream in a Wallaroo pipeline to stop sending messages toward the slow sink.
As a data stream processor, the computation stages inside of Wallaroo are arranged in a stream or pipeline. The diagram below shows a simplified view of a “word count” application in Wallaroo, showing the actors that are directly involved with processing the data stream.
Figure 27: View of a Wallaroo application “word count”
Now, let’s see what happens when a data sink stops consuming data as quickly as Wallaroo produces it.
When the TCPSink actor becomes aware of back-pressure on the TCP socket (via the EWOULDBLOCK error status when trying to send data), TCPSink sends a mute message up the chain to its predecessor.
That predecessor sends a mute message to its predecessor, and so on, all the way backward to the TCPSource actor. (See Figure 28 below.)
When the mute message reaches the TCPSource actor, that actor stops reading from its socket.
The same TCP flow control condition that told us that the data sink is slow will now eventually force the data source to stop sending.
If the data source sends enough data, then the TCP advertise window will drop all the way down to zero. In reaction, the source is forced to stop.
Figure 28: View of a Wallaroo application “word count” with mute and unmute message sending.
When the sink TCP socket becomes writable again, the TCPSink actor sends an unmute message to its predecessor. The unmute message is forwarded step by step back up the chain. Eventually, the unmute message reaches the TCPSource actor. The TCPSource actor resumes reading from the source TCP socket, which triggers TCP to send a non-zero advertise window to the data source. The pipeline starts flowing again.
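The mute and unmute hand-off can be sketched as a chain of objects that forward a message to their predecessor. This is a simplified illustration in Python, not Wallaroo’s Pony code: direct method calls stand in for asynchronous actor messages, and the Stage, Source, and Sink classes and the stage names Split and Count are hypothetical.

```python
class Stage:
    """One actor in a linear pipeline; upstream is its predecessor (or None)."""

    def __init__(self, name, upstream=None):
        self.name = name
        self.upstream = upstream
        self.muted = False

    def receive(self, message, log):
        """Handle a 'mute' or 'unmute' message, then pass it upstream."""
        self.muted = (message == "mute")
        log.append((message, self.name))
        if self.upstream is not None:
            self.upstream.receive(message, log)


class Source(Stage):
    @property
    def reading(self):
        # A muted source stops reading its socket, so TCP flow control
        # eventually stalls the external data source as well.
        return not self.muted


class Sink:
    """Stands in for TCPSink: it starts the mute and unmute messages."""

    def __init__(self, upstream):
        self.upstream = upstream

    def on_would_block(self, log):
        self.upstream.receive("mute", log)

    def on_writable(self, log):
        self.upstream.receive("unmute", log)


def build_pipeline():
    source = Source("TCPSource")
    split = Stage("Split", upstream=source)   # hypothetical word count stages
    count = Stage("Count", upstream=split)
    sink = Sink(upstream=count)
    return source, sink


if __name__ == "__main__":
    source, sink = build_pipeline()
    log = []
    sink.on_would_block(log)   # the sink socket reported EWOULDBLOCK
    print(log, source.reading)
    sink.on_writable(log)      # the sink socket is writable again
    print(log, source.reading)
```

Each message visits every stage between the sink and the source exactly once, which is why a real pipeline needs some time before the source actually stops or resumes reading.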
Now we have all three pieces of back-pressure in place. Starting from the end of the pipeline and working backward, we have:
- From a slow sink TCP socket to Wallaroo’s TCPSink actor, by reducing TCP’s advertise window to zero. (Recall, the zero window size is signaled to TCPSink by the EWOULDBLOCK error status.)
- Backward along the stream processing chain from TCPSink to TCPSource, using the custom mute protocol within Wallaroo.
- From Wallaroo’s TCPSource actor to the source’s TCP socket, also by reducing TCP’s advertise window to zero. (Recall, the TCPSource actor stops reading from the source socket when that actor is muted.)
The mute protocol between Wallaroo actors is software, and software tends to have bugs. It would be good to replace the artisanal, hand-crafted mute protocol and rely instead on a back-pressure system that applies to all Pony programs, including Wallaroo. That general back-pressure system is described in the next section.
Wallaroo’s future: Plans to use Pony’s built-in back-pressure scheduler
A comprehensive back-pressure system was added to Pony in November 2017 by Sean T. Allen, VP of Engineering at Wallaroo Labs. Look for a Wallaroo Labs article by Sean in the near future that describes the back-pressure scheduler in more detail.
Sean’s back-pressure implementation operates at the actor scheduling layer within the Pony runtime. Pony’s actor scheduler maintains internal state of actors that are paused due to back-pressure. The scheduler propagates pause and resume signals in a manner similar to the mute and unmute messages shown in Figure 28.
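To make the idea of scheduler-level propagation concrete, here is a toy model in Python. It is not the Pony runtime’s algorithm. It assumes, purely for illustration, that a scheduler pauses any actor that sends a message to a pressured or paused actor, and resumes it once nothing it sent to is still blocking. The ToyScheduler class and the actor names Split and Count are hypothetical.

```python
class ToyScheduler:
    """Toy model of pause/resume propagation; not Pony's real scheduler."""

    def __init__(self):
        self.pressured = set()   # actors that asked for back-pressure
        self.waiting_on = {}     # paused sender -> set of receivers blocking it

    def is_paused(self, actor):
        return actor in self.waiting_on

    def apply_pressure(self, actor):
        self.pressured.add(actor)

    def send(self, sender, receiver):
        """Record a send; pause the sender if the receiver cannot keep up."""
        if receiver in self.pressured or self.is_paused(receiver):
            self.waiting_on.setdefault(sender, set()).add(receiver)

    def release_pressure(self, actor):
        self.pressured.discard(actor)
        self._resume_senders_of(actor)

    def _resume_senders_of(self, receiver):
        for sender in list(self.waiting_on):
            blockers = self.waiting_on.get(sender)
            if blockers is None or receiver not in blockers:
                continue
            blockers.discard(receiver)
            if not blockers:
                del self.waiting_on[sender]
                self._resume_senders_of(sender)  # cascade further upstream


if __name__ == "__main__":
    sched = ToyScheduler()
    sched.apply_pressure("TCPSink")
    sched.send("Count", "TCPSink")
    sched.send("Split", "Count")
    sched.send("TCPSource", "Split")
    print([sched.is_paused(a) for a in ("Count", "Split", "TCPSource")])
    sched.release_pressure("TCPSink")
    print([sched.is_paused(a) for a in ("Count", "Split", "TCPSource")])
```

The point of the sketch is that no actor contains any mute or unmute logic of its own: the pause spreads upstream as a side effect of sending messages.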
Because the back-pressure system is inside the Pony actor scheduler,
a Pony developer needs to add little or no additional code to get
comprehensive back-pressure scheduling behavior. In fact, the current
0.4.1 release of Wallaroo uses the back-pressure scheduler today.
To create full end-to-end back-pressure, Wallaroo needs some source code changes. The changes fall into two categories:
- Removing most of the existing back-pressure mechanism, with its mute and unmute messages.
- Adding small pieces of glue code to allow TCP sockets that experience EWOULDBLOCK errors when writing data to signal to the runtime that the socket’s owner actor should initiate back-pressure propagation by the scheduler.
Other Wallaroo actors that use back-pressure
There are two other areas where Wallaroo has actors that participate in back-pressure propagation.
Wallaroo has source and sink actors for communicating with Kafka servers that act as Wallaroo data sources and sinks. Kafka’s own protocol imposes additional constraints on flow control, but the general TCP principles described here also apply to the Kafka-related actors.
Wallaroo uses TCP to copy data between nodes in a multi-worker Wallaroo system. Figure 29 has a simplified view of the actors involved in a pipeline that is split across two Wallaroo worker processes. The actors used for that internal communication have different class names, such as DataChannel, but the back-pressure principles are the same as for the TCPSource and TCPSink actors. The DataChannel actors also implement the mute protocol (today) and are subject to the back-pressure mechanism in the scheduler (future).
Figure 29: Actors and socket involved in multi-worker communication
Testing Wallaroo and back-pressure
I have already made many of the above code changes on a development branch, but they are not yet tested systematically. If you are a frequent reader of the Wallaroo blog, then you know that we devote a lot of time and effort to correctness testing.
Testing-wise, I’ve first done a lot of preparation work to permit Wallaroo
to control the kernel’s TCP buffer sizes.
(It’s much easier to create reliable, repeatable tests
for the back-pressure system if you can predict the TCP
sockets’ buffer sizes in advance!)
The new tests apply to both the current back-pressure
protocol and to the new Pony runtime’s back-pressure scheduler.
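This article does not show how Wallaroo sets the buffer sizes. One common way to request specific kernel buffer sizes is the SO_SNDBUF and SO_RCVBUF socket options, sketched here in Python. The helper name make_small_buffer_socket and the 4,096-byte values are assumptions for illustration only.

```python
import socket


def make_small_buffer_socket(send_bytes=4096, recv_bytes=4096):
    """Create a TCP socket and request small kernel buffers for it.

    Returns (socket, actual_send_buffer, actual_recv_buffer).
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Set the sizes before connecting. The OS treats them as requests and
    # may round them up or enforce its own minimum and maximum.
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, send_bytes)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, recv_bytes)
    actual_send = sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)
    actual_recv = sock.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
    return sock, actual_send, actual_recv


if __name__ == "__main__":
    sock, actual_send, actual_recv = make_small_buffer_socket()
    print(f"send buffer: {actual_send} bytes, receive buffer: {actual_recv} bytes")
    sock.close()
```

Reading the values back with getsockopt matters for a repeatable test, because the size the kernel actually uses can differ from the size that was requested.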
We aren’t completely sure that the Pony runtime’s back-pressure scheduler will actually meet Wallaroo’s needs. Here’s a partial list of open questions:
- We hope that the runtime’s back-pressure scheduler will sharply limit the amount of memory used by Wallaroo when back-pressure is active. But we haven’t measured Wallaroo’s actual memory behavior yet.
- We suspect that some additional parameters will be necessary to tune the scheduler’s back-pressure behavior.
- The back-pressure scheduler may yet be buggy. Its code is not yet half a year old.
- How will the runtime react when back-pressure scheduling has taken effect, and then a system administrator wants to query the cluster’s state? Or change the cluster’s size, larger or smaller?
- Do either of the back-pressure implementations stop too much processing? Zach Tellman points out in his talk (linked below) that buffer space can be traded for latency and throughput improvements. Perhaps Wallaroo pipelines may stall for too long, with some processing steps idle while waiting for upstream steps to resume sending after back-pressure has been released?
These open questions drive my interest in test infrastructure: I want to find as many bugs in Wallaroo’s workload/overload management as possible before our customers do. It’s a fun task! And when we find performance changes and/or interesting bugs in this work, we’ll write about it here. Stay tuned.
Series conclusion: Wallaroo uses back-pressure to create end-to-end back-pressure during overload conditions
The first part of this short series on overload mitigation gave an informal definition for the term “overload” in the context of queueing networks. It then presented several techniques that can mitigate the effect of overload conditions. One of those techniques, back-pressure, nicely fits the goals and constraints of Wallaroo.
The second article (this one) describes three back-pressure mechanisms used by Wallaroo: TCP’s sliding window protocol, Wallaroo’s mute protocol, and an actor scheduler-based back-pressure implementation. Wallaroo integrates these techniques into an end-to-end back-pressure system that can mitigate overload in a Wallaroo cluster of any size. As we strive to make Wallaroo a robust and reliable data stream processing platform, we have more work ahead to fully integrate and test the back-pressure system.
Pointers to additional online resources and to other back-pressure systems
- dataArtisans: How Apache Flink™ handles backpressure
- Fred Hebert: Handling Overload
- Henn Idan: Reactive Streams and the Weird Case of Back Pressure
- Reactive Streams initiative: Introduction to JDK9 java.util.concurrent.Flow
- Zach Tellman: Everything Will Flow,
an overview of Clojure’s
- Wikipedia: Back-Pressure, TCP Flow Control, and Sliding window protocol topics
Figure 25.7 is excerpted from “Fundamentals Of Computer Networking And Internetworking” class notes, chapter 25 by Douglas Comer (Purdue University) and Pearson Education. I used #25 as the basis for all of the following figures. I hope my unorthodox numbering scheme didn’t lead you on a wild goose chase to try to find Figure 3.