inline documentation in websocket.go

This commit is contained in:
pragmaxim
2026-02-17 10:35:00 +01:00
parent f9fc15eddc
commit efbf7f559f
2 changed files with 35 additions and 8 deletions

View File

@@ -50,7 +50,9 @@ type websocketChannel struct {
}
type addressDetails struct {
requestID string
requestID string
// publishNewBlockTxs enables notifications for confirmed transactions
// detected while processing newly connected blocks.
publishNewBlockTxs bool
}
@@ -73,11 +75,13 @@ type WebsocketServer struct {
newTransactionSubscriptionsLock sync.Mutex
addressSubscriptions map[string]map[*websocketChannel]*addressDetails
addressSubscriptionsLock sync.Mutex
newBlockTxsSubscriptionCount int
fiatRatesSubscriptions map[string]map[*websocketChannel]string
fiatRatesTokenSubscriptions map[*websocketChannel][]string
fiatRatesSubscriptionsLock sync.Mutex
allowedRpcCallTo map[string]struct{}
// newBlockTxsSubscriptionCount is a fast-path guard for OnNewBlock.
// It tracks how many address subscriptions requested newBlockTxs=true.
newBlockTxsSubscriptionCount int
fiatRatesSubscriptions map[string]map[*websocketChannel]string
fiatRatesTokenSubscriptions map[*websocketChannel][]string
fiatRatesSubscriptionsLock sync.Mutex
allowedRpcCallTo map[string]struct{}
}
// NewWebsocketServer creates new websocket interface to blockbook and returns its handle
@@ -907,7 +911,8 @@ func (s *WebsocketServer) unmarshalAddresses(params []byte) ([]string, bool, err
return rv, r.NewBlockTxs, nil
}
// doUnsubscribeAddresses addresses without addressSubscriptionsLock - can be called only from subscribeAddresses and unsubscribeAddresses
// doUnsubscribeAddresses removes all address subscriptions for a channel.
// addressSubscriptionsLock must be held by the caller.
func (s *WebsocketServer) doUnsubscribeAddresses(c *websocketChannel) {
for _, ads := range c.addrDescs {
sa, e := s.addressSubscriptions[ads]
@@ -928,6 +933,9 @@ func (s *WebsocketServer) doUnsubscribeAddresses(c *websocketChannel) {
c.addrDescs = nil
}
// subscribeAddresses replaces previous address subscriptions for the channel.
// If newBlockTxs is enabled, the channel receives both mempool notifications and
// confirmed notifications detected from newly connected blocks.
func (s *WebsocketServer) subscribeAddresses(c *websocketChannel, addrDesc []string, newBlockTxs bool, req *WsReq) (res interface{}, err error) {
s.addressSubscriptionsLock.Lock()
defer s.addressSubscriptionsLock.Unlock()
@@ -1029,6 +1037,9 @@ func (s *WebsocketServer) onNewBlockAsync(hash string, height uint32) {
glog.Info("broadcasting new block ", height, " ", hash, " to ", len(s.newBlockSubscriptions), " channels")
}
// setConfirmedBlockTxMetadata normalizes parsed block transactions.
// ParseBlock can return txs with zero confirmations; we force first-confirmed
// metadata so conversion does not take mempool-only branches.
func setConfirmedBlockTxMetadata(tx *bchain.Tx, blockTime int64) {
if tx.Confirmations == 0 {
tx.Confirmations = 1
@@ -1037,6 +1048,8 @@ func setConfirmedBlockTxMetadata(tx *bchain.Tx, blockTime int64) {
}
}
// getEthereumInternalTransfers safely extracts internal transfers from
// CoinSpecificData when present.
func getEthereumInternalTransfers(tx *bchain.Tx) []bchain.EthereumInternalTransfer {
esd, ok := tx.CoinSpecificData.(bchain.EthereumSpecificData)
if !ok || esd.InternalData == nil {
@@ -1045,6 +1058,8 @@ func getEthereumInternalTransfers(tx *bchain.Tx) []bchain.EthereumInternalTransf
return esd.InternalData.Transfers
}
// setEthereumReceiptIfAvailable adds receipt data to Ethereum txs on a
// best-effort basis; failures are logged and notifications continue.
func setEthereumReceiptIfAvailable(tx *bchain.Tx, getReceipt func(string) (*bchain.RpcReceipt, error)) {
csd, ok := tx.CoinSpecificData.(bchain.EthereumSpecificData)
if !ok {
@@ -1059,6 +1074,9 @@ func setEthereumReceiptIfAvailable(tx *bchain.Tx, getReceipt func(string) (*bcha
tx.CoinSpecificData = csd
}
// populateBitcoinVinAddrDescs fills missing vin address descriptors by loading
// previous outputs. This enables sender-side address subscription matching for
// Bitcoin transactions parsed from connected blocks.
func populateBitcoinVinAddrDescs(vins []bchain.MempoolVin, getAddrDesc func(string, uint32) (bchain.AddressDescriptor, error)) {
if getAddrDesc == nil {
return
@@ -1074,6 +1092,8 @@ func populateBitcoinVinAddrDescs(vins []bchain.MempoolVin, getAddrDesc func(stri
}
}
// getBitcoinVinAddrDesc resolves an input outpoint to an address descriptor
// using txCache. It is best-effort and can return chain-level not-found errors.
func (s *WebsocketServer) getBitcoinVinAddrDesc(txid string, vout uint32) (bchain.AddressDescriptor, error) {
if s.txCache == nil {
return nil, bchain.ErrTxNotFound
@@ -1088,6 +1108,8 @@ func (s *WebsocketServer) getBitcoinVinAddrDesc(txid string, vout uint32) (bchai
return s.chainParser.GetAddrDescFromVout(&prevTx.Vout[vout])
}
// publishNewBlockTxsByAddr emits confirmed transaction notifications only for
// subscribed addresses touched by transactions in the connected block.
func (s *WebsocketServer) publishNewBlockTxsByAddr(block *bchain.Block) {
for _, tx := range block.Txs {
setConfirmedBlockTxMetadata(&tx, block.Time)
@@ -1106,6 +1128,8 @@ func (s *WebsocketServer) publishNewBlockTxsByAddr(block *bchain.Block) {
}
subscribed := s.getNewTxSubscriptions(vins, tx.Vout, tokenTransfers, internalTransfers)
if len(subscribed) > 0 {
// Convert and publish asynchronously so heavy tx conversion does not
// block processing of other transactions in the same block.
go func(tx bchain.Tx, subscribed map[string]struct{}) {
setEthereumReceiptIfAvailable(&tx, s.chain.EthereumTypeGetTransactionReceipt)
atx, err := s.api.GetTransactionFromBchainTx(&tx, int(block.Height), false, false, nil)
@@ -1127,6 +1151,7 @@ func (s *WebsocketServer) OnNewBlock(block *bchain.Block) {
defer s.addressSubscriptionsLock.Unlock()
go s.onNewBlockAsync(block.Hash, block.Height)
if s.newBlockTxsSubscriptionCount > 0 {
// Skip per-tx address matching when nobody opted into newBlockTxs.
go s.publishNewBlockTxsByAddr(block)
}
}
@@ -1163,6 +1188,8 @@ func (s *WebsocketServer) sendOnNewTxAddr(stringAddressDescriptor string, tx *ap
as, ok := s.addressSubscriptions[stringAddressDescriptor]
if ok {
for c, details := range as {
// Mempool notifications go to all address subscribers; confirmed
// block notifications only go to subscribers that requested them.
if newBlockTx && !details.publishNewBlockTxs {
continue
}

View File

@@ -148,7 +148,7 @@ type WsSendTransactionReq struct {
// WsSubscribeAddressesReq is used to subscribe to updates on a list of addresses.
type WsSubscribeAddressesReq struct {
Addresses []string `json:"addresses" ts_doc:"List of addresses to subscribe for updates (e.g., new transactions)."`
NewBlockTxs bool `json:"newBlockTxs,omitempty"`
NewBlockTxs bool `json:"newBlockTxs,omitempty" ts_doc:"If true, also publish confirmed transactions for subscribed addresses when new blocks are connected."`
}
// WsSubscribeFiatRatesReq subscribes to updates of fiat rates for a specific currency or set of tokens.