diff --git a/lib/connection/connections/HttpRetryPolicy.ts b/lib/connection/connections/HttpRetryPolicy.ts index c28d0efc..0d2ecde8 100644 --- a/lib/connection/connections/HttpRetryPolicy.ts +++ b/lib/connection/connections/HttpRetryPolicy.ts @@ -9,6 +9,28 @@ function delay(milliseconds: number): Promise { }); } +// Transient network error codes worth retrying. Aligned with the OS-level errno set +// surfaced by Node's `http`/`https` (and `node-fetch` via `system` FetchError type) +// when an in-flight request fails before/while delivering a response. Matches the +// classes of errors that the Python (urllib3) and JDBC (Apache HttpClient) drivers +// retry by default at the connection layer. +const RETRYABLE_NETWORK_ERROR_CODES = new Set([ + 'ECONNRESET', + 'ECONNREFUSED', + 'ETIMEDOUT', + 'EHOSTUNREACH', + 'ENETUNREACH', + 'EPIPE', + 'ENOTFOUND', + 'EAI_AGAIN', +]); + +// Fallback message patterns for errors that don't carry an errno. node-fetch surfaces +// "socket hang up" as a generic FetchError, and "Premature close" when the response +// body stream closes before all data is received — both occur regularly when a +// keep-alive TCP connection is silently dropped by an intermediate load balancer. 
+const RETRYABLE_NETWORK_ERROR_MESSAGE_RE = /socket hang up|premature close|^aborted$/i; + export default class HttpRetryPolicy implements IRetryPolicy { private context: IClientContext; @@ -24,45 +46,131 @@ export default class HttpRetryPolicy implements IRetryPolicy { if (this.isRetryable(details)) { - const clientConfig = this.context.getConfig(); + return this.computeRetry(details); + } - // Don't retry if overall retry timeout exceeded - const timeoutExceeded = Date.now() - this.startTime >= clientConfig.retriesTimeout; - if (timeoutExceeded) { - throw new RetryError(RetryErrorCode.TimeoutExceeded, details); - } + return { shouldRetry: false }; + } - this.attempt += 1; + public async invokeWithRetry(operation: RetryableOperation): Promise { + for (;;) { + // Capture either the resolved response or the thrown error so the + // retry-decision logic below can flow without an early `continue` and + // share one backoff site between both paths. + let outcome: { ok: true; details: HttpTransactionDetails } | { ok: false; error: unknown }; + try { + // eslint-disable-next-line no-await-in-loop + const details = await operation(); + outcome = { ok: true, details }; + } catch (error) { + outcome = { ok: false, error }; + } - // Don't retry if max attempts count reached - const attemptsExceeded = this.attempt >= clientConfig.retryMaxAttempts; - if (attemptsExceeded) { - throw new RetryError(RetryErrorCode.AttemptsExceeded, details); + if (outcome.ok) { + // eslint-disable-next-line no-await-in-loop + const status = await this.shouldRetry(outcome.details); + if (!status.shouldRetry) { + return outcome.details; + } + // eslint-disable-next-line no-await-in-loop + await delay(status.retryAfter); + } else { + // The operation threw before producing a response. This is typically a + // transient network failure (stale keep-alive socket reset by a load + // balancer, DNS hiccup, truncated response body, etc.). 
The status-code- + // driven `shouldRetry` path can't see these because there's no `Response` + // to inspect, so we have a separate decision point here. Non-network + // errors (programmer errors, config errors, RetryError raised by our + // own attempts/timeout budget) are re-thrown unchanged. + if (!this.isRetryableNetworkError(outcome.error)) { + throw outcome.error; + } + // eslint-disable-next-line no-await-in-loop + const status = await this.computeNetworkErrorRetry(outcome.error); + if (!status.shouldRetry) { + throw outcome.error; + } + // eslint-disable-next-line no-await-in-loop + await delay(status.retryAfter); } + } + } + + // Budgeting logic for the HTTP status-code path (`shouldRetry`) — bumps the attempt + // counter, enforces overall retries timeout/max attempts, and computes the next backoff. + // The network-error path (`computeNetworkErrorRetry`) repeats the same checks against the + // same `attempt`/`startTime` fields, so both paths draw from a single attempt budget. + private computeRetry(details: HttpTransactionDetails): ShouldRetryResult { + const clientConfig = this.context.getConfig(); + + // Don't retry if overall retry timeout exceeded + const timeoutExceeded = Date.now() - this.startTime >= clientConfig.retriesTimeout; + if (timeoutExceeded) { + throw new RetryError(RetryErrorCode.TimeoutExceeded, details); + } - // If possible, use `Retry-After` header as a floor for a backoff algorithm - const retryAfterHeader = this.getRetryAfterHeader(details, clientConfig.retryDelayMin); - const retryAfter = this.getBackoffDelay( - this.attempt, - retryAfterHeader ?? 
clientConfig.retryDelayMin, - clientConfig.retryDelayMax, - ); + this.attempt += 1; - return { shouldRetry: true, retryAfter }; + // Don't retry if max attempts count reached + const attemptsExceeded = this.attempt >= clientConfig.retryMaxAttempts; + if (attemptsExceeded) { + throw new RetryError(RetryErrorCode.AttemptsExceeded, details); } - return { shouldRetry: false }; + // If possible, use `Retry-After` header as a floor for a backoff algorithm + const retryAfterHeader = this.getRetryAfterHeader(details, clientConfig.retryDelayMin); + const retryAfter = this.getBackoffDelay( + this.attempt, + retryAfterHeader ?? clientConfig.retryDelayMin, + clientConfig.retryDelayMax, + ); + + return { shouldRetry: true, retryAfter }; } - public async invokeWithRetry(operation: RetryableOperation): Promise { - for (;;) { - const details = await operation(); // eslint-disable-line no-await-in-loop - const status = await this.shouldRetry(details); // eslint-disable-line no-await-in-loop - if (!status.shouldRetry) { - return details; - } - await delay(status.retryAfter); // eslint-disable-line no-await-in-loop + private async computeNetworkErrorRetry(error: unknown): Promise { + const clientConfig = this.context.getConfig(); + + const timeoutExceeded = Date.now() - this.startTime >= clientConfig.retriesTimeout; + if (timeoutExceeded) { + throw new RetryError(RetryErrorCode.TimeoutExceeded, error); + } + + this.attempt += 1; + + const attemptsExceeded = this.attempt >= clientConfig.retryMaxAttempts; + if (attemptsExceeded) { + throw new RetryError(RetryErrorCode.AttemptsExceeded, error); + } + + const retryAfter = this.getBackoffDelay(this.attempt, clientConfig.retryDelayMin, clientConfig.retryDelayMax); + + return { shouldRetry: true, retryAfter }; + } + + protected isRetryableNetworkError(error: unknown): boolean { + if (!error || typeof error !== 'object') { + return false; + } + const candidate = error as { code?: string; type?: string; message?: string }; + + // node-fetch 
FetchError surfaces low-level network failures with `type: 'system'` + // and a body-stream timeout with `type: 'body-timeout'`. Both should be retried; + // `request-timeout` is converted to a Thrift TApplicationException upstream so + // we don't need to retry it here. + if (candidate.type === 'system' || candidate.type === 'body-timeout') { + return true; + } + + if (typeof candidate.code === 'string' && RETRYABLE_NETWORK_ERROR_CODES.has(candidate.code)) { + return true; + } + + if (typeof candidate.message === 'string' && RETRYABLE_NETWORK_ERROR_MESSAGE_RE.test(candidate.message)) { + return true; } + + return false; } protected isRetryable({ response }: HttpTransactionDetails): boolean { diff --git a/lib/connection/connections/ThriftHttpConnection.ts b/lib/connection/connections/ThriftHttpConnection.ts index 2b3b493c..428fbb02 100644 --- a/lib/connection/connections/ThriftHttpConnection.ts +++ b/lib/connection/connections/ThriftHttpConnection.ts @@ -121,12 +121,25 @@ export default class ThriftHttpConnection extends EventEmitter { body: data, }; + // Consume the response body inside the retried block. node-fetch surfaces + // late-stage failures (TCP RST after headers, "Premature close") as rejections + // from `response.buffer()`, not from `fetch()`. Reading the body here means + // the retry policy sees those failures and can retry them like any other + // transient network error — the body Buffer is captured via closure so the + // post-retry caller still gets it. 
+ let responseBuffer: Buffer | undefined; + this.getThriftMethodName(data) .then((thriftMethod) => this.getRetryPolicy(thriftMethod)) .then((retryPolicy) => { - const makeRequest = () => { + const makeRequest = async () => { + responseBuffer = undefined; const request = new Request(this.url, requestConfig); - return fetch(request).then((response) => ({ request, response })); + const response = await fetch(request); + if (response.status === 200) { + responseBuffer = await response.buffer(); + } + return { request, response }; }; return retryPolicy.invokeWithRetry(makeRequest); }) @@ -134,8 +147,10 @@ export default class ThriftHttpConnection extends EventEmitter { if (response.status !== 200) { throw new THTTPException(response); } - - return response.buffer(); + // `makeRequest` reads the body for every 200 response, so `responseBuffer` is + // normally set here; fall back to reading it now if a retry policy resolved + // without running `makeRequest`, instead of passing `undefined` downstream. + return responseBuffer ?? response.buffer(); }) .then((buffer) => { this.transport.receiver((transportWithData) => this.handleThriftResponse(transportWithData), seqId)(buffer); diff --git a/lib/result/CloudFetchResultHandler.ts b/lib/result/CloudFetchResultHandler.ts index cb67f5ce..20288022 100644 --- a/lib/result/CloudFetchResultHandler.ts +++ b/lib/result/CloudFetchResultHandler.ts @@ -1,4 +1,4 @@ -import fetch, { RequestInfo, RequestInit, Request } from 'node-fetch'; +import fetch, { RequestInfo, RequestInit, Request, Response } from 'node-fetch'; import { TGetResultSetMetadataResp, TRowSet, TSparkArrowResultLink } from '../../thrift/TCLIService_types'; import HiveDriverError from '../errors/HiveDriverError'; import IClientContext from '../contracts/IClientContext'; @@ -103,12 +103,13 @@ export default class CloudFetchResultHandler implements IResultsProvider { const connectionProvider = await this.context.getConnectionProvider(); const agent = await connectionProvider.getAgent(); const retryPolicy = await 
connectionProvider.getRetryPolicy(); const requestConfig: RequestInit = { agent, ...init }; - const result = await retryPolicy.invokeWithRetry(() => { + // Read the body inside the retried block. CloudFetch downloads are large + // GETs against pre-signed cloud-storage URLs that frequently surface + // `socket hang up` / "Premature close" once the stream is mid-transfer. + // Pulling `arrayBuffer()` in here lets the retry policy treat those + // body-stream failures the same as connect-time failures. + let downloaded: ArrayBuffer | undefined; + const result = await retryPolicy.invokeWithRetry(async () => { + downloaded = undefined; const request = new Request(url, requestConfig); - return fetch(request).then((response) => ({ request, response })); + const response = await fetch(request); + if (response.ok) { + downloaded = await response.arrayBuffer(); + } + return { request, response }; }); - return result.response; + // Fall back to reading the body here if the retry policy returned a + // response without consuming it via our operation (e.g. unit-test stubs + // that hand back a pre-baked Response without invoking the operation + // callback). In production the body is always read inside the retried + // block above, so this path is a no-op. 
+ if (downloaded === undefined && result.response.ok) { + downloaded = await result.response.arrayBuffer(); + } + return { response: result.response, body: downloaded }; } /** diff --git a/tests/unit/connection/connections/HttpRetryPolicy.test.ts b/tests/unit/connection/connections/HttpRetryPolicy.test.ts index 50ba0bf5..fe65adcb 100644 --- a/tests/unit/connection/connections/HttpRetryPolicy.test.ts +++ b/tests/unit/connection/connections/HttpRetryPolicy.test.ts @@ -1,6 +1,6 @@ import { expect, AssertionError } from 'chai'; import sinon from 'sinon'; -import { Request, Response, HeadersInit } from 'node-fetch'; +import { Request, Response, HeadersInit, FetchError } from 'node-fetch'; import HttpRetryPolicy from '../../../../lib/connection/connections/HttpRetryPolicy'; import RetryError, { RetryErrorCode } from '../../../../lib/errors/RetryError'; @@ -287,5 +287,118 @@ describe('HttpRetryPolicy', () => { expect(operation.callCount).to.equal(expectedAttempts); } }); + + it('should retry transient network errors (FetchError ECONNRESET / socket hang up)', async () => { + const context = new ClientContextStub({ + retryDelayMin: 1, + retryDelayMax: 2, + retryMaxAttempts: 20, + }); + const policy = new HttpRetryPolicy(context); + + function fetchError(): Error { + // node-fetch FetchError for a low-level socket failure carries + // `type: 'system'` and the underlying errno on `code`. 
+ return new FetchError('request to https://example.com failed, reason: socket hang up', 'system', { + code: 'ECONNRESET', + } as unknown as NodeJS.ErrnoException); + } + + const expectedAttempts = 3; + const operation = sinon.stub(); + for (let i = 0; i < expectedAttempts - 1; i += 1) { + operation.onCall(i).rejects(fetchError()); + } + operation.onCall(expectedAttempts - 1).resolves({ + request: new Request('http://localhost'), + response: new Response(undefined, { status: 200 }), + }); + + const result = await policy.invokeWithRetry(operation); + expect(result.response.status).to.equal(200); + expect(operation.callCount).to.equal(expectedAttempts); + }); + + it('should retry "Premature close" body-stream errors', async () => { + const context = new ClientContextStub({ + retryDelayMin: 1, + retryDelayMax: 2, + retryMaxAttempts: 20, + }); + const policy = new HttpRetryPolicy(context); + + const operation = sinon.stub(); + // First call: body stream closes early — node-fetch surfaces this via + // `response.buffer()` / `response.arrayBuffer()` as a plain Error whose + // message contains "Premature close". With our fix the operation is + // expected to consume the body inline and re-throw this as a retryable + // failure. 
+ operation.onCall(0).rejects(new Error('Premature close')); + operation.onCall(1).resolves({ + request: new Request('http://localhost'), + response: new Response(undefined, { status: 200 }), + }); + + const result = await policy.invokeWithRetry(operation); + expect(result.response.status).to.equal(200); + expect(operation.callCount).to.equal(2); + }); + + it('should not retry non-transient errors (programmer errors, etc.)', async () => { + const context = new ClientContextStub({ + retryDelayMin: 1, + retryDelayMax: 2, + retryMaxAttempts: 20, + }); + const policy = new HttpRetryPolicy(context); + + // A regular Error with no `code`/`type` hints and no matching message + // pattern must not be retried — those are programmer errors, not + // transient network failures, and silently retrying would mask bugs. + const operation = sinon.stub().rejects(new TypeError('cannot read property of undefined')); + + try { + await policy.invokeWithRetry(operation); + expect.fail('It should re-throw the error'); + } catch (error) { + if (error instanceof AssertionError) { + throw error; + } + expect(error).to.be.instanceOf(TypeError); + expect(operation.callCount).to.equal(1); + } + }); + + it('should stop retrying network errors once max attempts is reached', async () => { + const context = new ClientContextStub({ + retryDelayMin: 1, + retryDelayMax: 2, + retryMaxAttempts: 3, + }); + const clientConfig = context.getConfig(); + const policy = new HttpRetryPolicy(context); + + const fetchError = new FetchError('socket hang up', 'system', { + code: 'ECONNRESET', + } as unknown as NodeJS.ErrnoException); + const operation = sinon.stub().rejects(fetchError); + + try { + await policy.invokeWithRetry(operation); + expect.fail('It should throw a RetryError after exhausting retries'); + } catch (error) { + if (error instanceof AssertionError) { + throw error; + } + // Once attempts are exhausted we surface a RetryError carrying the + // underlying network failure as the payload — matches 
the existing + // semantics for HTTP-status-driven exhaustion so callers have one + // consistent terminal exception shape regardless of failure mode. + expect(error).to.be.instanceOf(RetryError); + expect((error as RetryError).errorCode).to.equal(RetryErrorCode.AttemptsExceeded); + expect((error as RetryError).payload).to.equal(fetchError); + expect(operation.callCount).to.equal(clientConfig.retryMaxAttempts); + } + }); }); });