Skip to content

Commit

Permalink
Chore: Add options to debug stdout and rate limiter (#25336)
Browse files Browse the repository at this point in the history
  • Loading branch information
sampaiodiego authored May 2, 2022
1 parent 8e17893 commit fcc18d7
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 8 deletions.
22 changes: 19 additions & 3 deletions apps/meteor/app/lib/server/startup/rateLimiter.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,22 @@ import { DDPRateLimiter } from 'meteor/ddp-rate-limiter';
import { RateLimiter } from 'meteor/rate-limit';

import { settings } from '../../../settings/server';
import { metrics } from '../../../metrics';
import { Logger } from '../../../logger';
import { metrics } from '../../../metrics/server';
import { Logger } from '../../../logger/server';

const logger = new Logger('RateLimiter');

const slowDownRate = parseInt(process.env.RATE_LIMITER_SLOWDOWN_RATE);

const rateLimiterConsoleLog = ({ msg, reply, input }) => {
console.warn('DDP RATE LIMIT:', msg);
console.warn(JSON.stringify({ reply, input }, null, 2));
};

const rateLimiterLogger = ({ msg, reply, input }) => logger.info({ msg, reply, input });

const rateLimiterLog = String(process.env.RATE_LIMITER_LOGGER) === 'console' ? rateLimiterConsoleLog : rateLimiterLogger;

// Get initial set of names already registered for rules
const names = new Set(
Object.values(DDPRateLimiter.printRules())
Expand Down Expand Up @@ -85,6 +96,7 @@ RateLimiter.prototype.check = function (input) {
callbackReply.timeToReset = ruleResult.timeToNextReset;
callbackReply.allowed = false;
callbackReply.numInvocationsLeft = 0;
callbackReply.numInvocationsExceeded = numInvocations - rule.options.numRequestsAllowed;
rule._executeCallback(callbackReply, input);
// ==== END OVERRIDE ====
} else {
Expand Down Expand Up @@ -112,7 +124,7 @@ const ruleIds = {};

const callback = (msg, name) => (reply, input) => {
if (reply.allowed === false) {
logger.info({ msg, reply, input });
rateLimiterLog({ msg, reply, input });
metrics.ddpRateLimitExceeded.inc({
limit_name: name,
user_id: input.userId,
Expand All @@ -121,6 +133,10 @@ const callback = (msg, name) => (reply, input) => {
name: input.name,
connection_id: input.connectionId,
});
// sleep before sending the error to slow down next requests
if (slowDownRate > 0 && reply.numInvocationsExceeded) {
Meteor._sleepForMs(slowDownRate * reply.numInvocationsExceeded);
}
// } else {
// console.log('DDP RATE LIMIT:', message);
// console.log(JSON.stringify({ ...reply, ...input }, null, 2));
Expand Down
4 changes: 3 additions & 1 deletion apps/meteor/server/lib/logger/logQueue.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import EventEmitter from 'events';

const { MOLECULER_LOG_LEVEL, STDOUT_VIEWER_DISABLED = 'false' } = process.env;

type LogQueue = {
id: string;
data: string;
Expand Down Expand Up @@ -56,6 +58,6 @@ function queueWrite(...args: any): boolean {
return true;
}

if (String(process.env.MOLECULER_LOG_LEVEL).toLowerCase() !== 'debug') {
if (String(MOLECULER_LOG_LEVEL).toLowerCase() !== 'debug' && STDOUT_VIEWER_DISABLED === 'false') {
process.stdout.write = queueWrite;
}
22 changes: 18 additions & 4 deletions apps/meteor/server/stream/stdout.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import { performance } from 'perf_hooks';

import { EJSON } from 'meteor/ejson';
import { Log } from 'meteor/logging';

import notifications from '../../app/notifications/server/lib/Notifications';
import { getQueuedLogs, logEntries } from '../lib/logger/logQueue';

const processString = function (string: string, date: Date): string {
function processString(string: string, date: Date): string {
let obj;
try {
if (string[0] === '{') {
Expand All @@ -20,15 +22,27 @@ const processString = function (string: string, date: Date): string {
} catch (error) {
return string;
}
};
}

const transformLog = function (item: any): { id: string; string: string; ts: Date } {
function rawTransformLog(item: any): { id: string; string: string; ts: Date; time?: number } {
return {
id: item.id,
string: processString(item.data, item.ts),
ts: item.ts,
};
};
}

function timedTransformLog(log: any): { id: string; string: string; ts: Date; time?: number } {
const timeStart = performance.now();
const item = rawTransformLog(log);
const timeEnd = performance.now();

item.time = timeEnd - timeStart;

return item;
}

const transformLog = process.env.STDOUT_METRICS === 'true' ? timedTransformLog : rawTransformLog;

logEntries.on('log', (item) => {
// TODO having this as 'emitWithoutBroadcast' will not sent this data to ddp-streamer, so this data
Expand Down

0 comments on commit fcc18d7

Please sign in to comment.