diff --git a/server/websocket.go b/server/websocket.go index d347baaa..cbad4126 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -50,7 +50,9 @@ type websocketChannel struct { } type addressDetails struct { - requestID string + requestID string + // publishNewBlockTxs enables notifications for confirmed transactions + // detected while processing newly connected blocks. publishNewBlockTxs bool } @@ -73,11 +75,13 @@ type WebsocketServer struct { newTransactionSubscriptionsLock sync.Mutex addressSubscriptions map[string]map[*websocketChannel]*addressDetails addressSubscriptionsLock sync.Mutex - newBlockTxsSubscriptionCount int - fiatRatesSubscriptions map[string]map[*websocketChannel]string - fiatRatesTokenSubscriptions map[*websocketChannel][]string - fiatRatesSubscriptionsLock sync.Mutex - allowedRpcCallTo map[string]struct{} + // newBlockTxsSubscriptionCount is a fast-path guard for OnNewBlock. + // It tracks how many address subscriptions requested newBlockTxs=true. + newBlockTxsSubscriptionCount int + fiatRatesSubscriptions map[string]map[*websocketChannel]string + fiatRatesTokenSubscriptions map[*websocketChannel][]string + fiatRatesSubscriptionsLock sync.Mutex + allowedRpcCallTo map[string]struct{} } // NewWebsocketServer creates new websocket interface to blockbook and returns its handle @@ -907,7 +911,8 @@ func (s *WebsocketServer) unmarshalAddresses(params []byte) ([]string, bool, err return rv, r.NewBlockTxs, nil } -// doUnsubscribeAddresses addresses without addressSubscriptionsLock - can be called only from subscribeAddresses and unsubscribeAddresses +// doUnsubscribeAddresses removes all address subscriptions for a channel. +// addressSubscriptionsLock must be held by the caller. func (s *WebsocketServer) doUnsubscribeAddresses(c *websocketChannel) { for _, ads := range c.addrDescs { sa, e := s.addressSubscriptions[ads] @@ -928,6 +933,9 @@ func (s *WebsocketServer) doUnsubscribeAddresses(c *websocketChannel) { c.addrDescs = nil } +// subscribeAddresses replaces previous address subscriptions for the channel. +// If newBlockTxs is enabled, the channel receives both mempool notifications and +// confirmed notifications detected from newly connected blocks. func (s *WebsocketServer) subscribeAddresses(c *websocketChannel, addrDesc []string, newBlockTxs bool, req *WsReq) (res interface{}, err error) { s.addressSubscriptionsLock.Lock() defer s.addressSubscriptionsLock.Unlock() @@ -1029,6 +1037,9 @@ func (s *WebsocketServer) onNewBlockAsync(hash string, height uint32) { glog.Info("broadcasting new block ", height, " ", hash, " to ", len(s.newBlockSubscriptions), " channels") } +// setConfirmedBlockTxMetadata normalizes parsed block transactions. +// ParseBlock can return txs with zero confirmations; we force first-confirmed +// metadata so conversion does not take mempool-only branches. func setConfirmedBlockTxMetadata(tx *bchain.Tx, blockTime int64) { if tx.Confirmations == 0 { tx.Confirmations = 1 @@ -1037,6 +1048,8 @@ func setConfirmedBlockTxMetadata(tx *bchain.Tx, blockTime int64) { } } +// getEthereumInternalTransfers safely extracts internal transfers from +// CoinSpecificData when present. func getEthereumInternalTransfers(tx *bchain.Tx) []bchain.EthereumInternalTransfer { esd, ok := tx.CoinSpecificData.(bchain.EthereumSpecificData) if !ok || esd.InternalData == nil { @@ -1045,6 +1058,8 @@ func getEthereumInternalTransfers(tx *bchain.Tx) []bchain.EthereumInternalTransf return esd.InternalData.Transfers } +// setEthereumReceiptIfAvailable adds receipt data to Ethereum txs on a +// best-effort basis; failures are logged and notifications continue. func setEthereumReceiptIfAvailable(tx *bchain.Tx, getReceipt func(string) (*bchain.RpcReceipt, error)) { csd, ok := tx.CoinSpecificData.(bchain.EthereumSpecificData) if !ok { @@ -1059,6 +1074,9 @@ func setEthereumReceiptIfAvailable(tx *bchain.Tx, getReceipt func(string) (*bcha tx.CoinSpecificData = csd } +// populateBitcoinVinAddrDescs fills missing vin address descriptors by loading +// previous outputs. This enables sender-side address subscription matching for +// Bitcoin transactions parsed from connected blocks. func populateBitcoinVinAddrDescs(vins []bchain.MempoolVin, getAddrDesc func(string, uint32) (bchain.AddressDescriptor, error)) { if getAddrDesc == nil { return @@ -1074,6 +1092,8 @@ func populateBitcoinVinAddrDescs(vins []bchain.MempoolVin, getAddrDesc func(stri } } +// getBitcoinVinAddrDesc resolves an input outpoint to an address descriptor +// using txCache. It is best-effort and can return chain-level not-found errors. func (s *WebsocketServer) getBitcoinVinAddrDesc(txid string, vout uint32) (bchain.AddressDescriptor, error) { if s.txCache == nil { return nil, bchain.ErrTxNotFound @@ -1088,6 +1108,8 @@ func (s *WebsocketServer) getBitcoinVinAddrDesc(txid string, vout uint32) (bchai return s.chainParser.GetAddrDescFromVout(&prevTx.Vout[vout]) } +// publishNewBlockTxsByAddr emits confirmed transaction notifications only for +// subscribed addresses touched by transactions in the connected block. func (s *WebsocketServer) publishNewBlockTxsByAddr(block *bchain.Block) { for _, tx := range block.Txs { setConfirmedBlockTxMetadata(&tx, block.Time) @@ -1106,6 +1128,8 @@ func (s *WebsocketServer) publishNewBlockTxsByAddr(block *bchain.Block) { } subscribed := s.getNewTxSubscriptions(vins, tx.Vout, tokenTransfers, internalTransfers) if len(subscribed) > 0 { + // Convert and publish asynchronously so heavy tx conversion does not + // block processing of other transactions in the same block. go func(tx bchain.Tx, subscribed map[string]struct{}) { setEthereumReceiptIfAvailable(&tx, s.chain.EthereumTypeGetTransactionReceipt) atx, err := s.api.GetTransactionFromBchainTx(&tx, int(block.Height), false, false, nil) @@ -1127,6 +1151,7 @@ func (s *WebsocketServer) OnNewBlock(block *bchain.Block) { defer s.addressSubscriptionsLock.Unlock() go s.onNewBlockAsync(block.Hash, block.Height) if s.newBlockTxsSubscriptionCount > 0 { + // Skip per-tx address matching when nobody opted into newBlockTxs. go s.publishNewBlockTxsByAddr(block) } } @@ -1163,6 +1188,8 @@ func (s *WebsocketServer) sendOnNewTxAddr(stringAddressDescriptor string, tx *ap as, ok := s.addressSubscriptions[stringAddressDescriptor] if ok { for c, details := range as { + // Mempool notifications go to all address subscribers; confirmed + // block notifications only go to subscribers that requested them. if newBlockTx && !details.publishNewBlockTxs { continue } diff --git a/server/ws_types.go b/server/ws_types.go index 39ec11ff..96d0de62 100644 --- a/server/ws_types.go +++ b/server/ws_types.go @@ -148,7 +148,7 @@ type WsSendTransactionReq struct { // WsSubscribeAddressesReq is used to subscribe to updates on a list of addresses. type WsSubscribeAddressesReq struct { Addresses []string `json:"addresses" ts_doc:"List of addresses to subscribe for updates (e.g., new transactions)."` - NewBlockTxs bool `json:"newBlockTxs,omitempty"` + NewBlockTxs bool `json:"newBlockTxs,omitempty" ts_doc:"If true, also publish confirmed transactions for subscribed addresses when new blocks are connected."` } // WsSubscribeFiatRatesReq subscribes to updates of fiat rates for a specific currency or set of tokens.