Most big data setups still rely on files, while streaming applications and platforms are a comparatively new concept. For a streaming platform to be widely usable, it must handle files and process them efficiently. Apache Apex provides out-of-the-box support for handling files in the form of operators that read and write files in a scalable and fault-tolerant manner. In this blog post, we discuss some of the fault-tolerance mechanisms that Apache Apex provides for file operations.


To understand fault-tolerance in any Apex operator, the reader should be aware of the concept of streaming windows in Apex. Essentially, windows are finite segments of an unbounded dataset. At the input operators in the DAG, the Apex platform inserts markers (control tuples) into the stream that mark the start and end of a window. Each window is assigned an ID, called the window ID, that traverses the entire DAG and helps with checkpointing the state of the operators and with recovery after failures. More information on windowing can be found here.

The Apex platform provides fault-tolerance out of the box for any intermediate operator in the streaming application. However, input and output connectors that interact with external systems need more than platform support. For example, when an input operator is restarted after a failure, it should be able to replay the tuples of a window that it had already finished processing. Similarly, when an output operator is restarted at a recovery window after a failure, it may have to clean up data written beyond that window.

Reading From Files

For reading from files with fault-tolerance, Apex provides an operator called AbstractFileInputOperator. This operator requires a directory, which it scans periodically, and finds new files to read and parse. The LineByLineFileInputOperator, which is an extension of the above abstract operator, reads lines from files and outputs them.

Fault-Tolerance Aspects

1. Never loses a record

Platform support for stateful checkpointing and automatic recovery helps to achieve this. The properties and the state of the operator are preserved by the checkpointing mechanism. The important property of this operator is the directory to scan, and its state consists of:

        1. Set of files processed.
        2. Current file and the offset to which it has been read.

After a failure, the operator is restored from a checkpointed state. With this state, the operator avoids losing any data and restarts processing from an earlier point in the stream that is reasonably close to where it went down. This is illustrated in the diagram below.


This example assumes the default checkpointing frequency of 60 streaming windows. The input operator is checkpointed after window 59 completes. From window 60, it continues reading file3 from offset 1000. The operator fails at window 89 (an arbitrary window before the next checkpoint) and is restored in another container with the state checkpointed after window 59. It then resumes processing from the beginning of window 60. The saved state tells the operator that it had processed file1 and file2 completely and file3 up to offset 1000, so it opens file3 and starts reading from offset 1000.
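The checkpoint-and-restore behaviour described above can be sketched in plain Java. This is only a toy model and not the Apex implementation: the class `CheckpointedReaderSketch` and its methods are hypothetical names chosen for illustration, and the platform's checkpointing is simulated by keeping a copy of the state in memory.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

/** Toy model of checkpointed reader state; hypothetical, not the Apex API. */
class CheckpointedReaderSketch {

    /** The state named in the post: processed files, current file, and offset. */
    static final class ReaderState {
        final Set<String> processedFiles;
        final String currentFile;
        final long offset;

        ReaderState(Collection<String> processedFiles, String currentFile, long offset) {
            // Immutable copy so a checkpoint can never be changed by later reads.
            this.processedFiles = Collections.unmodifiableSet(new LinkedHashSet<>(processedFiles));
            this.currentFile = currentFile;
            this.offset = offset;
        }
    }

    private ReaderState live;        // state of the running operator
    private ReaderState checkpoint;  // last state saved by the (simulated) platform

    CheckpointedReaderSketch(ReaderState initial) {
        this.live = initial;
        this.checkpoint = initial;
    }

    /** Simulates reading {@code count} more records from the current file. */
    void read(long count) {
        live = new ReaderState(live.processedFiles, live.currentFile, live.offset + count);
    }

    /** Simulates the platform checkpointing the operator at a window boundary. */
    void checkpoint() {
        checkpoint = live;
    }

    /** Simulates redeployment: progress made since the last checkpoint is discarded. */
    void restore() {
        live = checkpoint;
    }

    ReaderState state() {
        return live;
    }
}
```

With file1 and file2 processed and file3 at offset 1000, calling `checkpoint()`, then `read(450)`, then `restore()` brings the offset back to 1000, which mirrors the recovery at window 60 in the example.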

2. Idempotent processing

In Apex, idempotent processing means that a finished window will always have the same records and that a record is associated with exactly one finished window. A finished window is a window for which the operator has completed the endWindow() callback. To support idempotent processing, an input operator needs to be able to replay a finished window. To understand the importance of idempotency, consider the example application below:


CountStoreOperator persists records to a database exactly once using JDBC. The operator relies on the idempotent processing of the upstream operator to ensure exactly-once semantics which is explained in detail here.

If the lineInput operator cannot replay a window with exactly the same tuples as before, the result can be either data loss or duplicated output further down the pipeline. This is explained by extending the example from the first diagram.


In the above example, the lineInput operator emits lines 1001 and 1002 in window 60. These lines are persisted to the database by countStore in the same window. In addition to persisting the lines, the store operator also saves window ID 60 in a separate metadata table in the database as a window that it has processed completely (refer to End-to-end “Exactly Once” with Apache Apex).

The lineInput operator fails on window 61, so both operators are redeployed in new containers and restored with the state at the end of window 59. Suppose that when lineInput reprocesses window 60 after recovery, it emits lines 1001, 1002 and 1003. The countStore operator compares the current window (window 60) to the one saved in the metadata table. If the current window is less than or equal to the stored window, it ignores all the tuples of that window because any updates related to those tuples are already persisted. Therefore, line 1003 is lost. Conversely, if window 60 replays with just line 1001 and window 61 contains lines 1002 and 1003, line 1002 is duplicated.

Records are lost when a finished window contains more records on replay. Alternatively, records are duplicated when they migrate from a finished window to a later window on replay.
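Both failure modes can be reproduced with a small simulation of the store's window check. The sketch below is an assumption-laden stand-in: `WindowGuardedStoreSketch` is a hypothetical class, an in-memory list replaces the database table, and a single field replaces the metadata table that holds the last fully processed window ID.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a store that skips windows it has already persisted; hypothetical names. */
class WindowGuardedStoreSketch {
    private long lastStoredWindow = -1;                        // stands in for the metadata table
    private final List<String> persisted = new ArrayList<>();  // stands in for the data table

    /** Persists a window's tuples unless that window was already stored. */
    void processWindow(long windowId, List<String> tuples) {
        if (windowId <= lastStoredWindow) {
            return; // replayed window: its tuples are assumed to be stored already
        }
        persisted.addAll(tuples);
        lastStoredWindow = windowId;
    }

    List<String> persisted() {
        return persisted;
    }
}
```

If window 60 first carries lines 1001 and 1002 and is then replayed with 1001, 1002 and 1003, the replay is skipped entirely and 1003 never reaches the store. If the replay of window 60 carries only 1001 and window 61 carries 1002 and 1003, the store ends up with 1002 twice. The check itself is sound; it simply depends on the upstream operator replaying each finished window with identical contents.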

The Apex library provides a utility called WindowDataManager which helps input operators to replay a finished window. This utility persists incremental state for each finished window. An implementation (FSWindowDataManager) that persists incremental state on the file system is also provided in the same class.

The AbstractFileInputOperator uses WindowDataManager to replay a finished window after a failure. To do so, the lineInput operator saves incremental state every application window in the form of a list of objects of type RecoveryEntry.

Each recovery entry records how much of a file was read in the corresponding window. The diagram below illustrates how this information is used to replay a window.


The lineInput operator at the end of window 60 uses WindowDataManager to save incremental state which indicates that it read file 1 from offset 1000 to 1400 in that window. It fails on window 61 and recovers at window 60.  WindowDataManager provides a list of windows for which incremental state was persisted. Since window 60 is among those, the recovery entry is fetched in the beginWindow() callback and only the portion of the file corresponding to the recovery entry is read.
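The replay logic can be illustrated with a simplified model. In the sketch below, a list of strings stands in for a file, offsets are record indices rather than byte offsets, and a map that survives the simulated failure plays the role of the incremental state kept by WindowDataManager. The names `ReplayableReaderSketch` and `Range` are hypothetical, and the range is recorded when the window is read rather than in an end-of-window callback, which is a simplification.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of replaying finished windows from per-window ranges; hypothetical names. */
class ReplayableReaderSketch {

    /** Stand-in for a recovery entry: the range of records read in one window. */
    static final class Range {
        final int start; // inclusive
        final int end;   // exclusive

        Range(int start, int end) {
            this.start = start;
            this.end = end;
        }
    }

    private final List<String> file;                             // records of one input file
    private final Map<Long, Range> perWindow = new HashMap<>();  // survives simulated failures
    private int offset;                                          // would be part of the checkpoint

    ReplayableReaderSketch(List<String> file, int startOffset) {
        this.file = file;
        this.offset = startOffset;
    }

    /**
     * Emits the records of a window. A window seen before is replayed from its
     * saved range, and maxRecords is ignored in that case.
     */
    List<String> emitWindow(long windowId, int maxRecords) {
        Range range = perWindow.get(windowId);
        if (range == null) {
            int end = Math.min(file.size(), offset + maxRecords);
            range = new Range(offset, end);
            perWindow.put(windowId, range);
        }
        offset = range.end;
        return new ArrayList<>(file.subList(range.start, range.end));
    }

    /** Simulates restoring the checkpointed offset after a failure. */
    void restoreOffset(int checkpointedOffset) {
        offset = checkpointedOffset;
    }
}
```

After a simulated failure, calling `emitWindow` for a window that already has a saved range returns exactly the records of the first run, no matter how many records are available by then, and the following window continues from the end of that range.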

Writing To Files

For fault-tolerant writing to files, Apex provides AbstractFileOutputOperator. This operator is designed to persist incoming records to a single file or multiple files. Additional optional features that the operator provides are:

  1. Automatic rotation of files based on either file size or window count or both.
  2. Compression and encryption of data before it is persisted in files.

Below is a simple extension of AbstractFileOutputOperator that enables rotation by size as well as windows and persists all the records to a file called outputFile:

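import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;

public class LineOutputOperator extends AbstractFileOutputOperator<String> {
   private static final String FILE_NAME = "outputFile";

   public LineOutputOperator() {
       setMaxLength(67108864);     // rotate at 64 megabytes
       setRotationWindows(7200);   // rotate every hour, considering default values
   }

   @Override
   protected String getFileName(String tuple) {
       return FILE_NAME;           // every tuple goes to the same file
   }

   @Override
   protected byte[] getBytesForTuple(String tuple) {
       return tuple.getBytes();    // no line separator is added here
   }
}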

AbstractFileOutputOperator can write to multiple files, and the file to which a particular record is persisted can be derived from the tuple. The LineOutputOperator here, however, writes everything to a single file. It extracts the bytes of the record from the tuple and writes them to the output stream. Note that the stream is not flushed after every write; instead, it is flushed in the endWindow() callback.

Fault-Tolerance Aspects

For this operator, fault-tolerance is defined as writing each record to a file exactly once, that is, a record is never lost or duplicated.

Writing exactly once

The operator, just like any other Apex operator, leverages the platform capabilities of stateful checkpointing and automatic recovery. However, platform support alone is not sufficient to provide the exactly-once guarantee. Let’s consider the following application to understand the problem.


If the lineOutput operator doesn’t clean up data written beyond the recovery window when it is redeployed after a failure, there will be duplicate lines in the output file, as depicted in the diagram below.

In this example, the lineInput operator emits lines 1001 and 1002, which the lineOutput operator flushes to outputFile in the endWindow() callback. However, lineInput fails on window 61 and both operators are redeployed with the state checkpointed after window 59. After recovery, the lineInput operator replays window 60. If the lineOutput operator doesn’t perform any cleanup after recovery, it ends up writing lines 1001 and 1002 to outputFile again.

The base AbstractFileOutputOperator avoids writing duplicates by effectively truncating outputFile to its size after window 59. To do so, it keeps state that consists mainly of the size of each file that it has written to so far. This is depicted in the diagram below.


Here, in the setup() callback after redeployment, the lineOutput operator truncates outputFile to size 1000, which was saved in the checkpointed state. From window 60, it resumes its normal processing of writing records to files.
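The truncate-on-recovery idea can be sketched against a local file with the standard java.nio API. This is not how the operator is implemented: `TruncateOnRecoverySketch` and its methods are hypothetical, the file lives on the local file system, and the checkpointed size is just a local variable in the demo.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Toy model of discarding output written after the last checkpoint; hypothetical names. */
class TruncateOnRecoverySketch {

    /** Appends one line and returns the new file size, which is the state to checkpoint. */
    static long append(Path file, String line) throws IOException {
        Files.write(file, (line + "\n").getBytes(StandardCharsets.UTF_8),
            StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        return Files.size(file);
    }

    /** Drops everything written after the checkpoint by truncating to the saved size. */
    static void recover(Path file, long checkpointedSize) throws IOException {
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.WRITE)) {
            channel.truncate(checkpointedSize);
        }
    }

    /** Runs the failure scenario from the text and returns the final file content. */
    static String demo() {
        try {
            Path file = Files.createTempFile("outputFile", ".txt");
            long checkpointed = append(file, "line 1000"); // size saved with the checkpoint
            append(file, "line 1001");                     // window 60, before the failure
            append(file, "line 1002");
            recover(file, checkpointed);                   // cleanup after redeployment
            append(file, "line 1001");                     // window 60 replayed
            append(file, "line 1002");
            String content = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
            Files.delete(file);
            return content;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In the demo, lines 1001 and 1002 appear once in the final content even though they were written both before the failure and during the replay.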

An example application which reads records from Kafka and writes them to a file exactly once is present here.

Writing exactly once to HDFS

When writing to HDFS, the fault-tolerance/recovery process described above is more complicated due to the HDFS lease mechanism.

HDFS allows multiple readers but only a single writer. Before a client writes to a file, it has to obtain the lease for that file. This lease acts like a lock which prevents other clients from writing to that file. However, any failure may terminate the client. When a client is terminated, any lease it had acquired may still be active and may prevent other clients from writing to the associated file.

Here the client is an instance of AbstractFileOutputOperator, which acquires the lease of a file when it opens an output stream to that file. If this instance fails before it can close the stream, a dangling lease for that file remains in the NameNode. The platform restores the failed operator in another container. This new instance is a new client which tries to write to the same file but fails with a lease exception.

The AbstractFileOutputOperator handles this by always writing to a temporary file and renaming the temporary file to the original file only when there is no possibility of writing to that file again. This process is called finalization. To request that a file be finalized, extensions of this operator can invoke the method requestFinalize(String fileName). This request can be made as soon as it is known that the file will not be opened again. When rotation is enabled, a part file is requested for finalization as soon as it is complete and the next part opens. The actual finalization happens in the committed(long window) callback for the window in which the file was requested to finalize.
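The write-to-temporary-then-rename pattern can be sketched with local files as follows. Everything here is illustrative: `FinalizeOnCommitSketch` is a hypothetical class, its `requestFinalize(long)` takes a window ID instead of a file name, the local file system replaces HDFS, and treating a commit of any window at or after the requesting window as sufficient is an assumption of this sketch.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Toy model of finalizing a file by renaming its temporary file; hypothetical names. */
class FinalizeOnCommitSketch {
    private final Path target;
    private final Path temp;
    private long requestedInWindow = -1; // -1 means no finalization request is pending

    FinalizeOnCommitSketch(Path target) {
        this.target = target;
        // Temporary name: original name plus the creation timestamp, as described in the text.
        this.temp = target.resolveSibling(
            target.getFileName() + "." + System.currentTimeMillis() + ".tmp");
    }

    /** All writes go to the temporary file, never to the target. */
    void write(String line) throws IOException {
        Files.write(temp, (line + "\n").getBytes(StandardCharsets.UTF_8),
            StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    /** Records that the file will not be written again, as of the given window. */
    void requestFinalize(long windowId) {
        requestedInWindow = windowId;
    }

    /** Renames the temporary file once the requesting window has been committed. */
    void committed(long windowId) throws IOException {
        if (requestedInWindow >= 0 && windowId >= requestedInWindow) {
            Files.move(temp, target);
            requestedInWindow = -1;
        }
    }

    /** Returns whether the target was visible before and after the commit of window 199. */
    static boolean[] demo() {
        try {
            Path dir = Files.createTempDirectory("finalize-sketch");
            Path target = dir.resolve("outputFile.0");
            FinalizeOnCommitSketch writer = new FinalizeOnCommitSketch(target);
            writer.write("line 1");
            writer.requestFinalize(199);
            writer.committed(198);                    // too early: nothing is renamed
            boolean visibleEarly = Files.exists(target);
            writer.committed(199);                    // requesting window is committed
            boolean visibleAfterCommit = Files.exists(target);
            Files.deleteIfExists(target);
            Files.delete(dir);
            return new boolean[] {visibleEarly, visibleAfterCommit};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Deferring the rename until the commit means a reader of the target file never observes data that could still be rolled back by a recovery.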

Writing to temporary files makes recovery cumbersome because the temporary file for a given output file changes after every failure. The temporary file name is derived by appending the system timestamp at its time of creation to the original file name. Therefore, in addition to the size of each file, the operator now also tracks the temporary file name of each file. This is part of the operator’s state, which it uses during recovery to copy the valid data from the existing temporary file to a new temporary file, as shown in the diagram below.


In this example, the lineOutput operator was writing to the temp file outputFile.0.1463040570 before the failure. After the failure, it copies 1000 bytes from this temp file to outputFile.0.1463065130.tmp. In window 199, part 0 of the file is completed, so it is requested for finalization. The actual finalization is done only when window 199 is committed.
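The copy step during recovery can be sketched with standard Java streams. This is a minimal illustration under stated assumptions: `TempFileRecoverySketch` and `copyValidPrefix` are hypothetical names, the files are local, and the file names in the demo are placeholders rather than the names from the example above.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Toy model of copying the checkpointed prefix of a temp file; hypothetical names. */
class TempFileRecoverySketch {

    /** Copies the first validBytes of oldTemp into newTemp and returns the bytes copied. */
    static long copyValidPrefix(Path oldTemp, Path newTemp, long validBytes) throws IOException {
        try (InputStream in = Files.newInputStream(oldTemp);
             OutputStream out = Files.newOutputStream(newTemp)) {
            byte[] buffer = new byte[8192];
            long remaining = validBytes;
            while (remaining > 0) {
                int read = in.read(buffer, 0, (int) Math.min(buffer.length, remaining));
                if (read < 0) {
                    break; // old temp file is shorter than the checkpointed size
                }
                out.write(buffer, 0, read);
                remaining -= read;
            }
            return validBytes - remaining;
        }
    }

    /** Returns the number of bytes copied and the size of the new temp file. */
    static long[] demo() {
        try {
            Path dir = Files.createTempDirectory("recovery-sketch");
            Path oldTemp = dir.resolve("part.old.tmp");
            Path newTemp = dir.resolve("part.new.tmp");
            // 1000 checkpointed bytes followed by 200 bytes written after the checkpoint.
            Files.write(oldTemp, new byte[1200]);
            long copied = copyValidPrefix(oldTemp, newTemp, 1000);
            long size = Files.size(newTemp);
            Files.delete(oldTemp);
            Files.delete(newTemp);
            Files.delete(dir);
            return new long[] {copied, size};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Only the bytes covered by the checkpointed size reach the new temporary file, so data written after the checkpoint is dropped without the old file having to be reopened for writing.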

In conclusion, the Apache Apex Malhar library provides a rich set of connectors that work with the Apex platform to be fault-tolerant and to provide exactly-once guarantees.