Skip to content

Commit

Permalink
autoscaler interval log (#2009)
Browse files Browse the repository at this point in the history
* autoscaler interval log

* updated gitflow deprecated download-artifact@v2

* added actual task # added

* task executor log message

* decorator for logs

* added algo name + count of algos.

* removed redundant logging

* fixed total calculation

* removed underscore

* total sum

* removed total algorithms logging

* log required cutoff by maxReplicasPerTick

* round trip based secondary scale

* use round trip and not duration.

* wait for both round trip and reqrate

* scaling logic changed

* init value

* updated scale logic

* removed unused

* no need to scale if value remained the same

* changed to array

* updated scaling logic

* logging has been added again

* added scaling to 0

* fixed 0 not being processed

* added doc

* corrected if

* refactor

* cleaned up unused code

* jsdoc added

* unused code

* unused code

* updated tests

* removed redundant conditions

* additional condition

* changed to config value

* removed timeout (added for a check)

* changed scale up condition

* scale up amount changed

* changed scale up condition

* undo last change

* added logging

* logging

* added _ since prop didn't exist

* redundant, being handled in auto-scaler

* changed condition to scale up

* added logging

* added logging

* removed =

* removed unused code

* removed unused code

* avg of round trip (array)

* removed total logging

* fixed bug

* added dynamic max size to fixed-window

* wip

* fix problematic value

* logging

* logging fixed

* fixed error

* fix

* not needed, changed back

* Now not scaling down in case there is queue

* removed for checking

* undo last check

* undo for check

* removed for checking

* corrected config access

* added config parameter for debugging purposes

* unused

* removed unused

* fixed

* fix

* wip

* logging

* fix config logic

* fixed logging

* updated config value

* updated config value

* revert window change update

* queue empty when less than 1 sec

* fixed config debug check

* removed old logging used for checking

* fixed reaching undefined value

* removed (used for debugging)

* changed window size

* not needed

---------

Co-authored-by: Adir111 <[email protected]>
Co-authored-by: Adir David <[email protected]>
  • Loading branch information
3 people authored Nov 11, 2024
1 parent aeffbce commit 44e0c1a
Show file tree
Hide file tree
Showing 7 changed files with 540 additions and 560 deletions.
9 changes: 9 additions & 0 deletions core/task-executor/lib/reconcile/reconciler.js
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,15 @@ const reconcile = async ({ algorithmTemplates, algorithmRequests, workers, jobs,
required: 0
};
}
const _created = reconcileResult[algorithmName].created;
const _skipped = reconcileResult[algorithmName].skipped;
const { paused, resumed, required } = reconcileResult[algorithmName];
const total = _created + _skipped + paused + resumed + required;
if (total !== 0) {
log.info(`CYCLE: task-executor: algo: ${algorithmName} created: ${_created},
skipped: ${_skipped}, paused: ${paused},
resumed: ${resumed}, required: ${required}.`);
}
reconcileResult[algorithmName].active = ws.count;
});
return reconcileResult;
Expand Down
9 changes: 1 addition & 8 deletions core/worker/config/main/config.base.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,10 @@ config.streaming = {
minTimeNonStatsReport: formatters.parseInt(process.env.AUTO_SCALER_NON_STATS_REPORT, 10000),
},
scaleUp: {
replicasExtra: formatters.parseInt(process.env.AUTO_SCALER_EXTRA_REPLICAS, 0.35),
maxScaleUpReplicasPerNode: formatters.parseInt(process.env.AUTO_SCALER_MAX_REPLICAS, 1000),
maxScaleUpReplicasPerTick: formatters.parseInt(process.env.AUTO_SCALER_MAX_REPLICAS_PER_SCALE, 10),
replicasOnFirstScale: formatters.parseInt(process.env.AUTO_SCALER_REPLICAS_FIRST_SCALE, 1),
minTimeToCleanUpQueue: formatters.parseInt(process.env.AUTO_SCALER_MIN_TIME_CLEAN_QUEUE, 30),
},
scaleDown: {
tolerance: formatters.parseInt(process.env.AUTO_SCALER_SCALE_DOWN_TOLERANCE, 0.4),
minTimeIdleBeforeReplicaDown: formatters.parseInt(process.env.AUTO_SCALER_MIN_TIME_WAIT_REPLICA_DOWN, 60000),
minQueueSizeBeforeScaleDown: formatters.parseInt(process.env.AUTO_SCALER_MIN_QUEUE_SIZE_BEFORE_SCALE_DOWN, 0),
minTimeQueueEmptyBeforeScaleDown: formatters.parseInt(process.env.AUTO_SCALER_MIN_TIME_QUEUE_EMPTY, 60000),
minTimeToCleanUpQueue: formatters.parseInt(process.env.AUTO_SCALER_MIN_TIME_CLEAN_QUEUE, 30), // seconds
},
scaleIntervention: {
throttleMs: formatters.parseInt(process.env.SCALE_INTERVENTION_LOG_THROTTLE_TIME, 200)
Expand Down
5 changes: 5 additions & 0 deletions core/worker/lib/streaming/core/metrics.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
const { mean } = require('@hkube/stats');
const Logger = require('@hkube/logger');

const _calcRate = (list) => {
let first = list[0];
if (list.length === 1) {
first = { time: first.time - 2000, count: 0 };
}
const last = list[list.length - 1];

const log = Logger.GetLogFromContainer();
log.info(`STATISTICS: first value: ${first.count}, last value: ${last.count}, time diff: ${last.time - first.time} ms`);

const timeDiff = (last.time - first.time) / 1000;
const countDiff = last.count - first.count;
let rate = 0;
Expand Down
31 changes: 15 additions & 16 deletions core/worker/lib/streaming/core/scaler.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class Scaler {
this._status = SCALE_STATUS.IDLE;
this._startInterval();
this._minStatelessCount = minStatelessCount;
this._isQueueEmpty = true;
}

stop() {
Expand All @@ -58,38 +59,37 @@ class Scaler {
return;
}

let pendingUp = false;
this._status = SCALE_STATUS.IDLE;
const unScheduledAlgorithm = await this._getUnScheduledAlgorithm();

if (unScheduledAlgorithm) {
this._status = `${SCALE_STATUS.UNABLE_SCALE} ${unScheduledAlgorithm.message}`;
pendingUp = true;
}
else {
const queue = await this._getQueue();
if (queue) {
this._status = SCALE_STATUS.PENDING_QUEUE;
pendingUp = true;
}
}

const currentSize = this._getCurrentSize();
const shouldScaleUp = pendingUp ? false : this._shouldScaleUp(currentSize);
const shouldScaleDown = this._shouldScaleDown(currentSize);
const shouldScaleUp = this._shouldScaleUp(currentSize);
const shouldScaleDown = this._isQueueEmpty && this._shouldScaleDown(currentSize);

if (shouldScaleUp) {
const required = this._required - currentSize;
const required = this._required - this._desired;
const replicas = Math.min(required, this._maxScaleUpReplicasPerTick);
log.info(`CYCLE: worker shouldScaleUp required: ${required}, replicas: ${replicas}, desired: ${this._desired}, currentSize: ${currentSize}`);
const scaleTo = replicas + currentSize;
this._desired = this._required;
this._desired += replicas;
this._lastScaleUpTime = Date.now();
this._status = SCALE_STATUS.SCALING_UP;
this._scaleUp({ replicas, currentSize, scaleTo });
}
if (shouldScaleDown) {
const replicas = currentSize - this._required;
const scaleTo = this._required;
log.info(`CYCLE: worker shouldScaleDown scaleTo: ${scaleTo}, replicas: ${replicas}, desired: ${this._desired}, currentSize: ${currentSize}`);
this._desired = this._required;
this._lastScaleDownTime = Date.now();
this._status = SCALE_STATUS.SCALING_DOWN;
Expand All @@ -109,14 +109,17 @@ class Scaler {
return this._status;
}

updateRequired(required) {
this._scale = true;
this._required = Math.min(required, this._maxScaleUpReplicasPerNode);
updateRequired(required, isQueueEmpty) {
this._isQueueEmpty = isQueueEmpty;
if (required !== this._required) {
this._scale = true;
this._required = Math.min(required, this._maxScaleUpReplicasPerNode);
}
}

_shouldScaleUp(currentSize) {
let shouldScaleUp = false;
if (currentSize < this._required
if (currentSize < this._required && this._desired < this._required
&& (!this._lastScaleDownTime || Date.now() - this._lastScaleDownTime > this._minTimeBetweenScales)) {
if (this._desired <= currentSize) {
shouldScaleUp = true;
Expand All @@ -140,11 +143,7 @@ class Scaler {

_shouldScaleDown(currentSize) {
let shouldScaleDown = false;
let limitScaleDown = false;
if ((this.minStatelessCount > 0)) {
limitScaleDown = (this._minStatelessCount >= this._required);
}
if (currentSize > this._required && !limitScaleDown
if (currentSize > this._required
&& (!this._lastScaleUpTime || Date.now() - this._lastScaleUpTime > this._minTimeBetweenScales)) {
if (this._desired >= currentSize) {
shouldScaleDown = true;
Expand Down
8 changes: 4 additions & 4 deletions core/worker/lib/streaming/core/statistics.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ class Statistics {

_createStatData({ maxSize }) {
return {
requests: new FixedWindow(maxSize),
requests: new FixedWindow(maxSize), // Brings one value every 2 seconds, meaning for a window_size of 10 we will consider reports of last 20 seconds.
responses: new FixedWindow(maxSize),
durations: new FixedWindow(maxSize),
grossDurations: new FixedWindow(maxSize),
queueDurations: new FixedWindow(maxSize),
durations: new FixedWindow(maxSize * 10), // Brings window_size values every 2 seconds, so for a window_size multiplied by 10 we will consider values that occured in the last 20 seconds.
grossDurations: new FixedWindow(maxSize * 10),
queueDurations: new FixedWindow(maxSize * 10),
};
}
}
Expand Down
Loading

0 comments on commit 44e0c1a

Please sign in to comment.