Skip to content

Commit

Permalink
refactor: pbxEvents entity out of db watcher
Browse files Browse the repository at this point in the history
  • Loading branch information
ricardogarim committed May 6, 2024
1 parent 7d5bdde commit 848df67
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 5 deletions.
23 changes: 23 additions & 0 deletions apps/meteor/app/lib/server/lib/notifyListener.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { api, dbWatchersDisabled } from '@rocket.chat/core-services';
import type { IPbxEvent, IRocketChatRecord } from '@rocket.chat/core-typings';
import type { IBaseModel, IPbxEventsModel } from '@rocket.chat/model-typings';
import { PbxEvents } from '@rocket.chat/models';
import type { Filter } from 'mongodb';

type ClientAction = 'inserted' | 'updated' | 'removed';

async function getEntityDataById<T extends IRocketChatRecord, M extends IBaseModel<T>>(ids: T['_id'] | T['_id'][], model: M): Promise<T[]> {
if (Array.isArray(ids)) {
const query = { _id: { $in: ids } } as unknown as Filter<T>;
return model.find(query).toArray();
}
const item = await model.findOneById<T>(ids);
return item ? [item] : [];
}

export async function broadcastOnPbxEventChanges<T extends IPbxEvent>(id: T['_id'], clientAction: ClientAction = 'updated'): Promise<void> {
if (dbWatchersDisabled) {
const item = await getEntityDataById<IPbxEvent, IPbxEventsModel>(id, PbxEvents);
void api.broadcast('watch.pbxevents', { clientAction, id, data: item[0] });
}
}
2 changes: 1 addition & 1 deletion apps/meteor/server/database/watchCollections.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ export function getWatchCollections(): string[] {
IntegrationHistory.getCollectionName(),
Integrations.getCollectionName(),
EmailInbox.getCollectionName(),
PbxEvents.getCollectionName(),
Settings.getCollectionName(),
LivechatPriority.getCollectionName(),
];

// add back to the list of collections in case db watchers are enabled
if (!dbWatchersDisabled) {
collections.push(Messages.getCollectionName());
collections.push(PbxEvents.getCollectionName());
}

if (onlyCollections.length > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ import { Logger } from '@rocket.chat/logger';
import { Users, PbxEvents } from '@rocket.chat/models';
import type { Db } from 'mongodb';

import { broadcastOnPbxEventChanges } from '../../../../../../app/lib/server/lib/notifyListener';
import { Command, CommandType } from '../Command';
import { Commands } from '../Commands';
import { ACDQueue } from './ACDQueue';
import { CallbackContext } from './CallbackContext';
// import { sendMessage } from '../../../../../../app/lib/server/functions/sendMessage';

export class ContinuousMonitor extends Command {
private logger: Logger;
Expand Down Expand Up @@ -140,13 +140,15 @@ export class ContinuousMonitor extends Command {
// This event represents when an agent drops a call because of disconnection
// May happen for any reason outside of our control, like closing the browswer
// Or network/power issues
await PbxEvents.insertOne({
const pbxEvent = await PbxEvents.insertOne({
event: eventName,
uniqueId: `${eventName}-${event.contactstatus}-${now.getTime()}`,
ts: now,
agentExtension: event.aor,
});

void broadcastOnPbxEventChanges(pbxEvent.insertedId, 'inserted');

return;
}

Expand All @@ -159,7 +161,7 @@ export class ContinuousMonitor extends Command {
// NOTE: using the uniqueId prop of event is not the recommented approach, since it's an opaque ID
// However, since we're not using it for anything special, it's a "fair use"
// uniqueId => {server}/{epoch}.{id of channel associated with this call}
await PbxEvents.insertOne({
const pbxEvent = await PbxEvents.insertOne({
uniqueId,
event: eventName,
ts: now,
Expand All @@ -170,6 +172,8 @@ export class ContinuousMonitor extends Command {
callUniqueIdFallback: event.linkedid,
agentExtension: event?.connectedlinenum,
});

void broadcastOnPbxEventChanges(pbxEvent.insertedId, 'inserted');
} catch (e) {
this.logger.debug('Event was handled by other instance');
}
Expand Down Expand Up @@ -282,7 +286,7 @@ export class ContinuousMonitor extends Command {
* and event.calleridnum is the extension that is initiating a call.
*/
try {
await PbxEvents.insertOne({
const pbxEvent = await PbxEvents.insertOne({
uniqueId: `${event.event}-${event.calleridnum}-${event.channel}-${event.destchannel}-${event.uniqueid}`,
event: event.event,
ts: new Date(),
Expand All @@ -291,6 +295,8 @@ export class ContinuousMonitor extends Command {
callUniqueIdFallback: event.linkedid,
agentExtension: event.calleridnum,
});

void broadcastOnPbxEventChanges(pbxEvent.insertedId, 'inserted');
} catch (e) {
// This could mean we received a duplicate event
// This is quite common since DialEnd event happens "multiple times" at the end of the call
Expand Down

0 comments on commit 848df67

Please sign in to comment.