This repository has been archived by the owner on Jun 2, 2023. It is now read-only.

A RabbitMQ client for TypeScript, with functional programming in mind.

MansaGroup/rabbitmq-fp

RabbitMQ-fp

Let's feed our Rabbit nicely 🐰

This repository contains a wrapper around amqplib, written in TypeScript with a functional programming flavor using fp-ts. It handles high-level features such as RPC without hassle.

Warning: This library is still under heavy development, but no breaking API changes are planned.

Feature highlights:

  • Built-in RPC support
  • Functional typings with fp-ts
  • Automatic error recovery using amqp-connection-manager

Getting started

Install the package from npm:

npm install --save-exact @mansagroup/rabbitmq-fp

This library declares fp-ts as a peer dependency, so it uses the fp-ts version already installed in your project.

Create a setup function

This library supports automatic recovery from AMQP connection and channel errors. However, a newly created channel does not inherit the configuration of the previous one. This means that each new channel must be reconfigured (asserting exchanges, asserting queues, binding queues, etc.).

To solve this, when creating an adapter, you must pass a setup function which takes the created channel and returns that same channel. This can easily be expressed with fp-ts's flow function:

import { flow } from 'fp-ts/function';
import * as TE from 'fp-ts/TaskEither';
import { SetupFn } from '@mansagroup/rabbitmq-fp';

const setupFn: SetupFn.Fn = flow(
  SetupFn.assertExchange('my-exchange'),
  SetupFn.assertQueue('my-queue'),
  SetupFn.bindQueue('my-queue', 'my-exchange', 'my-routing-key'),
);

This function will be invoked every time a new channel is created.
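To make the recovery behaviour concrete, here is a minimal, self-contained sketch of why the setup has to be replayed. It does not use the library: `FakeChannel`, `compose`, `openChannel` and the three step helpers are hypothetical stand-ins, kept synchronous for brevity, whereas the real `SetupFn` helpers operate on amqplib channels.

```typescript
// Hypothetical stand-in for a channel: it only records what was declared on it.
interface FakeChannel {
  readonly id: number;
  readonly declared: string[];
}

// A setup step takes a channel and returns that same channel.
type Step = (channel: FakeChannel) => FakeChannel;

const declare = (entry: string): Step => (channel) => {
  channel.declared.push(entry);
  return channel;
};

// Illustrative helpers mirroring the names used above (not the library's code).
const assertExchange = (name: string): Step => declare(`exchange:${name}`);
const assertQueue = (name: string): Step => declare(`queue:${name}`);
const bindQueue = (queue: string, exchange: string, key: string): Step =>
  declare(`binding:${queue}<-${exchange}[${key}]`);

// Minimal left-to-right composition, similar in spirit to fp-ts's flow.
const compose = (...steps: Step[]): Step => (channel) =>
  steps.reduce((current, step) => step(current), channel);

const setup = compose(
  assertExchange('my-exchange'),
  assertQueue('my-queue'),
  bindQueue('my-queue', 'my-exchange', 'my-routing-key'),
);

// Every new channel starts empty, so the setup runs again each time.
let nextId = 1;
const openChannel = (): FakeChannel => setup({ id: nextId++, declared: [] });

const first = openChannel();
const recovered = openChannel(); // simulates a channel recreated after an error

console.log(first.declared);
console.log(recovered.declared);
```

Because the setup is a plain function of the channel, replaying it on each new channel is enough to restore the same exchanges, queues and bindings.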

Create an adapter

The adapter is the brain of this library. It is the high-level bridge between your code and the underlying amqplib library. It requires your previously created setup function as well as a logger:

import { Logger, createRabbitMQAdapter } from '@mansagroup/rabbitmq-fp';

const logger: Logger = {
  info: (msg, extra) => {},
  // Repeat this for every log level
};

const adapter = await createRabbitMQAdapter(
  'amqp://username:password@host:port',
  setupFn,
  {
    logger,
  },
)();
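The trailing `()` in the snippet above is consistent with fp-ts's lazy `Task` and `TaskEither` types, which are functions that start their work only when called. The sketch below illustrates that laziness with local stand-in types. The `connect` function and the `connectionAttempts` counter are hypothetical and say nothing about what `createRabbitMQAdapter` does internally.

```typescript
// Local stand-ins shaped like fp-ts's Either and TaskEither.
type Either<E, A> =
  | { readonly _tag: 'Left'; readonly left: E }
  | { readonly _tag: 'Right'; readonly right: A };
type TaskEither<E, A> = () => Promise<Either<E, A>>;

const right = <A>(value: A): Either<never, A> => ({ _tag: 'Right', right: value });

let connectionAttempts = 0;

// Hypothetical factory: it returns a description of the work, not the result.
const connect = (url: string): TaskEither<Error, string> => async () => {
  connectionAttempts += 1;
  return right(`connected to ${url}`);
};

// Building the task performs no side effect yet.
const task = connect('amqp://localhost');
const attemptsBeforeRun = connectionAttempts;

// Calling the task starts the work and yields a promise of an Either.
const pending = task();
const attemptsAfterRun = connectionAttempts;

pending.then((result) => console.log(result._tag, attemptsBeforeRun, attemptsAfterRun));
```

Forgetting the final call leaves you holding a function that has not done anything, which is a common source of "nothing happens" bugs with this style of API.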

Create your consumer

To keep this simple, we will set up an event consumer which prints hello {greetings} every time a message is published. A consumer is a function which takes a payload and returns a TaskEither.

Note: If the TaskEither resolves to a Left, the message is nacked; otherwise it is acked.

import { EventHandler } from '@mansagroup/rabbitmq-fp';
import { pipe } from 'fp-ts/function';
import * as IO from 'fp-ts/IO';

interface Payload {
  greetings: string;
}

const consumer: EventHandler<Payload> = (payload) =>
  pipe(
    `hello ${payload.greetings}`,
    IO.of,
    IO.map(console.log),
    IO.map(TE.right),
  )();

await adapter.consumeEvent('my-queue', consumer)();
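The ack/nack rule from the note above can be sketched as follows. This is an illustration under stated assumptions, not the library's implementation: `Handler`, `settle` and `greet` are hypothetical names, and the `Either` and `TaskEither` types are local stand-ins shaped like the fp-ts ones.

```typescript
// Local stand-ins shaped like fp-ts's Either and TaskEither.
type Either<E, A> =
  | { readonly _tag: 'Left'; readonly left: E }
  | { readonly _tag: 'Right'; readonly right: A };
type TaskEither<E, A> = () => Promise<Either<E, A>>;

const left = <E>(error: E): Either<E, never> => ({ _tag: 'Left', left: error });
const right = <A>(value: A): Either<never, A> => ({ _tag: 'Right', right: value });

// Hypothetical handler shape: payload in, TaskEither out.
type Handler<P> = (payload: P) => TaskEither<Error, void>;

// Hypothetical dispatcher: run the handler, then pick ack or nack from the result.
const settle = async <P>(handler: Handler<P>, payload: P): Promise<'ack' | 'nack'> => {
  const result = await handler(payload)();
  return result._tag === 'Right' ? 'ack' : 'nack';
};

interface Payload {
  greetings: string;
}

// Succeeds for a non-empty name, fails otherwise.
const greet: Handler<Payload> = (payload) => async () =>
  payload.greetings.length > 0
    ? right(undefined)
    : left(new Error('empty greetings'));

const outcomes = Promise.all([
  settle(greet, { greetings: 'Bob' }),
  settle(greet, { greetings: '' }),
]);

outcomes.then(([ok, failed]) => console.log(ok, failed));
```

Returning a Left is therefore the way for a consumer to signal that a message was not processed, with no exception involved.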

Publish your message

Finally, once your consumer is created and ready, you can publish your first message and see the consumer invoked:

await adapter.publish<Payload>('my-exchange', 'my-routing-key', {
  greetings: 'Bob',
})();

Everything together

Putting everything together gives a flow like the one in the everything-together.ts example.

Examples

License

This project is MIT licensed.

Contributors

Thanks go to these wonderful people (emoji key):


Jérémy Levilain

💻 📖 🤔

This project follows the all-contributors specification. Contributions of any kind welcome!