How Apex serializes events using StreamCodecs

A stream in Apache Apex (now incubating) represents a unidirectional flow of data from the output port of one operator to the input port of another. Apex uses a StreamCodec to serialize events flowing across a stream that connects two operators. Before an event is sent from an output port to an input port, it must be converted into a byte array so that it can be transferred across the network. By default, Apex uses Kryo for serialization; however, an API is also available for defining custom StreamCodecs.

What is a StreamCodec?

Before we dive into a sample implementation, let’s see what StreamCodec is all about. StreamCodec is an attribute of an input port that is receiving events. It specifies how to convert an event to a byte array before sending it over the network (serialization) and back again at the receiving end (deserialization).

When you enable partitioning for a downstream operator, StreamCodec also decides which physical partition will receive a data tuple. You can extend this functionality to filter events before sending them to downstream operators.
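Putting these responsibilities together, the contract looks roughly like the sketch below, a simplified view of the StreamCodec interface for tuples of type T (the exact package and generic signatures may vary by Apex version):

// Simplified view of the StreamCodec contract for tuples of type T
public interface StreamCodec<T>
{
  // Deserialize a byte fragment received over the network back into a tuple
  Object fromByteArray(Slice fragment);

  // Serialize a tuple into bytes before it is sent over the network
  Slice toByteArray(T o);

  // Decide which physical partition of the downstream operator receives the tuple
  int getPartition(T o);
}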

This diagram shows an upstream operator (U) connected to three downstream operators (D1, D2, and D3). Each downstream operator has a separate StreamCodec: C1, C2, and C3, respectively. As shown, each operator can receive different filtered data.

When to use a custom StreamCodec?

In normal scenarios, the default StreamCodec will suffice. However, you may want to implement a custom StreamCodec for any of the following reasons:

  • For custom serialization and deserialization, for example to enable compression or encryption.
  • For custom partitioning. By default, Apex sends an event to a partition based on its hash code.
  • For filtering events before sending the data over a stream.

Custom StreamCodec for Serialization and Deserialization

To build a sample StreamCodec, let's consider a simple application that analyses a log file and computes statistics such as the number of log lines per class name. In the Lifecycle of a DAG you saw how an application DAG is created. Following the same steps, the DAG for this application consists of a LogOutputGenerator operator whose output feeds a LogAggregator operator.
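In code, this two-operator wiring can be expressed in populateDAG roughly as in the sketch below (the operator class names and the output port name are assumptions for illustration):

@Override
public void populateDAG(DAG dag, Configuration conf)
{
  // Hypothetical operator classes matching the description above
  LogOutputGenerator logGenerator = dag.addOperator("LogGenerator", new LogOutputGenerator());
  LogAggregator logAggregator = dag.addOperator("LogAggregator", new LogAggregator());

  // Stream carrying log lines from the generator's output port to the aggregator's input port
  dag.addStream("logLines", logGenerator.output, logAggregator.input);
}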

LogOutputGenerator emits log lines, which are transmitted to the LogAggregator operator. Since each log line could be long, we can reduce the cost of network transfer by compressing the line text. This can be done with a simple run-length encoder in a custom StreamCodec. Let's write a custom codec for the stream connecting LogOutputGenerator and LogAggregator. Here is what the code looks like:

// Import paths shown are indicative and may vary by Apex version
import com.datatorrent.api.StreamCodec;
import com.datatorrent.netlet.util.Slice;

public class SampleStreamCodec implements StreamCodec<String>
{
  @Override
  public Object fromByteArray(Slice fragment)
  {
    // Custom logic to deserialize: expand the run-length encoded bytes back into the log line
    return decode(new String(fragment.buffer, fragment.offset, fragment.length));
  }

  @Override
  public Slice toByteArray(String o)
  {
    // Custom logic to serialize: run-length encode the log line before sending it
    return new Slice(runLengthEncode(o).getBytes());
  }
}


The methods toByteArray and fromByteArray handle serialization and deserialization.

For simplicity, assume that each log entry contains only letters. A simple run-length encoder and decoder can then be written to compress each line (for example, "aaabcc" encodes to "3a1b2c"), as shown below:

/* Adapted from http://rosettacode.org/wiki/Run-length_encoding#Java */
protected String runLengthEncode(String o)
{
  StringBuilder encodedString = new StringBuilder();
  char previousChar = o.charAt(0);
  int cnt = 1;
  for (int i = 1; i < o.length(); i++) {
    if (previousChar == o.charAt(i)) {
      cnt++;
    } else {
      // Emit the completed run as <count><character>, the order the decoder expects
      encodedString.append(cnt);
      encodedString.append(previousChar);
      cnt = 1;
      previousChar = o.charAt(i);
    }
  }
  // Emit the final run, which the loop above never reaches
  encodedString.append(cnt);
  encodedString.append(previousChar);
  return encodedString.toString();
}


protected String decode(String o)
{
  StringBuilder decodedString = new StringBuilder();
  // Tokens alternate between a run count and the character to repeat
  Pattern pattern = Pattern.compile("[0-9]+|[a-zA-Z]");
  Matcher matcher = pattern.matcher(o);
  while (matcher.find()) {
    int number = Integer.parseInt(matcher.group());
    matcher.find();
    while (number-- != 0) {
      decodedString.append(matcher.group());
    }
  }
  return decodedString.toString();
}


With this custom codec, we can improve the performance of this application by minimizing the network overhead.

To set this StreamCodec, simply specify the STREAM_CODEC attribute on the input port of the LogAggregator operator. This can be done by adding one line in the populateDAG method of the application:

dag.setInputPortAttribute(logAggregator.input, PortContext.STREAM_CODEC, new SampleStreamCodec());


Custom StreamCodec for Partitioning

Let us now consider the second use case for a StreamCodec: partitioning. Assume that in this application, LogAggregator is a partitioned operator with 2 physical partitions {0, 1}. With the default StreamCodec, each log line is partitioned based on the entire line's hash code. Since LogAggregator aggregates log lines per class name, it makes sense to partition the log entries by class name so that entries for a class go to the same physical partition.

If getPartition returns 0, the entry is picked up by the first partition; if it returns 1, it goes to the second partition, and so on.
Here is the method for partitioning data by class name:

@Override
public int getPartition(String o)
{
  // Custom partitioning: route by the class name (the part before the first ':')
  String[] parts = o.split(":");
  // Keep the result non-negative, since hashCode() can be negative
  return Math.abs(parts[0].hashCode() % 2);
}


The getPartition method in the StreamCodec returns a partition hint. When partitioning is configured for an operator, a mask is also supplied; the mask is applied on top of the value returned by getPartition to obtain the physical partition that receives the data. For more information on setting partition keys for an operator, see the Partition section in the Application Developer Guide.
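As a rough, conceptual sketch of how the mask combines with getPartition (the mask value and two-partition setup below are assumptions for illustration, not engine code):

public class PartitionMaskExample
{
  public static void main(String[] args)
  {
    // Assume LogAggregator has two physical partitions with partition keys {0, 1},
    // and the partitioner supplied a mask of 0x01.
    SampleStreamCodec codec = new SampleStreamCodec();
    int mask = 0x01;

    String logLine = "com.my.package.Foo:could not open file";
    int hint = codec.getPartition(logLine);   // 0 or 1, based on the class name's hash code
    int physicalKey = hint & mask;            // the engine masks the hint to pick the partition key

    System.out.println("Tuple routed to partition key " + physicalKey);
  }
}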

Custom StreamCodec for Filtering Data

Lastly, consider a scenario where you are only interested in log entries from classes belonging to a particular package (for example, com.my.package), and log entries from other classes can be ignored. In this case, you shouldn't even send the extra log entries to LogAggregator, right? How can this be done with a StreamCodec so that there is no overhead of sending unnecessary data across the network?

You can do this by modifying the getPartition method slightly. For all valid class names, return the hash code of the class name as before; for class names that should be ignored, return a partition number that won't be picked up by any physical partition!

Let’s modify the getPartition Method:

@Override
public int getPartition(String o)
{
  // Custom partitioning with filtering
  String[] parts = o.split(":");
  if (parts.length > 0 && parts[0].contains("com.my.package")) {
    return Math.abs(parts[0].hashCode() % 2);
  }
  // Partition 2 is not served by any physical partition, so the tuple is dropped
  return 2;
}


Since LogAggregator only serves partitions {0, 1}, a tuple assigned partition 2 is not received by any physical operator. Events that are not needed are simply dropped, which gives us filtering!

Stateful StreamCodec

Apex exposes an interface called StatefulStreamCodec, which extends the StreamCodec interface. The extended interface provides an additional performance benefit during serialization and deserialization by maintaining state within the codec itself. For example, instead of sending a fully qualified class name along with every object, the codec can assign the class name an integer ID and store that mapping as its state. The serializer and deserializer both use the mapping to convert objects to and from bytes, and the integer ID is sent instead of the class name. The state (the mapping) is sent across only when it changes: when a new class name is encountered, a new mapping from class name to ID is added and transmitted; when the same class name is seen again, only the ID is sent and the previously transmitted mapping is used to deserialize. This significantly reduces the number of bytes sent over the network.
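In a simplified view, the stateful contract adds a pair of state-aware methods and a reset hook on top of StreamCodec, with DataStatePair carrying the serialized data alongside any new state (the exact packaging may differ slightly across Apex versions):

// Simplified view of the StatefulStreamCodec contract for tuples of type T
public interface StatefulStreamCodec<T> extends StreamCodec<T>
{
  // Serialize a tuple; populate state only when new mapping information must be sent
  DataStatePair toDataStatePair(T o);

  // Deserialize a tuple, first absorbing any state that arrived with the data
  Object fromDataStatePair(DataStatePair dspair);

  // Forget accumulated mappings, e.g. at checkpoints
  void resetState();

  // Container pairing serialized data with optional incremental state
  class DataStatePair
  {
    public Slice state;  // null when no new state needs to be sent
    public Slice data;
  }
}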

In the example of LogAggregator, each log line starts with a full class name. You can use a stateful StreamCodec to compress each log line even further. Let's extend the StreamCodec implementation to a stateful one. Assuming the log lines are of the format classname:message, split the string on ":". Compress the first part, the class name, using the stateful mechanism, which maintains state mappings from class name to an ID. The second part of the line can be compressed using the run-length encoding from before. The methods fromDataStatePair and toDataStatePair handle the logic of maintaining state and of serialization and deserialization using that state. Here is the code snippet:

public class SampleStatefulStreamCodec extends SampleStreamCodec implements StatefulStreamCodec<String>
{
  HashMap<String, Integer> mapping = new HashMap<>();
  HashMap<Integer, String> receivedState = new HashMap<>();
  int id = 0;


  @Override
  public Object fromDataStatePair(DataStatePair dspair)
  {
    if (dspair.state != null) {
      // Store the newly received class-name mapping for future reference
      // (for simplicity, this assumes the Slice begins at offset 0)
      byte[] stateBytes = dspair.state.buffer;
      int classId = ByteBuffer.wrap(stateBytes).getInt();
      String value = new String(stateBytes, 4, stateBytes.length - 4);
      receivedState.put(classId, value);
    }
    // Decode data part of DataStatePair
    int classId = ByteBuffer.wrap(dspair.data.buffer).getInt();
    String logLine = new String(dspair.data.buffer, 4, dspair.data.buffer.length - 4);
    String className = receivedState.get(classId);
    String value = className + ":" + decode(logLine);
    return value;
  }


  @Override
  public DataStatePair toDataStatePair(String o)
  {
    // Custom logic to serialize
    DataStatePair pair = new DataStatePair();
    String[] parts = o.split(":");
    Integer classId;
    String className = parts[0];
    if (!mapping.containsKey(parts[0])) {
      // Send state mapping if not already encountered classname
      classId = ++id;
      mapping.put(parts[0], classId);
      byte[] bytes = ByteBuffer.allocate(className.getBytes().length + 4).putInt(classId).put(className.getBytes()).array();
      pair.state = new Slice(bytes);
    } else {
      classId = mapping.get(parts[0]);
    }
    String value = runLengthEncode(parts[1]);
    byte[] bytes = ByteBuffer.allocate(value.getBytes().length + 4).putInt(classId).put(value.getBytes()).array();
    pair.data = new Slice(bytes);
    return pair;
  }
}


One important thing to note is that the state is cleared at every checkpoint, which by default occurs every 30 seconds. In the event of an operator failure, the internal state is lost, which is why the state must be reset at checkpoints so that the sending and receiving sides stay consistent. Apex handles this by invoking the resetState method periodically. The codec should override this method to clear any internal mappings:

@Override
public void resetState()
{
  mapping.clear();
  receivedState.clear();
}


With that, you have seen the flavours of stream codecs supported by Apex and their usage. Feel free to try them out on your Apex cluster or on the Sandbox!
Here are some additional useful resources:
DataTorrent Sandbox: https://www.datatorrent.com/sandbox-quick-start/

Building Applications with Apache Apex and Malhar: http://docs.datatorrent.com/tutorials/topnwords/

Application Developer Guide: https://www.datatorrent.com/docs/guides/ApplicationDeveloperGuide.html

Operator Developer Guide: https://www.datatorrent.com/docs/guides/OperatorDeveloperGuide.html

Malhar Operator Library: https://www.datatorrent.com/docs/guides/MalharStandardOperatorLibraryTemplatesGuide.html