Motivation

A business user often comes across questions like “What were the sales for a particular product over the past years?” or “What are the top ten geographies for a particular market segment?” These questions can be answered by an analyst, but not without compiling complex queries that then run for a significant amount of time. Even if the organization maintains a data warehouse, which makes this task somewhat easier, the analysis remains extremely tedious.

In either case, how much easier would it be for the business user to cut this cycle down to a simple lookup on a real-time dashboard that is constantly updated with answers to all the questions he or she might have? The question is, of course, rhetorical!

Introduction

Visualization for big data is a hard problem. A simplified version of this problem is visualizing the metrics related to the big data. Real-time visualization complicates the problem further, even more so when both real-time and historical queries for metrics have to be supported.

DataTorrent RTS includes a framework for visualizing fast big data in motion, which we refer to as the Metrics Framework. This framework aligns with DataTorrent’s vision of lowering total cost of ownership (TCO) and time to value (TTV). See this keynote for an in-depth outline of the TCO and TTV concepts.

In this article, we’ll discuss the architecture of the metrics framework from the developer to the end business user. The discussion will be with respect to the stream processing solution at DataTorrent – DataTorrent RTS.

What Do We Mean by Metrics?

Metrics are, essentially, information about the data being processed in an application. The simplest example of a metric is the total number of records processed. For a big data application in DataTorrent RTS, a metric can be any number of things, ranging from the latency or throughput of an operator to something like the business expenditure over the past year (for, say, a data processing application for an advertisement campaign). Let’s look at the different kinds of metrics supported by the metrics framework in DataTorrent RTS.

What are operators? (This is a side note – it generally helps in understanding the terminology used in this blog.)
Operators are the basic building blocks of an application built to run on the DataTorrent platform. An application may consist of one or more operators, each of which defines some logical operation to be performed on the tuples arriving at it. These operators are connected together using streams, forming a Directed Acyclic Graph (DAG). In other words, a streaming application is represented by a DAG that consists of operations (called operators) and data flows (called streams).
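For illustration, here is a minimal sketch of how such a DAG might be wired together using the Apache Apex StreamingApplication API that DataTorrent RTS is built on. The LineEmitter and LineCounter operators are hypothetical, and import paths may vary slightly across RTS/Apex versions.

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.common.util.BaseOperator;

public class LineCountApp implements StreamingApplication
{
	// Hypothetical source operator: emits a line on its output port whenever asked for tuples.
	public static class LineEmitter extends BaseOperator implements InputOperator
	{
		public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();

		@Override
		public void emitTuples()
		{
			output.emit("hello");	// placeholder; a real source would read from a file, queue, etc.
		}
	}

	// Hypothetical downstream operator: counts the lines arriving on its input port.
	public static class LineCounter extends BaseOperator
	{
		private long count;

		public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
		{
			@Override
			public void process(String line)
			{
				count++;
			}
		};
	}

	@Override
	public void populateDAG(DAG dag, Configuration conf)
	{
		// Operators are the nodes of the DAG...
		LineEmitter emitter = dag.addOperator("emitter", new LineEmitter());
		LineCounter counter = dag.addOperator("counter", new LineCounter());

		// ...and streams are the edges, connecting an output port to an input port.
		dag.addStream("lines", emitter.output, counter.input);
	}
}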

System Metrics

Some metrics give information about the performance or health of an operator. These are generic metrics that hold for any operator, irrespective of the application or the data being processed. We refer to these metrics as system metrics. Examples include the latency of an operator, total tuples processed, total tuples emitted, and various other moving averages.

These metrics are not restricted to operators; they also extend to the application as a whole. Examples include moving averages of total tuples processed or emitted at the application level, the number of planned, allocated, and failed containers, and even information about memory usage or bytes written to the buffer server. All of these are available as part of the system metrics in the metrics framework. Note that these metrics convey meta information about the platform and its components and are therefore not customizable.

Dev Usage
System metrics, whether for operators or applications, are collected automatically by the framework and delivered to the visualization layer. The developer or user does not have to do anything to visualize these metrics.

Auto Metrics

A system metric is useful for a DevOps engineer but may not be of interest to a business user. A business user is typically interested in a semantic view of the application. For example, let’s consider the case of DataTorrent’s Omni-channel Fraud Prevention Application. The number of transactions detected as fraud, let’s call it “# fraud transactions,” is a typical metric that a business user may want to visualize. This metric can be requested in a snapshot fashion – a continuously changing figure that gives the # fraud transactions at the current instant. Or we may want to visualize it in a historical fashion – # fraud transactions for the last ten minutes, hours, or even days! We refer to such metrics as Auto Metrics.

What is the DataTorrent Fraud Prevention Solution?
The fraud prevention solution discussed above in fact does a lot of complex event processing, passing each record through a series of business rules and triggering one or more actions. It is a typical example of CEP that leverages a rules engine to optimize rule application and trigger management.

For an application like fraud prevention, each rule, or a combination of rules, may potentially need to be visualized as a metric. As an example, one of the rules in the application filters out insignificant transactions, taking into account the amount of the transaction or the account numbers involved. The total number of significant transactions is definitely an interesting metric for an end business user, and is another example of an Auto Metric.

To give more examples of Auto Metrics, consider an ETL processing application: the number of records filtered out, the number of records detected as error records, or even the total number of records that made it successfully through the cleansing pipeline could all be meaningful Auto Metrics.

Dev Usage
The term Auto Metric is actually historical and has to do with the way these metrics are collected. To visualize a field as an Auto Metric in the framework, the developer only needs to annotate it with the @AutoMetric annotation; the rest is taken care of by the framework. A very simplistic example is illustrated in the code fragment below.

public class FraudPreventionOperator extends BaseOperator
{
	// Annotating the field makes the framework report its value automatically every window.
	@AutoMetric
	private long numFraudTransactions;

	public void processTransaction(Transaction t)
	{
		if (isFraud(t)) {
			numFraudTransactions++;
		}
	}
}
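Similarly, for the ETL example mentioned earlier, a cleansing operator might expose several such counters. This is just a sketch; the class and field names are illustrative.

public class CleansingOperator extends BaseOperator
{
	@AutoMetric
	private long numFilteredRecords;	// records dropped by the filter rules

	@AutoMetric
	private long numErrorRecords;		// records detected as error records

	@AutoMetric
	private long numCleanRecords;		// records that made it through the cleansing pipeline
}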

Collection of Collection of Pairs (CCP) Metrics

The Auto Metrics we have discussed so far are predominantly numeric, which is the case most of the time. However, the framework also supports metrics of a more generic nature, which can be visualized on the visualization layer, dtDashboard, as a table structure. We refer to these metrics as CCP (collection of collection of pairs). A pair defines a cell – the column name and its value. The first level of collection is a collection of such cells, which defines a row. The final collection is a set of such rows. Any arbitrary data can be visualized via this structure.

What is dtDashboard? dtDashboard is the visualization component of DataTorrent RTS and is seamlessly integrated with the metrics framework. The Gateway, which is part of the RTS platform, uses the Metrics Reader API to read data from the metric store.

An additional requirement for using such metrics is that the developer needs to specify an aggregator for them. Consider the case of a physical DAG in DataTorrent RTS with multiple partitions of an operator on which we have defined a CCP metric: the framework needs to know how to combine the CCP structures from the individual physical partitions into a single logical structure.

Dev Usage
An example of a CCP metric is illustrated in the code fragment below:

// The Kryo FieldSerializer binding tells the platform to use Java serialization for this nested collection field.
@AutoMetric
@FieldSerializer.Bind(JavaSerializer.class)
private Collection<Collection<Pair<String, Object>>> topN;

This CCP structure can be populated or modified by the user during processing, and the framework makes sure the metric reported to StrAM is delivered to the visualization layer in RTS. StrAM (Streaming Application Master) refers to the application master for an RTS application. It is responsible for negotiating resources for the operators in the application as well as managing the entire application. Features like scalability, fault tolerance, and metric reporting, among others, are built into StrAM, making RTS powerful and production grade.
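As a minimal sketch, the topN structure above might be rebuilt once per window from a hypothetical per-window tally map called topCounts. The fragment assumes java.util.ArrayList and an Apache Commons Lang style Pair with a static of() factory; substitute whichever Pair implementation the field is actually declared with.

// Rebuild the table metric for this window; each inner collection is one row.
topN = new ArrayList<>();
for (Map.Entry<String, Long> entry : topCounts.entrySet()) {
	Collection<Pair<String, Object>> row = new ArrayList<>();
	row.add(Pair.of("item", entry.getKey()));	// cell in column "item"
	row.add(Pair.of("count", entry.getValue()));	// cell in column "count"
	topN.add(row);
}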

To specify an aggregator, implement the following Aggregator interface, which is nested in the AutoMetric interface of the Apex API:

interface Aggregator
{
	/**
	* Aggregates values of a specific metric.
	*
	* @param windowId window id for which values are aggregated.
	* @param physicalMetrics collection of physical metrics from all instances.
	* @return map of aggregated metric keys and values.
	*/
	Map<String, Object> aggregate(long windowId, Collection<PhysicalMetricsContext> physicalMetrics);
}

Once this aggregator is defined, the logical CCP structure can be created by StrAM and handed to the plugin that takes care of delivering the metrics to dtDashboard for visualization.

Note that such an aggregator can be defined for any numeric metric as well.
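For illustration, here is a minimal sketch of an aggregator that sums a numeric metric across all physical partitions. It assumes that PhysicalMetricsContext exposes the per-partition metric map via a getMetrics() accessor, and the metric name "numFraudTransactions" simply matches the annotated field shown earlier.

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.AutoMetric.PhysicalMetricsContext;

public class SumAggregator implements AutoMetric.Aggregator
{
	@Override
	public Map<String, Object> aggregate(long windowId, Collection<PhysicalMetricsContext> physicalMetrics)
	{
		// Combine the per-partition values of the metric into a single logical value.
		long total = 0;
		for (PhysicalMetricsContext ctx : physicalMetrics) {
			Number value = (Number) ctx.getMetrics().get("numFraudTransactions");	// getMetrics() is an assumption
			if (value != null) {
				total += value.longValue();
			}
		}
		Map<String, Object> aggregated = new HashMap<>();
		aggregated.put("numFraudTransactions", total);
		return aggregated;
	}
}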

Application Metrics

Both system metrics and Auto Metrics are localized to the notion of an operator. However, an operator is only part of the processing logic: the business logic is split across multiple operators, each of which executes just a small part of it. What about a metric that is not bound to the rather implementation-specific view of an operator but instead reflects the business view of the application? The problem is that metrics are bound to an operator. The solution, in this case, is to combine metrics from multiple operators to compose an application-level metric. Let’s discuss this by way of an example. Say the metric that needs to be visualized is the percentage of all transactions that are fraudulent. We can compute it as follows:

% fraud transactions = (# fraud transactions / # total transactions) × 100

The problem is that the # total transactions and the # fraud transactions needed in the computation above are not available from a single operator. However, the metric can be computed if we combine the metrics from the ingestion operator (which has the # total transactions) and the fraud prevention operator (which has the # fraud transactions). The metrics framework provides support for users to define their own application-level metrics – much like the % fraud transactions metric.

Dev Usage
From a development perspective, the developer needs to define a method computeAppLevelMetrics() that tells the engine how to compute the application-level metric:

Map<String, Object> computeAppLevelMetrics(Map<String,Map<String,Object>> operatorMetrics) 
{
	// Computation of app metrics from operator metrics
}
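As a hedged sketch of what this might look like for the % fraud transactions example, assuming the outer map is keyed by operator name and the inner map by metric name; the operator names "ingestion" and "fraudPrevention" and the metric keys here are purely illustrative:

Map<String, Object> computeAppLevelMetrics(Map<String, Map<String, Object>> operatorMetrics)
{
	// Pull the two operator-level metrics needed for the application-level figure.
	Map<String, Object> ingestion = operatorMetrics.get("ingestion");
	Map<String, Object> fraudOp = operatorMetrics.get("fraudPrevention");

	Map<String, Object> appMetrics = new HashMap<>();
	if (ingestion != null && fraudOp != null) {
		Number total = (Number) ingestion.get("numTotalTransactions");
		Number fraud = (Number) fraudOp.get("numFraudTransactions");
		if (total != null && fraud != null && total.longValue() > 0) {
			// % fraud transactions = (# fraud transactions / # total transactions) x 100
			appMetrics.put("pctFraudTransactions", 100.0 * fraud.doubleValue() / total.doubleValue());
		}
	}
	return appMetrics;
}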

Visualization

dtDashboard, discussed above, is a tool that allows visualization of all the metrics in a browser-based UI. Below is a snapshot of a dashboard for an advertisement use case, showing four widgets: a stacked area chart, a pie chart, a multi-bar chart, and a table. The visualization UI features a number of other widgets in addition to these.

Another example is a dashboard showing the top ten hashtags and their counts in real time:

Architectural Overview

DataTorrent RTS has always had support for visualizing metrics in real time. However, the architecture has changed significantly to a decentralized one, making it both more scalable and more robust.

The following block diagram illustrates the new architecture:

Metrics data is reported to the StrAM by the operators in every window. The StrAM collects this metric data and uses the Metrics Writer API to write it to the metric store on HDFS. The metric store is a custom designed, indexed, and highly optimized store for storage and retrieval of metrics.

The metric reader and writer APIs are designed to seamlessly integrate with external systems which may want to either store or retrieve metrics data from the Metrics Store.

What is dtGateway?
dtGateway is one of the main components of DataTorrent RTS and is accessible via a web interface: dtManage. It is a Java-based multithreaded web server that allows you to easily access information and perform various operations on DataTorrent RTS. dtGateway constantly communicates with all the running RTS App Masters (StrAM), as well as the Node Managers and the Resource Manager in the Hadoop cluster, in order to gather all the information and to perform all the operations users may need. It can run on any node in your Hadoop cluster or any other node that can access your Hadoop nodes, and is installed as a system service automatically by the RTS installer.

For more information, please check out:

  1. DataTorrent RTS Visualization Framework (dtDashboard) http://docs.datatorrent.com/dtdashboard/
  2. DataTorrent Fraud Prevention Solution https://www.datatorrent.com/fraud-prevention/
  3. Metric Data Store & Visualization in DataTorrent RTS webinar