From d84e30ed11174bc57c43926cf4aa9dc3247d2554 Mon Sep 17 00:00:00 2001 From: Nut Pinyo Date: Thu, 30 May 2024 23:57:41 +0700 Subject: [PATCH] fix: implement Shutdown() for processors (#22) --- core/indexer/indexer.go | 4 ++++ core/indexer/interface.go | 3 +++ modules/runes/processor.go | 14 +++++++++++++- modules/runes/runes.go | 8 ++++++-- 4 files changed, 26 insertions(+), 3 deletions(-) diff --git a/core/indexer/indexer.go b/core/indexer/indexer.go index 8d366c2..05dfff0 100644 --- a/core/indexer/indexer.go +++ b/core/indexer/indexer.go @@ -91,6 +91,10 @@ func (i *Indexer[T]) Run(ctx context.Context) (err error) { select { case <-i.quit: logger.InfoContext(ctx, "Got quit signal, stopping indexer") + if err := i.Processor.Shutdown(ctx); err != nil { + logger.ErrorContext(ctx, "Failed to shutdown processor", slogx.Error(err)) + return errors.Wrap(err, "processor shutdown failed") + } return nil case <-ctx.Done(): return nil diff --git a/core/indexer/interface.go b/core/indexer/interface.go index 761d25d..bb5a5dc 100644 --- a/core/indexer/interface.go +++ b/core/indexer/interface.go @@ -29,6 +29,9 @@ type Processor[T Input] interface { // VerifyStates verifies the states of the indexed data and the indexer // to ensure the last shutdown was graceful and no missing data. VerifyStates(ctx context.Context) error + + // Shutdown gracefully stops the processor. Database connections, network calls, leftover states, etc. should be closed and cleaned up here. + Shutdown(ctx context.Context) error } type IndexerWorker interface { diff --git a/modules/runes/processor.go b/modules/runes/processor.go index 04b5a2e..0fcb528 100644 --- a/modules/runes/processor.go +++ b/modules/runes/processor.go @@ -31,6 +31,7 @@ type Processor struct { bitcoinClient btcclient.Contract network common.Network reportingClient *reportingclient.ReportingClient + cleanupFuncs []func(context.Context) error newRuneEntries map[runes.RuneId]*runes.RuneEntry newRuneEntryStates map[runes.RuneId]*runes.RuneEntry @@ -40,13 +41,14 @@ type Processor struct { newRuneTxs []*entity.RuneTransaction } -func NewProcessor(runesDg datagateway.RunesDataGateway, indexerInfoDg datagateway.IndexerInfoDataGateway, bitcoinClient btcclient.Contract, network common.Network, reportingClient *reportingclient.ReportingClient) *Processor { +func NewProcessor(runesDg datagateway.RunesDataGateway, indexerInfoDg datagateway.IndexerInfoDataGateway, bitcoinClient btcclient.Contract, network common.Network, reportingClient *reportingclient.ReportingClient, cleanupFuncs []func(context.Context) error) *Processor { return &Processor{ runesDg: runesDg, indexerInfoDg: indexerInfoDg, bitcoinClient: bitcoinClient, network: network, reportingClient: reportingClient, + cleanupFuncs: cleanupFuncs, newRuneEntries: make(map[runes.RuneId]*runes.RuneEntry), newRuneEntryStates: make(map[runes.RuneId]*runes.RuneEntry), newOutPointBalances: make(map[wire.OutPoint][]*entity.OutPointBalance), @@ -228,3 +230,13 @@ func (p *Processor) RevertData(ctx context.Context, from int64) error { } return nil } + +func (p *Processor) Shutdown(ctx context.Context) error { + var errs []error + for _, cleanup := range p.cleanupFuncs { + if err := cleanup(ctx); err != nil { + errs = append(errs, err) + } + } + return errors.WithStack(errors.Join(errs...)) +} diff --git a/modules/runes/runes.go b/modules/runes/runes.go index c89ba75..ab305dc 100644 --- a/modules/runes/runes.go +++ b/modules/runes/runes.go @@ -33,6 +33,7 @@ func New(injector do.Injector) (indexer.IndexerWorker, error) { runesDg runesdatagateway.RunesDataGateway indexerInfoDg runesdatagateway.IndexerInfoDataGateway ) + var cleanupFuncs []func(context.Context) error switch strings.ToLower(conf.Modules.Runes.Database) { case "postgresql", "postgres", "pg": pg, err := postgres.NewPool(ctx, conf.Modules.Runes.Postgres) @@ -42,7 +43,10 @@ func New(injector do.Injector) (indexer.IndexerWorker, error) { } return nil, errors.Wrap(err, "can't create Postgres connection pool") } - defer pg.Close() + cleanupFuncs = append(cleanupFuncs, func(ctx context.Context) error { + pg.Close() + return nil + }) runesRepo := runespostgres.NewRepository(pg) runesDg = runesRepo indexerInfoDg = runesRepo @@ -62,7 +66,7 @@ func New(injector do.Injector) (indexer.IndexerWorker, error) { return nil, errors.Wrapf(errs.Unsupported, "%q datasource is not supported", conf.Modules.Runes.Datasource) } - processor := NewProcessor(runesDg, indexerInfoDg, bitcoinClient, conf.Network, reportingClient) + processor := NewProcessor(runesDg, indexerInfoDg, bitcoinClient, conf.Network, reportingClient, cleanupFuncs) if err := processor.VerifyStates(ctx); err != nil { return nil, errors.WithStack(err) }