Data Pipeline/Fluentd

Fluentd란?

재심 2022. 10. 29. 21:26

[Fluentd?]

  • 로그 수집기 (보통 로그 수집의 목적으로 사용하나 다양한 데이터 소스로부터 데이터를 받아 처리할 수 있음)
  • C와 Ruby로 개발됨.
  • fluentd로 전달된 데이터는 tag, time, record (JSON)으로 구성된 이벤트로 처리됨.
  • 원하는 형태로 가공 후 다양한 목적지 (ES, Kafka etc..)로 전달 될 수 있음.
  • Filter, Buffer, Parser를 직접 설정할 수 있는 것이 가장 큰 장점 
  • 데이터 유실을 막기 위해 메모리와 파일 기반의 버퍼 시스템을 사용.
  • Failover를 위한 HA구성도 가능.

Use-Cases

  • Application Log 수집
  • mongodb와 같은 DB에 데이터 수집  
  • 스트림 데이터 처리 

Tag

fluentd에서는 Tag를 통해 이벤트가 흘러가면서 적용할 수 있는 Filter, Parser 등을 적용할 수 있는 기준이 된다. 

# tag 사용 예시
<source>
    @type tail
    tag dev.sample
    path /var/log/sample.log
</source>
 
# dev.sample 만을 위한 처리방법
<match dev.sample>
    @type stdout
<match>

 

[Fluentd 데이터 구조]

데이터는 크게 Time, Tag, Recode로 나뉜다

  • Time: 이벤트 생성 시간
  • Tag: 데이터의 분류로 각 이벤트는 tag에 따라 종류를 구분할 수 있다. 즉, tag에 따라 필터링, 라우팅을 각각 적용할 수 있게 된다. 
  • Record: 이벤트의 실제 내용 (JSON 형태)

 

[Fluentd 구성요소]

fluentd는 7개의 컴포넌트로 구성된다.

일반적으로는 Input → Engine → Output의 흐름을 가장 많이 사용.

Engine을 제외한 나머지 컴포넌트들은 플러그인 형태로 제공되어 커스터마이징된 플러그인 사용이 가능하다. 

Fluentd Components

 

  • Input
  • Parser (Optional)
  • Engine
  • Filter (Optional)
  • Buffer (Optional)
  • Output
  • Formatter (Optional)

Input (Source)

로그를 수집하는 플러그인으로 HTTP, tail, TCP 등 외에도 확장 플러그인을 통해 다양한 서버나 애플리케이션으로 부터 다양한 포맷의 데이터를 수집 가능.

기본적으로 "@http" 와 같은 태그를 통해 http request를 받을 수 있는 플러그인들이 제공됨. 

플러그인 목록: https://www.fluentd.org/plugins (mongo, kafka 둘 다 있음)

 

Parser 

전달받은 데이터를 파싱하기 위한 플러그인.

Input, Filter, Output 플러그인 안에서 정의. 

"@type" 파라미터로 사용할 Parser 플러그인 이름을 지정함. 

기존적으로 apache2, nginx, syslog, csv, json, none 등이 있음.

 

parser_regexp: 정규표현식으로 데이터를 파싱

none: 데이터를 새로운 필드에 담을 때 사용

 

Filter

필터를 통해 특정 데이터만 지정해서 output으로 보내거나, 데이터에 필드 값을 추가 or 삭제 등도 가능하다 

필터 내부에 여러 플러그인을 제공한다.

 

record_transformer: output으로 내보내기 전에 이벤트에 hostname을 추가한다.

grep: 메시지 필드에 특정값이 있는 경우만 output으로 내보낸다. 정규표현식 사용가능. 

 

Buffer 

output (match) 내부에서 사용하며 input에서 들어온 이벤트가 설정한 버퍼 크기에 도달한 경우 output으로 내보내도록 설정할 수 있음. 

Fluentd : Buffer

 

버퍼의 특징

  • 버퍼에는 tag 단위로 chunk가 생성되어 태그별로 구분되어 chunk에 적재됨.
  • chunk에 데이터가 쌓이고, chunk-limit이나 interval에 도달하게 되면 그 때 Queue에 저장된다. 
  • Queue에서 이제 실제로 Output으로 데이터를 전달하는데 이 때 실패할 경우 retry, retry interval을 통해 재시도한다. retry 횟수도 설정가능
  • Queue가 꽉 찰 경우 익셉션을 발생시키고, 해소될 때 까지 input을 중단한다. (consumer lag쌓이듯이 쌓아놓나..?)
  • Fluentd가 장애로 내려갈 경우, 버퍼에 있는 chunk를 파일로 내리는 옵션을 적용하여 데이터 복구까지 가능하다. 

 

Output

앞에서 처리된 이벤트들을 실제로 저장할 곳을 지정한다 (HDFS, mongodb, Kafka etc..)

Output 플러그인에서 데이터 포맷을 Formatter를 통해 지정할 수 있다. 

"match" 태그를 통해 설정한다. 

Kafka, mongodb 모두 존재함.

 

Formatter 

Output 안에서 "format" 태그를 통해 지정하여 출력형식을 변경할 수 있음.

json, ltsv, csv, hash와 같은 플러그인을 사용한다. 

 

[참조]

https://jonnung.dev/system/2018/04/06/fluentd-log-collector-part1/

https://blog.naver.com/PostView.naver?blogId=gnlwnd352&logNo=222224289859

https://docs.fluentd.org/v/0.12/

https://my-trash-code.tistory.com/m/69