Skip to content

Commit

Permalink
feat: timeout xmtp messages
Browse files Browse the repository at this point in the history
  • Loading branch information
marian2js committed Dec 28, 2023
1 parent 50e1314 commit 80ef562
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 3 deletions.
8 changes: 6 additions & 2 deletions apps/api/src/chat/services/broadcast.consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { JobNonRetriableError } from '@app/common/errors/job-non-retriable-error
import { wait } from '@app/common/utils/async.utils'
import { XmtpLib } from '@app/definitions/integration-definitions/xmtp/xmtp.lib'
import { getWalletName } from '@app/definitions/utils/address.utils'
import { sendXmtpMessage } from '@chainjet/tools/dist/messages'
import { InjectQueue, Process, Processor } from '@nestjs/bull'
import { Logger } from '@nestjs/common'
import { Interval } from '@nestjs/schedule'
Expand Down Expand Up @@ -108,7 +107,12 @@ export class BroadcastConsumer {
continue
}
try {
const message = await sendXmtpMessage(client, sendTo, campaign.message + '\n\n' + unsubscribeMessage)
const message = await XmtpLib.sendDirectMessageWithTimeout(
client,
sendTo,
campaign.message + '\n\n' + unsubscribeMessage,
10 * 1000,
)
await this.campaignMessageService.createOne({
campaign: campaign._id,
address: contact.address,
Expand Down
28 changes: 27 additions & 1 deletion libs/definitions/src/integration-definitions/xmtp/xmtp.lib.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Client, Conversation } from '@xmtp/xmtp-js'
import { sendXmtpMessage } from '@chainjet/tools/dist/messages'
import { Client, Conversation, DecodedMessage } from '@xmtp/xmtp-js'
import { isAddress } from 'ethers/lib/utils'

const clientsCache = new Map<string, Client>()
Expand Down Expand Up @@ -48,4 +49,29 @@ export const XmtpLib = {
}
return xmtpMessage
},

sendDirectMessageWithTimeout: async (
client: Client,
sendTo: string,
message: string,
timeoutMs: number,
): Promise<DecodedMessage> => {
let timeoutHandle: NodeJS.Timeout | undefined

const timeoutPromise = new Promise<DecodedMessage>((_, reject) => {
timeoutHandle = setTimeout(() => {
reject(new Error('Message sending timed out'))
}, timeoutMs)
})

const sendMessagePromise: Promise<DecodedMessage> = sendXmtpMessage(client, sendTo, message)

try {
return await Promise.race([sendMessagePromise, timeoutPromise])
} catch (error) {
throw error
} finally {
clearTimeout(timeoutHandle)
}
},
}

0 comments on commit 80ef562

Please sign in to comment.