[More Kafka]3-1. 카프카 프로듀서

[More Kafka]3-1. 카프카 프로듀서


spring boot 기반 producer 코드는 아래에 있습니다. 참고로, spring cloud stream kafka를 사용하지 않은 예제입니다. https://github.com/twowinsh87/spring-kafka2019/tree/master/springkafka01

도커 내부 프로듀서에서 메세지 보내기

(1) test 토픽의 메세지 보내기

$ docker exec -it cp-kafka-1 /bin/bash

  • cp-kafka-1 docker contatiner 로 접속

root@c94e160402b5:/# usr/bin/kafka-console-producer --broker-list cp-kafka-2:29092 --topic test

>first
[2019-09-18 03:20:52,847] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2019-09-18 03:20:52,950] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 4 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
>second
  • 바로 cp-kafka-2 로 test 명으로 된 토픽을 보내자

(2) WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) 이게 뭐지?

  • 기본적으로 docker-compose.yml 을 작성할 때, topic이 생성되어 있지 않다면, 자동으로 생성해주는 auto.create.topics.enable 을 옵션으로 주지 않았다. 왜냐면 기본값이 true 이기 때문. 그래서 자동으로 없는 topic에 대해서 생성을 해주는데
  • 저 위의 문구는 현재 test 토픽에 대한 리더가 선출되지 않았기 때문에 발생한다.

(3) “test-2” topic을 생성하고 producer가 메세지를 보내면?

root@c94e160402b5:/# usr/bin/kafka-topics --create --zookeeper cp-zookeeper-1:12181 --replication-factor 3 --partitions 3 --topic test-2
Created topic test-2.

root@c94e160402b5:/# usr/bin/kafka-console-producer --broker-list cp-kafka-2:29092 --topic test-2`
>please
>clean
  • 위와 같은 WARN 로그가 발생하지 않았음.

CLI로 topic 정보 확인하기

root@c94e160402b5:/# /usr/bin/kafka-topics --describe --zookeeper cp-zookeeper-1:12181 --topic test
Topic:test	PartitionCount:3	ReplicationFactor:1	Configs:
	Topic: test	Partition: 0	Leader: 1	Replicas: 1	Isr: 1
	Topic: test	Partition: 1	Leader: 2	Replicas: 2	Isr: 2
	Topic: test	Partition: 2	Leader: 3	Replicas: 3	Isr: 3

root@c94e160402b5:/# /usr/bin/kafka-topics --describe --zookeeper cp-zookeeper-1:12181 --topic test-2
Topic:test-2	PartitionCount:3	ReplicationFactor:3	Configs:
	Topic: test-2	Partition: 0	Leader: 3	Replicas: 3,2,1	Isr: 3,2,1
	Topic: test-2	Partition: 1	Leader: 1	Replicas: 1,3,2	Isr: 1,3,2
	Topic: test-2	Partition: 2	Leader: 2	Replicas: 2,1,3	Isr: 2,1,3
  • test topic과 test-2 토픽은 다르게 보인다.
  • test 토픽은 docker-compose에서 셋팅 해준 KAFKA_NUM_PARTITIONS: 3 에 따라서 PartitionCount 가 3 ReplicationFactor 는 kafka 설정의 기본값에 의해서 1 로 설정되었다.

  • test-2 토픽은 위에서 토픽을 생성해준 것과 같이 usr/bin/kafka-topics --create --zookeeper cp-zookeeper-1:12181 --replication-factor 3 --partitions 3 --topic test-2 replication-factor 3 과 partitions 3 인 것을 알 수 있다.

topic 정보의 해석

