본문 바로가기

Big Data/Flink

Flink 시작하기 #5 Streaming API



#Streaming dataflow
Flink의 streaming dataflow는 데이터를 받아오는 Data source, 그리고 데이터를 처리하는 Transformation, 데이터를 최종 처리하는 data Sink로 3단계로 구성된다.  구지 스트리밍이 아니여도 이 flow는 비슷할 것이다. 데이터를 가져와서 가공하고 저장하는 단계를 flink는 네이밍을 저렇게 한 것이다. dataflow의 구성은 아래 그림과 같다. 이 포스팅은 Flink 1.2 기준으로 작성됬다.


#Data source
source는 input에 해당하며 어디서부터 데이터를 가져올 것 인가를 정의하는 단계이다.
flink는 기본적으로 StreamExecutionEnviroment로부터 다음의 stream source를 제공하고 있다.

- File-based
  파일로부타 data source를 생성한다. 단순히 local에있는 파일을 읽을수도 있고, HDFS나 s3도 가능하다. 그리고 파일이 변경이 있을때마다 데이터를 가져올 수 있는 watch 기능도 있다. 간단하게 local 텍스트 파일을 읽는 예제를 작성해보자


StreamExecutionEnviroment를 가져오는 부분과 env.execute()는 flink 프로그램을 하는 기본이다. 이후 작업에선 생략할 것이다.
그리고 간단히 DataStream의 print()를 써서 단순히 콘솔에 찍는것으로 예제를 확인할 것이다.

다시 위 예제를 설명하면 local에 있는 path로 readTextFile로 가져올 수 있다. line별로 하나의 element를 이루고 있다

- Socket-based
소켓 데이터를 받을 수 있는 API 도 지원한다. 이 예제는 이전에 포스팅한 Flink WordCount에도 사용한 source이다.


- Collection-based
collection으로부터 데이터 스트림을 생성하는 방법이다. 단순히 transformation의 연산을 테스트해보기 위해 가장 간단하고 쉽다.
사용하는 API는 다음과 같다.
  • fromCollection(Seq)
  • fromCollection(Iterator)
  • fromElements(elements: _*)
  • fromParallelCollection(SplittableIterator)
  • generateSequence(from, to)
방법은 다양하지만 결국 Collection의 elements를 DataStream으로 만들어주는것이다.


- Custom
파일이나 소켓, Collection등 외에도 직접 사용자가 source를 만들 수 있다. env.addSource(SourceFunction[])를 통해 만들 수 있다.
간단하게 fromCollection과 유사한 custom source를 만들 어보자


1, 10까지 데이터를 100ms 간격으로 무한히 생성해주는 source이다. 이런식으로 RichSourceFunction을 구현해서 addSource로 만들 수 있다.

그리고 kafka나 RabbitMQ등 외부 시스템과 연결을 해주는 connectors 들이 이미 구현되어 있다. 주로 flink 와 사용되는 시스템은 대부분 지원을 하고 있다. 아니면 다른 사용자가 미리 만들어놓은 오픈소스가 분명히 있을 것이다. 검색해서 사용하면된다.
kafka와 연동하는 포스팅은 나중에 따로 작성할 예정이니 예제는 패스한다.



#Data Sink
sink는 처리된 stream을 저장하는 단계라고 보면된다. 꼭 저장이 아니여도 된다. stream을 소비하는 단계라 flink의 특징인 lazy evaluation 방식에 따라 stream을 아무리 처리해도 sink단계가 없다면 처리되지 않는다. 
sink는 파일로 저장하거나 socket으로 전달, 그리고 custom sink가 있다. 

- File

-Socket

- Custom

source와 거의 유사하다. custom의 경우 RichSinkFunction을 구현해 addSink로 만들어서 처리하면 된다.



#Transformation
transformation은 데이터 스트림을 변경시켜 새로운 스트림을 만드는 작업이다. 즉 연산을 통해 중간 데이터 가공이라고 할 수 있다.
Flink는 대부분의 스트르밍 플랫폼과 유사한 API를 지원한다. 
공식 documents에 나와있는대로 아래 transformation의 연산을 정리했다.
실제 적용한 소스 코드는 https://github.com/Minsub/FlinkStudy 에 작성해둔 샘플을 참고해보면 된다,

