[More Kafka]4-1. 카프카 컨슈머

[More Kafka]4-1. 카프카 프로듀서 옵션


(1) 컨슈머(Consumer) 설명

컨슈머

  • 컨슈머는 어떠한 컨슈머 그룹에 속해있음.
  • 컨슈머 그룹별로 각 컨슈머가 해당 토픽을 기준으로 서로 다른 파티션을 분담해서 메시지를 읽어온다.
  • 컨슈머의 수 = 파티션의 수, best
  • 각 컨슈머는 스레드로 구현되며 병행으로 실행된다.
  • 하나의 스레드당 하나의 컨슈머를 싱행시키는 것이 원칙.

컨슈머 리더

  • 컨슈머 그룹에 처음으로 합류하는 컨슈머 = 컨슈머 그룹의 리더
  • 컨슈머 리더는 어떤 컨슈머가 어떤 파티션을 처리할 지 할당의 책임(PartitionAssignor 클래스 사용)을 가짐.
  • 할당 처리 절차가 정해지면 아래에서 설명할 그룹조정자(group coordinator) 에게 파티션 할당 내역을 전송함.

리밸런싱

  • 특정 컨슈머의 파티션 소유권을 다른 컨슈머에게 옮기는 것
  • 파티션 4개의 topic에 대해서 어떤 컨슈머 그룹(컨슈머 4개)이 메시지를 소비하고 있다.
  • 컨슈머 4개 중 하나의 컨슈머가 장애(하트비트를 세션 타임아웃 시간 내에 전송못함)를 통해서 그룹에서 빠지는 상황 = 컨슈머로서 파티션에서 메시지 소비 자격 해제
  • 장애로 판단된 컨슈머의 역할을 대체 할 녀석으로 이전 = 즉 파티션 소유권을 이전 하는 행위를 리밸런싱 이라고 함.
  • 잦은 리밸런싱 상황이 발생하지 않도록 하트비트 간격 시간과 세션 타임아웃 옵션을 조절하면서 설정하는 것도 필요하다.

그룹조정자(group coordinator)

  • 컨슈머가 세션 타임아웃 시간동안 하트비트를 보내는 카프카 브로커이다.
  • 브로커 중 하나가 그룹조정자의 역할을 한다.

커밋과 오프셋

  • poll(지정된시간동안 대기 ms) 메서드는 호출될 때마다 해당 토픽의 그룹 컨슈머들이 아직 읽지 않은 레코드들을 반환함.
  • 즉 카프카의 각 컨슈머들은 파티션별로 자신이 읽는 레코드의 현재 위치를 추적할 수 있음.
  • 커밋(commit) : 파티션 내부의 현재 위치( 오프셋, offset )를 변경하는 것.

커밋과 오프셋 이슈

  • enable.auto.commit 설정 그리고 컨뮤서 상황에 따라서 아래와 같은 이슈가 있을 수 있음.
  • 마지막으로 커밋된 오프셋이 컨슈머가 가장 최근에 읽고 처리한 메시지의 오프셋보다 작으면 그 사이가 중복처리된다.
  • 마지막으로 커밋된 오프셋이 컨슈머가 최근에 읽고 처리한 메시지의 오프셋보다 크면 그 사이는 누락된다.

(2) 컨슈머 커밋 / 오프셋 컨트롤

리밸런싱 리스너(Rebalance Listener)

  • p93
  • 컨슈머가 파티션의 소유권을 잃게 되는 상황에서 마지막 메시지의 오프셋을 커밋
  • 카프카 컨슈머 APi에서 컨슈머 파티션이 추가나 제거될 때, 코드가 실행되도록 지원하는 부분이 있다.
  • subscribe() 메서드를 호출할 때 ConsumerRebalanceListener 인터페이스를 구현하는 객체를 인자로 전달하면 된다.
  • ConsumerRebalanceListener 인터페이스는 아래의 두 가지 메서드를 정의하며, 실제 우리가 구현하면된다.
