Skip to content

Commit

Permalink
feat(rmq): implements RabbitMQ parser for AMQP requests
Browse files Browse the repository at this point in the history
Same as MQTT and NATS, IP must be sent as part of the request, otherwise callerIP is an empty
string.

fix #20
  • Loading branch information
jmcdo29 committed May 19, 2020
1 parent b9136f8 commit 2e44261
Show file tree
Hide file tree
Showing 10 changed files with 231 additions and 16 deletions.
6 changes: 5 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ services:
container_name: mqtt
ports:
- "1883:1883"
# rabbitmq:
rabbitmq:
image: rabbitmq
container_name: rabbit
ports:
- "5672:5672"
# kafka:
# grpc:
nats:
Expand Down
21 changes: 17 additions & 4 deletions integration/test/rpc.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
} from '@ogma/nestjs-module';
import { MqttParser } from '@ogma/platform-mqtt';
import { NatsParser } from '@ogma/platform-nats';
import { RabbitMqParser } from '@ogma/platform-rabbitmq';
import { TcpParser } from '@ogma/platform-tcp';
import { RpcClientModule } from '../src/rpc/client/rpc-client.module';
import { RpcServerModule } from '../src/rpc/server/rpc-server.module';
Expand All @@ -20,11 +21,23 @@ import {
serviceOptionsFactory,
} from './utils';

const tcpOptions = {};
const mqttOptions = { url: 'mqtt://localhost:1883' };
const natsOptions = { url: 'nats://localhost:4222' };
const rabbitOptions = {
urls: ['amqp://localhost:5672'],
queue: 'cats_queue',
queueOptions: {
durable: false,
},
};

