Getting started with Apache Flink by setting up a pipeline

Introduction

In this quick tutorial you will learn how to set up a basic Apache Flink pipeline that reads data from JSON files in a specific directory. The content of the files will be deserialized into a Scala case class and then printed to stdout (the console). This job will run in batch fashion (no streaming), but you’ll see how easy it is to switch from batch to streaming in Flink.

For the setup we will use:

  • SBT
  • Scala 2.12.12
  • Flink 1.17.0

Project configuration

In your pom.xml (for Maven) or sbt configuration, you need the following libraries. Please check the latest version available; in this case we’re using 1.17.0. The Maven dependencies are listed below, followed by an sbt equivalent.

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-files</artifactId>
  <version>1.17.0</version>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients</artifactId>
  <version>1.17.0</version>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-scala_2.12</artifactId>
  <version>1.17.0</version>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_2.12</artifactId>
  <version>1.17.0</version>
</dependency>

<dependency>
  <groupId>com.lihaoyi</groupId>
  <artifactId>upickle_2.12</artifactId>
  <version>2.0.0</version>
</dependency>
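
Since the setup list above mentions SBT, here is a rough build.sbt equivalent of the Maven dependencies (a sketch; double-check the coordinates and versions against your build):

scalaVersion := "2.12.12"

// Same artifacts as the Maven dependencies above
libraryDependencies ++= Seq(
  "org.apache.flink" %  "flink-connector-files"  % "1.17.0",
  "org.apache.flink" %  "flink-clients"          % "1.17.0",
  "org.apache.flink" %% "flink-scala"            % "1.17.0",
  "org.apache.flink" %% "flink-streaming-scala"  % "1.17.0",
  "com.lihaoyi"      %% "upickle"                % "2.0.0"
)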

Final code

This is the complete code that we’ll run. You can find an explanation of each block in the “Breaking down the code” section.

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._

case class InputData(field1: String, field2: String)

object Main {

  def main(args: Array[String]): Unit = {

    // Flink Execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Input data and format
    val dataLocation = "/tmp/input-data/"
    val format = new TextInputFormat(new Path(dataLocation))

    // Stream configuration
    val ds: DataStream[String] = env.readFile(
      inputFormat = format,
      filePath = dataLocation,
      watchType = FileProcessingMode.PROCESS_ONCE,
      interval = -1
    )

    // Processing the stream
    ds.map(x => {
      val data = ujson.read(x)
      val o = InputData(data("field1").str, data("field2").str)
      println(o)
    })

    // Trigger the computation
    env.execute()
  }
}

The input files in the /tmp/input-data/ directory look like this:

{"field1": "value1", "field2": "value2"}
{"field1": "value3", "field2": "value4"}

This code will produce output like the following in the console (the order of the records may vary):

InputData(value3,value4)
InputData(value1,value2)

Breaking down the code

The first step in setting up an Apache Flink pipeline is to create an execution environment, which will allow us to define the sources and sinks used to process data.

 val env = StreamExecutionEnvironment.getExecutionEnvironment
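
By default the environment uses the machine’s default parallelism, so records from different files can be printed by different parallel tasks. If you want all output to go through a single task, you can optionally lower the parallelism on the environment (a minimal sketch, not required for this tutorial):

// Optional: run every operator with a single parallel task
env.setParallelism(1)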

Data Sources

Then, we need to define where the data that we want to read is located (the data location) and the format that we want to use. In this case we are using TextInputFormat to read the content of each line of the source files as a String, but there are other alternatives, like CsvInputFormat.

// Input data and format
val dataLocation = "/tmp/input-data/"
val format = new TextInputFormat(new Path(dataLocation))

The next step is to define the source of our pipeline. In this case, we can use the readFile method to just read files from a folder.

We have also set the watchType to FileProcessingMode.PROCESS_ONCE, so the pipeline will end when all the data has been read. But you can also use FileProcessingMode.PROCESS_CONTINUOUSLY to make the pipeline read everything that arrives in the folder (a real streaming process); see the sketch after the code block below.

This step shows how easy it is to switch a pipeline from “streaming mode” to “batch mode” by just changing a line in the configuration.

// Stream configuration
val ds: DataStream[String] = env.readFile(
  inputFormat = format,
  filePath = dataLocation,
  watchType = FileProcessingMode.PROCESS_ONCE,
  interval = -1
)
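
For reference, this is roughly what the continuous (“streaming”) variant would look like; the 1000 ms polling interval and the streamingDs name are just example values for this sketch:

// Keep watching the folder and pick up new files as they arrive
val streamingDs: DataStream[String] = env.readFile(
  inputFormat = format,
  filePath = dataLocation,
  watchType = FileProcessingMode.PROCESS_CONTINUOUSLY,
  interval = 1000 // how often (in milliseconds) to scan the folder for new files
)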

Data Transformation

After that, we need to specify what we want to do with each line of the file. In this case we want to parse each line as an instance of the InputData case class that we defined earlier, and print that instance to the standard output.

// Processing the stream
ds.map(x => {
  val data = ujson.read(x)
  val o = InputData(data("field1").str, data("field2").str)
  println(o)
})
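
As a side note (a sketch, not part of the tutorial’s code): you could also keep the map function free of side effects by returning the parsed InputData and letting Flink’s built-in print() sink write it to stdout. Note that print() prefixes each record with the index of the parallel task that produced it:

// Alternative: map each line to InputData and use the print() sink
ds.map(x => {
  val data = ujson.read(x)
  InputData(data("field1").str, data("field2").str)
}).print()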

Pipeline Execution

And finally we execute the pipeline:

env.execute()
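
You can also give the job a name, which is what you will see in the Flink web UI and in the logs (the name below is just an example):

env.execute("read-json-files")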

Common Errors

Exception in thread "main" java.lang.IllegalStateException: No ExecutorFactory found to execute the application.

Check that you have the following dependency in your pom.xml (or the sbt equivalent):

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients</artifactId>
  <version>1.17.0</version>
</dependency>

Source code

You can find the code of this tutorial in our GitHub repo: