diff --git a/handwritten/spanner/src/helper.ts b/handwritten/spanner/src/helper.ts index e50802bf5c1e..42c67d53e05b 100644 --- a/handwritten/spanner/src/helper.ts +++ b/handwritten/spanner/src/helper.ts @@ -283,5 +283,10 @@ export function isError(value: any): boolean { * @returns {Boolean} `true` if the value is a UUID, otherwise `false`. */ export function isUuid(value: any): boolean { - return typeof value === 'string' && /^(?:[0-9a-f]{8}-[0-9a-f]{4}-[1-8][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}|00000000-0000-0000-0000-000000000000|ffffffff-ffff-ffff-ffff-ffffffffffff)$/i.test(value); + return ( + typeof value === 'string' && + /^(?:[0-9a-f]{8}-[0-9a-f]{4}-[1-8][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}|00000000-0000-0000-0000-000000000000|ffffffff-ffff-ffff-ffff-ffffffffffff)$/i.test( + value, + ) + ); } diff --git a/handwritten/spanner/src/instrument.ts b/handwritten/spanner/src/instrument.ts index 99bfdf9a38e8..61c92a55c787 100644 --- a/handwritten/spanner/src/instrument.ts +++ b/handwritten/spanner/src/instrument.ts @@ -25,7 +25,6 @@ import { context, trace, INVALID_SPAN_CONTEXT, - ROOT_CONTEXT, SpanAttributes, TimeInput, TracerProvider, @@ -97,23 +96,27 @@ const { AsyncHooksContextManager, } = require('@opentelemetry/context-async-hooks'); +let contextManagerInstallAttempted = false; + /* - * This function ensures that async/await works correctly by - * checking if context.active() returns an invalid/unset context - * and if so, sets a global AsyncHooksContextManager otherwise - * spans resulting from async/await invocations won't be correctly - * associated in their respective hierarchies. + * If no global ContextManager is registered, install an AsyncHooksContextManager + * so that async/await trace context propagation works for apps that haven't + * configured OpenTelemetry themselves. If the host app has already installed a + * ContextManager, leave it alone — tearing down a working manager breaks the + * host's baggage and span parentage on the next gRPC call. + * + * setGlobalContextManager() returns false when a manager is already registered, + * which is the documented signal that we shouldn't replace it. The + * `contextManagerInstallAttempted` latch makes the call idempotent so we don't + * allocate a new AsyncHooksContextManager on each Spanner client construction. */ function ensureInitialContextManagerSet() { - if (!context['_contextManager'] || context.active() === ROOT_CONTEXT) { - // If no context manager is currently set, or if the active context is the ROOT_CONTEXT, - // trace context propagation cannot - // function correctly with async/await for OpenTelemetry - // See {@link https://opentelemetry.io/docs/languages/js/context/#active-context} - context.disable(); // Disable any prior contextManager. - const contextManager = new AsyncHooksContextManager(); - contextManager.enable(); - context.setGlobalContextManager(contextManager); + if (contextManagerInstallAttempted) return; + contextManagerInstallAttempted = true; + const contextManager = new AsyncHooksContextManager(); + contextManager.enable(); + if (!context.setGlobalContextManager(contextManager)) { + contextManager.disable(); } } diff --git a/handwritten/spanner/src/multiplexed-session.ts b/handwritten/spanner/src/multiplexed-session.ts index fdcc6675ceb6..902078923e0f 100644 --- a/handwritten/spanner/src/multiplexed-session.ts +++ b/handwritten/spanner/src/multiplexed-session.ts @@ -18,6 +18,7 @@ import {EventEmitter} from 'events'; import {Database} from './database'; import {Session} from './session'; import {GetSessionCallback} from './session-factory'; +import {context, ROOT_CONTEXT} from '@opentelemetry/api'; import { ObservabilityOptions, getActiveOrNoopSpan, @@ -123,25 +124,29 @@ export class MultiplexedSession opts: this._observabilityOptions, dbName: this.database.formattedName_, }; - return startTrace( - 'MultiplexedSession.createSession', - traceConfig, - async span => { - span.addEvent('Requesting a multiplexed session'); - try { - const [createSessionResponse] = await this.database.createSession({ - multiplexed: true, - }); - this._multiplexedSession = createSessionResponse; - span.addEvent('Created a multiplexed session'); - } catch (e) { - setSpanError(span, e as Error); - throw e; - } finally { - span.end(); - } - }, - ); + return context.with(ROOT_CONTEXT, () => { + return startTrace( + 'MultiplexedSession.createSession', + traceConfig, + async span => { + span.addEvent('Requesting a multiplexed session'); + try { + const [createSessionResponse] = await this.database.createSession( + { + multiplexed: true, + }, + ); + this._multiplexedSession = createSessionResponse; + span.addEvent('Created a multiplexed session'); + } catch (e) { + setSpanError(span, e as Error); + throw e; + } finally { + span.end(); + } + }, + ); + }); }; // Assign the running task to the shared promise variable, and ensure diff --git a/handwritten/spanner/src/session-pool.ts b/handwritten/spanner/src/session-pool.ts index 3c80e5bdceb2..49ac167cc4be 100644 --- a/handwritten/spanner/src/session-pool.ts +++ b/handwritten/spanner/src/session-pool.ts @@ -16,6 +16,7 @@ import {EventEmitter} from 'events'; import PQueue from 'p-queue'; +import {context, ROOT_CONTEXT} from '@opentelemetry/api'; import {Database} from './database'; import {Session} from './session'; @@ -688,49 +689,55 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface { opts: this._observabilityOptions, dbName: this.database.formattedName_, }; - return startTrace('SessionPool.createSessions', traceConfig, async span => { - span.addEvent(`Requesting ${amount} sessions`); - - // while we can request as many sessions be created as we want, the backend - // will return at most 100 at a time, hence the need for a while loop. - while (amount > 0) { - let sessions: Session[] | null = null; - - span.addEvent(`Creating ${amount} sessions`); - - try { - [sessions] = await this.database.batchCreateSessions({ - count: amount, - labels: labels, - databaseRole: databaseRole, - }); - - amount -= sessions.length; - nReturned += sessions.length; - } catch (e) { - this._pending -= amount; - this.emit('createError', e); + return context.with(ROOT_CONTEXT, () => { + return startTrace( + 'SessionPool.createSessions', + traceConfig, + async span => { + span.addEvent(`Requesting ${amount} sessions`); + + // while we can request as many sessions be created as we want, the backend + // will return at most 100 at a time, hence the need for a while loop. + while (amount > 0) { + let sessions: Session[] | null = null; + + span.addEvent(`Creating ${amount} sessions`); + + try { + [sessions] = await this.database.batchCreateSessions({ + count: amount, + labels: labels, + databaseRole: databaseRole, + }); + + amount -= sessions.length; + nReturned += sessions.length; + } catch (e) { + this._pending -= amount; + this.emit('createError', e); + span.addEvent( + `Requested for ${nRequested} sessions returned ${nReturned}`, + ); + setSpanErrorAndException(span, e as Error); + span.end(); + throw e; + } + + sessions.forEach((session: Session) => { + setImmediate(() => { + this._inventory.borrowed.add(session); + this._pending -= 1; + this.release(session); + }); + }); + } + span.addEvent( `Requested for ${nRequested} sessions returned ${nReturned}`, ); - setSpanErrorAndException(span, e as Error); span.end(); - throw e; - } - - sessions.forEach((session: Session) => { - setImmediate(() => { - this._inventory.borrowed.add(session); - this._pending -= 1; - this.release(session); - }); - }); - } - - span.addEvent( - `Requested for ${nRequested} sessions returned ${nReturned}`, + }, ); - span.end(); }); } diff --git a/handwritten/spanner/system-test/spanner.ts b/handwritten/spanner/system-test/spanner.ts index 7031a7fe49cd..4b774fcf85e3 100644 --- a/handwritten/spanner/system-test/spanner.ts +++ b/handwritten/spanner/system-test/spanner.ts @@ -933,7 +933,11 @@ describe('Spanner', () => { }); it('GOOGLE_STANDARD_SQL should write uuid array values', async () => { - const values = [crypto.randomUUID(), crypto.randomUUID(), crypto.randomUUID()]; + const values = [ + crypto.randomUUID(), + crypto.randomUUID(), + crypto.randomUUID(), + ]; const {row} = await insert( {UUIDArray: values}, Spanner.GOOGLE_STANDARD_SQL, @@ -942,7 +946,11 @@ describe('Spanner', () => { }); it.skip('POSTGRESQL should write uuid array values', async () => { - const values = [crypto.randomUUID(), crypto.randomUUID(), crypto.randomUUID()]; + const values = [ + crypto.randomUUID(), + crypto.randomUUID(), + crypto.randomUUID(), + ]; const {row} = await insert({UUIDArray: values}, Spanner.POSTGRESQL); assert.deepStrictEqual(row.toJSON().UUIDArray, values); }); @@ -4674,7 +4682,11 @@ describe('Spanner', () => { }); it('GOOGLE_STANDARD_SQL should bind arrays', async () => { - const values = [crypto.randomUUID(), crypto.randomUUID(), crypto.randomUUID()]; + const values = [ + crypto.randomUUID(), + crypto.randomUUID(), + crypto.randomUUID(), + ]; const query = { sql: 'SELECT @v', @@ -4714,7 +4726,11 @@ describe('Spanner', () => { }); it.skip('POSTGRESQL should bind arrays', async () => { - const values = [crypto.randomUUID(), crypto.randomUUID(), crypto.randomUUID()]; + const values = [ + crypto.randomUUID(), + crypto.randomUUID(), + crypto.randomUUID(), + ]; const query = { sql: 'SELECT $1',