In stream processing applications, data arrives continuously and needs to be processed promptly in order to keep up with the incoming flow. Latency is the primary metric by which the health of a streaming application is measured. High latency is typically an indication of problems and can cause the application to be unable to keep up with the flow of incoming data.

Apache Apex, being a distributed processing system, spreads the processing across the different components, or operators, of the application. With its strong operational focus, Apex measures and reports latencies at the individual operator level as well as the overall end-to-end latency of the application. In this blog, we will explore how latencies are calculated at both the operator level and the application level.

Operator Latency

When data is flowing continuously, how does one measure latency? The approach that Apex takes is aligned with the concept of a streaming window, which is an integral part of how Apex operates. Streaming windows are temporal slices of data in a data stream. In Apex, the latency of an operator is defined as the time it takes for a streaming window to propagate from upstream to downstream of the operator.

As described in the documentation of STRAM, each live worker container sends heartbeats to STRAM periodically so that STRAM knows it is alive. These heartbeats also contain performance metrics and other statistics for the operators running in the worker containers. Examples of these metrics include CPU usage for each operator, memory usage for the worker container, number of tuples processed and emitted, queue size of each port of the operators, current window ID, checkpoint status, AutoMetrics, etc.

One of these statistics is the timestamp of when the END_WINDOW control tuple[1] is processed for each window in an operator, immediately before the END_WINDOW control tuple is put into the sinks of the output ports. For the purpose of this blog, let's call this timestamp the end-window timestamp, or EWT. For a given window, STRAM takes the largest of the EWTs of all upstream operators and subtracts it from the EWT of the operator in question. Let's consider this sub-DAG:

[Image: a sub-DAG showing Operator X and its upstream operators]

EWT1, EWT2, …, EWTn are the EWTs of the upstream operators of Operator X, and EWT0 is the EWT of Operator X. The value:

 EWT0 - max(EWT1, EWT2, …, EWTn)

is the time it takes for one window to propagate from upstream to downstream of Operator X; in other words, it is the latency Operator X adds for this given window. Note that for input operators, this value is zero because, by definition, input operators have no upstream operators.
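To make the calculation concrete, here is a minimal sketch in Java of the per-window operator latency computation described above. It is illustrative only and not STRAM's actual code; the class and method names are assumptions.

import java.util.Collection;
import java.util.Collections;

// Illustrative sketch of the per-window operator latency calculation; not STRAM's code.
class OperatorLatency {
   // operatorEwt and upstreamEwts are end-window timestamps (in millis) for the same
   // streaming window, already adjusted onto a common clock.
   static long compute(long operatorEwt, Collection<Long> upstreamEwts) {
      if (upstreamEwts.isEmpty()) {
         return 0L; // input operators have no upstream, so their latency is zero by definition
      }
      return operatorEwt - Collections.max(upstreamEwts); // EWT0 - max(EWT1, ..., EWTn)
   }
}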

In general, the worker containers run on different nodes. Therefore, the EWTs in the heartbeats of each container are based on the system clocks of different nodes. In order to calculate the latency correctly, clock skews among the different nodes need to be accounted for. For the majority of applications running on Apex, the latency of each operator is on the order of milliseconds, and from our observation, synchronizing the system clocks by running NTP on each node is not enough to achieve millisecond accuracy.

Among the many statistics contained in the worker container heartbeat is the timestamp measured on the container just before the heartbeat was sent to STRAM. To solve the clock skew problem, STRAM takes the timestamp when it receives the heartbeat and stores the difference between the two timestamps for each worker container. This difference includes the heartbeat latency plus the clock skew between STRAM and the worker container. STRAM applies the moving average of these differences to the EWT of each operator in the worker container to compensate for clock skews before using them to calculate the latency of each operator.
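The sketch below illustrates one plausible way to implement this compensation, assuming a simple moving average over the most recent heartbeats; the class, field, and method names are illustrative and do not reflect Apex's internal API.

import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative sketch of the clock-skew compensation; not Apex's internal implementation.
class SkewEstimator {
   private final Deque<Long> diffs = new ArrayDeque<>();
   private final int maxSamples;
   private long sum;

   SkewEstimator(int maxSamples) {
      this.maxSamples = maxSamples;
   }

   // Called for every heartbeat: the difference between STRAM's receive time and the
   // timestamp the container recorded just before sending the heartbeat. This difference
   // is the heartbeat latency plus the clock skew between STRAM and the container.
   void onHeartbeat(long stramReceiveMillis, long containerSendMillis) {
      long diff = stramReceiveMillis - containerSendMillis;
      diffs.addLast(diff);
      sum += diff;
      if (diffs.size() > maxSamples) {
         sum -= diffs.removeFirst();
      }
   }

   // Applies the moving average of the differences to an EWT reported by the container,
   // bringing it (approximately) onto STRAM's clock before latencies are computed.
   long adjustEwt(long rawEwtMillis) {
      return diffs.isEmpty() ? rawEwtMillis : rawEwtMillis + sum / diffs.size();
   }
}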

EWTs are window-based. If an operator falls behind its upstream by n windows, the latency calculation for this operator must use the EWT of the upstream from n windows ago. This poses a problem because n is unbounded and STRAM does not have unlimited memory to store all these EWTs. Hence, an application attribute, THROUGHPUT_CALCULATION_MAX_SAMPLES, is used to limit the maximum number of outstanding windows kept in memory for latency calculation. Its default value is 1000. If an operator falls behind by more than THROUGHPUT_CALCULATION_MAX_SAMPLES windows, STRAM uses (n * window width) to estimate the operator latency instead of using the EWTs.
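A rough sketch of this fallback, under the assumption that the upstream EWT for a sufficiently old window has already been evicted from memory (all names here are illustrative):

// Illustrative sketch of the fallback estimate; not STRAM's actual code.
class LatencyEstimate {
   static long estimate(long operatorEwt, Long upstreamEwtForWindow,
                        long windowsBehind, int maxSamples, long windowWidthMillis) {
      if (windowsBehind > maxSamples || upstreamEwtForWindow == null) {
         // The upstream EWT for that window is no longer retained, so estimate the
         // latency from the number of windows the operator is behind.
         return windowsBehind * windowWidthMillis;
      }
      return operatorEwt - upstreamEwtForWindow;
   }
}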

Application Latency

The latency of an application is defined as the time it takes for a streaming window to propagate from root operators (also known as input operators) to the leaf operators in the physical DAG. Below is the process that describes how this is calculated.

As described in the previous section, for each physical operator in the DAG we have its latency as well as the upstream operator that has the largest clock-skew-adjusted EWT. We walk the DAG upstream from each of the leaf operators, selecting at each step the upstream operator with the largest EWT, all the way to a root operator. As we do that, we take the sum of the operator latencies along the path. The maximum of these sums is the application latency, and the path associated with the maximum sum is the critical path.
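The walk can be sketched as follows in Java. This is not STRAM's implementation; the Node structure simply records, for each physical operator, its latency and the upstream operator with the largest clock-skew-adjusted EWT.

// Illustrative sketch of the application latency / critical path walk; not STRAM's code.
class CriticalPathSketch {
   static class Node {
      final String name;
      final long latencyMillis;          // operator latency from the previous section
      final Node upstreamWithMaxEwt;     // null for root (input) operators
      Node(String name, long latencyMillis, Node upstreamWithMaxEwt) {
         this.name = name;
         this.latencyMillis = latencyMillis;
         this.upstreamWithMaxEwt = upstreamWithMaxEwt;
      }
   }

   // Sum of operator latencies walking upstream from a leaf to a root.
   static long pathLatency(Node leaf) {
      long sum = 0;
      for (Node n = leaf; n != null; n = n.upstreamWithMaxEwt) {
         sum += n.latencyMillis;
      }
      return sum;
   }

   // The application latency is the maximum path latency over all leaves; the leaf that
   // achieves the maximum, followed upstream, gives the critical path.
   static long applicationLatency(Iterable<Node> leaves) {
      long max = 0;
      for (Node leaf : leaves) {
         max = Math.max(max, pathLatency(leaf));
      }
      return max;
   }
}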

[Image: an example physical DAG with root Operator A and leaf Operators D, E, and F]

In this physical DAG example, we walk the DAG from Operators D, E, and F to the root Operator A, and the operator latency sums are 30 + 5 + 0 = 35 ms from D, 20 + 100 + 0 = 120 ms from E, and 2 + 100 + 0 = 102 ms from F. Therefore, the application latency is 120 ms, and A->C->E is the critical path in the DAG.
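Continuing the sketch above, plugging in the operator latencies from this example reproduces these sums (the fragment below assumes the hypothetical Node class and methods from the earlier sketch):

// Hypothetical usage of the sketch above for this example DAG.
Node a = new Node("A", 0, null);     // root (input) operator
Node b = new Node("B", 5, a);
Node c = new Node("C", 100, a);
Node d = new Node("D", 30, b);
Node e = new Node("E", 20, c);
Node f = new Node("F", 2, c);
// pathLatency(d) == 35, pathLatency(e) == 120, pathLatency(f) == 102
// applicationLatency(Arrays.asList(d, e, f)) == 120, with E -> C -> A as the critical path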

This critical path denotes the bottleneck of the application: the application latency will not improve if operator latencies improve only outside of the critical path. In this example, it does not matter how fast Operator B becomes because it is not part of the critical path of the DAG. An application developer who wants to optimize this application for latency should focus on optimizing Operators C and E. In a real-world application, the physical DAG often contains many more operators and is much more complicated, so knowing the critical path of your application is extremely important when tuning it to maximize performance.

Building on this, the dynamic partitioning feature in Apex could make use of the critical path information to decide whether or not to partition an operator. This is not yet implemented in Apex; we will do so if there is demand for it.

Retrieving the Latencies and the Critical Path

With Apache Apex, you can retrieve the latency and critical path information using the CLI. A sample session showing this is below:

dt (application_1456485348783_2283) > get-app-info
{
   ...
   "stats": {
      ...
      "criticalPath": [
         "1",
         "2",
         "18",
         "22",
         "24",
         "23"
      ],
      "latency": "180"
      ...
   },
 ...
}
dt (application_1456485348783_2283) > list-operators 7
{"operators": [{
   "id": "7",
   ...
   "latencyMA": "233",
   ...
}]}

With DataTorrent RTS, in addition to the CLI, you can also get the latency information via the REST API in dtGateway, which can be integrated with your own monitoring tool, and in the physical view of the application in the UI for easy visualization:

[Image: latency and critical path shown in the physical view of the application in the DataTorrent RTS UI]


[1] Control tuples are markers that are inserted into the data flow by Apex to help carry out management tasks. Examples of these tuples are the END_WINDOW tuple that marks the end of a window, the checkpoint tuple, etc. These control tuples are lightweight pieces of data, are only inserted at a few points in the stream, and add very little overhead to the stream.