Merge pull request #1409 from trezor/fix/continue-syncing-on-missing-block

fix: continue syncing on missing block error
pragmaxim, 2026-02-10 10:47:08 +01:00, committed by GitHub
5 changed files with 304 additions and 24 deletions

@@ -0,0 +1,53 @@
#!/usr/bin/env bash
set -euo pipefail
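# Report the sync status of a coin's backend as one-line JSON: EVM backends are
# probed via eth_syncing/eth_blockNumber, UTXO backends via getblockchaininfo.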
die() { echo "error: $1" >&2; exit 1; }
[[ $# -ge 1 ]] || die "missing coin argument. usage: blockbook_backend_status.sh <coin>"
coin="$1"
var="BB_RPC_URL_HTTP_${coin}"
url="${!var-}"
[[ -n "$url" ]] || die "environment variable ${var} is not set"
user_var="BB_RPC_USER"
pass_var="BB_RPC_PASS"
user="${!user_var-}"
pass="${!pass_var-}"
auth=()
if [[ -n "$user" || -n "$pass" ]]; then
[[ -n "$user" && -n "$pass" ]] || die "set both ${user_var} and ${pass_var}"
auth=(-u "${user}:${pass}")
fi
command -v curl >/dev/null 2>&1 || die "curl is not installed"
command -v jq >/dev/null 2>&1 || die "jq is not installed"
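# rpc <method> [params]: POST a JSON-RPC 2.0 request to the backend URL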
rpc() { curl -skS "${auth[@]}" -H 'content-type: application/json' --data "{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"$1\",\"params\":${2:-[]}}" "$url"; }
resp="$(rpc eth_syncing)"
if echo "$resp" | jq -e '.error|not' >/dev/null 2>&1; then
if echo "$resp" | jq -e '.result == false' >/dev/null 2>&1; then
bn="$(rpc eth_blockNumber)"
echo "$bn" | jq -e '.error|not' >/dev/null 2>&1 || die "eth_blockNumber failed"
hex="$(echo "$bn" | jq -r '.result')"
[[ -n "$hex" && "$hex" != "null" ]] || die "eth_blockNumber returned empty result"
height=$((16#${hex#0x}))
jq -n --argjson height "$height" '{backend:"evm", is_synced:true, height:$height}'
else
cur_hex="$(echo "$resp" | jq -r '.result.currentBlock')"
high_hex="$(echo "$resp" | jq -r '.result.highestBlock')"
[[ -n "$cur_hex" && "$cur_hex" != "null" ]] || die "eth_syncing returned empty currentBlock"
[[ -n "$high_hex" && "$high_hex" != "null" ]] || die "eth_syncing returned empty highestBlock"
cur=$((16#${cur_hex#0x}))
high=$((16#${high_hex#0x}))
jq -n --argjson height "$cur" --argjson highest "$high" \
'{backend:"evm", is_synced:false, height:$height, highest:$highest}'
fi
exit 0
fi
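# not an EVM backend: fall back to the Bitcoin-style getblockchaininfo RPC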
resp="$(rpc getblockchaininfo)"
if echo "$resp" | jq -e '.result and (.error|not)' >/dev/null 2>&1; then
echo "$resp" | jq '{backend:"utxo", is_synced:(.result.initialblockdownload|not), height:.result.blocks, getblockchaininfo:.}'
exit 0
fi
die "backend did not return a valid eth_syncing or getblockchaininfo response"

@@ -0,0 +1,19 @@
#!/usr/bin/env bash
set -euo pipefail
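# Pretty-print /api/status of a Blockbook instance exposed on the coin's public port.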
die() { echo "error: $1" >&2; exit 1; }
[[ $# -ge 1 ]] || die "missing coin argument. usage: blockbook_status.sh <coin> [hostname]"
coin="$1"
if [[ -n "${2-}" ]]; then
host="$2"
else
host="localhost"
fi
var="B_PORT_PUBLIC_${coin}"
port="${!var-}"
[[ -n "$port" ]] || die "environment variable ${var} is not set"
command -v curl >/dev/null 2>&1 || die "curl is not installed"
command -v jq >/dev/null 2>&1 || die "jq is not installed"
curl -skv "https://${host}:${port}/api/status" | jq

@@ -1,6 +1,7 @@
package db
import (
stdErrors "errors"
"os"
"sync"
"sync/atomic"
@@ -21,31 +22,73 @@ type SyncWorker struct {
startHeight uint32
startHash string
chanOsSignal chan os.Signal
missingBlockRetry MissingBlockRetryConfig
metrics *common.Metrics
is *common.InternalState
}
// MissingBlockRetryConfig controls how long we retry a missing block before re-checking chain state.
type MissingBlockRetryConfig struct {
// RecheckThreshold is the number of consecutive ErrBlockNotFound retries
// before re-checking the tip/hash for a reorg or rollback.
RecheckThreshold int
// TipRecheckThreshold is a lower threshold used once the hash queue is
// closed (we are at the tail of the requested range).
TipRecheckThreshold int
	// RetryDelay is the pause between consecutive retries of a missing block;
	// it keeps retry pressure low while still reacting quickly to transient backend gaps.
RetryDelay time.Duration
}
// SyncWorkerConfig bundles optional tuning knobs for SyncWorker.
type SyncWorkerConfig struct {
MissingBlockRetry MissingBlockRetryConfig
}
// defaultSyncWorkerConfig returns defaults satisfying the config invariants:
//   - RecheckThreshold >= 1
//   - TipRecheckThreshold >= 1 && TipRecheckThreshold <= RecheckThreshold
//   - RetryDelay > 0
func defaultSyncWorkerConfig() SyncWorkerConfig {
	return SyncWorkerConfig{
		MissingBlockRetry: MissingBlockRetryConfig{
			RecheckThreshold:    10,
			TipRecheckThreshold: 3,
			RetryDelay:          1 * time.Second,
},
}
}
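// Illustrative sketch only, not part of this change: a caller that wants to
// react faster to transient gaps could pass its own config (the values below
// are hypothetical and must still satisfy the invariants above), e.g.:
//
//	cfg := &SyncWorkerConfig{
//		MissingBlockRetry: MissingBlockRetryConfig{
//			RecheckThreshold:    5,
//			TipRecheckThreshold: 2,
//			RetryDelay:          250 * time.Millisecond,
//		},
//	}
//	sw, err := NewSyncWorkerWithConfig(db, chain, 8, 100000, 0, false, sigCh, metrics, is, cfg)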
// NewSyncWorker creates new SyncWorker and returns its handle
func NewSyncWorker(db *RocksDB, chain bchain.BlockChain, syncWorkers, syncChunk int, minStartHeight int, dryRun bool, chanOsSignal chan os.Signal, metrics *common.Metrics, is *common.InternalState) (*SyncWorker, error) {
return NewSyncWorkerWithConfig(db, chain, syncWorkers, syncChunk, minStartHeight, dryRun, chanOsSignal, metrics, is, nil)
}
// NewSyncWorkerWithConfig allows tests or callers to override SyncWorker defaults.
func NewSyncWorkerWithConfig(db *RocksDB, chain bchain.BlockChain, syncWorkers, syncChunk int, minStartHeight int, dryRun bool, chanOsSignal chan os.Signal, metrics *common.Metrics, is *common.InternalState, cfg *SyncWorkerConfig) (*SyncWorker, error) {
if minStartHeight < 0 {
minStartHeight = 0
}
effectiveCfg := defaultSyncWorkerConfig()
if cfg != nil {
effectiveCfg = *cfg
}
return &SyncWorker{
db: db,
chain: chain,
syncWorkers: syncWorkers,
syncChunk: syncChunk,
dryRun: dryRun,
startHeight: uint32(minStartHeight),
chanOsSignal: chanOsSignal,
missingBlockRetry: effectiveCfg.MissingBlockRetry,
metrics: metrics,
is: is,
}, nil
}
var errSynced = errors.New("synced")
var errFork = errors.New("fork")
// errResync signals that the parallel/bulk sync should restart because the
// target block hash no longer matches the chain (likely reorg/rollback).
var errResync = errors.New("resync")
// ErrOperationInterrupted is returned when operation is interrupted by OS signal
var ErrOperationInterrupted = errors.New("ErrOperationInterrupted")
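// Note: error comparisons below use stdErrors.Is rather than ==, so a backend
// that wraps bchain.ErrBlockNotFound (for example with fmt.Errorf("...: %w", err))
// still matches the sentinel.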
@@ -132,7 +175,7 @@ func (w *SyncWorker) resyncIndex(onNewBlock bchain.OnNewBlockFunc, initialSync b
if localBestHash != "" {
remoteHash, err := w.chain.GetBlockHash(localBestHeight)
// for some coins (eth) remote can be at lower best height after rollback
if err != nil && !stdErrors.Is(err, bchain.ErrBlockNotFound) {
return err
}
if remoteHash != localBestHash {
@@ -166,8 +209,14 @@ func (w *SyncWorker) resyncIndex(onNewBlock bchain.OnNewBlockFunc, initialSync b
if initialSync {
if remoteBestHeight-w.startHeight > uint32(w.syncChunk) {
glog.Infof("resync: bulk sync of blocks %d-%d, using %d workers", w.startHeight, remoteBestHeight, w.syncWorkers)
// Bulk sync can encounter a disappearing block hash during reorgs.
// When that happens, it returns errResync to trigger a full restart.
err = w.BulkConnectBlocks(w.startHeight, remoteBestHeight)
if err != nil {
if stdErrors.Is(err, errResync) {
// block hash changed during parallel sync, restart the full resync
return w.resyncIndex(onNewBlock, initialSync)
}
return err
}
// after parallel load finish the sync using standard way,
// new blocks may be available, synchronize in standard way
@@ -179,8 +228,14 @@ func (w *SyncWorker) resyncIndex(onNewBlock bchain.OnNewBlockFunc, initialSync b
syncWorkers := uint32(4)
if remoteBestHeight-w.startHeight >= syncWorkers {
glog.Infof("resync: parallel sync of blocks %d-%d, using %d workers", w.startHeight, remoteBestHeight, syncWorkers)
// Parallel sync also returns errResync when a requested hash no longer
// exists at its height; restart to realign with the canonical chain.
err = w.ParallelConnectBlocks(onNewBlock, w.startHeight, remoteBestHeight, syncWorkers)
if err != nil {
if stdErrors.Is(err, errResync) {
// block hash changed during parallel sync, restart the full resync
return w.resyncIndex(onNewBlock, initialSync)
}
return err
}
// after parallel load finish the sync using standard way,
// new blocks may be available, synchronize in standard way
@@ -190,7 +245,7 @@ func (w *SyncWorker) resyncIndex(onNewBlock bchain.OnNewBlockFunc, initialSync b
}
}
err = w.connectBlocks(onNewBlock, initialSync)
if stdErrors.Is(err, errFork) || stdErrors.Is(err, errResync) {
return w.resyncIndex(onNewBlock, initialSync)
}
return err
@@ -210,7 +265,7 @@ func (w *SyncWorker) handleFork(localBestHeight uint32, localBestHash string, on
}
remote, err := w.chain.GetBlockHash(height)
// for some coins (eth) remote can be at lower best height after rollback
if err != nil && !stdErrors.Is(err, bchain.ErrBlockNotFound) {
return err
}
if local == remote {
@@ -292,6 +347,27 @@ type hashHeight struct {
height uint32
}
func (w *SyncWorker) shouldRestartSyncOnMissingBlock(height uint32, expectedHash string) (bool, error) {
// When a block hash disappears at a given height, it usually indicates a
// reorg/rollback. Confirm by checking the current tip and block hash.
bestHeight, err := w.chain.GetBestBlockHeight()
if err != nil {
return false, err
}
if bestHeight < height {
// The tip moved below the requested height, so this block is no longer valid.
return true, nil
}
currentHash, err := w.chain.GetBlockHash(height)
if err != nil {
if stdErrors.Is(err, bchain.ErrBlockNotFound) {
return true, nil
}
return false, err
}
return currentHash != expectedHash, nil
}
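// Summary of the restart decision above:
//   - best height below requested height -> restart (block rolled back)
//   - hash lookup says ErrBlockNotFound  -> restart (block gone)
//   - hash differs from the expected one -> restart (reorged)
//   - hash still matches                 -> keep retrying (transient backend gap)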
// ParallelConnectBlocks uses parallel goroutines to get data from blockchain daemon
// but keeps Blockbook in a consistent state by writing the blocks to the index in order
func (w *SyncWorker) ParallelConnectBlocks(onNewBlock bchain.OnNewBlockFunc, lower, higher uint32, syncWorkers uint32) error {
var err error
@@ -305,6 +381,8 @@ func (w *SyncWorker) ParallelConnectBlocks(onNewBlock bchain.OnNewBlockFunc, low
hchClosed.Store(false)
writeBlockDone := make(chan struct{})
terminating := make(chan struct{})
// abortCh is used by workers to signal a resync-worthy reorg.
abortCh := make(chan error, 1)
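// It has capacity 1 and workers send with a non-blocking select, so only the
// first abort is recorded and no worker ever blocks on the channel.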
writeBlockWorker := func() {
defer close(writeBlockDone)
lastBlock := lower - 1
@@ -345,13 +423,19 @@ func (w *SyncWorker) ParallelConnectBlocks(onNewBlock bchain.OnNewBlockFunc, low
}
for i := 0; i < int(syncWorkers); i++ {
wg.Add(1)
go w.getBlockWorker(i, syncWorkers, &wg, hch, bch, &hchClosed, terminating, abortCh)
}
go writeBlockWorker()
var hash string
ConnectLoop:
for h := lower; h <= higher; {
select {
case abortErr := <-abortCh:
// Another worker observed a missing block that no longer matches the chain.
glog.Warning("sync: parallel connect aborted, restarting sync")
err = abortErr
close(terminating)
break ConnectLoop
case <-w.chanOsSignal:
glog.Info("connectBlocksParallel interrupted at height ", h)
err = ErrOperationInterrupted
@@ -382,27 +466,67 @@ ConnectLoop:
return err
}
func (w *SyncWorker) getBlockWorker(i int, syncWorkers uint32, wg *sync.WaitGroup, hch chan hashHeight, bch []chan *bchain.Block, hchClosed *atomic.Value, terminating chan struct{}, abortCh chan error) {
defer wg.Done()
var err error
var block *bchain.Block
cfg := w.missingBlockRetry
GetBlockLoop:
for hh := range hch {
// Track consecutive not-found errors per block so we only re-check the
// chain once the backend has had a chance to catch up.
notFoundRetries := 0
for {
// Allow global shutdown or an abort to stop the retry loop promptly.
select {
case <-terminating:
return
case <-w.chanOsSignal:
return
default:
}
block, err = w.chain.GetBlock(hh.hash, hh.height)
if err != nil {
if stdErrors.Is(err, bchain.ErrBlockNotFound) {
notFoundRetries++
glog.Error("getBlockWorker ", i, " connect block ", hh.height, " ", hh.hash, " error ", err, ". Retrying...")
threshold := cfg.RecheckThreshold
// Once the hash queue is closed we are at the tail of the range; use
// a smaller threshold to avoid stalling on a missing tip block.
if hchClosed.Load() == true {
threshold = cfg.TipRecheckThreshold
}
if notFoundRetries >= threshold {
restart, checkErr := w.shouldRestartSyncOnMissingBlock(hh.height, hh.hash)
if checkErr != nil {
glog.Error("getBlockWorker ", i, " missing block check error ", checkErr)
} else if restart {
// The block hash at this height no longer exists; restart sync to realign.
glog.Warning("sync: block ", hh.height, " ", hh.hash, " no longer on chain, restarting sync")
select {
case abortCh <- errResync:
default:
}
return
}
}
} else {
// When the hash queue is closed, stop retrying non-notfound errors.
if hchClosed.Load() == true {
glog.Error("getBlockWorker ", i, " connect block error ", err, ". Exiting...")
return
}
notFoundRetries = 0
glog.Error("getBlockWorker ", i, " connect block error ", err, ". Retrying...")
}
w.metrics.IndexResyncErrors.With(common.Labels{"error": "failure"}).Inc()
select {
case <-terminating:
return
case <-w.chanOsSignal:
return
case <-time.After(cfg.RetryDelay):
}
} else {
break
}
@@ -432,6 +556,8 @@ func (w *SyncWorker) BulkConnectBlocks(lower, higher uint32) error {
hchClosed.Store(false)
writeBlockDone := make(chan struct{})
terminating := make(chan struct{})
// abortCh is used by workers to signal a resync-worthy reorg.
abortCh := make(chan error, 1)
writeBlockWorker := func() {
defer close(writeBlockDone)
bc, err := w.db.InitBulkConnect()
@@ -468,7 +594,7 @@ func (w *SyncWorker) BulkConnectBlocks(lower, higher uint32) error {
}
for i := 0; i < w.syncWorkers; i++ {
wg.Add(1)
go w.getBlockWorker(i, uint32(w.syncWorkers), &wg, hch, bch, &hchClosed, terminating, abortCh)
}
go writeBlockWorker()
var hash string
@@ -477,6 +603,12 @@ func (w *SyncWorker) BulkConnectBlocks(lower, higher uint32) error {
ConnectLoop:
for h := lower; h <= higher; {
select {
case abortErr := <-abortCh:
// Another worker observed a missing block that no longer matches the chain.
glog.Warning("sync: bulk connect aborted, restarting sync")
err = abortErr
close(terminating)
break ConnectLoop
case <-w.chanOsSignal:
glog.Info("connectBlocksParallel interrupted at height ", h)
err = ErrOperationInterrupted
@@ -539,7 +671,7 @@ func (w *SyncWorker) getBlockChain(out chan blockResult, done chan struct{}) {
}
block, err := w.chain.GetBlock(hash, height)
if err != nil {
if stdErrors.Is(err, bchain.ErrBlockNotFound) {
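// the backend does not know this block (yet), stop producing further blocks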
break
}
out <- blockResult{err: err}

@@ -16,6 +16,47 @@ import (
func testHandleFork(t *testing.T, h *TestHandler) {
for _, rng := range h.TestData.HandleFork.SyncRanges {
t.Run("missingBlockResync", func(t *testing.T) {
withRocksDBAndSyncWorker(t, h, rng.Lower, func(d *db.RocksDB, sw *db.SyncWorker, _ chan os.Signal) {
fakeBlocks := getFakeBlocks(h, rng)
if len(fakeBlocks) == 0 {
t.Skip("no fake blocks for missing block test")
}
chain, err := makeFakeChain(h.Chain, fakeBlocks, rng.Upper)
if err != nil {
t.Fatal(err)
}
// Use the last fake block (at upper height) to simulate a hash that disappears.
fakeUpper := fakeBlocks[len(fakeBlocks)-1]
realUpperHash, err := h.Chain.GetBlockHash(fakeUpper.Height)
if err != nil {
t.Fatal(err)
}
if realUpperHash == fakeUpper.Hash {
t.Skip("fake block hash matches real hash, cannot simulate missing block")
}
missingChain := &missingBlockChain{
fakeBlockChain: chain,
missingHeight: fakeUpper.Height,
missingHash: fakeUpper.Hash,
switchAfter: 3,
}
db.SetBlockChain(sw, missingChain)
if err := sw.ResyncIndex(nil, true); err != nil {
t.Fatalf("ResyncIndex failed after missing block: %v", err)
}
bestHeight, bestHash, err := d.GetBestBlock()
if err != nil {
t.Fatal(err)
}
if bestHeight != rng.Upper {
t.Fatalf("best height mismatch: %d != %d", bestHeight, rng.Upper)
}
if bestHash != realUpperHash {
t.Fatalf("best hash mismatch after resync: %s != %s", bestHash, realUpperHash)
}
})
})
withRocksDBAndSyncWorker(t, h, rng.Lower, func(d *db.RocksDB, sw *db.SyncWorker, ch chan os.Signal) {
fakeBlocks := getFakeBlocks(h, rng)
chain, err := makeFakeChain(h.Chain, fakeBlocks, rng.Upper)
@@ -67,6 +108,30 @@ func testHandleFork(t *testing.T, h *TestHandler) {
}
}
// missingBlockChain simulates a temporary "block not found" error for a known hash,
// then flips the fake chain back to real hashes to emulate a reorg/rollback.
type missingBlockChain struct {
*fakeBlockChain
missingHeight uint32
missingHash string
switchAfter int
notFoundCount int
switched bool
}
func (c *missingBlockChain) GetBlock(hash string, height uint32) (*bchain.Block, error) {
if !c.switched && height == c.missingHeight && hash == c.missingHash {
c.notFoundCount++
if c.notFoundCount >= c.switchAfter {
// Stop serving fake hashes so GetBlockHash returns the real chain hash.
c.returnFakes = false
c.switched = true
}
return nil, bchain.ErrBlockNotFound
}
return c.fakeBlockChain.GetBlock(hash, height)
}
func verifyAddresses2(t *testing.T, d *db.RocksDB, chain bchain.BlockChain, blks []BlockID) {
parser := chain.GetChainParser()

@@ -9,6 +9,7 @@ import (
"os"
"path/filepath"
"testing"
"time"
"github.com/trezor/blockbook/bchain"
"github.com/trezor/blockbook/common"
@@ -162,6 +163,16 @@ func makeRocksDB(parser bchain.BlockChainParser, m *common.Metrics, is *common.I
var metricsRegistry = map[string]*common.Metrics{}
// Lower thresholds speed up integration tests that intentionally trigger
// missing-block retries.
var testSyncWorkerConfig = &db.SyncWorkerConfig{
MissingBlockRetry: db.MissingBlockRetryConfig{
RecheckThreshold: 3,
TipRecheckThreshold: 2,
RetryDelay: 50 * time.Millisecond,
},
}
func getMetrics(name string) (*common.Metrics, error) {
if m, found := metricsRegistry[name]; found {
return m, nil
@@ -190,7 +201,7 @@ func withRocksDBAndSyncWorker(t *testing.T, h *TestHandler, startHeight uint32,
ch := make(chan os.Signal)
sw, err := db.NewSyncWorkerWithConfig(d, h.Chain, 8, 0, int(startHeight), false, ch, m, is, testSyncWorkerConfig)
if err != nil {
t.Fatal(err)
}