Learn how the Kafka input operator consumes messages from Kafka into Apex applications


Apache Apex (incubating) supports high availability, fault tolerance, low latency, and high scalability. But what is all this without valuable input? Just as luxury cars need high-quality fuel, Apex needs a reliable data source. Apache Kafka is a high-throughput, distributed messaging system designed to achieve the same goals as a message queue system, which makes it a natural choice of message broker for feeding Apex. Using Kafka, you can easily build a reliable, real-time processing pipeline on top of Apex. To work with the Kafka messaging system, Apex provides an out-of-the-box Kafka input operator for pumping data into Apex applications.

The Kafka input operator[1] is one of the most popular input operators in the Malhar library. It consumes data from the Kafka messaging system for processing in the Apex environment. The operator is designed to be partitionable, dynamically scalable, fault-tolerant, and high-throughput, and it guarantees no loss of data. It uses the simple consumer API to consume data from the Kafka broker[2]. It also talks to ZooKeeper[3] to monitor metadata changes and adjusts partitions accordingly at runtime.


Figure 1: Data flow for the Kafka Input Operator

Simple consumer APIs vs High-level consumer APIs

The Kafka operator supports both high-level consumer APIs and simple consumer APIs. However, only the simple consumer APIs deliver all of the operator’s features. Let’s compare these Java consumer APIs. The high-level consumer APIs are easy to use because they encapsulate broker changes and consumer failures in a black box. But the more they encapsulate, the less flexible they become, and there are some scenarios that they cannot handle. For example, they don’t support customized sticky partitioning, so you have no control over which partitions each client consumes from, and the overall throughput might not be as good as expected. They don’t support customized offset management either, so developers cannot track the offsets of messages that have been processed, as opposed to the offsets of messages that have merely been consumed. That is why we recommend using the Kafka input operator with the simple consumer APIs: it is the only option that guarantees no data loss and supports auto-discovery-driven partitioning. The remaining sections discuss these features and how they are achieved with the simple consumer APIs.


Partitioning

Partitioning helps make the Kafka input operator scalable. A single-node consumer will likely hit a bottleneck when a Kafka topic has multiple partitions. Apex supports dynamic partitioning, a key feature that allows the Kafka input operator to scale with the Kafka partitions. Two partitioning strategies are supported by default: one-to-one and one-to-many.

The one-to-one partitioning strategy starts as many Kafka input operator instances as there are Kafka partitions in the topic being consumed. Each operator consumes from only one partition of the topic and remains associated with it. If a new Kafka partition is added to the topic at runtime, the engine deploys a new operator.


Figure 2: One-to-one partition

The one-to-many partitioning strategy, on the other hand, is a little more complicated. The basic idea is that each physical operator can consume messages from one or more Kafka partitions. For example, if there are 5 Kafka partitions, P1 through P5, and we want 2 operators to consume data from them in parallel, Apex starts 2 physical operator instances. The application master assigns the Kafka partitions in round-robin fashion and splits them between the 2 operators: O1(P1, P3, P5) and O2(P2, P4). Each operator starts as many threads as there are brokers hosting the partitions assigned to it. For example, if the 5 partitions are hosted on 2 different brokers, B1(P1, P2, P3) and B2(P4, P5), operator O1 has two threads. One reads data from P1 and P3, which sit on the same broker B1. The other reads data from P5, which is on the other broker B2. The reason behind this thread model is to maximize throughput while minimizing the resources that the operator uses. The one-to-many strategy also scales dynamically with the Kafka partitions: if you add new Kafka partitions or alter the partition metadata, the system automatically associates them with threads in one of the operators using the same thread model.


Figure 3: One-to-many partition
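
The one-to-many example above can be sketched in a few lines of plain Java. This is only an illustration of the round-robin assignment and the one-thread-per-broker grouping described in the text; the class and method names (OneToManySketch, assignRoundRobin, threadsByBroker) are made up for this sketch and are not part of the Malhar API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: these names are assumptions, not Malhar classes.
class OneToManySketch {

  // Deal the partitions out to the operators in round-robin order.
  static List<List<String>> assignRoundRobin(List<String> partitions, int operatorCount) {
    List<List<String>> assignment = new ArrayList<>();
    for (int i = 0; i < operatorCount; i++) {
      assignment.add(new ArrayList<>());
    }
    for (int i = 0; i < partitions.size(); i++) {
      assignment.get(i % operatorCount).add(partitions.get(i));
    }
    return assignment;
  }

  // Group one operator's partitions by hosting broker.
  // Each map entry stands for one consumer thread inside that operator.
  static Map<String, List<String>> threadsByBroker(List<String> assigned,
                                                   Map<String, String> brokerOfPartition) {
    Map<String, List<String>> threads = new TreeMap<>();
    for (String partition : assigned) {
      String broker = brokerOfPartition.get(partition);
      threads.computeIfAbsent(broker, b -> new ArrayList<>()).add(partition);
    }
    return threads;
  }

  public static void main(String[] args) {
    List<String> partitions = Arrays.asList("P1", "P2", "P3", "P4", "P5");

    // Broker layout from the example: B1(P1, P2, P3) and B2(P4, P5).
    Map<String, String> brokerOfPartition = new HashMap<>();
    brokerOfPartition.put("P1", "B1");
    brokerOfPartition.put("P2", "B1");
    brokerOfPartition.put("P3", "B1");
    brokerOfPartition.put("P4", "B2");
    brokerOfPartition.put("P5", "B2");

    List<List<String>> operators = assignRoundRobin(partitions, 2);
    System.out.println(operators); // [[P1, P3, P5], [P2, P4]]

    // Threads of operator O1: one per broker.
    System.out.println(threadsByBroker(operators.get(0), brokerOfPartition)); // {B1=[P1, P3], B2=[P5]}
  }
}
```

With the layout from the example, the first operator ends up with two groups, one per broker, which matches the two threads described for O1.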

Fault tolerance

Fault tolerance is a must-have for the Kafka input operator. When we talk about failures in distributed systems, we assume that they can occur on the Kafka broker side as well as on the Kafka input operator side.

Failures on the Kafka broker side include all events that can trigger a leader broker change, such as a broker node outage, a broker process being killed, or someone manually changing the partition assignment. These scenarios may or may not cause exceptions on the consumer side, but the operator always monitors leader broker changes through ZooKeeper and makes adjustments accordingly. With the one-to-one strategy, fault tolerance is achieved easily: the Kafka operator switches its connection to the new broker. With the one-to-many strategy, besides switching, the operator maintains the thread model by creating threads for new brokers and killing threads that are no longer required.
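
As a rough sketch of how the thread model could be maintained after a leader change in the one-to-many case, the snippet below compares the brokers that currently have a running thread with the new partition leaders. It is an assumption about the bookkeeping involved, not the operator's actual code; ThreadModelSketch, brokersToStart, brokersToStop, and the extra broker B3 are all hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Illustrative only: not the operator's real implementation.
class ThreadModelSketch {

  // Brokers that now lead at least one assigned partition but have no thread yet.
  static Set<String> brokersToStart(Set<String> running, Map<String, String> leaderOfPartition) {
    Set<String> needed = new TreeSet<>(leaderOfPartition.values());
    needed.removeAll(running);
    return needed;
  }

  // Brokers with a running thread that no longer lead any assigned partition.
  static Set<String> brokersToStop(Set<String> running, Map<String, String> leaderOfPartition) {
    Set<String> idle = new TreeSet<>(running);
    idle.removeAll(new HashSet<>(leaderOfPartition.values()));
    return idle;
  }

  public static void main(String[] args) {
    // Operator O1 currently runs one thread for B1 (P1, P3) and one for B2 (P5).
    Set<String> running = new HashSet<>(Arrays.asList("B1", "B2"));

    // Hypothetical failure: B1 goes down and a new broker B3 takes over P1 and P3.
    Map<String, String> leaderOfPartition = new HashMap<>();
    leaderOfPartition.put("P1", "B3");
    leaderOfPartition.put("P3", "B3");
    leaderOfPartition.put("P5", "B2");

    System.out.println(brokersToStart(running, leaderOfPartition)); // [B3]
    System.out.println(brokersToStop(running, leaderOfPartition));  // [B1]
  }
}
```

In this scenario the operator would create a thread for B3, stop the thread for B1, and leave the thread for B2 untouched.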

Failure recovery on the Kafka input operator side is easy because Apex supports it natively. The recovery described here assumes the simple consumer APIs. When an operator instance goes down, the Apex engine brings up another one. Other instances are not affected and continue working without interruption, and the recovered operator resumes from the safely checkpointed offsets. For details, see the Apex developer guide.

Offset management

Offset management is another key component of the Kafka input operator. It is very useful in some scenarios, for example, setting the initial offset to an arbitrary position instead of just the tail or the head of a message queue. Another good example is restarting an application and resuming from the last message processed rather than the last message consumed. These scenarios require offsets to be exposed and managed within the developer’s logic. We have a very simple interface called OffsetManager to deal with the offsets. This is how the code looks:


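public interface OffsetManager {
  // Called when the operator is activated; returns where to start or resume consuming.
  public Map<KafkaPartition, Long> loadInitialOffsets();
  // Called with the current offsets as messages are processed.
  public void updateOffsets(Map<KafkaPartition, Long> offsetsOfPartitions);
}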


The loadInitialOffsets method is called by the operator upon activation. It tells the operator where to start or resume consuming. The updateOffsets method is called with the current offsets every time a new message is processed. This simple interface has an advantage: you can keep the offsets in any data store you want, such as a database, HDFS, or Kafka itself. Moreover, you can store the offsets along with the window id from the Operator interface so that, in the event of an operator failure, you can resume consuming from the offset that was processed as opposed to the offset that was consumed. To know more about this, read this help.
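
To make the contract concrete, here is a minimal, self-contained sketch of an offset manager backed by a plain map that stands in for an external store. The nested KafkaPartition class and OffsetManager interface are simplified stand-ins so that the example compiles on its own (the real Malhar types may differ), and InMemoryOffsetManager is a hypothetical name; a real implementation would write to a database, HDFS, or Kafka instead.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Illustrative only: the nested types are simplified stand-ins, not the Malhar classes.
class OffsetManagerSketch {

  // Stand-in for the partition key (assumption: topic name plus partition id is enough here).
  static final class KafkaPartition {
    final String topic;
    final int partitionId;

    KafkaPartition(String topic, int partitionId) {
      this.topic = topic;
      this.partitionId = partitionId;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof KafkaPartition)) {
        return false;
      }
      KafkaPartition other = (KafkaPartition) o;
      return topic.equals(other.topic) && partitionId == other.partitionId;
    }

    @Override
    public int hashCode() {
      return Objects.hash(topic, partitionId);
    }
  }

  // Same two methods as the interface shown in the article.
  interface OffsetManager {
    Map<KafkaPartition, Long> loadInitialOffsets();
    void updateOffsets(Map<KafkaPartition, Long> offsetsOfPartitions);
  }

  // Hypothetical implementation: the map plays the role of an external store.
  static class InMemoryOffsetManager implements OffsetManager {
    private final Map<KafkaPartition, Long> store;

    InMemoryOffsetManager(Map<KafkaPartition, Long> store) {
      this.store = store;
    }

    @Override
    public Map<KafkaPartition, Long> loadInitialOffsets() {
      // Return a copy so callers cannot change the stored offsets by accident.
      return new HashMap<>(store);
    }

    @Override
    public void updateOffsets(Map<KafkaPartition, Long> offsetsOfPartitions) {
      // Overwrite the stored offset of every partition that was reported.
      store.putAll(offsetsOfPartitions);
    }
  }

  public static void main(String[] args) {
    Map<KafkaPartition, Long> externalStore = new HashMap<>();

    // Before the restart: the operator reports a processed offset.
    OffsetManager beforeRestart = new InMemoryOffsetManager(externalStore);
    Map<KafkaPartition, Long> processed = new HashMap<>();
    processed.put(new KafkaPartition("events", 0), 42L);
    beforeRestart.updateOffsets(processed);

    // After the restart: a fresh manager reads the same store and resumes from there.
    OffsetManager afterRestart = new InMemoryOffsetManager(externalStore);
    System.out.println(afterRestart.loadInitialOffsets().get(new KafkaPartition("events", 0))); // prints 42
  }
}
```

Because the offsets live outside the operator, a restarted instance can pick up from the last processed position. Storing the window id next to each offset, as suggested above, would be a natural extension of the same idea.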


Kafka’s potential is best leveraged with a good stream processing platform, and Apex is such a platform. It ensures high throughput, scalability, and fault tolerance. The seamless integration between Kafka and Apex through the Kafka input operator requires minimal developer effort and comes with a guarantee of zero data loss. With Apache Apex (incubating), platform-level support for these features makes the task a lot easier.
