diff --git a/bchain/coins/eth/ethrpc.go b/bchain/coins/eth/ethrpc.go index 5c6f50ec..4b7dbbcb 100644 --- a/bchain/coins/eth/ethrpc.go +++ b/bchain/coins/eth/ethrpc.go @@ -60,29 +60,32 @@ type Configuration struct { // EthereumRPC is an interface to JSON-RPC eth service. type EthereumRPC struct { *bchain.BaseChain - Client bchain.EVMClient - RPC bchain.EVMRPCClient - MainNetChainID Network - Timeout time.Duration - Parser *EthereumParser - PushHandler func(bchain.NotificationType) - OpenRPC func(string) (bchain.EVMRPCClient, bchain.EVMClient, error) - Mempool *bchain.MempoolEthereumType - mempoolInitialized bool - bestHeaderLock sync.Mutex - bestHeader bchain.EVMHeader - bestHeaderTime time.Time - NewBlock bchain.EVMNewBlockSubscriber - newBlockSubscription bchain.EVMClientSubscription - NewTx bchain.EVMNewTxSubscriber - newTxSubscription bchain.EVMClientSubscription - ChainConfig *Configuration - supportedStakingPools []string - stakingPoolNames []string - stakingPoolContracts []string - alternativeFeeProvider alternativeFeeProviderInterface - alternativeSendTxURLs []string - alternativeSendTxOnly bool + Client bchain.EVMClient + RPC bchain.EVMRPCClient + MainNetChainID Network + Timeout time.Duration + Parser *EthereumParser + PushHandler func(bchain.NotificationType) + OpenRPC func(string) (bchain.EVMRPCClient, bchain.EVMClient, error) + Mempool *bchain.MempoolEthereumType + mempoolInitialized bool + bestHeaderLock sync.Mutex + bestHeader bchain.EVMHeader + bestHeaderTime time.Time + NewBlock bchain.EVMNewBlockSubscriber + newBlockSubscription bchain.EVMClientSubscription + NewTx bchain.EVMNewTxSubscriber + newTxSubscription bchain.EVMClientSubscription + ChainConfig *Configuration + supportedStakingPools []string + stakingPoolNames []string + stakingPoolContracts []string + alternativeFeeProvider alternativeFeeProviderInterface + alternativeSendTxURLs []string + alternativeSendTxOnly bool + alternativeFetchMempoolTx bool + alternativeMempoolTxs map[string]*bchain.RpcTransaction + alternativeMempoolTxsMux sync.Mutex } // ProcessInternalTransactions specifies if internal transactions are processed @@ -136,8 +139,13 @@ func NewEthereumRPC(config json.RawMessage, pushHandler func(bchain.Notification } s.alternativeSendTxURLs = strings.Split(os.Getenv(strings.ToUpper(network)+"_ALTERNATIVE_SENDTX_URLS"), ",") s.alternativeSendTxOnly = strings.ToUpper(os.Getenv(strings.ToUpper(network)+"_ALTERNATIVE_SENDTX_ONLY")) == "TRUE" + s.alternativeFetchMempoolTx = strings.ToUpper(os.Getenv(strings.ToUpper(network)+"_ALTERNATIVE_FETCH_MEMPOOL_TX")) == "TRUE" if len(s.alternativeSendTxURLs) > 0 { - glog.Infof("Using alternative send transaction providers %v. Use only alternative providers %v", s.alternativeSendTxURLs, s.alternativeSendTxOnly) + glog.Infof("Using alternative send transaction providers %v. Only alternative providers %v", s.alternativeSendTxURLs, s.alternativeSendTxOnly) + } + if s.alternativeFetchMempoolTx { + s.alternativeMempoolTxs = make(map[string]*bchain.RpcTransaction) + glog.Infof("Alternative fetch mempool tx %v", s.alternativeFetchMempoolTx) } return s, nil @@ -830,9 +838,7 @@ func (b *EthereumRPC) GetBlock(hash string, height uint32) (*bchain.Block, error return nil, errors.Annotatef(err, "hash %v, height %v, txid %v", hash, height, tx.Hash) } btxs[i] = *btx - if b.mempoolInitialized { - b.Mempool.RemoveTransactionFromMempool(tx.Hash) - } + b.removeTransactionFromMempool(tx.Hash) } bbk := bchain.Block{ BlockHeader: *bbh, @@ -874,20 +880,41 @@ func (b *EthereumRPC) GetTransactionForMempool(txid string) (*bchain.Tx, error) return b.GetTransaction(txid) } +func (b *EthereumRPC) removeTransactionFromMempool(txid string) { + // remove tx from mempool + if b.mempoolInitialized { + b.Mempool.RemoveTransactionFromMempool(txid) + } + // remove tx from mempool txs fetched by alternative method + if b.alternativeFetchMempoolTx { + b.alternativeMempoolTxsMux.Lock() + delete(b.alternativeMempoolTxs, txid) + b.alternativeMempoolTxsMux.Unlock() + } +} + // GetTransaction returns a transaction by the transaction ID. func (b *EthereumRPC) GetTransaction(txid string) (*bchain.Tx, error) { ctx, cancel := context.WithTimeout(context.Background(), b.Timeout) defer cancel() - tx := &bchain.RpcTransaction{} + var tx *bchain.RpcTransaction + var txFound bool + var err error hash := ethcommon.HexToHash(txid) - err := b.RPC.CallContext(ctx, tx, "eth_getTransactionByHash", hash) - if err != nil { - return nil, err + if b.alternativeFetchMempoolTx { + b.alternativeMempoolTxsMux.Lock() + tx, txFound = b.alternativeMempoolTxs[txid] + b.alternativeMempoolTxsMux.Unlock() + } + if !txFound { + tx = &bchain.RpcTransaction{} + err = b.RPC.CallContext(ctx, tx, "eth_getTransactionByHash", hash) + if err != nil { + return nil, err + } } if *tx == (bchain.RpcTransaction{}) { - if b.mempoolInitialized { - b.Mempool.RemoveTransactionFromMempool(txid) - } + b.removeTransactionFromMempool(txid) return nil, bchain.ErrTxNotFound } var btx *bchain.Tx @@ -932,10 +959,7 @@ func (b *EthereumRPC) GetTransaction(txid string) (*bchain.Tx, error) { if err != nil { return nil, errors.Annotatef(err, "txid %v", txid) } - // remove tx from mempool if it is there - if b.mempoolInitialized { - b.Mempool.RemoveTransactionFromMempool(txid) - } + b.removeTransactionFromMempool(txid) } return btx, nil } @@ -1106,24 +1130,44 @@ func (b *EthereumRPC) EthereumTypeGetEip1559Fees() (*bchain.Eip1559Fees, error) // SendRawTransaction sends raw transaction func (b *EthereumRPC) SendRawTransaction(hex string) (string, error) { + var txid string + var retErr error if len(b.alternativeSendTxURLs) > 0 { - var retVal string - var retErr error for i := range b.alternativeSendTxURLs { glog.Info("eth_sendRawTransaction to ", b.alternativeSendTxURLs[i]) r, err := b.callHttpStringResult(b.alternativeSendTxURLs[i], "eth_sendRawTransaction", hex) // set success return value; or error only if there was no previous success - if err == nil || len(retVal) == 0 { - retVal = r + if err == nil || len(txid) == 0 { + txid = r retErr = err } } - if b.alternativeSendTxOnly { - return retVal, retErr + if b.alternativeSendTxOnly && b.alternativeFetchMempoolTx { + hash := ethcommon.HexToHash(txid) + raw, err := b.callHttpRawResult(b.alternativeSendTxURLs[0], "eth_getTransactionByHash", hash) + if err != nil || raw == nil { + glog.Errorf("eth_getTransactionByHash from %s returned error %v", b.alternativeSendTxURLs[0], err) + } else { + var tx bchain.RpcTransaction + if err := json.Unmarshal(raw, &tx); err != nil { + glog.Errorf("eth_getTransactionByHash from %s unmarshal returned error %v", b.alternativeSendTxURLs[0], err) + } + b.alternativeMempoolTxsMux.Lock() + b.alternativeMempoolTxs[txid] = &tx + b.alternativeMempoolTxsMux.Unlock() + b.Mempool.AddTransactionToMempool(txid) + } + return txid, retErr } } glog.Info("eth_sendRawTransaction default") - return b.callRpcStringResult("eth_sendRawTransaction", hex) + txid, retErr = b.callRpcStringResult("eth_sendRawTransaction", hex) + if b.ChainConfig.DisableMempoolSync { + // add transactions submitted by us to mempool if sync is disabled + b.Mempool.AddTransactionToMempool(txid) + } + return txid, retErr + } // EthereumTypeGetRawTransaction gets raw transaction in hex format @@ -1131,21 +1175,30 @@ func (b *EthereumRPC) EthereumTypeGetRawTransaction(txid string) (string, error) return b.callRpcStringResult("eth_getRawTransactionByHash", txid) } -// Helper function for calling ETH RPC over http with parameters and getting string result. Creates and closes a new client for every call. -func (b *EthereumRPC) callHttpStringResult(url string, rpcMethod string, args ...interface{}) (string, error) { +// Helper function for calling ETH RPC over http with parameters. Creates and closes a new client for every call. +func (b *EthereumRPC) callHttpRawResult(url string, rpcMethod string, args ...interface{}) (json.RawMessage, error) { ctx, cancel := context.WithTimeout(context.Background(), b.Timeout) defer cancel() client, err := rpc.DialContext(ctx, url) if err != nil { - return "", err + return nil, err } defer client.Close() var raw json.RawMessage err = client.CallContext(ctx, &raw, rpcMethod, args...) if err != nil { - return "", err + return nil, err } else if len(raw) == 0 { - return "", errors.New(url + " " + rpcMethod + " : failed") + return nil, errors.New(url + " " + rpcMethod + " : failed") + } + return raw, nil +} + +// Helper function for calling ETH RPC over http with parameters and getting string result. Creates and closes a new client for every call. +func (b *EthereumRPC) callHttpStringResult(url string, rpcMethod string, args ...interface{}) (string, error) { + raw, err := b.callHttpRawResult(url, rpcMethod, args...) + if err != nil { + return "", err } var result string if err := json.Unmarshal(raw, &result); err != nil {