With the advent of Hadoop, “High Availability” (HA) has become a far less costly affair for enterprises, and naturally it has become a very desirable aspect of any enterprise big data application. With maturing HDFS and YARN APIs, Hadoop application developers can ensure not only that failed worker processes are replaced with healthy ones, but also that the master process which orchestrates these workers can be corrected if it falls sick.[1] Sounds like a solid foundation for HA, right?

Hadoop does a pretty good job of providing the building blocks: HA for HDFS and a rich YARN API to control various processes. Yet a lot more needs to be done for a data processing system to achieve efficient HA. Here at DataTorrent, we define efficient HA as HA with the following characteristics:

  • Auto Fault Resilient  Any application or hardware fault should be detected automatically and corrected by replacing all impacted components with healthy new ones. The detection and recovery mechanism should not require human intervention.
  • Stateful  The newly reinstated components should be bootstrapped with a known good state of the faulty component, captured not long before the fault.
  • Orderly  The data being processed by these components should be presented to the new component in the same order as delivered by the sources.
  • Lossless  There should be absolutely no loss of data, and no duplication either. The output of an HA application should not be burdened with doubts about data accuracy due to any short-lived failure.
  • Timely  Fault detection and recovery should happen reasonably quickly, so as not to impact the timely processing characteristics of the application; e.g. a real-time application should recover and catch up with its backlog within seconds, if not instantaneously.

These characteristics ensure that irrespective of the number of failures, the output of your application will be the same as it would have been if there had been no failure. In other words, if you were not paying close attention to the processing pipeline, you would not even know that there was a failure!

These are the characteristics that stare many Hadoop application developers in the face when they set out to write fault-tolerant applications. Hadoop provides only a basic API towards efficient HA. Without any platform help, developers are left on their own to fill the gap between the Hadoop ecosystem and the application built on top of it. This is precisely the gap we abhor at DataTorrent. The DataTorrent RTS platform (RTS) addresses efficient HA using familiar Java language constructs and a concise, simple API. That’s right: RTS does much more than just real-time streaming applications. It is a platform for writing efficient HA applications on Hadoop without implementing any of the HA code yourself. The platform provides the bells and whistles to guarantee HA out of the box, so application developers can spend most of their time focusing on business logic. We take pride in enabling this capability, and rightly enough our customers love to challenge us with their specific use cases.
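To give a flavor of what “business logic only” looks like, here is a minimal sketch of an RTS operator written against the public operator API. WordCountOperator is a made-up example and the package paths may vary slightly between RTS releases; the point is that the checkpointable state is just an ordinary Java field, with no recovery code in sight.

```java
// A minimal sketch of an RTS operator. WordCountOperator is a made-up example,
// and the package paths below may vary slightly between RTS releases.
import com.datatorrent.api.BaseOperator;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;

import java.util.HashMap;
import java.util.Map;

public class WordCountOperator extends BaseOperator
{
  // Non-transient fields are the operator's state; the platform checkpoints
  // them periodically and restores them in a new container after a failure.
  private final Map<String, Long> counts = new HashMap<String, Long>();

  public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();

  public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
  {
    @Override
    public void process(String word)
    {
      // Pure business logic: no retry, replay, or persistence code here.
      Long current = counts.get(word);
      long updated = (current == null ? 0L : current) + 1L;
      counts.put(word, updated);
      output.emit(word + "=" + updated);
    }
  };
}
```

When a container dies, the platform brings the operator back with `counts` restored from the last checkpoint, so code like the above survives failures without any extra effort from the developer.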

One of the use cases we encountered recently is interesting enough to mention here. The application the customer developed using RTS required a warm-up time of approximately 6 hours. During this time the application logic would build its state from zero to the point where it would start giving real-time results matching their traditional pipeline, which processed terabytes of data every hour. The difference between doing the job in real time versus in batch mode amounted to tens of millions of dollars in annual cost savings. The customer was happy to give that 6 hours of warm-up time. But did I say that our customers love to challenge us? As they learned more about the system, they wanted to make sure that even when they rolled out changes to their application logic or upgraded the Hadoop cluster, they would not pay the warm-up penalty again. That made sense to us, so we added a feature to the platform which allows a new application instance to inherit the state from the old application instance and continue processing from there. Since the state is already built, the results from the new application instance are usable as soon as they come out. To prove that the feature worked, we devised a test. Let’s use a picture to describe it.

Both-Applications-Active

The incoming Flume event stream is split into 2 duplicate event streams. The top one feeds an RTS application in which we do not induce any failure. The bottom one feeds an RTS application in which we induce a failure by bringing down the Hadoop cluster that supports it. DB1 stores the output of the uninterrupted application; DB2 stores the output of the application that goes through downtime. Needless to say, in steady state both applications act on the streaming data in real time and record the same output into DB1 and DB2 respectively. This matching output is the success criterion for the test.
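A Flume agent configuration along the lines of the following sketch can produce such a split. The agent, channel, and sink names are made up for illustration, and the DTFlumeSink class path shown is an assumption; the essential piece is the replicating channel selector.

```properties
# Hypothetical Flume agent that duplicates one source into two channels,
# one per RTS application. Names and the sink class path are illustrative.
agent1.sources  = event-source
agent1.channels = stable-channel faulty-channel
agent1.sinks    = dt-sink-stable dt-sink-faulty

agent1.sources.event-source.type = netcat
agent1.sources.event-source.bind = 0.0.0.0
agent1.sources.event-source.port = 9099
# The replicating selector copies every incoming event to both channels.
agent1.sources.event-source.selector.type = replicating
agent1.sources.event-source.channels = stable-channel faulty-channel

agent1.channels.stable-channel.type = file
agent1.channels.faulty-channel.type = file

# Each DTFlumeSink hands its channel's events to one RTS application.
agent1.sinks.dt-sink-stable.type = com.datatorrent.flume.sink.DTFlumeSink
agent1.sinks.dt-sink-stable.channel = stable-channel
agent1.sinks.dt-sink-faulty.type = com.datatorrent.flume.sink.DTFlumeSink
agent1.sinks.dt-sink-faulty.channel = faulty-channel
```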

The test is that we terminate the fully warmed-up bottom RTS application just before lunch by bringing down HDFS completely for that cluster, go for a regular lunch (a typical lunch takes about as long as a cluster upgrade should), then come back and restart the RTS application. Just for fun, let’s call this test “The Lunch Test”. We have conducted the test multiple times since we devised it, and it has passed consistently. You must be wondering how it works.

It’s pretty simple to explain. When the DataTorrent RTS application is shut down, the test setup turns into this:

One-Application-Down


DTFlumeSink hands the data to the downstream application when it is present. When the application is down, the channel between the Flume source and DTFlumeSink buffers the data, as DTFlumeSink stalls the consumption of events from the channel. Once the application comes back up, DTFlumeSink resumes consumption and delivers the events as fast as the downstream application can consume them, quickly churning through all the buffered data (a rough sketch of this behavior follows the graph below). If you pay attention to the channel’s fill level, this is the typical graph you notice:

Event-Drain

Build-up of events during downtime and the subsequent drain after recovery
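To make the stall-and-drain behavior concrete, here is a heavily simplified sink in the spirit of DTFlumeSink. This is not the actual DTFlumeSink: DownstreamClient is a made-up stand-in for the connection to the RTS application, and the real sink also performs the transactional hand-off described further below.

```java
// Simplified, hypothetical sink in the spirit of DTFlumeSink.
// DownstreamClient is a made-up stand-in for the connection to the RTS application.
import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.sink.AbstractSink;

public class BufferingSinkSketch extends AbstractSink {

  /** Hypothetical stand-in for the connection to the RTS application. */
  static class DownstreamClient {
    boolean isConnected() { return false; }     // placeholder
    void send(byte[] payload) { /* placeholder */ }
  }

  private final DownstreamClient downstream = new DownstreamClient();

  @Override
  public Status process() throws EventDeliveryException {
    if (!downstream.isConnected()) {
      // No application to deliver to: back off and leave events in the channel,
      // which keeps buffering them until the application comes back.
      return Status.BACKOFF;
    }

    Channel channel = getChannel();
    Transaction txn = channel.getTransaction();
    txn.begin();
    try {
      Event event = channel.take();
      if (event == null) {
        txn.commit();
        return Status.BACKOFF;              // channel drained; nothing to do right now
      }
      downstream.send(event.getBody());     // hand the event to the application
      txn.commit();                         // remove it from the channel after hand-off
      return Status.READY;                  // ask Flume to call process() again immediately
    } catch (Exception e) {
      txn.rollback();                       // keep the event in the channel on any failure
      throw new EventDeliveryException("Delivery to RTS application failed", e);
    } finally {
      txn.close();
    }
  }
}
```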

The rapid drain of events is possible because the RTS application is tuned to process events 5 times faster than the rate at which they arrive; with that headroom, the net drain rate is 4 times the arrival rate, so a backlog built up over an hour of downtime clears in roughly 15 minutes. This explains how none of the events are lost even when the application is down. But how is the quick warm-up achieved?

It’s pretty simple. RTS is packaged with an innovative checkpointing scheme which periodically captures the state of each component of the application. The scheme makes no distinction between the Application Master and the workers, and backs their state up on a store of your choice; the default store is HDFS. When there is an application failure, the platform identifies the impacted components and bootstraps their state from this store before putting the new components back to work. All of this happens transparently to the RTS application developer. In fact, this idea is so core to RTS that application developers need to go out of their way to disable it.
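As a rough illustration of the checkpoint-and-restore idea against HDFS (this is not the RTS implementation; the path layout and the use of plain Java serialization are assumptions made for the example):

```java
// Rough illustration of checkpoint/restore to HDFS; NOT the actual RTS scheme.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class HdfsCheckpointer {

  private final FileSystem fs;
  private final Path baseDir;   // e.g. /checkpoints/<appId>/<componentId> (illustrative layout)

  public HdfsCheckpointer(Configuration conf, Path baseDir) throws IOException {
    this.fs = FileSystem.get(conf);
    this.baseDir = baseDir;
  }

  /** Persist the component state under a monotonically increasing checkpoint id. */
  public void checkpoint(long checkpointId, Serializable state) throws IOException {
    Path file = new Path(baseDir, Long.toString(checkpointId));
    try (FSDataOutputStream out = fs.create(file, true);
         ObjectOutputStream oos = new ObjectOutputStream(out)) {
      oos.writeObject(state);
    }
  }

  /** Restore the most recent checkpoint, or return null if none exists yet. */
  public Serializable restoreLatest() throws IOException, ClassNotFoundException {
    if (!fs.exists(baseDir)) {
      return null;
    }
    long latest = -1;
    for (FileStatus status : fs.listStatus(baseDir)) {
      latest = Math.max(latest, Long.parseLong(status.getPath().getName()));
    }
    if (latest < 0) {
      return null;
    }
    try (FSDataInputStream in = fs.open(new Path(baseDir, Long.toString(latest)));
         ObjectInputStream ois = new ObjectInputStream(in)) {
      return (Serializable) ois.readObject();
    }
  }
}
```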

For applications which tie into the Flume messaging bus, this checkpointing scheme also ties into Flume’s transactional semantics: a Flume transaction is committed only when RTS has secured the event data belonging to that transaction as part of its state. The small change I mentioned earlier was to make the new application instance aware of the checkpoints saved by the previous instance of the application. It was a relatively simple change compared to the effort that went into laying the foundation for RTS.
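A sketch of that hand-off, extending the earlier sink example: send() and awaitSecured() are hypothetical names, and the only point being illustrated is that the channel transaction commits after the downstream application has durably secured the events.

```java
// Sketch: commit the Flume transaction only after the downstream (RTS) side
// confirms the batch is covered by a checkpoint. Method names are hypothetical.
import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;

import java.util.ArrayList;
import java.util.List;

final class SecuredHandoff {

  /** Hypothetical downstream connection that can confirm durable receipt. */
  interface Downstream {
    long send(List<Event> batch);                                  // returns an id for the batch
    void awaitSecured(long batchId) throws InterruptedException;   // blocks until checkpointed
  }

  static void deliverBatch(Channel channel, Downstream downstream, int batchSize)
      throws EventDeliveryException {
    Transaction txn = channel.getTransaction();
    txn.begin();
    try {
      List<Event> batch = new ArrayList<Event>();
      Event event;
      while (batch.size() < batchSize && (event = channel.take()) != null) {
        batch.add(event);
      }
      if (!batch.isEmpty()) {
        long batchId = downstream.send(batch);   // hand the batch to the RTS application
        downstream.awaitSecured(batchId);        // wait until it is part of a checkpoint
      }
      txn.commit();                              // only now remove the events from the channel
    } catch (Exception ex) {
      txn.rollback();                            // events stay in the channel on failure/timeout
      throw new EventDeliveryException("Batch was not secured by the downstream application", ex);
    } finally {
      txn.close();
    }
  }
}
```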

We had to write DTFlumeSink to tie into Flume’s transactional semantics. We could have used Kafka in place of Flume; in Kafka’s case, its read-offset support would remove the need for any component like DTFlumeSink to be part of the application.
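For illustration only (this is not RTS code), with the modern Kafka consumer API a reader can simply seek back to the offset recorded in its last checkpoint and replay from there; the topic, partition, and offset below are placeholders.

```java
// Illustration: replaying from a stored offset with the Kafka consumer API.
// Topic, partition, and lastCheckpointedOffset are placeholders; in a real
// application the offset would come from the checkpointed state.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ReplayFromOffset {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("enable.auto.commit", "false");   // offsets are managed by the application

    long lastCheckpointedOffset = 42L;          // placeholder: read from checkpointed state
    TopicPartition partition = new TopicPartition("events", 0);

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.assign(Collections.singletonList(partition));
      consumer.seek(partition, lastCheckpointedOffset);   // resume exactly where we left off
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
          // process(record) -- business logic goes here
        }
      }
    }
  }
}
```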

Through this use case we saw how a small change to DataTorrent RTS brought extra efficiency to the customer’s business logic without requiring any change to their application code. A discussion of the actual application code would be lengthy, largely irrelevant, and rather customer-confidential, so let’s save it for another day. Rest assured that it was as simple as the code you would write for an application which *assumes* there will never be a failure. Enabling efficient HA out of the box, transparently to the application developer, is one of the things that keeps us going and reinventing ourselves. Our simple API is an indication of that, so be sure to check it out.

If you have questions or feedback on efficient HA, or need advice on your use cases, we are all ears.