At DataTorrent, over the past 2.5 years, we have developed RTS, a Hadoop-native stream processing platform. At its core, the platform is designed for extreme scalability, low latency, and fault tolerance. It is possible to build applications with no disk I/O in the data flow, i.e., all processing happens end-to-end in memory, which removes the typical limiting factor for latency and scalability.

Most real-world applications, however, need to transfer data to and from persistent storage at some point. For example, there may be a requirement to store all ingested events, or to serve computed data to dashboards or other downstream processing. Use cases include tagging of files, click-to-source, historical data, analytical result storage, and the ability to store many small files.

Out of the vast and fast-growing number of data store solutions (SQL, NoSQL, key-value, columnar, in-memory grids, etc.), which one is right for a stream processing application that processes millions of events per second, with corresponding throughput requirements for the data store?

Evaluating third-party solutions was the logical starting point. We use them as part of customer engagements and already have many adapters in our operator library. Why reinvent what is not our immediate focus? So we experimented, debated, and tried to find the perfect match in the world of data stores. It didn’t happen; some of the sticky problems were:

  • Scalability and performance limits (see performance section below)
  • Dependency on additional external components
  • Additional developer skill set required, extra complexity even for basic needs
  • Additional operational skill set to manage separate cluster/stack
  • Fragmentation: there is no de facto data store (unlike HDFS for the file system and YARN for the compute layer).

Many of our applications have a relatively simple data access pattern. We concluded that we want to offer an out-of-the-box solution that works natively on Hadoop and meets the RTS scalability and performance standards. We named the solution HDHT (Hadoop Distributed Hash Table) to reflect the criteria outlined below.

Criteria for the Solution

  • Hadoop native: DataTorrent RTS is a native YARN application, and the same should be true for the storage mechanism.
  • Avoid extra components, minimize operational barrier: with a solution native to RTS and HDFS, the existing Hadoop administration skill set is leveraged.
  • Embedded, scales with the application: leverage RTS and HDFS. The store needs to scale in terms of operations and storage capacity, for which YARN and HDFS are the pillars.
  • Fault tolerant: operator components certified as part of Malhar; leverage RTS and HDFS.
  • Optimized for streaming applications and RTS: random read and random write access with a simple programming model and seamless integration. A developer familiar with RTS should be able to persist data without learning another API or system.
  • “Unlimited” storage capacity for historical/archived data: another capability for which HDFS scalability is foundational.
  • Simplicity: since the store is embedded into the RTS operator model, features such as client/server communication or explicit transactions are not required.

Programming Model

The programming model of a key-value store or hash table can be applied to a wide range of common use cases. Within most streaming applications, ingested events or computed data already carry the key that can be used for storage and retrieval. Many operations performed during computation require key-based access. Hash-based stream partitioning paired with a key-value cache or storage abstraction is a natural fit, and most developers already understand how to work with a hash table or systems like Memcached or Redis.
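
For illustration, here is a minimal sketch of that access pattern. It is not the HDHT API; the class and method names are invented for this example. The point is that the same hash that routes a tuple to a stream partition also selects the bucket that holds the key, so each partition owns a disjoint slice of the keyspace.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only (not the HDHT API): hash-based partitioning over
// simple key-value buckets, mirroring the access pattern described above.
public class PartitionedKeyValueSketch {
  private final List<Map<String, byte[]>> partitions = new ArrayList<>();

  public PartitionedKeyValueSketch(int partitionCount) {
    for (int i = 0; i < partitionCount; i++) {
      partitions.add(new HashMap<>());
    }
  }

  // The hash that partitions the stream by key also selects the storage bucket.
  private Map<String, byte[]> partitionFor(String key) {
    return partitions.get(Math.floorMod(key.hashCode(), partitions.size()));
  }

  public void put(String key, byte[] value) {
    partitionFor(key).put(key, value);
  }

  public byte[] get(String key) {
    return partitionFor(key).get(key);
  }

  public static void main(String[] args) {
    PartitionedKeyValueSketch store = new PartitionedKeyValueSketch(4);
    store.put("user:42", "page-view".getBytes(StandardCharsets.UTF_8));
    System.out.println(new String(store.get("user:42"), StandardCharsets.UTF_8));
  }
}
```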

Storage Layer

HDFS is mature and designed to store massive amounts of data, traditionally in larger files to fill blocks of 64MB or more. The access pattern is sequential write and read with good throughput characteristics. Files in HDFS are immutable, although append support was added in recent versions.

Scalable random read and write access can be implemented on top of a distributed file system, based on proven concepts. This makes HDFS the natural choice of underlying storage for an embedded key-value store in DataTorrent RTS. Google pioneered and published this approach at big-data scale with its proprietary BigTable store, which is the model for HBase within the Hadoop ecosystem. The HBase region server implements mutation on top of HDFS, and its HFile component is one of the supported file formats in HDHT.

The file format in HDHT is pluggable through an interface that supports alternative, customizable implementations; a rough sketch of such an interface follows the list below. An implementation is responsible for the following:

  • Sequential write of key-value pairs in key order. Keys and values are byte arrays; key ordering is configurable through a comparator, and the default is lexicographic order.
  • Random read by key, range scan by key range, and iterative access in key order.
  • Optimization for random reads using fast seeks and caching (through the block structure in HFile or TFile).
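
As a rough illustration, the contract could look like the sketch below. The interface and method names are hypothetical; the actual HDHT interface in Malhar may differ.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;

// Hypothetical sketch of a pluggable file format contract (names are illustrative,
// not the actual HDHT interface).
public interface FileFormatSketch {

  /** Writes key-value pairs sequentially; callers append keys in sorted order. */
  interface Writer extends Closeable {
    void append(byte[] key, byte[] value) throws IOException;
  }

  /** Reads one immutable data file: point lookups, range scans, ordered iteration. */
  interface Reader extends Closeable {
    byte[] get(byte[] key) throws IOException;
    Iterator<Map.Entry<byte[], byte[]>> scan(byte[] fromKey, byte[] toKey) throws IOException;
  }

  Writer createWriter(String path) throws IOException;
  Reader createReader(String path) throws IOException;
}
```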

Operator Integration

HDHT is part of the Malhar operator library and provides the abstractions to plug into the RTS operator model. These abstractions are designed to make it easy to implement mutable persistent storage with key-based access within an operator. This layer implements a log-structured merge tree to enable high-throughput writes (see HBase write path) and fast random reads that support the latency requirements of a streaming application.
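
The sketch below illustrates the log-structured write path in simplified form. It is not the HDHT code: in-memory structures stand in for the WAL and the immutable files on HDFS, and compaction is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Simplified illustration of a log-structured merge write path (not the HDHT code).
public class LsmWritePathSketch {
  private final List<String> writeAheadLog = new ArrayList<>();              // stands in for the WAL on HDFS
  private final TreeMap<String, byte[]> writeBuffer = new TreeMap<>();       // sorted in-memory write buffer
  private final List<TreeMap<String, byte[]>> dataFiles = new ArrayList<>(); // stands in for immutable sorted files

  public void put(String key, byte[] value) {
    writeAheadLog.add(key);      // 1. log the update so it can be replayed after a failure
    writeBuffer.put(key, value); // 2. buffer it in memory, already sorted by key
  }

  // Called at a window boundary: persist the buffered, sorted entries as a new data file.
  public void flush() {
    if (writeBuffer.isEmpty()) {
      return;
    }
    dataFiles.add(new TreeMap<>(writeBuffer));
    writeBuffer.clear();
    writeAheadLog.clear(); // logged entries are now covered by the flushed data file
  }

  // Reads check the write buffer first, then data files from newest to oldest.
  public byte[] get(String key) {
    byte[] value = writeBuffer.get(key);
    if (value != null) {
      return value;
    }
    for (int i = dataFiles.size() - 1; i >= 0; i--) {
      value = dataFiles.get(i).get(key);
      if (value != null) {
        return value;
      }
    }
    return null;
  }
}
```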

[Diagram: Real-Time Stream Processing and Analytics]

The diagram illustrates the partitioning: each partition manages its own state (a set of data files). Keys are ordered, and point writes translate to rewrites of the affected files.

In the typical case, the operator developer extends a writer abstraction that provides:

  • A port on which the operator receives data tuples from the stream, with partitioning support.
  • Mapping of operator partitions to underlying storage directories.
  • Basic operations to read and write key-value pairs.
  • Write buffer, write-ahead log (WAL), and flushing/compaction of data files.
  • Caching of file readers.
  • Fault tolerance.

The user defines the type of the stream and how tuples are serialized into key-value pairs for storage.
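
As a sketch of that contract, the developer's part can be as small as mapping a tuple to a key and a value. The class and method names below are hypothetical, not the actual Malhar writer classes; the base class (not shown) would own the input port, WAL, write buffer, and partition-to-directory mapping.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical writer sketch: the subclass only defines how a tuple is turned
// into a key and a value. Names are illustrative, not Malhar's API.
public class PageViewCountWriterSketch {

  /** Example tuple type carried on the input stream. */
  public static class PageViewCount {
    public String pageId;
    public long count;
  }

  // Key: the page identifier as bytes (byte-array keys are compared lexicographically).
  protected byte[] getKey(PageViewCount tuple) {
    return tuple.pageId.getBytes(StandardCharsets.UTF_8);
  }

  // Value: the serialized payload, here just the count as an 8-byte big-endian long.
  protected byte[] getValue(PageViewCount tuple) {
    return ByteBuffer.allocate(Long.BYTES).putLong(tuple.count).array();
  }
}
```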

Example

An example application is aggregation of dimensional data. Metrics are computed for different combinations of keys and materialized for reporting and analytics (see the Segmentation use case). Aggregation involves a read-incremental-update-write pattern with the data store. Data is aggregated in time slots, with writes affecting recent time slots while older time slots become read-only.
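
A minimal sketch of that read-incremental-update-write pattern follows. A plain sorted map stands in for the store, and the names are illustrative, not HDHT's API.

```java
import java.nio.ByteBuffer;
import java.util.TreeMap;

// Sketch of read-incremental-update-write aggregation into time slots.
public class DimensionalAggregationSketch {
  private final TreeMap<String, byte[]> store = new TreeMap<>();

  // Aggregate one event: read the current value for the (time slot, dimensions) key,
  // add the increment, and write the updated aggregate back.
  public void aggregate(long timeSlot, String dimensions, long increment) {
    String key = String.format("%013d|%s", timeSlot, dimensions); // zero-padded so key order follows time
    byte[] current = store.get(key);
    long total = (current == null ? 0L : ByteBuffer.wrap(current).getLong()) + increment;
    store.put(key, ByteBuffer.allocate(Long.BYTES).putLong(total).array());
  }

  public long read(long timeSlot, String dimensions) {
    byte[] value = store.get(String.format("%013d|%s", timeSlot, dimensions));
    return value == null ? 0L : ByteBuffer.wrap(value).getLong();
  }
}
```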

[Figure: hdht-blog-img02]

An interactive front end (dashboard) may visualize in-flight and historical aggregates. Time series data is served by the application through an asynchronous query protocol as a series of point lookups. This is just one example of a low-latency access requirement.

[Figure: hdht-blog-img03]

For this type of workload to scale, the store needs to support a high rate of random writes to a small subset of the total data and fast reads over the entire dataset.

High-throughput, low-latency, and fault-tolerant writes are accomplished by buffering updates with a write-ahead log and by designing the key so that updates are confined to as few data files as possible. In the sample application, data is aggregated by time and arrives in approximately chronological order, so the key can use the timestamp as a prefix. Throughput is constrained by the disk I/O of write-ahead logging to HDFS, which scales with the number of operator partitions and data nodes.
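
One possible key layout, shown below as an assumption for illustration rather than the exact HDHT encoding, is a big-endian time-bucket prefix followed by the dimensional key. Because byte-array keys are compared lexicographically, keys written in roughly chronological order then land in a narrow key range and therefore in few data files.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative key layout: big-endian time-bucket prefix followed by the dimension key.
public final class TimePrefixedKey {
  private static final long BUCKET_MILLIS = 60_000L; // one-minute time slots (chosen for illustration)

  private TimePrefixedKey() {}

  public static byte[] encode(long eventTimeMillis, String dimensionKey) {
    long bucket = eventTimeMillis / BUCKET_MILLIS;
    byte[] dim = dimensionKey.getBytes(StandardCharsets.UTF_8);
    return ByteBuffer.allocate(Long.BYTES + dim.length)
        .putLong(bucket) // big-endian, so byte order matches time order for positive buckets
        .put(dim)
        .array();
  }
}
```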

On the read side, the system scales with the amount of memory available for block caching. Once blocks are loaded into memory, neighboring key lookups are fast.

The external consumer submits queries through publish-subscribe. The operator managing the data partition is also responsible for serving that data. Query results are updated with each streaming window, which allows data to be visualized while it is still being computed, with very low end-to-end latency.
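
A time series query can then be answered as one point lookup per time slot, as in the sketch below. The store interface is a placeholder, the real query protocol and result types differ, and the sketch reuses the hypothetical TimePrefixedKey helper from the earlier example.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of serving a time-series query as a series of point lookups.
public class TimeSeriesQuerySketch {

  /** Minimal read interface assumed for the sketch. */
  public interface KeyValueReader {
    byte[] get(byte[] key);
  }

  private final KeyValueReader store;

  public TimeSeriesQuerySketch(KeyValueReader store) {
    this.store = store;
  }

  // One lookup per time slot in the requested range; a null entry means the slot has
  // no aggregate yet, so a dashboard can render gaps for slots still being computed.
  public List<byte[]> query(String dimensionKey, long fromMillis, long toMillis, long slotMillis) {
    List<byte[]> series = new ArrayList<>();
    for (long t = fromMillis; t <= toMillis; t += slotMillis) {
      series.add(store.get(TimePrefixedKey.encode(t, dimensionKey)));
    }
    return series;
  }
}
```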

Performance

Our goal is maximum throughput with millisecond latency. The throughput numbers presented are for a single partition of the HDHT operator (aggregate throughput scales with DFS). Important factors that affect performance:

  • Key and value size.
  • Workload: write and read-modify-write are typical for our applications.
  • Key clustering: Throughput over a given time interval is optimal when keys fall into small ranges of the keyspace, so that updates to the underlying files are minimized.
  • Operator memory: Larger sizes for write cache and block cache (for read access) minimize DFS access and increase operator throughput.
  • WAL flush interval: The log is flushed at the window boundary. The size of the window (time) can be tuned to maximize use of HDFS bandwidth (write pipeline).
  • Data file size: Smaller files reduce the amount of adjacent data that needs to be rewritten for an update. The flip side is increased overhead for the namenode and metadata management. The optimal setting depends on key clustering (see the tuning sketch after this list).
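
A hypothetical tuning sketch for these knobs might look like the following. The field names are placeholders chosen for illustration, not the actual HDHT or Malhar configuration properties; the sketch only makes the trade-offs from the list concrete.

```java
// Hypothetical tuning holder: field names are placeholders, not HDHT configuration properties.
public class HdhtTuningSketch {
  // Larger write cache keeps more updates in memory before they hit HDFS.
  public long writeCacheEntries = 1_000_000L;

  // Longer WAL flush interval batches more log data per flush of the HDFS write pipeline.
  public long walFlushIntervalMillis = 500L;

  // Smaller data files reduce rewrite amplification per update, at the cost of more
  // files for the namenode and metadata management to track.
  public long maxDataFileSizeBytes = 128L * 1024 * 1024;

  // More operator memory allows a larger block cache for reads.
  public long blockCacheSizeBytes = 4L * 1024 * 1024 * 1024;
}
```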

Below are a few throughput examples. HDHT settings: 16-byte keys, 1 KB values, write cache size of 1M entries, data file size of 128MB, file format TFile, operator memory 8GB. The Hadoop cluster has multiple nodes (each running a node manager and a data node) and a 10Gb/s network.

Workload            Key arrival   WAL flush interval (ms)   Throughput per partition (keys/second)
read-modify-write   sequential    500                       45,000
read-modify-write   sequential    5,000                     110,000
write               range         500                       41,000
read-modify-write   range         500                       27,000
read-modify-write   range         15,000                    70,000

The results show that, for the targeted access pattern, an operator using HDHT can significantly outperform an operator that acts as a client to an external store such as HBase. HDHT throughput scales with the underlying HDFS cluster and benefits from native Hadoop file system support at no extra cost. HDHT translates point writes into batches, which allows it to use the throughput of the distributed file system efficiently. In contrast, any separate data store cluster, even when perfectly tuned, still comes with the extra client-server communication and coordination overhead of its higher-level API (see HBase write path vs. bulk load).

Outlook

HDHT is currently production-ready and available as part of DataTorrent RTS release 2.0. Future plans include optimizations such as improved compaction, data purging, interoperability (export to Hive, ORC, etc.), and tools for offline state management.