Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: IntegrationHistory out of DB Watcher #32502

Merged
merged 4 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions apps/meteor/app/integrations/server/lib/updateHistory.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import type { IIntegrationHistory, OutgoingIntegrationEvent, IIntegration, IMessage, AtLeast } from '@rocket.chat/core-typings';
import { IntegrationHistory } from '@rocket.chat/models';
import { Random } from '@rocket.chat/random';

import { omit } from '../../../../lib/utils/omit';
import { notifyOnIntegrationHistoryChangedById, notifyOnIntegrationHistoryChanged } from '../../../lib/server/lib/notifyListener';

export const updateHistory = async ({
historyId,
Expand Down Expand Up @@ -77,7 +77,12 @@ export const updateHistory = async ({
};

if (historyId) {
await IntegrationHistory.updateOne({ _id: historyId }, { $set: history });
// Projecting just integration field to comply with existing listener behaviour
const integrationHistory = await IntegrationHistory.updateById(historyId, history, { projection: { 'integration._id': 1 } });
if (!integrationHistory) {
throw new Error('error-updating-integration-history');
}
void notifyOnIntegrationHistoryChanged(integrationHistory, 'updated', history);
return historyId;
}

Expand All @@ -86,11 +91,15 @@ export const updateHistory = async ({
throw new Error('error-invalid-integration');
}

history._createdAt = new Date();
// TODO: Had to force type cast here because of function's signature
// It would be easier if we separate into create and update functions
const { insertedId } = await IntegrationHistory.create(history as IIntegrationHistory);

const _id = Random.id();
if (!insertedId) {
throw new Error('error-creating-integration-history');
}

await IntegrationHistory.insertOne({ _id, ...history } as IIntegrationHistory);
void notifyOnIntegrationHistoryChangedById(insertedId, 'inserted');

return _id;
return insertedId;
};
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ Meteor.methods<ServerMethods>({
});
}

// Don't sending to IntegrationHistory listener since it don't waits for 'removed' events.
await IntegrationHistory.removeByIntegrationId(integrationId);

notifications.streamIntegrationHistory.emit(integrationId, { type: 'removed', id: integrationId });
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ export const deleteOutgoingIntegration = async (integrationId: string, userId: s
}

await Integrations.removeById(integrationId);
// Don't sending to IntegrationHistory listener since it don't waits for 'removed' events.
await IntegrationHistory.removeByIntegrationId(integrationId);
void notifyOnIntegrationChangedById(integrationId, 'removed');
};
Expand Down
43 changes: 42 additions & 1 deletion apps/meteor/app/lib/server/lib/notifyListener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,19 @@ import type {
IPbxEvent,
LoginServiceConfiguration as LoginServiceConfigurationData,
ILivechatPriority,
IIntegrationHistory,
AtLeast,
} from '@rocket.chat/core-typings';
import { Rooms, Permissions, Settings, PbxEvents, Roles, Integrations, LoginServiceConfiguration } from '@rocket.chat/models';
import {
Rooms,
Permissions,
Settings,
PbxEvents,
Roles,
Integrations,
LoginServiceConfiguration,
IntegrationHistory,
} from '@rocket.chat/models';

type ClientAction = 'inserted' | 'updated' | 'removed';

Expand Down Expand Up @@ -254,3 +265,33 @@ export async function notifyOnIntegrationChangedByChannels<T extends IIntegratio
void api.broadcast('watch.integrations', { clientAction, id: item._id, data: item });
}
}

export async function notifyOnIntegrationHistoryChanged<T extends IIntegrationHistory>(
data: AtLeast<T, '_id'>,
clientAction: ClientAction = 'updated',
diff: Partial<T> = {},
): Promise<void> {
if (!dbWatchersDisabled) {
return;
}

void api.broadcast('watch.integrationHistory', { clientAction, id: data._id, data, diff });
}

export async function notifyOnIntegrationHistoryChangedById<T extends IIntegrationHistory>(
id: T['_id'],
clientAction: ClientAction = 'updated',
diff: Partial<T> = {},
): Promise<void> {
if (!dbWatchersDisabled) {
return;
}

const item = await IntegrationHistory.findOneById(id);

if (!item) {
return;
}

void api.broadcast('watch.integrationHistory', { clientAction, id: item._id, data: item, diff });
}
2 changes: 1 addition & 1 deletion apps/meteor/server/database/watchCollections.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ export function getWatchCollections(): string[] {
LivechatInquiry.getCollectionName(),
LivechatDepartmentAgents.getCollectionName(),
InstanceStatus.getCollectionName(),
IntegrationHistory.getCollectionName(),
EmailInbox.getCollectionName(),
Settings.getCollectionName(),
Subscriptions.getCollectionName(),
Expand All @@ -50,6 +49,7 @@ export function getWatchCollections(): string[] {
collections.push(Permissions.getCollectionName());
collections.push(LivechatPriority.getCollectionName());
collections.push(LoginServiceConfiguration.getCollectionName());
collections.push(IntegrationHistory.getCollectionName());
}

if (onlyCollections.length > 0) {
Expand Down
15 changes: 14 additions & 1 deletion apps/meteor/server/models/raw/IntegrationHistory.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { IIntegrationHistory } from '@rocket.chat/core-typings';
import type { IIntegrationHistoryModel } from '@rocket.chat/model-typings';
import type { Db, IndexDescription } from 'mongodb';
import type { Db, IndexDescription, InsertOneResult, FindOneAndUpdateOptions } from 'mongodb';

import { BaseRaw } from './BaseRaw';

Expand All @@ -23,4 +23,17 @@ export class IntegrationHistoryRaw extends BaseRaw<IIntegrationHistory> implemen
findOneByIntegrationIdAndHistoryId(integrationId: string, historyId: string): Promise<IIntegrationHistory | null> {
return this.findOne({ 'integration._id': integrationId, '_id': historyId });
}

async create(integrationHistory: IIntegrationHistory): Promise<InsertOneResult<IIntegrationHistory>> {
return this.insertOne({ ...integrationHistory, _createdAt: new Date() });
}

async updateById(
_id: IIntegrationHistory['_id'],
data: Partial<IIntegrationHistory>,
options?: FindOneAndUpdateOptions,
): Promise<IIntegrationHistory | null> {
const response = await this.findOneAndUpdate({ _id }, { $set: data }, { returnDocument: 'after', ...options });
return response.value;
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
import type { IIntegrationHistory } from '@rocket.chat/core-typings';
import type { FindOneAndUpdateOptions, InsertOneResult } from 'mongodb';

import type { IBaseModel } from './IBaseModel';

export interface IIntegrationHistoryModel extends IBaseModel<IIntegrationHistory> {
removeByIntegrationId(integrationId: string): ReturnType<IBaseModel<IIntegrationHistory>['deleteMany']>;

findOneByIntegrationIdAndHistoryId(integrationId: string, historyId: string): Promise<IIntegrationHistory | null>;
create(integrationHistory: IIntegrationHistory): Promise<InsertOneResult<IIntegrationHistory>>;
updateById(
_id: IIntegrationHistory['_id'],
data: Partial<IIntegrationHistory>,
options?: FindOneAndUpdateOptions,
): Promise<IIntegrationHistory | null>;
}
Loading