Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 29 additions & 27 deletions src/core/RequestHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -1187,10 +1187,10 @@ class RequestHandler {
while (true) {
this._getUsageStatsService()?.recordAttempt(
proxyRequest.request_id,
this.currentAuthIndex,
this._getAccountNameForIndex(this.currentAuthIndex)
currentQueueAuthIndex,
this._getAccountNameForIndex(currentQueueAuthIndex)
);
this._forwardRequest(proxyRequest);
this._forwardRequest(proxyRequest, currentQueueAuthIndex);
initialMessage = await currentQueue.dequeue();

const initialStatus = Number(initialMessage?.status);
Expand Down Expand Up @@ -1590,10 +1590,10 @@ class RequestHandler {
while (true) {
this._getUsageStatsService()?.recordAttempt(
proxyRequest.request_id,
this.currentAuthIndex,
this._getAccountNameForIndex(this.currentAuthIndex)
currentQueueAuthIndex,
this._getAccountNameForIndex(currentQueueAuthIndex)
);
this._forwardRequest(proxyRequest);
this._forwardRequest(proxyRequest, currentQueueAuthIndex);
initialMessage = await currentQueue.dequeue();

const initialStatus = Number(initialMessage?.status);
Expand Down Expand Up @@ -1962,10 +1962,10 @@ class RequestHandler {
while (true) {
this._getUsageStatsService()?.recordAttempt(
proxyRequest.request_id,
this.currentAuthIndex,
this._getAccountNameForIndex(this.currentAuthIndex)
currentQueueAuthIndex,
this._getAccountNameForIndex(currentQueueAuthIndex)
);
this._forwardRequest(proxyRequest);
this._forwardRequest(proxyRequest, currentQueueAuthIndex);
initialMessage = await currentQueue.dequeue();

const initialStatus = Number(initialMessage?.status);
Expand Down Expand Up @@ -2273,14 +2273,16 @@ class RequestHandler {
this.currentAuthIndex,
proxyRequest.request_attempt_id
);
const messageQueueAuthIndex =
this.connectionRegistry.getAuthIndexForRequest(requestId) ?? this.currentAuthIndex;
this._setupClientDisconnectHandler(res, requestId);

this._getUsageStatsService()?.recordAttempt(
requestId,
this.currentAuthIndex,
this._getAccountNameForIndex(this.currentAuthIndex)
messageQueueAuthIndex,
this._getAccountNameForIndex(messageQueueAuthIndex)
);
this._forwardRequest(proxyRequest);
this._forwardRequest(proxyRequest, messageQueueAuthIndex);
const response = await messageQueue.dequeue();

if (response.event_type === "error") {
Expand Down Expand Up @@ -2412,14 +2414,16 @@ class RequestHandler {
this.currentAuthIndex,
proxyRequest.request_attempt_id
);
const messageQueueAuthIndex =
this.connectionRegistry.getAuthIndexForRequest(requestId) ?? this.currentAuthIndex;
this._setupClientDisconnectHandler(res, requestId);

this._getUsageStatsService()?.recordAttempt(
requestId,
this.currentAuthIndex,
this._getAccountNameForIndex(this.currentAuthIndex)
messageQueueAuthIndex,
this._getAccountNameForIndex(messageQueueAuthIndex)
);
this._forwardRequest(proxyRequest);
this._forwardRequest(proxyRequest, messageQueueAuthIndex);
const response = await messageQueue.dequeue();

if (response.event_type === "error") {
Expand Down Expand Up @@ -2906,10 +2910,10 @@ class RequestHandler {
// Record attempt before forwarding, so failed attempts are also counted
this._getUsageStatsService()?.recordAttempt(
proxyRequest.request_id,
this.currentAuthIndex,
this._getAccountNameForIndex(this.currentAuthIndex)
currentQueueAuthIndex,
this._getAccountNameForIndex(currentQueueAuthIndex)
);
this._forwardRequest(proxyRequest);
this._forwardRequest(proxyRequest, currentQueueAuthIndex);
headerMessage = await currentQueue.dequeue();

const headerStatus = Number(headerMessage?.status);
Expand Down Expand Up @@ -3210,11 +3214,11 @@ class RequestHandler {
// This ensures failed attempts (e.g. 429 before any response) are also counted.
this._getUsageStatsService()?.recordAttempt(
proxyRequest.request_id,
this.currentAuthIndex,
this._getAccountNameForIndex(this.currentAuthIndex)
currentQueueAuthIndex,
this._getAccountNameForIndex(currentQueueAuthIndex)
);
try {
this._forwardRequest(proxyRequest);
this._forwardRequest(proxyRequest, currentQueueAuthIndex);

const initialMessage = await currentQueue.dequeue(this.timeouts.FAKE_STREAM);

Expand Down Expand Up @@ -4299,11 +4303,11 @@ class RequestHandler {
);
}

_forwardRequest(proxyRequest) {
const connection = this.connectionRegistry.getConnectionByAuth(this.currentAuthIndex);
_forwardRequest(proxyRequest, authIndex = this.currentAuthIndex) {
const connection = this.connectionRegistry.getConnectionByAuth(authIndex);
if (connection) {
this.logger.debug(
`[Request] Forwarding request #${proxyRequest.request_id} via connection for authIndex=${this.currentAuthIndex}` +
`[Request] Forwarding request #${proxyRequest.request_id} via connection for authIndex=${authIndex}` +
` (attempt=${proxyRequest.request_attempt_id})`
);
connection.send(
Expand All @@ -4313,9 +4317,7 @@ class RequestHandler {
})
);
} else {
throw new Error(
`Unable to forward request: No WebSocket connection found for authIndex=${this.currentAuthIndex}`
);
throw new Error(`Unable to forward request: No WebSocket connection found for authIndex=${authIndex}`);
}
}

Expand Down