본문 바로가기

Big Data/Flink

Flink 시작하기 #6 Windows



스트리밍 데이터는 unbounded 데이터로 처음과 끝의 개념이 없다. element의 데이터를 개별적으로 처리하는 연산만 사용한다면 큰 문제는 없지만, 집계연산을 사용한다면 문제가 생긴다. 만약 평균값을 계산한다고 한다면 끝이 없다면 할 수 없다. 그래서 스트리밍 데이터 처리 시스템에서는 Windows라는 개념이 존재한다.
Windows는 쉽게 말해서 특정한 룰에 따라 일정 데이터를 모아 처리하는 개념이다. 

Flink Windows 구조는 아래와 같다. keyedStream와 DataSteam 별로 다른 API를 쓸분 구조는 동일하다. trigger, evictor, allowedLateness 구분은 선택사항이다. 간단하게 window() 만 구현하면 기본적은 window 기능을 사용할 수 있다.

Keyed Windows

stream
       .keyBy(...)          <-  keyed versus non-keyed windows
       .window(...)         <-  required: "assigner"
      [.trigger(...)]       <-  optional: "trigger" (else default trigger)
      [.evictor(...)]       <-  optional: "evictor" (else no evictor)
      [.allowedLateness()]  <-  optional, else zero
       .reduce/fold/apply() <-  required: "function"

Non-Keyed Windows

stream
       .windowAll(...)      <-  required: "assigner"
      [.trigger(...)]       <-  optional: "trigger" (else default trigger)
      [.evictor(...)]       <-  optional: "evictor" (else no evictor)
      [.allowedLateness()]  <-  optional, else zero
       .reduce/fold/apply() <-  required: "function"

WindowAssigner를 window/windowAll을 통해 세팅하면 원하는 window 방법으로 데이터를 처리할 수 있다.
각 window는 시간이나 카운트 기반으로 설정할 수 있다. count는 말그대로 들어오는 데이터 갯수 별로 쪼개는 개념이다. 그리고 시간은 튜플의 갯수에 상관없이 시간단위로 window 크기를 설정한다. 

Flink는 4종류의 window를 기본적으로 제공하고 있다. 하기 설명은 time기반의 window로 예제를 작성하고 설명할 것이다.

Tumbling Windows
고정된 시간 단위로 중복 데이터 처리없이 window를 설정하는 방식이다.  예를 들어, window size를 5초로 설정한 Tumbling windows는 5초 간격으로 들어오는 데이터를 중복 없이 처리되는 구조이다. 00:00 ~ 00:05, 00:06~00:10, ... 이런식으로 처리된다.
아래 그림 처럼 고정된 window size안에 들어가는 모든 데이터를 하나의 window size로 처리한다. 개념이 아주 간단하고 심플하다. 


예제 코드를 작성해보자. window 개념을 쉽고 이해하기 위해 데이터는 1부터 9까지 데이터를 일정 간격으로 생성하는 source로 작성할 것이다.
function은 어떤 데이터들이 한 window 에서 처리되는지 쉽게 알아보기 위해 들어오는 데이터를 시간, 갯수 그리고 append된 데이터로 만들어 window 특성들을 결과만 보더라도 쉽게 이해할 수 있도록 작성한다.

window flink 코드를 작성하기 전에 위에서 셜명한 source와 function를 분리해 놓았다. 이 후 예제부터는 아주 간단하게 window관련 코드만 작성할 것이다.
StreamCreator는 데이터를 무한히 생성해줄 것이고, Operator는 window 데이터 처리를 쉽게 보여줄수 있도록 처리하는 역할을 담당한다.

 

자 이제 Tumbler Window 코드를 작성해보자.
역시 가장 먼저 StreamExecutionEnvironment를 가져와야 한다. 그리고 timeCharacteristic을 설정해야한다. 이부분은 EventTime을 설명하는 포스팅에서 자세히 설명할 것이다. 이부분 time-based window를 작성하기 위해선 필수 설정이다. 그리고 위에서 작성한 StreamCreator에서 1부터 9까지의 데이터를 지속적으로 생성하고, non-keyedStream의 window연산인 windowAll로 TublingEventTimeWindows로 2초 window size로 설정한다. apply 메소드에 미리 작성한 appendAllFunction을 지정하고 sink로 콘솔에 프린트하도록 print()로 하나의 flow를 작성한다.


이부분에서 중요한 부분은 windowAll(TumblingEventTimeWindows.of(Time.seconds(2))) 이부분이다.
Tumbling Window로 2초 간격로 쪼개라라는 의미가 이 코드로 작성된 것이다.

이제 결과를 보자. time부분을 보면 2초 간격으로 실행되는것을 볼 수 있다. 그리고 처리되는 count가 다른 이유는 creator에서 9까지 데이터를 생성후 delay 시간이 더 길기 때문에 각 window마다 처리되는 사이즈가 약간 다르다 하지만 데이터를 보면 알 수 있듯이 절대 중복으로 처리되는 데이터가 없다.

Key: , time: 21:44:52 160, Count: 3 -> 123
Key: , time: 21:44:54 062, Count: 4 -> 4567
Key: , time: 21:44:56 137, Count: 2 -> 89
Key: , time: 21:44:58 102, Count: 4 -> 1234
Key: , time: 21:45:00 168, Count: 4 -> 5678
Key: , time: 21:45:02 235, Count: 2 -> 91
Key: , time: 21:45:04 207, Count: 4 -> 2345
Key: , time: 21:45:06 074, Count: 4 -> 6789
Key: , time: 21:45:08 132, Count: 2 -> 12


