Merge pull request #1407 from trezor/base-newHeads-subscription-fix

fix: avoid Base newHeads bursts
This commit is contained in:
pragmaxim
2026-02-10 10:46:27 +01:00
committed by GitHub

View File

@@ -66,18 +66,22 @@ type Configuration struct {
// EthereumRPC is an interface to JSON-RPC eth service.
type EthereumRPC struct {
*bchain.BaseChain
Client bchain.EVMClient
RPC bchain.EVMRPCClient
MainNetChainID Network
Timeout time.Duration
Parser *EthereumParser
PushHandler func(bchain.NotificationType)
OpenRPC func(string, string) (bchain.EVMRPCClient, bchain.EVMClient, error)
Mempool *bchain.MempoolEthereumType
mempoolInitialized bool
bestHeaderLock sync.Mutex
bestHeader bchain.EVMHeader
bestHeaderTime time.Time
Client bchain.EVMClient
RPC bchain.EVMRPCClient
MainNetChainID Network
Timeout time.Duration
Parser *EthereumParser
PushHandler func(bchain.NotificationType)
OpenRPC func(string, string) (bchain.EVMRPCClient, bchain.EVMClient, error)
Mempool *bchain.MempoolEthereumType
mempoolInitialized bool
bestHeaderLock sync.Mutex
bestHeader bchain.EVMHeader
bestHeaderTime time.Time
// newBlockNotifyCh coalesces bursts of newHeads events into a single wake-up.
// This keeps the subscription reader unblocked while we refresh the canonical tip.
newBlockNotifyCh chan struct{}
newBlockNotifyOnce sync.Once
NewBlock bchain.EVMNewBlockSubscriber
newBlockSubscription bchain.EVMClientSubscription
NewTx bchain.EVMNewTxSubscriber
@@ -113,6 +117,8 @@ func NewEthereumRPC(config json.RawMessage, pushHandler func(bchain.Notification
BaseChain: &bchain.BaseChain{},
ChainConfig: &c,
}
// 1-slot buffer ensures we only queue one "refresh tip" signal at a time.
s.newBlockNotifyCh = make(chan struct{}, 1)
ProcessInternalTransactions = c.ProcessInternalTransactions
@@ -342,16 +348,17 @@ func (b *EthereumRPC) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOu
}
func (b *EthereumRPC) subscribeEvents() error {
b.newBlockNotifyOnce.Do(func() {
go b.newBlockNotifier()
})
// new block notifications handling
go func() {
for {
h, ok := b.NewBlock.Read()
_, ok := b.NewBlock.Read()
if !ok {
break
}
b.UpdateBestHeader(h)
// notify blockbook
b.PushHandler(bchain.NotificationNewBlock)
b.signalNewBlock()
}
}()
@@ -608,11 +615,69 @@ func (b *EthereumRPC) getBestHeader() (bchain.EVMHeader, error) {
// UpdateBestHeader keeps track of the latest block header confirmed on chain
func (b *EthereumRPC) UpdateBestHeader(h bchain.EVMHeader) {
if h == nil || h.Number() == nil {
return
}
glog.V(2).Info("rpc: new block header ", h.Number().Uint64())
b.setBestHeader(h)
}
func (b *EthereumRPC) signalNewBlock() {
// Non-blocking send: one pending signal is enough to refresh the tip.
select {
case b.newBlockNotifyCh <- struct{}{}:
default:
}
}
func (b *EthereumRPC) newBlockNotifier() {
for range b.newBlockNotifyCh {
updated, err := b.refreshBestHeaderFromChain()
if err != nil {
glog.Error("refreshBestHeaderFromChain ", err)
continue
}
if updated {
b.PushHandler(bchain.NotificationNewBlock)
}
}
}
func (b *EthereumRPC) refreshBestHeaderFromChain() (bool, error) {
if b.Client == nil {
return false, errors.New("rpc client not initialized")
}
ctx, cancel := context.WithTimeout(context.Background(), b.Timeout)
defer cancel()
h, err := b.Client.HeaderByNumber(ctx, nil)
if err != nil {
return false, err
}
if h == nil || h.Number() == nil {
return false, errors.New("best header is nil")
}
return b.setBestHeader(h), nil
}
func (b *EthereumRPC) setBestHeader(h bchain.EVMHeader) bool {
if h == nil || h.Number() == nil {
return false
}
b.bestHeaderLock.Lock()
defer b.bestHeaderLock.Unlock()
changed := false
if b.bestHeader == nil || b.bestHeader.Number() == nil {
changed = true
} else {
prevNum := b.bestHeader.Number().Uint64()
newNum := h.Number().Uint64()
if prevNum != newNum || b.bestHeader.Hash() != h.Hash() {
changed = true
}
}
b.bestHeader = h
b.bestHeaderTime = time.Now()
b.bestHeaderLock.Unlock()
return changed
}
// GetBestBlockHash returns hash of the tip of the best-block-chain