feat: production-capable CPU profiling #641

This commit is contained in:
Matthew Little
2021-10-29 22:51:54 +02:00
committed by GitHub
parent 60b5df7b8a
commit edb8d121d1
5 changed files with 552 additions and 1 deletions

4
.env
View File

@@ -21,6 +21,10 @@ PG_SSL=false
# to the local database. The API will only read data from the PG database specified above to respond to requests.
# STACKS_READ_ONLY_MODE=true
# If specified, an http server providing profiling capability endpoints will be opened on the given port.
# This port should not be publicly exposed.
# STACKS_PROFILER_PORT=9119
STACKS_CORE_EVENT_PORT=3700
STACKS_CORE_EVENT_HOST=127.0.0.1

View File

@@ -375,6 +375,58 @@ export function httpPostRequest(
});
}
/**
 * Buffers the whole body of an http response in memory and resolves with it once the
 * response has ended.
 * Rejects when the response ends or its connection closes before the message was completely
 * received (`res.complete` is false), or when the response stream emits an error.
 */
export function readHttpResponse(res: http.IncomingMessage): Promise<Buffer> {
  // Built lazily so the error (and its stack) is created at the moment the problem is seen.
  const terminatedError = (): Error =>
    new Error('The connection was terminated while the message was still being sent');

  return new Promise<Buffer>((resolve, reject) => {
    const received: Buffer[] = [];

    res.on('data', piece => {
      received.push(piece);
    });

    res.on('end', () => {
      if (res.complete) {
        // A single chunk is handed back as-is, anything else is joined into one buffer.
        resolve(received.length === 1 ? received[0] : Buffer.concat(received));
      } else {
        reject(terminatedError());
      }
    });

    res.on('close', () => {
      // Settling twice is harmless, so this only matters when no 'end' resolved the promise.
      if (!res.complete) {
        reject(terminatedError());
      }
    });

    res.on('error', reject);
  });
}
/**
 * Issues an http request through the Node.js standard `http` lib, which allows finer control
 * than wrappers like `node-fetch`.
 * @param url - Absolute URL to request; an unparsable URL rejects the returned promise.
 * @param opts - Optional request options handed to `http.request`.
 * @returns The request together with the response, available as soon as the response headers
 * have arrived (the body has not been read at that point).
 */
export async function httpGetRequest(url: string, opts?: http.RequestOptions) {
  return new Promise<[http.ClientRequest, http.IncomingMessage]>((resolve, reject) => {
    // Anything thrown synchronously in here (e.g. by `new URL`) rejects the promise.
    const target = new URL(url);
    const requestOptions = opts ?? {};
    const request = http.request(target, requestOptions, response => {
      resolve([request, response]);
    });
    request.on('error', reject);
    request.end();
  });
}
/**
* Parses a boolean string using conventions from CLI arguments, URL query params, and environmental variables.
* If the input is defined but an empty string, then true is returned. If the input is undefined or null, then false is returned.
@@ -727,13 +779,20 @@ export function waiter<T = void>(): Waiter<T> {
/**
 * Creates a high-resolution stopwatch based on `process.hrtime`.
 * The stopwatch starts running immediately.
 */
export function stopwatch(): {
  /** Milliseconds since the stopwatch was created or last restarted. */
  getElapsed: () => number;
  /** Returns the same value as `getElapsed`, then restarts the stopwatch from zero. */
  getElapsedAndRestart: () => number;
} {
  // Must be `let` (and declared only once): `getElapsedAndRestart` reassigns it.
  let start = process.hrtime();

  // `process.hrtime(start)` yields the [seconds, nanoseconds] elapsed since `start`.
  const elapsedMs = (): number => {
    const [seconds, nanoseconds] = process.hrtime(start);
    return seconds * 1000 + nanoseconds / 1000000;
  };

  return {
    getElapsed: elapsedMs,
    getElapsedAndRestart: () => {
      const result = elapsedMs();
      start = process.hrtime();
      return result;
    },
  };
}

View File

