Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion handwritten/spanner/src/helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -283,5 +283,10 @@ export function isError(value: any): boolean {
* @returns {Boolean} `true` if the value is a UUID, otherwise `false`.
*/
export function isUuid(value: any): boolean {
return typeof value === 'string' && /^(?:[0-9a-f]{8}-[0-9a-f]{4}-[1-8][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}|00000000-0000-0000-0000-000000000000|ffffffff-ffff-ffff-ffff-ffffffffffff)$/i.test(value);
return (
typeof value === 'string' &&
/^(?:[0-9a-f]{8}-[0-9a-f]{4}-[1-8][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}|00000000-0000-0000-0000-000000000000|ffffffff-ffff-ffff-ffff-ffffffffffff)$/i.test(
value,
)
);
}
33 changes: 18 additions & 15 deletions handwritten/spanner/src/instrument.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import {
context,
trace,
INVALID_SPAN_CONTEXT,
ROOT_CONTEXT,
SpanAttributes,
TimeInput,
TracerProvider,
Expand Down Expand Up @@ -97,23 +96,27 @@ const {
AsyncHooksContextManager,
} = require('@opentelemetry/context-async-hooks');

let contextManagerInstallAttempted = false;

/*
* This function ensures that async/await works correctly by
* checking if context.active() returns an invalid/unset context
* and if so, sets a global AsyncHooksContextManager otherwise
* spans resulting from async/await invocations won't be correctly
* associated in their respective hierarchies.
* If no global ContextManager is registered, install an AsyncHooksContextManager
* so that async/await trace context propagation works for apps that haven't
* configured OpenTelemetry themselves. If the host app has already installed a
* ContextManager, leave it alone — tearing down a working manager breaks the
* host's baggage and span parentage on the next gRPC call.
*
* setGlobalContextManager() returns false when a manager is already registered,
* which is the documented signal that we shouldn't replace it. The
* `contextManagerInstallAttempted` latch makes the call idempotent so we don't
* allocate a new AsyncHooksContextManager on each Spanner client construction.
*/
function ensureInitialContextManagerSet() {
if (!context['_contextManager'] || context.active() === ROOT_CONTEXT) {
// If no context manager is currently set, or if the active context is the ROOT_CONTEXT,
// trace context propagation cannot
// function correctly with async/await for OpenTelemetry
// See {@link https://opentelemetry.io/docs/languages/js/context/#active-context}
context.disable(); // Disable any prior contextManager.
const contextManager = new AsyncHooksContextManager();
contextManager.enable();
context.setGlobalContextManager(contextManager);
if (contextManagerInstallAttempted) return;
contextManagerInstallAttempted = true;
const contextManager = new AsyncHooksContextManager();
contextManager.enable();
if (!context.setGlobalContextManager(contextManager)) {
contextManager.disable();
}
}
Comment thread
jcalem-rogo marked this conversation as resolved.

Expand Down
43 changes: 24 additions & 19 deletions handwritten/spanner/src/multiplexed-session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {EventEmitter} from 'events';
import {Database} from './database';
import {Session} from './session';
import {GetSessionCallback} from './session-factory';
import {context, ROOT_CONTEXT} from '@opentelemetry/api';
import {
ObservabilityOptions,
getActiveOrNoopSpan,
Expand Down Expand Up @@ -123,25 +124,29 @@ export class MultiplexedSession
opts: this._observabilityOptions,
dbName: this.database.formattedName_,
};
return startTrace(
'MultiplexedSession.createSession',
traceConfig,
async span => {
span.addEvent('Requesting a multiplexed session');
try {
const [createSessionResponse] = await this.database.createSession({
multiplexed: true,
});
this._multiplexedSession = createSessionResponse;
span.addEvent('Created a multiplexed session');
} catch (e) {
setSpanError(span, e as Error);
throw e;
} finally {
span.end();
}
},
);
return context.with(ROOT_CONTEXT, () => {
return startTrace(
'MultiplexedSession.createSession',

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would need a similar change for regular session pool startTrace('SessionPool.createSessions'

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

traceConfig,
async span => {
span.addEvent('Requesting a multiplexed session');
try {
const [createSessionResponse] = await this.database.createSession(
{
multiplexed: true,
},
);
this._multiplexedSession = createSessionResponse;
span.addEvent('Created a multiplexed session');
} catch (e) {
setSpanError(span, e as Error);
throw e;
} finally {
span.end();
}
},
);
});
};

// Assign the running task to the shared promise variable, and ensure
Expand Down
83 changes: 45 additions & 38 deletions handwritten/spanner/src/session-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import {EventEmitter} from 'events';
import PQueue from 'p-queue';
import {context, ROOT_CONTEXT} from '@opentelemetry/api';

import {Database} from './database';
import {Session} from './session';
Expand Down Expand Up @@ -688,49 +689,55 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface {
opts: this._observabilityOptions,
dbName: this.database.formattedName_,
};
return startTrace('SessionPool.createSessions', traceConfig, async span => {
span.addEvent(`Requesting ${amount} sessions`);

// while we can request as many sessions be created as we want, the backend
// will return at most 100 at a time, hence the need for a while loop.
while (amount > 0) {
let sessions: Session[] | null = null;

span.addEvent(`Creating ${amount} sessions`);

try {
[sessions] = await this.database.batchCreateSessions({
count: amount,
labels: labels,
databaseRole: databaseRole,
});

amount -= sessions.length;
nReturned += sessions.length;
} catch (e) {
this._pending -= amount;
this.emit('createError', e);
return context.with(ROOT_CONTEXT, () => {
return startTrace(
'SessionPool.createSessions',
traceConfig,
async span => {
span.addEvent(`Requesting ${amount} sessions`);

// while we can request as many sessions be created as we want, the backend
// will return at most 100 at a time, hence the need for a while loop.
while (amount > 0) {
let sessions: Session[] | null = null;

span.addEvent(`Creating ${amount} sessions`);

try {
[sessions] = await this.database.batchCreateSessions({
count: amount,
labels: labels,
databaseRole: databaseRole,
});

amount -= sessions.length;
nReturned += sessions.length;
} catch (e) {
this._pending -= amount;
this.emit('createError', e);
span.addEvent(
`Requested for ${nRequested} sessions returned ${nReturned}`,
);
setSpanErrorAndException(span, e as Error);
span.end();
throw e;
}

sessions.forEach((session: Session) => {
setImmediate(() => {
this._inventory.borrowed.add(session);
this._pending -= 1;
this.release(session);
});
});
}

span.addEvent(
`Requested for ${nRequested} sessions returned ${nReturned}`,
);
setSpanErrorAndException(span, e as Error);
span.end();
throw e;
}

sessions.forEach((session: Session) => {
setImmediate(() => {
this._inventory.borrowed.add(session);
this._pending -= 1;
this.release(session);
});
});
}

span.addEvent(
`Requested for ${nRequested} sessions returned ${nReturned}`,
},
);
span.end();
});
}

Expand Down
24 changes: 20 additions & 4 deletions handwritten/spanner/system-test/spanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -933,7 +933,11 @@ describe('Spanner', () => {
});

it('GOOGLE_STANDARD_SQL should write uuid array values', async () => {
const values = [crypto.randomUUID(), crypto.randomUUID(), crypto.randomUUID()];
const values = [
crypto.randomUUID(),
crypto.randomUUID(),
crypto.randomUUID(),
];
const {row} = await insert(
{UUIDArray: values},
Spanner.GOOGLE_STANDARD_SQL,
Expand All @@ -942,7 +946,11 @@ describe('Spanner', () => {
});

it.skip('POSTGRESQL should write uuid array values', async () => {
const values = [crypto.randomUUID(), crypto.randomUUID(), crypto.randomUUID()];
const values = [
crypto.randomUUID(),
crypto.randomUUID(),
crypto.randomUUID(),
];
const {row} = await insert({UUIDArray: values}, Spanner.POSTGRESQL);
assert.deepStrictEqual(row.toJSON().UUIDArray, values);
});
Expand Down Expand Up @@ -4674,7 +4682,11 @@ describe('Spanner', () => {
});

it('GOOGLE_STANDARD_SQL should bind arrays', async () => {
const values = [crypto.randomUUID(), crypto.randomUUID(), crypto.randomUUID()];
const values = [
crypto.randomUUID(),
crypto.randomUUID(),
crypto.randomUUID(),
];

const query = {
sql: 'SELECT @v',
Expand Down Expand Up @@ -4714,7 +4726,11 @@ describe('Spanner', () => {
});

it.skip('POSTGRESQL should bind arrays', async () => {
const values = [crypto.randomUUID(), crypto.randomUUID(), crypto.randomUUID()];
const values = [
crypto.randomUUID(),
crypto.randomUUID(),
crypto.randomUUID(),
];

const query = {
sql: 'SELECT $1',
Expand Down
Loading