Topic:test-2	PartitionCount:3	ReplicationFactor:3	Configs:
	Topic: test-2	Partition: 0	Leader: 3	Replicas: 3,2,1	Isr: 3,2,1
	Topic: test-2	Partition: 1	Leader: 1	Replicas: 1,3,2	Isr: 1,3,2
	Topic: test-2	Partition: 2	Leader: 2	Replicas: 2,1,3	Isr: 2,1,3
  • 예를 들어서 위에서 test-2 토픽의 please 라는 value를 생각해보자
  • Topic: test-2 Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1 이 말은 test-2 로 들어온 토픽은 3개의 파티셔닝을 이루는데, 각각의 파티셔닝된 것을 각 브로커(cp-kafka-1, cp-kafka-2, cp-kfak-3)에 분산 저장(파티셔닝)된다.
  • 그 분산 저장(파티셔닝)된 토픽이 또 3개의 복제를 가지는데, Partition: 0 으로 3개 분산 복제된 것 중 leader 의 위치가 broker.3 = cp-kaka-3 라는 것이다.
  • Replicas: 3,2,1 는 해당 test-2 토픽이 분산 저장(파티셔닝)되는데 이 분산 저장된 것 자체를 복제하며, broker.3(cp-kafka-3)가 leader 인 것을 확인했으니, broker.2(cp-kafka-2) , broker.1(cp-kafka-2) 은 broker.3 인 리더가 죽는 경우를 대비한 팔로워(예비 리더) 정도의 역할을 하며, 해당 토픽에 대한 consumer 의 요청에 응대할 수 있게 된다.

미리 해보는 Consumer

  • cp-kafka-1, cp-kafka-2, cp-kafka-3, 아무 곳이나 접속
cp-kaka-2
root@297d1c93c60c:/# usr/bin/kafka-console-consumer --bootstrap-server cp-kafka-2:29092 --topic test-2 --from-beginning
please
clean

//cp-kaka-3에 데이터 요청
root@297d1c93c60c:/# usr/bin/kafka-console-consumer --bootstrap-server cp-kafka-3:39092 --topic test-2 --from-beginning
please
clean
  • 잘 가져오는 것을 알 수 있다.

도커 외부(호스트 pc) 프로듀서에서 메세지 보내고 받기.

Host PC 에서 다운로드

$ wget http://apache.mirror.cdnetworks.com/kafka/2.3.0/kafka_2.12-2.3.0.tgz
$ tar zxf kafka_2.12-2.3.0.tgz

kafka-producer-console 이용해서 메세지 보내기

./bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic external-test
  • 그런데 오류가 난다. 전송할 수 없음.

이유는?

(Note: Do not use localhost or 127.0.0.1 as the host ip if you want to run multiple brokers.)

  • 라는 글을 보았다. 사실 도커에 의한 이슈인지 네트워크 문제인지 이 부분을 잘 알 수가 없다
  • 참고하기

host file을 수정하자

  • $ sudo vi /etc/hosts/
##
# Host Database
#
# localhost is used to configure the loopback interface
# when the system is booting.  Do not change this entry.
##
#below original
127.0.0.1       LGTwinsV3.local
255.255.255.255	broadcasthost
::1             localhost
127.0.0.1	localhost kafka
  • 127.0.0.1 옆에 한칸 띄우고 kafka 네임 설정

(중요) docker-compose.yml 수정

cp-kafka-1:
(생략)
KAFKA_LISTENERS: INTERNAL://cp-kafka-1:19092,EXTERNAL://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: INTERNAL://cp-kafka-1:19092,EXTERNAL://kafka:9092

cp-kafka-2:
(생략)
KAFKA_LISTENERS: INTERNAL://cp-kafka-2:29092,EXTERNAL://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: INTERNAL://cp-kafka-2:29092,EXTERNAL://kafka:9093

cp-kafka-3
(생략)
KAFKA_LISTENERS: INTERNAL://cp-kafka-3:39092,EXTERNAL://0.0.0.0:9094
KAFKA_ADVERTISED_LISTENERS: INTERNAL://cp-kafka-3:39092,EXTERNAL://kafka:9094

다시 producer 메세지 발행 실행 그리고 consumer 체크

$ ./bin/kafka-console-producer.sh --broker-list kafka:9092,kafka:9093,kafka:9094 --topic external-test
>ex-1
[2019-09-19 10:58:41,801] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {external-test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2019-09-19 10:58:41,950] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 4 : {external-test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
>ex-2
>ex-3
  • 최초 리더 설정이 안되어서 나는 WARN 메세지 이외에는 잘 작동된다.
$ ./bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic external-test --from-beginning
ex-1
ex-2
ex-3