(1) 컨슈머(Consumer) 설명
컨슈머
- 컨슈머는 어떠한 컨슈머 그룹에 속해있음.
- 컨슈머 그룹별로 각 컨슈머가 해당 토픽을 기준으로 서로 다른 파티션을 분담해서 메시지를 읽어온다.
컨슈머의 수 = 파티션의 수, best
- 각 컨슈머는 스레드로 구현되며 병행으로 실행된다.
- 하나의 스레드당 하나의 컨슈머를 싱행시키는 것이 원칙.
컨슈머 리더
- 컨슈머 그룹에 처음으로 합류하는 컨슈머 = 컨슈머 그룹의 리더
- 컨슈머 리더는 어떤 컨슈머가 어떤 파티션을 처리할 지 할당의 책임(PartitionAssignor 클래스 사용)을 가짐.
- 할당 처리 절차가 정해지면 아래에서 설명할 그룹조정자(group coordinator) 에게 파티션 할당 내역을 전송함.
리밸런싱
특정 컨슈머의 파티션 소유권을 다른 컨슈머에게 옮기는 것
- 파티션 4개의 topic에 대해서 어떤 컨슈머 그룹(컨슈머 4개)이 메시지를 소비하고 있다.
- 컨슈머 4개 중 하나의 컨슈머가 장애(하트비트를 세션 타임아웃 시간 내에 전송못함)를 통해서 그룹에서 빠지는 상황 = 컨슈머로서 파티션에서 메시지 소비 자격 해제
- 장애로 판단된 컨슈머의 역할을 대체 할 녀석으로 이전 = 즉 파티션 소유권을 이전 하는 행위를
리밸런싱
이라고 함. - 잦은 리밸런싱 상황이 발생하지 않도록 하트비트 간격 시간과 세션 타임아웃 옵션을 조절하면서 설정하는 것도 필요하다.
그룹조정자(group coordinator)
- 컨슈머가 세션 타임아웃 시간동안 하트비트를 보내는 카프카 브로커이다.
- 브로커 중 하나가 그룹조정자의 역할을 한다.
커밋과 오프셋
- poll(지정된시간동안 대기 ms) 메서드는 호출될 때마다 해당 토픽의 그룹 컨슈머들이
아직 읽지 않은 레코드들을 반환
함. - 즉 카프카의 각 컨슈머들은 파티션별로 자신이 읽는 레코드의 현재 위치를 추적할 수 있음.
커밋(commit)
: 파티션 내부의 현재 위치( 오프셋, offset
)를 변경하는 것.
커밋과 오프셋 이슈
enable.auto.commit
설정 그리고 컨뮤서 상황에 따라서 아래와 같은 이슈가 있을 수 있음.- 마지막으로 커밋된 오프셋이 컨슈머가 가장 최근에 읽고 처리한 메시지의 오프셋보다 작으면 그 사이가 중복처리된다.
- 마지막으로 커밋된 오프셋이 컨슈머가 최근에 읽고 처리한 메시지의 오프셋보다 크면 그 사이는 누락된다.
(2) 컨슈머 커밋 / 오프셋 컨트롤
리밸런싱 리스너(Rebalance Listener)
- p93
컨슈머가 파티션의 소유권을 잃게 되는 상황에서 마지막 메시지의 오프셋을 커밋
- 카프카 컨슈머 APi에서 컨슈머 파티션이 추가나 제거될 때, 코드가 실행되도록 지원하는 부분이 있다.
subscribe()
메서드를 호출할 때 ConsumerRebalanceListener
인터페이스를 구현하는 객체를 인자로 전달하면 된다.ConsumerRebalanceListener
인터페이스는 아래의 두 가지 메서드를 정의하며, 실제 우리가 구현하면된다.
public void onPartitionsRevoked(Collection<TopicPartition> partitions)
: 리밸런싱이 시작되기 전에, 컨슈머가 메시지를 읽고 처리하는 것을 중단하고 호출된다.
오프셋을 커밋해야하는 부분이 이 메소드이다.
public void onPartitionsAssigned(Collection<TopicPartition> partitions)
: 이 메서드는 파티션이 브로커에게 재할당된 후에, 컨슈머가 파티션을 새로 할당받아 메시지를 소비하기 전에 호출되는 메소드이다.
특정 오프셋 지정 레코드 소비 - 기본
- 현재 오프셋에서 이전으로 돌아가서 재 소비를 가능하게 하거나
- (시간에 민감한 앱의 경우) 소비가 딜레이가 일어날 때, 미소비된 메시지를 건너뛰는 경우에 사용할 수 있다.
특정 오프셋 지정 레코드 소비 - 읽는 순서 보장
- 파티션을 하나로 한다
- 트랜잭션으로 관리하는 것도 방법이 있음
- 기본적으로 다중 파티션을 사용하면서 순서보장하는 것이 쉽지 않음.
- 전제 조건이 있다. (1) 레코드와 오프셋이 모두 커밋이 되거나 (2) 둘다 커밋이 안되는 것이 보장되어야 함
- 데이터베이스에 저장되는 경우에
레코드
DB저장, 오프셋
도 DB에 저장함.
읽는 순서 보장 그래서 어떻게? p96
- 위에서 설명한 리밸런싱 리스너와 seek()을 동시에 사용한다
ConsumerRebalanceListener
의 인터페이스를 구현할 때 onPartitionsAssigned
메소드에서 consumer.seek()
을 호출해준다seek()
: 새로 할당된 파티션의 오프셋들을 찾음.
(3) 컨슈머 옵션
컨슈머 옵션
bootstrap.servers
- Kafka에 처음 연결을 하기 위한 호스트와 포트 정보로 구성된 리스트 정보
- 리스트 전체를 입력하는 방식을 권장함
fetch.min.bytes
- 한 번에 가져올 수 있는 최소 데이터 사이즈
- 지정한 사이즈보다 작은 경우 데이터가 누적될 때까지 기다림.
group.id
- Consumer가 속한 group을 식별하는 ID
enable.auto.commit
- false 하면 애플리케이션에서 요구할 때만 오프셋이 커밋됨.
- background로 offset을 주기적으로 commit.
auto.offset.reset
- 초기 offset이 없거나 데이터가 삭제된 경우 다음 옵션으로 reset.
- earliest : 가장 초기의 offset 값으로 설정
- latest : 가장 마지막 offset 값으로 설정
- none : 이전 offset을 못 찾으면 오류 발생
fetch.max.bytes
request.timeout.ms
session.timeout.ms
- Consumer와 broker 사이의 세션 timeout(기본값 10초)
- heartbeat를 timeout이 지나도 못 받으면 장애로 판단. consumer group은 rebalance를 시도.
heartbeat.interval.ms
- KafkaConsumer가 poll() 메소드로 얼마나 자주 heartbeat를 보낼 것인지 조정. 일반적으로
session.timeout.ms
1/3(기본값 3초)
max.poll.records
- 단일 호출 poll()에 대한 최대 레코드 수를 조정
max.poll.interval.ms
- 주기적으로 poll을 호출하지 않으면 장애라고 판단. group에서 제외한 후 다른 Consumer가 해당 partition에서 메시지를 가지고 갈 수 있게 처리.
auto.commit.interval.ms
- default : 5000
- 주기적으로 offset을 commit하는 시간
fetch.max.wait.ms
- fetch.min.bytes 데이터보다 적은 경우 응답을 기다리는 최대시간.