mirror of
https://github.com/trezor/blockbook.git
synced 2026-03-06 15:48:01 +01:00
EthereumType: Fetch mempool transactions from alternative provider
This commit is contained in:
@@ -60,29 +60,32 @@ type Configuration struct {
|
||||
// EthereumRPC is an interface to JSON-RPC eth service.
|
||||
type EthereumRPC struct {
|
||||
*bchain.BaseChain
|
||||
Client bchain.EVMClient
|
||||
RPC bchain.EVMRPCClient
|
||||
MainNetChainID Network
|
||||
Timeout time.Duration
|
||||
Parser *EthereumParser
|
||||
PushHandler func(bchain.NotificationType)
|
||||
OpenRPC func(string) (bchain.EVMRPCClient, bchain.EVMClient, error)
|
||||
Mempool *bchain.MempoolEthereumType
|
||||
mempoolInitialized bool
|
||||
bestHeaderLock sync.Mutex
|
||||
bestHeader bchain.EVMHeader
|
||||
bestHeaderTime time.Time
|
||||
NewBlock bchain.EVMNewBlockSubscriber
|
||||
newBlockSubscription bchain.EVMClientSubscription
|
||||
NewTx bchain.EVMNewTxSubscriber
|
||||
newTxSubscription bchain.EVMClientSubscription
|
||||
ChainConfig *Configuration
|
||||
supportedStakingPools []string
|
||||
stakingPoolNames []string
|
||||
stakingPoolContracts []string
|
||||
alternativeFeeProvider alternativeFeeProviderInterface
|
||||
alternativeSendTxURLs []string
|
||||
alternativeSendTxOnly bool
|
||||
Client bchain.EVMClient
|
||||
RPC bchain.EVMRPCClient
|
||||
MainNetChainID Network
|
||||
Timeout time.Duration
|
||||
Parser *EthereumParser
|
||||
PushHandler func(bchain.NotificationType)
|
||||
OpenRPC func(string) (bchain.EVMRPCClient, bchain.EVMClient, error)
|
||||
Mempool *bchain.MempoolEthereumType
|
||||
mempoolInitialized bool
|
||||
bestHeaderLock sync.Mutex
|
||||
bestHeader bchain.EVMHeader
|
||||
bestHeaderTime time.Time
|
||||
NewBlock bchain.EVMNewBlockSubscriber
|
||||
newBlockSubscription bchain.EVMClientSubscription
|
||||
NewTx bchain.EVMNewTxSubscriber
|
||||
newTxSubscription bchain.EVMClientSubscription
|
||||
ChainConfig *Configuration
|
||||
supportedStakingPools []string
|
||||
stakingPoolNames []string
|
||||
stakingPoolContracts []string
|
||||
alternativeFeeProvider alternativeFeeProviderInterface
|
||||
alternativeSendTxURLs []string
|
||||
alternativeSendTxOnly bool
|
||||
alternativeFetchMempoolTx bool
|
||||
alternativeMempoolTxs map[string]*bchain.RpcTransaction
|
||||
alternativeMempoolTxsMux sync.Mutex
|
||||
}
|
||||
|
||||
// ProcessInternalTransactions specifies if internal transactions are processed
|
||||
@@ -136,8 +139,13 @@ func NewEthereumRPC(config json.RawMessage, pushHandler func(bchain.Notification
|
||||
}
|
||||
s.alternativeSendTxURLs = strings.Split(os.Getenv(strings.ToUpper(network)+"_ALTERNATIVE_SENDTX_URLS"), ",")
|
||||
s.alternativeSendTxOnly = strings.ToUpper(os.Getenv(strings.ToUpper(network)+"_ALTERNATIVE_SENDTX_ONLY")) == "TRUE"
|
||||
s.alternativeFetchMempoolTx = strings.ToUpper(os.Getenv(strings.ToUpper(network)+"_ALTERNATIVE_FETCH_MEMPOOL_TX")) == "TRUE"
|
||||
if len(s.alternativeSendTxURLs) > 0 {
|
||||
glog.Infof("Using alternative send transaction providers %v. Use only alternative providers %v", s.alternativeSendTxURLs, s.alternativeSendTxOnly)
|
||||
glog.Infof("Using alternative send transaction providers %v. Only alternative providers %v", s.alternativeSendTxURLs, s.alternativeSendTxOnly)
|
||||
}
|
||||
if s.alternativeFetchMempoolTx {
|
||||
s.alternativeMempoolTxs = make(map[string]*bchain.RpcTransaction)
|
||||
glog.Infof("Alternative fetch mempool tx %v", s.alternativeFetchMempoolTx)
|
||||
}
|
||||
|
||||
return s, nil
|
||||
@@ -830,9 +838,7 @@ func (b *EthereumRPC) GetBlock(hash string, height uint32) (*bchain.Block, error
|
||||
return nil, errors.Annotatef(err, "hash %v, height %v, txid %v", hash, height, tx.Hash)
|
||||
}
|
||||
btxs[i] = *btx
|
||||
if b.mempoolInitialized {
|
||||
b.Mempool.RemoveTransactionFromMempool(tx.Hash)
|
||||
}
|
||||
b.removeTransactionFromMempool(tx.Hash)
|
||||
}
|
||||
bbk := bchain.Block{
|
||||
BlockHeader: *bbh,
|
||||
@@ -874,20 +880,41 @@ func (b *EthereumRPC) GetTransactionForMempool(txid string) (*bchain.Tx, error)
|
||||
return b.GetTransaction(txid)
|
||||
}
|
||||
|
||||
func (b *EthereumRPC) removeTransactionFromMempool(txid string) {
|
||||
// remove tx from mempool
|
||||
if b.mempoolInitialized {
|
||||
b.Mempool.RemoveTransactionFromMempool(txid)
|
||||
}
|
||||
// remove tx from mempool txs fetched by alternative method
|
||||
if b.alternativeFetchMempoolTx {
|
||||
b.alternativeMempoolTxsMux.Lock()
|
||||
delete(b.alternativeMempoolTxs, txid)
|
||||
b.alternativeMempoolTxsMux.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// GetTransaction returns a transaction by the transaction ID.
|
||||
func (b *EthereumRPC) GetTransaction(txid string) (*bchain.Tx, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), b.Timeout)
|
||||
defer cancel()
|
||||
tx := &bchain.RpcTransaction{}
|
||||
var tx *bchain.RpcTransaction
|
||||
var txFound bool
|
||||
var err error
|
||||
hash := ethcommon.HexToHash(txid)
|
||||
err := b.RPC.CallContext(ctx, tx, "eth_getTransactionByHash", hash)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
if b.alternativeFetchMempoolTx {
|
||||
b.alternativeMempoolTxsMux.Lock()
|
||||
tx, txFound = b.alternativeMempoolTxs[txid]
|
||||
b.alternativeMempoolTxsMux.Unlock()
|
||||
}
|
||||
if !txFound {
|
||||
tx = &bchain.RpcTransaction{}
|
||||
err = b.RPC.CallContext(ctx, tx, "eth_getTransactionByHash", hash)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if *tx == (bchain.RpcTransaction{}) {
|
||||
if b.mempoolInitialized {
|
||||
b.Mempool.RemoveTransactionFromMempool(txid)
|
||||
}
|
||||
b.removeTransactionFromMempool(txid)
|
||||
return nil, bchain.ErrTxNotFound
|
||||
}
|
||||
var btx *bchain.Tx
|
||||
@@ -932,10 +959,7 @@ func (b *EthereumRPC) GetTransaction(txid string) (*bchain.Tx, error) {
|
||||
if err != nil {
|
||||
return nil, errors.Annotatef(err, "txid %v", txid)
|
||||
}
|
||||
// remove tx from mempool if it is there
|
||||
if b.mempoolInitialized {
|
||||
b.Mempool.RemoveTransactionFromMempool(txid)
|
||||
}
|
||||
b.removeTransactionFromMempool(txid)
|
||||
}
|
||||
return btx, nil
|
||||
}
|
||||
@@ -1106,24 +1130,44 @@ func (b *EthereumRPC) EthereumTypeGetEip1559Fees() (*bchain.Eip1559Fees, error)
|
||||
|
||||
// SendRawTransaction sends raw transaction
|
||||
func (b *EthereumRPC) SendRawTransaction(hex string) (string, error) {
|
||||
var txid string
|
||||
var retErr error
|
||||
if len(b.alternativeSendTxURLs) > 0 {
|
||||
var retVal string
|
||||
var retErr error
|
||||
for i := range b.alternativeSendTxURLs {
|
||||
glog.Info("eth_sendRawTransaction to ", b.alternativeSendTxURLs[i])
|
||||
r, err := b.callHttpStringResult(b.alternativeSendTxURLs[i], "eth_sendRawTransaction", hex)
|
||||
// set success return value; or error only if there was no previous success
|
||||
if err == nil || len(retVal) == 0 {
|
||||
retVal = r
|
||||
if err == nil || len(txid) == 0 {
|
||||
txid = r
|
||||
retErr = err
|
||||
}
|
||||
}
|
||||
if b.alternativeSendTxOnly {
|
||||
return retVal, retErr
|
||||
if b.alternativeSendTxOnly && b.alternativeFetchMempoolTx {
|
||||
hash := ethcommon.HexToHash(txid)
|
||||
raw, err := b.callHttpRawResult(b.alternativeSendTxURLs[0], "eth_getTransactionByHash", hash)
|
||||
if err != nil || raw == nil {
|
||||
glog.Errorf("eth_getTransactionByHash from %s returned error %v", b.alternativeSendTxURLs[0], err)
|
||||
} else {
|
||||
var tx bchain.RpcTransaction
|
||||
if err := json.Unmarshal(raw, &tx); err != nil {
|
||||
glog.Errorf("eth_getTransactionByHash from %s unmarshal returned error %v", b.alternativeSendTxURLs[0], err)
|
||||
}
|
||||
b.alternativeMempoolTxsMux.Lock()
|
||||
b.alternativeMempoolTxs[txid] = &tx
|
||||
b.alternativeMempoolTxsMux.Unlock()
|
||||
b.Mempool.AddTransactionToMempool(txid)
|
||||
}
|
||||
return txid, retErr
|
||||
}
|
||||
}
|
||||
glog.Info("eth_sendRawTransaction default")
|
||||
return b.callRpcStringResult("eth_sendRawTransaction", hex)
|
||||
txid, retErr = b.callRpcStringResult("eth_sendRawTransaction", hex)
|
||||
if b.ChainConfig.DisableMempoolSync {
|
||||
// add transactions submitted by us to mempool if sync is disabled
|
||||
b.Mempool.AddTransactionToMempool(txid)
|
||||
}
|
||||
return txid, retErr
|
||||
|
||||
}
|
||||
|
||||
// EthereumTypeGetRawTransaction gets raw transaction in hex format
|
||||
@@ -1131,21 +1175,30 @@ func (b *EthereumRPC) EthereumTypeGetRawTransaction(txid string) (string, error)
|
||||
return b.callRpcStringResult("eth_getRawTransactionByHash", txid)
|
||||
}
|
||||
|
||||
// Helper function for calling ETH RPC over http with parameters and getting string result. Creates and closes a new client for every call.
|
||||
func (b *EthereumRPC) callHttpStringResult(url string, rpcMethod string, args ...interface{}) (string, error) {
|
||||
// Helper function for calling ETH RPC over http with parameters. Creates and closes a new client for every call.
|
||||
func (b *EthereumRPC) callHttpRawResult(url string, rpcMethod string, args ...interface{}) (json.RawMessage, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), b.Timeout)
|
||||
defer cancel()
|
||||
client, err := rpc.DialContext(ctx, url)
|
||||
if err != nil {
|
||||
return "", err
|
||||
return nil, err
|
||||
}
|
||||
defer client.Close()
|
||||
var raw json.RawMessage
|
||||
err = client.CallContext(ctx, &raw, rpcMethod, args...)
|
||||
if err != nil {
|
||||
return "", err
|
||||
return nil, err
|
||||
} else if len(raw) == 0 {
|
||||
return "", errors.New(url + " " + rpcMethod + " : failed")
|
||||
return nil, errors.New(url + " " + rpcMethod + " : failed")
|
||||
}
|
||||
return raw, nil
|
||||
}
|
||||
|
||||
// Helper function for calling ETH RPC over http with parameters and getting string result. Creates and closes a new client for every call.
|
||||
func (b *EthereumRPC) callHttpStringResult(url string, rpcMethod string, args ...interface{}) (string, error) {
|
||||
raw, err := b.callHttpRawResult(url, rpcMethod, args...)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
var result string
|
||||
if err := json.Unmarshal(raw, &result); err != nil {
|
||||
|
||||
Reference in New Issue
Block a user