본문 바로가기
Spring/MSA

Kafka 설치와 환경 구축

by clearinging 2021. 7. 5.
반응형

kafka 다운로드 링크

Apache Kafka

정의 : 분산 스트리밍 플랫폼

다운로드

kafka 설치

압축해제 : windows도 tar 명령어 지원 없을 시 wsl2를 설치해서 하셔도 됩니다.

tar xvf kafka_2.13-2.8.0

kafka 명령어 및 설정 파일

config 폴더

  • 여러 properties(설정) 파일이 존재

Zookeeper 및 카프카 서버 구동

  • linux/mac : bin 폴더 내에 sh 파일로 실행 파일 존재
  • windows : bin/windows 폴더 내에 bat파일로 실행 파일 존재
  • 현재 실습은 windows 기준으로 하겠습니다. mac/linux인 경우에는 중간에 windows 만 빼시면 다 작동합니다.
# zookeeper 실행
bin/windows/zookeeper-server-start.bat config/zookeeper.properties

# kafka broker 서버 구동
bin/windows/kafka-server-start.bat config/server.properties

Topic 생성/목록/정보 확인

# Topic 생성
bin/windows/kafka-topics.bat --create --topic ${topic.name} --bootstrap-server ${domain}:${port} --partitions ${cluster.number}
# Topic 목록 확인
bin/windows/kafka-topics.bat --bootstrap-server ${domain}:${port} --list
# Topic 정보 확인
bin/windows/kafka-topics.bat --describe --topic ${topic.name} --bootstrap-server ${domain}:${port}
  • partitions : multi clustering을 작성할 때 몇 개의 cluster에 분산해서 저장할지 설정하는 옵션

메시징 생성과 소비

# 생성
bin/windows/kafka-console-producer.bat --broker-list ${domain}:${port} --topic ${topic.name}

# 소비
bin/windows/kafka-console-consumer.bat --bootstrap-server ${domain}:${port} --topic ${topic.name} --from-beginning
  • from-beginning 옵션 : 처음부터 끝까지 전부 데이터를 가져오는 옵션

Kafka Connector를 사용한 설정

Kafka Connector 설치

  • curl -O http://packages.confluent.io/archive/6.1/confluent-community-6.0.1.tar.gz

JDBC Connector 설치

다운로드 링크

  • 파일 위치들

  • confluent 폴더에 etc/kafka/connect-distributed.properties에서 confluent-jdbc-connector 항목 추가

  • mysql 과 kafka 연동을 위해서 mysql connector 라이브러리를 confluent 폴더에 `share/java/kafka`로 이동
    • mysql connector 는 이슈가 존재합니다.(링크)
    • maria-db-client를 추가해주셔 합니다.

Kafka Connect 실행

bin/windows/connect-distributed.bat etc/kafka/connect-distributed.properties
  • zookeeper, broker 서버는 구동된 상태에서 진행해야 합니다.
  • windows kafka connect 이슈발생
    • Classpath is empty. Please build the project first e.g. by running 'gradlew jarAll'
  • 해결법
    • confluent 폴더에서 bin/windows/kafka-run-class.bat의 파일을 수정

추가
실행 후 topic 3개 추가 확인

Kafka Source와 Sink

데이터를 가져와서 broker에 저장합니다. source와 연결된 sink의 target 프로그램으로 전달

  • DB table 구조
CREATE SCHEMA `my_db`;
create table users (
    id bigint auto_increment primary key,
    user_id varchar(20),
    pwd varchar(200),
    name varchar(255),
    created_at datetime
);

Source 등록

  • 정의 : 데이터를 등록하면 broker 서버에 등록하는 역할
echo '
{
    "name" : "my-soruce-connect",
    "config" : {
        "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url" : "jdbc:mysql://localhost:3306/my_db",
        "connection.user": "root",
        "connection.password" : "root",
        "mode" : "incrementing",
        "incrementing.column.name" : "id",
        "table.whitelist" : "users",
        "topic.prefix" : "my_topic_",
        "tasks.max" : "1"
    }
}
' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"

Postman사용해서 추가
확인

  • postman이나 curl을 통해서 전달 가능
  • whilte list
    • db insert나 어떤 이벤트가 발생시 my_topic에 있는 topic에 넣어준다 prepix뒤에 whitelist 이름이 붙어서 실행
    • topic : my_topic_users
  • mode
    • incrementing : data가 등록 되면서 자동으로 증가
  • 목록 확인 방법 : curl http://localhost:8083/connectors
  • connect 확인 : curl [http://localhost:8083/connectors/${end-point-name:my-source-connect}/status
  • table 데이터 update 발생
    • insert into users(user_id, pwd, name) values('test3', 'pwd', 'TEST buy'); 추가
    • 새로운 토픽 생성
      topic 생성 확인
    • Subscriber로 데이터 확인
      publiser된 데이터

Sink 등록

  • 정의 : 등록된 데이터를 target application으로 전송하는 역할
echo '
{
    "name" : "my-sink-connect",
    "config" : {
        "connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url" : "jdbc:mysql://localhost:3306/my_db",
        "connection.user": "root",
        "connection.password" : "root",
        "auto.create" : "true",
        "auto.evolve" : "true",
        "delete.enabled" : "false",
        "tasks.max" : "1",
        "topics" : "my_topic_users"
    }
}
' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"

sink 추가
insert query 실행 후 결과

- topic을 table로 생성해서 추가

- source로 data를 추가하고, sink로 subscribe해서 target (mysql)에 전달해서 정보를 update하는 형태이다 -> 데이터 싱크를 맞추는 형태

반응형

'Spring > MSA' 카테고리의 다른 글

Spring Cloud Contract를 이용한 Test 코드 작성  (0) 2022.09.17
Spring cloud config 설정  (0) 2021.06.25
Gradle Multi Module 프로젝트  (2) 2021.06.12