[Kafka]컨슈머(consumer) 이해, 팁, 옵션


이 글은 카프카, 데이터 플랫폼의 최강자 고승범/공용준 님의 책을 공부하며
정리하는 글입니다.

컨슈머(consumer) 이해, 팁, 옵션

컨슈머 이해, 팁, 옵션

컨슈머란

  • 토픽의 메시지를 가져와서 소비하는 역할을 하는 애플리케이션이나 서버
  • 주요 기능은 파티션 리더에게 메시지를 가져오기를 요청
  • 각 요청은 로그의 오프셋을 명시하고 그 위치로부터 메시지를 수신
  • 가져올 메시지의 위치를 조정
  • 가져온 데이터를 다시 가져올 수 있음 <- RabbitMQ와 가장 큰 차이점

컨슈머 주요 옵션

  • bootstrap.servers
    • 정의된 포맷: kafka01:9092, kafka02:9092….
    • 전체 카프카 리스트를 적어주는 것을 권장. 하나만 사용시 장애 발생시 불능
  • fetch.min.byte
    • 한번에 가져올 수 있는 최소 데이터 사이즈
    • 지정한 크기보다 작다면 데이터가 누적될 때까지 기다림
  • group.id
    • 컨슈머가 속한 컨슈머 그룹을 식별하는 식별자
    • 가장 중요하므로 하단에서 다시 정리
  • enable.auto.commit
    • 백그라운드로 주기적으로 오프셋을 커밋함
    • enable.auto.commit=true 로 설정하면 5초마다 컨슈머는 poll()를 호출할 때 가장 마지막 오프셋을 커밋함(자동 커밋).
    • 5초 주기는 기본값이며, auto.commit.interval.ms 옵션을 통해 조정 가능.
  • auto.offset.reset
    • 오프셋이 없는 경우(데이터가 없음)에 아래의 옵션으로 리셋함
      • earliest : 가장 초기의 오프셋 값으로 설정
      • latest : 가장 마지막의 오프셋 값으로 설정
      • none : 이전 오프셋값을 찾지 못하면 에러를 나타냄
  • fetch.max.bytes
    • 한번에 가져올 수 있는 최대 데이터 사이즈
  • request.timeout.ms
    • 요청에 대해 응답을 기다리는 최대 시간
  • session.timeout.ms
    • 컨슈머가 살아있는 것으로 판단하는 시간(디폴트 10초)
    • 10초 안에 컨슈머가 그룹 코디네이터에게 하트비트를 보내야함
    • 10초를 넘으면 장애로 판단하고 리밸런스를 시도함
    • 세션 타임아웃이 짧다면 가비지 컬렉션이나 poll loop 완료시간이 길어지게 되어 리밸런스가 일어나기도 함.
    • 세션 타임아웃이 길다면 리밸런스보다 실제 오류 탐시 시간이 오래 걸림
  • heartbeat.interval.ms
    • 그룹 코디네이터에게 얼마나 자주 KafkaConsumer poll() 메소드로 하트비트를 보낼지 조정함.
    • session.timeout.ms과 밀접하며 반드시 session.timeout.ms의 값보다 당연히 작아야 하며 일반적으로 1/3로 설정. 디폴트는 3초
  • max.poll.records
    • 단일 호출 poll()에 대한 최대 레코드 수를 조정.
  • max.poll.interval.ms
    • 컨슈머가 하트비트를 주기적으로 보내는데, 하트비트만 보내고 메시지를 가져가지 않은 경우, 무기한 파티션 점유를 막기 위함
    • 즉 컨슈머가 poll()을 호출하지 않으면 장애로 판단하며, 다른 컨슈머에게 메시지를 가져가도록 함.
  • auto.commit.interval.ms
    • 주기적으로 오프셋을 커밋하는 시간
  • fetch.max.wait.ms
    • fetch.min.bytes의 설정 데이터보다 적은 경우 요청에 응답을 기다리는 최대 시간.
    • fetch.min.bytes는 한번에 가져가는 최대 데이터 사이즈인데 무한정 데이터가 차길 기다릴 수 없으니 시간제약을 준다고 생각하자.

Python을 이용한 컨슈머 코드

from kafka import KafkaConsumer

#(1)
consumer = KafkaConsumer('peter-topic',group_id='peter-consumer',
	bootstrap_servers='kafka01:9092,kafka02:9092',enable_auto_commit=True,
	auto_offset_reset='latest')
#(2)
for message in consumer:
	print "Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s" % (
		message.topic, message.partition, message.offset, message.key, message.value.
		decode('utf-8'))
  • (1)은 토픽명, 컨슈머에서 사용할 그룹아이디, 브로커 주소(전부), 자동 커밋, 오프셋 리셋 옵션을 지정.
    • 오프셋 리셋값은 earliest와 latest가 있는데 earlist는 토픽의 처음부터 메시지를 가져오고 latest는 토픽의 가장 마지막부터 메시지를 가져온다. 기본값은 latest다.
    • 컨슈머 그룹과 커밋과 오프셋은 후에 다룸
  • (2)루프를 돌고, poll을 호출하면서 메시지를 가져온다.


참고: 카프카, 데이터 플랫폼의 최강자