Skip to content

Commit

Permalink
refactor: move broadcastMessageFromData to notifyListener (#32843)
Browse files Browse the repository at this point in the history
  • Loading branch information
ricardogarim authored Jul 26, 2024
1 parent 7442ffc commit cb50ac8
Show file tree
Hide file tree
Showing 20 changed files with 158 additions and 129 deletions.
4 changes: 2 additions & 2 deletions apps/meteor/app/autotranslate/server/autotranslate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import _ from 'underscore';

import { callbacks } from '../../../lib/callbacks';
import { isTruthy } from '../../../lib/isTruthy';
import { broadcastMessageFromData } from '../../../server/modules/watchers/lib/messages';
import { notifyOnMessageChange } from '../../lib/server/lib/notifyListener';
import { Markdown } from '../../markdown/server';
import { settings } from '../../settings/server';

Expand Down Expand Up @@ -332,7 +332,7 @@ export abstract class AutoTranslate {
}

private notifyTranslatedMessage(messageId: string): void {
void broadcastMessageFromData({
void notifyOnMessageChange({
id: messageId,
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ import type { IRoom } from '@rocket.chat/core-typings';
import { Messages, Rooms, VideoConference } from '@rocket.chat/models';

import { callbacks } from '../../../../lib/callbacks';
import { broadcastMessageFromData } from '../../../../server/modules/watchers/lib/messages';
import { deleteRoom } from '../../../lib/server/functions/deleteRoom';
import { notifyOnMessageChange } from '../../../lib/server/lib/notifyListener';

const updateAndNotifyParentRoomWithParentMessage = async (room: IRoom): Promise<void> => {
const { value: parentMessage } = await Messages.refreshDiscussionMetadata(room);
if (!parentMessage) {
return;
}
void broadcastMessageFromData({
void notifyOnMessageChange({
id: parentMessage._id,
data: parentMessage,
});
Expand Down
11 changes: 5 additions & 6 deletions apps/meteor/app/federation/server/endpoints/dispatch.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@ import { eventTypes } from '@rocket.chat/core-typings';
import { FederationServers, FederationRoomEvents, Rooms, Messages, Subscriptions, Users, ReadReceipts } from '@rocket.chat/models';
import EJSON from 'ejson';

import { broadcastMessageFromData } from '../../../../server/modules/watchers/lib/messages';
import { API } from '../../../api/server';
import { FileUpload } from '../../../file-upload/server';
import { deleteRoom } from '../../../lib/server/functions/deleteRoom';
import { notifyOnRoomChanged, notifyOnRoomChangedById } from '../../../lib/server/lib/notifyListener';
import { notifyOnMessageChange, notifyOnRoomChanged, notifyOnRoomChangedById } from '../../../lib/server/lib/notifyListener';
import { notifyUsersOnMessage } from '../../../lib/server/lib/notifyUsersOnMessage';
import { sendAllNotifications } from '../../../lib/server/lib/sendNotificationsOnMessage';
import { processThreads } from '../../../threads/server/hooks/aftersavemessage';
Expand Down Expand Up @@ -303,7 +302,7 @@ const eventHandlers = {
}
}
if (messageForNotification) {
void broadcastMessageFromData({
void notifyOnMessageChange({
id: messageForNotification._id,
data: messageForNotification,
});
Expand Down Expand Up @@ -334,7 +333,7 @@ const eventHandlers = {
} else {
// Update the message
await Messages.updateOne({ _id: persistedMessage._id }, { $set: { msg: message.msg, federation: message.federation } });
void broadcastMessageFromData({
void notifyOnMessageChange({
id: persistedMessage._id,
data: {
...persistedMessage,
Expand Down Expand Up @@ -404,7 +403,7 @@ const eventHandlers = {

// Update the property
await Messages.updateOne({ _id: messageId }, { $set: { [`reactions.${reaction}`]: reactionObj } });
void broadcastMessageFromData({
void notifyOnMessageChange({
id: persistedMessage._id,
data: {
...persistedMessage,
Expand Down Expand Up @@ -462,7 +461,7 @@ const eventHandlers = {
// Otherwise, update the property
await Messages.updateOne({ _id: messageId }, { $set: { [`reactions.${reaction}`]: reactionObj } });
}
void broadcastMessageFromData({
void notifyOnMessageChange({
id: persistedMessage._id,
data: {
...persistedMessage,
Expand Down
5 changes: 2 additions & 3 deletions apps/meteor/app/lib/server/functions/deleteMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ import { Messages, Rooms, Uploads, Users, ReadReceipts } from '@rocket.chat/mode
import { Meteor } from 'meteor/meteor';

import { callbacks } from '../../../../lib/callbacks';
import { broadcastMessageFromData } from '../../../../server/modules/watchers/lib/messages';
import { canDeleteMessageAsync } from '../../../authorization/server/functions/canDeleteMessage';
import { FileUpload } from '../../../file-upload/server';
import { settings } from '../../../settings/server';
import { notifyOnRoomChangedById } from '../lib/notifyListener';
import { notifyOnRoomChangedById, notifyOnMessageChange } from '../lib/notifyListener';

export const deleteMessageValidatingPermission = async (message: AtLeast<IMessage, '_id'>, userId: IUser['_id']): Promise<void> => {
if (!message?._id) {
Expand Down Expand Up @@ -93,7 +92,7 @@ export async function deleteMessage(message: IMessage, user: IUser): Promise<voi
void notifyOnRoomChangedById(message.rid);

if (keepHistory || showDeletedStatus) {
void broadcastMessageFromData({
void notifyOnMessageChange({
id: message._id,
});
}
Expand Down
5 changes: 2 additions & 3 deletions apps/meteor/app/lib/server/functions/sendMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ import { Match, check } from 'meteor/check';
import { callbacks } from '../../../../lib/callbacks';
import { isRelativeURL } from '../../../../lib/utils/isRelativeURL';
import { isURL } from '../../../../lib/utils/isURL';
import { broadcastMessageFromData } from '../../../../server/modules/watchers/lib/messages';
import { hasPermissionAsync } from '../../../authorization/server/functions/hasPermission';
import { FileUpload } from '../../../file-upload/server';
import { settings } from '../../../settings/server';
import { notifyOnRoomChangedById } from '../lib/notifyListener';
import { notifyOnRoomChangedById, notifyOnMessageChange } from '../lib/notifyListener';
import { validateCustomMessageFields } from '../lib/validateCustomMessageFields';
import { parseUrlsInMessage } from './parseUrlsInMessage';

Expand Down Expand Up @@ -292,7 +291,7 @@ export const sendMessage = async function (user: any, message: any, room: any, u

await callbacks.run('afterSaveMessage', message, room);

void broadcastMessageFromData({
void notifyOnMessageChange({
id: message._id,
});

Expand Down
5 changes: 2 additions & 3 deletions apps/meteor/app/lib/server/functions/updateMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ import { Messages, Rooms } from '@rocket.chat/models';
import { Meteor } from 'meteor/meteor';

import { callbacks } from '../../../../lib/callbacks';
import { broadcastMessageFromData } from '../../../../server/modules/watchers/lib/messages';
import { settings } from '../../../settings/server';
import { notifyOnRoomChangedById } from '../lib/notifyListener';
import { notifyOnRoomChangedById, notifyOnMessageChange } from '../lib/notifyListener';
import { validateCustomMessageFields } from '../lib/validateCustomMessageFields';
import { parseUrlsInMessage } from './parseUrlsInMessage';

Expand Down Expand Up @@ -102,7 +101,7 @@ export const updateMessage = async function (
// so we wait for it to run before broadcasting
const data = await callbacks.run('afterSaveMessage', msg, room, user._id);

void broadcastMessageFromData({
void notifyOnMessageChange({
id: msg._id,
data: data as any, // TODO move "afterSaveMessage" type definition to specify a return value
});
Expand Down
69 changes: 69 additions & 0 deletions apps/meteor/app/lib/server/lib/notifyListener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ import type {
AtLeast,
ISettingColor,
IUser,
IMessage,
SettingValue,
MessageTypesValues,
} from '@rocket.chat/core-typings';
import {
Rooms,
Expand All @@ -30,7 +33,11 @@ import {
LivechatInquiry,
LivechatDepartmentAgents,
Users,
Messages,
} from '@rocket.chat/models';
import mem from 'mem';

import { shouldHideSystemMessage } from '../../../../server/lib/systemMessage/hideSystemMessage';

type ClientAction = 'inserted' | 'updated' | 'removed';

Expand Down Expand Up @@ -401,3 +408,65 @@ export const notifyOnUserChangeById = withDbWatcherCheck(
void notifyOnUserChange({ id, clientAction, data: user });
},
);

const getUserNameCached = mem(
async (userId: string): Promise<string | undefined> => {
const user = await Users.findOne<Pick<IUser, 'name'>>(userId, { projection: { name: 1 } });
return user?.name;
},
{ maxAge: 10000 },
);

const getSettingCached = mem(async (setting: string): Promise<SettingValue> => Settings.getValueById(setting), { maxAge: 10000 });

export async function getMessageToBroadcast({ id, data }: { id: IMessage['_id']; data?: IMessage }): Promise<IMessage | void> {
const message = data ?? (await Messages.findOneById(id));
if (!message) {
return;
}

if (message.t) {
const hiddenSystemMessages = (await getSettingCached('Hide_System_Messages')) as MessageTypesValues[];
const shouldHide = shouldHideSystemMessage(message.t, hiddenSystemMessages);

if (shouldHide) {
return;
}
}

if (message._hidden || message.imported != null) {
return;
}

const useRealName = (await getSettingCached('UI_Use_Real_Name')) === true;
if (useRealName) {
if (message.u?._id) {
const name = await getUserNameCached(message.u._id);
if (name) {
message.u.name = name;
}
}

if (message.mentions?.length) {
for await (const mention of message.mentions) {
const name = await getUserNameCached(mention._id);
if (name) {
mention.name = name;
}
}
}
}

return message;
}

export const notifyOnMessageChange = withDbWatcherCheck(async ({ id, data }: { id: IMessage['_id']; data?: IMessage }): Promise<void> => {
if (!dbWatchersDisabled) {
return;
}
const message = await getMessageToBroadcast({ id, data });
if (!message) {
return;
}
void api.broadcast('watch.messages', { message });
});
5 changes: 2 additions & 3 deletions apps/meteor/app/message-pin/server/pinMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ import { check } from 'meteor/check';
import { Meteor } from 'meteor/meteor';

import { isTruthy } from '../../../lib/isTruthy';
import { broadcastMessageFromData } from '../../../server/modules/watchers/lib/messages';
import { canAccessRoomAsync, roomAccessAttributes } from '../../authorization/server';
import { hasPermissionAsync } from '../../authorization/server/functions/hasPermission';
import { isTheLastMessage } from '../../lib/server/functions/isTheLastMessage';
import { notifyOnRoomChangedById } from '../../lib/server/lib/notifyListener';
import { notifyOnRoomChangedById, notifyOnMessageChange } from '../../lib/server/lib/notifyListener';
import { settings } from '../../settings/server';
import { getUserAvatarURL } from '../../utils/server/getUserAvatarURL';

Expand Down Expand Up @@ -227,7 +226,7 @@ Meteor.methods<ServerMethods>({
if (settings.get('Message_Read_Receipt_Store_Users')) {
await ReadReceipts.setPinnedByMessageId(originalMessage._id, originalMessage.pinned);
}
void broadcastMessageFromData({
void notifyOnMessageChange({
id: message._id,
});

Expand Down
5 changes: 2 additions & 3 deletions apps/meteor/app/message-star/server/starMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@ import { Messages, Subscriptions, Rooms } from '@rocket.chat/models';
import type { ServerMethods } from '@rocket.chat/ui-contexts';
import { Meteor } from 'meteor/meteor';

import { broadcastMessageFromData } from '../../../server/modules/watchers/lib/messages';
import { canAccessRoomAsync, roomAccessAttributes } from '../../authorization/server';
import { isTheLastMessage } from '../../lib/server/functions/isTheLastMessage';
import { notifyOnRoomChangedById } from '../../lib/server/lib/notifyListener';
import { notifyOnRoomChangedById, notifyOnMessageChange } from '../../lib/server/lib/notifyListener';
import { settings } from '../../settings/server';

declare module '@rocket.chat/ui-contexts' {
Expand Down Expand Up @@ -63,7 +62,7 @@ Meteor.methods<ServerMethods>({

await Messages.updateUserStarById(message._id, uid, message.starred);

void broadcastMessageFromData({
void notifyOnMessageChange({
id: message._id,
});

Expand Down
5 changes: 2 additions & 3 deletions apps/meteor/app/reactions/server/setReaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@ import _ from 'underscore';

import { callbacks } from '../../../lib/callbacks';
import { i18n } from '../../../server/lib/i18n';
import { broadcastMessageFromData } from '../../../server/modules/watchers/lib/messages';
import { canAccessRoomAsync } from '../../authorization/server';
import { hasPermissionAsync } from '../../authorization/server/functions/hasPermission';
import { emoji } from '../../emoji/server';
import { isTheLastMessage } from '../../lib/server/functions/isTheLastMessage';
import { notifyOnRoomChangedById } from '../../lib/server/lib/notifyListener';
import { notifyOnRoomChangedById, notifyOnMessageChange } from '../../lib/server/lib/notifyListener';

const removeUserReaction = (message: IMessage, reaction: string, username: string) => {
if (!message.reactions) {
Expand Down Expand Up @@ -111,7 +110,7 @@ async function setReaction(room: IRoom, user: IUser, message: IMessage, reaction

await Apps.self?.triggerEvent(AppEvents.IPostMessageReacted, message, user, reaction, isReacted);

void broadcastMessageFromData({
void notifyOnMessageChange({
id: message._id,
});
}
Expand Down
4 changes: 2 additions & 2 deletions apps/meteor/app/threads/server/hooks/aftersavemessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Messages } from '@rocket.chat/models';
import { Meteor } from 'meteor/meteor';

import { callbacks } from '../../../../lib/callbacks';
import { broadcastMessageFromData } from '../../../../server/modules/watchers/lib/messages';
import { notifyOnMessageChange } from '../../../lib/server/lib/notifyListener';
import { updateThreadUsersSubscriptions, getMentions } from '../../../lib/server/lib/notifyUsersOnMessage';
import { sendMessageNotifications } from '../../../lib/server/lib/sendNotificationsOnMessage';
import { settings } from '../../../settings/server';
Expand Down Expand Up @@ -62,7 +62,7 @@ export async function processThreads(message: IMessage, room: IRoom) {
await notifyUsersOnReply(message, replies);
await metaData(message, parentMessage, replies);
await notification(message, room, replies);
void broadcastMessageFromData({
void notifyOnMessageChange({
id: message.tmid,
});

Expand Down
5 changes: 2 additions & 3 deletions apps/meteor/ee/server/lib/message-read-receipt/ReadReceipt.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ import { api } from '@rocket.chat/core-services';
import { LivechatVisitors, ReadReceipts, Messages, Rooms, Subscriptions, Users } from '@rocket.chat/models';
import { Random } from '@rocket.chat/random';

import { notifyOnRoomChangedById } from '../../../../app/lib/server/lib/notifyListener';
import { notifyOnRoomChangedById, notifyOnMessageChange } from '../../../../app/lib/server/lib/notifyListener';
import { settings } from '../../../../app/settings/server';
import { SystemLogger } from '../../../../server/lib/logger/system';
import { roomCoordinator } from '../../../../server/lib/rooms/roomCoordinator';
import { broadcastMessageFromData } from '../../../../server/modules/watchers/lib/messages';

// debounced function by roomId, so multiple calls within 2 seconds to same roomId runs only once
const list = {};
Expand Down Expand Up @@ -70,7 +69,7 @@ export const ReadReceipt = {
if (isUserAlone) {
const result = await Messages.setAsReadById(message._id);
if (result.modifiedCount > 0) {
void broadcastMessageFromData({
void notifyOnMessageChange({
id: message._id,
});
}
Expand Down
4 changes: 2 additions & 2 deletions apps/meteor/server/features/EmailInbox/EmailInbox_Incoming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ import type { ParsedMail, Attachment } from 'mailparser';
import stripHtml from 'string-strip-html';

import { FileUpload } from '../../../app/file-upload/server';
import { notifyOnMessageChange } from '../../../app/lib/server/lib/notifyListener';
import { Livechat as LivechatTyped } from '../../../app/livechat/server/lib/LivechatTyped';
import { QueueManager } from '../../../app/livechat/server/lib/QueueManager';
import { settings } from '../../../app/settings/server';
import { i18n } from '../../lib/i18n';
import { broadcastMessageFromData } from '../../modules/watchers/lib/messages';
import { logger } from './logger';

type FileAttachment = VideoAttachmentProps & ImageAttachmentProps & AudioAttachmentProps;
Expand Down Expand Up @@ -234,7 +234,7 @@ export async function onEmailReceived(email: ParsedMail, inbox: string, departme
},
);
room && (await LivechatRooms.updateEmailThreadByRoomId(room._id, thread));
void broadcastMessageFromData({
void notifyOnMessageChange({
id: msgId,
});
})
Expand Down
4 changes: 2 additions & 2 deletions apps/meteor/server/features/EmailInbox/EmailInbox_Outgoing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import type Mail from 'nodemailer/lib/mailer';

import { FileUpload } from '../../../app/file-upload/server';
import { sendMessage } from '../../../app/lib/server/functions/sendMessage';
import { notifyOnMessageChange } from '../../../app/lib/server/lib/notifyListener';
import { settings } from '../../../app/settings/server';
import { slashCommands } from '../../../app/utils/server/slashCommand';
import { callbacks } from '../../../lib/callbacks';
import { i18n } from '../../lib/i18n';
import { broadcastMessageFromData } from '../../modules/watchers/lib/messages';
import { inboxes } from './EmailInbox';
import type { Inbox } from './EmailInbox';
import { logger } from './logger';
Expand Down Expand Up @@ -171,7 +171,7 @@ slashCommands.add({
},
},
);
void broadcastMessageFromData({
void notifyOnMessageChange({
id: message._id,
});

Expand Down
Loading

0 comments on commit cb50ac8

Please sign in to comment.