ws mempool sync

This commit is contained in:
pragmaxim
2026-02-12 09:59:48 +01:00
parent f37e1e6706
commit 1381913fd8
2 changed files with 84 additions and 6 deletions

View File

@@ -59,6 +59,7 @@ type Configuration struct {
AddressAliases bool `json:"address_aliases,omitempty"`
MempoolTxTimeoutHours int `json:"mempoolTxTimeoutHours"`
QueryBackendOnMempoolResync bool `json:"queryBackendOnMempoolResync"`
MempoolSyncOverWS *bool `json:"mempool_sync_over_ws,omitempty"`
ProcessInternalTransactions bool `json:"processInternalTransactions"`
ProcessZeroInternalTransactions bool `json:"processZeroInternalTransactions"`
ConsensusNodeVersionURL string `json:"consensusNodeVersion"`
@@ -138,6 +139,10 @@ func NewEthereumRPC(config json.RawMessage, pushHandler func(bchain.Notification
if c.AddressContractsCacheMaxBytes <= 0 {
c.AddressContractsCacheMaxBytes = defaultAddressContractsCacheMaxBytes
}
if c.MempoolSyncOverWS == nil {
defaultMempoolSyncOverWS := true
c.MempoolSyncOverWS = &defaultMempoolSyncOverWS
}
s := &EthereumRPC{
BaseChain: &bchain.BaseChain{},
@@ -242,6 +247,30 @@ func dialRPC(rawURL string) (*rpc.Client, error) {
return rpc.DialOptions(context.Background(), rawURL, opts...)
}
func (b *EthereumRPC) shouldUseMempoolSyncOverWS() bool {
if b.ChainConfig == nil {
return true
}
if b.ChainConfig.MempoolSyncOverWS == nil {
return true
}
return *b.ChainConfig.MempoolSyncOverWS
}
func (b *EthereumRPC) wsCallClient() *rpc.Client {
if b.RPC == nil {
return nil
}
switch c := b.RPC.(type) {
case *DualRPCClient:
return c.SubClient
case *EthereumRPCClient:
return c.Client
default:
return nil
}
}
// OpenRPC opens RPC connection to ETH backend.
var OpenRPC = func(httpURL, wsURL string) (bchain.EVMRPCClient, bchain.EVMClient, error) {
callURL, subURL, err := NormalizeRPCURLs(httpURL, wsURL)
@@ -795,19 +824,21 @@ func (b *EthereumRPC) computeConfirmations(n uint64) (uint32, error) {
return uint32(bn - n + 1), nil
}
func (b *EthereumRPC) getBlockRaw(hash string, height uint32, fullTxs bool) (json.RawMessage, error) {
type callContextFunc func(ctx context.Context, result interface{}, method string, args ...interface{}) error
func (b *EthereumRPC) getBlockRawWithCall(call callContextFunc, hash string, height uint32, fullTxs bool) (json.RawMessage, error) {
ctx, cancel := context.WithTimeout(context.Background(), b.Timeout)
defer cancel()
var raw json.RawMessage
var err error
if hash != "" {
if hash == "pending" {
err = b.RPC.CallContext(ctx, &raw, "eth_getBlockByNumber", hash, fullTxs)
err = call(ctx, &raw, "eth_getBlockByNumber", hash, fullTxs)
} else {
err = b.RPC.CallContext(ctx, &raw, "eth_getBlockByHash", ethcommon.HexToHash(hash), fullTxs)
err = call(ctx, &raw, "eth_getBlockByHash", ethcommon.HexToHash(hash), fullTxs)
}
} else {
err = b.RPC.CallContext(ctx, &raw, "eth_getBlockByNumber", fmt.Sprintf("%#x", height), fullTxs)
err = call(ctx, &raw, "eth_getBlockByNumber", fmt.Sprintf("%#x", height), fullTxs)
}
if err != nil {
return nil, errors.Annotatef(err, "hash %v, height %v", hash, height)
@@ -817,6 +848,10 @@ func (b *EthereumRPC) getBlockRaw(hash string, height uint32, fullTxs bool) (jso
return raw, nil
}
func (b *EthereumRPC) getBlockRaw(hash string, height uint32, fullTxs bool) (json.RawMessage, error) {
return b.getBlockRawWithCall(b.RPC.CallContext, hash, height, fullTxs)
}
func (b *EthereumRPC) processEventsForBlock(blockNumber string) (map[string][]*bchain.RpcLog, []bchain.AddressAliasRecord, error) {
ctx, cancel := context.WithTimeout(context.Background(), b.Timeout)
defer cancel()
@@ -1105,7 +1140,38 @@ func (b *EthereumRPC) GetBlockInfo(hash string) (*bchain.BlockInfo, error) {
// GetTransactionForMempool returns a transaction by the transaction ID.
// It could be optimized for mempool, i.e. without block time and confirmations
func (b *EthereumRPC) GetTransactionForMempool(txid string) (*bchain.Tx, error) {
return b.GetTransaction(txid)
call := b.RPC.CallContext
if b.shouldUseMempoolSyncOverWS() {
if wsClient := b.wsCallClient(); wsClient != nil {
call = wsClient.CallContext
}
}
var tx *bchain.RpcTransaction
var txFound bool
hash := ethcommon.HexToHash(txid)
if b.alternativeSendTxProvider != nil {
tx, txFound = b.alternativeSendTxProvider.GetTransaction(txid)
}
if !txFound || tx == nil {
tx = &bchain.RpcTransaction{}
ctx, cancel := context.WithTimeout(context.Background(), b.Timeout)
defer cancel()
if err := call(ctx, tx, "eth_getTransactionByHash", hash); err != nil {
return nil, err
}
}
if *tx == (bchain.RpcTransaction{}) {
b.removeTransactionFromMempool(txid)
return nil, bchain.ErrTxNotFound
}
if tx.BlockNumber != "" {
return nil, bchain.ErrTxNotFound
}
btx, err := b.Parser.ethTxToTx(tx, nil, nil, 0, 0, true)
if err != nil {
return nil, errors.Annotatef(err, "txid %v", txid)
}
return btx, nil
}
func (b *EthereumRPC) removeTransactionFromMempool(txid string) {
@@ -1207,7 +1273,17 @@ func (b *EthereumRPC) GetTransactionSpecific(tx *bchain.Tx) (json.RawMessage, er
// GetMempoolTransactions returns transactions in mempool
func (b *EthereumRPC) GetMempoolTransactions() ([]string, error) {
raw, err := b.getBlockRaw("pending", 0, false)
var raw json.RawMessage
var err error
if b.shouldUseMempoolSyncOverWS() {
if wsClient := b.wsCallClient(); wsClient != nil {
raw, err = b.getBlockRawWithCall(wsClient.CallContext, "pending", 0, false)
} else {
raw, err = b.getBlockRaw("pending", 0, false)
}
} else {
raw, err = b.getBlockRaw("pending", 0, false)
}
if err != nil {
return nil, err
}

View File

@@ -102,6 +102,8 @@ Good examples of coin configuration are
* Address-contracts cache configuration (Blockbook, Ethereum-type indexing):
* `address_contracts_cache_min_size` Minimum packed size (bytes) before an addressContracts entry is cached (default **300000**).
* `address_contracts_cache_max_bytes` Cache size cap in bytes; when exceeded, cached entries are flushed early (default **4000000000**).
* Mempool sync configuration (Blockbook, Ethereum-type indexing):
* `mempool_sync_over_ws` Use the WebSocket RPC connection for mempool sync lookups (pending block fetch + tx lookups). Default **true**.
* `meta` Common package metadata.
* `package_maintainer` Full name of package maintainer.