Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api/jobs/update-usage: Process users in parallel and allow cancellation #2203

Merged
merged 3 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 43 additions & 29 deletions packages/api/src/controllers/stripe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Router } from "express";
import sql from "sql-template-strings";
import Stripe from "stripe";
import { products } from "../config";
import logger from "../logger";
import { authorizer } from "../middleware";
import { CliArgs } from "../parse-cli";
import { User } from "../schema/types";
Expand All @@ -25,33 +26,46 @@ export const reportUsage = async (
config: CliArgs,
adminToken: string
) => {
let payAsYouGoUsers = await getPayAsYouGoUsers(config.ingest, adminToken);
const payAsYouGoUsers = await getPayAsYouGoUsers(config.ingest, adminToken);

let updatedUsers = [];
for (const user of payAsYouGoUsers) {
try {
let userUpdated = await reportUsageForUser(
stripe,
config,
user,
adminToken
);
updatedUsers.push(userUpdated);
} catch (e) {
console.log(`
Failed to create usage record for user=${user.id} with error=${e.message}
`);
updatedUsers.push({
id: user.id,
usageReported: false,
error: e.message,
});
const updatedUsers = [];
const pendingUsers = payAsYouGoUsers.slice();

const processUser = async () => {
while (true) {
const user = pendingUsers.pop();
if (!user) {
return;
}

try {
const userUpdated = await reportUsageForUser(
stripe,
config,
user,
adminToken
);
updatedUsers.push(userUpdated);
} catch (e) {
logger.error(
`Failed to create usage record for user=${user.id} with error=${e.message}`
);
updatedUsers.push({
id: user.id,
usageReported: false,
error: e.message,
});
}
}
};

const workers = [];
for (let i = 0; i < config.updateUsageConcurrency; i++) {
workers.push(processUser());
}
await Promise.all(workers);

return {
updatedUsers: updatedUsers,
};
return { updatedUsers };
};

async function getPayAsYouGoUsers(ingests: Ingest[], adminToken: string) {
Expand Down Expand Up @@ -136,7 +150,7 @@ async function reportUsageForUser(
},
{} as Record<string, string>
);
console.log(`
logger.info(`
usage: reporting usage to stripe for user=${user.id} email=${user.email} from=${billingCycleStart} to=${billingCycleEnd}
`);
await sendUsageRecordToStripe(
Expand Down Expand Up @@ -281,7 +295,7 @@ app.post("/webhook", async (req, res) => {

const user = users[0];

console.log(`
logger.info(`
invoice=${invoice.id} payment failed for user=${user.id} notifying support team
`);

Expand All @@ -294,7 +308,7 @@ app.post("/webhook", async (req, res) => {
let diff = now - lastNotification;
let days = diff / (1000 * 60 * 60 * 24);
if (days < 7) {
console.log(`
logger.warn(`
Not sending email for payment failure of user=${user.id} because team was notified less than 7 days ago
`);
return res.sendStatus(200);
Expand Down Expand Up @@ -365,14 +379,14 @@ app.post("/webhook", async (req, res) => {
unsubscribe: "",
text: `
Your Livepeer Studio account has been disabled due to a failed payment.

Please update your payment method to reactivate your account.
`,
});
}
}
} catch (e) {
console.log(`
logger.error(`
Failed to send email for payment failure of user=${user.id} with error=${e.message}
`);
}
Expand Down Expand Up @@ -478,7 +492,7 @@ app.post(
user.stripeCustomerSubscriptionId
);
} catch (e) {
console.log(`
logger.error(`
error- subscription not found for user=${user.id} email=${user.email} subscriptionId=${user.stripeCustomerSubscriptionId}
`);
await db.user.update(user.id, {
Expand Down
14 changes: 11 additions & 3 deletions packages/api/src/jobs/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,17 @@ export async function runJob(jobName: JobType, config: CliArgs): Promise<void> {

logger.info(`Starting job. job=${jobName}`);
try {
const result = await timeout(config.jobTimeoutSec * 1000, () =>
jobFuncs[jobName](config)
);
const sigterm = new Promise<never>((_, reject) => {
process.on("SIGTERM", () => {
logger.warn("SIGTERM received, terminating immediately...");
reject(new Error("Job was terminated by SIGTERM"));
});
});
const job = timeout(config.jobTimeoutSec * 1000, () => {
return jobFuncs[jobName](config);
});

const result = await Promise.race([job, sigterm]);

const elapsedTime = process.hrtime(startTime);
const elapsedTimeSec = elapsedTime[0] + elapsedTime[1] / 1e9;
Expand Down
6 changes: 6 additions & 0 deletions packages/api/src/parse-cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,12 @@ export default function parseCli(argv?: string | readonly string[]) {
"job/update-usage: Admin API token to be used in the update usage job internal calls",
type: "string",
},
"update-usage-concurrency": {
describe:
"job/update-usage: number of concurrent workers to run for updating users usage",
type: "number",
default: 10,
},
"stream-info-service": {
describe: "start the Stream Info service instead of Studio API",
type: "boolean",
Expand Down
Loading