
Extend your Scala expertise to building Apache Apex applications

Scala is a modern, multi-paradigm programming language that elegantly integrates features of functional and object-oriented languages. Big data frameworks are already exploring Scala as their implementation language of choice. Although Apache Apex is developed in Java, its APIs make writing applications smooth sailing. Developers can use any programming language that runs on the JVM and can access Java classes. Because Scala interoperates well with Java, running Apex applications written in Scala is a fuss-free experience. This post explains how to write an Apache Apex application in Scala.

Writing an Apache Apex application in Scala is simple.

Operators within the application

We will develop a word count application in Scala. The application watches a directory for new files. When new files appear, it reads them, computes a count for each word, and prints the result to stdout. The application requires the following operators:

  • LineReader – This operator periodically monitors a directory for new files. When it detects a new file, LineReader reads the file line by line and makes each line available on its output port for the next operator.
  • Parser – This operator receives the lines read by LineReader on its input port, breaks each line into words, and makes the individual words available on its output port.
  • UniqueCounter – This operator computes the count of each word received on its input port.
  • ConsoleOutputOperator – This operator prints the word counts to standard output.
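To make the data flow concrete, here is a framework-free sketch in plain Scala that models each operator as a function over in-memory collections. The function names are hypothetical and no Apex classes are involved; the sketch ignores what Apex actually adds (streaming windows, checkpointing, files arriving over time) and only shows how data moves from one operator to the next.

```scala
// Hypothetical stand-ins for the four operators; no Apex classes are involved.

// LineReader: splits the contents of each file into lines.
def readLines(files: Seq[String]): Seq[String] =
  files.flatMap(contents => contents.split("\n").toSeq)

// Parser: splits a line into words with a regular expression, dropping empty tokens.
def parseLine(line: String, regex: String = " "): Seq[String] =
  line.split(regex).toSeq.filter(_.nonEmpty)

// UniqueCounter: counts how often each word occurs.
def countWords(words: Seq[String]): Map[String, Int] =
  words.groupBy(identity).map { case (word, occurrences) => word -> occurrences.size }

// ConsoleOutputOperator: prints the result (map ordering is not guaranteed).
val files = Seq("the quick fox\nthe lazy dog")
val counts = countWords(readLines(files).flatMap(line => parseLine(line)))
println(counts)
```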

Build the Scala word count application

Generate a sample application.

Use the Maven archetype:generate goal to generate a sample Apex application:

mvn archetype:generate -DarchetypeRepository=https://www.datatorrent.com/maven/content/repositories/releases -DarchetypeGroupId=com.datatorrent -DarchetypeArtifactId=apex-app-archetype -DarchetypeVersion=3.0.0 -DgroupId=com.datatorrent -Dpackage=com.datatorrent.wordcount -DartifactId=wordcount -Dversion=1.0-SNAPSHOT

This creates a directory named wordcount that contains a sample application and a build script. Let us see how to turn this sample into the Scala-based word count application that we want to develop.

Add the Scala build plugin.

Apache Apex uses Maven to build the framework and the operator library. Maven also supports a plugin for compiling Scala files. To enable it, add the following snippet to the build -> plugins section of the pom.xml file in the application directory:

  <plugin>
    <groupId>net.alchim31.maven</groupId>
    <artifactId>scala-maven-plugin</artifactId>
    <version>3.2.1</version>
    <executions>
      <execution>
        <goals>
          <goal>compile</goal>
          <goal>testCompile</goal>
        </goals>
      </execution>
    </executions>
  </plugin>

Add the Scala library.

Also specify the Scala library as a dependency in the pom.xml file:

<dependency>
 <groupId>org.scala-lang</groupId>
 <artifactId>scala-library</artifactId>
 <version>2.11.2</version>
</dependency>

We are now set to write a Scala application.

Write your Scala word count application

LineReader

The Apache Malhar library contains AbstractFileInputOperator, an operator that monitors a directory and reads new files from it. This operator supports scaling, fault tolerance, and exactly-once processing. To complete its functionality, override a few methods:
  • readEntity – Reads a line from the file.
  • emit – Emits the data read on the output port.
We also override openFile to wrap the opened stream in the BufferedReader that readEntity uses to read lines, and closeFile to close that BufferedReader.

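import java.io.{BufferedReader, InputStream, InputStreamReader}

import com.datatorrent.api.DefaultOutputPort
import com.datatorrent.lib.io.fs.AbstractFileInputOperator
import org.apache.hadoop.fs.Path

// Emits each line of every new file found in the monitored directory.
class LineReader extends AbstractFileInputOperator[String] {

  // Output port for lines; not part of the operator's checkpointed state.
  @transient
  val out: DefaultOutputPort[String] = new DefaultOutputPort[String]()

  // Reader over the currently open file; re-created in openFile, so not checkpointed.
  @transient
  private var br: BufferedReader = null

  // Called repeatedly by the base class; readLine returns null at end of file.
  override def readEntity(): String = br.readLine()

  override def emit(line: String): Unit = out.emit(line)

  // Wrap the stream opened by the base class so that lines can be read.
  override def openFile(path: Path): InputStream = {
    val in = super.openFile(path)
    br = new BufferedReader(new InputStreamReader(in))
    in
  }

  override def closeFile(is: InputStream): Unit = {
    br.close()
    super.closeFile(is)
  }
}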
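LineReader does not run its own read loop: the base class opens a file, calls readEntity repeatedly, passes each result to emit, and closes the file once readEntity returns null. The sketch below models that template-method contract in plain Scala so it runs without Apex or Hadoop. SimpleFileReader and LineReaderModel are hypothetical stand-ins, and the null-means-end-of-file rule is the assumption LineReader relies on; the real AbstractFileInputOperator also handles directory scanning, partitioning, and fault tolerance.

```scala
import java.io.{BufferedReader, ByteArrayInputStream, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets
import scala.collection.mutable.ArrayBuffer

// Hypothetical, much simplified stand-in for AbstractFileInputOperator: it owns
// the read loop and calls the hooks that a subclass such as LineReader overrides.
abstract class SimpleFileReader {
  protected def openFile(bytes: Array[Byte]): InputStream = new ByteArrayInputStream(bytes)
  protected def closeFile(is: InputStream): Unit = is.close()
  protected def readEntity(): String // assumed contract: null means end of file
  protected def emit(tuple: String): Unit

  // Processes one in-memory "file": emit every entity until readEntity returns null.
  def process(bytes: Array[Byte]): Unit = {
    val is = openFile(bytes)
    var entity = readEntity()
    while (entity != null) {
      emit(entity)
      entity = readEntity()
    }
    closeFile(is)
  }
}

// Hypothetical model of LineReader that collects lines instead of emitting them on a port.
class LineReaderModel extends SimpleFileReader {
  val emitted: ArrayBuffer[String] = ArrayBuffer.empty[String]
  private var br: BufferedReader = null

  override protected def openFile(bytes: Array[Byte]): InputStream = {
    val in = super.openFile(bytes)
    br = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))
    in
  }

  // BufferedReader.readLine returns null at end of stream, which ends the loop.
  override protected def readEntity(): String = br.readLine()

  override protected def emit(line: String): Unit = emitted += line

  override protected def closeFile(is: InputStream): Unit = {
    br.close()
    super.closeFile(is)
  }
}

val reader = new LineReaderModel
reader.process("first line\nsecond line\n".getBytes(StandardCharsets.UTF_8))
println(reader.emitted) // ArrayBuffer(first line, second line)
```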

Some Apex API classes are not serializable, so fields that hold them must be marked transient. Scala provides the @transient annotation for such cases. Any field that is not part of the operator's state must be annotated with @transient. For example, LineReader annotates its BufferedReader and its output port as transient.
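Scala's @transient annotation compiles to the JVM transient field modifier, which standard Java serialization honors by skipping the field. This plain-Scala sketch checks the modifier with Java reflection on a hypothetical CheckpointedOperator class, so it runs without Apex: processed stands for regular operator state, and reader for a runtime resource that is re-created after recovery.

```scala
import java.io.BufferedReader
import java.lang.reflect.Modifier

// Hypothetical operator-like class: `processed` is state worth checkpointing,
// while `reader` is a runtime resource that should not be serialized.
class CheckpointedOperator extends Serializable {
  var processed: Long = 0L

  @transient
  var reader: BufferedReader = null
}

// True if the named field carries the JVM `transient` modifier.
def isTransientField(cls: Class[_], name: String): Boolean =
  Modifier.isTransient(cls.getDeclaredField(name).getModifiers)

println(isTransientField(classOf[CheckpointedOperator], "processed")) // false
println(isTransientField(classOf[CheckpointedOperator], "reader"))    // true
```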

Parser

Parser splits each line with Java's built-in String.split method, which takes a regular expression, and emits the individual words on its output port.

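import scala.beans.BeanProperty

import com.datatorrent.api.{DefaultInputPort, DefaultOutputPort}
import com.datatorrent.common.util.BaseOperator

// Splits incoming lines into words using a configurable regular expression.
class Parser extends BaseOperator {
  // @BeanProperty generates getRegex/setRegex so the property can be configured.
  @BeanProperty
  var regex : String = " "

  @transient
  val out = new DefaultOutputPort[String]()

  @transient
  val in = new DefaultInputPort[String]() {
    override def process(t: String): Unit = {
      // Skip the empty tokens that consecutive separators produce.
      for (w <- t.split(regex) if w.nonEmpty) out.emit(w)
    }
  }
}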

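Scala's @BeanProperty annotation automatically generates Java-style getter and setter methods for a field (here, getRegex and setRegex). Apex sets operator properties through such setters, so properties annotated with @BeanProperty can be set through configuration files when an application is launched, and can also be modified while the application is running. For example, you can specify the regular expression used for splitting in the configuration file with the property dt.application.WordCount.operator.parser.prop.regex, following the same naming pattern as the input directory property shown later.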
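The effect of @BeanProperty can be checked without Apex. In this sketch, WordSplitter is a hypothetical stand-in for Parser's splitting logic: the generated setter is called through reflection, roughly the way a configuration loader might call it, and the default single-space regex shows how a double space turns into an empty token that has to be filtered out.

```scala
import scala.beans.BeanProperty

// Hypothetical stand-in for Parser's splitting logic, without Apex ports.
class WordSplitter {
  // Generates Java-style getRegex() and setRegex(String) in addition to regex / regex_=.
  @BeanProperty
  var regex: String = " "

  // Splits a line with the configured regex and drops empty tokens.
  def split(line: String): List[String] = line.split(regex).toList.filter(_.nonEmpty)
}

val splitter = new WordSplitter
val withDefault = splitter.split("to  be, or") // the double space yields an empty token, filtered out
println(withDefault)                           // List(to, be,, or)

// Call the generated setter reflectively, roughly as a configuration loader might.
classOf[WordSplitter].getMethod("setRegex", classOf[String]).invoke(splitter, "[,\\s]+")
println(splitter.getRegex)                     // [,\s]+
println(splitter.split("to  be, or"))          // List(to, be, or)
```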

UniqueCounter and ConsoleOutputOperator

For this application, we use the UniqueCounter and ConsoleOutputOperator operators from the Malhar library as is.

Put together the word count application

Writing the main application class in Scala is similar to writing it in Java. Override the populateDAG() method, which receives the DAG instance. Add operators to this instance with the addOperator() method, and then connect the operators with the addStream() method.

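import org.apache.hadoop.conf.Configuration

import com.datatorrent.api.{DAG, StreamingApplication}
import com.datatorrent.api.annotation.ApplicationAnnotation
import com.datatorrent.lib.algo.UniqueCounter
import com.datatorrent.lib.io.ConsoleOutputOperator

// Word count DAG: input (lines) -> parser (words) -> counter (counts) -> console.
@ApplicationAnnotation(name="WordCount")
class Application extends StreamingApplication {
  override def populateDAG(dag: DAG, configuration: Configuration): Unit = {
    // Operator names are also used in property keys, e.g. ...operator.input.prop.directory.
    val input = dag.addOperator("input", new LineReader)
    val parser = dag.addOperator("parser", new Parser)
    val counter = dag.addOperator("counter", new UniqueCounter[String])
    val out = dag.addOperator("console", new ConsoleOutputOperator)

    // Connect output ports to input ports; the type argument is the stream's tuple type.
    dag.addStream[String]("lines", input.out, parser.in)
    dag.addStream[String]("words", parser.out, counter.data)
    dag.addStream[java.util.HashMap[String,Integer]]("counts", counter.count, out.input)
  }
}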

Running the application

Before running the word count application, specify the input directory for the input operator. You can use the default configuration file for this: open the src/main/resources/META-INF/properties.xml file and add the following lines between the <configuration> tags. Do not forget to replace "username" with your Hadoop username.

<property>
 <name>dt.application.WordCount.operator.input.prop.directory</name>
  <value>/user/username/data</value>
</property>

Build the application from the application directory using this command:

mvn clean install -DskipTests

You should now have an application package in the target directory.

Now, launch this application package using dtcli.

$ dtcli
DT CLI 3.2.0-SNAPSHOT 28.09.2015 @ 12:45:15 IST rev: 8e49cfb branch: devel-3
dt> launch target/wordcount-1.0-SNAPSHOT.apa
{"appId": "application_1443354392775_0010"}
dt (application_1443354392775_0010) >

To see the application at work, add some text files to the /user/username/data directory in HDFS. The words and their counts appear in the container log of the console operator.

Summary

Scala classes are JVM classes that can inherit from Java classes, and Scala code can create and call Java objects transparently. That is why you can easily extend your Scala skills to building Apex applications.
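As a small illustration of that interoperability, independent of Apex, this sketch defines a hypothetical Scala class WordList that extends the JDK's abstract java.util.AbstractList. Only get and size are implemented in Scala; methods such as contains and indexOf come from the Java superclass, and the object can be passed directly to Java APIs such as String.join. LineReader follows the same pattern with AbstractFileInputOperator.

```scala
import java.util.AbstractList

// Hypothetical Scala class extending a JDK abstract class. Only get and size are
// implemented here; contains, indexOf, iterator, etc. come from Java code in AbstractList.
class WordList(words: Array[String]) extends AbstractList[String] {
  override def get(index: Int): String = words(index)
  override def size(): Int = words.length
}

val list = new WordList(Array("apex", "scala", "java"))
println(list.contains("scala"))  // true
println(list.indexOf("java"))    // 2
println(String.join(", ", list)) // apex, scala, java
```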
To get started with creating your first application, see https://www.datatorrent.com/buildingapps/.