public void onPartitionsRevoked(Collection<TopicPartition> partitions)
: 리밸런싱이 시작되기 전에, 컨슈머가 메시지를 읽고 처리하는 것을 중단하고 호출된다.
오프셋을 커밋해야하는 부분이 이 메소드이다.

public void onPartitionsAssigned(Collection<TopicPartition> partitions)
: 이 메서드는 파티션이 브로커에게 재할당된 후에, 컨슈머가 파티션을 새로 할당받아 메시지를 소비하기 전에 호출되는 메소드이다.

특정 오프셋 지정 레코드 소비 - 기본

  • 현재 오프셋에서 이전으로 돌아가서 재 소비를 가능하게 하거나
  • (시간에 민감한 앱의 경우) 소비가 딜레이가 일어날 때, 미소비된 메시지를 건너뛰는 경우에 사용할 수 있다.

특정 오프셋 지정 레코드 소비 - 읽는 순서 보장

  • 파티션을 하나로 한다
  • 트랜잭션으로 관리하는 것도 방법이 있음
    • 기본적으로 다중 파티션을 사용하면서 순서보장하는 것이 쉽지 않음.
    • 전제 조건이 있다. (1) 레코드와 오프셋이 모두 커밋이 되거나 (2) 둘다 커밋이 안되는 것이 보장되어야 함
    • 데이터베이스에 저장되는 경우에 레코드 DB저장, 오프셋 도 DB에 저장함.

읽는 순서 보장 그래서 어떻게? p96

  • 위에서 설명한 리밸런싱 리스너와 seek()을 동시에 사용한다
  • ConsumerRebalanceListener 의 인터페이스를 구현할 때 onPartitionsAssigned 메소드에서 consumer.seek() 을 호출해준다
  • seek() : 새로 할당된 파티션의 오프셋들을 찾음.

(3) 컨슈머 옵션

컨슈머 옵션

  • bootstrap.servers
    • Kafka에 처음 연결을 하기 위한 호스트와 포트 정보로 구성된 리스트 정보
    • 리스트 전체를 입력하는 방식을 권장함
  • fetch.min.bytes
    • 한 번에 가져올 수 있는 최소 데이터 사이즈
    • 지정한 사이즈보다 작은 경우 데이터가 누적될 때까지 기다림.
  • group.id
    • Consumer가 속한 group을 식별하는 ID
  • enable.auto.commit
    • false 하면 애플리케이션에서 요구할 때만 오프셋이 커밋됨.
    • background로 offset을 주기적으로 commit.
  • auto.offset.reset
    • 초기 offset이 없거나 데이터가 삭제된 경우 다음 옵션으로 reset.
    • earliest : 가장 초기의 offset 값으로 설정
    • latest : 가장 마지막 offset 값으로 설정
    • none : 이전 offset을 못 찾으면 오류 발생
  • fetch.max.bytes
    • 한 번에 가져올 수 있는 최대 메시지 크기
  • request.timeout.ms
    • 요청에 대해 응답을 기다리는 최대 시간
  • session.timeout.ms
    • Consumer와 broker 사이의 세션 timeout(기본값 10초)
    • heartbeat를 timeout이 지나도 못 받으면 장애로 판단. consumer group은 rebalance를 시도.
  • heartbeat.interval.ms
    • KafkaConsumer가 poll() 메소드로 얼마나 자주 heartbeat를 보낼 것인지 조정. 일반적으로 session.timeout.ms 1/3(기본값 3초)
  • max.poll.records
    • 단일 호출 poll()에 대한 최대 레코드 수를 조정
  • max.poll.interval.ms
    • 주기적으로 poll을 호출하지 않으면 장애라고 판단. group에서 제외한 후 다른 Consumer가 해당 partition에서 메시지를 가지고 갈 수 있게 처리.
  • auto.commit.interval.ms
    • default : 5000
    • 주기적으로 offset을 commit하는 시간
  • fetch.max.wait.ms
    • fetch.min.bytes 데이터보다 적은 경우 응답을 기다리는 최대시간.