describe.each`
server | transport | options | protocol | parser
${'TCP'} | ${Transport.TCP} | ${{}} | ${'IPv4'} | ${TcpParser}
${'MQTT'} | ${Transport.MQTT} | ${{ url: 'mqtt://localhost:1883' }} | ${'mqtt'} | ${MqttParser}
${'NATS'} | ${Transport.NATS} | ${{ url: 'nats://localhost:4222' }} | ${'nats'} | ${NatsParser}
server | transport | options | protocol | parser
${'TCP'} | ${Transport.TCP} | ${tcpOptions} | ${'IPv4'} | ${TcpParser}
${'MQTT'} | ${Transport.MQTT} | ${mqttOptions} | ${'mqtt'} | ${MqttParser}
${'NATS'} | ${Transport.NATS} | ${natsOptions} | ${'nats'} | ${NatsParser}
${'RabbitMQ'} | ${Transport.RMQ} | ${rabbitOptions} | ${'amqp'} | ${RabbitMqParser}
`(
'$server server',
({
Expand Down
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@
"@types/ws": "^7.2.4",
"@typescript-eslint/eslint-plugin": "^2.15.0",
"@typescript-eslint/parser": "^2.15.0",
"amqplib": "^0.5.6",
"amqp-connection-manager": "^3.2.0",
"apollo-server-express": "^2.11.0",
"apollo-server-fastify": "^2.11.0",
"bunyan": "^1.8.12",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { createMock } from '@golevelup/ts-jest';
import { ExecutionContext } from '@nestjs/common';
import { Test } from '@nestjs/testing';
import { NatsParser } from '../src';
import { color } from '@ogma/logger';
import { NatsParser } from '../src';

describe('NatsParser', () => {
let parser: NatsParser;
Expand Down
23 changes: 19 additions & 4 deletions packages/platform-rabbitmq/README.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,26 @@
# `@ogma/platform-rabbitmq`

> TODO: description
The `RabbitMqParser` parser for the `OgmaInterceptor`. This plugin class parses RabbitMQ request and response object to be able to successfully log the data about the request. For more information, check out [the @ogma/nestjs-module](../nestjs-module/README.md) documentation.

## Installation

Nothing special, standard `npm i @ogma/platform-rabbitmq` or `yarn add @ogma/platform-rabbitmq`

## Usage

```
const platformRabbitmq = require('@ogma/platform-rabbitmq');
This plugin is to be used in the `OgmaInterceptorOptions` portion of the `OgmaModule` during `forRoot` or `forRootAsync` registration. It can be used like so:

// TODO: DEMONSTRATE API
```ts
@Module(
OgmaModule.forRoot({
interceptor: {
rpc: RabbitMqParser
}
})
)
export class AppModule {}
```

## Important Notes

Because of how RabbitMQ requests are sent and the data available in them, to get the IP address in the request log, an IP property must be sent in the payload. This is the only way to get the IP address. If an IP property is not sent, the interceptor will use an empty string. This is for the same reason as in the [@ogma/platform-mqtt](../platform-mqtt) docs.
11 changes: 9 additions & 2 deletions packages/platform-rabbitmq/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@
"url": "git+https://github.com/jmcdo29/ogma.git"
},
"scripts": {
"test": "echo \"No tests to run\""
"prebuild": "rimraf lib",
"build": "tsc -p tsconfig.build.json",
"postbuild": "mv ./lib/src/* ./lib && rmdir lib/src",
"test": "jest",
"test:cov": "jest --coverage"
},
"bugs": {
"url": "https://github.com/jmcdo29/ogma/issues"
Expand All @@ -31,6 +35,9 @@
"rxjs": "^6.5.4"
},
"peerDependencies": {
"@ogma/nestjs-module": "^0.1.0"
"@ogma/nestjs-module": "^0.1.0",
"@nestjs/microservices": "^7.0.0",
"amqplib": "^0.5.0",
"amqp-connection-manager": "^3.2.0"
}
}
1 change: 1 addition & 0 deletions packages/platform-rabbitmq/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './rabbitmq-interceptor.service';
40 changes: 40 additions & 0 deletions packages/platform-rabbitmq/src/rabbitmq-interceptor.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { ExecutionContext, Injectable } from '@nestjs/common';
import { RmqContext } from '@nestjs/microservices';
import { AbstractInterceptorService } from '@ogma/nestjs-module';

@Injectable()
export class RabbitMqParser extends AbstractInterceptorService {
getCallPoint(context: ExecutionContext) {
return this.getClient(context).getPattern();
}

getCallerIp(context: ExecutionContext) {
const data = this.getData(context);
return data.ip || '';
}

getMethod() {
return 'RabbitMQ';
}

getProtocol() {
return 'amqp';
}

getStatus(
context: ExecutionContext,
inColor: boolean,
error?: Error | ExecutionContext,
): string {
const status = error ? 500 : 200;
return inColor ? this.wrapInColor(status) : status.toString();
}

private getClient(context: ExecutionContext): RmqContext {
return context.switchToRpc().getContext<RmqContext>();
}

private getData(context: ExecutionContext) {
return context.switchToRpc().getData();
}
}
77 changes: 77 additions & 0 deletions packages/platform-rabbitmq/test/rabbit-interceptor.service.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import { createMock } from '@golevelup/ts-jest';
import { ExecutionContext } from '@nestjs/common';
import { Test } from '@nestjs/testing';
import { color } from '@ogma/logger';
import { RabbitMqParser } from '../src';

describe('RabbitMqParser', () => {
let parser: RabbitMqParser;

beforeEach(async () => {
const modRef = await Test.createTestingModule({
providers: [RabbitMqParser],
}).compile();
parser = modRef.get(RabbitMqParser);
});
describe('getCallPoint', () => {
it('should get the subject from the client', () => {
const ctxMock = createMock<ExecutionContext>({
switchToRpc: () => ({
getContext: () => ({
getPattern: () => JSON.stringify({ cmd: 'message' }),
}),
}),
});
expect(parser.getCallPoint(ctxMock)).toBe(
JSON.stringify({ cmd: 'message' }),
);
});
});
describe('getCallerIp', () => {
it('should return an ip from data', () => {
const ctxMock = createMock<ExecutionContext>({
switchToRpc: () => ({
getData: () => ({
ip: '127.0.0.1',
}),
}),
});
expect(parser.getCallerIp(ctxMock)).toBe('127.0.0.1');
});
it('should return a blank string', () => {
const ctxMock = createMock<ExecutionContext>({
switchToRpc: () => ({
getData: () => ({}),
}),
});
expect(parser.getCallerIp(ctxMock)).toBe('');
});
});
describe('getMethod', () => {
it('should return "RabbitMQ"', () => {
expect(parser.getMethod()).toBe('RabbitMQ');
});
});
describe('getStatus', () => {
it('should return a 200', () => {
expect(parser.getStatus(createMock<ExecutionContext>(), false)).toBe(
'200',
);
});
it('should return a 500', () => {
expect(
parser.getStatus(createMock<ExecutionContext>(), false, new Error()),
).toBe('500');
});
it('should return a 200 in color', () => {
expect(parser.getStatus(createMock<ExecutionContext>(), true)).toBe(
color.green(200),
);
});
});
describe('getProtocol', () => {
it('should return "amqp"', () => {
expect(parser.getProtocol()).toBe('amqp');
});
});
});
64 changes: 60 additions & 4 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2631,6 +2631,25 @@ [email protected], ajv@^6.1.0, ajv@^6.10.0, ajv@^6.10.2, ajv@^6.11.0, ajv@^6.12.0, ajv@
json-schema-traverse "^0.4.1"
uri-js "^4.2.2"

amqp-connection-manager@^3.2.0:
version "3.2.0"
resolved "https://registry.yarnpkg.com/amqp-connection-manager/-/amqp-connection-manager-3.2.0.tgz#83534360533a48183113f8213af0d230c7b8d461"
integrity sha512-CnxBqUXd6ft4DbGs8YXNp0hDZgWiDQAghxG6JRJuxGbGEQdAjsb4oRR9PWBqO5V/Gssp7lDKigXX+DtSczID2w==
dependencies:
promise-breaker "^5.0.0"

amqplib@^0.5.6:
version "0.5.6"
resolved "https://registry.yarnpkg.com/amqplib/-/amqplib-0.5.6.tgz#86a7850f4f39c568eaa0dd0300ef374e17941cf4"
integrity sha512-J4TR0WAMPBHN+tgTuhNsSObfM9eTVTZm/FNw0LyaGfbiLsBxqSameDNYpChUFXW4bnTKHDXy0ab+nuLhumnRrQ==
dependencies:
bitsyntax "~0.1.0"
bluebird "^3.5.2"
buffer-more-ints "~1.0.0"
readable-stream "1.x >=1.1.9"
safe-buffer "~5.1.2"
url-parse "~1.4.3"

ansi-escapes@^3.0.0, ansi-escapes@^3.2.0:
version "3.2.0"
resolved "https://registry.yarnpkg.com/ansi-escapes/-/ansi-escapes-3.2.0.tgz#8780b98ff9dbf5638152d1f1fe5c1d7b4442976b"
Expand Down Expand Up @@ -3287,6 +3306,15 @@ bindings@^1.5.0:
dependencies:
file-uri-to-path "1.0.0"

bitsyntax@~0.1.0:
version "0.1.0"
resolved "https://registry.yarnpkg.com/bitsyntax/-/bitsyntax-0.1.0.tgz#b0c59acef03505de5a2ed62a2f763c56ae1d6205"
integrity sha512-ikAdCnrloKmFOugAfxWws89/fPc+nw0OOG1IzIE72uSOg/A3cYptKCjSUhDTuj7fhsJtzkzlv7l3b8PzRHLN0Q==
dependencies:
buffer-more-ints "~1.0.0"
debug "~2.6.9"
safe-buffer "~5.1.2"

bl@^1.2.2:
version "1.2.2"
resolved "https://registry.yarnpkg.com/bl/-/bl-1.2.2.tgz#a160911717103c07410cef63ef51b397c025af9c"
Expand All @@ -3300,7 +3328,7 @@ [email protected]:
resolved "https://registry.yarnpkg.com/blob/-/blob-0.0.5.tgz#d680eeef25f8cd91ad533f5b01eed48e64caf683"
integrity sha512-gaqbzQPqOoamawKg0LGVd7SzLgXS+JH61oWprSLH+P+abTczqJbhTR8CmJ2u9/bUYNmHTGJx/UEmn6doAvvuig==

bluebird@^3.5.1, bluebird@^3.5.3, bluebird@^3.5.5:
bluebird@^3.5.1, bluebird@^3.5.2, bluebird@^3.5.3, bluebird@^3.5.5:
version "3.7.2"
resolved "https://registry.yarnpkg.com/bluebird/-/bluebird-3.7.2.tgz#9f229c15be272454ffa973ace0dbee79a1b0c36f"
integrity sha512-XpNj6GDQzdfW+r2Wnn7xiSAd7TM3jzkxGXBGTtWKuSXv1xUV+azxAm8jdWZN06QTQk+2N2XB9jRDkvbmQmcRtg==
Expand Down Expand Up @@ -3457,6 +3485,11 @@ [email protected], buffer-from@^1.0.0:
resolved "https://registry.yarnpkg.com/buffer-from/-/buffer-from-1.1.1.tgz#32713bc028f75c02fdb710d7c7bcec1f2c6070ef"
integrity sha512-MQcXEUbCKtEo7bhqEs6560Hyd4XaovZlO/k9V3hjVUF/zwW7KBVdSK4gIt/bzwS9MbR5qob+F5jusZsb0YQK2A==

buffer-more-ints@~1.0.0:
version "1.0.0"
resolved "https://registry.yarnpkg.com/buffer-more-ints/-/buffer-more-ints-1.0.0.tgz#ef4f8e2dddbad429ed3828a9c55d44f05c611422"
integrity sha512-EMetuGFz5SLsT0QTnXzINh4Ksr+oo4i+UGTXEshiGCQWnsgSs7ZhJ8fzlwQ+OzEMs0MpDAMr1hxnblp5a4vcHg==

buffer-xor@^1.0.3:
version "1.0.3"
resolved "https://registry.yarnpkg.com/buffer-xor/-/buffer-xor-1.0.3.tgz#26e61ed1422fb70dd42e6e36729ed51d855fe8d9"
Expand Down Expand Up @@ -4584,7 +4617,7 @@ dateformat@^3.0.0:
resolved "https://registry.yarnpkg.com/dateformat/-/dateformat-3.0.3.tgz#a6e37499a4d9a9cf85ef5872044d62901c9889ae"
integrity sha512-jyCETtSl3VMZMWeRo7iY1FL19ges1t55hMo5yaam4Jrsm5EPL89UQkoQRyiI+Yf4k8r2ZpdngkV8hr1lIdjb3Q==

[email protected], debug@^2.2.0, debug@^2.3.3:
[email protected], debug@^2.2.0, debug@^2.3.3, debug@~2.6.9:
version "2.6.9"
resolved "https://registry.yarnpkg.com/debug/-/debug-2.6.9.tgz#5d128515df134ff327e90a4c93f4e077a536341f"
integrity sha512-bC7ElrdJaJnPbAP+1EotYvqZsb3ecl5wi6Bfi6BJTUcNowp6cvspg0jXznRTKDjm/E7AdgFBVeAPVMNcKGsHMA==
Expand Down Expand Up @@ -9733,6 +9766,11 @@ progress@^2.0.0:
resolved "https://registry.yarnpkg.com/progress/-/progress-2.0.3.tgz#7e8cf8d8f5b8f239c1bc68beb4eb78567d572ef8"
integrity sha512-7PiHtLll5LdnKIMw100I+8xJXR5gW2QwWYkT6iJva0bXitZKa/XMrSbdmg3r2Xnaidz9Qumd0VPaMrZlF9V9sA==

promise-breaker@^5.0.0:
version "5.0.0"
resolved "https://registry.yarnpkg.com/promise-breaker/-/promise-breaker-5.0.0.tgz#58e8541f1619554057da95a211794d7834d30c1d"
integrity sha512-mgsWQuG4kJ1dtO6e/QlNDLFtMkMzzecsC69aI5hlLEjGHFNpHrvGhFi4LiK5jg2SMQj74/diH+wZliL9LpGsyA==

promise-inflight@^1.0.1:
version "1.0.1"
resolved "https://registry.yarnpkg.com/promise-inflight/-/promise-inflight-1.0.1.tgz#98472870bf228132fcbdd868129bad12c3c029e3"
Expand Down Expand Up @@ -9878,6 +9916,11 @@ [email protected]:
resolved "https://registry.yarnpkg.com/querystring/-/querystring-0.2.0.tgz#b209849203bb25df820da756e747005878521620"
integrity sha1-sgmEkgO7Jd+CDadW50cAWHhSFiA=

querystringify@^2.1.1:
version "2.1.1"
resolved "https://registry.yarnpkg.com/querystringify/-/querystringify-2.1.1.tgz#60e5a5fd64a7f8bfa4d2ab2ed6fdf4c85bad154e"
integrity sha512-w7fLxIRCRT7U8Qu53jQnJyPkYZIaR4n5151KMfcJlO/A9397Wxb1amJvROTK6TOnp7PfoAmg/qXiNHI+08jRfA==

quick-format-unescaped@^3.0.3:
version "3.0.3"
resolved "https://registry.yarnpkg.com/quick-format-unescaped/-/quick-format-unescaped-3.0.3.tgz#fb3e468ac64c01d22305806c39f121ddac0d1fb9"
Expand Down Expand Up @@ -10010,7 +10053,7 @@ read@1, read@~1.0.1:
string_decoder "~1.1.1"
util-deprecate "~1.0.1"

[email protected]:
[email protected], "[email protected] >=1.1.9":
version "1.1.14"
resolved "https://registry.yarnpkg.com/readable-stream/-/readable-stream-1.1.14.tgz#7cf4c54ef648e3813084c636dd2079e166c081d9"
integrity sha1-fPTFTvZI44EwhMY23SB54WbAgdk=
Expand Down Expand Up @@ -10200,6 +10243,11 @@ require-main-filename@^2.0.0:
resolved "https://registry.yarnpkg.com/require-main-filename/-/require-main-filename-2.0.0.tgz#d0b329ecc7cc0f61649f62215be69af54aa8989b"
integrity sha512-NKN5kMDylKuldxYLSUfrbo5Tuzh4hd+2E8NPPX02mZtn1VuREQToYe/ZdlJy+J3uCpfaiGF05e7B8W0iXbQHmg==

requires-port@^1.0.0:
version "1.0.0"
resolved "https://registry.yarnpkg.com/requires-port/-/requires-port-1.0.0.tgz#925d2601d39ac485e091cf0da5c6e694dc3dcaff"
integrity sha1-kl0mAdOaxIXgkc8NpcbmlNw9yv8=

resolve-cwd@^2.0.0:
version "2.0.0"
resolved "https://registry.yarnpkg.com/resolve-cwd/-/resolve-cwd-2.0.0.tgz#00a9f7387556e27038eae232caa372a6a59b665a"
Expand Down Expand Up @@ -10386,7 +10434,7 @@ [email protected], rxjs@^6.0.0, rxjs@^6.3.3, rxjs@^6.4.0, rxjs@^6.5.3, rxjs@^6.5.4:
dependencies:
tslib "^1.9.0"

[email protected], safe-buffer@~5.1.0, safe-buffer@~5.1.1:
[email protected], safe-buffer@~5.1.0, safe-buffer@~5.1.1, safe-buffer@~5.1.2:
version "5.1.2"
resolved "https://registry.yarnpkg.com/safe-buffer/-/safe-buffer-5.1.2.tgz#991ec69d296e0313747d59bdfd2b745c35f8828d"
integrity sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g==
Expand Down Expand Up @@ -11824,6 +11872,14 @@ urix@^0.1.0:
resolved "https://registry.yarnpkg.com/urix/-/urix-0.1.0.tgz#da937f7a62e21fec1fd18d49b35c2935067a6c72"
integrity sha1-2pN/emLiH+wf0Y1Js1wpNQZ6bHI=

url-parse@~1.4.3:
version "1.4.7"
resolved "https://registry.yarnpkg.com/url-parse/-/url-parse-1.4.7.tgz#a8a83535e8c00a316e403a5db4ac1b9b853ae278"
integrity sha512-d3uaVyzDB9tQoSXFvuSUNFibTd9zxd2bkVrDRvF5TmvWWQwqE4lgYJ5m+x1DbecWkw+LK4RNl2CU1hHuOKPVlg==
dependencies:
querystringify "^2.1.1"
requires-port "^1.0.0"

url@^0.11.0:
version "0.11.0"
resolved "https://registry.yarnpkg.com/url/-/url-0.11.0.tgz#3838e97cfc60521eb73c525a8e55bfdd9e2e28f1"
Expand Down

0 comments on commit 2e44261

Please sign in to comment.