Transformation Description
Map
DataStream → DataStream

Takes one element and produces one element. A map function that doubles the values of the input stream:

dataStream.map { x => x * 2 }
FlatMap
DataStream → DataStream

Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:

dataStream.flatMap { str => str.split(" ") }
Filter
DataStream → DataStream

Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:

dataStream.filter { _ != 0 }
KeyBy
DataStream → KeyedStream

Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. Internally, this is implemented with hash partitioning. See keys on how to specify keys. This transformation returns a KeyedDataStream.

dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
Reduce
KeyedStream → DataStream

A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value. 

A reduce function that creates a stream of partial sums:

keyedStream.reduce { _ + _ }
Fold
KeyedStream → DataStream

A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value. 

A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence "start-1", "start-1-2", "start-1-2-3", ...

val result: DataStream[String] =
    keyedStream.fold("start")((str, i) => { str + "-" + i })

Aggregations
KeyedStream → DataStream

Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimun value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")
Window
KeyedStream → WindowedStream

Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a description of windows.

dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data

WindowAll
DataStream → AllWindowedStream

Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows.

WARNING: This is in many cases a non-parallel transformation. All records will be gathered in one task for the windowAll operator.

dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
Window Apply
WindowedStream → DataStream
AllWindowedStream → DataStream

Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window.

Note: If you are using a windowAll transformation, you need to use an AllWindowFunction instead.

windowedStream.apply { WindowFunction }

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply { AllWindowFunction }
Window Reduce
WindowedStream → DataStream

Applies a functional reduce function to the window and returns the reduced value.

windowedStream.reduce { _ + _ }
Window Fold
WindowedStream → DataStream

Applies a functional fold function to the window and returns the folded value. The example function, when applied on the sequence (1,2,3,4,5), folds the sequence into the string "start-1-2-3-4-5":

val result: DataStream[String] =
    windowedStream.fold("start", (str, i) => { str + "-" + i })
Aggregations on windows
WindowedStream → DataStream

Aggregates the contents of a window. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

windowedStream.sum(0)
windowedStream.sum("key")
windowedStream.min(0)
windowedStream.min("key")
windowedStream.max(0)
windowedStream.max("key")
windowedStream.minBy(0)
windowedStream.minBy("key")
windowedStream.maxBy(0)
windowedStream.maxBy("key")
Union
DataStream* → DataStream

Union of two or more data streams creating a new stream containing all the elements from all the streams. Note: If you union a data stream with itself you will get each element twice in the resulting stream.

dataStream.union(otherStream1, otherStream2, ...)
Window Join
DataStream,DataStream → DataStream

Join two data streams on a given key and a common window.

dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply { ... }
Window CoGroup
DataStream,DataStream → DataStream

Cogroups two data streams on a given key and a common window.

dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply {}
Connect
DataStream,DataStream → ConnectedStreams

"Connects" two data streams retaining their types, allowing for shared state between the two streams.

someStream : DataStream[Int] = ...
otherStream : DataStream[String] = ...

val connectedStreams = someStream.connect(otherStream)
CoMap, CoFlatMap
ConnectedStreams → DataStream

Similar to map and flatMap on a connected data stream

connectedStreams.map(
    (_ : Int) => true,
    (_ : String) => false
)
connectedStreams.flatMap(
    (_ : Int) => true,
    (_ : String) => false
)
Split
DataStream → SplitStream

Split the stream into two or more streams according to some criterion.

val split = someDataStream.split(
  (num: Int) =>
    (num % 2) match {
      case 0 => List("even")
      case 1 => List("odd")
    }
)

Select
SplitStream → DataStream

Select one or more streams from a split stream.

val even = split select "even"
val odd = split select "odd"
val all = split.select("even","odd")

Iterate
DataStream → IterativeStream → DataStream

Creates a "feedback" loop in the flow, by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream. See iterations for a complete description.

initialStream.iterate {
  iteration => {
    val iterationBody = iteration.map {/*do something*/}
    (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
  }
}

Extract Timestamps
DataStream → DataStream

Extracts timestamps from records in order to work with windows that use event time semantics. See Event Time.

stream.assignTimestamps { timestampExtractor }