Skip to content

Commit

Permalink
chore: Update Omnichannel's room closing mechanism to use transaction…
Browse files Browse the repository at this point in the history
…s. (#32896)
  • Loading branch information
KevLehman committed Aug 22, 2024
1 parent f6351e8 commit be5d153
Show file tree
Hide file tree
Showing 16 changed files with 275 additions and 82 deletions.
187 changes: 127 additions & 60 deletions apps/meteor/app/livechat/server/lib/LivechatTyped.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import type {
ILivechatDepartmentAgents,
LivechatDepartmentDTO,
OmnichannelSourceType,
ILivechatInquiryRecord,
} from '@rocket.chat/core-typings';
import { ILivechatAgentStatus, UserStatus, isOmnichannelRoom } from '@rocket.chat/core-typings';
import { Logger, type MainLogger } from '@rocket.chat/logger';
Expand All @@ -40,11 +41,12 @@ import {
import { serverFetch as fetch } from '@rocket.chat/server-fetch';
import { Match, check } from 'meteor/check';
import { Meteor } from 'meteor/meteor';
import type { Filter, FindCursor } from 'mongodb';
import type { Filter, FindCursor, ClientSession, MongoError } from 'mongodb';
import UAParser from 'ua-parser-js';

import { callbacks } from '../../../../lib/callbacks';
import { trim } from '../../../../lib/utils/stringUtils';
import { client } from '../../../../server/database/utils';
import { i18n } from '../../../../server/lib/i18n';
import { addUserRolesAsync } from '../../../../server/lib/roles/addUserRoles';
import { removeUserFromRolesAsync } from '../../../../server/lib/roles/removeUserFromRoles';
Expand Down Expand Up @@ -142,6 +144,13 @@ type ICRMData = {
crmData?: IOmnichannelRoom['crmData'];
};

type ChatCloser = { _id: string; username: string | undefined };

const isRoomClosedByUserParams = (params: CloseRoomParams): params is CloseRoomParamsByUser =>
(params as CloseRoomParamsByUser).user !== undefined;
const isRoomClosedByVisitorParams = (params: CloseRoomParams): params is CloseRoomParamsByVisitor =>
(params as CloseRoomParamsByVisitor).visitor !== undefined;

const dnsResolveMx = util.promisify(dns.resolveMx);

class LivechatClass {
Expand Down Expand Up @@ -219,14 +228,115 @@ class LivechatClass {
return Users.findOnlineAgents();
}

async closeRoom(params: CloseRoomParams): Promise<void> {
async closeRoom(params: CloseRoomParams, attempts = 2): Promise<void> {
let newRoom: IOmnichannelRoom;
let chatCloser: ChatCloser;
let removedInquiryObj: ILivechatInquiryRecord | null;

const session = client.startSession();
try {
session.startTransaction();
const { room, closedBy, removedInquiry } = await this.doCloseRoom(params, session);
await session.commitTransaction();

newRoom = room;
chatCloser = closedBy;
removedInquiryObj = removedInquiry;
} catch (e) {
this.logger.error({ err: e, msg: 'Failed to close room', afterAttempts: attempts });
await session.abortTransaction();
// Dont propagate transaction errors
if (
(e as unknown as MongoError)?.errorLabels?.includes('UnknownTransactionCommitResult') ||
(e as unknown as MongoError)?.errorLabels?.includes('TransientTransactionError')
) {
if (attempts > 0) {
this.logger.debug(`Retrying close room because of transient error. Attempts left: ${attempts}`);
return this.closeRoom(params, attempts - 1);
}

throw new Error('error-room-cannot-be-closed-try-again');
}
throw e;
} finally {
await session.endSession();
}

// Note: when reaching this point, the room has been closed
// Transaction is commited and so these messages can be sent here.
return this.afterRoomClosed(newRoom, chatCloser, removedInquiryObj, params);
}

async afterRoomClosed(
newRoom: IOmnichannelRoom,
chatCloser: ChatCloser,
inquiry: ILivechatInquiryRecord | null,
params: CloseRoomParams,
): Promise<void> {
if (!chatCloser) {
// this should never happen
return;
}
// Note: we are okay with these messages being sent outside of the transaction. The process of sending a message
// is huge and involves multiple db calls. Making it transactionable this way would be really hard.
// And passing just _some_ actions to the transaction creates some deadlocks since messages are updated in the afterSaveMessages callbacks.
const transcriptRequested =
!!params.room.transcriptRequest || (!settings.get('Livechat_enable_transcript') && settings.get('Livechat_transcript_send_always'));
this.logger.debug(`Sending closing message to room ${newRoom._id}`);
await Message.saveSystemMessageAndNotifyUser('livechat-close', newRoom._id, params.comment ?? '', chatCloser, {
groupable: false,
transcriptRequested,
...(isRoomClosedByVisitorParams(params) && { token: params.visitor.token }),
});

if (settings.get('Livechat_enable_transcript') && !settings.get('Livechat_transcript_send_always')) {
await Message.saveSystemMessage('command', newRoom._id, 'promptTranscript', chatCloser);
}

this.logger.debug(`Running callbacks for room ${newRoom._id}`);

process.nextTick(() => {
/**
* @deprecated the `AppEvents.ILivechatRoomClosedHandler` event will be removed
* in the next major version of the Apps-Engine
*/
void Apps.self?.getBridges()?.getListenerBridge().livechatEvent(AppEvents.ILivechatRoomClosedHandler, newRoom);
void Apps.self?.getBridges()?.getListenerBridge().livechatEvent(AppEvents.IPostLivechatRoomClosed, newRoom);
});

const visitor = isRoomClosedByVisitorParams(params) ? params.visitor : undefined;
const opts = await parseTranscriptRequest(params.room, params.options, visitor);
if (process.env.TEST_MODE) {
await callbacks.run('livechat.closeRoom', {
room: newRoom,
options: opts,
});
} else {
callbacks.runAsync('livechat.closeRoom', {
room: newRoom,
options: opts,
});
}

void notifyOnRoomChangedById(newRoom._id);
if (inquiry) {
void notifyOnLivechatInquiryChanged(inquiry, 'removed');
}

this.logger.debug(`Room ${newRoom._id} was closed`);
}

async doCloseRoom(
params: CloseRoomParams,
session: ClientSession,
): Promise<{ room: IOmnichannelRoom; closedBy: ChatCloser; removedInquiry: ILivechatInquiryRecord | null }> {
const { comment } = params;
const { room } = params;

this.logger.debug(`Attempting to close room ${room._id}`);
if (!room || !isOmnichannelRoom(room) || !room.open) {
this.logger.debug(`Room ${room._id} is not open`);
return;
throw new Error('error-room-closed');
}

const commentRequired = settings.get('Livechat_request_comment_when_closing_conversation');
Expand All @@ -238,7 +348,7 @@ class LivechatClass {
this.logger.debug(`Resolved chat tags for room ${room._id}`);

const now = new Date();
const { _id: rid, servedBy, transcriptRequest } = room;
const { _id: rid, servedBy } = room;
const serviceTimeDuration = servedBy && (now.getTime() - new Date(servedBy.ts).getTime()) / 1000;

const closeData: IOmnichannelRoomClosingInfo = {
Expand All @@ -249,11 +359,6 @@ class LivechatClass {
};
this.logger.debug(`Room ${room._id} was closed at ${closeData.closedAt} (duration ${closeData.chatDuration})`);

const isRoomClosedByUserParams = (params: CloseRoomParams): params is CloseRoomParamsByUser =>
(params as CloseRoomParamsByUser).user !== undefined;
const isRoomClosedByVisitorParams = (params: CloseRoomParams): params is CloseRoomParamsByVisitor =>
(params as CloseRoomParamsByVisitor).visitor !== undefined;

if (isRoomClosedByUserParams(params)) {
const { user } = params;
this.logger.debug(`Closing by user ${user?._id}`);
Expand All @@ -276,76 +381,38 @@ class LivechatClass {

this.logger.debug(`Updating DB for room ${room._id} with close data`);

const inquiry = await LivechatInquiry.findOneByRoomId(rid);

const removedInquiry = await LivechatInquiry.removeByRoomId(rid);
const inquiry = await LivechatInquiry.findOneByRoomId(rid, { session });
const removedInquiry = await LivechatInquiry.removeByRoomId(rid, { session });
if (removedInquiry && removedInquiry.deletedCount !== 1) {
throw new Error('Error removing inquiry');
}
if (inquiry) {
void notifyOnLivechatInquiryChanged(inquiry, 'removed');
}

const updatedRoom = await LivechatRooms.closeRoomById(rid, closeData);
const updatedRoom = await LivechatRooms.closeRoomById(rid, closeData, { session });
if (!updatedRoom || updatedRoom.modifiedCount !== 1) {
throw new Error('Error closing room');
}

await Subscriptions.removeByRoomId(rid, {
const subs = await Subscriptions.countByRoomId(rid, { session });
const removedSubs = await Subscriptions.removeByRoomId(rid, {
async onTrash(doc) {
void notifyOnSubscriptionChanged(doc, 'removed');
},
session,
});

this.logger.debug(`DB updated for room ${room._id}`);
if (removedSubs.deletedCount !== subs) {
throw new Error('Error removing subscriptions');
}

const newRoom = await LivechatRooms.findOneById(rid);
this.logger.debug(`DB updated for room ${room._id}`);

// Retrieve the closed room
const newRoom = await LivechatRooms.findOneById(rid, { session });
if (!newRoom) {
throw new Error('Error: Room not found');
}

this.logger.debug(`Sending closing message to room ${room._id}`);

const transcriptRequested =
!!transcriptRequest || (!settings.get('Livechat_enable_transcript') && settings.get('Livechat_transcript_send_always'));

await Message.saveSystemMessageAndNotifyUser('livechat-close', rid, comment ?? '', closeData.closedBy, {
groupable: false,
transcriptRequested,
...(isRoomClosedByVisitorParams(params) && { token: params.visitor.token }),
});

if (settings.get('Livechat_enable_transcript') && !settings.get('Livechat_transcript_send_always')) {
await Message.saveSystemMessage('command', rid, 'promptTranscript', closeData.closedBy);
}

process.nextTick(() => {
/**
* @deprecated the `AppEvents.ILivechatRoomClosedHandler` event will be removed
* in the next major version of the Apps-Engine
*/
void Apps.self?.getBridges()?.getListenerBridge().livechatEvent(AppEvents.ILivechatRoomClosedHandler, newRoom);
void Apps.self?.getBridges()?.getListenerBridge().livechatEvent(AppEvents.IPostLivechatRoomClosed, newRoom);
});

const visitor = isRoomClosedByVisitorParams(params) ? params.visitor : undefined;
const opts = await parseTranscriptRequest(params.room, options, visitor);
if (process.env.TEST_MODE) {
await callbacks.run('livechat.closeRoom', {
room: newRoom,
options: opts,
});
} else {
callbacks.runAsync('livechat.closeRoom', {
room: newRoom,
options: opts,
});
}

void notifyOnRoomChangedById(newRoom._id);

this.logger.debug(`Room ${newRoom._id} was closed`);
return { room: newRoom, closedBy: closeData.closedBy, removedInquiry: inquiry };
}

async getRequiredDepartment(onlineRequired = true) {
Expand Down
1 change: 0 additions & 1 deletion apps/meteor/app/utils/server/functions/getMongoInfo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ function getOplogInfo(): { oplogEnabled: boolean; mongo: MongoConnection } {

const oplogEnabled = isWatcherRunning();

// @ts-expect-error - You're drunk ts
return { oplogEnabled, mongo };
}

Expand Down
1 change: 1 addition & 0 deletions apps/meteor/definition/externals/meteor/mongo.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ declare module 'meteor/mongo' {

interface MongoConnection {
db: mongodb.Db;
client: mongodb.MongoClient;
_oplogHandle: OplogHandle;
rawCollection(name: string): mongodb.Collection;
}
Expand Down
2 changes: 1 addition & 1 deletion apps/meteor/server/database/utils.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
import { MongoInternals } from 'meteor/mongo';

export const { db } = MongoInternals.defaultRemoteCollectionDriver().mongo;
export const { db, client } = MongoInternals.defaultRemoteCollectionDriver().mongo;
3 changes: 2 additions & 1 deletion apps/meteor/server/models/raw/BaseRaw.ts
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ export abstract class BaseRaw<
return this.col.deleteMany(filter);
}

const cursor = this.find(filter);
const cursor = this.find<ResultFields<T, C>>(filter, { session: options?.session });

const ids: T['_id'][] = [];
for await (const doc of cursor) {
Expand All @@ -372,6 +372,7 @@ export abstract class BaseRaw<
// since the operation is not atomic, we need to make sure that the record is not already deleted/inserted
await this.trash?.updateOne({ _id } as Filter<TDeleted>, { $set: trash } as UpdateFilter<TDeleted>, {
upsert: true,
session: options?.session,
});

void options?.onTrash?.(doc);
Expand Down
5 changes: 3 additions & 2 deletions apps/meteor/server/models/raw/LivechatInquiry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import type {
IndexDescription,
FindCursor,
UpdateFilter,
DeleteOptions,
} from 'mongodb';

import { getOmniChatSortQuery } from '../../../app/livechat/lib/inquiries';
Expand Down Expand Up @@ -274,8 +275,8 @@ export class LivechatInquiryRaw extends BaseRaw<ILivechatInquiryRecord> implemen
throw new Error('Method not implemented on the community edition.');
}

async removeByRoomId(rid: string): Promise<DeleteResult> {
return this.deleteOne({ rid });
async removeByRoomId(rid: string, options?: DeleteOptions): Promise<DeleteResult> {
return this.deleteOne({ rid }, options);
}

getQueuedInquiries(options?: FindOptions<ILivechatInquiryRecord>): FindCursor<ILivechatInquiryRecord> {
Expand Down
4 changes: 3 additions & 1 deletion apps/meteor/server/models/raw/LivechatRooms.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import type {
FindCursor,
UpdateResult,
AggregationCursor,
UpdateOptions,
} from 'mongodb';

import { getValue } from '../../../app/settings/server/raw';
Expand Down Expand Up @@ -1589,7 +1590,7 @@ export class LivechatRoomsRaw extends BaseRaw<IOmnichannelRoom> implements ILive
);
}

closeRoomById(roomId: string, closeInfo: IOmnichannelRoomClosingInfo) {
closeRoomById(roomId: string, closeInfo: IOmnichannelRoomClosingInfo, options?: UpdateOptions) {
const { closer, closedBy, closedAt, chatDuration, serviceTimeDuration, tags } = closeInfo;

return this.updateOne(
Expand All @@ -1611,6 +1612,7 @@ export class LivechatRoomsRaw extends BaseRaw<IOmnichannelRoom> implements ILive
open: 1,
},
},
options,
);
}

Expand Down
4 changes: 2 additions & 2 deletions apps/meteor/server/models/raw/Rooms.ts
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ export class RoomsRaw extends BaseRaw<IRoom> implements IRoomsModel {
});
}

incUsersCountByIds(ids: Array<IRoom['_id']>, inc = 1): Promise<Document | UpdateResult> {
incUsersCountByIds(ids: Array<IRoom['_id']>, inc = 1, options?: UpdateOptions): Promise<Document | UpdateResult> {
const query: Filter<IRoom> = {
_id: {
$in: ids,
Expand All @@ -611,7 +611,7 @@ export class RoomsRaw extends BaseRaw<IRoom> implements IRoomsModel {
},
};

return this.updateMany(query, update);
return this.updateMany(query, update, options);
}

allRoomSourcesCount(): AggregationCursor<{ _id: Required<IOmnichannelGenericRoom['source']>; count: number }> {
Expand Down
Loading

0 comments on commit be5d153

Please sign in to comment.