Case Study: Machine-Generated Data and Extreme Scalability

The Problem

Big Data is getting exponentially bigger all the time. The data/device ecosystem is expanding at an unprecedented rate, and with the growth of the Internet of Things contributing massive amounts of machine-generated data, Big Data is posing big challenges for today’s enterprises.

Many businesses find themselves needing to process ever-growing streams of data. In addition, many of today’s mission-critical applications – from finance and fraud detection to geo-location services and operations monitoring – require this data to be analyzed and responded to in real time.

When designing a real-time Big Data application, you need to ensure that its architecture can support an ever-growing, never-ending mountain of information – for today’s load and for the future. Your solution needs to allow for extreme, linear scalability: the ability to automatically accommodate any future changes to load, distribution, or business logic as your needs evolve.

Enterprise-scale that’s Future-Ready

DataTorrent’s Real-time Stream Processing platform supports today’s most demanding, mission-critical Big Data applications. Built natively on top of Hadoop 2.x, DataTorrent is designed to enable highly scalable, massively distributed real-time computations.

DataTorrent provides unprecedented scalability for real-time stream processing running on a cluster of commodity hardware. As a Hadoop-native technology, the platform shares resources and seamlessly exchanges data with other Hadoop components such as MapReduce, HDFS, and Hive.

Extreme Scalability:

  • DataTorrent automatically scales out or in to accommodate any data size and processing requirement – now and in the future
  • Linear scalability with sub-second latency guaranteed – even while processing 500 million events per second

How to Process 500M Events per Second?

This paper discusses a sample application for processing machine-generated data and how it was scaled to process a sustained stream of 500 million events per second.

Machine-generated Data:

Many businesses today are powered by a complex grid of integrated systems, sensors, actuators, monitors, apps, and more – all working 24/7/365 and generating a lot of data, all the time. The machine data generated by those systems can be crucial to business operations. But often, by the time businesses can do anything with it, the data is worthless. Processing massive amounts of data in real time is the true technical challenge of today. Businesses need to act on system data immediately, while it is fresh, and not after a costly failure or business interruption.
For example, your business may want to:

  • Address any exceptions as they occur – set up automatic alerts to monitor possible issues or trigger automatic actions based on certain events
  • Predict system failures and perform smart maintenance
  • Correlate usage and performance data across multiple sources in real time.

Machine-generated data is a classic case where high velocity meets a high variety of data sources, resulting in a high volume of data that makes real-time processing all the more difficult.


Case Study Outline:

[Figure: Case study outline (DataTorrent platform UI)]

Input Data

The application used for this case study is tasked with monitoring a continuous stream of events generated by a huge number of devices. The devices are deployed with multiple customers and located in geographically distributed data centers and offices. It is assumed that samples are sent over the Internet using a distributed message bus mechanism. The data set was modeled to resemble real data, but for the purposes of this benchmark it is randomly generated. Each event contains a combination of measurement and configuration data, including sampled measurements of CPU, RAM, and HDD utilization as well as the following values: Device ID, Model, Customer, Production version, and the versions of several software components installed on the device.
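
As an illustration only, such an event might be modeled as a simple Java class along the following lines; the class and field names are assumptions based on the description above, not the actual schema used in the benchmark.

    // Hypothetical event POJO modeled on the fields described above;
    // the benchmark schema is not published, so all names are illustrative.
    public class DeviceEvent {
        public String deviceId;           // unique device identifier
        public String model;              // device model
        public String customer;           // customer the device is deployed with
        public String productionVersion;  // production version
        public String[] softwareVersions; // versions of installed software components

        public double cpuUsage;           // sampled CPU utilization
        public double ramUsage;           // sampled RAM utilization
        public double hddUsage;           // sampled HDD utilization
    }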

Processing Flow

Upon receiving data, the application calculates the average CPU, RAM, and HDD usage for different combinations of Device IDs, Customers, and Production and Software Versions. It then monitors deviations from normal behavior over different sets of dimensions. The application saves this data for offline analysis into persistent storage (a Redis NoSQL key-value store was used in this example). It also generates and sends alerts in near real time using the standard mail protocol (SMTP).

Being a DataTorrent application, it is fully fault-tolerant. All operators periodically save their state as snapshots (using HDFS) and, in the unlikely event of a process or container failure, they are all automatically re-launched and resumed from the last snapshot.

[Figure: Processing flow of the case study application]

Operator Library

The application is built using a number of standard building blocks (Operators) from DataTorrent’s open-source Operator Library, called Malhar. The operators used here are listed below, followed by a sketch of how they might be wired together:

  • InputReceiver – This input operator simulates the reception of data from an external source. As the amount of data grows, the operator is automatically scaled to multiple instances when the logical plan is translated into its physical layout.
  • GenerateDimensions – Accepts the data from InputReceiver and generates values over several configured dimensions. Multiple new events are generated for each input event. This operator is automatically scaled to the same number of instances as InputReceiver.
  • PrereqAverage – Calculates a running sum and count for each dimension key it encounters, i.e., the partial sums needed for the average. At runtime, the number of physical instances of this operator is the same as for the two previous operators. This distributes the average calculation across the cluster and avoids creating a bottleneck.
  • Average – Calculates the average for each key across all the input data over the configured dimensions. This operator also generates alert events if the computed average for any key is above a preconfigured threshold (70% was used in this example). In the physical plan, it is also possible to run multiple instances of this operator, but for this specific example one instance was sufficient.
  • RedisAverageOutput – Writes the computed averages for each dimension key to the Redis NoSQL store. It is possible to create more than one instance of the RedisAverageOutput operator (so that each physical operator instance writes to a different database of the same Redis instance or to a different Redis instance). Typically, the number of such output operator instances depends on the number of keys and the volume of data. In this example, one physical instance provided sufficient performance.
  • SmtpAvgOperator – Sends an e-mail using the standard SMTP protocol to a preconfigured address upon receiving an alert event from the “Average” operator. While it is theoretically possible to have multiple such operators working in parallel with multiple mail gateways, in practice one is enough in almost all cases.
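
To make the flow concrete, here is a hedged sketch of how these operators might be wired into an application DAG. It follows the DataTorrent/Apache Apex populateDAG() style, but the operator classes, their port names, and the stream wiring are assumptions inferred from this paper, not the actual application source.

    // Hypothetical application definition wiring the operators described above.
    // The API calls follow the DataTorrent/Apache Apex style; operator and port
    // names are placeholders based on this paper.
    import org.apache.hadoop.conf.Configuration;
    import com.datatorrent.api.DAG;
    import com.datatorrent.api.StreamingApplication;

    public class MachineDataApplication implements StreamingApplication {
      @Override
      public void populateDAG(DAG dag, Configuration conf) {
        InputReceiver receiver      = dag.addOperator("InputReceiver", new InputReceiver());
        GenerateDimensions dims     = dag.addOperator("GenerateDimensions", new GenerateDimensions());
        PrereqAverage prereq        = dag.addOperator("PrereqAverage", new PrereqAverage());
        Average average             = dag.addOperator("Average", new Average());
        RedisAverageOutput redisOut = dag.addOperator("RedisAverageOutput", new RedisAverageOutput());
        SmtpAvgOperator mailer      = dag.addOperator("SmtpAvgOperator", new SmtpAvgOperator());

        // Stream names match the locality table in the next section.
        dag.addStream("generate_dimension", receiver.output, dims.input);
        dag.addStream("prereq_calculation", dims.output, prereq.input);
        dag.addStream("inter_avg", prereq.output, average.input);
        dag.addStream("avg_output", average.output, redisOut.input);
        dag.addStream("avg_alert_mail", average.alert, mailer.input);
      }
    }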

Streams and Locality concept

To provide the best combination of performance and resource utilization for different use cases, DataTorrent’s platform supports multiple locality modes, including Auto (usually mapped to Node or Rack) as well as Thread, Container, Node, and Rack locality.

When configuring the operators, you need to set the desired locality per stream to get optimal throughput. In some cases, testing different settings is advised to identify and address bottlenecks. In our example, the following configurations were used:

Stream Name          Locality
generate_dimension   Container
prereq_calculation   Thread
inter_avg            Auto
avg_output           Auto
avg_alert_mail       Auto

In our tests, we observed that avg_output and avg_alert_mail were mapped to ‘Node’ locality, while different instances of the inter_avg stream got either ‘Node’ or ‘Rack’ locality. We discuss this further below.
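
Continuing the hypothetical populateDAG() sketch shown earlier, the localities from the table could be expressed roughly as follows; the setLocality() call and the Locality enum follow the Apache Apex style, and streams left without an explicit locality correspond to “Auto”.

    // Illustrative locality configuration matching the table above
    // (uses the DAG.Locality enum from the com.datatorrent.api package).
    dag.addStream("generate_dimension", receiver.output, dims.input)
       .setLocality(DAG.Locality.CONTAINER_LOCAL); // same container, no network hop
    dag.addStream("prereq_calculation", dims.output, prereq.input)
       .setLocality(DAG.Locality.THREAD_LOCAL);    // same thread, cheapest hand-off
    dag.addStream("inter_avg", prereq.output, average.input);     // Auto
    dag.addStream("avg_output", average.output, redisOut.input);  // Auto
    dag.addStream("avg_alert_mail", average.alert, mailer.input); // Auto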

Scalability Considerations

The DataTorrent platform has been designed to automatically scale as the number of events per second and/or the size of each event increases. The user does not need to write any special code; it is enough to use the Operator Library and define certain configuration properties. When trying to scale to hundreds of millions of events per second, you can rely on the platform’s “brain” to automatically manage the various processes, in addition to following best practices for scalability and testing your particular data stream and infrastructure.

In our case, we converged on the following set of configurations:

Parallel partitioning and processing - DataTorrent allows incoming events to be ingested using multiple input operators working in parallel. In our case, we configured the system to use a different number of physical instances of InputReceiver depending on the load. We started with a single instance for 1M events per second, grew to 250 instances to achieve 350M events/sec, and reached 362 instances for 500M events/sec. The downstream operators, GenerateDimensions and PrereqAverage, were configured to inherit the same number of physical instances as the InputReceiver operator.
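
As a rough illustration only (the paper does not show the exact configuration code), this partitioning could be expressed with Apache Apex-style attributes; the attribute and partitioner class names below are assumptions.

    // Uses com.datatorrent.api.Context and
    // com.datatorrent.common.partitioner.StatelessPartitioner.

    // Run InputReceiver as 362 physical partitions (the 500M events/sec setting).
    dag.setAttribute(receiver, Context.OperatorContext.PARTITIONER,
        new StatelessPartitioner<InputReceiver>(362));

    // Let GenerateDimensions and PrereqAverage inherit the same partition count
    // ("parallel partitioning"), so each input partition feeds its own local chain.
    dag.setInputPortAttribute(dims.input,   Context.PortContext.PARTITION_PARALLEL, true);
    dag.setInputPortAttribute(prereq.input, Context.PortContext.PARTITION_PARALLEL, true);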

Fine-tuning stream locality - In our example, setting ‘Container’ locality for the generate_dimension stream (between the InputReceiver and GenerateDimensions operators) gave much better performance than the other localities. Similarly, setting ‘Thread’ locality for the prereq_calculation stream improved throughput. Combined, these settings yielded a performance boost of about 20%.

Minimizing the operator’s snapshot size - In cases where there is no requirement to carry internal operator state between application windows (in our case configured to 10 seconds, i.e., 10 streaming windows of 1,000 milliseconds each), it is wise to clear the internal state upon receiving the internal endWindow event. Specifically, in our example a Map object is used inside the PrereqAverage operator to store the Key -> (sum, count) pair for each window. Promptly clearing the accumulated state prevented writing unnecessary data during check-pointing to HDFS, which would have negatively impacted performance.
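
A minimal sketch of this pattern, assuming an Apex-style operator; the class structure, port names, and helper types (DimensionEvent, KeyedPartial) are illustrative placeholders rather than the actual benchmark code.

    import java.util.HashMap;
    import java.util.Map;
    import com.datatorrent.api.DefaultInputPort;
    import com.datatorrent.api.DefaultOutputPort;
    import com.datatorrent.common.util.BaseOperator;

    public class PrereqAverage extends BaseOperator {
      // key -> {sum, count}, accumulated for the current window only
      private final Map<String, double[]> partials = new HashMap<String, double[]>();

      public final transient DefaultInputPort<DimensionEvent> input =
          new DefaultInputPort<DimensionEvent>() {
            @Override
            public void process(DimensionEvent e) {
              double[] sc = partials.get(e.key);
              if (sc == null) {
                sc = new double[2];
                partials.put(e.key, sc);
              }
              sc[0] += e.value; // running sum for this key
              sc[1] += 1;       // running count for this key
            }
          };

      public final transient DefaultOutputPort<KeyedPartial> output =
          new DefaultOutputPort<KeyedPartial>();

      @Override
      public void endWindow() {
        // Emit the partial (sum, count) per key downstream ...
        for (Map.Entry<String, double[]> e : partials.entrySet()) {
          output.emit(new KeyedPartial(e.getKey(), e.getValue()[0], (long) e.getValue()[1]));
        }
        // ... then drop the state so it is not checkpointed to HDFS.
        partials.clear();
      }
    }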

Splitting the load using a Cascading Unifier - DataTorrent automatically creates new instances of some operators when the load increases. However, in some situations a use-case-specific configuration yields better performance. In our case, one of the bottlenecks identified was the calculation of average values over a very large data set. DataTorrent provides efficient tools, like the Cascading Unifier, to deal with such problems in the typical Hadoop way of “divide and conquer.”

In our example, we divided the calculation into two different operators – PrereqAverage and Average. The first operator, PrereqAverage, is configured to use parallel partitioning, as described above, while Average is configured to use a single instance. The DataTorrent platform recognizes such flows and automatically inserts a built-in Unifier between the upstream and downstream operators to transform multiple input streams into one output stream. By default, the Unifier has a “pass-through” behavior, which is sufficient in most cases, but it created a bottleneck here due to the high fan-in. To overcome this, we used a custom Cascading Unifier, which calculates the partial sums for the dimension keys and splits the load by handling a configurable number of input streams from PrereqAverage, set via the UNIFIER_COUNT attribute. We set this attribute to 8. The platform then automatically inserts several layers of physical Cascading Unifier instances, each layer having roughly one-eighth as many physical instances as the previous one. For example, when we had 256 input operators, the platform added a layer of 32 instances, then another layer of 4 instances, followed by a final layer of 1 instance, which provided the input to the Average operator.
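
For illustration, a custom unifier that merges partial sums might look roughly like the sketch below; it reuses the hypothetical KeyedPartial type from the earlier PrereqAverage sketch and assumes the Apex-style Operator.Unifier interface.

    import java.util.HashMap;
    import java.util.Map;
    import com.datatorrent.api.DefaultOutputPort;
    import com.datatorrent.api.Operator;
    import com.datatorrent.common.util.BaseOperator;

    public class PartialSumUnifier extends BaseOperator implements Operator.Unifier<KeyedPartial> {
      // dimension key -> merged partial (sum, count) for the current window
      private final Map<String, KeyedPartial> merged = new HashMap<String, KeyedPartial>();

      public final transient DefaultOutputPort<KeyedPartial> output =
          new DefaultOutputPort<KeyedPartial>();

      @Override
      public void process(KeyedPartial tuple) {
        // Merge partial sums and counts arriving from upstream partitions.
        KeyedPartial existing = merged.get(tuple.key);
        if (existing == null) {
          merged.put(tuple.key, tuple);
        } else {
          existing.sum += tuple.sum;
          existing.count += tuple.count;
        }
      }

      @Override
      public void endWindow() {
        for (KeyedPartial p : merged.values()) {
          output.emit(p);
        }
        merged.clear();
      }
    }

In the Apex-style API, such a unifier is typically attached by overriding getUnifier() on the upstream operator’s output port, and the fan-in per unifier instance is capped with the attribute this paper refers to as UNIFIER_COUNT, which is what causes the platform to insert the cascading layers.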

Results

We tested the application by feeding it increasing data loads, making sure the application processed the sustained stream of events at a given speed without memory leaks and without ever losing any data. As loads increased, the DataTorrent platform automatically added containers to handle them effectively. The number of physical operators increased from fewer than 10 to more than a thousand at maximum speed, running in 460 containers. Platform performance scaled linearly, effectively distributing the increasing load across the newly allocated hardware resources.

[Figure: Linear scalability graph]
