Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: move broadcastMessageFromData to notifyListener #32843

Merged
Merged
4 changes: 2 additions & 2 deletions apps/meteor/app/autotranslate/server/autotranslate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import _ from 'underscore';

import { callbacks } from '../../../lib/callbacks';
import { isTruthy } from '../../../lib/isTruthy';
import { broadcastMessageFromData } from '../../../server/modules/watchers/lib/messages';
import { notifyOnMessageChange } from '../../lib/server/lib/notifyListener';
import { Markdown } from '../../markdown/server';
import { settings } from '../../settings/server';

Expand Down Expand Up @@ -332,7 +332,7 @@ export abstract class AutoTranslate {
}

private notifyTranslatedMessage(messageId: string): void {
void broadcastMessageFromData({
void notifyOnMessageChange({
id: messageId,
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ import type { IRoom } from '@rocket.chat/core-typings';
import { Messages, Rooms, VideoConference } from '@rocket.chat/models';

import { callbacks } from '../../../../lib/callbacks';
import { broadcastMessageFromData } from '../../../../server/modules/watchers/lib/messages';
import { deleteRoom } from '../../../lib/server/functions/deleteRoom';
import { notifyOnMessageChange } from '../../../lib/server/lib/notifyListener';

const updateAndNotifyParentRoomWithParentMessage = async (room: IRoom): Promise<void> => {
const { value: parentMessage } = await Messages.refreshDiscussionMetadata(room);
if (!parentMessage) {
return;
}
void broadcastMessageFromData({
void notifyOnMessageChange({
id: parentMessage._id,
data: parentMessage,
});
Expand Down
11 changes: 5 additions & 6 deletions apps/meteor/app/federation/server/endpoints/dispatch.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@ import { eventTypes } from '@rocket.chat/core-typings';
import { FederationServers, FederationRoomEvents, Rooms, Messages, Subscriptions, Users, ReadReceipts } from '@rocket.chat/models';
import EJSON from 'ejson';

import { broadcastMessageFromData } from '../../../../server/modules/watchers/lib/messages';
import { API } from '../../../api/server';
import { FileUpload } from '../../../file-upload/server';
import { deleteRoom } from '../../../lib/server/functions/deleteRoom';
import { notifyOnRoomChanged, notifyOnRoomChangedById } from '../../../lib/server/lib/notifyListener';
import { notifyOnMessageChange, notifyOnRoomChanged, notifyOnRoomChangedById } from '../../../lib/server/lib/notifyListener';
import { notifyUsersOnMessage } from '../../../lib/server/lib/notifyUsersOnMessage';
import { sendAllNotifications } from '../../../lib/server/lib/sendNotificationsOnMessage';
import { processThreads } from '../../../threads/server/hooks/aftersavemessage';
Expand Down Expand Up @@ -303,7 +302,7 @@ const eventHandlers = {
}
}
if (messageForNotification) {
void broadcastMessageFromData({
void notifyOnMessageChange({
id: messageForNotification._id,
data: messageForNotification,
});
Expand Down Expand Up @@ -334,7 +333,7 @@ const eventHandlers = {
} else {
// Update the message
await Messages.updateOne({ _id: persistedMessage._id }, { $set: { msg: message.msg, federation: message.federation } });
void broadcastMessageFromData({
void notifyOnMessageChange({
id: persistedMessage._id,
data: {
...persistedMessage,
Expand Down Expand Up @@ -404,7 +403,7 @@ const eventHandlers = {

// Update the property
await Messages.updateOne({ _id: messageId }, { $set: { [`reactions.${reaction}`]: reactionObj } });
void broadcastMessageFromData({
void notifyOnMessageChange({
id: persistedMessage._id,
data: {
...persistedMessage,
Expand Down Expand Up @@ -462,7 +461,7 @@ const eventHandlers = {
// Otherwise, update the property
await Messages.updateOne({ _id: messageId }, { $set: { [`reactions.${reaction}`]: reactionObj } });
}
void broadcastMessageFromData({
void notifyOnMessageChange({
id: persistedMessage._id,
data: {
...persistedMessage,
Expand Down
5 changes: 2 additions & 3 deletions apps/meteor/app/lib/server/functions/deleteMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ import { Messages, Rooms, Uploads, Users, ReadReceipts } from '@rocket.chat/mode
import { Meteor } from 'meteor/meteor';

import { callbacks } from '../../../../lib/callbacks';
import { broadcastMessageFromData } from '../../../../server/modules/watchers/lib/messages';
import { canDeleteMessageAsync } from '../../../authorization/server/functions/canDeleteMessage';
import { FileUpload } from '../../../file-upload/server';
import { settings } from '../../../settings/server';
import { notifyOnRoomChangedById } from '../lib/notifyListener';
import { notifyOnRoomChangedById, notifyOnMessageChange } from '../lib/notifyListener';

export const deleteMessageValidatingPermission = async (message: AtLeast<IMessage, '_id'>, userId: IUser['_id']): Promise<void> => {
if (!message?._id) {
Expand Down Expand Up @@ -93,7 +92,7 @@ export async function deleteMessage(message: IMessage, user: IUser): Promise<voi
void notifyOnRoomChangedById(message.rid);

if (keepHistory || showDeletedStatus) {
void broadcastMessageFromData({
void notifyOnMessageChange({
id: message._id,
});
}
Expand Down
5 changes: 2 additions & 3 deletions apps/meteor/app/lib/server/functions/sendMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ import { Match, check } from 'meteor/check';
import { callbacks } from '../../../../lib/callbacks';
import { isRelativeURL } from '../../../../lib/utils/isRelativeURL';
import { isURL } from '../../../../lib/utils/isURL';
import { broadcastMessageFromData } from '../../../../server/modules/watchers/lib/messages';
import { hasPermissionAsync } from '../../../authorization/server/functions/hasPermission';
import { FileUpload } from '../../../file-upload/server';
import { settings } from '../../../settings/server';
import { notifyOnRoomChangedById } from '../lib/notifyListener';
import { notifyOnRoomChangedById, notifyOnMessageChange } from '../lib/notifyListener';
import { validateCustomMessageFields } from '../lib/validateCustomMessageFields';
import { parseUrlsInMessage } from './parseUrlsInMessage';

Expand Down Expand Up @@ -292,7 +291,7 @@ export const sendMessage = async function (user: any, message: any, room: any, u

await callbacks.run('afterSaveMessage', message, room);

void broadcastMessageFromData({
void notifyOnMessageChange({
id: message._id,
});

Expand Down
5 changes: 2 additions & 3 deletions apps/meteor/app/lib/server/functions/updateMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ import { Messages, Rooms } from '@rocket.chat/models';
import { Meteor } from 'meteor/meteor';

import { callbacks } from '../../../../lib/callbacks';
import { broadcastMessageFromData } from '../../../../server/modules/watchers/lib/messages';
import { settings } from '../../../settings/server';
import { notifyOnRoomChangedById } from '../lib/notifyListener';
import { notifyOnRoomChangedById, notifyOnMessageChange } from '../lib/notifyListener';
import { validateCustomMessageFields } from '../lib/validateCustomMessageFields';
import { parseUrlsInMessage } from './parseUrlsInMessage';

Expand Down Expand Up @@ -102,7 +101,7 @@ export const updateMessage = async function (
// so we wait for it to run before broadcasting
const data = await callbacks.run('afterSaveMessage', msg, room, user._id);

void broadcastMessageFromData({
void notifyOnMessageChange({
id: msg._id,
data: data as any, // TODO move "afterSaveMessage" type definition to specify a return value
});
Expand Down
69 changes: 69 additions & 0 deletions apps/meteor/app/lib/server/lib/notifyListener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
AtLeast,
ISettingColor,
IUser,
IMessage,
SettingValue,
MessageTypesValues,
} from '@rocket.chat/core-typings';
import {
Rooms,
Expand All @@ -30,7 +33,11 @@
LivechatInquiry,
LivechatDepartmentAgents,
Users,
Messages,
} from '@rocket.chat/models';
import mem from 'mem';

import { shouldHideSystemMessage } from '../../../../server/lib/systemMessage/hideSystemMessage';

type ClientAction = 'inserted' | 'updated' | 'removed';

Expand Down Expand Up @@ -366,11 +373,11 @@
}

if (clientAction === 'inserted') {
void api.broadcast('watch.users', { clientAction, id, data: data! });

Check warning on line 376 in apps/meteor/app/lib/server/lib/notifyListener.ts

View workflow job for this annotation

GitHub Actions / 🔎 Code Check / Code Lint

Forbidden non-null assertion
return;
}

void api.broadcast('watch.users', { clientAction, diff: diff!, unset: unset || {}, id });

Check warning on line 380 in apps/meteor/app/lib/server/lib/notifyListener.ts

View workflow job for this annotation

GitHub Actions / 🔎 Code Check / Code Lint

Forbidden non-null assertion
});

/**
Expand Down Expand Up @@ -401,3 +408,65 @@
void notifyOnUserChange({ id, clientAction, data: user });
},
);

const getUserNameCached = mem(
async (userId: string): Promise<string | undefined> => {
const user = await Users.findOne<Pick<IUser, 'name'>>(userId, { projection: { name: 1 } });
return user?.name;
},
{ maxAge: 10000 },
);

const getSettingCached = mem(async (setting: string): Promise<SettingValue> => Settings.getValueById(setting), { maxAge: 10000 });

export async function getMessageToBroadcast({ id, data }: { id: IMessage['_id']; data?: IMessage }): Promise<IMessage | void> {
const message = data ?? (await Messages.findOneById(id));
if (!message) {
return;
}

if (message.t) {
const hiddenSystemMessages = (await getSettingCached('Hide_System_Messages')) as MessageTypesValues[];
const shouldHide = shouldHideSystemMessage(message.t, hiddenSystemMessages);

if (shouldHide) {
return;
}
}

if (message._hidden || message.imported != null) {
return;
}

const useRealName = (await getSettingCached('UI_Use_Real_Name')) === true;
if (useRealName) {
if (message.u?._id) {
const name = await getUserNameCached(message.u._id);
if (name) {
message.u.name = name;
}
}

if (message.mentions?.length) {
for await (const mention of message.mentions) {
const name = await getUserNameCached(mention._id);
if (name) {
mention.name = name;
}
}
}
}

return message;
}

export const notifyOnMessageChange = withDbWatcherCheck(async ({ id, data }: { id: IMessage['_id']; data?: IMessage }): Promise<void> => {
if (!dbWatchersDisabled) {
return;
}
const message = await getMessageToBroadcast({ id, data });
if (!message) {
return;
}
void api.broadcast('watch.messages', { message });
});
5 changes: 2 additions & 3 deletions apps/meteor/app/message-pin/server/pinMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ import { check } from 'meteor/check';
import { Meteor } from 'meteor/meteor';

import { isTruthy } from '../../../lib/isTruthy';
import { broadcastMessageFromData } from '../../../server/modules/watchers/lib/messages';
import { canAccessRoomAsync, roomAccessAttributes } from '../../authorization/server';
import { hasPermissionAsync } from '../../authorization/server/functions/hasPermission';
import { isTheLastMessage } from '../../lib/server/functions/isTheLastMessage';
import { notifyOnRoomChangedById } from '../../lib/server/lib/notifyListener';
import { notifyOnRoomChangedById, notifyOnMessageChange } from '../../lib/server/lib/notifyListener';
import { settings } from '../../settings/server';
import { getUserAvatarURL } from '../../utils/server/getUserAvatarURL';

Expand Down Expand Up @@ -227,7 +226,7 @@ Meteor.methods<ServerMethods>({
if (settings.get('Message_Read_Receipt_Store_Users')) {
await ReadReceipts.setPinnedByMessageId(originalMessage._id, originalMessage.pinned);
}
void broadcastMessageFromData({
void notifyOnMessageChange({
id: message._id,
});

Expand Down
5 changes: 2 additions & 3 deletions apps/meteor/app/message-star/server/starMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@ import { Messages, Subscriptions, Rooms } from '@rocket.chat/models';
import type { ServerMethods } from '@rocket.chat/ui-contexts';
import { Meteor } from 'meteor/meteor';

import { broadcastMessageFromData } from '../../../server/modules/watchers/lib/messages';
import { canAccessRoomAsync, roomAccessAttributes } from '../../authorization/server';
import { isTheLastMessage } from '../../lib/server/functions/isTheLastMessage';
import { notifyOnRoomChangedById } from '../../lib/server/lib/notifyListener';
import { notifyOnRoomChangedById, notifyOnMessageChange } from '../../lib/server/lib/notifyListener';
import { settings } from '../../settings/server';

declare module '@rocket.chat/ui-contexts' {
Expand Down Expand Up @@ -63,7 +62,7 @@ Meteor.methods<ServerMethods>({

await Messages.updateUserStarById(message._id, uid, message.starred);

void broadcastMessageFromData({
void notifyOnMessageChange({
id: message._id,
});

Expand Down
5 changes: 2 additions & 3 deletions apps/meteor/app/reactions/server/setReaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@ import _ from 'underscore';

import { callbacks } from '../../../lib/callbacks';
import { i18n } from '../../../server/lib/i18n';
import { broadcastMessageFromData } from '../../../server/modules/watchers/lib/messages';
import { canAccessRoomAsync } from '../../authorization/server';
import { hasPermissionAsync } from '../../authorization/server/functions/hasPermission';
import { emoji } from '../../emoji/server';
import { isTheLastMessage } from '../../lib/server/functions/isTheLastMessage';
import { notifyOnRoomChangedById } from '../../lib/server/lib/notifyListener';
import { notifyOnRoomChangedById, notifyOnMessageChange } from '../../lib/server/lib/notifyListener';

const removeUserReaction = (message: IMessage, reaction: string, username: string) => {
if (!message.reactions) {
Expand Down Expand Up @@ -111,7 +110,7 @@ async function setReaction(room: IRoom, user: IUser, message: IMessage, reaction

await Apps.self?.triggerEvent(AppEvents.IPostMessageReacted, message, user, reaction, isReacted);

void broadcastMessageFromData({
void notifyOnMessageChange({
id: message._id,
});
}
Expand Down
4 changes: 2 additions & 2 deletions apps/meteor/app/threads/server/hooks/aftersavemessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Messages } from '@rocket.chat/models';
import { Meteor } from 'meteor/meteor';

import { callbacks } from '../../../../lib/callbacks';
import { broadcastMessageFromData } from '../../../../server/modules/watchers/lib/messages';
import { notifyOnMessageChange } from '../../../lib/server/lib/notifyListener';
import { updateThreadUsersSubscriptions, getMentions } from '../../../lib/server/lib/notifyUsersOnMessage';
import { sendMessageNotifications } from '../../../lib/server/lib/sendNotificationsOnMessage';
import { settings } from '../../../settings/server';
Expand Down Expand Up @@ -62,7 +62,7 @@ export async function processThreads(message: IMessage, room: IRoom) {
await notifyUsersOnReply(message, replies);
await metaData(message, parentMessage, replies);
await notification(message, room, replies);
void broadcastMessageFromData({
void notifyOnMessageChange({
id: message.tmid,
});

Expand Down
5 changes: 2 additions & 3 deletions apps/meteor/ee/server/lib/message-read-receipt/ReadReceipt.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ import { api } from '@rocket.chat/core-services';
import { LivechatVisitors, ReadReceipts, Messages, Rooms, Subscriptions, Users } from '@rocket.chat/models';
import { Random } from '@rocket.chat/random';

import { notifyOnRoomChangedById } from '../../../../app/lib/server/lib/notifyListener';
import { notifyOnRoomChangedById, notifyOnMessageChange } from '../../../../app/lib/server/lib/notifyListener';
import { settings } from '../../../../app/settings/server';
import { SystemLogger } from '../../../../server/lib/logger/system';
import { roomCoordinator } from '../../../../server/lib/rooms/roomCoordinator';
import { broadcastMessageFromData } from '../../../../server/modules/watchers/lib/messages';

// debounced function by roomId, so multiple calls within 2 seconds to same roomId runs only once
const list = {};
Expand Down Expand Up @@ -70,7 +69,7 @@ export const ReadReceipt = {
if (isUserAlone) {
const result = await Messages.setAsReadById(message._id);
if (result.modifiedCount > 0) {
void broadcastMessageFromData({
void notifyOnMessageChange({
id: message._id,
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ import type { ParsedMail, Attachment } from 'mailparser';
import stripHtml from 'string-strip-html';

import { FileUpload } from '../../../app/file-upload/server';
import { notifyOnMessageChange } from '../../../app/lib/server/lib/notifyListener';
import { Livechat as LivechatTyped } from '../../../app/livechat/server/lib/LivechatTyped';
import { QueueManager } from '../../../app/livechat/server/lib/QueueManager';
import { settings } from '../../../app/settings/server';
import { i18n } from '../../lib/i18n';
import { broadcastMessageFromData } from '../../modules/watchers/lib/messages';
import { logger } from './logger';

type FileAttachment = VideoAttachmentProps & ImageAttachmentProps & AudioAttachmentProps;
Expand Down Expand Up @@ -234,7 +234,7 @@ export async function onEmailReceived(email: ParsedMail, inbox: string, departme
},
);
room && (await LivechatRooms.updateEmailThreadByRoomId(room._id, thread));
void broadcastMessageFromData({
void notifyOnMessageChange({
id: msgId,
});
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import type Mail from 'nodemailer/lib/mailer';

import { FileUpload } from '../../../app/file-upload/server';
import { sendMessage } from '../../../app/lib/server/functions/sendMessage';
import { notifyOnMessageChange } from '../../../app/lib/server/lib/notifyListener';
import { settings } from '../../../app/settings/server';
import { slashCommands } from '../../../app/utils/server/slashCommand';
import { callbacks } from '../../../lib/callbacks';
import { i18n } from '../../lib/i18n';
import { broadcastMessageFromData } from '../../modules/watchers/lib/messages';
import { inboxes } from './EmailInbox';
import type { Inbox } from './EmailInbox';
import { logger } from './logger';
Expand Down Expand Up @@ -171,7 +171,7 @@ slashCommands.add({
},
},
);
void broadcastMessageFromData({
void notifyOnMessageChange({
id: message._id,
});

Expand Down
Loading
Loading