@@ -13,6 +13,7 @@ import { DataStore } from './datastore/common';
import { cycleMigrations, dangerousDropAllTables, PgDataStore } from './datastore/postgres-store';
import { MemoryDataStore } from './datastore/memory-store';
import { startApiServer } from './api/init';
import { startProfilerServer } from './inspector-util';
import { startEventServer } from './event-stream/event-server';
import {
isFtMetadataEnabled,
@@ -171,6 +172,16 @@ async function init(): Promise<void> {
forceKillHandler: () => apiServer.forceKill(),
});
const profilerHttpServerPort = process.env['STACKS_PROFILER_PORT'];
if (profilerHttpServerPort) {
const profilerServer = await startProfilerServer(profilerHttpServerPort);
registerShutdownConfig({
name: 'Profiler server',
handler: () => profilerServer.close(),
forceKillable: false,
});
}
registerShutdownConfig({
name: 'DB',
handler: () => db.close(),

419
src/inspector-util.ts Normal file
View File

@@ -0,0 +1,419 @@
import * as inspector from 'inspector';
import * as stream from 'stream';
import { once } from 'events';
import { createServer, Server } from 'http';
import * as express from 'express';
import { addAsync } from '@awaitjs/express';
import { logError, logger, parsePort, stopwatch, timeout } from './helpers';
import { Socket } from 'net';
/** The profile object that the inspector `Profiler` domain hands back when CPU profiling is stopped. */
type CpuProfileResult = inspector.Profiler.Profile;
/**
 * Controls for one profiling run that is bound to a single `inspector` session.
 * `TStopResult` is the type of the value that `stop` resolves with.
 */
interface ProfilerInstance<TStopResult = void> {
/** Begins the profiling run. */
start: () => Promise<void>;
/** Ends the profiling run and resolves with its result. */
stop: () => Promise<TStopResult>;
/** Releases the profiler; the implementations in this file disable the profiler domain and disconnect `session`. */
dispose: () => Promise<void>;
/** The inspector session that the profiler operates on. */
session: inspector.Session;
}
/**
 * Tells whether `error` is an `Error` instance whose `code` property equals
 * `ERR_INSPECTOR_NOT_CONNECTED`. Any other value, including plain objects carrying
 * that code, yields false.
 */
function isInspectorNotConnectedError(error: unknown): boolean {
  const notConnectedCode = 'ERR_INSPECTOR_NOT_CONNECTED';
  if (!(error instanceof Error)) {
    return false;
  }
  if (!('code' in error)) {
    return false;
  }
  const { code } = error as NodeJS.ErrnoException;
  return code === notConnectedCode;
}
/**
 * Connects a new `inspector` session and returns the controls for one v8 CPU profiling run on it.
 * Profiling does not begin until the returned `start` is called.
 *
 * - `start` enables the `Profiler` domain, applies `samplingInterval` when given, and starts profiling.
 * - `stop` stops profiling and resolves with the CPU profile result object. That object can be
 *   turned into a `.cpuprofile` file using JSON.stringify. Use VSCode or Chrome's
 *   'DevTools for Node' (under chrome://inspect) to visualize the `.cpuprofile` file.
 * - `dispose` disables the `Profiler` domain and always disconnects the session. An already
 *   disconnected session is treated as success; any other failure rejects with the original error.
 *
 * @param samplingInterval - Optionally set sampling interval in microseconds, default is 1000 microseconds.
 * @returns The `start` / `stop` / `dispose` controls together with the underlying session.
 */
export function initCpuProfiling(samplingInterval?: number): ProfilerInstance<CpuProfileResult> {
  const connectTimer = stopwatch();
  const session = new inspector.Session();
  session.connect();
  logger.info(`[CpuProfiler] Connect session took ${connectTimer.getElapsedAndRestart()}ms`);

  const start = async () => {
    const sw = stopwatch();
    logger.info(`[CpuProfiler] Enabling profiling...`);
    await new Promise<void>((resolve, reject) => {
      // `session.post` can throw synchronously (e.g. when the session is not connected) as well
      // as report an error through its callback, so both paths are handled.
      try {
        session.post('Profiler.enable', error => {
          if (error) {
            logError(`[CpuProfiler] Error enabling profiling: ${error}`, error);
            reject(error);
          } else {
            logger.info(`[CpuProfiler] Profiling enabled`);
            resolve();
          }
        });
      } catch (error) {
        logError(`[CpuProfiler] Error enabling profiling: ${error}`, error);
        reject(error);
      }
    });
    logger.info(`[CpuProfiler] Enable session took ${sw.getElapsedAndRestart()}ms`);

    if (samplingInterval !== undefined) {
      logger.info(`[CpuProfiler] Setting sampling interval to ${samplingInterval} microseconds`);
      await new Promise<void>((resolve, reject) => {
        try {
          session.post('Profiler.setSamplingInterval', { interval: samplingInterval }, error => {
            if (error) {
              logError(`[CpuProfiler] Error setting sampling interval: ${error}`, error);
              reject(error);
            } else {
              logger.info(`[CpuProfiler] Set sampling interval`);
              resolve();
            }
          });
        } catch (error) {
          logError(`[CpuProfiler] Error setting sampling interval: ${error}`, error);
          reject(error);
        }
      });
      logger.info(`[CpuProfiler] Set sampling interval took ${sw.getElapsedAndRestart()}ms`);
    }

    logger.info(`[CpuProfiler] Profiling starting...`);
    await new Promise<void>((resolve, reject) => {
      try {
        session.post('Profiler.start', error => {
          if (error) {
            logError(`[CpuProfiler] Error starting profiling: ${error}`, error);
            reject(error);
          } else {
            logger.info(`[CpuProfiler] Profiling started`);
            resolve();
          }
        });
      } catch (error) {
        logError(`[CpuProfiler] Error starting profiling: ${error}`, error);
        reject(error);
      }
    });
    logger.info(`[CpuProfiler] Start profiler took ${sw.getElapsedAndRestart()}ms`);
  };

  const stop = async () => {
    const sw = stopwatch();
    logger.info(`[CpuProfiler] Profiling stopping...`);
    try {
      return await new Promise<CpuProfileResult>((resolve, reject) => {
        try {
          session.post('Profiler.stop', (error, profileResult) => {
            if (error) {
              logError(`[CpuProfiler] Error stopping profiling: ${error}`, error);
              reject(error);
            } else {
              logger.info(`[CpuProfiler] Profiling stopped`);
              resolve(profileResult.profile);
            }
          });
        } catch (error) {
          // Logged like every other failure path in this function.
          logError(`[CpuProfiler] Error stopping profiling: ${error}`, error);
          reject(error);
        }
      });
    } finally {
      // Timing is reported whether stopping succeeded or failed.
      logger.info(`[CpuProfiler] Stop profiler took ${sw.getElapsedAndRestart()}ms`);
    }
  };

  const dispose = async () => {
    const sw = stopwatch();
    try {
      logger.info(`[CpuProfiler] Disabling profiling...`);
      await new Promise<void>((resolve, reject) => {
        try {
          session.post('Profiler.disable', error => {
            if (error && isInspectorNotConnectedError(error)) {
              logger.info(`[CpuProfiler] Profiler already disconnected`);
              resolve();
            } else if (error) {
              logError(`[CpuProfiler] Error disabling profiling: ${error}`, error);
              reject(error);
            } else {
              logger.info(`[CpuProfiler] Profiling disabled`);
              resolve();
            }
          });
        } catch (error) {
          if (isInspectorNotConnectedError(error)) {
            logger.info(`[CpuProfiler] Profiler already disconnected`);
            resolve();
          } else {
            // Reject with the actual error; a bare `reject()` would surface as `undefined`.
            logError(`[CpuProfiler] Error disabling profiling: ${error}`, error);
            reject(error);
          }
        }
      });
    } finally {
      // The session is disconnected even when disabling the profiler failed.
      session.disconnect();
      logger.info(
        `[CpuProfiler] Disable and disconnect profiler took ${sw.getElapsedAndRestart()}ms`
      );
    }
  };

  return { start, stop, dispose, session };
}
/**
 * Connects a new `inspector` session and returns the controls for taking one v8 heap snapshot on it.
 *
 * - `start` enables the `HeapProfiler` domain and begins forwarding snapshot chunks to `outputStream`.
 * - `stop` takes the heap snapshot (chunks are written while it runs) and resolves with the
 *   total number of UTF-8 bytes that were handed to `outputStream`.
 * - `dispose` disables the `HeapProfiler` domain and always disconnects the session. An already
 *   disconnected session is treated as success; any other failure rejects with the original error.
 *
 * @param outputStream - An output stream that heap snapshot chunks are written to.
 * The result stream can be used to create a `.heapsnapshot` file.
 * Use Chrome's 'DevTools for Node' (under chrome://inspect) to visualize the `.heapsnapshot` file.
 * @returns The `start` / `stop` / `dispose` controls together with the underlying session.
 */
export function initHeapSnapshot(
  outputStream: stream.Writable
): ProfilerInstance<{ totalSnapshotByteSize: number }> {
  const session = new inspector.Session();
  session.connect();
  let totalSnapshotByteSize = 0;

  const start = async () => {
    logger.info(`[HeapProfiler] Enabling profiling...`);
    await new Promise<void>((resolve, reject) => {
      // `session.post` can throw synchronously as well as report an error through its callback.
      try {
        session.post('HeapProfiler.enable', error => {
          if (error) {
            logError(`[HeapProfiler] Error enabling profiling: ${error}`, error);
            reject(error);
          } else {
            logger.info(`[HeapProfiler] Profiling enabled`);
            resolve();
          }
        });
      } catch (error) {
        logError(`[HeapProfiler] Error enabling profiling: ${error}`, error);
        reject(error);
      }
    });
    session.on('HeapProfiler.addHeapSnapshotChunk', message => {
      // Note: this doesn't handle stream back-pressure, but we don't have control over the
      // `HeapProfiler.addHeapSnapshotChunk` callback in order to use something like piping.
      // So in theory on a slow `outputStream` (usually an http connection response) this can cause OOM.
      const chunk = message.params.chunk;
      // Chunks are strings, so `chunk.length` would count UTF-16 code units. Measure the UTF-8
      // bytes that the stream write below actually produces instead.
      const chunkByteSize = Buffer.byteLength(chunk, 'utf8');
      logger.info(`[HeapProfiler] Writing heap snapshot chunk of size ${chunkByteSize}`);
      totalSnapshotByteSize += chunkByteSize;
      outputStream.write(chunk, error => {
        if (error) {
          logger.error(
            `[HeapProfiler] Error writing heap profile chunk to output stream: ${error.message}`,
            error
          );
        }
      });
    });
  };

  const stop = async () => {
    logger.info(`[HeapProfiler] Taking snapshot...`);
    return await new Promise<{ totalSnapshotByteSize: number }>((resolve, reject) => {
      try {
        session.post('HeapProfiler.takeHeapSnapshot', undefined, (error: Error | null) => {
          if (error) {
            logError(`[HeapProfiler] Error taking snapshot: ${error}`, error);
            reject(error);
          } else {
            logger.info(`[HeapProfiler] Taking snapshot completed...`);
            resolve({ totalSnapshotByteSize });
          }
        });
      } catch (error) {
        logError(`[HeapProfiler] Error taking snapshot: ${error}`, error);
        reject(error);
      }
    });
  };

  const dispose = async () => {
    try {
      logger.info(`[HeapProfiler] Disabling profiling...`);
      await new Promise<void>((resolve, reject) => {
        try {
          session.post('HeapProfiler.disable', error => {
            if (error && isInspectorNotConnectedError(error)) {
              logger.info(`[HeapProfiler] Profiler already disconnected`);
              resolve();
            } else if (error) {
              logError(`[HeapProfiler] Error disabling profiling: ${error}`, error);
              reject(error);
            } else {
              logger.info(`[HeapProfiler] Profiling disabled`);
              resolve();
            }
          });
        } catch (error) {
          if (isInspectorNotConnectedError(error)) {
            logger.info(`[HeapProfiler] Profiler already disconnected`);
            resolve();
          } else {
            // Reject with the actual error; a bare `reject()` would surface as `undefined`.
            logError(`[HeapProfiler] Error disabling profiling: ${error}`, error);
            reject(error);
          }
        }
      });
    } finally {
      // The session is disconnected even when disabling the profiler failed.
      session.disconnect();
    }
  };

  return { start, stop, dispose, session };
}
/**
 * Starts an http server that exposes profiling endpoints for the current process:
 *
 * - `GET /profile/cpu?duration=<seconds>[&sampling_interval=<microseconds>]` records a CPU
 *   profile for the given duration and responds with it as a `.cpuprofile` attachment.
 * - `GET /profile/heap_snapshot` streams a heap snapshot as a `.heapsnapshot` attachment.
 * - `GET /profile/cancel` stops and disposes the session in progress and destroys its response.
 *
 * Only one profiling session can be active at a time; a second request is answered with 409.
 *
 * NOTE(review): the server binds to `0.0.0.0`, so access to the port has to be restricted
 * externally because these endpoints are not authenticated.
 *
 * @param httpServerPort - Port to listen on (parsed with `parsePort`). When omitted, no port is
 * passed to `listen` and the actual address is read back from the server.
 * @returns The http server, its `host:port` address string, and a `close` function that stops
 * listening and destroys all open sockets.
 */
export async function startProfilerServer(
  httpServerPort?: number | string
): Promise<{
  server: Server;
  address: string;
  close: () => Promise<void>;
}> {
  type ActiveSession = { instance: ProfilerInstance<unknown>; response: express.Response };

  let serverPort: number | undefined = undefined;
  if (httpServerPort !== undefined) {
    serverPort = parsePort(httpServerPort);
  }

  const app = addAsync(express());

  let existingSession: ActiveSession | undefined;

  // Runs a cleanup step whose failure must not abort the remaining cleanup. A bare `.catch()`
  // without a handler would not swallow anything, so errors are caught and logged explicitly.
  const bestEffort = async (label: string, action: () => Promise<unknown>): Promise<void> => {
    try {
      await action();
    } catch (error) {
      logError(`[Profiler] Ignoring error while ${label}: ${error}`, error);
    }
  };

  app.getAsync('/profile/cpu', async (req, res) => {
    if (existingSession) {
      res.status(409).json({ error: 'Profile session already in progress' });
      return;
    }
    const durationParam = req.query['duration'];
    const seconds = Number.parseFloat(durationParam as string);
    if (!Number.isFinite(seconds) || seconds < 0) {
      res.status(400).json({ error: `Invalid 'duration' query parameter "${durationParam}"` });
      return;
    }
    const samplingIntervalParam = req.query['sampling_interval'];
    let samplingInterval: number | undefined;
    if (samplingIntervalParam !== undefined) {
      samplingInterval = Number.parseFloat(samplingIntervalParam as string);
      if (!Number.isInteger(samplingInterval) || samplingInterval < 0) {
        res.status(400).json({
          error: `Invalid 'sampling_interval' query parameter "${samplingIntervalParam}"`,
        });
        return;
      }
    }
    const cpuProfiler = initCpuProfiling(samplingInterval);
    const thisSession: ActiveSession = { instance: cpuProfiler, response: res };
    existingSession = thisSession;
    try {
      const filename = `cpu_${Math.round(Date.now() / 1000)}_${seconds}-seconds.cpuprofile`;
      res.setHeader('Cache-Control', 'no-store');
      res.setHeader('Transfer-Encoding', 'chunked');
      res.setHeader('Content-Disposition', `attachment; filename="${filename}"`);
      res.setHeader('Content-Type', 'application/json; charset=utf-8');
      // Headers are sent right away so the client knows the request was accepted while profiling runs.
      res.flushHeaders();
      await cpuProfiler.start();
      // Profile for the requested duration, or until the response is closed (client went away
      // or the session was cancelled through `/profile/cancel`).
      await Promise.race([timeout(seconds * 1000), once(res, 'close')]);
      if (res.writableEnded || res.destroyed) {
        // session was cancelled
        return;
      }
      const result = await cpuProfiler.stop();
      const resultString = JSON.stringify(result);
      logger.info(
        `[CpuProfiler] Completed, total profile report JSON string length: ${resultString.length}`
      );
      res.end(resultString);
    } finally {
      // Use the local reference: `/profile/cancel` may already have cleared `existingSession`.
      await bestEffort('disposing the CPU profiler', () => cpuProfiler.dispose());
      if (existingSession === thisSession) {
        existingSession = undefined;
      }
    }
  });

  app.getAsync('/profile/heap_snapshot', async (req, res) => {
    if (existingSession) {
      res.status(409).json({ error: 'Profile session already in progress' });
      return;
    }
    const heapProfiler = initHeapSnapshot(res);
    const thisSession: ActiveSession = { instance: heapProfiler, response: res };
    existingSession = thisSession;
    try {
      const filename = `heap_${Math.round(Date.now() / 1000)}.heapsnapshot`;
      res.setHeader('Cache-Control', 'no-store');
      res.setHeader('Transfer-Encoding', 'chunked');
      res.setHeader('Content-Disposition', `attachment; filename="${filename}"`);
      res.setHeader('Content-Type', 'application/json; charset=utf-8');
      res.flushHeaders();
      // Taking a heap snapshot (with current implementation) is a one-shot process ran to get the
      // applications current heap memory usage, rather than something done over time. So start and
      // stop without waiting.
      await heapProfiler.start();
      const result = await heapProfiler.stop();
      logger.info(
        `[HeapProfiler] Completed, total snapshot byte size: ${result.totalSnapshotByteSize}`
      );
      res.end();
    } finally {
      // Use the local reference: `/profile/cancel` may already have cleared `existingSession`.
      await bestEffort('disposing the heap profiler', () => heapProfiler.dispose());
      if (existingSession === thisSession) {
        existingSession = undefined;
      }
    }
  });

  app.getAsync('/profile/cancel', async (req, res) => {
    if (!existingSession) {
      res.status(409).json({ error: 'No existing profile session is exists to cancel' });
      return;
    }
    const session = existingSession;
    // Each step is best-effort so a failing `stop` cannot prevent the dispose and the
    // destruction of the pending response.
    await bestEffort('stopping the profile session', () => session.instance.stop());
    await bestEffort('disposing the profile session', () => session.instance.dispose());
    session.response.destroy();
    if (existingSession === session) {
      existingSession = undefined;
    }
    res.json({ ok: 'existing profile session stopped' });
  });

  const server = createServer(app);

  // Open sockets are tracked so `close` can destroy them instead of waiting for idle
  // keep-alive connections to end on their own.
  const serverSockets = new Set<Socket>();
  server.on('connection', socket => {
    serverSockets.add(socket);
    socket.once('close', () => {
      serverSockets.delete(socket);
    });
  });

  await new Promise<void>((resolve, reject) => {
    try {
      server.once('error', error => {
        reject(error);
      });
      server.listen(serverPort, '0.0.0.0', () => {
        resolve();
      });
    } catch (error) {
      reject(error);
    }
  });

  const addr = server.address();
  if (addr === null) {
    throw new Error('server missing address');
  }
  const addrStr = typeof addr === 'string' ? addr : `${addr.address}:${addr.port}`;
  logger.info(`Started profiler server on: http://${addrStr}`);

  const closeServer = async () => {
    const closePromise = new Promise<void>((resolve, reject) => {
      if (!server.listening) {
        // Server already closed (can happen when server is shared between cluster workers)
        return resolve();
      }
      server.close(error => (error ? reject(error) : resolve()));
    });
    for (const socket of serverSockets) {
      socket.destroy();
    }
    await closePromise;
  };

  return { server, address: addrStr, close: closeServer };
}

View File

@@ -0,0 +1,58 @@
import * as http from 'http';
import * as supertest from 'supertest';
import { httpGetRequest, readHttpResponse } from '../helpers';
import { startProfilerServer } from '../inspector-util';
// Integration tests for the profiler http server: one server is started for the whole suite
// and closed once every test has run.
describe('profiler tests', () => {
  let profiler: { server: http.Server; address: string; close: () => Promise<void> };

  beforeAll(async () => {
    profiler = await startProfilerServer();
  });

  afterAll(async () => {
    await profiler.close();
  });

  test('CPU profiler snapshot bad duration', async () => {
    // A negative duration has to be refused before any profiling starts.
    const response = await supertest(profiler.server).get('/profile/cpu?duration=-100');
    expect(response.status).toBe(400);
  });

  test('generate CPU profiler snapshot', async () => {
    const durationSeconds = 0.25; // 250 milliseconds
    const response = await supertest(profiler.server).get(
      `/profile/cpu?duration=${durationSeconds}`
    );
    expect(response.status).toBe(200);
    expect(response.type).toBe('application/json');

    // Parsing only succeeds when the whole profile result was streamed/returned.
    let parsedProfile: any;
    expect(() => {
      parsedProfile = JSON.parse(response.text);
    }).not.toThrow();

    // Cursory check for the expected JSON format of a `.cpuprofile` file
    const expectedShape = {
      nodes: expect.any(Array),
      samples: expect.any(Array),
      timeDeltas: expect.any(Array),
      startTime: expect.any(Number),
      endTime: expect.any(Number),
    };
    expect(parsedProfile).toEqual(expect.objectContaining(expectedShape));
  });

  test('cancel CPU profiler snapshot', async () => {
    const durationSeconds = 150; // long enough that only a cancel can end the session

    // Open a cpu profile request; this resolves once the response headers have arrived.
    const profileUrl = `http://${profiler.address}/profile/cpu?duration=${durationSeconds}`;
    const [, profileResponse] = await httpGetRequest(profileUrl);

    // Keep the pending body read so its outcome can be checked after the cancel.
    const pendingBody = readHttpResponse(profileResponse);

    // Cancel the profile session through a second request.
    const cancelResponse = await supertest(profiler.server).get('/profile/cancel');
    expect(cancelResponse.status).toBe(200);

    // The original request must fail because its response was destroyed.
    await expect(pendingBody).rejects.toEqual(expect.any(Error));
  });
});