chore: handle duplicate TSV rows during event import (#1297)

* chore: return handler for duplicate TSV rows

* chore: add log

Rafael Cárdenas
2022-08-26 13:25:23 -05:00
committed by GitHub
parent b3338e3a52
commit 619c176bff
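
For context on the bug this fixes: PostgreSQL's `COPY ... FROM STDIN` has no `ON CONFLICT` clause, so a single duplicated key in the TSV aborts the entire copy with a unique-violation error. A minimal sketch of that failure mode and the `ON CONFLICT DO NOTHING` escape hatch — assuming only a local PostgreSQL, the node-postgres `pg` client, and a hypothetical `events` table, not the project's code:

import { Client } from 'pg';

async function demo(): Promise<void> {
  const client = new Client(); // connection settings taken from PG* env vars
  await client.connect();
  await client.query(`CREATE TEMPORARY TABLE events (id int PRIMARY KEY, payload jsonb)`);
  try {
    // A single duplicate key aborts the whole statement, just as it would abort a COPY.
    await client.query(`INSERT INTO events VALUES (1, '{}'), (1, '{}')`);
  } catch (error) {
    console.error(error); // duplicate key value violates unique constraint "events_pkey"
  }
  // ON CONFLICT DO NOTHING silently skips the duplicate row instead of failing.
  await client.query(`INSERT INTO events VALUES (1, '{}'), (1, '{}') ON CONFLICT DO NOTHING`);
  await client.end();
}

demo().catch(console.error);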

@@ -1037,11 +1037,24 @@ export class PgDataStore
         payload jsonb NOT NULL
       ) ON COMMIT DROP
     `);
+    // Use a `temp_raw_tsv` table first to store the raw TSV data as it might come with duplicate
+    // rows which would trigger the `PRIMARY KEY` constraint in `temp_event_observer_requests`.
+    // We will "upsert" from the former to the latter before event ingestion.
+    await client.query(`
+      CREATE TEMPORARY TABLE temp_raw_tsv
+      (LIKE temp_event_observer_requests)
+      ON COMMIT DROP
+    `);
     onStatusUpdate?.('Importing raw event requests into temporary table...');
-    const importStream = client.query(
-      pgCopyStreams.from(`COPY temp_event_observer_requests FROM STDIN`)
-    );
+    const importStream = client.query(pgCopyStreams.from(`COPY temp_raw_tsv FROM STDIN`));
     await pipelineAsync(readStream, importStream);
+    onStatusUpdate?.('Removing any duplicate raw event requests...');
+    await client.query(`
+      INSERT INTO temp_event_observer_requests
+      SELECT *
+      FROM temp_raw_tsv
+      ON CONFLICT DO NOTHING;
+    `);
     const totalRowCountQuery = await client.query<{ count: string }>(
       `SELECT COUNT(id) count FROM temp_event_observer_requests`
     );
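
For reference, here is the staging-table pattern from this diff as a self-contained sketch. It assumes node-postgres (`pg`), `pg-copy-streams`, and a hypothetical `temp_event_requests` target table; the column layout and names are illustrative, not the project's actual schema:

import * as fs from 'fs';
import { pipeline } from 'stream/promises';
import { Client } from 'pg';
import { from as copyFrom } from 'pg-copy-streams';

async function importTsv(client: Client, tsvPath: string): Promise<void> {
  // ON COMMIT DROP requires an enclosing transaction.
  await client.query('BEGIN');
  // Target table: the PRIMARY KEY is what duplicate TSV rows would violate.
  await client.query(`
    CREATE TEMPORARY TABLE temp_event_requests (
      id bigint PRIMARY KEY,
      payload jsonb NOT NULL
    ) ON COMMIT DROP
  `);
  // Staging table: LIKE copies the columns but, by default, not the PRIMARY KEY,
  // so duplicate rows can be copied in without tripping any constraint.
  await client.query(`
    CREATE TEMPORARY TABLE temp_raw_tsv
    (LIKE temp_event_requests)
    ON COMMIT DROP
  `);
  // Stream the raw TSV straight into the unconstrained staging table.
  const copyStream = client.query(copyFrom(`COPY temp_raw_tsv FROM STDIN`));
  await pipeline(fs.createReadStream(tsvPath), copyStream);
  // The "upsert" step: the PRIMARY KEY plus ON CONFLICT DO NOTHING drops duplicates.
  await client.query(`
    INSERT INTO temp_event_requests
    SELECT * FROM temp_raw_tsv
    ON CONFLICT DO NOTHING
  `);
  // ...event ingestion would read from temp_event_requests here...
  await client.query('COMMIT'); // both temp tables are dropped on commit
}

The advantage over filtering duplicates in application code is that the bulk COPY runs at full speed into the unconstrained table, and deduplication then happens server-side in a single INSERT ... SELECT.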