fix: close RocksDB on shutdown signal

pragmaxim
2026-01-26 09:01:25 +01:00
parent 75ca6e1e85
commit 2824b9924e
4 changed files with 94 additions and 30 deletions

View File

@@ -12,6 +12,7 @@ import (
"runtime/debug"
"strconv"
"strings"
"sync"
"syscall"
"time"
@@ -134,7 +135,24 @@ func mainWithExitCode() int {
rand.Seed(time.Now().UTC().UnixNano())
chanOsSignal = make(chan os.Signal, 1)
signal.Notify(chanOsSignal, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM)
shutdownSigCh := make(chan os.Signal, 1)
signalCh := make(chan os.Signal, 1)
// Use a single signal listener and fan out shutdown signals to avoid races
// where long-running workers consume the OS signal before main shutdown runs.
signal.Notify(signalCh, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM)
var shutdownOnce sync.Once
go func() {
sig := <-signalCh
shutdownOnce.Do(func() {
// Flip global shutdown state and close chanOsSignal to broadcast shutdown.
// Closing the channel unblocks select loops that only receive from it.
common.SetInShutdown()
close(chanOsSignal)
// Forward the signal so waitForSignalAndShutdown can log it and proceed;
// workers are notified separately via the closed chanOsSignal.
shutdownSigCh <- sig
})
}()
glog.Infof("Blockbook: %+v, debug mode %v", common.GetVersionInfo(), *debugMode)
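Note: the hunk above replaces the single shared signal channel with a fan-out: one goroutine owns the OS signal, flips the shutdown flag exactly once via sync.Once, and broadcasts by closing the channel that workers select on. A minimal standalone sketch of that close-to-broadcast pattern; the names (signalCh, shutdownCh, the worker loop) are illustrative, not from the Blockbook code:

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

func main() {
	// Dedicated channel for the real OS signal; buffered so signal.Notify never drops it.
	signalCh := make(chan os.Signal, 1)
	signal.Notify(signalCh, syscall.SIGINT, syscall.SIGTERM)

	// Broadcast channel: closing it wakes every receiver at once, unlike a plain
	// send on a shared channel, which only one receiver would ever see.
	shutdownCh := make(chan struct{})
	var once sync.Once

	go func() {
		sig := <-signalCh
		once.Do(func() {
			fmt.Println("received", sig, "- broadcasting shutdown")
			close(shutdownCh)
		})
	}()

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for {
				select {
				case <-shutdownCh: // unblocks for every worker once the channel is closed
					fmt.Println("worker", id, "stopping")
					return
				case <-time.After(time.Second):
					fmt.Println("worker", id, "working")
				}
			}
		}(i)
	}
	wg.Wait()
}
```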
@@ -174,7 +192,13 @@ func mainWithExitCode() int {
glog.Error("rocksDB: ", err)
return exitCodeFatal
}
defer index.Close()
defer func() {
glog.Info("shutdown: rocksdb close start")
if err := index.Close(); err != nil {
glog.Error("shutdown: rocksdb close error: ", err)
}
glog.Info("shutdown: rocksdb close finished")
}()
internalState, err = newInternalState(config, index, *enableSubNewTx)
if err != nil {
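Note: the replaced defer wraps index.Close() in a closure so its error is actually observed; a bare `defer index.Close()` evaluates the call but discards the returned error. A small illustration of the difference, using a made-up store type in place of the RocksDB handle:

```go
package main

import (
	"errors"
	"log"
)

// store stands in for a resource whose Close can fail, such as a DB handle.
type store struct{}

func (s *store) Close() error { return errors.New("flush failed") }

func main() {
	s := &store{}

	// A bare `defer s.Close()` would discard the error; wrapping the call in a
	// closure lets the error be checked and logged when the defer runs.
	defer func() {
		log.Print("shutdown: store close start")
		if err := s.Close(); err != nil {
			log.Print("shutdown: store close error: ", err)
		}
		log.Print("shutdown: store close finished")
	}()

	log.Print("doing work")
}
```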
@@ -358,17 +382,18 @@ func mainWithExitCode() int {
if internalServer != nil || publicServer != nil || chain != nil {
// start fiat rates downloader only if not shutting down immediately
initDownloaders(index, chain, config)
waitForSignalAndShutdown(internalServer, publicServer, chain, 10*time.Second)
waitForSignalAndShutdown(internalServer, publicServer, chain, shutdownSigCh, 10*time.Second)
}
// Always stop periodic state storage to prevent writes during shutdown.
close(chanStoreInternalState)
if *synchronize {
close(chanSyncIndex)
close(chanSyncMempool)
close(chanStoreInternalState)
<-chanSyncIndexDone
<-chanSyncMempoolDone
<-chanStoreInternalStateDone
}
<-chanStoreInternalStateDone
return exitCodeOK
}
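Note: the reordering above makes the periodic state-storage loop stop on every shutdown path (chanStoreInternalState is now closed unconditionally) while still waiting on the done channels of the loops that were actually started. The underlying close-then-wait handshake, sketched with illustrative channel names:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	stop := make(chan struct{})
	done := make(chan struct{})

	// Periodic worker: runs until its stop channel is closed, then signals done.
	go func() {
		defer close(done)
		for {
			select {
			case <-stop:
				fmt.Println("worker: stopping, no more writes")
				return
			case <-time.After(100 * time.Millisecond):
				fmt.Println("worker: periodic write")
			}
		}
	}()

	time.Sleep(250 * time.Millisecond)

	// Shutdown ordering: close the stop channel first so no new work starts,
	// then wait on done before touching shared resources such as the database.
	close(stop)
	<-done
	fmt.Println("main: safe to close shared resources")
}
```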
@@ -521,10 +546,16 @@ func syncIndexLoop() {
// resync index about every 15 minutes if there are no chanSyncIndex requests, with debounce 1 second
common.TickAndDebounce(time.Duration(*resyncIndexPeriodMs)*time.Millisecond, debounceResyncIndexMs*time.Millisecond, chanSyncIndex, func() {
if err := syncWorker.ResyncIndex(onNewBlockHash, false); err != nil {
if err == db.ErrOperationInterrupted || common.IsInShutdown() {
return
}
glog.Error("syncIndexLoop ", errors.ErrorStack(err), ", will retry...")
// retry once in case of random network error, after a slight delay
time.Sleep(time.Millisecond * 2500)
if err := syncWorker.ResyncIndex(onNewBlockHash, false); err != nil {
if err == db.ErrOperationInterrupted || common.IsInShutdown() {
return
}
glog.Error("syncIndexLoop ", errors.ErrorStack(err))
}
}
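Note: syncIndexLoop's retry path now returns immediately when the error is db.ErrOperationInterrupted or the process is already shutting down, instead of sleeping and retrying against a database that is about to close. A compact sketch of that retry-once-unless-shutting-down shape; errInterrupted and inShutdown below are stand-ins for db.ErrOperationInterrupted and common.IsInShutdown():

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// Stand-ins for db.ErrOperationInterrupted and common.IsInShutdown().
var errInterrupted = errors.New("operation interrupted")
var inShutdown atomic.Bool

// resyncOnce retries a failing operation a single time, but never when the
// failure came from shutdown - retrying would only race with teardown.
func resyncOnce(resync func() error) {
	if err := resync(); err != nil {
		if errors.Is(err, errInterrupted) || inShutdown.Load() {
			return
		}
		fmt.Println("resync failed, will retry:", err)
		time.Sleep(2500 * time.Millisecond)
		if err := resync(); err != nil {
			if errors.Is(err, errInterrupted) || inShutdown.Load() {
				return
			}
			fmt.Println("resync failed again:", err)
		}
	}
}

func main() {
	resyncOnce(func() error { return errInterrupted })
	fmt.Println("returned without retrying")
}
```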
@@ -572,12 +603,11 @@ func syncMempoolLoop() {
}
func storeInternalStateLoop() {
stopCompute := make(chan os.Signal)
defer func() {
close(stopCompute)
close(chanStoreInternalStateDone)
}()
signal.Notify(stopCompute, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM)
// Reuse the global shutdown channel so compute work stops when shutdown begins.
stopCompute := chanOsSignal
var computeRunning bool
lastCompute := time.Now()
lastAppInfo := time.Now()
@@ -653,11 +683,17 @@ func pushSynchronizationHandler(nt bchain.NotificationType) {
}
}
func waitForSignalAndShutdown(internal *server.InternalServer, public *server.PublicServer, chain bchain.BlockChain, timeout time.Duration) {
sig := <-chanOsSignal
func waitForSignalAndShutdown(internal *server.InternalServer, public *server.PublicServer, chain bchain.BlockChain, shutdownSig <-chan os.Signal, timeout time.Duration) {
// Read the first OS signal from the dedicated channel to avoid races with worker shutdown paths.
sig := <-shutdownSig
common.SetInShutdown()
glog.Infof("shutdown: %v", sig)
if sig != nil {
glog.Infof("shutdown: %v", sig)
} else {
glog.Info("shutdown: signal received")
}
// Bound server/RPC shutdown; RocksDB close happens after main returns via defer.
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
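Note: waitForSignalAndShutdown now reads from the dedicated shutdown channel and keeps bounding the server/RPC shutdown with a context timeout, while the RocksDB close is left to the deferred call that runs after main returns. A minimal standalone sketch of timeout-bounded HTTP server shutdown with the standard library (address and timeout values are arbitrary):

```go
package main

import (
	"context"
	"log"
	"net/http"
	"time"
)

func main() {
	srv := &http.Server{Addr: "127.0.0.1:18080"}
	go func() {
		// ErrServerClosed is the expected result of a graceful Shutdown call.
		if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			log.Print("serve error: ", err)
		}
	}()

	time.Sleep(200 * time.Millisecond) // pretend we ran until a shutdown signal arrived

	// Bound graceful shutdown: if open connections do not drain within the
	// timeout, Shutdown returns the context error and teardown continues anyway.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := srv.Shutdown(ctx); err != nil {
		log.Print("shutdown not clean: ", err)
	}
	log.Print("servers stopped; resource close can follow")
}
```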

View File

@@ -7,7 +7,10 @@ After=network.target
ExecStart={{template "Backend.ExecCommandTemplate" .}}
User={{.Backend.SystemUser}}
Restart=on-failure
# Allow enough time for graceful shutdown/flush work before SIGKILL.
TimeoutStopSec=300
# Be explicit about the signal used for graceful shutdown.
KillSignal=SIGTERM
WorkingDirectory={{.Env.BackendInstallPath}}/{{.Coin.Alias}}
{{if eq .Backend.ServiceType "forking" -}}
Type=forking
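Note on the two new directives: on `systemctl stop`, systemd sends KillSignal (SIGTERM here) to the service, waits up to TimeoutStopSec for it to exit, and only then escalates to SIGKILL. A hedged sketch of how the rendered [Service] section might look for one backend; the ExecStart, user, and path values are placeholders, not taken from the template:

```ini
[Service]
# Placeholder command, user, and paths; the real values come from the template variables.
ExecStart=/opt/coins/nodes/example/bin/exampled
User=example
Restart=on-failure
# On "systemctl stop", systemd first sends KillSignal to the service ...
KillSignal=SIGTERM
# ... then waits this long for the process to exit before escalating to SIGKILL,
# giving the daemon's graceful shutdown/flush work time to finish.
TimeoutStopSec=300
WorkingDirectory=/opt/coins/nodes/example
```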

View File

@@ -253,26 +253,26 @@ func (w *SyncWorker) connectBlocks(onNewBlock bchain.OnNewBlockFunc, initialSync
return nil
}
if initialSync {
ConnectLoop:
for {
select {
case <-w.chanOsSignal:
glog.Info("connectBlocks interrupted at height ", lastRes.block.Height)
return ErrOperationInterrupted
case res := <-bch:
if res == empty {
break ConnectLoop
}
err := connect(res)
if err != nil {
return err
}
}
logInterrupted := func() {
if lastRes.block != nil {
glog.Info("connectBlocks interrupted at height ", lastRes.block.Height)
} else {
glog.Info("connectBlocks interrupted")
}
} else {
// while regular sync, OS sig is handled by waitForSignalAndShutdown
for res := range bch {
}
// During regular sync, shutdown is now signaled by closing chanOsSignal, so
// honor it here; otherwise this loop would keep running and block the
// deferred RocksDB close in main. Initial sync uses the same shutdown-aware loop.
ConnectLoop:
for {
select {
case <-w.chanOsSignal:
logInterrupted()
return ErrOperationInterrupted
case res := <-bch:
if res == empty {
break ConnectLoop
}
err := connect(res)
if err != nil {
return err

View File

@@ -13,6 +13,17 @@ import (
"github.com/trezor/blockbook/db"
)
// blockingChain delays GetBlock so shutdown can be asserted deterministically.
type blockingChain struct {
bchain.BlockChain
gate chan struct{}
}
func (c *blockingChain) GetBlock(hash string, height uint32) (*bchain.Block, error) {
<-c.gate
return nil, bchain.ErrBlockNotFound
}
func testConnectBlocks(t *testing.T, h *TestHandler) {
for _, rng := range h.TestData.ConnectBlocks.SyncRanges {
withRocksDBAndSyncWorker(t, h, rng.Lower, func(d *db.RocksDB, sw *db.SyncWorker, ch chan os.Signal) {
@@ -43,6 +54,20 @@ func testConnectBlocks(t *testing.T, h *TestHandler) {
t.Run("verifyAddresses", func(t *testing.T) { verifyAddresses(t, d, h, rng) })
})
}
t.Run("shutdownDuringRegularSync", func(t *testing.T) {
withRocksDBAndSyncWorker(t, h, 0, func(_ *db.RocksDB, sw *db.SyncWorker, ch chan os.Signal) {
gate := make(chan struct{})
db.SetBlockChain(sw, &blockingChain{BlockChain: h.Chain, gate: gate})
close(ch)
err := db.ConnectBlocks(sw, nil, false)
if err != db.ErrOperationInterrupted {
t.Fatalf("expected ErrOperationInterrupted, got %v", err)
}
// Allow the worker goroutine to exit cleanly.
close(gate)
})
})
}
func testConnectBlocksParallel(t *testing.T, h *TestHandler) {