Skip to content

Commit

Permalink
Added concurrency flag to apply/deploy/destroy commands (#193)
Browse files Browse the repository at this point in the history
* Add concurrency flag to commands that run apply, support currently running multiple apply nodes within the datacenter apply function

* Update destroy environment function to pass in concurrency option

* Changed default concurrency to 10

---------

Co-authored-by: David Thor <[email protected]>
  • Loading branch information
TylerAldrich and davidthor authored Dec 1, 2023
1 parent 824feea commit 325da65
Show file tree
Hide file tree
Showing 9 changed files with 62 additions and 35 deletions.
4 changes: 3 additions & 1 deletion src/commands/apply/datacenter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type ApplyDatacenterOptions = {
verbose: boolean;
autoApprove: boolean;
var?: string[];
concurrency: number;
} & GlobalOptions;

const ApplyDatacenterCommand = BaseCommand()
Expand All @@ -22,6 +23,7 @@ const ApplyDatacenterCommand = BaseCommand()
'Provide value for a datacenter variable - e.g. --var account=my-account-123 sets the `account` variable',
{ collect: true },
)
.option('-c, --concurrency <concurrency:number>', 'Maximum number of nodes to apply concurrently', { default: 1 })
.arguments('<name:string> <config_path:string>')
.action(apply_datacenter_action);

Expand Down Expand Up @@ -97,7 +99,7 @@ async function apply_datacenter_action(options: ApplyDatacenterOptions, name: st
command_helper.infraRenderer.renderGraph(infraGraph);
}

