diff --git a/bchain/coins/eth/ethrpc.go b/bchain/coins/eth/ethrpc.go index bcc247e1..69ce3340 100644 --- a/bchain/coins/eth/ethrpc.go +++ b/bchain/coins/eth/ethrpc.go @@ -66,18 +66,22 @@ type Configuration struct { // EthereumRPC is an interface to JSON-RPC eth service. type EthereumRPC struct { *bchain.BaseChain - Client bchain.EVMClient - RPC bchain.EVMRPCClient - MainNetChainID Network - Timeout time.Duration - Parser *EthereumParser - PushHandler func(bchain.NotificationType) - OpenRPC func(string, string) (bchain.EVMRPCClient, bchain.EVMClient, error) - Mempool *bchain.MempoolEthereumType - mempoolInitialized bool - bestHeaderLock sync.Mutex - bestHeader bchain.EVMHeader - bestHeaderTime time.Time + Client bchain.EVMClient + RPC bchain.EVMRPCClient + MainNetChainID Network + Timeout time.Duration + Parser *EthereumParser + PushHandler func(bchain.NotificationType) + OpenRPC func(string, string) (bchain.EVMRPCClient, bchain.EVMClient, error) + Mempool *bchain.MempoolEthereumType + mempoolInitialized bool + bestHeaderLock sync.Mutex + bestHeader bchain.EVMHeader + bestHeaderTime time.Time + // newBlockNotifyCh coalesces bursts of newHeads events into a single wake-up. + // This keeps the subscription reader unblocked while we refresh the canonical tip. + newBlockNotifyCh chan struct{} + newBlockNotifyOnce sync.Once NewBlock bchain.EVMNewBlockSubscriber newBlockSubscription bchain.EVMClientSubscription NewTx bchain.EVMNewTxSubscriber @@ -113,6 +117,8 @@ func NewEthereumRPC(config json.RawMessage, pushHandler func(bchain.Notification BaseChain: &bchain.BaseChain{}, ChainConfig: &c, } + // 1-slot buffer ensures we only queue one "refresh tip" signal at a time. + s.newBlockNotifyCh = make(chan struct{}, 1) ProcessInternalTransactions = c.ProcessInternalTransactions @@ -342,16 +348,17 @@ func (b *EthereumRPC) InitializeMempool(addrDescForOutpoint bchain.AddrDescForOu } func (b *EthereumRPC) subscribeEvents() error { + b.newBlockNotifyOnce.Do(func() { + go b.newBlockNotifier() + }) // new block notifications handling go func() { for { - h, ok := b.NewBlock.Read() + _, ok := b.NewBlock.Read() if !ok { break } - b.UpdateBestHeader(h) - // notify blockbook - b.PushHandler(bchain.NotificationNewBlock) + b.signalNewBlock() } }() @@ -608,11 +615,69 @@ func (b *EthereumRPC) getBestHeader() (bchain.EVMHeader, error) { // UpdateBestHeader keeps track of the latest block header confirmed on chain func (b *EthereumRPC) UpdateBestHeader(h bchain.EVMHeader) { + if h == nil || h.Number() == nil { + return + } glog.V(2).Info("rpc: new block header ", h.Number().Uint64()) + b.setBestHeader(h) +} + +func (b *EthereumRPC) signalNewBlock() { + // Non-blocking send: one pending signal is enough to refresh the tip. + select { + case b.newBlockNotifyCh <- struct{}{}: + default: + } +} + +func (b *EthereumRPC) newBlockNotifier() { + for range b.newBlockNotifyCh { + updated, err := b.refreshBestHeaderFromChain() + if err != nil { + glog.Error("refreshBestHeaderFromChain ", err) + continue + } + if updated { + b.PushHandler(bchain.NotificationNewBlock) + } + } +} + +func (b *EthereumRPC) refreshBestHeaderFromChain() (bool, error) { + if b.Client == nil { + return false, errors.New("rpc client not initialized") + } + ctx, cancel := context.WithTimeout(context.Background(), b.Timeout) + defer cancel() + h, err := b.Client.HeaderByNumber(ctx, nil) + if err != nil { + return false, err + } + if h == nil || h.Number() == nil { + return false, errors.New("best header is nil") + } + return b.setBestHeader(h), nil +} + +func (b *EthereumRPC) setBestHeader(h bchain.EVMHeader) bool { + if h == nil || h.Number() == nil { + return false + } b.bestHeaderLock.Lock() + defer b.bestHeaderLock.Unlock() + changed := false + if b.bestHeader == nil || b.bestHeader.Number() == nil { + changed = true + } else { + prevNum := b.bestHeader.Number().Uint64() + newNum := h.Number().Uint64() + if prevNum != newNum || b.bestHeader.Hash() != h.Hash() { + changed = true + } + } b.bestHeader = h b.bestHeaderTime = time.Now() - b.bestHeaderLock.Unlock() + return changed } // GetBestBlockHash returns hash of the tip of the best-block-chain