publish new block transactions by address

This commit is contained in:
kaladinlight
2025-01-22 09:22:57 -07:00
parent 39daa172c3
commit 32232953cd
13 changed files with 173 additions and 62 deletions

View File

@@ -221,7 +221,7 @@ func (w *Worker) getTransaction(txid string, spendingTxs bool, specificJSON bool
}
return nil, NewAPIError(fmt.Sprintf("Transaction '%v' not found (%v)", txid, err), true)
}
return w.getTransactionFromBchainTx(bchainTx, height, spendingTxs, specificJSON, addresses)
return w.GetTransactionFromBchainTx(bchainTx, height, spendingTxs, specificJSON, addresses)
}
func (w *Worker) getParsedEthereumInputData(data string) *bchain.EthereumParsedInputData {
@@ -284,8 +284,8 @@ func (w *Worker) getConfirmationETA(tx *Tx) (int64, uint32) {
return etaSeconds, etaBlocks
}
// getTransactionFromBchainTx reads transaction data from txid
func (w *Worker) getTransactionFromBchainTx(bchainTx *bchain.Tx, height int, spendingTxs bool, specificJSON bool, addresses map[string]struct{}) (*Tx, error) {
// GetTransactionFromBchainTx reads transaction data from txid
func (w *Worker) GetTransactionFromBchainTx(bchainTx *bchain.Tx, height int, spendingTxs bool, specificJSON bool, addresses map[string]struct{}) (*Tx, error) {
var err error
var ta *db.TxAddresses
var tokens []TokenTransfer

View File

@@ -95,3 +95,7 @@ func (b *BaseChain) EthereumTypeRpcCall(data, to, from string) (string, error) {
func (b *BaseChain) EthereumTypeGetRawTransaction(txid string) (string, error) {
return "", errors.New("not supported")
}
func (b *BaseChain) EthereumTypeGetTransactionReceipt(txid string) (*RpcReceipt, error) {
return nil, errors.New("not supported")
}

View File

@@ -374,6 +374,11 @@ func (c *blockChainWithMetrics) EthereumTypeGetRawTransaction(txid string) (v st
return c.b.EthereumTypeGetRawTransaction(txid)
}
func (c *blockChainWithMetrics) EthereumTypeGetTransactionReceipt(txid string) (v *bchain.RpcReceipt, err error) {
defer func(s time.Time) { c.observeRPCLatency("EthereumTypeGetTransactionReceipt", s, err) }(time.Now())
return c.b.EthereumTypeGetTransactionReceipt(txid)
}
type mempoolWithMetrics struct {
mempool bchain.Mempool
m *common.Metrics

View File

@@ -944,8 +944,7 @@ func (b *EthereumRPC) GetTransaction(txid string) (*bchain.Tx, error) {
return nil, errors.Annotatef(err, "txid %v", txid)
}
tx.BaseFeePerGas = ht.BaseFeePerGas
var receipt bchain.RpcReceipt
err = b.RPC.CallContext(ctx, &receipt, "eth_getTransactionReceipt", hash)
receipt, err := b.EthereumTypeGetTransactionReceipt(txid)
if err != nil {
return nil, errors.Annotatef(err, "txid %v", txid)
}
@@ -957,7 +956,7 @@ func (b *EthereumRPC) GetTransaction(txid string) (*bchain.Tx, error) {
if err != nil {
return nil, errors.Annotatef(err, "txid %v", txid)
}
btx, err = b.Parser.ethTxToTx(tx, &receipt, nil, time, confirmations, true)
btx, err = b.Parser.ethTxToTx(tx, receipt, nil, time, confirmations, true)
if err != nil {
return nil, errors.Annotatef(err, "txid %v", txid)
}
@@ -1190,6 +1189,15 @@ func (b *EthereumRPC) callRpcStringResult(rpcMethod string, args ...interface{})
return result, nil
}
// EthereumTypeGetTransactionReceipt returns the transaction receipt by the transaction ID.
func (b *EthereumRPC) EthereumTypeGetTransactionReceipt(txid string) (*bchain.RpcReceipt, error) {
ctx, cancel := context.WithTimeout(context.Background(), b.Timeout)
defer cancel()
var r *bchain.RpcReceipt
err := b.RPC.CallContext(ctx, &r, "eth_getTransactionReceipt", ethcommon.HexToHash(txid))
return r, err
}
// EthereumTypeGetBalance returns current balance of an address
func (b *EthereumRPC) EthereumTypeGetBalance(addrDesc bchain.AddressDescriptor) (*big.Int, error) {
ctx, cancel := context.WithTimeout(context.Background(), b.Timeout)

View File

@@ -288,7 +288,7 @@ type MempoolTxidFilterEntries struct {
}
// OnNewBlockFunc is used to send notification about a new block
type OnNewBlockFunc func(hash string, height uint32)
type OnNewBlockFunc func(block *Block)
// OnNewTxAddrFunc is used to send notification about a new transaction/address
type OnNewTxAddrFunc func(tx *Tx, desc AddressDescriptor)
@@ -346,6 +346,7 @@ type BlockChain interface {
EthereumTypeGetStakingPoolsData(addrDesc AddressDescriptor) ([]StakingPoolData, error)
EthereumTypeRpcCall(data, to, from string) (string, error)
EthereumTypeGetRawTransaction(txid string) (string, error)
EthereumTypeGetTransactionReceipt(txid string) (*RpcReceipt, error)
GetTokenURI(contractDesc AddressDescriptor, tokenID *big.Int) (string, error)
}

View File

@@ -760,6 +760,7 @@ export interface WsSendTransactionReq {
export interface WsSubscribeAddressesReq {
/** List of addresses to subscribe for updates (e.g., new transactions). */
addresses: string[];
newBlockTxs?: boolean;
}
export interface WsSubscribeFiatRatesReq {
/** Fiat currency code (e.g. 'USD'). */

View File

@@ -520,11 +520,11 @@ func syncIndexLoop() {
glog.Info("syncIndexLoop starting")
// resync index about every 15 minutes if there are no chanSyncIndex requests, with debounce 1 second
common.TickAndDebounce(time.Duration(*resyncIndexPeriodMs)*time.Millisecond, debounceResyncIndexMs*time.Millisecond, chanSyncIndex, func() {
if err := syncWorker.ResyncIndex(onNewBlockHash, false); err != nil {
if err := syncWorker.ResyncIndex(onNewBlock, false); err != nil {
glog.Error("syncIndexLoop ", errors.ErrorStack(err), ", will retry...")
// retry once in case of random network error, after a slight delay
time.Sleep(time.Millisecond * 2500)
if err := syncWorker.ResyncIndex(onNewBlockHash, false); err != nil {
if err := syncWorker.ResyncIndex(onNewBlock, false); err != nil {
glog.Error("syncIndexLoop ", errors.ErrorStack(err))
}
}
@@ -532,14 +532,14 @@ func syncIndexLoop() {
glog.Info("syncIndexLoop stopped")
}
func onNewBlockHash(hash string, height uint32) {
func onNewBlock(block *bchain.Block) {
defer func() {
if r := recover(); r != nil {
glog.Error("onNewBlockHash recovered from panic: ", r)
}
}()
for _, c := range callbacksOnNewBlock {
c(hash, height)
c(block)
}
}

View File

@@ -243,7 +243,7 @@ func (w *SyncWorker) connectBlocks(onNewBlock bchain.OnNewBlockFunc, initialSync
return err
}
if onNewBlock != nil {
onNewBlock(res.block.Hash, res.block.Height)
onNewBlock(res.block)
}
w.metrics.BlockbookBestHeight.Set(float64(res.block.Height))
if res.block.Height > 0 && res.block.Height%1000 == 0 {
@@ -325,7 +325,7 @@ func (w *SyncWorker) ParallelConnectBlocks(onNewBlock bchain.OnNewBlockFunc, low
}
if onNewBlock != nil {
onNewBlock(b.Hash, b.Height)
onNewBlock(b)
}
w.metrics.BlockbookBestHeight.Set(float64(b.Height))

View File

@@ -1004,7 +1004,7 @@ The client can subscribe to the following events:
- `subscribeNewBlock` - new block added to blockchain
- `subscribeNewTransaction` - new transaction added to blockchain (all addresses)
- `subscribeAddresses` - new transaction for a given address (list of addresses) added to mempool
- `subscribeAddresses` - new transaction for a given address (list of addresses) added to mempool (and optionally confirmed in a new block)
- `subscribeFiatRates` - new currency rate ticker
There can be always only one subscription of given event per connection, i.e. new list of addresses replaces previous list of addresses.
@@ -1035,6 +1035,19 @@ Example for subscribing to an address (or multiple addresses)
}
```
Example for subscribing to an address (or multiple addresses) including new block (confirmed) transactions
```javascript
{
"id":"1",
"method":"subscribeAddresses",
"params":{
"addresses":["mnYYiDCb2JZXnqEeXta1nkt5oCVe2RVhJj", "tb1qp0we5epypgj4acd2c4au58045ruud2pd6heuee"]
"newBlockTxs" true,
}
}
```
## Legacy API V1
The legacy API is a compatible subset of API provided by **Bitcore Insight**. It is supported only for Bitcoin-type coins. The details of the REST/socket.io requests can be found in the Insight's documentation.

View File

@@ -233,9 +233,9 @@ func (s *PublicServer) Shutdown(ctx context.Context) error {
}
// OnNewBlock notifies users subscribed to bitcoind/hashblock about new block
func (s *PublicServer) OnNewBlock(hash string, height uint32) {
s.socketio.OnNewBlockHash(hash)
s.websocket.OnNewBlock(hash, height)
func (s *PublicServer) OnNewBlock(block *bchain.Block) {
s.socketio.OnNewBlockHash(block.Hash)
s.websocket.OnNewBlock(block)
}
// OnNewFiatRatesTicker notifies users subscribed to bitcoind/fiatrates about new ticker

View File

@@ -49,6 +49,11 @@ type websocketChannel struct {
getAddressInfoDescriptors map[string]struct{}
}
type addressDetails struct {
requestID string
publishNewBlockTxs bool
}
// WebsocketServer is a handle to websocket server
type WebsocketServer struct {
upgrader *websocket.Upgrader
@@ -66,8 +71,9 @@ type WebsocketServer struct {
newTransactionEnabled bool
newTransactionSubscriptions map[*websocketChannel]string
newTransactionSubscriptionsLock sync.Mutex
addressSubscriptions map[string]map[*websocketChannel]string
addressSubscriptions map[string]map[*websocketChannel]*addressDetails
addressSubscriptionsLock sync.Mutex
newBlockTxsSubscriptionCount int
fiatRatesSubscriptions map[string]map[*websocketChannel]string
fiatRatesTokenSubscriptions map[*websocketChannel][]string
fiatRatesSubscriptionsLock sync.Mutex
@@ -103,7 +109,7 @@ func NewWebsocketServer(db *db.RocksDB, chain bchain.BlockChain, mempool bchain.
newBlockSubscriptions: make(map[*websocketChannel]string),
newTransactionEnabled: is.EnableSubNewTx,
newTransactionSubscriptions: make(map[*websocketChannel]string),
addressSubscriptions: make(map[string]map[*websocketChannel]string),
addressSubscriptions: make(map[string]map[*websocketChannel]*addressDetails),
fiatRatesSubscriptions: make(map[string]map[*websocketChannel]string),
fiatRatesTokenSubscriptions: make(map[*websocketChannel][]string),
}
@@ -426,9 +432,9 @@ var requestHandlers = map[string]func(*WebsocketServer, *websocketChannel, *WsRe
return s.unsubscribeNewTransaction(c)
},
"subscribeAddresses": func(s *WebsocketServer, c *websocketChannel, req *WsReq) (rv interface{}, err error) {
ad, err := s.unmarshalAddresses(req.Params)
ad, nbtxs, err := s.unmarshalAddresses(req.Params)
if err == nil {
rv, err = s.subscribeAddresses(c, ad, req)
rv, err = s.subscribeAddresses(c, ad, nbtxs, req)
}
return
},
@@ -884,21 +890,21 @@ func (s *WebsocketServer) unsubscribeNewTransaction(c *websocketChannel) (res in
return &subscriptionResponse{false}, nil
}
func (s *WebsocketServer) unmarshalAddresses(params []byte) ([]string, error) {
func (s *WebsocketServer) unmarshalAddresses(params []byte) ([]string, bool, error) {
r := WsSubscribeAddressesReq{}
err := json.Unmarshal(params, &r)
if err != nil {
return nil, err
return nil, false, err
}
rv := make([]string, len(r.Addresses))
for i, a := range r.Addresses {
ad, err := s.chainParser.GetAddrDescFromAddress(a)
if err != nil {
return nil, err
return nil, false, err
}
rv[i] = string(ad)
}
return rv, nil
return rv, r.NewBlockTxs, nil
}
// doUnsubscribeAddresses addresses without addressSubscriptionsLock - can be called only from subscribeAddresses and unsubscribeAddresses
@@ -906,8 +912,11 @@ func (s *WebsocketServer) doUnsubscribeAddresses(c *websocketChannel) {
for _, ads := range c.addrDescs {
sa, e := s.addressSubscriptions[ads]
if e {
for sc := range sa {
for sc, details := range sa {
if sc == c {
if details.publishNewBlockTxs {
s.newBlockTxsSubscriptionCount--
}
delete(sa, c)
}
}
@@ -919,7 +928,7 @@ func (s *WebsocketServer) doUnsubscribeAddresses(c *websocketChannel) {
c.addrDescs = nil
}
func (s *WebsocketServer) subscribeAddresses(c *websocketChannel, addrDesc []string, req *WsReq) (res interface{}, err error) {
func (s *WebsocketServer) subscribeAddresses(c *websocketChannel, addrDesc []string, newBlockTxs bool, req *WsReq) (res interface{}, err error) {
s.addressSubscriptionsLock.Lock()
defer s.addressSubscriptionsLock.Unlock()
// unsubscribe all previous subscriptions
@@ -927,10 +936,16 @@ func (s *WebsocketServer) subscribeAddresses(c *websocketChannel, addrDesc []str
for _, ads := range addrDesc {
as, ok := s.addressSubscriptions[ads]
if !ok {
as = make(map[*websocketChannel]string)
as = make(map[*websocketChannel]*addressDetails)
s.addressSubscriptions[ads] = as
}
as[c] = req.ID
as[c] = &addressDetails{
requestID: req.ID,
publishNewBlockTxs: newBlockTxs,
}
if newBlockTxs {
s.newBlockTxsSubscriptionCount++
}
}
c.addrDescs = addrDesc
s.metrics.WebsocketSubscribes.With((common.Labels{"method": "subscribeAddresses"})).Set(float64(len(s.addressSubscriptions)))
@@ -1014,9 +1029,54 @@ func (s *WebsocketServer) onNewBlockAsync(hash string, height uint32) {
glog.Info("broadcasting new block ", height, " ", hash, " to ", len(s.newBlockSubscriptions), " channels")
}
func (s *WebsocketServer) publishNewBlockTxsByAddr(block *bchain.Block) {
for _, tx := range block.Txs {
var tokenTransfers bchain.TokenTransfers
var internalTransfers []bchain.EthereumInternalTransfer
if s.chainParser.GetChainType() == bchain.ChainEthereumType {
tokenTransfers, _ = s.chainParser.EthereumTypeGetTokenTransfersFromTx(&tx)
esd := tx.CoinSpecificData.(bchain.EthereumSpecificData)
if esd.InternalData != nil {
internalTransfers = esd.InternalData.Transfers
}
}
vins := make([]bchain.MempoolVin, len(tx.Vin))
for i, vin := range tx.Vin {
vins[i] = bchain.MempoolVin{Vin: vin}
}
subscribed := s.getNewTxSubscriptions(vins, tx.Vout, tokenTransfers, internalTransfers)
if len(subscribed) > 0 {
go func(tx bchain.Tx, subscribed map[string]struct{}) {
if csd, ok := tx.CoinSpecificData.(bchain.EthereumSpecificData); ok {
receipt, err := s.chain.EthereumTypeGetTransactionReceipt(tx.Txid)
if err != nil {
glog.Error("EthereumTypeGetTransactionReceipt error ", err, " for ", tx.Txid)
return
}
csd.Receipt = receipt
tx.CoinSpecificData = csd
}
atx, err := s.api.GetTransactionFromBchainTx(&tx, int(block.Height), false, false, nil)
if err != nil {
glog.Error("GetTransactionFromBchainTx error ", err, " for ", tx.Txid)
return
}
for stringAddressDescriptor := range subscribed {
s.sendOnNewTxAddr(stringAddressDescriptor, atx, true)
}
}(tx, subscribed)
}
}
}
// OnNewBlock is a callback that broadcasts info about new block to subscribed clients
func (s *WebsocketServer) OnNewBlock(hash string, height uint32) {
go s.onNewBlockAsync(hash, height)
func (s *WebsocketServer) OnNewBlock(block *bchain.Block) {
s.addressSubscriptionsLock.Lock()
defer s.addressSubscriptionsLock.Unlock()
go s.onNewBlockAsync(block.Hash, block.Height)
if s.newBlockTxsSubscriptionCount > 0 {
go s.publishNewBlockTxsByAddr(block)
}
}
func (s *WebsocketServer) sendOnNewTx(tx *api.Tx) {
@@ -1031,7 +1091,7 @@ func (s *WebsocketServer) sendOnNewTx(tx *api.Tx) {
glog.Info("broadcasting new tx ", tx.Txid, " to ", len(s.newTransactionSubscriptions), " channels")
}
func (s *WebsocketServer) sendOnNewTxAddr(stringAddressDescriptor string, tx *api.Tx) {
func (s *WebsocketServer) sendOnNewTxAddr(stringAddressDescriptor string, tx *api.Tx, newBlockTx bool) {
addrDesc := bchain.AddressDescriptor(stringAddressDescriptor)
addr, _, err := s.chainParser.GetAddressesFromAddrDesc(addrDesc)
if err != nil {
@@ -1050,9 +1110,12 @@ func (s *WebsocketServer) sendOnNewTxAddr(stringAddressDescriptor string, tx *ap
defer s.addressSubscriptionsLock.Unlock()
as, ok := s.addressSubscriptions[stringAddressDescriptor]
if ok {
for c, id := range as {
for c, details := range as {
if newBlockTx && !details.publishNewBlockTxs {
continue
}
c.DataOut(&WsRes{
ID: id,
ID: details.requestID,
Data: &data,
})
}
@@ -1061,48 +1124,51 @@ func (s *WebsocketServer) sendOnNewTxAddr(stringAddressDescriptor string, tx *ap
}
}
func (s *WebsocketServer) getNewTxSubscriptions(tx *bchain.MempoolTx) map[string]struct{} {
// check if there is any subscription in inputs, outputs and token transfers
func (s *WebsocketServer) getNewTxSubscriptions(vins []bchain.MempoolVin, vouts []bchain.Vout, tokenTransfers bchain.TokenTransfers, internalTransfers []bchain.EthereumInternalTransfer) map[string]struct{} {
// check if there is any subscription in inputs, outputs and transfers
s.addressSubscriptionsLock.Lock()
defer s.addressSubscriptionsLock.Unlock()
subscribed := make(map[string]struct{})
for i := range tx.Vin {
sad := string(tx.Vin[i].AddrDesc)
if len(sad) > 0 {
as, ok := s.addressSubscriptions[sad]
if ok && len(as) > 0 {
processAddress := func(address string) {
if addrDesc, err := s.chainParser.GetAddrDescFromAddress(address); err == nil && len(addrDesc) > 0 {
sad := string(addrDesc)
if as, ok := s.addressSubscriptions[sad]; ok && len(as) > 0 {
subscribed[sad] = struct{}{}
}
}
}
for i := range tx.Vout {
addrDesc, err := s.chainParser.GetAddrDescFromVout(&tx.Vout[i])
if err == nil && len(addrDesc) > 0 {
processVout := func(vout bchain.Vout) {
if addrDesc, err := s.chainParser.GetAddrDescFromVout(&vout); err == nil && len(addrDesc) > 0 {
sad := string(addrDesc)
as, ok := s.addressSubscriptions[sad]
if ok && len(as) > 0 {
if as, ok := s.addressSubscriptions[sad]; ok && len(as) > 0 {
subscribed[sad] = struct{}{}
}
}
}
for i := range tx.TokenTransfers {
addrDesc, err := s.chainParser.GetAddrDescFromAddress(tx.TokenTransfers[i].From)
if err == nil && len(addrDesc) > 0 {
sad := string(addrDesc)
as, ok := s.addressSubscriptions[sad]
if ok && len(as) > 0 {
for i := range vins {
if sad := string(vins[i].AddrDesc); len(sad) > 0 {
if as, ok := s.addressSubscriptions[sad]; ok && len(as) > 0 {
subscribed[sad] = struct{}{}
}
}
addrDesc, err = s.chainParser.GetAddrDescFromAddress(tx.TokenTransfers[i].To)
if err == nil && len(addrDesc) > 0 {
sad := string(addrDesc)
as, ok := s.addressSubscriptions[sad]
if ok && len(as) > 0 {
subscribed[sad] = struct{}{}
} else if s.chainParser.GetChainType() == bchain.ChainBitcoinType {
processVout(vouts[vins[i].Vout])
} else if s.chainParser.GetChainType() == bchain.ChainEthereumType {
if len(vins[i].Addresses) > 0 {
processAddress(vins[i].Addresses[0])
}
}
}
for i := range vouts {
processVout(vouts[i])
}
for i := range tokenTransfers {
processAddress(tokenTransfers[i].From)
processAddress(tokenTransfers[i].To)
}
for i := range internalTransfers {
processAddress(internalTransfers[i].From)
processAddress(internalTransfers[i].To)
}
return subscribed
}
@@ -1114,13 +1180,13 @@ func (s *WebsocketServer) onNewTxAsync(tx *bchain.MempoolTx, subscribed map[stri
}
s.sendOnNewTx(atx)
for stringAddressDescriptor := range subscribed {
s.sendOnNewTxAddr(stringAddressDescriptor, atx)
s.sendOnNewTxAddr(stringAddressDescriptor, atx, false)
}
}
// OnNewTx is a callback that broadcasts info about a tx affecting subscribed address
func (s *WebsocketServer) OnNewTx(tx *bchain.MempoolTx) {
subscribed := s.getNewTxSubscriptions(tx)
subscribed := s.getNewTxSubscriptions(tx.Vin, tx.Vout, tx.TokenTransfers, nil)
if len(s.newTransactionSubscriptions) > 0 || len(subscribed) > 0 {
go s.onNewTxAsync(tx, subscribed)
}

View File

@@ -147,7 +147,8 @@ type WsSendTransactionReq struct {
// WsSubscribeAddressesReq is used to subscribe to updates on a list of addresses.
type WsSubscribeAddressesReq struct {
Addresses []string `json:"addresses" ts_doc:"List of addresses to subscribe for updates (e.g., new transactions)."`
Addresses []string `json:"addresses" ts_doc:"List of addresses to subscribe for updates (e.g., new transactions)."`
NewBlockTxs bool `json:"newBlockTxs,omitempty"`
}
// WsSubscribeFiatRatesReq subscribes to updates of fiat rates for a specific currency or set of tokens.

View File

@@ -385,8 +385,10 @@
function subscribeAddresses() {
const method = 'subscribeAddresses';
var addresses = paramAsArray('subscribeAddressesName');
var newBlockTxs = document.getElementById('newBlockTxs').checked
const params = {
addresses,
newBlockTxs,
};
if (subscribeAddressesId) {
delete subscriptions[subscribeAddressesId];
@@ -1264,6 +1266,16 @@
id="subscribeAddressesName"
value="0xba98d6a5ac827632e3457de7512d211e4ff7e8bd,0x73d0385f4d8e00c5e6504c6030f47bf6212736a8"
/>
<div class="form-check">
<input
type="checkbox"
class="form-check-input"
id="newBlockTxs"
/>
<label class="form-check-label" for="newBlockTxs">
New Block Txs
</label>
</div>
</div>
<div class="col">
<span id="subscribeAddressesIds"></span>