Merge pull request #1408 from trezor/fix-closing-rocksdb-on-shutdown-signal
Closing RocksDB on shutdown signal
58 blockbook.go
@@ -12,6 +12,7 @@ import (
     "runtime/debug"
     "strconv"
     "strings"
+    "sync"
     "syscall"
     "time"
@@ -134,7 +135,24 @@ func mainWithExitCode() int {
     rand.Seed(time.Now().UTC().UnixNano())

     chanOsSignal = make(chan os.Signal, 1)
-    signal.Notify(chanOsSignal, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM)
+    shutdownSigCh := make(chan os.Signal, 1)
+    signalCh := make(chan os.Signal, 1)
+    // Use a single signal listener and fan out shutdown signals to avoid races
+    // where long-running workers consume the OS signal before main shutdown runs.
+    signal.Notify(signalCh, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM)
+    var shutdownOnce sync.Once
+    go func() {
+        sig := <-signalCh
+        shutdownOnce.Do(func() {
+            // Flip global shutdown state and close chanOsSignal to broadcast shutdown.
+            // Closing the channel unblocks select loops that only receive from it.
+            common.SetInShutdown()
+            close(chanOsSignal)
+            // Ensure waitForSignalAndShutdown can proceed even if the OS signal
+            // was already consumed by another goroutine in previous versions.
+            shutdownSigCh <- sig
+        })
+    }()

     glog.Infof("Blockbook: %+v, debug mode %v", common.GetVersionInfo(), *debugMode)
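A note on why broadcast-by-close works: a receive from a closed channel never blocks, so closing chanOsSignal wakes every select loop that listens on it, no matter how many there are. The following self-contained sketch demonstrates the same pattern; all names in it are illustrative and not taken from the Blockbook codebase.

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    shutdown := make(chan struct{}) // closed exactly once to broadcast shutdown
    var once sync.Once
    var wg sync.WaitGroup

    // Any number of workers observe the same shutdown event, because a
    // receive on a closed channel always returns immediately.
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for {
                select {
                case <-shutdown:
                    fmt.Println("worker", id, "stopping")
                    return
                case <-time.After(50 * time.Millisecond):
                    // simulated unit of work
                }
            }
        }(i)
    }

    time.Sleep(120 * time.Millisecond)
    once.Do(func() { close(shutdown) }) // safe even if requested from several places
    wg.Wait()
}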
@@ -174,7 +192,13 @@ func mainWithExitCode() int {
         glog.Error("rocksDB: ", err)
         return exitCodeFatal
     }
-    defer index.Close()
+    defer func() {
+        glog.Info("shutdown: rocksdb close start")
+        if err := index.Close(); err != nil {
+            glog.Error("shutdown: rocksdb close error: ", err)
+        }
+        glog.Info("shutdown: rocksdb close finished")
+    }()

     internalState, err = newInternalState(config, index, *enableSubNewTx)
     if err != nil {
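Worth noting: deferred calls run in LIFO order when the surrounding function returns, so this close, deferred early in mainWithExitCode, executes after any cleanup deferred later in the function, keeping RocksDB open until everything else has wound down. A tiny illustration of the ordering:

package main

import "fmt"

func main() {
    defer fmt.Println("close database")  // registered first, runs last
    defer fmt.Println("stop other work") // registered later, runs first
    fmt.Println("running")
}

// Output:
// running
// stop other work
// close database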
@@ -358,17 +382,18 @@ func mainWithExitCode() int {
     if internalServer != nil || publicServer != nil || chain != nil {
         // start fiat rates downloader only if not shutting down immediately
         initDownloaders(index, chain, config)
-        waitForSignalAndShutdown(internalServer, publicServer, chain, 10*time.Second)
+        waitForSignalAndShutdown(internalServer, publicServer, chain, shutdownSigCh, 10*time.Second)
     }

+    // Always stop periodic state storage to prevent writes during shutdown.
+    close(chanStoreInternalState)
     if *synchronize {
         close(chanSyncIndex)
         close(chanSyncMempool)
-        close(chanStoreInternalState)
         <-chanSyncIndexDone
         <-chanSyncMempoolDone
-        <-chanStoreInternalStateDone
     }
+    <-chanStoreInternalStateDone

     return exitCodeOK
 }
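The close-then-wait pair used here is the standard Go handshake for stopping a worker goroutine: closing the request channel tells its loop to exit, and receiving from the done channel confirms it actually finished. A minimal sketch of the idiom, with illustrative names:

package main

import "fmt"

func main() {
    requests := make(chan int)
    done := make(chan struct{})

    go func() {
        defer close(done)
        // range exits once the requests channel is closed and drained.
        for r := range requests {
            fmt.Println("handled", r)
        }
    }()

    requests <- 1
    close(requests) // ask the worker to stop
    <-done          // wait until it actually has
    fmt.Println("worker stopped")
}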
@@ -521,10 +546,16 @@ func syncIndexLoop() {
     // resync index about every 15 minutes if there are no chanSyncIndex requests, with debounce 1 second
     common.TickAndDebounce(time.Duration(*resyncIndexPeriodMs)*time.Millisecond, debounceResyncIndexMs*time.Millisecond, chanSyncIndex, func() {
         if err := syncWorker.ResyncIndex(onNewBlockHash, false); err != nil {
+            if err == db.ErrOperationInterrupted || common.IsInShutdown() {
+                return
+            }
             glog.Error("syncIndexLoop ", errors.ErrorStack(err), ", will retry...")
             // retry once in case of random network error, after a slight delay
             time.Sleep(time.Millisecond * 2500)
             if err := syncWorker.ResyncIndex(onNewBlockHash, false); err != nil {
+                if err == db.ErrOperationInterrupted || common.IsInShutdown() {
+                    return
+                }
                 glog.Error("syncIndexLoop ", errors.ErrorStack(err))
             }
         }
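The added guards matter because the loop retries once after a 2.5 second delay; without the checks, a shutdown arriving during a resync would trigger a pointless retry and delay exit. A compact sketch of the same guard, using stand-in names for db.ErrOperationInterrupted and common.IsInShutdown:

package main

import (
    "errors"
    "fmt"
    "sync/atomic"
    "time"
)

var errInterrupted = errors.New("operation interrupted") // stand-in for db.ErrOperationInterrupted
var inShutdown atomic.Bool                               // stand-in for common.IsInShutdown

func resyncWithRetry(resync func() error) {
    if err := resync(); err != nil {
        if errors.Is(err, errInterrupted) || inShutdown.Load() {
            return // shutting down: skip the retry entirely
        }
        time.Sleep(2500 * time.Millisecond) // slight delay before the single retry
        if err := resync(); err != nil {
            if errors.Is(err, errInterrupted) || inShutdown.Load() {
                return
            }
            fmt.Println("resync failed after retry:", err)
        }
    }
}

func main() {
    calls := 0
    resyncWithRetry(func() error {
        calls++
        if calls == 1 {
            return errors.New("transient network error")
        }
        return nil
    })
    fmt.Println("calls:", calls) // 2: first attempt failed, the retry succeeded
}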
@@ -572,12 +603,11 @@ func syncMempoolLoop() {
 }

 func storeInternalStateLoop() {
-    stopCompute := make(chan os.Signal)
     defer func() {
-        close(stopCompute)
         close(chanStoreInternalStateDone)
     }()
-    signal.Notify(stopCompute, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM)
+    // Reuse the global shutdown channel so compute work stops when shutdown begins.
+    stopCompute := chanOsSignal
     var computeRunning bool
     lastCompute := time.Now()
     lastAppInfo := time.Now()
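Reusing chanOsSignal as stopCompute works because shutdown is broadcast by closing that channel: once closed, it serves the element type's zero value (nil for os.Signal) to every receiver, immediately and repeatedly, so any number of loops can observe shutdown. It is also why the updated waitForSignalAndShutdown below defensively checks sig != nil. A minimal demonstration:

package main

import (
    "fmt"
    "os"
)

func main() {
    ch := make(chan os.Signal, 1)
    close(ch)
    // Every receive on a closed channel completes immediately
    // and yields the element type's zero value (nil here).
    sig1, sig2 := <-ch, <-ch
    fmt.Println(sig1 == nil, sig2 == nil) // true true
}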
@@ -653,11 +683,17 @@ func pushSynchronizationHandler(nt bchain.NotificationType) {
     }
 }

-func waitForSignalAndShutdown(internal *server.InternalServer, public *server.PublicServer, chain bchain.BlockChain, timeout time.Duration) {
-    sig := <-chanOsSignal
+func waitForSignalAndShutdown(internal *server.InternalServer, public *server.PublicServer, chain bchain.BlockChain, shutdownSig <-chan os.Signal, timeout time.Duration) {
+    // Read the first OS signal from the dedicated channel to avoid races with worker shutdown paths.
+    sig := <-shutdownSig
     common.SetInShutdown()
-    glog.Infof("shutdown: %v", sig)
+    if sig != nil {
+        glog.Infof("shutdown: %v", sig)
+    } else {
+        glog.Info("shutdown: signal received")
+    }

+    // Bound server/RPC shutdown; RocksDB close happens after main returns via defer.
     ctx, cancel := context.WithTimeout(context.Background(), timeout)
     defer cancel()
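The bounded shutdown follows the usual context pattern: derive a context that expires after the timeout and hand it to the shutdown calls. The sketch below shows the pattern with the standard library's net/http server; Blockbook's own server types differ, so this is only an illustration:

package main

import (
    "context"
    "log"
    "net/http"
    "time"
)

func shutdownWithTimeout(srv *http.Server, timeout time.Duration) {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()
    // Shutdown returns once in-flight requests finish or ctx expires.
    if err := srv.Shutdown(ctx); err != nil {
        log.Print("shutdown: ", err)
    }
}

func main() {
    srv := &http.Server{Addr: ":8080"}
    go srv.ListenAndServe()
    time.Sleep(100 * time.Millisecond)
    shutdownWithTimeout(srv, 10*time.Second)
}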
@@ -7,7 +7,10 @@ After=network.target
 ExecStart={{template "Backend.ExecCommandTemplate" .}}
 User={{.Backend.SystemUser}}
 Restart=on-failure
+# Allow enough time for graceful shutdown/flush work before SIGKILL.
+TimeoutStopSec=300
+# Be explicit about the signal used for graceful shutdown.
+KillSignal=SIGTERM
 WorkingDirectory={{.Env.BackendInstallPath}}/{{.Coin.Alias}}
 {{if eq .Backend.ServiceType "forking" -}}
 Type=forking
38 db/sync.go
@@ -253,26 +253,26 @@ func (w *SyncWorker) connectBlocks(onNewBlock bchain.OnNewBlockFunc, initialSync
         return nil
     }

-    if initialSync {
-    ConnectLoop:
-        for {
-            select {
-            case <-w.chanOsSignal:
-                glog.Info("connectBlocks interrupted at height ", lastRes.block.Height)
-                return ErrOperationInterrupted
-            case res := <-bch:
-                if res == empty {
-                    break ConnectLoop
-                }
-                err := connect(res)
-                if err != nil {
-                    return err
-                }
-            }
-        }
+    logInterrupted := func() {
+        if lastRes.block != nil {
+            glog.Info("connectBlocks interrupted at height ", lastRes.block.Height)
+        } else {
+            glog.Info("connectBlocks interrupted")
+        }
+    }
-    } else {
-        // while regular sync, OS sig is handled by waitForSignalAndShutdown
-        for res := range bch {
-        }
+
+    // During regular sync, shutdown is now signaled by closing chanOsSignal,
+    // so we honor it here to avoid leaving RocksDB in an open state.
+    // Initial sync uses the same shutdown-aware loop.
+ConnectLoop:
+    for {
+        select {
+        case <-w.chanOsSignal:
+            logInterrupted()
+            return ErrOperationInterrupted
+        case res := <-bch:
+            if res == empty {
+                break ConnectLoop
+            }
+            err := connect(res)
+            if err != nil {
+                return err
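A small Go subtlety in the loop above: inside a select, a bare break exits only the select statement, which is why the enclosing for carries the ConnectLoop label. An isolated illustration with hypothetical names:

package main

import "fmt"

func main() {
    results := make(chan int)
    done := make(chan struct{})

    go func() {
        for i := 1; i <= 3; i++ {
            results <- i
        }
        close(done)
    }()

Loop:
    for {
        select {
        case <-done:
            break Loop // a bare break would only exit the select, not the for
        case r := <-results:
            fmt.Println("got", r)
        }
    }
    fmt.Println("loop exited")
}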
@@ -13,6 +13,17 @@ import (
     "github.com/trezor/blockbook/db"
 )

+// blockingChain delays GetBlock so shutdown can be asserted deterministically.
+type blockingChain struct {
+    bchain.BlockChain
+    gate chan struct{}
+}
+
+func (c *blockingChain) GetBlock(hash string, height uint32) (*bchain.Block, error) {
+    <-c.gate
+    return nil, bchain.ErrBlockNotFound
+}
+
 func testConnectBlocks(t *testing.T, h *TestHandler) {
     for _, rng := range h.TestData.ConnectBlocks.SyncRanges {
         withRocksDBAndSyncWorker(t, h, rng.Lower, func(d *db.RocksDB, sw *db.SyncWorker, ch chan os.Signal) {
@@ -43,6 +54,20 @@ func testConnectBlocks(t *testing.T, h *TestHandler) {
             t.Run("verifyAddresses", func(t *testing.T) { verifyAddresses(t, d, h, rng) })
         })
     }

+    t.Run("shutdownDuringRegularSync", func(t *testing.T) {
+        withRocksDBAndSyncWorker(t, h, 0, func(_ *db.RocksDB, sw *db.SyncWorker, ch chan os.Signal) {
+            gate := make(chan struct{})
+            db.SetBlockChain(sw, &blockingChain{BlockChain: h.Chain, gate: gate})
+            close(ch)
+            err := db.ConnectBlocks(sw, nil, false)
+            if err != db.ErrOperationInterrupted {
+                t.Fatalf("expected ErrOperationInterrupted, got %v", err)
+            }
+            // Allow the worker goroutine to exit cleanly.
+            close(gate)
+        })
+    })
 }

 func testConnectBlocksParallel(t *testing.T, h *TestHandler) {
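The blockingChain fake used in the test is a general idiom: gate the dependency on a channel so the test controls exactly when the blocked call may proceed, making the shutdown race deterministic. A generic sketch of the same idiom, with illustrative types unrelated to Blockbook:

package main

import "fmt"

// fetcher is a stand-in for the dependency being faked.
type fetcher interface {
    Fetch(key string) (string, error)
}

// gatedFetcher blocks every Fetch until the gate is closed,
// letting a test assert behavior while the call is in flight.
type gatedFetcher struct {
    gate chan struct{}
}

func (g *gatedFetcher) Fetch(key string) (string, error) {
    <-g.gate
    return "", fmt.Errorf("not found: %s", key)
}

func main() {
    g := &gatedFetcher{gate: make(chan struct{})}
    done := make(chan error, 1)
    go func() {
        _, err := g.Fetch("block-1")
        done <- err
    }()
    // ... a test would assert shutdown behavior here, then release:
    close(g.gate)
    fmt.Println(<-done)
}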