Sliding Window
위에서 설명한 Tumbling Window의 개념은 아주 심플하고 간단하다. Sliding Window는 일반적인 데이터 처리와 약간 다르다. 중복데이터를 허용하기 때문이다. 
N 시간마다 +-M시간 전후 데이터를 한 window에서 처리되는 개념이다.
아래 그림처럼 각 window마다 앞뒤로 중복으로 처리되는 데이터가 존재한다. 실제 활용할 수 있는 예제로 예를 들면 10초마다 최근 1분간의 온도의 평균을 계산하는 프로세스가 있다면 위에서 설명한 Tumbling window로 처리할 수 없다. 간격보다 처리해야할 데이터의 시간이 더 크기 때문이다. 

예제코드는 다음과 같다. 
SlidingEventTimeWindows.of 를 통해 WindowAssigner를 만든다. 이때 파라미터는 2개인데 첫번째는 처리되는 데이터 시간이고, 두전째는 처리 간격이다.
아래 예제기준으로는 2초마다 4초간 들어온 데이터를 처리하는 코드이다.



결과는 2초마다 window가 처리되었고 4초간 들어온 모든 데이터를 중복처리되었다. 
Key: , time: 22:16:56 338, Count: 3 -> 123
Key: , time: 22:16:58 039, Count: 7 -> 1234567
Key: , time: 22:17:00 107, Count: 7 -> 4567891
Key: , time: 22:17:02 160, Count: 8 -> 89123456
Key: , time: 22:17:04 124, Count: 8 -> 23456789


Session Window
Session window는 일정 기간동안 반응이 없는 경우 세션 시작부터 반응이 없는 시간까지의 데이터를 하나의 window size로 처리한다. 예를들어 설명하면, session gap을 5초로 설명하면 5초 동안 데이터가 들어오지 않으면 window를 쪼개는 방식이다. 00:00부터 1초 간격으로 데이터가 꾸준히 들어오다가 00:10초부터 데이터가 들어오지 않고 00:17초 에 데이터가 들어왔다면 session gap인 5초를 지나 00:00~00:10가지의 데이터를 window size로 정해진다.
아래 그림을 보면 session gap마다 window가 처리되는 개념을 설명하고 있다. 따라서 하나의 window에서 처리되는 element 갯수가 굉장히 다를 수 있다.

다음은 session window의 예제 코드이다. 설정은 간단하다 session gap만 설정하면 된다.
아래 예제는 300ms간격으로 1부터 9까지 데이터를 생성한다. 다시 데이터 1이 생성된 시간은 두배인 600ms 이후에 들어오는 source이다. 그리고 session gap은 500ms이다.


결과는 다음과 같다. 일정간격으로 생성되는 source이기 때문에 각 window에서 처리되는 갯수는 같을 것이다. 9에서 다시 1이 생성되는 간격인 600ms 에만 session gap을 넘어서기 때문에 아래와 같은 결과가 나타난다.
Key: , time: 22:26:24 542, Count: 9 -> 123456789
Key: , time: 22:26:27 768, Count: 9 -> 123456789
Key: , time: 22:26:31 188, Count: 9 -> 123456789
Key: , time: 22:26:34 473, Count: 9 -> 123456789


Global Window
global window는 하나의 윈도우로 모든 데이터를 처리한다. 따라서 trigger와 evictor를 설정해야지만 의미있게 사용할 수 있다. 
trigger 는 가져올 데이터에 대한 정의를 하고, evictor는 처리할 데이터에 대한 정의이다.

아래 코드는 count기반으로 3개의 데이터가 들어올 때마다 5개 데이터를 한 window에서 처리되는 예제이다. 이 코드만 보면 count기반의 sliding window와 동일할 것이다. 


결과는 다음과 같다. 1초마다 데이터가 들어오기 때문에 3개의 데이터가 들어올때마다 처리가 되기 때문에 3초 간격으로 window가 처리됬고 5개의 데이터가 처리되기 때문에 중복처리가 되었다.
time: 22:34:34 507, Count: 3 -> 123
time: 22:34:37 339, Count: 5 -> 23456
time: 22:34:40 350, Count: 5 -> 56789
time: 22:34:45 406, Count: 5 -> 89123
time: 22:34:48 399, Count: 5 -> 23456
time: 22:34:51 397, Count: 5 -> 56789

만약 위 코드에서 evictor 부분을 주석을 하고 다시 실행하면 결과가 아래와 같을 것이다. 3초마다 데이터가 들어온다는 정의는 있는데 몇개를 처리할지에 대한정의가 없기 때문에 지금까지 들어온 모든 데이터를 처리하게 된다.


time: 22:44:37 076, Count: 3 -> 123
time: 22:44:39 919, Count: 6 -> 123456
time: 22:44:42 915, Count: 9 -> 123456789
time: 22:44:47 973, Count: 12 -> 123456789123
time: 22:44:50 981, Count: 15 -> 123456789123456
time: 22:44:53 973, Count: 18 -> 123456789123456789
time: 22:44:59 017, Count: 21 -> 123456789123456789123
time: 22:45:02 001, Count: 24 -> 123456789123456789123456


#출처