Skip to content

Commit

Permalink
feat(mqtt): implements the mqtt parser for ogma
Browse files Browse the repository at this point in the history
* feat(mqtt): implements the mqtt parser for ogma

MQTT does not natively provide an option for retrieving the IP address of the client, so the only
way to do it is to add the IP in the paylaod. Sucks, but it's life. Otherwise, super simple
implementation of a parser.

re #18

* ci(all): adds docker-compose for CI

* ci(all): fix docker-compose version
  • Loading branch information
jmcdo29 authored May 17, 2020
1 parent 40c2d0e commit b83b65c
Show file tree
Hide file tree
Showing 12 changed files with 429 additions and 696 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,9 @@ jobs:
run: yarn lint
- name: test
run: yarn test:cov
- name: Start Docker Compose
run: docker-compose up -d
- name: integration test
run: yarn test:int
- name: Stop Docker Compose
run: docker-compose down
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,4 @@ coverage/
*.tgz

*.txt
*.gql
*.gql
12 changes: 12 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
version: "3.3"
services:
mqtt:
image: eclipse-mosquitto
container_name: mqtt
ports:
- "1883:1883"
# rabbitmq:
# kafka:
# grpc:
# nats:
# redis:
6 changes: 3 additions & 3 deletions integration/src/rpc/client/rpc-client.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,17 @@ export class RpcClientController

@Get()
getMessage() {
return this.micro.send({ cmd: 'message' }, {});
return this.micro.send({ cmd: 'message' }, { ip: '127.0.0.1' });
}

@Get('error')
@UseFilters(ExceptionFilter)
getError() {
return this.micro.send({ cmd: 'error' }, {});
return this.micro.send({ cmd: 'error' }, { ip: '127.0.0.1' });
}

@Get('skip')
getSkip() {
return this.micro.send({ cmd: 'skip' }, {});
return this.micro.send({ cmd: 'skip' }, { ip: '127.0.0.1' });
}
}
16 changes: 12 additions & 4 deletions integration/test/rpc.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@ import { INestApplication, INestMicroservice } from '@nestjs/common';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { Test } from '@nestjs/testing';
import { color } from '@ogma/logger';
import { OgmaInterceptor } from '@ogma/nestjs-module';
import {
AbstractInterceptorService,
OgmaInterceptor,
Type,
} from '@ogma/nestjs-module';
import { MqttParser } from '@ogma/platform-mqtt';
import { TcpParser } from '@ogma/platform-tcp';
import { RpcClientModule } from '../src/rpc/client/rpc-client.module';
import { RpcServerModule } from '../src/rpc/server/rpc-server.module';
Expand All @@ -15,20 +20,23 @@ import {
} from './utils';

