diff --git a/blockbook.go b/blockbook.go index fa0cfdd7..77c1bade 100644 --- a/blockbook.go +++ b/blockbook.go @@ -12,6 +12,7 @@ import ( "runtime/debug" "strconv" "strings" + "sync" "syscall" "time" @@ -134,7 +135,24 @@ func mainWithExitCode() int { rand.Seed(time.Now().UTC().UnixNano()) chanOsSignal = make(chan os.Signal, 1) - signal.Notify(chanOsSignal, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM) + shutdownSigCh := make(chan os.Signal, 1) + signalCh := make(chan os.Signal, 1) + // Use a single signal listener and fan out shutdown signals to avoid races + // where long-running workers consume the OS signal before main shutdown runs. + signal.Notify(signalCh, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM) + var shutdownOnce sync.Once + go func() { + sig := <-signalCh + shutdownOnce.Do(func() { + // Flip global shutdown state and close chanOsSignal to broadcast shutdown. + // Closing the channel unblocks select loops that only receive from it. + common.SetInShutdown() + close(chanOsSignal) + // Ensure waitForSignalAndShutdown can proceed even if the OS signal + // was already consumed by another goroutine in previous versions. + shutdownSigCh <- sig + }) + }() glog.Infof("Blockbook: %+v, debug mode %v", common.GetVersionInfo(), *debugMode) @@ -174,7 +192,13 @@ func mainWithExitCode() int { glog.Error("rocksDB: ", err) return exitCodeFatal } - defer index.Close() + defer func() { + glog.Info("shutdown: rocksdb close start") + if err := index.Close(); err != nil { + glog.Error("shutdown: rocksdb close error: ", err) + } + glog.Info("shutdown: rocksdb close finished") + }() internalState, err = newInternalState(config, index, *enableSubNewTx) if err != nil { @@ -358,17 +382,18 @@ func mainWithExitCode() int { if internalServer != nil || publicServer != nil || chain != nil { // start fiat rates downloader only if not shutting down immediately initDownloaders(index, chain, config) - waitForSignalAndShutdown(internalServer, publicServer, chain, 10*time.Second) + waitForSignalAndShutdown(internalServer, publicServer, chain, shutdownSigCh, 10*time.Second) } + // Always stop periodic state storage to prevent writes during shutdown. + close(chanStoreInternalState) if *synchronize { close(chanSyncIndex) close(chanSyncMempool) - close(chanStoreInternalState) <-chanSyncIndexDone <-chanSyncMempoolDone - <-chanStoreInternalStateDone } + <-chanStoreInternalStateDone return exitCodeOK } @@ -521,10 +546,16 @@ func syncIndexLoop() { // resync index about every 15 minutes if there are no chanSyncIndex requests, with debounce 1 second common.TickAndDebounce(time.Duration(*resyncIndexPeriodMs)*time.Millisecond, debounceResyncIndexMs*time.Millisecond, chanSyncIndex, func() { if err := syncWorker.ResyncIndex(onNewBlockHash, false); err != nil { + if err == db.ErrOperationInterrupted || common.IsInShutdown() { + return + } glog.Error("syncIndexLoop ", errors.ErrorStack(err), ", will retry...") // retry once in case of random network error, after a slight delay time.Sleep(time.Millisecond * 2500) if err := syncWorker.ResyncIndex(onNewBlockHash, false); err != nil { + if err == db.ErrOperationInterrupted || common.IsInShutdown() { + return + } glog.Error("syncIndexLoop ", errors.ErrorStack(err)) } } @@ -572,12 +603,11 @@ func syncMempoolLoop() { } func storeInternalStateLoop() { - stopCompute := make(chan os.Signal) defer func() { - close(stopCompute) close(chanStoreInternalStateDone) }() - signal.Notify(stopCompute, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM) + // Reuse the global shutdown channel so compute work stops when shutdown begins. + stopCompute := chanOsSignal var computeRunning bool lastCompute := time.Now() lastAppInfo := time.Now() @@ -653,11 +683,17 @@ func pushSynchronizationHandler(nt bchain.NotificationType) { } } -func waitForSignalAndShutdown(internal *server.InternalServer, public *server.PublicServer, chain bchain.BlockChain, timeout time.Duration) { - sig := <-chanOsSignal +func waitForSignalAndShutdown(internal *server.InternalServer, public *server.PublicServer, chain bchain.BlockChain, shutdownSig <-chan os.Signal, timeout time.Duration) { + // Read the first OS signal from the dedicated channel to avoid races with worker shutdown paths. + sig := <-shutdownSig common.SetInShutdown() - glog.Infof("shutdown: %v", sig) + if sig != nil { + glog.Infof("shutdown: %v", sig) + } else { + glog.Info("shutdown: signal received") + } + // Bound server/RPC shutdown; RocksDB close happens after main returns via defer. ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() diff --git a/build/templates/backend/debian/service b/build/templates/backend/debian/service index 2f5193ff..d25d1d64 100644 --- a/build/templates/backend/debian/service +++ b/build/templates/backend/debian/service @@ -7,7 +7,10 @@ After=network.target ExecStart={{template "Backend.ExecCommandTemplate" .}} User={{.Backend.SystemUser}} Restart=on-failure +# Allow enough time for graceful shutdown/flush work before SIGKILL. TimeoutStopSec=300 +# Be explicit about the signal used for graceful shutdown. +KillSignal=SIGTERM WorkingDirectory={{.Env.BackendInstallPath}}/{{.Coin.Alias}} {{if eq .Backend.ServiceType "forking" -}} Type=forking diff --git a/db/sync.go b/db/sync.go index e0ba75fc..dbc333f8 100644 --- a/db/sync.go +++ b/db/sync.go @@ -253,26 +253,26 @@ func (w *SyncWorker) connectBlocks(onNewBlock bchain.OnNewBlockFunc, initialSync return nil } - if initialSync { - ConnectLoop: - for { - select { - case <-w.chanOsSignal: - glog.Info("connectBlocks interrupted at height ", lastRes.block.Height) - return ErrOperationInterrupted - case res := <-bch: - if res == empty { - break ConnectLoop - } - err := connect(res) - if err != nil { - return err - } - } + logInterrupted := func() { + if lastRes.block != nil { + glog.Info("connectBlocks interrupted at height ", lastRes.block.Height) + } else { + glog.Info("connectBlocks interrupted") } - } else { - // while regular sync, OS sig is handled by waitForSignalAndShutdown - for res := range bch { + } + // During regular sync, shutdown is now signaled by closing chanOsSignal, + // so we honor it here to avoid leaving RocksDB in an open state. + // Initial sync uses the same shutdown-aware loop. +ConnectLoop: + for { + select { + case <-w.chanOsSignal: + logInterrupted() + return ErrOperationInterrupted + case res := <-bch: + if res == empty { + break ConnectLoop + } err := connect(res) if err != nil { return err diff --git a/tests/sync/connectblocks.go b/tests/sync/connectblocks.go index 1bb22dd1..53a69754 100644 --- a/tests/sync/connectblocks.go +++ b/tests/sync/connectblocks.go @@ -13,6 +13,17 @@ import ( "github.com/trezor/blockbook/db" ) +// blockingChain delays GetBlock so shutdown can be asserted deterministically. +type blockingChain struct { + bchain.BlockChain + gate chan struct{} +} + +func (c *blockingChain) GetBlock(hash string, height uint32) (*bchain.Block, error) { + <-c.gate + return nil, bchain.ErrBlockNotFound +} + func testConnectBlocks(t *testing.T, h *TestHandler) { for _, rng := range h.TestData.ConnectBlocks.SyncRanges { withRocksDBAndSyncWorker(t, h, rng.Lower, func(d *db.RocksDB, sw *db.SyncWorker, ch chan os.Signal) { @@ -43,6 +54,20 @@ func testConnectBlocks(t *testing.T, h *TestHandler) { t.Run("verifyAddresses", func(t *testing.T) { verifyAddresses(t, d, h, rng) }) }) } + + t.Run("shutdownDuringRegularSync", func(t *testing.T) { + withRocksDBAndSyncWorker(t, h, 0, func(_ *db.RocksDB, sw *db.SyncWorker, ch chan os.Signal) { + gate := make(chan struct{}) + db.SetBlockChain(sw, &blockingChain{BlockChain: h.Chain, gate: gate}) + close(ch) + err := db.ConnectBlocks(sw, nil, false) + if err != db.ErrOperationInterrupted { + t.Fatalf("expected ErrOperationInterrupted, got %v", err) + } + // Allow the worker goroutine to exit cleanly. + close(gate) + }) + }) } func testConnectBlocksParallel(t *testing.T, h *TestHandler) {