feat: store block_hash in event tables -- prep for orphaned event reorg handling

This commit is contained in:
Matthew Little
2020-05-04 14:18:34 +02:00
parent 2bf2d0950b
commit c223c73280
10 changed files with 176 additions and 104 deletions

View File

@@ -157,12 +157,14 @@ export type DataStoreEventEmitter = StrictEventEmitter<
export interface DataStoreUpdateData {
block: DbBlock;
txs: DbTx[];
stxEvents: DbStxEvent[];
ftEvents: DbFtEvent[];
nftEvents: DbNftEvent[];
contractLogEvents: DbSmartContractEvent[];
smartContracts: DbSmartContract[];
txs: {
tx: DbTx;
stxEvents: DbStxEvent[];
ftEvents: DbFtEvent[];
nftEvents: DbNftEvent[];
contractLogEvents: DbSmartContractEvent[];
smartContracts: DbSmartContract[];
}[];
}
export interface DataStore extends DataStoreEventEmitter {

View File

@@ -24,27 +24,27 @@ export class MemoryDataStore extends (EventEmitter as { new (): DataStoreEventEm
async update(data: DataStoreUpdateData) {
await this.updateBlock(data.block);
for (const tx of data.txs) {
await this.updateTx(tx);
}
for (const stxEvent of data.stxEvents) {
await this.updateStxEvent(stxEvent);
}
for (const ftEvent of data.ftEvents) {
await this.updateFtEvent(ftEvent);
}
for (const nftEvent of data.nftEvents) {
await this.updateNftEvent(nftEvent);
}
for (const contractLog of data.contractLogEvents) {
await this.updateSmartContractEvent(contractLog);
}
for (const smartContract of data.smartContracts) {
await this.updateSmartContract(smartContract);
for (const entry of data.txs) {
await this.updateTx(entry.tx);
for (const stxEvent of entry.stxEvents) {
await this.updateStxEvent(entry.tx, stxEvent);
}
for (const ftEvent of entry.ftEvents) {
await this.updateFtEvent(entry.tx, ftEvent);
}
for (const nftEvent of entry.nftEvents) {
await this.updateNftEvent(entry.tx, nftEvent);
}
for (const contractLog of entry.contractLogEvents) {
await this.updateSmartContractEvent(entry.tx, contractLog);
}
for (const smartContract of entry.smartContracts) {
await this.updateSmartContract(entry.tx, smartContract);
}
}
this.emit('blockUpdate', data.block);
data.txs.forEach(tx => {
this.emit('txUpdate', tx);
data.txs.forEach(entry => {
this.emit('txUpdate', entry.tx);
});
}
@@ -144,27 +144,33 @@ export class MemoryDataStore extends (EventEmitter as { new (): DataStoreEventEm
return Promise.resolve({ results: allEvents });
}
updateStxEvent(event: DbStxEvent) {
this.stxTokenEvents.set(`${event.tx_id}_${event.event_index}`, { ...event });
updateStxEvent(tx: DbTx, event: DbStxEvent) {
this.stxTokenEvents.set(`${event.tx_id}_${tx.block_hash}_${event.event_index}`, { ...event });
return Promise.resolve();
}
updateFtEvent(event: DbFtEvent) {
this.fungibleTokenEvents.set(`${event.tx_id}_${event.event_index}`, { ...event });
updateFtEvent(tx: DbTx, event: DbFtEvent) {
this.fungibleTokenEvents.set(`${event.tx_id}_${tx.block_hash}_${event.event_index}`, {
...event,
});
return Promise.resolve();
}
updateNftEvent(event: DbNftEvent) {
this.nonFungibleTokenEvents.set(`${event.tx_id}_${event.event_index}`, { ...event });
updateNftEvent(tx: DbTx, event: DbNftEvent) {
this.nonFungibleTokenEvents.set(`${event.tx_id}_${tx.block_hash}_${event.event_index}`, {
...event,
});
return Promise.resolve();
}
updateSmartContractEvent(event: DbSmartContractEvent) {
this.smartContractEvents.set(`${event.tx_id}_${event.event_index}`, { ...event });
updateSmartContractEvent(tx: DbTx, event: DbSmartContractEvent) {
this.smartContractEvents.set(`${event.tx_id}_${tx.block_hash}_${event.event_index}`, {
...event,
});
return Promise.resolve();
}
updateSmartContract(smartContract: DbSmartContract) {
updateSmartContract(tx: DbTx, smartContract: DbSmartContract) {
this.smartContracts.set(smartContract.contract_id, { ...smartContract });
return Promise.resolve();
}

View File

@@ -168,28 +168,28 @@ export class PgDataStore extends (EventEmitter as { new (): DataStoreEventEmitte
await client.query('BEGIN');
await this.handleReorg(client, data.block);
await this.updateBlock(client, data.block);
for (const tx of data.txs) {
await this.updateTx(client, tx);
}
for (const stxEvent of data.stxEvents) {
await this.updateStxEvent(client, stxEvent);
}
for (const ftEvent of data.ftEvents) {
await this.updateFtEvent(client, ftEvent);
}
for (const nftEvent of data.nftEvents) {
await this.updateNftEvent(client, nftEvent);
}
for (const contractLog of data.contractLogEvents) {
await this.updateSmartContractEvent(client, contractLog);
}
for (const smartContract of data.smartContracts) {
await this.updateSmartContract(client, smartContract);
for (const entry of data.txs) {
await this.updateTx(client, entry.tx);
for (const stxEvent of entry.stxEvents) {
await this.updateStxEvent(client, entry.tx, stxEvent);
}
for (const ftEvent of entry.ftEvents) {
await this.updateFtEvent(client, entry.tx, ftEvent);
}
for (const nftEvent of entry.nftEvents) {
await this.updateNftEvent(client, entry.tx, nftEvent);
}
for (const contractLog of entry.contractLogEvents) {
await this.updateSmartContractEvent(client, entry.tx, contractLog);
}
for (const smartContract of entry.smartContracts) {
await this.updateSmartContract(client, entry.tx, smartContract);
}
}
await client.query('COMMIT');
this.emit('blockUpdate', data.block);
data.txs.forEach(tx => {
this.emit('txUpdate', tx);
data.txs.forEach(entry => {
this.emit('txUpdate', entry.tx);
});
} catch (error) {
console.error(`Error performing PG update: ${error}`);
@@ -676,17 +676,18 @@ export class PgDataStore extends (EventEmitter as { new (): DataStoreEventEmitte
}
}
async updateStxEvent(client: ClientBase, event: DbStxEvent) {
async updateStxEvent(client: ClientBase, tx: DbTx, event: DbStxEvent) {
await client.query(
`
INSERT INTO stx_events(
event_index, tx_id, block_height, canonical, asset_event_type_id, sender, recipient, amount
) values($1, $2, $3, $4, $5, $6, $7, $8)
event_index, tx_id, block_height, block_hash, canonical, asset_event_type_id, sender, recipient, amount
) values($1, $2, $3, $4, $5, $6, $7, $8, $9)
`,
[
event.event_index,
hexToBuffer(event.tx_id),
event.block_height,
hexToBuffer(tx.block_hash),
event.canonical,
event.asset_event_type_id,
event.sender,
@@ -696,17 +697,18 @@ export class PgDataStore extends (EventEmitter as { new (): DataStoreEventEmitte
);
}
async updateFtEvent(client: ClientBase, event: DbFtEvent) {
async updateFtEvent(client: ClientBase, tx: DbTx, event: DbFtEvent) {
await client.query(
`
INSERT INTO ft_events(
event_index, tx_id, block_height, canonical, asset_event_type_id, sender, recipient, asset_identifier, amount
) values($1, $2, $3, $4, $5, $6, $7, $8, $9)
event_index, tx_id, block_height, block_hash, canonical, asset_event_type_id, sender, recipient, asset_identifier, amount
) values($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
`,
[
event.event_index,
hexToBuffer(event.tx_id),
event.block_height,
hexToBuffer(tx.block_hash),
event.canonical,
event.asset_event_type_id,
event.sender,
@@ -717,17 +719,18 @@ export class PgDataStore extends (EventEmitter as { new (): DataStoreEventEmitte
);
}
async updateNftEvent(client: ClientBase, event: DbNftEvent) {
async updateNftEvent(client: ClientBase, tx: DbTx, event: DbNftEvent) {
await client.query(
`
INSERT INTO nft_events(
event_index, tx_id, block_height, canonical, asset_event_type_id, sender, recipient, asset_identifier, value
) values($1, $2, $3, $4, $5, $6, $7, $8, $9)
event_index, tx_id, block_height, block_hash, canonical, asset_event_type_id, sender, recipient, asset_identifier, value
) values($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
`,
[
event.event_index,
hexToBuffer(event.tx_id),
event.block_height,
hexToBuffer(tx.block_hash),
event.canonical,
event.asset_event_type_id,
event.sender,
@@ -738,17 +741,18 @@ export class PgDataStore extends (EventEmitter as { new (): DataStoreEventEmitte
);
}
async updateSmartContractEvent(client: ClientBase, event: DbSmartContractEvent) {
async updateSmartContractEvent(client: ClientBase, tx: DbTx, event: DbSmartContractEvent) {
await client.query(
`
INSERT INTO contract_logs(
event_index, tx_id, block_height, canonical, contract_identifier, topic, value
) values($1, $2, $3, $4, $5, $6, $7)
event_index, tx_id, block_height, block_hash, canonical, contract_identifier, topic, value
) values($1, $2, $3, $4, $5, $6, $7, $8)
`,
[
event.event_index,
hexToBuffer(event.tx_id),
event.block_height,
hexToBuffer(tx.block_hash),
event.canonical,
event.contract_identifier,
event.topic,
@@ -757,18 +761,19 @@ export class PgDataStore extends (EventEmitter as { new (): DataStoreEventEmitte
);
}
async updateSmartContract(client: ClientBase, smartContract: DbSmartContract) {
async updateSmartContract(client: ClientBase, tx: DbTx, smartContract: DbSmartContract) {
await client.query(
`
INSERT INTO smart_contracts(
tx_id, canonical, contract_id, block_height, source_code, abi
) values($1, $2, $3, $4, $5, $6)
tx_id, canonical, contract_id, block_height, block_hash, source_code, abi
) values($1, $2, $3, $4, $5, $6, $7)
`,
[
hexToBuffer(smartContract.tx_id),
smartContract.canonical,
smartContract.contract_id,
smartContract.block_height,
hexToBuffer(tx.block_hash),
smartContract.source_code,
smartContract.abi,
]

View File

@@ -53,19 +53,21 @@ async function handleClientMessage(clientSocket: Readable, db: DataStore): Promi
const dbData: DataStoreUpdateData = {
block: dbBlock,
txs: new Array(parsedMsg.transactions.length),
stxEvents: [],
ftEvents: [],
nftEvents: [],
contractLogEvents: [],
smartContracts: [],
};
for (let i = 0; i < parsedMsg.transactions.length; i++) {
const tx = parsedMsg.parsed_transactions[i];
dbData.txs[i] = createDbTxFromCoreMsg(tx);
dbData.txs[i] = {
tx: createDbTxFromCoreMsg(tx),
stxEvents: [],
ftEvents: [],
nftEvents: [],
contractLogEvents: [],
smartContracts: [],
};
if (tx.raw_tx.payload.typeId === TransactionPayloadTypeID.SmartContract) {
const contractId = `${tx.sender_address}.${tx.raw_tx.payload.name}`;
dbData.smartContracts.push({
dbData.txs[i].smartContracts.push({
tx_id: tx.core_tx.txid,
contract_id: contractId,
block_height: parsedMsg.block_height,
@@ -75,14 +77,26 @@ async function handleClientMessage(clientSocket: Readable, db: DataStore): Promi
});
}
}
for (let i = 0; i < parsedMsg.events.length; i++) {
const event = parsedMsg.events[i];
for (const event of parsedMsg.events) {
const dbTx = dbData.txs.find(entry => entry.tx.tx_id === event.txid);
if (!dbTx) {
throw new Error(`Unexpected missing tx during event parsing by tx_id ${event.txid}`);
}
// TODO: this is not a real event_index -- the core-node needs to keep track and return in better format.
const eventIndex =
dbTx.stxEvents.length +
dbTx.ftEvents.length +
dbTx.nftEvents.length +
dbTx.contractLogEvents.length;
const dbEvent: DbEventBase = {
event_index: i,
event_index: eventIndex,
tx_id: event.txid,
block_height: parsedMsg.block_height,
canonical: true,
};
switch (event.type) {
case CoreNodeEventType.ContractEvent: {
const entry: DbSmartContractEvent = {
@@ -92,7 +106,7 @@ async function handleClientMessage(clientSocket: Readable, db: DataStore): Promi
topic: event.contract_event.topic,
value: hexToBuffer(event.contract_event.raw_value),
};
dbData.contractLogEvents.push(entry);
dbTx.contractLogEvents.push(entry);
break;
}
case CoreNodeEventType.StxTransferEvent: {
@@ -104,7 +118,7 @@ async function handleClientMessage(clientSocket: Readable, db: DataStore): Promi
recipient: event.stx_transfer_event.recipient,
amount: BigInt(event.stx_transfer_event.amount),
};
dbData.stxEvents.push(entry);
dbTx.stxEvents.push(entry);
break;
}
case CoreNodeEventType.StxMintEvent: {
@@ -115,7 +129,7 @@ async function handleClientMessage(clientSocket: Readable, db: DataStore): Promi
recipient: event.stx_mint_event.recipient,
amount: BigInt(event.stx_mint_event.amount),
};
dbData.stxEvents.push(entry);
dbTx.stxEvents.push(entry);
break;
}
case CoreNodeEventType.StxBurnEvent: {
@@ -126,7 +140,7 @@ async function handleClientMessage(clientSocket: Readable, db: DataStore): Promi
sender: event.stx_burn_event.sender,
amount: BigInt(event.stx_burn_event.amount),
};
dbData.stxEvents.push(entry);
dbTx.stxEvents.push(entry);
break;
}
case CoreNodeEventType.FtTransferEvent: {
@@ -139,7 +153,7 @@ async function handleClientMessage(clientSocket: Readable, db: DataStore): Promi
asset_identifier: event.ft_transfer_event.asset_identifier,
amount: BigInt(event.ft_transfer_event.amount),
};
dbData.ftEvents.push(entry);
dbTx.ftEvents.push(entry);
break;
}
case CoreNodeEventType.FtMintEvent: {
@@ -151,7 +165,7 @@ async function handleClientMessage(clientSocket: Readable, db: DataStore): Promi
asset_identifier: event.ft_mint_event.asset_identifier,
amount: BigInt(event.ft_mint_event.amount),
};
dbData.ftEvents.push(entry);
dbTx.ftEvents.push(entry);
break;
}
case CoreNodeEventType.NftTransferEvent: {
@@ -164,7 +178,7 @@ async function handleClientMessage(clientSocket: Readable, db: DataStore): Promi
asset_identifier: event.nft_transfer_event.asset_identifier,
value: hexToBuffer(event.nft_transfer_event.raw_value),
};
dbData.nftEvents.push(entry);
dbTx.nftEvents.push(entry);
break;
}
case CoreNodeEventType.NftMintEvent: {
@@ -176,7 +190,7 @@ async function handleClientMessage(clientSocket: Readable, db: DataStore): Promi
asset_identifier: event.nft_mint_event.asset_identifier,
value: hexToBuffer(event.nft_mint_event.raw_value),
};
dbData.nftEvents.push(entry);
dbTx.nftEvents.push(entry);
break;
}
default: {

View File

@@ -18,6 +18,10 @@ export async function up(pgm: MigrationBuilder): Promise<void> {
type: 'integer',
notNull: true,
},
block_hash: {
type: 'bytea',
notNull: true,
},
canonical: {
type: 'boolean',
notNull: true,
@@ -36,6 +40,7 @@ export async function up(pgm: MigrationBuilder): Promise<void> {
pgm.createIndex('stx_events', 'tx_id');
pgm.createIndex('stx_events', 'block_height');
pgm.createIndex('stx_events', 'block_hash');
pgm.createIndex('stx_events', 'canonical');
pgm.createIndex('stx_events', 'sender');
pgm.createIndex('stx_events', 'recipient');

View File

@@ -18,6 +18,10 @@ export async function up(pgm: MigrationBuilder): Promise<void> {
type: 'integer',
notNull: true,
},
block_hash: {
type: 'bytea',
notNull: true,
},
canonical: {
type: 'boolean',
notNull: true,
@@ -40,6 +44,7 @@ export async function up(pgm: MigrationBuilder): Promise<void> {
pgm.createIndex('ft_events', 'tx_id');
pgm.createIndex('ft_events', 'block_height');
pgm.createIndex('ft_events', 'block_hash');
pgm.createIndex('ft_events', 'canonical');
pgm.createIndex('ft_events', 'asset_identifier');
pgm.createIndex('ft_events', 'sender');

View File

@@ -18,6 +18,10 @@ export async function up(pgm: MigrationBuilder): Promise<void> {
type: 'integer',
notNull: true,
},
block_hash: {
type: 'bytea',
notNull: true,
},
canonical: {
type: 'boolean',
notNull: true,
@@ -40,6 +44,7 @@ export async function up(pgm: MigrationBuilder): Promise<void> {
pgm.createIndex('nft_events', 'tx_id');
pgm.createIndex('nft_events', 'block_height');
pgm.createIndex('nft_events', 'block_hash');
pgm.createIndex('nft_events', 'canonical');
pgm.createIndex('nft_events', 'asset_identifier');
pgm.createIndex('nft_events', 'sender');

View File

@@ -18,6 +18,10 @@ export async function up(pgm: MigrationBuilder): Promise<void> {
type: 'integer',
notNull: true,
},
block_hash: {
type: 'bytea',
notNull: true,
},
canonical: {
type: 'boolean',
notNull: true,
@@ -38,6 +42,7 @@ export async function up(pgm: MigrationBuilder): Promise<void> {
pgm.createIndex('contract_logs', 'tx_id');
pgm.createIndex('contract_logs', 'block_height');
pgm.createIndex('contract_logs', 'block_hash');
pgm.createIndex('contract_logs', 'canonical');
pgm.createIndex('contract_logs', 'contract_identifier');

View File

@@ -22,6 +22,10 @@ export async function up(pgm: MigrationBuilder): Promise<void> {
type: 'integer',
notNull: true,
},
block_hash: {
type: 'bytea',
notNull: true,
},
source_code: {
type: 'string',
notNull: true,
@@ -34,6 +38,7 @@ export async function up(pgm: MigrationBuilder): Promise<void> {
pgm.createIndex('smart_contracts', 'tx_id');
pgm.createIndex('smart_contracts', 'block_height');
pgm.createIndex('smart_contracts', 'block_hash');
pgm.createIndex('smart_contracts', 'canonical');
pgm.createIndex('smart_contracts', 'contract_id');

View File

@@ -318,12 +318,24 @@ describe('postgres datastore', () => {
};
await db.update({
block: block1,
txs: [tx1, tx2],
stxEvents: [stxEvent1],
ftEvents: [ftEvent1],
nftEvents: [nftEvent1],
contractLogEvents: [contractLogEvent1],
smartContracts: [smartContract1],
txs: [
{
tx: tx1,
stxEvents: [stxEvent1],
ftEvents: [ftEvent1],
nftEvents: [nftEvent1],
contractLogEvents: [contractLogEvent1],
smartContracts: [smartContract1],
},
{
tx: tx2,
stxEvents: [],
ftEvents: [],
nftEvents: [],
contractLogEvents: [],
smartContracts: [],
},
],
});
const fetchTx1 = await db.getTx(tx1.tx_id);
@@ -442,21 +454,29 @@ describe('postgres datastore', () => {
};
await db.update({
block: block1,
txs: [tx1],
stxEvents: [stxEvent1],
ftEvents: [ftEvent1],
nftEvents: [nftEvent1],
contractLogEvents: [contractLogEvent1],
smartContracts: [smartContract1],
txs: [
{
tx: tx1,
stxEvents: [stxEvent1],
ftEvents: [ftEvent1],
nftEvents: [nftEvent1],
contractLogEvents: [contractLogEvent1],
smartContracts: [smartContract1],
},
],
});
await db.update({
block: block2,
txs: [tx2],
stxEvents: [],
ftEvents: [],
nftEvents: [],
contractLogEvents: [],
smartContracts: [],
txs: [
{
tx: tx2,
stxEvents: [],
ftEvents: [],
nftEvents: [],
contractLogEvents: [],
smartContracts: [],
},
],
});
const fetchTx1 = await db.getTx(tx1.tx_id);