command_helper.datacenterUtils.applyDatacenter(name, datacenter, infraGraph, logger)
command_helper.datacenterUtils.applyDatacenter(name, datacenter, infraGraph, logger, options.concurrency)
.then(async () => {
if (interval) {
clearInterval(interval);
Expand Down
5 changes: 3 additions & 2 deletions src/commands/common/datacenter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,12 @@ export class DatacenterUtils {
name: string,
datacenter: Datacenter,
graph: InfraGraph,
logger: winston.Logger | undefined,
logger?: winston.Logger,
concurrency?: number,
): Promise<void> {
return new Promise((resolve) => {
return graph
.apply({ logger: logger })
.apply({ logger, concurrency })
.subscribe({
complete: async () => {
await this.saveDatacenter(name, datacenter, graph);
Expand Down
3 changes: 2 additions & 1 deletion src/commands/common/environment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@ export class EnvironmentUtils {
graph: InfraGraph,
options?: {
logger?: winston.Logger;
concurrency?: number;
},
): Promise<boolean> {
return graph
.apply({ logger: options?.logger })
.apply({ logger: options?.logger, concurrency: options?.concurrency })
.toPromise()
.then(async () => {
await this.saveEnvironment(
Expand Down
4 changes: 3 additions & 1 deletion src/commands/deploy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type DeployOptions = {
debug: boolean;
autoApprove: boolean;
refresh: boolean;
concurrency: number;
} & GlobalOptions;

const DeployCommand = BaseCommand()
Expand All @@ -32,6 +33,7 @@ const DeployCommand = BaseCommand()
.option('--var, --variable <variables:string>', 'Variables to pass to the component', { collect: true })
.option('-r, --refresh [refresh:boolean]', 'Force update all resources', { default: false })
.option('--auto-approve [autoApprove:boolean]', 'Skip all prompts and start the requested action', { default: false })
.option('-c, --concurrency <concurrency:number>', 'Maximum number of nodes to apply concurrently', { default: 10 })
.action(deploy_action);

async function deploy_action(options: DeployOptions, tag_or_path: string): Promise<void> {
Expand Down Expand Up @@ -126,7 +128,7 @@ async function deploy_action(options: DeployOptions, tag_or_path: string): Promi
}

await pipeline
.apply({ logger })
.apply({ logger, concurrency: options.concurrency })
.toPromise()
.then(async () => {
await command_helper.environmentUtils.saveEnvironment(
Expand Down
4 changes: 4 additions & 0 deletions src/commands/destroy/datacenter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type DestroyDatacenterOptions = {
verbose: boolean;
autoApprove: boolean;
force: boolean;
concurrency: number;
} & GlobalOptions;

const DestroyDatacenterCommand = BaseCommand()
Expand All @@ -23,6 +24,7 @@ const DestroyDatacenterCommand = BaseCommand()
'Destroy the datacenter store record, even if destruction of the datacenter fails',
{ default: false },
)
.option('-c, --concurrency <concurrency:number>', 'Maximum number of nodes to apply concurrently', { default: 1 })
.arguments('[name:string]')
.action(destroy_datacenter_action);

Expand Down Expand Up @@ -75,6 +77,7 @@ async function destroy_datacenter_action(options: DestroyDatacenterOptions, name
await destroyEnvironment({
verbose: options.verbose,
autoApprove: true,
concurrency: options.concurrency,
}, env.name);
}
} else {
Expand All @@ -100,6 +103,7 @@ async function destroy_datacenter_action(options: DestroyDatacenterOptions, name
return graph
.apply({
logger: logger,
concurrency: options.concurrency,
})
.toPromise()
.then(async () => {
Expand Down
3 changes: 3 additions & 0 deletions src/commands/destroy/environment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { Inputs } from '../common/inputs.ts';
type DestroyResourceOptons = {
verbose: boolean;
autoApprove: boolean;
concurrency: number;
} & GlobalOptions;

export const destroyEnvironment = async (options: DestroyResourceOptons, name: string) => {
Expand Down Expand Up @@ -70,6 +71,7 @@ export const destroyEnvironment = async (options: DestroyResourceOptons, name: s
return graph
.apply({
logger: logger,
concurrency: options.concurrency,
})
.toPromise()
.then(async () => {
Expand Down Expand Up @@ -126,5 +128,6 @@ export default BaseCommand()
.description('Destroy all the resources in the specified environment')
.option('-v, --verbose [verbose:boolean]', 'Turn on verbose logs', { default: false })
.option('--auto-approve [autoApprove:boolean]', 'Skip all prompts and start the requested action', { default: false })
.option('-c, --concurrency <concurrency:number>', 'Maximum number of nodes to apply concurrently', { default: 1 })
.arguments('<name:string>')
.action(destroyEnvironment);
71 changes: 42 additions & 29 deletions src/graphs/infra/graph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -252,42 +252,55 @@ export class InfraGraph extends Graph<InfraGraphNode> {

return new Observable((subscriber) => {
(async () => {
const concurrencyMax = options.concurrency || 10;
let nodeQueue: InfraGraphNode[];
const applyJobs: Record<string, Promise<string>> = {};

while ((nodeQueue = this.getQueue()).length > 0) {
const maxJobs = nodeQueue.length < concurrencyMax ? nodeQueue.length : concurrencyMax;

for (const node of nodeQueue) {
if (node.inputs) {
try {
if (node.action !== 'delete') {
this.resolveInputFunctionsAndRefs(node);
// Nodes still applying shouldn't be added to the queue again.
// This can only happen when concurrency > 1.
const queuedJobs = Object.keys(applyJobs).length;
if (!(node.getId() in applyJobs) && queuedJobs < maxJobs) {
if (node.inputs) {
try {
if (node.action !== 'delete') {
this.resolveInputFunctionsAndRefs(node);
}
} catch (err: any) {
node.status.state = 'error';
node.status.message = err.message;
subscriber.error(err.message);
throw err;
}
} catch (err: any) {
node.status.state = 'error';
node.status.message = err.message;
subscriber.error(err.message);
throw err;
}

applyJobs[node.getId()] = new Promise<string>((resolve, reject) => {
node
.apply({
...options,
cwd,
})
.subscribe({
error: (err: any) => {
reject(err);
},
complete: () => {
resolve(node.getId());
},
});
});
}

await new Promise<void>((resolve, reject) => {
node
.apply({
...options,
cwd,
})
.subscribe({
// TODO: Is this needed? No longer modifying plan while it's running
// next: (res) => {
// this.insertSteps(res);
// },
error: (err: any) => {
reject(err);
},
complete: () => {
resolve();
},
});
});
subscriber.next(this);
// Once the queue is full, wait for any promise to finish before continuing to loop
if (queuedJobs >= maxJobs) {
const finishedNodeId = await Promise.any(Object.values(applyJobs));
delete applyJobs[finishedNodeId];
subscriber.next(this);
break;
}
}
}
})().then(() => {
Expand Down
2 changes: 1 addition & 1 deletion src/graphs/infra/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ export class InfraGraphNode<P extends Plugin = Plugin> extends GraphNode<Record<

public apply(options?: { cwd?: string; logger?: Logger }): Observable<InfraGraphNode<P>> {
if (this.status.state !== 'pending') {
throw new Error(`Cannot apply node in state, ${this.status.state}`);
throw new Error(`Cannot apply node ${this.getId()} in state: ${this.status.state}`);
}

return new Observable((subscriber) => {
Expand Down
1 change: 1 addition & 0 deletions src/graphs/infra/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ export type PlanOptions = {
export type ApplyOptions = {
cwd?: string;
logger?: Logger;
concurrency?: number;
};

0 comments on commit 325da65

Please sign in to comment.