Changing all queue-related services and modules to use async provider. Changed email jobs to not be named which was preventing processing

This commit is contained in:
Collin Duncan 2023-09-23 23:01:24 +02:00
parent 2e1401644d
commit 16d62f5613
No known key found for this signature in database
9 changed files with 22 additions and 20 deletions

View file

@ -78,7 +78,7 @@ export class EmailClient {
private async handleNewMail( private async handleNewMail(
numMessages: number, numMessages: number,
callback: (emails: Email[]) => void, callback: (emails: Email[]) => Promise<void>,
) { ) {
this.loggerService.log(`Received ${numMessages} emails`) this.loggerService.log(`Received ${numMessages} emails`)
const mailbox = await new Promise<Imap.Box>((res) => this.getMailbox(res)) const mailbox = await new Promise<Imap.Box>((res) => this.getMailbox(res))
@ -96,11 +96,11 @@ export class EmailClient {
totalMessages - (numMessages - 1), totalMessages - (numMessages - 1),
) )
this.loggerService.debug(`Fetched ${emails.length} emails`) this.loggerService.debug(`Fetched ${emails.length} emails`)
callback(emails) await callback(emails)
} }
} }
private _listen(callback: (emails: Email[]) => void) { private _listen(callback: (emails: Email[]) => Promise<void>) {
// Don't start listening until we are ready // Don't start listening until we are ready
if (this.status === EmailClientStatus.NotReady) { if (this.status === EmailClientStatus.NotReady) {
throw new Error('Not ready to listen') throw new Error('Not ready to listen')
@ -116,10 +116,13 @@ export class EmailClient {
this.mailbox = mailbox this.mailbox = mailbox
}) })
this.imapClient.on('mail', (n: number) => this.handleNewMail(n, callback)) this.imapClient.on(
'mail',
(n: number) => this.handleNewMail(n, callback),
)
} }
public listen(callback: (emails: Email[]) => void) { public listen(callback: (emails: Email[]) => Promise<void>) {
this.attempt('listen', 0, 5, 1000, () => { this.attempt('listen', 0, 5, 1000, () => {
this._listen(callback) this._listen(callback)
}) })
@ -198,7 +201,7 @@ export class EmailClient {
} }
public async markMailsSeen(messageIds: string[]) { public async markMailsSeen(messageIds: string[]) {
this.loggerService.debug('Marking mails as seen', { messageIds }) this.loggerService.debug(`Marking mails as seen (${messageIds.join(',')})`)
return new Promise<void>((res, rej) => { return new Promise<void>((res, rej) => {
this.imapClient.addFlags(messageIds.join(','), ['\\Seen'], (error) => { this.imapClient.addFlags(messageIds.join(','), ['\\Seen'], (error) => {
if (error != null) { if (error != null) {

View file

@ -9,7 +9,7 @@ import { EmailProvider } from './provider'
@Module({ @Module({
imports: [ imports: [
LoggerModule, LoggerModule,
BullModule.registerQueue({ name: EMAILS_QUEUE_NAME }), BullModule.registerQueueAsync({ name: EMAILS_QUEUE_NAME }),
], ],
providers: [EmailClient, EmailProvider], providers: [EmailClient, EmailProvider],
exports: [EmailClient, EmailProvider], exports: [EmailClient, EmailProvider],

View file

@ -20,7 +20,7 @@ export class EmailProvider {
private async handleReceivedEmails(emails: Email[]) { private async handleReceivedEmails(emails: Email[]) {
await this.emailsQueue.addBulk( await this.emailsQueue.addBulk(
emails.map((email) => ({ name: email.id, data: email })), emails.map((email) => ({ data: email })),
) )
} }

View file

@ -8,7 +8,7 @@ import { NTFY_PUBLISH_QUEUE_NAME } from './types'
@Module({ @Module({
imports: [ imports: [
BullModule.registerQueue({ name: NTFY_PUBLISH_QUEUE_NAME }), BullModule.registerQueueAsync({ name: NTFY_PUBLISH_QUEUE_NAME }),
LoggerModule, LoggerModule,
], ],
providers: [NtfyProvider, NtfyClient], providers: [NtfyProvider, NtfyClient],

View file

@ -5,12 +5,7 @@ import { Job, JobOptions, Queue } from 'bull'
import { Dayjs } from 'dayjs' import { Dayjs } from 'dayjs'
import { NtfyClient } from './client' import { NtfyClient } from './client'
import { import { MessageConfig, MessageTags, NTFY_PUBLISH_QUEUE_NAME } from './types'
MessageConfig,
MessagePriority,
MessageTags,
NTFY_PUBLISH_QUEUE_NAME,
} from './types'
@Processor(NTFY_PUBLISH_QUEUE_NAME) @Processor(NTFY_PUBLISH_QUEUE_NAME)
@Injectable() @Injectable()

View file

@ -16,7 +16,7 @@ import { ReservationsWorker } from './worker'
imports: [ imports: [
LoggerModule, LoggerModule,
TypeOrmModule.forFeature([Reservation]), TypeOrmModule.forFeature([Reservation]),
BullModule.registerQueue({ name: RESERVATIONS_QUEUE_NAME }), BullModule.registerQueueAsync({ name: RESERVATIONS_QUEUE_NAME }),
RunnerModule, RunnerModule,
NtfyModule, NtfyModule,
], ],

View file

@ -8,7 +8,10 @@ import { RunnerService } from './service'
@Module({ @Module({
providers: [RunnerService, BaanReserverenService, EmptyPageFactory], providers: [RunnerService, BaanReserverenService, EmptyPageFactory],
imports: [LoggerModule, BullModule.registerQueue({ name: 'reservations' })], imports: [
LoggerModule,
BullModule.registerQueueAsync({ name: 'reservations' }),
],
exports: [EmptyPageFactory, BaanReserverenService], exports: [EmptyPageFactory, BaanReserverenService],
}) })
export class RunnerModule {} export class RunnerModule {}

View file

@ -13,8 +13,8 @@ import { WaitingListService } from './service'
imports: [ imports: [
LoggerModule, LoggerModule,
ReservationsModule, ReservationsModule,
BullModule.registerQueue({ name: EMAILS_QUEUE_NAME }), BullModule.registerQueueAsync({ name: EMAILS_QUEUE_NAME }),
BullModule.registerQueue({ name: RESERVATIONS_QUEUE_NAME }), BullModule.registerQueueAsync({ name: RESERVATIONS_QUEUE_NAME }),
EmailModule, EmailModule,
NtfyModule, NtfyModule,
], ],

View file

@ -1,5 +1,5 @@
import { InjectQueue, Process, Processor } from '@nestjs/bull' import { InjectQueue, Process, Processor } from '@nestjs/bull'
import { Inject } from '@nestjs/common' import { Inject, Injectable } from '@nestjs/common'
import { Job, Queue } from 'bull' import { Job, Queue } from 'bull'
import { NtfyProvider } from 'src/ntfy/provider' import { NtfyProvider } from 'src/ntfy/provider'
@ -25,6 +25,7 @@ const EMAIL_START_TIME_REGEX = new RegExp(
const EMAIL_END_TIME_REGEX = new RegExp(/^Eindtijd: ([0-9]{1,2}:[0-9]{1,2})$/im) const EMAIL_END_TIME_REGEX = new RegExp(/^Eindtijd: ([0-9]{1,2}:[0-9]{1,2})$/im)
@Processor(EMAILS_QUEUE_NAME) @Processor(EMAILS_QUEUE_NAME)
@Injectable()
export class WaitingListService { export class WaitingListService {
constructor( constructor(
@InjectQueue(RESERVATIONS_QUEUE_NAME) @InjectQueue(RESERVATIONS_QUEUE_NAME)