[Kafka]프로듀서(producer) 이해, 팁, 옵션
이 글은 카프카, 데이터 플랫폼의 최강자 고승범/공용준 님의 책을 공부하며
정리하는 글입니다.
프로듀서 이해, 팁, 옵션
프로듀서 코드, 팁 그리고 옵션에 대해서 알아보자
각자의 언어로 프로듀서를 만들기
- 현재 참고하는 책에서는 자바와 파이썬을 이용한 예제를 보여주고 있음
- 우리의 프로젝트는 Scala를 사용할 것이므로 여기에서 코드를 다루지는 않음
- 프로듀서를 만들 때 중요한 부분에 대해서만 정리
Send()
실제 운영환경에서 가장 중요한 부분은 send() 함수이다
Send()를 통해 메시지를 보내는 방식은 3가지 이다
- 메시지를 보내고 확인하지 않기
- 카프카가 항상 살아있는 상태이고, 프로듀서가 자동으로 재전송
- 대부분 성공적으로 전송되지만, 일부 메시지는 손실될 수도 있다.
- 이 방식은 메시지 손실 가능성이 있기 때문에 일반적으로 사용하지 않는다.
- send()는 자바 퓨처(future) 객체로 RecordMetadata를 리턴 받지만 성공여부는 알 수 없다
- 동기 전송
- send() 메소드의 Future 객체를 리턴하고, get()메소드를 통해 성공 여부를 알 수 있다.
send(). get(); system.out.printf("Partition: %d, Offset: %d, metadata.partition(), \ metadata.offset()); //get(); 을 통해서 RecordMetadata를 얻고, partition, offset 정보를 받을 수 있음을 알 수 있다.
- 비동기 전송
- send() 메소드를 콜백과 같이 호출하고, 카프카 브로커에서 응답을 받으면 콜백한다.
- 보낸 모든 메시지의 응답을 기다린다면 많은 시간이 소요되는데 비동기의 경우에는 속도를 보장한다.
- 또한 메시지를 보내지 못하거나 예외를 처리하게해 에러를 기록하는 등 에러 로그 등에 기록할 수 있다.
- 구현을 위해서는 org.apache.kafka.clients.producer.Callback 클래스를 임포트한다.
//java class PeterCallback implements Callback { public void onCompletion(RecordMetadata metadata, Exception exception) { if(metadata != null) { System.out.println("Partition: " + metadata.partition() + ", Offset : " + metadata.offset() + " "); } else { exception.printStackTrace(); } } }
- 지금 else에서는 단순 예외처리만 해주었는데 metadata == null 에 대한 추가적인 예외처리를 해야한다.(에러 로그기록 등등)
- 메시지를 보내고 확인하지 않기
send()를 for loop으로 감싸기 p152
for loop을 이해서 동일한 메시지를 여러개 보내자
for(int i = 1; i < 11; i++) send(~~~~~~~~~~~~~~~~).
메시지가 크다면 압축 옵션도 사용하자
프로듀서 주요 옵션
- bootstrap.servers
- 정의된 포맷: kafka01:9092, kafka02:9092 …..
- 전체 카프카 리스트를 적어주는 것을 권장. 하나만 사용시 장애 발생시 불능
- acks
- 프로듀서가 카프카 토픽의 리더에게 메시지를 보낸 후 요청을 완료 전 ack(승인)의 수
- 해당 옵션이 작으면 성능이 좋으나 메시지 손실 가능성이 있다.
- 정의 된 포맷: ack, 0 인 경우에는 어떠한 ack도 기다리지 않는다.
- ack, 1 은 리더는 데이터를 기록하지만, 모든 팔로워는 확인하지 않는다. 손실가능성 있음.
- acks, all 또는 -1 은 리더는 ISR 그룹의 모든 팔로워로부터 데이터에 대한 ack를 기다림.
- best 권장: 프로듀서는 ack=all, 브로커의 min.insync.relicas의 옵션은 2, 리플리케이션 팩터=3이다. 즉 리더신호 하나 팔로워신호 하나만 총 2개만 ack 되어도 손실 없이 유지할 수 있다.
- buffer.memory
- 프로듀서가 카프카 서버로 데이터를 보내기 위해 잠시 대기(배치 전송이나 딜레이 등)할 수 있는 전체 메모리 바이트(byte)이다.
- compression.type
- 압축하는 것이며 옵션으로 none, gzip,snappy,lz4 등 포맷 중 선택한다.
- retries
- 일시적인 오류로 인해 전송 실패 데이터를 다시 보낸다.
- batch.size
- 프로듀서는 같은 파티션으로 보내는 여러 데이터를 배치로 보내려 함.
- 설정된 배치 크기(byte)를 넘는 경우에는 시도하지 않는다.
- 고가용성 메시지의 경우라면 배치 사이즈를 주지 않는 것도 방법이다.
- linger.ms
- 배치형태의 메시지를 보내기 전에 추가적인 메시지들을 위해 기다리는 시간
- 결국 배치를 전부 채우고 가게해서 효율성을 높이는데 좋은 듯함
- 물론 배치 사이즈에 도달하지 않고 시간이 지나면 메시지는 전송 됨.
- 0이 기본 값(지연 없음)이며, 0보다 큰 값을 설정하면 지연 시간은 발생되나 처리량은 좋아진다.
- max.request.size
- 프로듀서가 보낼 수 있는 최대 메시지 바이트 사이즈. 기본값은 1mb
- 카프카는 메가바이트 이상을 처리하기에 적합한 고속 큐다! 1mb보다 줄일필요가 있겠는가?
- ssl, kerberos 등 옵션은 공식문서 아래를 참고하자
카프카 프로듀서 공식문서
한줄기억하기
프로듀서 코드 짤 때, send()에서 예외처리해라, 배치처리를 위해서 for loop으로 감싸라
/kafka/config/server.properties 에서 min.insync.relicas=2, 리플리케이션 팩터=3으로 해라 ack 신호는 2개를 받아야 하는데, 리플리케이션 팩터가 1개라면 안되지 안겠는가?
프로듀서 코드를 짜거나 테스트용 프로듀스 토픽 생성시, acks=all
용어 정리
고가용성: 오랜시간 정상적으로 지속적인 운영이 가능한 상황을 말함.