[More Kafka]3-1. 카프카 프로듀서
spring boot 기반 producer 코드는 아래에 있습니다. 참고로, spring cloud stream kafka를 사용하지 않은 예제입니다. https://github.com/twowinsh87/spring-kafka2019/tree/master/springkafka01
도커 내부 프로듀서에서 메세지 보내기
(1) test 토픽의 메세지 보내기
$ docker exec -it cp-kafka-1 /bin/bash
- cp-kafka-1 docker contatiner 로 접속
root@c94e160402b5:/# usr/bin/kafka-console-producer --broker-list cp-kafka-2:29092 --topic test
>first
[2019-09-18 03:20:52,847] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2019-09-18 03:20:52,950] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 4 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
>second
- 바로 cp-kafka-2 로
test
명으로 된 토픽을 보내자
(2) WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
이게 뭐지?
- 기본적으로 docker-compose.yml 을 작성할 때, topic이 생성되어 있지 않다면, 자동으로 생성해주는
auto.create.topics.enable
을 옵션으로 주지 않았다. 왜냐면 기본값이true
이기 때문. 그래서 자동으로 없는 topic에 대해서 생성을 해주는데 - 저 위의 문구는 현재 test 토픽에 대한 리더가 선출되지 않았기 때문에 발생한다.
(3) “test-2” topic을 생성하고 producer가 메세지를 보내면?
root@c94e160402b5:/# usr/bin/kafka-topics --create --zookeeper cp-zookeeper-1:12181 --replication-factor 3 --partitions 3 --topic test-2
Created topic test-2.
root@c94e160402b5:/# usr/bin/kafka-console-producer --broker-list cp-kafka-2:29092 --topic test-2`
>please
>clean
- 위와 같은 WARN 로그가 발생하지 않았음.
CLI로 topic 정보 확인하기
root@c94e160402b5:/# /usr/bin/kafka-topics --describe --zookeeper cp-zookeeper-1:12181 --topic test
Topic:test PartitionCount:3 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: test Partition: 1 Leader: 2 Replicas: 2 Isr: 2
Topic: test Partition: 2 Leader: 3 Replicas: 3 Isr: 3
root@c94e160402b5:/# /usr/bin/kafka-topics --describe --zookeeper cp-zookeeper-1:12181 --topic test-2
Topic:test-2 PartitionCount:3 ReplicationFactor:3 Configs:
Topic: test-2 Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
Topic: test-2 Partition: 1 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
Topic: test-2 Partition: 2 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
test
topic과test-2
토픽은 다르게 보인다.test
토픽은 docker-compose에서 셋팅 해준KAFKA_NUM_PARTITIONS: 3
에 따라서PartitionCount 가 3
ReplicationFactor
는 kafka 설정의 기본값에 의해서1
로 설정되었다.test-2
토픽은 위에서 토픽을 생성해준 것과 같이usr/bin/kafka-topics --create --zookeeper cp-zookeeper-1:12181 --replication-factor 3 --partitions 3 --topic test-2
replication-factor 3 과 partitions 3 인 것을 알 수 있다.
topic 정보의 해석
Topic:test-2 PartitionCount:3 ReplicationFactor:3 Configs:
Topic: test-2 Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
Topic: test-2 Partition: 1 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
Topic: test-2 Partition: 2 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
- 예를 들어서 위에서
test-2
토픽의 please 라는 value를 생각해보자 Topic: test-2 Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
이 말은test-2
로 들어온 토픽은 3개의 파티셔닝을 이루는데, 각각의 파티셔닝된 것을 각 브로커(cp-kafka-1, cp-kafka-2, cp-kfak-3)에 분산 저장(파티셔닝)된다.- 그 분산 저장(파티셔닝)된 토픽이 또 3개의 복제를 가지는데,
Partition: 0
으로 3개 분산 복제된 것 중leader
의 위치가 broker.3 =cp-kaka-3
라는 것이다. Replicas: 3,2,1
는 해당test-2
토픽이 분산 저장(파티셔닝)되는데 이 분산 저장된 것 자체를 복제하며, broker.3(cp-kafka-3)가 leader 인 것을 확인했으니, broker.2(cp-kafka-2) , broker.1(cp-kafka-2) 은 broker.3 인 리더가 죽는 경우를 대비한팔로워(예비 리더)
정도의 역할을 하며, 해당 토픽에 대한consumer
의 요청에 응대할 수 있게 된다.
미리 해보는 Consumer
- cp-kafka-1, cp-kafka-2, cp-kafka-3, 아무 곳이나 접속
cp-kaka-2
root@297d1c93c60c:/# usr/bin/kafka-console-consumer --bootstrap-server cp-kafka-2:29092 --topic test-2 --from-beginning
please
clean
//cp-kaka-3에 데이터 요청
root@297d1c93c60c:/# usr/bin/kafka-console-consumer --bootstrap-server cp-kafka-3:39092 --topic test-2 --from-beginning
please
clean
- 잘 가져오는 것을 알 수 있다.
도커 외부(호스트 pc) 프로듀서에서 메세지 보내고 받기.
Host PC 에서 다운로드
$ wget http://apache.mirror.cdnetworks.com/kafka/2.3.0/kafka_2.12-2.3.0.tgz
$ tar zxf kafka_2.12-2.3.0.tgz
kafka-producer-console 이용해서 메세지 보내기
./bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic external-test
- 그런데 오류가 난다. 전송할 수 없음.
이유는?
(Note: Do not use localhost or 127.0.0.1 as the host ip if you want to run multiple brokers.)
- 라는 글을 보았다. 사실 도커에 의한 이슈인지 네트워크 문제인지 이 부분을 잘 알 수가 없다
- 참고하기
host file을 수정하자
$ sudo vi /etc/hosts/
##
# Host Database
#
# localhost is used to configure the loopback interface
# when the system is booting. Do not change this entry.
##
#below original
127.0.0.1 LGTwinsV3.local
255.255.255.255 broadcasthost
::1 localhost
127.0.0.1 localhost kafka
- 127.0.0.1 옆에 한칸 띄우고 kafka 네임 설정
(중요) docker-compose.yml 수정
cp-kafka-1:
(생략)
KAFKA_LISTENERS: INTERNAL://cp-kafka-1:19092,EXTERNAL://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: INTERNAL://cp-kafka-1:19092,EXTERNAL://kafka:9092
cp-kafka-2:
(생략)
KAFKA_LISTENERS: INTERNAL://cp-kafka-2:29092,EXTERNAL://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: INTERNAL://cp-kafka-2:29092,EXTERNAL://kafka:9093
cp-kafka-3
(생략)
KAFKA_LISTENERS: INTERNAL://cp-kafka-3:39092,EXTERNAL://0.0.0.0:9094
KAFKA_ADVERTISED_LISTENERS: INTERNAL://cp-kafka-3:39092,EXTERNAL://kafka:9094
다시 producer 메세지 발행 실행 그리고 consumer 체크
$ ./bin/kafka-console-producer.sh --broker-list kafka:9092,kafka:9093,kafka:9094 --topic external-test
>ex-1
[2019-09-19 10:58:41,801] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {external-test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2019-09-19 10:58:41,950] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 4 : {external-test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
>ex-2
>ex-3
- 최초 리더 설정이 안되어서 나는 WARN 메세지 이외에는 잘 작동된다.
$ ./bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic external-test --from-beginning
ex-1
ex-2
ex-3