describe.each`
server | transport | options | protocol
${'TCP'} | ${Transport.TCP} | ${{}} | ${'IPv4'}
server | transport | options | protocol | parser
${'TCP'} | ${Transport.TCP} | ${{}} | ${'IPv4'} | ${TcpParser}
${'MQTT'} | ${Transport.MQTT} | ${{ url: 'mqtt://127.0.0.1:1883' }} | ${'mqtt'} | ${MqttParser}
`(
'$server server',
({
server,
transport,
options,
protocol,
parser,
}: {
server: string;
transport: number;
options: any;
protocol: string;
parser: Type<AbstractInterceptorService>;
}) => {
let rpcServer: INestMicroservice;
let rpcClient: INestApplication;
Expand All @@ -37,7 +45,7 @@ describe.each`
const modRef = await createTestModule(RpcServerModule, {
service: serviceOptionsFactory(server),
interceptor: {
rpc: TcpParser,
rpc: parser,
},
});
rpcServer = modRef.createNestMicroservice<MicroserviceOptions>({
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@
"lerna": "^3.20.2",
"lint-staged": "^10.0.8",
"morgan": "^1.10.0",
"npm-check": "^5.9.2",
"mqtt": "^4.0.1",
"pino": "^6.2.0",
"prettier": "^2.0.1",
"reflect-metadata": "^0.1.13",
Expand Down
23 changes: 19 additions & 4 deletions packages/platform-mqtt/README.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,26 @@
# `@ogma/platform-mqtt`

> TODO: description
The `MqttParser` parser for the `OgmaInterceptor`. This plugin class parses MQTT request and response object to be able to successfully log the data about the request. For more information, check out [the @ogma/nestjs-module](../nestjs-module/README.md) documentation.

## Installation

Nothing special, standard `npm i @ogma/platform-mqtt` or `yarn add @ogma/platform-mqtt`

## Usage

```
const platformMqtt = require('@ogma/platform-mqtt');
This plugin is to be used in the `OgmaInterceptorOptions` portion of the `OgmaModule` during `forRoot` or `forRootAsync` registration. It can be used like so:

// TODO: DEMONSTRATE API
```ts
@Module(
OgmaModule.forRoot({
interceptor: {
rpc: MqttParser
}
})
)
export class AppModule {}
```

## Important Notes

Because of how MQTT requests are sent and the data available in them, to get the IP address in the request log, an IP property must be sent in the payload. This is the only way to get the IP address. If an IP property is not sent, the interceptor will use an empty string. [You can find more information here](https://stackoverflow.com/questions/45235080/how-to-know-the-ip-address-of-mqtt-client-in-node-js).
9 changes: 7 additions & 2 deletions packages/platform-mqtt/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@
"url": "git+https://github.com/jmcdo29/ogma.git"
},
"scripts": {
"test": "echo \"No tests to run\""
"prebuild": "rimraf lib",
"build": "tsc -p tsconfig.build.json",
"postbuild": "mv ./lib/src/* ./lib && rmdir lib/src",
"test": "jest",
"test:cov": "jest --coverage"
},
"bugs": {
"url": "https://github.com/jmcdo29/ogma/issues"
Expand All @@ -39,6 +43,7 @@
"rxjs": "^6.5.4"
},
"peerDependencies": {
"@ogma/nestjs-module": "^0.1.0"
"@ogma/nestjs-module": "^0.1.0",
"@nestjs/microservices": "^7.0.13"
}
}
1 change: 1 addition & 0 deletions packages/platform-mqtt/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './mqtt-interceptor.service';
41 changes: 41 additions & 0 deletions packages/platform-mqtt/src/mqtt-interceptor.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { ExecutionContext, Injectable } from '@nestjs/common';
import { MqttContext } from '@nestjs/microservices';
import { PATTERN_METADATA } from '@nestjs/microservices/constants';
import { AbstractInterceptorService } from '@ogma/nestjs-module';

@Injectable()
export class MqttParser extends AbstractInterceptorService {
getCallPoint(context: ExecutionContext): string {
return JSON.stringify(
this.reflector.get(PATTERN_METADATA, context.getHandler()),
);
}

getCallerIp(context: ExecutionContext): string {
const client = this.getClient(context);
const packet = client.getPacket();
const payload = JSON.parse(packet.payload.toString());
return payload.data?.ip || '';
}

getMethod(): string {
return 'MQTT';
}

getStatus(
context: ExecutionContext,
inColor: boolean,
error?: Error | ExecutionContext,
): string {
const status = error ? 500 : 200;
return inColor ? this.wrapInColor(status) : status.toString();
}

getProtocol(): string {
return 'mqtt';
}

private getClient(context: ExecutionContext): MqttContext {
return context.switchToRpc().getContext<MqttContext>();
}
}
107 changes: 107 additions & 0 deletions packages/platform-mqtt/test/mqtt-interceptor.service.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import { createMock } from '@golevelup/ts-jest';
import { ExecutionContext } from '@nestjs/common';
import { Test } from '@nestjs/testing';
import { MqttParser } from '../src';
import { Reflector } from '@nestjs/core';
import { PATTERN_METADATA } from '@nestjs/microservices/constants';
import { color } from '@ogma/logger';

describe('MQTTParser', () => {
let parser: MqttParser;
let reflector: Reflector;

beforeEach(async () => {
const modRef = await Test.createTestingModule({
providers: [
MqttParser,
{
provide: Reflector,
useValue: {
get: jest.fn(() => 'message'),
},
},
],
}).compile();
parser = modRef.get(MqttParser);
reflector = modRef.get(Reflector);
});
describe('getCallPoint', () => {
it('should get the reflected message', () => {
const funcMock = () => 'string';
const ctxMock = createMock<ExecutionContext>({
getHandler: funcMock,
});
expect(parser.getCallPoint(ctxMock)).toBe(JSON.stringify('message'));
expect(reflector.get).toBeCalledWith(PATTERN_METADATA, funcMock());
});
});
describe('getCallerIp', () => {
it('should return an ip from the payload', () => {
const ctxMock = createMock<ExecutionContext>({
switchToRpc: () => ({
getContext: () => ({
getPacket: () => ({
payload: Buffer.from(
JSON.stringify({ data: { hello: 'world', ip: '127.0.0.1' } }),
),
}),
}),
}),
});
expect(parser.getCallerIp(ctxMock)).toBe('127.0.0.1');
});
it('should return an empty string from the payload without ip as a prop', () => {
const ctxMock = createMock<ExecutionContext>({
switchToRpc: () => ({
getContext: () => ({
getPacket: () => ({
payload: Buffer.from(
JSON.stringify({ data: { hello: 'world' } }),
),
}),
}),
}),
});
expect(parser.getCallerIp(ctxMock)).toBe('');
});
it('should return an empty string', () => {
const ctxMock = createMock<ExecutionContext>({
switchToRpc: () => ({
getContext: () => ({
getPacket: () => ({
payload: Buffer.from(JSON.stringify({})),
}),
}),
}),
});
expect(parser.getCallerIp(ctxMock)).toBe('');
});
});
describe('getMethod', () => {
it('should return "MQTT"', () => {
expect(parser.getMethod()).toBe('MQTT');
});
});
describe('getStatus', () => {
it('should return a 200', () => {
expect(parser.getStatus(createMock<ExecutionContext>(), false)).toBe(
'200',
);
});
it('should return a 500', () => {
expect(
parser.getStatus(createMock<ExecutionContext>(), false, new Error()),
).toBe('500');
});
it('should return a 200 in color', () => {
expect(parser.getStatus(createMock<ExecutionContext>(), true)).toBe(
color.green(200),
);
});
});
describe('getProtocol', () => {
it('should return "mqtt"', () => {
expect(parser.getProtocol()).toBe('mqtt');
});
});
});
Loading

0 comments on commit b83b65c

Please sign in to comment.