
Now that the Apache apex-core and apex-malhar projects are incubating, I thought this would be a good time to provide a simple introduction to developing applications using these Open Source projects.

This document introduces the reader to the streaming platform Apache Apex, a large associated library of operators (Malhar), and associated tools. All of these are open source and freely downloadable from GitHub. It is targeted at Java developers using the Linux operating system, so the reader is expected to have basic command-line skills, some familiarity with the Java programming language, and at least a basic understanding of the benefits and pitfalls of distributed computing. Acquaintance with a Java IDE (Integrated Development Environment) such as Eclipse, NetBeans or IntelliJ, the Maven build tool, and the Git revision control system will be useful though not required. Internet connectivity, however, is required, since we will need to download the Apex and Malhar code.

There are, of course, other ways of approaching the problem such as using the binary releases or the virtual machine sandbox available on our website. However, I felt that diving directly into the code base would be more appealing to a (significant) segment of developers.

The following sections will walk the reader through the various steps to create, compile and run a couple of simple applications. Detailed documents on various aspects of the platform (such as the architecture, basic streaming concepts, etc.) and operator library are provided in the Additional Resources section at the end.

Setup

To begin with, make sure that you have the Java JDK (version 1.7.0_79 or later), Maven (version 3.0.5 or later) and Git (version 1.7.1 or later) installed by running the following commands:

Command: java -version

       java version "1.7.0_79"
       Java(TM) SE Runtime Environment (build 1.7.0_79-b15)
       Java HotSpot(TM) 64-Bit Server VM (build 24.79-b02, mixed mode)

Command: mvn --version

       Apache Maven 3.0.5
       Maven home: /usr/share/maven
       Java version: 1.7.0_79, vendor: Oracle Corporation
       Java home: /home/<user>/Software/java/jdk1.7.0_79/jre
       Default locale: en_US, platform encoding: UTF-8
       OS name: "linux", version: "3.16.0-44-generic", arch: "amd64", family: "unix"

Command: git --version

       git version 1.7.1

Next, create a new directory where you want the code to reside, for example:

       cd ~/src; mkdir dt; cd dt


Now, download the code for Apex and Malhar using git (all the commands below assume that you are running them from this newly created directory):

       git clone https://github.com/apache/incubator-apex-core
       git clone https://github.com/apache/incubator-apex-malhar


You should now see two directories named incubator-apex-core and incubator-apex-malhar. Step into each, switch to the release-3.1 branch, and build it (make sure you build incubator-apex-core first):

       pushd incubator-apex-core; git checkout release-3.1; mvn clean install -DskipTests; popd
       pushd incubator-apex-malhar; git checkout release-3.1; mvn clean install -DskipTests; popd


If you prefer, you can omit the semicolons and type each command on a line by itself. The “install” argument to the mvn command will cause resources from each project to be installed to your local maven repository (typically ~/.m2/repository); nothing is installed to your system directories so you do not need root (superuser) privileges to run this command. The “-DskipTests” argument skips running unit tests since they can take a long time.

At the time of writing, 3.1 was the latest release so we use the release-3.1 branch; if a later release is available, use a suitably modified branch name.

If this is your first build of these components, it can take several minutes to complete since Maven will download a variety of plugins needed for the build; later builds should go much faster.

Running pre-built applications

There are a number of pre-built applications that are bundled with Malhar, each in its own subdirectory under incubator-apex-malhar/demos; some of these are standalone and some require external resources (such as a Twitter feed for the twitter demo). The build creates a file with the .apa extension (called an “application package”) for each in the corresponding target subdirectory; such packages may contain multiple applications.

We now discuss the command-line tool called dtcli, which is bundled in core. Define the following alias for convenience:

       alias dtcli=`pwd`/incubator-apex-core/engine/src/main/scripts/dtcli

You can now start it with:

       dtcli


It may produce a couple of warnings about log4j not being initialized; these are harmless and may be ignored. You should see the “dt>” prompt. Type help to see a list of available commands and a brief description of each. You can exit this application by typing CTRL-D or exit. For any command you enter, the left/right arrow keys move the cursor within the command to edit it, and the up/down arrow keys cycle through command history, just like a shell.

For brevity, we use these variables in the commands that follow (they can be entered verbatim at the “dt>” prompt):

       wc=incubator-apex-malhar/demos/wordcount/target/wordcount-demo-3.1.0.apa
       uc=incubator-apex-malhar/demos/uniquecount/target/uniquecount-3.1.0.apa


Now, run a couple of the standalone apps as follows; in each case you can hit CTRL-C to interrupt the program and return to the prompt:

Command: launch -local $wc
Description: Prints a list of unique words encountered in each window of the input stream, along with a frequency count for each. The input for this application comes from the file demos/wordcount/src/main/resources/samplefile.txt.

Command: launch -local $uc
Description: Shows a list of the two applications present in this package; choose “1”, which corresponds to the application named UniqueKeyValueCountDemo. It should then print a list of unique key-value pairs with a frequency count for each. The input for this application is randomly generated.

The “-local” option requests that the app be run in local mode, meaning without a cluster. Running it on a Hadoop cluster is a more advanced topic and will be covered in later blogs.

There are a number of additional pre-built applications under the demos directory:

echoserver: Reads messages from a network connection and echoes them back out.
frauddetect: Analyzes a stream of credit card merchant transactions.
machinedata: Analyzes a synthetic stream of events to determine the health of a machine.
mobile: Demonstrates scalability of the DataTorrent platform by analyzing a synthetic stream of location data for a large number of cell phone connections.
mroperator: Contains several map-reduce applications.
pi: A simple application that computes the value of pi (π) using Monte Carlo methods.
r: Analyzes a synthetic stream of eruption event data for the Old Faithful geyser (https://en.wikipedia.org/wiki/Old_Faithful).
twitter: Analyzes live tweet data from a Twitter stream; needs valid Twitter credentials.
yahoofinance: Analyzes live stock transaction data.

You can now experiment with modifying the source code for these applications or proceed to the next section to build some applications from scratch.

Your first application

We will now build a new application from scratch. Put the following lines into a text file named, say, newdt, and run it with bash newdt to generate a new Maven project. (The command can be typed on multiple lines as shown here, with a backslash at the end of each line except the last for better readability, or all on a single line without the backslashes.)

#!/bin/bash
# script to create a new project
mvn archetype:generate \
  -DarchetypeRepository=https://www.datatorrent.com/maven/content/repositories/releases \
  -DarchetypeGroupId=com.datatorrent \
  -DarchetypeArtifactId=apex-app-archetype \
  -DarchetypeVersion=3.1.1 \
  -DgroupId=com.example \
  -Dpackage=com.example.myapexapp \
  -DartifactId=myapexapp \
  -Dversion=1.0-SNAPSHOT

The command details are based on the contents of incubator-apex-core/apex-app-archetype/README.md and may need to be adjusted as new versions of core and malhar are released. It will display a summary of these properties and prompt for confirmation with “Y: :”, at which point you can just hit ENTER. It should then create a new project directory named myapexapp containing a basic application that generates an unending sequence of random numbers and prints them out.

Step into the new project directory and run the usual command to build it:

       cd myapexapp; mvn clean package -DskipTests


If the build is successful, it should have created the application package file:

       myapexapp/target/myapexapp-1.0-SNAPSHOT.apa


This package can be run the same way we ran the pre-built apps above; as before, hit CTRL-C to stop the app. It will print a series of lines that look something like this:

       hello world: 0.30266721560549

The app consists of two source files: Application.java and RandomNumberGenerator.java. The former contains the main application code that instantiates operators and wires them together in a DAG (Directed Acyclic Graph); the latter is an operator that generates a sequence of random numbers. The DAG uses a second operator, ConsoleOutputOperator, which simply prints input tuples to the console; its source file is under incubator-apex-malhar.
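To see the shape of that data flow, here is a small stand-alone sketch in plain Java. This is a toy model only, not the actual Apex API; the class and method names below are invented for illustration:

```java
import java.util.Random;

public class PipelineSketch {
  // Downstream entry point: an input port accepts one tuple at a time.
  interface InputPort<T> { void process(T tuple); }

  // Sketch of the generator operator: it has no input ports and pushes
  // random numbers out through its (single) output side.
  static class RandomGenerator {
    private final Random rng = new Random();
    InputPort<Double> out;                                // wired to a downstream input port
    void emitTuples() { out.process(rng.nextDouble()); }  // the engine calls input operators repeatedly
  }

  // Sketch of the console operator: prints every tuple it receives.
  static class ConsolePrinter implements InputPort<Double> {
    public void process(Double tuple) { System.out.println("hello world: " + tuple); }
  }

  public static void main(String[] args) {
    RandomGenerator gen = new RandomGenerator();
    gen.out = new ConsolePrinter();                       // the "stream": output port -> input port
    for (int i = 0; i < 3; i++) gen.emitTuples();         // prints three "hello world" lines
  }
}
```

In the real application the Apex engine owns this wiring (set up via DAG.addStream()) and drives the input operator itself; the sketch only shows the direction in which tuples travel.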

Creating applications is discussed in more detail in the Application Developer Guide, and the package format is discussed in Application Packages (see Additional Resources section below).

Operators in Malhar

In addition to the operators bundled with each demo application, the Malhar library contains a large number of operators to meet a variety of needs. An overview is at Malhar Operator Library. You can, of course, write your own operators as well. The following observations may be useful in solidifying your understanding of operators as you go through the code and documentation.

  1. Input data units are arbitrary Java objects and are called “tuples” or “events” (the terms are synonymous).
  2. Operators can, generally speaking, have multiple input and output ports; they receive tuples on input ports and emit tuples on output ports.
  3. An operator may have no ports at all!
  4. Input and Output ports are fields defined in an operator; they may optionally be annotated with @InputPortFieldAnnotation or @OutputPortFieldAnnotation.
  5. The specific type of objects entering an input port is always the same; likewise the type of objects leaving an output port. However, the types of objects associated with two different ports can be, and often are, very different.
  6. All input operators implement the InputOperator interface, receive data from an external source (not from another operator) and invoke emitTuples() to write data to one or more output ports. Since they cannot receive data from another operator they have no input ports.
  7. Output operators (there is no OutputOperator interface or base class) receive tuples on input ports, process them and write computed data (typically via the processTuple() method) to external sinks.
  8. The vertices of a DAG are operators; the arcs between them are (parts of) streams.
  9. A single port is connected to a single stream.
  10. A stream is connected to one output port of an operator and multiple input ports of other operators; tuples enter the stream through the unique output port and are replicated to all the input ports. So a stream embodies the collection of outbound arcs at a single output port of a vertex (operator).
  11. The output port that is the common initial end-point of all these arcs is called the source of the stream. Similarly, all the input ports at the final end-points of these arcs are called sinks. So a stream has a single source but can have multiple sinks.
  12. A stream is represented by the StreamMeta interface and is created via DAG.addStream().
  13. DAG.StreamMeta has methods setSource() for the output port and addSink() for adding input ports.
  14. Ports are often created by extending DefaultOutputPort and DefaultInputPort.
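The stream semantics in points 9 through 11 can be sketched with a small stand-alone Java model. Again, this is an illustration only, not the Apex API; all names below are invented:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class StreamSketch {
  // Toy output port: the single source of a stream. addSink() plays the role
  // of adding a downstream input port, and emit() replicates a tuple to every sink.
  static class OutputPort<T> {
    private final List<Consumer<T>> sinks = new ArrayList<>();
    void addSink(Consumer<T> sink) { sinks.add(sink); }
    void emit(T tuple) { for (Consumer<T> s : sinks) s.accept(tuple); }
  }

  public static void main(String[] args) {
    OutputPort<String> source = new OutputPort<>();  // unique source of the stream
    List<String> sinkA = new ArrayList<>();
    List<String> sinkB = new ArrayList<>();
    source.addSink(sinkA::add);                      // input port of one downstream operator
    source.addSink(sinkB::add);                      // input port of another
    source.emit("tuple-1");                          // replicated to both sinks
    System.out.println("sinkA=" + sinkA + ", sinkB=" + sinkB);
  }
}
```

The key point the sketch captures: a tuple is emitted once, at the stream's single source, and is delivered to every sink connected to that stream.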

DataTorrent helps our customers get into production quickly, using open source technologies to handle real-time, data-in-motion, fast, big data for critical business outcomes. Using best-of-breed open source solutions, we harden open source applications to act and look as one through our unique IP, which we call Apoxi™. Apoxi is a framework that gives you the power to manage those open source applications reliably, just as you do your proprietary enterprise software applications, lowering time to value (TTV) and total cost of ownership (TCO). To get started, you can download DataTorrent RTS or micro data services from the DataTorrent AppFactory.