반응형
kafka 다운로드 링크
Apache Kafka
정의 : 분산 스트리밍 플랫폼
다운로드
압축해제 : windows도 tar 명령어 지원 없을 시 wsl2를 설치해서 하셔도 됩니다.
tar xvf kafka_2.13-2.8.0
kafka 명령어 및 설정 파일
config 폴더
- 여러 properties(설정) 파일이 존재
Zookeeper 및 카프카 서버 구동
- linux/mac : bin 폴더 내에 sh 파일로 실행 파일 존재
- windows : bin/windows 폴더 내에 bat파일로 실행 파일 존재
- 현재 실습은 windows 기준으로 하겠습니다. mac/linux인 경우에는 중간에 windows 만 빼시면 다 작동합니다.
# zookeeper 실행
bin/windows/zookeeper-server-start.bat config/zookeeper.properties
# kafka broker 서버 구동
bin/windows/kafka-server-start.bat config/server.properties
Topic 생성/목록/정보 확인
# Topic 생성
bin/windows/kafka-topics.bat --create --topic ${topic.name} --bootstrap-server ${domain}:${port} --partitions ${cluster.number}
# Topic 목록 확인
bin/windows/kafka-topics.bat --bootstrap-server ${domain}:${port} --list
# Topic 정보 확인
bin/windows/kafka-topics.bat --describe --topic ${topic.name} --bootstrap-server ${domain}:${port}
- partitions : multi clustering을 작성할 때 몇 개의 cluster에 분산해서 저장할지 설정하는 옵션
메시징 생성과 소비
# 생성
bin/windows/kafka-console-producer.bat --broker-list ${domain}:${port} --topic ${topic.name}
# 소비
bin/windows/kafka-console-consumer.bat --bootstrap-server ${domain}:${port} --topic ${topic.name} --from-beginning
- from-beginning 옵션 : 처음부터 끝까지 전부 데이터를 가져오는 옵션
Kafka Connector를 사용한 설정
Kafka Connector 설치
curl -O http://packages.confluent.io/archive/6.1/confluent-community-6.0.1.tar.gz
JDBC Connector 설치
- 링크 접속
- 파일 위치들
- confluent 폴더에
etc/kafka/connect-distributed.properties
에서 confluent-jdbc-connector 항목 추가
- mysql 과 kafka 연동을 위해서 mysql connector 라이브러리를 confluent 폴더에 `share/java/kafka`로 이동
- mysql connector 는 이슈가 존재합니다.(링크)
- maria-db-client를 추가해주셔 합니다.
Kafka Connect 실행
bin/windows/connect-distributed.bat etc/kafka/connect-distributed.properties
- zookeeper, broker 서버는 구동된 상태에서 진행해야 합니다.
- windows kafka connect 이슈발생
Classpath is empty. Please build the project first e.g. by running 'gradlew jarAll'
- 해결법
- confluent 폴더에서
bin/windows/kafka-run-class.bat
의 파일을 수정
- confluent 폴더에서
Kafka Source와 Sink
데이터를 가져와서 broker에 저장합니다. source와 연결된 sink의 target 프로그램으로 전달
- DB table 구조
CREATE SCHEMA `my_db`;
create table users (
id bigint auto_increment primary key,
user_id varchar(20),
pwd varchar(200),
name varchar(255),
created_at datetime
);
Source 등록
- 정의 : 데이터를 등록하면 broker 서버에 등록하는 역할
echo '
{
"name" : "my-soruce-connect",
"config" : {
"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url" : "jdbc:mysql://localhost:3306/my_db",
"connection.user": "root",
"connection.password" : "root",
"mode" : "incrementing",
"incrementing.column.name" : "id",
"table.whitelist" : "users",
"topic.prefix" : "my_topic_",
"tasks.max" : "1"
}
}
' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"
- postman이나 curl을 통해서 전달 가능
- whilte list
- db insert나 어떤 이벤트가 발생시 my_topic에 있는 topic에 넣어준다 prepix뒤에 whitelist 이름이 붙어서 실행
- topic : my_topic_users
- mode
- incrementing : data가 등록 되면서 자동으로 증가
- 목록 확인 방법 :
curl http://localhost:8083/connectors
- connect 확인 :
curl [http://localhost:8083/connectors/${end-point-name:my-source-connect}/status
- table 데이터 update 발생
insert into users(user_id, pwd, name) values('test3', 'pwd', 'TEST buy');
추가- 새로운 토픽 생성
- Subscriber로 데이터 확인
Sink 등록
- 정의 : 등록된 데이터를 target application으로 전송하는 역할
echo '
{
"name" : "my-sink-connect",
"config" : {
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://localhost:3306/my_db",
"connection.user": "root",
"connection.password" : "root",
"auto.create" : "true",
"auto.evolve" : "true",
"delete.enabled" : "false",
"tasks.max" : "1",
"topics" : "my_topic_users"
}
}
' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"
- topic을 table로 생성해서 추가
- source로 data를 추가하고, sink로 subscribe해서 target (mysql)에 전달해서 정보를 update하는 형태이다 -> 데이터 싱크를 맞추는 형태
반응형
'Spring > MSA' 카테고리의 다른 글
Spring Cloud Contract를 이용한 Test 코드 작성 (0) | 2022.09.17 |
---|---|
Spring cloud config 설정 (0) | 2021.06.25 |
Gradle Multi Module 프로젝트 (2) | 2021.06.12 |