Lets feed our Rabbit' nicely 🐰
This repository contains a wrapper over amqplib
written in Typescript
with an accent of functionnal programming, using fp-ts
. It will handle
high-level features like RPC without hassle.
Warning This library is still heavily being worked on, but no breaking changes on the API are planned.
Feature highlights:
- Built-in RPC support
- Functional typings with
fp-ts
- Automatic error recovery using
amqp-connection-manager
Install the package from npm:
npm install --save-exact @mansagroup/rabbitmq-fp
This library has
fp-ts
as peer dependency, to match your project's version.
This library support automatic recovery on AMQP connection or channel error. However, a newly created channel will not inherit the configuration from the previous one. This means that each new channel must be reconfigured (asserting exchanges, queues, binding queues, etc...).
To solve this, when creating an adapter, you must pass a setup function
which takes the created channel and returns this same channel. This can
easily represented as a fp-ts
's flow
method:
import { flow } from 'fp-ts/function';
import * as TE from 'fp-ts/TaskEither';
import { SetupFn } from '@mansagroup/rabbitmq-fp';
const setupFn: SetupFn.Fn = flow(
SetupFn.assertExchange('my-exchange'),
SetupFn.assertQueue('my-queue'),
SetupFn.bindQueue('my-queue', 'my-exchange', 'my-routing-key'),
);
This function will be invoked every time a new channel is created.
An adapter is the actual brain of this library. It is the high-level
bridge between your code and the underlying amqplib
library. It will
requires your previously created setup function but also a logger:
import { Logger, createRabbitMQAdapter } from '@mansagroup/rabbitmq-fp';
const logger: Logger = {
info: (msg, extra) => {},
// This for every log level
};
const adapter = await createRabbitMQAdapter(
'amqp://username:password@host:port',
setupFn,
{
logger,
},
)();
To keep this simple, we will setup a simple event consumer which will
print hello {greetings}
every time a message is published. A consumer
is a function which takes a payload and returns a TaskEither
.
Note If the
TaskEither
is aLeft
, then the message will benack
, otherwise it will beack
.
import { EventHandler } from '@mansagroup/rabbitmq-fp';
import { pipe } from 'fp-ts/function';
import * as IO from 'fp-ts/IO';
interface Payload {
greetings: string;
}
const consumer: EventHandler<Payload> = (payload) =>
pipe(
`hello ${payload.greetings}`,
IO.of,
IO.map(console.log),
IO.map(TE.right),
)();
await adapter.consumeEvent('my-queue', consumer)();
Finally, after that your consumer is created and ready, you can publish your first message to see the consumer invoked:
await adapter.publish<Payload>('my-exchange', 'my-routing-key', {
greetings: 'Bob',
})();
Now, if we pull everything together, we could have a flow like the
one from the everything-together.ts
example.
- Everything together: the previous section example in code
- RPC: consume and request over RPC methods
This project is MIT licensed.
Thanks goes to these wonderful people (emoji key):
Jérémy Levilain 💻 📖 🤔 |
This project follows the all-contributors specification. Contributions of any kind welcome!