diff --git a/lib/dispatcher/balanced-pool.js b/lib/dispatcher/balanced-pool.js index 8bad487b3f2..ca14bf6ed1d 100644 --- a/lib/dispatcher/balanced-pool.js +++ b/lib/dispatcher/balanced-pool.js @@ -164,35 +164,14 @@ class BalancedPool extends PoolBase { throw new BalancedPoolMissingUpstreamError() } - const dispatcher = this[kClients].find(dispatcher => ( - !dispatcher[kNeedDrain] && - dispatcher.closed !== true && - dispatcher.destroyed !== true - )) - - if (!dispatcher) { - return - } - - const allClientsBusy = this[kClients].map(pool => pool[kNeedDrain]).reduce((a, b) => a && b, true) - - if (allClientsBusy) { - return - } - let counter = 0 - let maxWeightIndex = this[kClients].findIndex(pool => !pool[kNeedDrain]) + let maxWeightIndex = -1 while (counter++ < this[kClients].length) { this[kIndex] = (this[kIndex] + 1) % this[kClients].length const pool = this[kClients][this[kIndex]] - // find pool index with the largest weight - if (pool[kWeight] > this[kClients][maxWeightIndex][kWeight] && !pool[kNeedDrain]) { - maxWeightIndex = this[kIndex] - } - // decrease the current weight every `this[kClients].length`. if (this[kIndex] === 0) { // Set the current weight to the next lower weight. @@ -202,11 +181,30 @@ class BalancedPool extends PoolBase { this[kCurrentWeight] = this[kMaxWeightPerServer] } } - if (pool[kWeight] >= this[kCurrentWeight] && (!pool[kNeedDrain])) { + + // Skip unavailable pools after updating the current weight for this cycle. + if ( + pool[kNeedDrain] || + pool.closed === true || + pool.destroyed === true + ) { + continue + } + + // Track the best fallback if no pool matches the current weight. + if (maxWeightIndex === -1 || pool[kWeight] > this[kClients][maxWeightIndex][kWeight]) { + maxWeightIndex = this[kIndex] + } + + if (pool[kWeight] >= this[kCurrentWeight]) { return pool } } + if (maxWeightIndex === -1) { + return + } + this[kCurrentWeight] = this[kClients][maxWeightIndex][kWeight] this[kIndex] = maxWeightIndex return this[kClients][maxWeightIndex]