diff --git a/db/sync.go b/db/sync.go
index dbc333f8..1bf3d325 100644
--- a/db/sync.go
+++ b/db/sync.go
@@ -1,6 +1,7 @@
 package db

 import (
+    stdErrors "errors"
     "os"
     "sync"
     "sync/atomic"
@@ -21,31 +22,73 @@ type SyncWorker struct {
     startHeight       uint32
     startHash         string
     chanOsSignal      chan os.Signal
+    missingBlockRetry MissingBlockRetryConfig
     metrics           *common.Metrics
     is                *common.InternalState
 }

+// MissingBlockRetryConfig controls how long we retry a missing block before re-checking chain state.
+type MissingBlockRetryConfig struct {
+    // RecheckThreshold is the number of consecutive ErrBlockNotFound retries
+    // before re-checking the tip/hash for a reorg or rollback.
+    RecheckThreshold int
+    // TipRecheckThreshold is a lower threshold used once the hash queue is
+    // closed (we are at the tail of the requested range).
+    TipRecheckThreshold int
+    // RetryDelay keeps retry pressure low while still reacting quickly to transient backend gaps.
+    RetryDelay time.Duration
+}
+
+// SyncWorkerConfig bundles optional tuning knobs for SyncWorker.
+type SyncWorkerConfig struct {
+    MissingBlockRetry MissingBlockRetryConfig
+}
+
+func defaultSyncWorkerConfig() SyncWorkerConfig {
+    return SyncWorkerConfig{
+        MissingBlockRetry: MissingBlockRetryConfig{
+            RecheckThreshold:    10,              // RecheckThreshold >= 1
+            RetryDelay:          1 * time.Second, // RetryDelay > 0
+            TipRecheckThreshold: 3,               // TipRecheckThreshold >= 1 && TipRecheckThreshold <= RecheckThreshold
+        },
+    }
+}
+
 // NewSyncWorker creates new SyncWorker and returns its handle
 func NewSyncWorker(db *RocksDB, chain bchain.BlockChain, syncWorkers, syncChunk int, minStartHeight int, dryRun bool, chanOsSignal chan os.Signal, metrics *common.Metrics, is *common.InternalState) (*SyncWorker, error) {
+    return NewSyncWorkerWithConfig(db, chain, syncWorkers, syncChunk, minStartHeight, dryRun, chanOsSignal, metrics, is, nil)
+}
+
+// NewSyncWorkerWithConfig allows tests or callers to override SyncWorker defaults.
+func NewSyncWorkerWithConfig(db *RocksDB, chain bchain.BlockChain, syncWorkers, syncChunk int, minStartHeight int, dryRun bool, chanOsSignal chan os.Signal, metrics *common.Metrics, is *common.InternalState, cfg *SyncWorkerConfig) (*SyncWorker, error) {
     if minStartHeight < 0 {
         minStartHeight = 0
     }
+    effectiveCfg := defaultSyncWorkerConfig()
+    if cfg != nil {
+        effectiveCfg = *cfg
+    }
     return &SyncWorker{
-        db:           db,
-        chain:        chain,
-        syncWorkers:  syncWorkers,
-        syncChunk:    syncChunk,
-        dryRun:       dryRun,
-        startHeight:  uint32(minStartHeight),
-        chanOsSignal: chanOsSignal,
-        metrics:      metrics,
-        is:           is,
+        db:                db,
+        chain:             chain,
+        syncWorkers:       syncWorkers,
+        syncChunk:         syncChunk,
+        dryRun:            dryRun,
+        startHeight:       uint32(minStartHeight),
+        chanOsSignal:      chanOsSignal,
+        missingBlockRetry: effectiveCfg.MissingBlockRetry,
+        metrics:           metrics,
+        is:                is,
     }, nil
 }

 var errSynced = errors.New("synced")
 var errFork = errors.New("fork")

+// errResync signals that the parallel/bulk sync should restart because the
+// target block hash no longer matches the chain (likely reorg/rollback).
+var errResync = errors.New("resync")
+
 // ErrOperationInterrupted is returned when operation is interrupted by OS signal
 var ErrOperationInterrupted = errors.New("ErrOperationInterrupted")
@@ -132,7 +175,7 @@ func (w *SyncWorker) resyncIndex(onNewBlock bchain.OnNewBlockFunc, initialSync b
     if localBestHash != "" {
         remoteHash, err := w.chain.GetBlockHash(localBestHeight)
         // for some coins (eth) remote can be at lower best height after rollback
-        if err != nil && err != bchain.ErrBlockNotFound {
+        if err != nil && !stdErrors.Is(err, bchain.ErrBlockNotFound) {
             return err
         }
         if remoteHash != localBestHash {
@@ -166,8 +209,14 @@ func (w *SyncWorker) resyncIndex(onNewBlock bchain.OnNewBlockFunc, initialSync b
     if initialSync {
         if remoteBestHeight-w.startHeight > uint32(w.syncChunk) {
             glog.Infof("resync: bulk sync of blocks %d-%d, using %d workers", w.startHeight, remoteBestHeight, w.syncWorkers)
+            // Bulk sync can encounter a disappearing block hash during reorgs.
+            // When that happens, it returns errResync to trigger a full restart.
             err = w.BulkConnectBlocks(w.startHeight, remoteBestHeight)
             if err != nil {
+                if stdErrors.Is(err, errResync) {
+                    // block hash changed during bulk sync, restart the full resync
+                    return w.resyncIndex(onNewBlock, initialSync)
+                }
                 return err
             }
             // after parallel load finish the sync using standard way,
@@ -179,8 +228,14 @@ func (w *SyncWorker) resyncIndex(onNewBlock bchain.OnNewBlockFunc, initialSync b
         syncWorkers := uint32(4)
         if remoteBestHeight-w.startHeight >= syncWorkers {
             glog.Infof("resync: parallel sync of blocks %d-%d, using %d workers", w.startHeight, remoteBestHeight, syncWorkers)
+            // Parallel sync also returns errResync when a requested hash no longer
+            // exists at its height; restart to realign with the canonical chain.
             err = w.ParallelConnectBlocks(onNewBlock, w.startHeight, remoteBestHeight, syncWorkers)
             if err != nil {
+                if stdErrors.Is(err, errResync) {
+                    // block hash changed during parallel sync, restart the full resync
+                    return w.resyncIndex(onNewBlock, initialSync)
+                }
                 return err
             }
             // after parallel load finish the sync using standard way,
@@ -190,7 +245,7 @@ func (w *SyncWorker) resyncIndex(onNewBlock bchain.OnNewBlockFunc, initialSync b
         }
     }
     err = w.connectBlocks(onNewBlock, initialSync)
-    if err == errFork {
+    if stdErrors.Is(err, errFork) || stdErrors.Is(err, errResync) {
         return w.resyncIndex(onNewBlock, initialSync)
     }
     return err
@@ -210,7 +265,7 @@ func (w *SyncWorker) handleFork(localBestHeight uint32, localBestHash string, on
         }
         remote, err := w.chain.GetBlockHash(height)
         // for some coins (eth) remote can be at lower best height after rollback
-        if err != nil && err != bchain.ErrBlockNotFound {
+        if err != nil && !stdErrors.Is(err, bchain.ErrBlockNotFound) {
             return err
         }
         if local == remote {
@@ -292,6 +347,27 @@ type hashHeight struct {
     height uint32
 }

+func (w *SyncWorker) shouldRestartSyncOnMissingBlock(height uint32, expectedHash string) (bool, error) {
+    // When a block hash disappears at a given height, it usually indicates a
+    // reorg/rollback. Confirm by checking the current tip and block hash.
+    bestHeight, err := w.chain.GetBestBlockHeight()
+    if err != nil {
+        return false, err
+    }
+    if bestHeight < height {
+        // The tip moved below the requested height, so this block is no longer valid.
+        return true, nil
+    }
+    currentHash, err := w.chain.GetBlockHash(height)
+    if err != nil {
+        if stdErrors.Is(err, bchain.ErrBlockNotFound) {
+            return true, nil
+        }
+        return false, err
+    }
+    return currentHash != expectedHash, nil
+}
+
 // ParallelConnectBlocks uses parallel goroutines to get data from blockchain daemon but keeps Blockbook in
 func (w *SyncWorker) ParallelConnectBlocks(onNewBlock bchain.OnNewBlockFunc, lower, higher uint32, syncWorkers uint32) error {
     var err error
@@ -305,6 +381,8 @@ func (w *SyncWorker) ParallelConnectBlocks(onNewBlock bchain.OnNewBlockFunc, low
     hchClosed.Store(false)
     writeBlockDone := make(chan struct{})
     terminating := make(chan struct{})
+    // abortCh is used by workers to signal a resync-worthy reorg.
+    abortCh := make(chan error, 1)
     writeBlockWorker := func() {
         defer close(writeBlockDone)
         lastBlock := lower - 1
@@ -345,13 +423,19 @@ func (w *SyncWorker) ParallelConnectBlocks(onNewBlock bchain.OnNewBlockFunc, low
     }
     for i := 0; i < int(syncWorkers); i++ {
         wg.Add(1)
-        go w.getBlockWorker(i, syncWorkers, &wg, hch, bch, &hchClosed, terminating)
+        go w.getBlockWorker(i, syncWorkers, &wg, hch, bch, &hchClosed, terminating, abortCh)
     }
     go writeBlockWorker()
     var hash string
 ConnectLoop:
     for h := lower; h <= higher; {
         select {
+        case abortErr := <-abortCh:
+            // Another worker observed a missing block that no longer matches the chain.
+            glog.Warning("sync: parallel connect aborted, restarting sync")
+            err = abortErr
+            close(terminating)
+            break ConnectLoop
         case <-w.chanOsSignal:
             glog.Info("connectBlocksParallel interrupted at height ", h)
             err = ErrOperationInterrupted
@@ -382,27 +466,67 @@ ConnectLoop:
     return err
 }

-func (w *SyncWorker) getBlockWorker(i int, syncWorkers uint32, wg *sync.WaitGroup, hch chan hashHeight, bch []chan *bchain.Block, hchClosed *atomic.Value, terminating chan struct{}) {
+func (w *SyncWorker) getBlockWorker(i int, syncWorkers uint32, wg *sync.WaitGroup, hch chan hashHeight, bch []chan *bchain.Block, hchClosed *atomic.Value, terminating chan struct{}, abortCh chan error) {
     defer wg.Done()
     var err error
     var block *bchain.Block
+    cfg := w.missingBlockRetry
 GetBlockLoop:
     for hh := range hch {
+        // Track consecutive not-found errors per block so we only re-check the
+        // chain once the backend has had a chance to catch up.
+        notFoundRetries := 0
         for {
+            // Allow global shutdown or an abort to stop the retry loop promptly.
+            select {
+            case <-terminating:
+                return
+            case <-w.chanOsSignal:
+                return
+            default:
+            }
             block, err = w.chain.GetBlock(hh.hash, hh.height)
             if err != nil {
-                // signal came while looping in the error loop
-                if hchClosed.Load() == true {
-                    glog.Error("getBlockWorker ", i, " connect block error ", err, ". Exiting...")
-                    return
-                }
-                if err == bchain.ErrBlockNotFound {
+                if stdErrors.Is(err, bchain.ErrBlockNotFound) {
+                    notFoundRetries++
                     glog.Error("getBlockWorker ", i, " connect block ", hh.height, " ", hh.hash, " error ", err, ". Retrying...")
+                    threshold := cfg.RecheckThreshold
+                    // Once the hash queue is closed we are at the tail of the range; use
+                    // a smaller threshold to avoid stalling on a missing tip block.
+                    if hchClosed.Load() == true {
+                        threshold = cfg.TipRecheckThreshold
+                    }
+                    if notFoundRetries >= threshold {
+                        restart, checkErr := w.shouldRestartSyncOnMissingBlock(hh.height, hh.hash)
+                        if checkErr != nil {
+                            glog.Error("getBlockWorker ", i, " missing block check error ", checkErr)
+                        } else if restart {
+                            // The block hash at this height no longer exists; restart sync to realign.
+                            glog.Warning("sync: block ", hh.height, " ", hh.hash, " no longer on chain, restarting sync")
+                            select {
+                            case abortCh <- errResync:
+                            default:
+                            }
+                            return
+                        }
+                    }
                 } else {
+                    // When the hash queue is closed, stop retrying non-notfound errors.
+                    if hchClosed.Load() == true {
+                        glog.Error("getBlockWorker ", i, " connect block error ", err, ". Exiting...")
+                        return
+                    }
+                    notFoundRetries = 0
                     glog.Error("getBlockWorker ", i, " connect block error ", err, ". Retrying...")
                 }
                 w.metrics.IndexResyncErrors.With(common.Labels{"error": "failure"}).Inc()
-                time.Sleep(time.Millisecond * 500)
+                select {
+                case <-terminating:
+                    return
+                case <-w.chanOsSignal:
+                    return
+                case <-time.After(cfg.RetryDelay):
+                }
             } else {
                 break
             }
@@ -432,6 +556,8 @@ func (w *SyncWorker) BulkConnectBlocks(lower, higher uint32) error {
     hchClosed.Store(false)
     writeBlockDone := make(chan struct{})
     terminating := make(chan struct{})
+    // abortCh is used by workers to signal a resync-worthy reorg.
+    abortCh := make(chan error, 1)
     writeBlockWorker := func() {
         defer close(writeBlockDone)
         bc, err := w.db.InitBulkConnect()
@@ -468,7 +594,7 @@
     }
     for i := 0; i < w.syncWorkers; i++ {
         wg.Add(1)
-        go w.getBlockWorker(i, uint32(w.syncWorkers), &wg, hch, bch, &hchClosed, terminating)
+        go w.getBlockWorker(i, uint32(w.syncWorkers), &wg, hch, bch, &hchClosed, terminating, abortCh)
     }
     go writeBlockWorker()
     var hash string
@@ -477,6 +603,12 @@
 ConnectLoop:
     for h := lower; h <= higher; {
         select {
+        case abortErr := <-abortCh:
+            // Another worker observed a missing block that no longer matches the chain.
+            glog.Warning("sync: bulk connect aborted, restarting sync")
+            err = abortErr
+            close(terminating)
+            break ConnectLoop
         case <-w.chanOsSignal:
             glog.Info("connectBlocksParallel interrupted at height ", h)
             err = ErrOperationInterrupted
@@ -539,7 +671,7 @@ func (w *SyncWorker) getBlockChain(out chan blockResult, done chan struct{}) {
         }
         block, err := w.chain.GetBlock(hash, height)
         if err != nil {
-            if err == bchain.ErrBlockNotFound {
+            if stdErrors.Is(err, bchain.ErrBlockNotFound) {
                 break
             }
             out <- blockResult{err: err}
diff --git a/tests/sync/handlefork.go b/tests/sync/handlefork.go
index 79761966..871d4fce 100644
--- a/tests/sync/handlefork.go
+++ b/tests/sync/handlefork.go
@@ -16,6 +16,47 @@ import (

 func testHandleFork(t *testing.T, h *TestHandler) {
     for _, rng := range h.TestData.HandleFork.SyncRanges {
+        t.Run("missingBlockResync", func(t *testing.T) {
+            withRocksDBAndSyncWorker(t, h, rng.Lower, func(d *db.RocksDB, sw *db.SyncWorker, _ chan os.Signal) {
+                fakeBlocks := getFakeBlocks(h, rng)
+                if len(fakeBlocks) == 0 {
+                    t.Skip("no fake blocks for missing block test")
+                }
+                chain, err := makeFakeChain(h.Chain, fakeBlocks, rng.Upper)
+                if err != nil {
+                    t.Fatal(err)
+                }
+                // Use the last fake block (at upper height) to simulate a hash that disappears.
+                fakeUpper := fakeBlocks[len(fakeBlocks)-1]
+                realUpperHash, err := h.Chain.GetBlockHash(fakeUpper.Height)
+                if err != nil {
+                    t.Fatal(err)
+                }
+                if realUpperHash == fakeUpper.Hash {
+                    t.Skip("fake block hash matches real hash, cannot simulate missing block")
+                }
+                missingChain := &missingBlockChain{
+                    fakeBlockChain: chain,
+                    missingHeight:  fakeUpper.Height,
+                    missingHash:    fakeUpper.Hash,
+                    switchAfter:    3,
+                }
+                db.SetBlockChain(sw, missingChain)
+                if err := sw.ResyncIndex(nil, true); err != nil {
+                    t.Fatalf("ResyncIndex failed after missing block: %v", err)
+                }
+                bestHeight, bestHash, err := d.GetBestBlock()
+                if err != nil {
+                    t.Fatal(err)
+                }
+                if bestHeight != rng.Upper {
+                    t.Fatalf("best height mismatch: %d != %d", bestHeight, rng.Upper)
+                }
+                if bestHash != realUpperHash {
+                    t.Fatalf("best hash mismatch after resync: %s != %s", bestHash, realUpperHash)
+                }
+            })
+        })
         withRocksDBAndSyncWorker(t, h, rng.Lower, func(d *db.RocksDB, sw *db.SyncWorker, ch chan os.Signal) {
             fakeBlocks := getFakeBlocks(h, rng)
             chain, err := makeFakeChain(h.Chain, fakeBlocks, rng.Upper)
@@ -67,6 +108,30 @@ func testHandleFork(t *testing.T, h *TestHandler) {
     }
 }

+// missingBlockChain simulates a temporary "block not found" error for a known hash,
+// then flips the fake chain back to real hashes to emulate a reorg/rollback.
+type missingBlockChain struct {
+    *fakeBlockChain
+    missingHeight uint32
+    missingHash   string
+    switchAfter   int
+    notFoundCount int
+    switched      bool
+}
+
+func (c *missingBlockChain) GetBlock(hash string, height uint32) (*bchain.Block, error) {
+    if !c.switched && height == c.missingHeight && hash == c.missingHash {
+        c.notFoundCount++
+        if c.notFoundCount >= c.switchAfter {
+            // Stop serving fake hashes so GetBlockHash returns the real chain hash.
+            c.returnFakes = false
+            c.switched = true
+        }
+        return nil, bchain.ErrBlockNotFound
+    }
+    return c.fakeBlockChain.GetBlock(hash, height)
+}
+
 func verifyAddresses2(t *testing.T, d *db.RocksDB, chain bchain.BlockChain, blks []BlockID) {
     parser := chain.GetChainParser()

diff --git a/tests/sync/sync.go b/tests/sync/sync.go
index b3e16421..7c7515f3 100644
--- a/tests/sync/sync.go
+++ b/tests/sync/sync.go
@@ -9,6 +9,7 @@ import (
     "os"
     "path/filepath"
     "testing"
+    "time"

     "github.com/trezor/blockbook/bchain"
     "github.com/trezor/blockbook/common"
@@ -162,6 +163,16 @@ func makeRocksDB(parser bchain.BlockChainParser, m *common.Metrics, is *common.I

 var metricsRegistry = map[string]*common.Metrics{}

+// Lower thresholds speed up integration tests that intentionally trigger
+// missing-block retries.
+var testSyncWorkerConfig = &db.SyncWorkerConfig{
+    MissingBlockRetry: db.MissingBlockRetryConfig{
+        RecheckThreshold:    3,
+        TipRecheckThreshold: 2,
+        RetryDelay:          50 * time.Millisecond,
+    },
+}
+
 func getMetrics(name string) (*common.Metrics, error) {
     if m, found := metricsRegistry[name]; found {
         return m, nil
@@ -190,7 +201,7 @@ func withRocksDBAndSyncWorker(t *testing.T, h *TestHandler, startHeight uint32,

     ch := make(chan os.Signal)

-    sw, err := db.NewSyncWorker(d, h.Chain, 8, 0, int(startHeight), false, ch, m, is)
+    sw, err := db.NewSyncWorkerWithConfig(d, h.Chain, 8, 0, int(startHeight), false, ch, m, is, testSyncWorkerConfig)
     if err != nil {
         t.Fatal(err)
     }