
MQTT 의 이해부터 테스트까지 (feat. POS 연동)
안녕하세요. 백엔드팀손유진입니다. 테이블링은 테이블에서 QR/NFC 를 태그하여 메뉴를 확인하고 주문할 수 있는 테이블오더 서비스를 제공하고 있습니다. 테이블오더로 주문한 내역은 POS 에 자동으로 전달되어 매장은 관리 리소스를 효율적으로 활용할 수 있는 장점을 가지고있습니다. 여기서 우리는 POS Agent 와 테이블링 내부 서버의 안정적인 통신을 위해 MQTT 적용을 고민하고 있습니다. 이에 따라 이 글에서는 MQTT 에 대해 학습한 내용과 테스트 과정을 공유하고자합니다. MQTT 는 낮은 대역폭과 리소스가 제한된 환경에서 안정적으로 통신을 할 수 있는 경량 메시지 프로토콜입니다. 작은 데이터 패킷을 전송하여 효율성이 높고 오버헤드와 전력 소비가 작아 IoT와 M2M 통신에 많이 사용되고있습니다. MQTT 의이해 Publisher/Subscriber 모델 MQTT 는 클라이언트와 브로커로 구성되어 있으며, 클라이언트가 Topic 에 메시지를 발행하면 해당 Topic 을 구독 중인 클라이언트가 메시지를 수신할 수있습니다. 클라이언트는 여러 Topic 을 구독할 수 있고 모든 클라이언트가 발행과 구독을 할 수 있어 Publisher 와 Subscriber 의 구분이 따로없습니다. Publish-Subscribe 방식 Topic은 브로커가 연결된 클라이언트에 대한 메시지를 필터링 하기 위해 사용되며 슬래시(/) 로 구분하는 계층적인 구조를가집니다. Topic 예시 (출처: https://www.hivemq.com/blog/mqtt-essentials-part-5-mqtt-topics-best-practices/) QoS 제어 다양한 QoS(Quality Of Service) 레벨을 지원하여 메시지 전달의 보장을 제어할 수있습니다. QoS 0 Fire andforget. Publisher 가 메시지를 보내고, 이상이 없으면 바로완료. 더 이상 관여하지않음. QoS 1 At leastonce. Publisher 가 메시지를 보내면 Subscriber 가 메시지 확인 응답할 때까지 여러 번전송. 메시지 중복이 허용되는 상황에서사용. QoS 2 Exactly once. 가장 안정적인 메시지 전송이 필요할 때사용. 메시지가 브로커에 정확히 한 번전달됨. 지속성과 안정성 영구 세션(Persistent Session)을 통해 클라이언트에 전송되었지만 수신이 확인되지 않은 메시지를 다시 전달할 수있습니다. 영구 세션을 활성화 하기 위해서는 MQTT 버전에 따라 클라이언트 옵션에 차이가 있지만 공통적으로 클린 세션(Clean Session)을 비활성화 하고 메시지의 QoS 레벨을 1이상으로 전송할 때 메시지가 다시전달됩니다. MQTT 버전 별 CleanSession 설정 (출처: https://www.emqx.com/en/blog/mqtt5-new-feature-clean-start-and-session-expiry-interval) MQTT 활용예시 MQTT 활용 예시 (출처: https://www.hivemq.com/blog/mqtt-essentials-part-1-introducing-mqtt/) 이 이미지는 모바일 장치로 “open door” 메시지를 발행하면 해당 Topic 을 구독 중인 클라이언트가 수신하여 처리한 후 성공 여부를 응답 Topic에 전달해 모바일 앱에 완료를 알리는 과정을보여줍니다. 패킷의 responseTopic 은 클라이언트가 메시지 발행 후 응답 메시지를 받을 Topic 이고, correlationData 는 메시지에 대한 식별자로사용됩니다. 이러한 Request/Response 패턴은 MQTT 5.0 버전에서 도입되었으며, 실제 서비스에서 클라이언트가 처리 완료 여부를 사용자에게 알리는 데 유용하게활용됩니다. MQTT 통신테스트 목적 테이블오더와 POS 시스템 연동과정 현재 테이블오더가 들어오면 WebSocket 으로 POS 에 설치된 Agent 에게 메시지를 전달하여 주문 정보를 연동하고있습니다. 이 때 소켓 연결이 간헐적으로 끊기면서 메시지를 수신하지 못해 POS 에 주문을 전송하지 못하는 이슈가 있었습니다. 이 문제를 해결하기 위해 MQTT 적용 시 메시지 전달이 신뢰성 있게 보장되는지 중점적으로 테스트해보았습니다. 테스트 환경구성 브로커는 Amazon MQ의 ActiveMQ를 활용하고, NestJS 프레임워크를 이용하여 구현해 보았습니다. (ActiveMQ 는 MQTT v3.1.1 과 v3.1 만지원합니다.) 필수 패키지설치 $ npm i --save @nestjs/microservices $ npm i --save mqtt 2. Client (Publisher) app.module.ts import ( Module ) from '@nestjs/common' import ( AppController ) from './app.controller' import ( ConfigModule, ConfigService ) from '@nestjs/config' import ( ClientProxyFactory, Transport ) from '@nestjs/microservices' import ( PublisherService ) from './service/publisher.service' @Module(( imports: [ConfigModule.forRoot()], controllers: [AppController], providers: [ ( provide: 'MQTT_EXAMPLE', inject: [ConfigService], useFactory: (configService: ConfigService) => ClientProxyFactory.create(( transport: Transport.MQTT, options: ( clientId: 'tabling-mqtt-example-app_01', protocol: configService.get('MQTT_PROTOCOL'), url: configService.get('MQTT_URL'), username: configService.get('MQTT_USERNAME'), password: configService.get('MQTT_PASSWORD'), clean: false, subscribeOptions: ( qos: 2, ), ), )), ), PublisherService, ], )) export class AppModule () publisher.service.ts import ( Inject, Injectable ) from '@nestjs/common' import ( ClientProxy, MqttRecordBuilder ) from '@nestjs/microservices' import ( QoS ) from '@nestjs/microservices/external/mqtt-options.interface' import ( logger ) from '../main' @Injectable() export class PublisherService ( constructor( @Inject('MQTT_EXAMPLE') private readonly mqttClient: ClientProxy, ) () async exec(topic: string, message: string, qos: QoS = 0) ( const record = new MqttRecordBuilder(message).setQoS(qos).build() return this.mqttClient .emit(topic, record) .toPromise() .then(() => ( logger.log( \n -- 메시지 발행 성공 -- Topic: $(topic) Payload: $(message) QoS: $(qos) -----------------------, ) )) .catch((error) => logger.error(error)) ) ) app.controller import ( Body, Controller, Delete, Get, Post ) from '@nestjs/common' import ( PublisherService ) from './service/publisher.service' import ( logger ) from './main' import ( QoS ) from "@nestjs/microservices/external/mqtt-options.interface" @Controller() export class AppController ( constructor( private readonly publisherService: PublisherService ) () @Post('publish') async publishMessage(@Body() body: ( topic: string message: string qos: QoS )) ( const ( topic, message, qos ) = body await this.publisherService.exec(topic, message, qos) return 'SUCCESS!' ) ) 3. Client (Subscriber) main.ts import ( NestFactory ) from '@nestjs/core' import ( Transport, MicroserviceOptions ) from '@nestjs/microservices' import ( AppModule ) from './app.module' import ( ConfigService ) from '@nestjs/config' import ( Logger ) from '@nestjs/common' export const configService = new ConfigService() export const logger = new Logger('MQTT Example') /** * clientId: 고유한 클라이언트ID, 중복된 클라이언트 ID로 연결 시 이전 연결이 종료됩니다. * clean: 영구 세션(Persistent Session) 활성화 여부 * subscribeOptions.qos: 클라이언트가 수신할 수 있는 최대 QoS 레벨 */ const mqttOptions: MicroserviceOptions = ( transport: Transport.MQTT, options: ( clientId: 'tabling-mqtt-example-main_01', protocol: configService.get('MQTT_PROTOCOL'), url: configService.get('MQTT_URL'), username: configService.get('MQTT_USERNAME'), password: configService.get('MQTT_PASSWORD'), clean: false, subscribeOptions: ( qos: 2, ), ), ) async function bootstrap() ( const app = await NestFactory.create(AppModule) app.connectMicroservice(mqttOptions) await app.startAllMicroservices() await app.listen(3000) ) bootstrap() app.controller.ts import ( Body, Controller, Post ) from '@nestjs/common' import ( Ctx, MessagePattern, MqttContext, Payload, Transport ) from '@nestjs/microservices' import ( logger ) from '../../main' @Controller() export class AppController ( constructor() () @MessagePattern('tabling/lounge/light', Transport.MQTT) handleMessage(@Payload() data: number[], @Ctx() context: MqttContext) ( logger.log(\n -- 메시지 수신 성공 -- Topic: $(context.getTopic()) Payload: $(data.toString()) ---------------------) ) ) 결과 테스트 로그 이전에 연결했던 클라이언트 ID 로 재연결 시 연결이 끊겨있는 동안 발행되었던 메시지들을 수신하는 것을 확인할 수있었습니다. 마지막으로 공부할 때 제가 궁금했던 점과 발표 중 나왔던 질문에 대한 답변을정리했습니다. [Q1] WebSocket 통신과 무슨 차이점이있나요? WebSocket 은 웹 브라우저와 서버 간의 양방향 통신을 위해 사용하는 프로토콜로 MQTT 와는 사용 목적이다릅니다. 만약 MQTT 를 브라우저 환경에서 사용한다면 브라우저는 기본적으로 MQTT 를 지원하지 않기 때문에 WebSocket 을 통해 브라우저에 MQTT 메시지를 보내야 합니다. (많은 MQTT 클라이언트 라이브러리에서 WebSocket 을지원합니다.) 또 우리의 요건 측면에서 큰 차이점은 WebSocket 은 QoS 옵션과 메시지 대기열을 지원하지 않기 때문에 메시지의 전달을 보장하기 어렵다는 점이있습니다. [Q2] Message Queue와 무슨 차이점이있나요? Message Queue는 메시지 전달을 보장하기 위해서 1:1 통신을 해야하는 반면 MQTT는 Pub/Sub 모델로 여러 구독자가 모두 동일한 메시지를 수신할 수있습니다. ActiveMQ 의 경우 Topic 뿐만 아니라 Queue 로도 메시지를 전달할 수 있기 때문에 목적에 맞게 적절한 방식을 고려할 수있습니다. [Q3] Kafka 와 무슨 차이점이있나요? Kafka 는 대용량의 데이터 스트림 처리를 위해 사용하는 플랫폼으로 실시간 데이터 처리 및 저장이 필요한 대규모 애플리케이션에적합합니다. 대량의 IoT 데이터를 처리 및 저장하여 보다 정밀한 분석을 하기 위해 MQTT와 Kafka를 결합하여 사용하기도 하며, MQTT 브로커도 스케일링이 가능하고 대용량 메시지 처리와 로드 밸런싱을 지원하기 때문에 상황에 따라 적절한 대응 방법을 선택할 수있습니다. 마치며 메시지 기반으로 통신하는 방식이 다양하기 때문에 그중 하나인 MQTT 에 대해 알아가시는 데에 도움이 되었길 바라며, 가볍게 통신 테스트할 때는 무료로 제공되는 Public Broker도 활용할 수 있다는 소소한 정보공유드립니다. 현재는 적용 여부를 검토 중인 단계이지만 적용 후 긍적적인 결과를 기대하며 마무리하겠습니다. 긴 글 읽어주셔서감사합니다! 테이블링 백엔드팀에서는 기술적인 고민을 함께 나누며, 서비스를 지속적으로 발전시켜 갈 실력 있는 동료분을 모시고 있습니다. 보다 상세한 내용은 채용 공고를 확인 부탁드리며, 많은 관심과 지원 부탁드리겠습니다. References https://www.hivemq.com/blog/mqtt-essentials-part-1-introducing-mqtt/ https://www.emqx.com/en/blog/how-to-use-mqtt-in-nodejs#introduction https://docs.nestjs.com/microservices/mqtt https://ably.com/topic/mqtt-vs-websocket https://www.hivemq.com/blog/mqtt-vs-kafka-real-time-bidirectional-data-processing/ MQTT 의 이해부터 테스트까지 (feat. POS 연동) was originally published in 테이블링 기술블로그 on Medium, where people are continuing the conversation by highlighting and responding to this story.
