Typescript로 Kafka Consumer 만들기
NestJS는 Typescript 언어를 기본으로하는 Node.js기반의 프레임워크 입니다. ExpressJS를 기반으로하고 있어서 웹 백엔드 개발에 필요한 기능들을 잘 지원하고 있고 Microservice기능들을 통해 손쉽게 Kafka와 연동을 할 수 있습니다. 그럼 NestJS 프레임워크를 사용하여 consumer를 만들어 봅니다
NestJS는 CLI를 통해 설치 가능합니다
npm i -g @nestjs/cli
Consumer 프로젝트를 생성합니다
nest new consumer
아래와 같이 프로젝트 생성이 진행되고
패키지 관리를 npm으로 할 것인지 yarn으로 할 것 인지 선택하는 항목이 나오는데
전 주로 yarn 으로 관리합니다 (편한것으로 하셔도 됩니다)
~/dev-woo $ nest new consumer
⚡ We will scaffold your app in a few seconds..
CREATE consumer/.eslintrc.js (665 bytes)
CREATE consumer/.prettierrc (51 bytes)
CREATE consumer/README.md (3340 bytes)
CREATE consumer/nest-cli.json (118 bytes)
CREATE consumer/package.json (1993 bytes)
CREATE consumer/tsconfig.build.json (97 bytes)
CREATE consumer/tsconfig.json (546 bytes)
CREATE consumer/src/app.controller.spec.ts (617 bytes)
CREATE consumer/src/app.controller.ts (274 bytes)
CREATE consumer/src/app.module.ts (249 bytes)
CREATE consumer/src/app.service.ts (142 bytes)
CREATE consumer/src/main.ts (208 bytes)
CREATE consumer/test/app.e2e-spec.ts (630 bytes)
CREATE consumer/test/jest-e2e.json (183 bytes)
? Which package manager would you ❤️ to use?
npm
❯ yarn
pnpm
src/main.ts 파일을 열어서 다음과 같이 수정합니다
app실행시 microservices모듈을 통해 kafkaOptions을 적용시킵니다 (Transport.KAFKA)
import { NestFactory } from '@nestjs/core';
import { KafkaOptions, MicroserviceOptions, Transport } from '@nestjs/microservices';
import { AppModule } from './app.module';
async function bootstrap() {
const microserviceOptions: KafkaOptions = {
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092'],
},
},
};
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
microserviceOptions,
);
await app.listen();
}
bootstrap();
그리고 필요한 패키들을 추가합니다
yarn add @nestjs/microservices
yarn add kafkajs
src/app.module.ts를 열어 다음과 같이 수정합니다
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { ClientsModule, Transport } from '@nestjs/microservices';
@Module({
imports: [
ClientsModule.register([
{
name: 'dev-woo',
transport: Transport.KAFKA,
options: {
client: {
clientId: 'dev-woo',
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'dev-woo',
},
},
},
]),
],
controllers: [AppController],
providers: [AppService],
})
export class AppModule { }
Kafka client 모듈 정보를 입력합니다. 1편, Kafka 설치 관련 글에서 Kafka 실행시 broker의 port를 9092로 실행 하였으므로 여기서는 localhost:9092로 설정합니다
현재는 Kafka 설정에 대한 환경변수들을 하드코딩해서 진행을 하지만 4편에서 환경변수들을 .env파일을 활용하여 stage/production 등의 환경에서 달라지게 하는식으로 이를 개선 할 예정입니다
src/app.controller.ts 파일을 열어 다음과 같이 수정합니다
import {Controller} from '@nestjs/common';
import {
Ctx,
KafkaContext,
MessagePattern,
Payload,
} from '@nestjs/microservices';
@Controller()
export class AppController {
constructor() {
}
@MessagePattern('TestTopic')
readMessage(@Payload() message: any, @Ctx() context: KafkaContext) {
const originalMessage = context.getMessage();
const response =
`TestTopic의 메시지: ` +
JSON.stringify(originalMessage.value);
console.log(response);
return response;
}
}
Controller에서 Kafka에서 보내는 Message를 받는것을 수행합니다.
@MessagePattern 데코레이터에 받고자하는 Topic이름을 적어두면 해당 Topic에 이벤트가 발생했을때 이를 받아 올 수 있습니다.
이번에는 1편에 만들어둔 TestTopic을 작성해서 합니다
작성이 완료되었으면 이제 제대로 동작하나 테스트를 해봅니다
Testing
consumer 프로젝트를 실행합니다
yarn start
LOG [NestMicroservice] Nest microservice successfully started +2ms
라는 메시지가 나오면 실행이 성공했으며 TestTopic의 메시지를 받을 준비를 합니다.
TestTopic에 메시지를 전송하는것을 kafka-console을 사용해서 진행합니다
shell에서 아래에 명령어를 실행합니다
echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TestTopic > /dev/null
consumer 프로젝트에서 다음과 같이 "Hello, World" 메시지를 받으면 성공입니다
References
'DevOps' 카테고리의 다른 글
windows를 위한 Python 설치관련도구들 (0) | 2023.03.21 |
---|---|
파이썬3 (python3) 설치 및 윈도우 (0) | 2023.03.21 |
Discord는 수백만의 사용자 메시지를 어떻게 처리할까? (0) | 2023.03.19 |
올해 최고의 소프트웨어 개발도구 6가지 (0) | 2023.03.17 |
NestJS를 이용한 Typescript로 Kafka 사용해보기 -1 (0) | 2022.12.04 |
댓글