new websocket prometheus metrics

This commit is contained in:
pragmaxim
2026-02-17 12:05:03 +01:00
parent efbf7f559f
commit 174640273d
2 changed files with 204 additions and 74 deletions

View File

@@ -8,47 +8,54 @@ import (
// Metrics holds prometheus collectors for various metrics collected by Blockbook
type Metrics struct {
SocketIORequests *prometheus.CounterVec
SocketIOSubscribes *prometheus.CounterVec
SocketIOClients prometheus.Gauge
SocketIOReqDuration *prometheus.HistogramVec
WebsocketRequests *prometheus.CounterVec
WebsocketSubscribes *prometheus.GaugeVec
WebsocketClients prometheus.Gauge
WebsocketReqDuration *prometheus.HistogramVec
IndexResyncDuration prometheus.Histogram
MempoolResyncDuration prometheus.Histogram
TxCacheEfficiency *prometheus.CounterVec
RPCLatency *prometheus.HistogramVec
EthCallRequests *prometheus.CounterVec
EthCallErrors *prometheus.CounterVec
EthCallBatchSize prometheus.Histogram
EthCallContractInfo *prometheus.CounterVec
EthCallTokenURI *prometheus.CounterVec
EthCallStakingPool *prometheus.CounterVec
IndexResyncErrors *prometheus.CounterVec
IndexDBSize prometheus.Gauge
ExplorerViews *prometheus.CounterVec
MempoolSize prometheus.Gauge
EstimatedFee *prometheus.GaugeVec
AvgBlockPeriod prometheus.Gauge
SyncBlockStats *prometheus.GaugeVec
SyncHotnessStats *prometheus.GaugeVec
AddrContractsCacheEntries prometheus.Gauge
AddrContractsCacheBytes prometheus.Gauge
AddrContractsCacheHits prometheus.Counter
AddrContractsCacheMisses prometheus.Counter
AddrContractsCacheFlushes *prometheus.CounterVec
DbColumnRows *prometheus.GaugeVec
DbColumnSize *prometheus.GaugeVec
BlockbookAppInfo *prometheus.GaugeVec
BackendBestHeight prometheus.Gauge
BlockbookBestHeight prometheus.Gauge
ExplorerPendingRequests *prometheus.GaugeVec
WebsocketPendingRequests *prometheus.GaugeVec
SocketIOPendingRequests *prometheus.GaugeVec
XPubCacheSize prometheus.Gauge
CoingeckoRequests *prometheus.CounterVec
SocketIORequests *prometheus.CounterVec
SocketIOSubscribes *prometheus.CounterVec
SocketIOClients prometheus.Gauge
SocketIOReqDuration *prometheus.HistogramVec
WebsocketRequests *prometheus.CounterVec
WebsocketSubscribes *prometheus.GaugeVec
WebsocketClients prometheus.Gauge
WebsocketReqDuration *prometheus.HistogramVec
WebsocketChannelCloses *prometheus.CounterVec
WebsocketUnknownMethods *prometheus.CounterVec
WebsocketAddrNotifications *prometheus.CounterVec
WebsocketNewBlockTxs *prometheus.CounterVec
WebsocketNewBlockTxsDuration *prometheus.HistogramVec
WebsocketEthReceipt *prometheus.CounterVec
WebsocketNewBlockTxsSubscriptions prometheus.Gauge
IndexResyncDuration prometheus.Histogram
MempoolResyncDuration prometheus.Histogram
TxCacheEfficiency *prometheus.CounterVec
RPCLatency *prometheus.HistogramVec
EthCallRequests *prometheus.CounterVec
EthCallErrors *prometheus.CounterVec
EthCallBatchSize prometheus.Histogram
EthCallContractInfo *prometheus.CounterVec
EthCallTokenURI *prometheus.CounterVec
EthCallStakingPool *prometheus.CounterVec
IndexResyncErrors *prometheus.CounterVec
IndexDBSize prometheus.Gauge
ExplorerViews *prometheus.CounterVec
MempoolSize prometheus.Gauge
EstimatedFee *prometheus.GaugeVec
AvgBlockPeriod prometheus.Gauge
SyncBlockStats *prometheus.GaugeVec
SyncHotnessStats *prometheus.GaugeVec
AddrContractsCacheEntries prometheus.Gauge
AddrContractsCacheBytes prometheus.Gauge
AddrContractsCacheHits prometheus.Counter
AddrContractsCacheMisses prometheus.Counter
AddrContractsCacheFlushes *prometheus.CounterVec
DbColumnRows *prometheus.GaugeVec
DbColumnSize *prometheus.GaugeVec
BlockbookAppInfo *prometheus.GaugeVec
BackendBestHeight prometheus.Gauge
BlockbookBestHeight prometheus.Gauge
ExplorerPendingRequests *prometheus.GaugeVec
WebsocketPendingRequests *prometheus.GaugeVec
SocketIOPendingRequests *prometheus.GaugeVec
XPubCacheSize prometheus.Gauge
CoingeckoRequests *prometheus.CounterVec
}
// Labels represents a collection of label name -> value mappings.
@@ -122,6 +129,62 @@ func GetMetrics(coin string) (*Metrics, error) {
},
[]string{"method"},
)
metrics.WebsocketChannelCloses = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "blockbook_websocket_channel_closes",
Help: "Total number of websocket channel closes by reason",
ConstLabels: Labels{"coin": coin},
},
[]string{"reason"},
)
metrics.WebsocketUnknownMethods = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "blockbook_websocket_unknown_methods",
Help: "Total number of websocket requests with unknown method",
ConstLabels: Labels{"coin": coin},
},
[]string{"method"},
)
metrics.WebsocketAddrNotifications = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "blockbook_websocket_addr_notifications",
Help: "Total number of per-address websocket tx notifications by source",
ConstLabels: Labels{"coin": coin},
},
[]string{"source"},
)
metrics.WebsocketNewBlockTxs = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "blockbook_websocket_new_block_txs",
Help: "Total number of websocket newBlockTxs events by stage and status",
ConstLabels: Labels{"coin": coin},
},
[]string{"stage", "status"},
)
metrics.WebsocketNewBlockTxsDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "blockbook_websocket_new_block_txs_duration_seconds",
Help: "Duration of websocket newBlockTxs processing stages in seconds",
Buckets: []float64{0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.25, 0.5, 1, 2, 5, 10},
ConstLabels: Labels{"coin": coin},
},
[]string{"stage"},
)
metrics.WebsocketEthReceipt = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "blockbook_websocket_eth_receipt",
Help: "Total number of websocket Ethereum receipt enrichment outcomes",
ConstLabels: Labels{"coin": coin},
},
[]string{"status"},
)
metrics.WebsocketNewBlockTxsSubscriptions = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "blockbook_websocket_new_block_txs_subscriptions",
Help: "Number of websocket address subscriptions with newBlockTxs enabled",
ConstLabels: Labels{"coin": coin},
},
)
metrics.IndexResyncDuration = prometheus.NewHistogram(
prometheus.HistogramOpts{
Name: "blockbook_index_resync_duration",

View File

@@ -25,6 +25,7 @@ import (
const upgradeFailed = "Upgrade failed: "
const outChannelSize = 500
const defaultTimeout = 60 * time.Second
const unknownMethodLabel = "unknown"
// allRates is a special "currency" parameter that means all available currencies
const allFiatRates = "!ALL!"
@@ -44,6 +45,7 @@ type websocketChannel struct {
requestHeader http.Header
alive bool
aliveLock sync.Mutex
closeReason string
addrDescs []string // subscribed address descriptors as strings
getAddressInfoDescriptorsMux sync.Mutex
getAddressInfoDescriptors map[string]struct{}
@@ -125,6 +127,9 @@ func NewWebsocketServer(db *db.RocksDB, chain bchain.BlockChain, mempool bchain.
}
glog.Info("Support of rpcCall for these contracts: ", envRpcCall)
}
if s.metrics != nil {
s.metrics.WebsocketNewBlockTxsSubscriptions.Set(0)
}
return s, nil
}
@@ -177,8 +182,11 @@ func (s *WebsocketServer) GetHandler() http.Handler {
return s
}
func (s *WebsocketServer) closeChannel(c *websocketChannel) bool {
if c.CloseOut() {
func (s *WebsocketServer) closeChannel(c *websocketChannel, reason string) bool {
if closed, closeReason := c.CloseOut(reason); closed {
if s.metrics != nil {
s.metrics.WebsocketChannelCloses.With(common.Labels{"reason": closeReason}).Inc()
}
c.conn.Close()
s.onDisconnect(c)
return true
@@ -186,19 +194,23 @@ func (s *WebsocketServer) closeChannel(c *websocketChannel) bool {
return false
}
func (c *websocketChannel) CloseOut() bool {
func (c *websocketChannel) CloseOut(reason string) (bool, string) {
c.aliveLock.Lock()
defer c.aliveLock.Unlock()
if c.alive {
c.alive = false
if c.closeReason == "" {
c.closeReason = reason
}
closeReason := c.closeReason
//clean out
close(c.out)
for len(c.out) > 0 {
<-c.out
}
return true
return true, closeReason
}
return false
return false, ""
}
func (c *websocketChannel) DataOut(data *WsRes) {
@@ -209,6 +221,9 @@ func (c *websocketChannel) DataOut(data *WsRes) {
c.out <- data
} else {
glog.Warning("Channel ", c.id, " overflow, closing")
if c.closeReason == "" {
c.closeReason = "overflow"
}
// close the connection but do not call CloseOut - would call duplicate c.aliveLock.Lock
// CloseOut will be called because the closed connection will cause break in the inputLoop
c.conn.Close()
@@ -221,13 +236,13 @@ func (s *WebsocketServer) inputLoop(c *websocketChannel) {
if r := recover(); r != nil {
glog.Error("recovered from panic: ", r, ", ", c.id)
debug.PrintStack()
s.closeChannel(c)
s.closeChannel(c, "panic")
}
}()
for {
t, d, err := c.conn.ReadMessage()
if err != nil {
s.closeChannel(c)
s.closeChannel(c, "read_error")
return
}
switch t {
@@ -236,18 +251,18 @@ func (s *WebsocketServer) inputLoop(c *websocketChannel) {
err := json.Unmarshal(d, &req)
if err != nil {
glog.Error("Error parsing message from ", c.id, ", ", string(d), ", ", err)
s.closeChannel(c)
s.closeChannel(c, "protocol_error")
return
}
go s.onRequest(c, &req)
case websocket.BinaryMessage:
glog.Error("Binary message received from ", c.id, ", ", c.ip)
s.closeChannel(c)
s.closeChannel(c, "protocol_error")
return
case websocket.PingMessage:
c.conn.WriteControl(websocket.PongMessage, nil, time.Now().Add(defaultTimeout))
case websocket.CloseMessage:
s.closeChannel(c)
s.closeChannel(c, "client_close")
return
case websocket.PongMessage:
// do nothing
@@ -259,14 +274,14 @@ func (s *WebsocketServer) outputLoop(c *websocketChannel) {
defer func() {
if r := recover(); r != nil {
glog.Error("recovered from panic: ", r, ", ", c.id)
s.closeChannel(c)
s.closeChannel(c, "panic")
}
}()
for m := range c.out {
err := c.conn.WriteJSON(m)
if err != nil {
glog.Error("Error sending message to ", c.id, ", ", err)
s.closeChannel(c)
s.closeChannel(c, "write_error")
return
}
}
@@ -296,7 +311,7 @@ var requestHandlers = map[string]func(*WebsocketServer, *websocketChannel, *WsRe
l := len(c.getAddressInfoDescriptors)
c.getAddressInfoDescriptorsMux.Unlock()
if l > s.is.WsGetAccountInfoLimit {
if s.closeChannel(c) {
if s.closeChannel(c, "limit_exceeded") {
glog.Info("Client ", c.id, " exceeded getAddressInfo limit, ", c.ip)
s.is.AddWsLimitExceedingIP(c.ip)
}
@@ -493,6 +508,11 @@ var requestHandlers = map[string]func(*WebsocketServer, *websocketChannel, *WsRe
func (s *WebsocketServer) onRequest(c *websocketChannel, req *WsReq) {
var err error
var data interface{}
f, ok := requestHandlers[req.Method]
methodLabel := req.Method
if !ok {
methodLabel = unknownMethodLabel
}
defer func() {
if r := recover(); r != nil {
glog.Error("Client ", c.id, ", onRequest ", req.Method, " recovered from panic: ", r)
@@ -508,29 +528,30 @@ func (s *WebsocketServer) onRequest(c *websocketChannel, req *WsReq) {
Data: data,
})
}
s.metrics.WebsocketPendingRequests.With((common.Labels{"method": req.Method})).Dec()
s.metrics.WebsocketPendingRequests.With(common.Labels{"method": methodLabel}).Dec()
}()
t := time.Now()
s.metrics.WebsocketPendingRequests.With((common.Labels{"method": req.Method})).Inc()
s.metrics.WebsocketPendingRequests.With(common.Labels{"method": methodLabel}).Inc()
defer func() {
s.metrics.WebsocketReqDuration.With(common.Labels{"method": req.Method}).Observe(float64(time.Since(t)) / 1e3) // in microseconds
s.metrics.WebsocketReqDuration.With(common.Labels{"method": methodLabel}).Observe(float64(time.Since(t)) / 1e3) // in microseconds
}()
f, ok := requestHandlers[req.Method]
if ok {
data, err = f(s, c, req)
if err == nil {
glog.V(1).Info("Client ", c.id, " onRequest ", req.Method, " success")
s.metrics.WebsocketRequests.With(common.Labels{"method": req.Method, "status": "success"}).Inc()
s.metrics.WebsocketRequests.With(common.Labels{"method": methodLabel, "status": "success"}).Inc()
} else {
if apiErr, ok := err.(*api.APIError); !ok || !apiErr.Public {
glog.Error("Client ", c.id, " onMessage ", req.Method, ": ", errors.ErrorStack(err), ", data ", string(req.Params))
}
s.metrics.WebsocketRequests.With(common.Labels{"method": req.Method, "status": "failure"}).Inc()
s.metrics.WebsocketRequests.With(common.Labels{"method": methodLabel, "status": "failure"}).Inc()
e := resultError{}
e.Error.Message = err.Error()
data = e
}
} else {
s.metrics.WebsocketUnknownMethods.With(common.Labels{"method": methodLabel}).Inc()
s.metrics.WebsocketRequests.With(common.Labels{"method": methodLabel, "status": "failure"}).Inc()
glog.V(1).Info("Client ", c.id, " onMessage ", req.Method, ": unknown method, data ", string(req.Params))
}
}
@@ -860,7 +881,7 @@ func (s *WebsocketServer) subscribeNewBlock(c *websocketChannel, req *WsReq) (re
s.newBlockSubscriptionsLock.Lock()
defer s.newBlockSubscriptionsLock.Unlock()
s.newBlockSubscriptions[c] = req.ID
s.metrics.WebsocketSubscribes.With((common.Labels{"method": "subscribeNewBlock"})).Set(float64(len(s.newBlockSubscriptions)))
s.metrics.WebsocketSubscribes.With(common.Labels{"method": "subscribeNewBlock"}).Set(float64(len(s.newBlockSubscriptions)))
return &subscriptionResponse{true}, nil
}
@@ -868,7 +889,7 @@ func (s *WebsocketServer) unsubscribeNewBlock(c *websocketChannel) (res interfac
s.newBlockSubscriptionsLock.Lock()
defer s.newBlockSubscriptionsLock.Unlock()
delete(s.newBlockSubscriptions, c)
s.metrics.WebsocketSubscribes.With((common.Labels{"method": "subscribeNewBlock"})).Set(float64(len(s.newBlockSubscriptions)))
s.metrics.WebsocketSubscribes.With(common.Labels{"method": "subscribeNewBlock"}).Set(float64(len(s.newBlockSubscriptions)))
return &subscriptionResponse{false}, nil
}
@@ -879,7 +900,7 @@ func (s *WebsocketServer) subscribeNewTransaction(c *websocketChannel, req *WsRe
return &subscriptionResponseMessage{false, "subscribeNewTransaction not enabled, use -enablesubnewtx flag to enable."}, nil
}
s.newTransactionSubscriptions[c] = req.ID
s.metrics.WebsocketSubscribes.With((common.Labels{"method": "subscribeNewTransaction"})).Set(float64(len(s.newTransactionSubscriptions)))
s.metrics.WebsocketSubscribes.With(common.Labels{"method": "subscribeNewTransaction"}).Set(float64(len(s.newTransactionSubscriptions)))
return &subscriptionResponse{true}, nil
}
@@ -890,7 +911,7 @@ func (s *WebsocketServer) unsubscribeNewTransaction(c *websocketChannel) (res in
return &subscriptionResponseMessage{false, "unsubscribeNewTransaction not enabled, use -enablesubnewtx flag to enable."}, nil
}
delete(s.newTransactionSubscriptions, c)
s.metrics.WebsocketSubscribes.With((common.Labels{"method": "subscribeNewTransaction"})).Set(float64(len(s.newTransactionSubscriptions)))
s.metrics.WebsocketSubscribes.With(common.Labels{"method": "subscribeNewTransaction"}).Set(float64(len(s.newTransactionSubscriptions)))
return &subscriptionResponse{false}, nil
}
@@ -956,7 +977,8 @@ func (s *WebsocketServer) subscribeAddresses(c *websocketChannel, addrDesc []str
}
}
c.addrDescs = addrDesc
s.metrics.WebsocketSubscribes.With((common.Labels{"method": "subscribeAddresses"})).Set(float64(len(s.addressSubscriptions)))
s.metrics.WebsocketSubscribes.With(common.Labels{"method": "subscribeAddresses"}).Set(float64(len(s.addressSubscriptions)))
s.metrics.WebsocketNewBlockTxsSubscriptions.Set(float64(s.newBlockTxsSubscriptionCount))
return &subscriptionResponse{true}, nil
}
@@ -965,7 +987,8 @@ func (s *WebsocketServer) unsubscribeAddresses(c *websocketChannel) (res interfa
s.addressSubscriptionsLock.Lock()
defer s.addressSubscriptionsLock.Unlock()
s.doUnsubscribeAddresses(c)
s.metrics.WebsocketSubscribes.With((common.Labels{"method": "subscribeAddresses"})).Set(float64(len(s.addressSubscriptions)))
s.metrics.WebsocketSubscribes.With(common.Labels{"method": "subscribeAddresses"}).Set(float64(len(s.addressSubscriptions)))
s.metrics.WebsocketNewBlockTxsSubscriptions.Set(float64(s.newBlockTxsSubscriptionCount))
return &subscriptionResponse{false}, nil
}
@@ -1005,7 +1028,7 @@ func (s *WebsocketServer) subscribeFiatRates(c *websocketChannel, d *WsSubscribe
if len(d.Tokens) != 0 {
s.fiatRatesTokenSubscriptions[c] = d.Tokens
}
s.metrics.WebsocketSubscribes.With((common.Labels{"method": "subscribeFiatRates"})).Set(float64(len(s.fiatRatesSubscriptions)))
s.metrics.WebsocketSubscribes.With(common.Labels{"method": "subscribeFiatRates"}).Set(float64(len(s.fiatRatesSubscriptions)))
return &subscriptionResponse{true}, nil
}
@@ -1014,7 +1037,7 @@ func (s *WebsocketServer) unsubscribeFiatRates(c *websocketChannel) (res interfa
s.fiatRatesSubscriptionsLock.Lock()
defer s.fiatRatesSubscriptionsLock.Unlock()
s.doUnsubscribeFiatRates(c)
s.metrics.WebsocketSubscribes.With((common.Labels{"method": "subscribeFiatRates"})).Set(float64(len(s.fiatRatesSubscriptions)))
s.metrics.WebsocketSubscribes.With(common.Labels{"method": "subscribeFiatRates"}).Set(float64(len(s.fiatRatesSubscriptions)))
return &subscriptionResponse{false}, nil
}
@@ -1060,18 +1083,38 @@ func getEthereumInternalTransfers(tx *bchain.Tx) []bchain.EthereumInternalTransf
// setEthereumReceiptIfAvailable adds receipt data to Ethereum txs on a
// best-effort basis; failures are logged and notifications continue.
func setEthereumReceiptIfAvailable(tx *bchain.Tx, getReceipt func(string) (*bchain.RpcReceipt, error)) {
func setEthereumReceiptIfAvailable(tx *bchain.Tx, getReceipt func(string) (*bchain.RpcReceipt, error)) string {
csd, ok := tx.CoinSpecificData.(bchain.EthereumSpecificData)
if !ok {
return
return "skipped_non_eth"
}
receipt, err := getReceipt(tx.Txid)
if err != nil {
glog.Error("EthereumTypeGetTransactionReceipt error ", err, " for ", tx.Txid)
return
return "error"
}
csd.Receipt = receipt
tx.CoinSpecificData = csd
return "success"
}
func observeNewBlockTxDuration(metrics *common.Metrics, stage string, started time.Time) {
if metrics == nil {
return
}
metrics.WebsocketNewBlockTxsDuration.With(common.Labels{"stage": stage}).Observe(time.Since(started).Seconds())
}
func incNewBlockTxMetric(metrics *common.Metrics, stage, status string, value float64) {
if metrics == nil {
return
}
counter := metrics.WebsocketNewBlockTxs.With(common.Labels{"stage": stage, "status": status})
if value == 1 {
counter.Inc()
} else {
counter.Add(value)
}
}
// populateBitcoinVinAddrDescs fills missing vin address descriptors by loading
@@ -1111,11 +1154,15 @@ func (s *WebsocketServer) getBitcoinVinAddrDesc(txid string, vout uint32) (bchai
// publishNewBlockTxsByAddr emits confirmed transaction notifications only for
// subscribed addresses touched by transactions in the connected block.
func (s *WebsocketServer) publishNewBlockTxsByAddr(block *bchain.Block) {
blockStart := time.Now()
defer observeNewBlockTxDuration(s.metrics, "per_block", blockStart)
chainType := s.chainParser.GetChainType()
for _, tx := range block.Txs {
incNewBlockTxMetric(s.metrics, "scanned", "success", 1)
setConfirmedBlockTxMetadata(&tx, block.Time)
var tokenTransfers bchain.TokenTransfers
var internalTransfers []bchain.EthereumInternalTransfer
if s.chainParser.GetChainType() == bchain.ChainEthereumType {
if chainType == bchain.ChainEthereumType {
tokenTransfers, _ = s.chainParser.EthereumTypeGetTokenTransfersFromTx(&tx)
internalTransfers = getEthereumInternalTransfers(&tx)
}
@@ -1123,23 +1170,36 @@ func (s *WebsocketServer) publishNewBlockTxsByAddr(block *bchain.Block) {
for i, vin := range tx.Vin {
vins[i] = bchain.MempoolVin{Vin: vin}
}
if s.chainParser.GetChainType() == bchain.ChainBitcoinType {
if chainType == bchain.ChainBitcoinType {
populateBitcoinVinAddrDescs(vins, s.getBitcoinVinAddrDesc)
}
matchStart := time.Now()
subscribed := s.getNewTxSubscriptions(vins, tx.Vout, tokenTransfers, internalTransfers)
observeNewBlockTxDuration(s.metrics, "match", matchStart)
if len(subscribed) > 0 {
incNewBlockTxMetric(s.metrics, "matched", "success", 1)
// Convert and publish asynchronously so heavy tx conversion does not
// block processing of other transactions in the same block.
go func(tx bchain.Tx, subscribed map[string]struct{}) {
setEthereumReceiptIfAvailable(&tx, s.chain.EthereumTypeGetTransactionReceipt)
if chainType == bchain.ChainEthereumType {
receiptStatus := setEthereumReceiptIfAvailable(&tx, s.chain.EthereumTypeGetTransactionReceipt)
if s.metrics != nil {
s.metrics.WebsocketEthReceipt.With(common.Labels{"status": receiptStatus}).Inc()
}
}
convertStart := time.Now()
atx, err := s.api.GetTransactionFromBchainTx(&tx, int(block.Height), false, false, nil)
observeNewBlockTxDuration(s.metrics, "convert", convertStart)
if err != nil {
incNewBlockTxMetric(s.metrics, "converted", "failure", 1)
glog.Error("GetTransactionFromBchainTx error ", err, " for ", tx.Txid)
return
}
incNewBlockTxMetric(s.metrics, "converted", "success", 1)
for stringAddressDescriptor := range subscribed {
s.sendOnNewTxAddr(stringAddressDescriptor, atx, true)
}
incNewBlockTxMetric(s.metrics, "published", "success", float64(len(subscribed)))
}(tx, subscribed)
}
}
@@ -1187,12 +1247,19 @@ func (s *WebsocketServer) sendOnNewTxAddr(stringAddressDescriptor string, tx *ap
defer s.addressSubscriptionsLock.Unlock()
as, ok := s.addressSubscriptions[stringAddressDescriptor]
if ok {
source := "mempool"
if newBlockTx {
source = "new_block"
}
for c, details := range as {
// Mempool notifications go to all address subscribers; confirmed
// block notifications only go to subscribers that requested them.
if newBlockTx && !details.publishNewBlockTxs {
continue
}
if s.metrics != nil {
s.metrics.WebsocketAddrNotifications.With(common.Labels{"source": source}).Inc()
}
c.DataOut(&WsRes{
ID: details.requestID,
Data: &data,