The most literal meaning of data ingestion is simply taking data in. Yet even though taking the data in is a problem by itself, it is rarely sufficient on its own. The broader term ingestion refers to discovering data sources, importing data from those sources, and processing that data to produce intermediate data, often for later use by various dependent systems. Between the time the data is ingested and the time it is used by these other systems, the data rests in a durable data store such as the one provided by a database or a file system. So, somewhat ironically, the process of taking data in also involves sending it out to these durable data stores.

The concept of data ingestion has existed for quite some time, yet the changing magnitude of the data has forced the ways we ingest it to evolve. Until recently, CEP engines were considered the state-of-the-art data ingestion mechanism. With big data computation, however, the practical boundaries on the amount of data that can be brought together in one place, and the amount of processing power that can be employed to analyze it, are being pushed far beyond a CEP engine's traditional capabilities. The trend for big data ingestion is to design solutions that scale out rather than scale up. While the processing part is largely addressed by the MapReduce paradigm, the ingestion part does not have an operationally viable paradigm in the big data ecosystem; it is the part that each enterprise designs ad hoc for its specific needs. Furthermore, the expertise needed to design an operational solution is rare, which further impedes the development of an operational ingestion solution. These limitations have constrained enterprises in terms of the amount of data such a solution can ingest, the problem set to which it can be applied, and the ability to operate it optimally. The constraints are further exacerbated by the advent of the Internet of Things, which is causing a flood of data from sensors deployed all around us. This flood is just getting started and is only expected to grow. So how do we design an ingestion platform that not only addresses the scale needed today but also scales out to address the needs of tomorrow? This is the problem staring enterprise architects in the face.

At DataTorrent, we are fortunate to work not only with small technology companies but also with very large enterprises that are leaders in domains such as the Internet of Things and Ad Tech. These enterprises need to ingest very large amounts of data with minimal latency from a variety of sources. While designing and building solutions with Apex to address their data ingestion needs, we validated a common pattern for ingestion across these domains. The pattern can be described, in a very simplified way, using the following diagram.

[Diagram: ingestion pipeline stages]

Each of the blocks signifies a specific stage in the ingestion process.

  • Input: Discover and fetch the data for ingestion. The data may come from file systems, messaging queues, web services, sensors, databases, or even the outputs of other ingestion apps. These are just a few examples; the possibilities are limitless.
  • Filter: Analyze the raw data and identify the interesting subset. The filter stage is typically used for quality control, to sample the dataset, or to parse the data (a minimal operator sketch follows this list).
  • Enrich: Plug the missing pieces into the data. This stage often involves talking to external data sources to fill in missing data attributes. Sometimes it may simply mean transforming the data from one form into a form suitable for downstream processes.
  • Process: This stage does some lightweight processing to either further enrich the event or transform it from one form into another, processed form. There is a good deal of overlap between the functionality of the enrich stage and that of the process stage; the main difference is that enrich embellishes the data by consulting external systems, whereas process usually computes using the attributes the data already has.
  • Segregate: Oftentimes, before the data is handed to downstream systems, it makes sense to bundle it so that similar data sits together. Segregation is not always necessary for boxcarring (a.k.a. compaction), but most of the time it does make sense when boxcarring the data.
  • Output: With Apex, outputs almost always mirror inputs in terms of what they can do, and they are just as essential. While the input stage fetches the data, the output stage lands it, either on durable storage systems or in other processing systems.

Even though we listed the Filter, Enrich, Process, and Segregate stages in their most logical order, these stages do not have to occur in that order. The order, and even the number of instances of each stage, depends on the specific ingestion application.
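Below is the minimal sketch referenced in the Filter bullet above: a hypothetical Apex operator that inspects each incoming record and only passes the interesting subset downstream. The predicate is an arbitrary placeholder, not something taken from the data services.

```java
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

/** A filter-stage sketch: drop uninteresting records, pass the rest along. */
public class FilterSketch extends BaseOperator {
  public final transient DefaultOutputPort<String> out = new DefaultOutputPort<>();

  public final transient DefaultInputPort<String> in = new DefaultInputPort<String>() {
    @Override
    public void process(String record) {
      // Quality control, sampling, or parsing would happen here; this sketch
      // simply drops blank records and emits everything else downstream.
      if (record != null && !record.trim().isEmpty()) {
        out.emit(record);
      }
    }
  };
}
```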

Another possible ingestion architecture is a simplified hub-and-spoke architecture, where both the hub and the spokes essentially run the same application.

[Diagram: hub-and-spoke ingestion architecture]

There are so many different ways in which each of these processing blocks can manifest itself in an ingestion app that it is almost impossible to capture all the use cases in a single app. So we decided to keep the ingestion app as a template after which concrete ingestion apps can be modeled.

The DataTorrent AppFactory has many sample data services that help with common IT use cases based on data ingestion. These data services support a wide variety of sources and sinks; both input and output can be one of HDFS, NFS, S3, Azure Store, FTP, Kafka, JMS, JDBC/databases, Solace, RabbitMQ, …

Of course, it is developed using our flagship open source product, Apex, which is licensed under Apache 2.0. We are in the process of incubating Apex and its associated operator library, Malhar, as an Apache Software Foundation project. Apex allows you to write ultra-low-latency applications that not only scale out elastically to meet SLAs but are also fault resilient, recovering on their own in a stateful manner. All Apex applications are DAGs consisting of business-logic nodes (operators) and data-flow edges (streams) connecting them to each other.
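To make the DAG model concrete, here is a minimal sketch of an Apex application: an input operator and an output operator (the nodes) connected by one stream (the edge). The port and DAG APIs follow Apex's standard pattern; the operator logic itself is only illustrative and is not part of the data services.

```java
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.common.util.BaseOperator;
import org.apache.hadoop.conf.Configuration;

public class MinimalIngestApp implements StreamingApplication {

  /** Input stage: keeps emitting dummy records. */
  public static class RecordSource extends BaseOperator implements InputOperator {
    public final transient DefaultOutputPort<String> out = new DefaultOutputPort<>();
    private long count;

    @Override
    public void emitTuples() {
      out.emit("record-" + count++);
    }
  }

  /** Output stage: lands each record (here, just to the container log). */
  public static class RecordSink extends BaseOperator {
    public final transient DefaultInputPort<String> in = new DefaultInputPort<String>() {
      @Override
      public void process(String record) {
        System.out.println(record);
      }
    };
  }

  @Override
  public void populateDAG(DAG dag, Configuration conf) {
    RecordSource source = dag.addOperator("Input", new RecordSource());
    RecordSink sink = dag.addOperator("Output", new RecordSink());
    dag.addStream("records", source.out, sink.in);
  }
}
```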

Unlike DistCp, these data services have a UI in addition to a CLI, and the UI comes with various widgets for checking both system and application metrics. For demo purposes, the easiest way to launch the ingestion app is through this configuration page.

I will now walk through a common template whose pattern is included in the data services.

When this app is launched, dtManage can show the logical DAG for the application, as below.

[Diagram: logical DAG of the ingestion application]

In the logical DAG:

FileSplitter discovers the files and, just as DistCp would, logically chops them up into smaller chunks and forwards this information to BlockReader. This operator belongs to the input stage of our ingestion architecture.

BlockReader processes the blocks demarcated by FileSplitter and reads them. In this case it reads the entire block as one record, but in other use cases this is where the app would read and parse individual records.
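For example, a reader that emits line records instead of whole blocks might look roughly like the sketch below. This is purely illustrative and uses the plain HDFS client API; the names are hypothetical, and the real Malhar block reader differs in detail.

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LineRecordSketch {
  /** Reads roughly [offset, offset + length) of a file and prints one record per line. */
  public static void readLines(String file, long offset, long length) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    try (FSDataInputStream in = fs.open(new Path(file))) {
      in.seek(offset);
      BufferedReader reader = new BufferedReader(new InputStreamReader(in));
      long read = 0;
      String line;
      while (read < length && (line = reader.readLine()) != null) {
        read += line.length() + 1;              // rough accounting; ignores multi-byte characters
        System.out.println("record: " + line);  // in an operator this would be an emit(...)
      }
    }
  }
}
```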

The data copy/sync data service is meant to copy the data as is, so it does not enrich the data. If encryption is requested, it is performed as part of the process stage; otherwise the service simply transfers the data one block at a time to BlockWriter without doing any processing.
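As a rough illustration of what block-level encryption in the process stage could look like, here is a small sketch using the standard Java crypto API. The cipher, mode, and key handling are assumptions for illustration; the actual data service may use a different scheme.

```java
import javax.crypto.Cipher;
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;

public class BlockEncryptSketch {
  private final SecretKeySpec key;

  public BlockEncryptSketch(byte[] keyBytes) {
    this.key = new SecretKeySpec(keyBytes, "AES");  // expects a 16/24/32-byte key
  }

  /** Encrypts one block of bytes before it is handed to the writer. */
  public byte[] encryptBlock(byte[] block, byte[] iv) throws Exception {
    Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
    cipher.init(Cipher.ENCRYPT_MODE, key, new IvParameterSpec(iv));
    return cipher.doFinal(block);
  }
}
```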

BlockSynchronizer and FileMerger belong to the segregate stage; they decide how the blocks created by BlockWriter need to be put back together. BlockSynchronizer's job is to track the progress of BlockWriter, and once all the blocks needed for stitching a file together have been written out by BlockWriter, it signals FileMerger to start stitching them together.
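Conceptually, the synchronization bookkeeping amounts to counting written blocks per file and firing the merge once every expected block has arrived, as in the sketch below. The class and method names are hypothetical; the actual operators differ in detail.

```java
import java.util.HashMap;
import java.util.Map;

public class BlockSyncSketch {
  private final Map<String, Integer> writtenBlocks = new HashMap<>();

  /** Called once per block that the writer reports as durably written. */
  public void onBlockWritten(String fileName, int expectedBlocks, Runnable startMerge) {
    int written = writtenBlocks.merge(fileName, 1, Integer::sum);
    if (written == expectedBlocks) {
      writtenBlocks.remove(fileName);
      startMerge.run();  // in the real app, this is a signal to FileMerger
    }
  }
}
```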

FileMerger itself plays a dual role: it segregates the data as well as outputs it to the sink.

SummaryWriter is simply an operator that creates an audit of the successful and unsuccessful copy work done by the app.

An interesting thing to note here is that none of the operators we discussed has any limitation on how many instances it can scale up to at runtime. Depending upon the workload, each piece can scale itself independently of its neighboring operators. So, given enough resources, while it is possible for this app to copy and process terabytes of data in just a few minutes, the real utility comes from its ability to track the progress of the copy operation at the block level and automatically recover from that exact point in case of failure.
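As a sketch of how parallelism is expressed per operator in Apex, the snippet below requests several partitions for a single operator via the PARTITIONER attribute and Apex's StatelessPartitioner. This only illustrates that scaling is configured independently per operator; the copy app's operators use their own partitioning logic.

```java
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.Operator;
import com.datatorrent.common.partitioner.StatelessPartitioner;

public class PartitioningSketch {
  /** Asks the engine to run eight parallel instances of the given operator. */
  public static void scaleOut(DAG dag, Operator blockReader) {
    dag.setAttribute(blockReader, OperatorContext.PARTITIONER,
        new StatelessPartitioner<Operator>(8));
  }
}
```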

Oh yes, did I mention that our data services are not limited to file-based sources? They can also ingest data from message-bus sources such as Kafka and JMS and, after compacting the messages, write them out to rolling files of a fixed size.
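Conceptually, the rolling-file behavior looks like the sketch below: append incoming messages (for example, Kafka or JMS payloads) to the current file and roll over to a new one once a size limit is reached. This is plain Java for illustration only, not the data service's actual implementation.

```java
import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class RollingFileSketch {
  private final String baseName;
  private final long maxBytes;
  private long written;
  private int part;
  private OutputStream current;

  public RollingFileSketch(String baseName, long maxBytes) {
    this.baseName = baseName;
    this.maxBytes = maxBytes;
  }

  /** Appends one message to the current rolling file, rolling over at the size limit. */
  public synchronized void write(byte[] message) throws IOException {
    if (current == null || written + message.length > maxBytes) {
      roll();
    }
    current.write(message);
    written += message.length;
  }

  private void roll() throws IOException {
    if (current != null) {
      current.close();
    }
    current = new BufferedOutputStream(new FileOutputStream(baseName + ".part" + part++));
    written = 0;
  }
}
```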

If all this sounds interesting, we would love feedback on the various data services listed on our AppFactory, including those that replace a regular ‘hadoop fs -cp’ or ‘DistCp’. You can download DataTorrent RTS from the DataTorrent download page: https://www.datatorrent.com/download. Do keep a lookout for future additions to our data services in terms of new sources, more analytics, integration with data governance, and tighter integration with the big data and cloud ecosystems.