From 174640273dde80ad684491ea21a6033bc9cb0789 Mon Sep 17 00:00:00 2001 From: pragmaxim Date: Tue, 17 Feb 2026 12:05:03 +0100 Subject: [PATCH] new websocket prometheus metrics --- common/metrics.go | 145 +++++++++++++++++++++++++++++++------------- server/websocket.go | 133 ++++++++++++++++++++++++++++++---------- 2 files changed, 204 insertions(+), 74 deletions(-) diff --git a/common/metrics.go b/common/metrics.go index a867e2c0..81f24fda 100644 --- a/common/metrics.go +++ b/common/metrics.go @@ -8,47 +8,54 @@ import ( // Metrics holds prometheus collectors for various metrics collected by Blockbook type Metrics struct { - SocketIORequests *prometheus.CounterVec - SocketIOSubscribes *prometheus.CounterVec - SocketIOClients prometheus.Gauge - SocketIOReqDuration *prometheus.HistogramVec - WebsocketRequests *prometheus.CounterVec - WebsocketSubscribes *prometheus.GaugeVec - WebsocketClients prometheus.Gauge - WebsocketReqDuration *prometheus.HistogramVec - IndexResyncDuration prometheus.Histogram - MempoolResyncDuration prometheus.Histogram - TxCacheEfficiency *prometheus.CounterVec - RPCLatency *prometheus.HistogramVec - EthCallRequests *prometheus.CounterVec - EthCallErrors *prometheus.CounterVec - EthCallBatchSize prometheus.Histogram - EthCallContractInfo *prometheus.CounterVec - EthCallTokenURI *prometheus.CounterVec - EthCallStakingPool *prometheus.CounterVec - IndexResyncErrors *prometheus.CounterVec - IndexDBSize prometheus.Gauge - ExplorerViews *prometheus.CounterVec - MempoolSize prometheus.Gauge - EstimatedFee *prometheus.GaugeVec - AvgBlockPeriod prometheus.Gauge - SyncBlockStats *prometheus.GaugeVec - SyncHotnessStats *prometheus.GaugeVec - AddrContractsCacheEntries prometheus.Gauge - AddrContractsCacheBytes prometheus.Gauge - AddrContractsCacheHits prometheus.Counter - AddrContractsCacheMisses prometheus.Counter - AddrContractsCacheFlushes *prometheus.CounterVec - DbColumnRows *prometheus.GaugeVec - DbColumnSize *prometheus.GaugeVec - BlockbookAppInfo *prometheus.GaugeVec - BackendBestHeight prometheus.Gauge - BlockbookBestHeight prometheus.Gauge - ExplorerPendingRequests *prometheus.GaugeVec - WebsocketPendingRequests *prometheus.GaugeVec - SocketIOPendingRequests *prometheus.GaugeVec - XPubCacheSize prometheus.Gauge - CoingeckoRequests *prometheus.CounterVec + SocketIORequests *prometheus.CounterVec + SocketIOSubscribes *prometheus.CounterVec + SocketIOClients prometheus.Gauge + SocketIOReqDuration *prometheus.HistogramVec + WebsocketRequests *prometheus.CounterVec + WebsocketSubscribes *prometheus.GaugeVec + WebsocketClients prometheus.Gauge + WebsocketReqDuration *prometheus.HistogramVec + WebsocketChannelCloses *prometheus.CounterVec + WebsocketUnknownMethods *prometheus.CounterVec + WebsocketAddrNotifications *prometheus.CounterVec + WebsocketNewBlockTxs *prometheus.CounterVec + WebsocketNewBlockTxsDuration *prometheus.HistogramVec + WebsocketEthReceipt *prometheus.CounterVec + WebsocketNewBlockTxsSubscriptions prometheus.Gauge + IndexResyncDuration prometheus.Histogram + MempoolResyncDuration prometheus.Histogram + TxCacheEfficiency *prometheus.CounterVec + RPCLatency *prometheus.HistogramVec + EthCallRequests *prometheus.CounterVec + EthCallErrors *prometheus.CounterVec + EthCallBatchSize prometheus.Histogram + EthCallContractInfo *prometheus.CounterVec + EthCallTokenURI *prometheus.CounterVec + EthCallStakingPool *prometheus.CounterVec + IndexResyncErrors *prometheus.CounterVec + IndexDBSize prometheus.Gauge + ExplorerViews *prometheus.CounterVec + MempoolSize prometheus.Gauge + EstimatedFee *prometheus.GaugeVec + AvgBlockPeriod prometheus.Gauge + SyncBlockStats *prometheus.GaugeVec + SyncHotnessStats *prometheus.GaugeVec + AddrContractsCacheEntries prometheus.Gauge + AddrContractsCacheBytes prometheus.Gauge + AddrContractsCacheHits prometheus.Counter + AddrContractsCacheMisses prometheus.Counter + AddrContractsCacheFlushes *prometheus.CounterVec + DbColumnRows *prometheus.GaugeVec + DbColumnSize *prometheus.GaugeVec + BlockbookAppInfo *prometheus.GaugeVec + BackendBestHeight prometheus.Gauge + BlockbookBestHeight prometheus.Gauge + ExplorerPendingRequests *prometheus.GaugeVec + WebsocketPendingRequests *prometheus.GaugeVec + SocketIOPendingRequests *prometheus.GaugeVec + XPubCacheSize prometheus.Gauge + CoingeckoRequests *prometheus.CounterVec } // Labels represents a collection of label name -> value mappings. @@ -122,6 +129,62 @@ func GetMetrics(coin string) (*Metrics, error) { }, []string{"method"}, ) + metrics.WebsocketChannelCloses = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "blockbook_websocket_channel_closes", + Help: "Total number of websocket channel closes by reason", + ConstLabels: Labels{"coin": coin}, + }, + []string{"reason"}, + ) + metrics.WebsocketUnknownMethods = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "blockbook_websocket_unknown_methods", + Help: "Total number of websocket requests with unknown method", + ConstLabels: Labels{"coin": coin}, + }, + []string{"method"}, + ) + metrics.WebsocketAddrNotifications = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "blockbook_websocket_addr_notifications", + Help: "Total number of per-address websocket tx notifications by source", + ConstLabels: Labels{"coin": coin}, + }, + []string{"source"}, + ) + metrics.WebsocketNewBlockTxs = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "blockbook_websocket_new_block_txs", + Help: "Total number of websocket newBlockTxs events by stage and status", + ConstLabels: Labels{"coin": coin}, + }, + []string{"stage", "status"}, + ) + metrics.WebsocketNewBlockTxsDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "blockbook_websocket_new_block_txs_duration_seconds", + Help: "Duration of websocket newBlockTxs processing stages in seconds", + Buckets: []float64{0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.25, 0.5, 1, 2, 5, 10}, + ConstLabels: Labels{"coin": coin}, + }, + []string{"stage"}, + ) + metrics.WebsocketEthReceipt = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "blockbook_websocket_eth_receipt", + Help: "Total number of websocket Ethereum receipt enrichment outcomes", + ConstLabels: Labels{"coin": coin}, + }, + []string{"status"}, + ) + metrics.WebsocketNewBlockTxsSubscriptions = prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "blockbook_websocket_new_block_txs_subscriptions", + Help: "Number of websocket address subscriptions with newBlockTxs enabled", + ConstLabels: Labels{"coin": coin}, + }, + ) metrics.IndexResyncDuration = prometheus.NewHistogram( prometheus.HistogramOpts{ Name: "blockbook_index_resync_duration", diff --git a/server/websocket.go b/server/websocket.go index cbad4126..607d1bd8 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -25,6 +25,7 @@ import ( const upgradeFailed = "Upgrade failed: " const outChannelSize = 500 const defaultTimeout = 60 * time.Second +const unknownMethodLabel = "unknown" // allRates is a special "currency" parameter that means all available currencies const allFiatRates = "!ALL!" @@ -44,6 +45,7 @@ type websocketChannel struct { requestHeader http.Header alive bool aliveLock sync.Mutex + closeReason string addrDescs []string // subscribed address descriptors as strings getAddressInfoDescriptorsMux sync.Mutex getAddressInfoDescriptors map[string]struct{} @@ -125,6 +127,9 @@ func NewWebsocketServer(db *db.RocksDB, chain bchain.BlockChain, mempool bchain. } glog.Info("Support of rpcCall for these contracts: ", envRpcCall) } + if s.metrics != nil { + s.metrics.WebsocketNewBlockTxsSubscriptions.Set(0) + } return s, nil } @@ -177,8 +182,11 @@ func (s *WebsocketServer) GetHandler() http.Handler { return s } -func (s *WebsocketServer) closeChannel(c *websocketChannel) bool { - if c.CloseOut() { +func (s *WebsocketServer) closeChannel(c *websocketChannel, reason string) bool { + if closed, closeReason := c.CloseOut(reason); closed { + if s.metrics != nil { + s.metrics.WebsocketChannelCloses.With(common.Labels{"reason": closeReason}).Inc() + } c.conn.Close() s.onDisconnect(c) return true @@ -186,19 +194,23 @@ func (s *WebsocketServer) closeChannel(c *websocketChannel) bool { return false } -func (c *websocketChannel) CloseOut() bool { +func (c *websocketChannel) CloseOut(reason string) (bool, string) { c.aliveLock.Lock() defer c.aliveLock.Unlock() if c.alive { c.alive = false + if c.closeReason == "" { + c.closeReason = reason + } + closeReason := c.closeReason //clean out close(c.out) for len(c.out) > 0 { <-c.out } - return true + return true, closeReason } - return false + return false, "" } func (c *websocketChannel) DataOut(data *WsRes) { @@ -209,6 +221,9 @@ func (c *websocketChannel) DataOut(data *WsRes) { c.out <- data } else { glog.Warning("Channel ", c.id, " overflow, closing") + if c.closeReason == "" { + c.closeReason = "overflow" + } // close the connection but do not call CloseOut - would call duplicate c.aliveLock.Lock // CloseOut will be called because the closed connection will cause break in the inputLoop c.conn.Close() @@ -221,13 +236,13 @@ func (s *WebsocketServer) inputLoop(c *websocketChannel) { if r := recover(); r != nil { glog.Error("recovered from panic: ", r, ", ", c.id) debug.PrintStack() - s.closeChannel(c) + s.closeChannel(c, "panic") } }() for { t, d, err := c.conn.ReadMessage() if err != nil { - s.closeChannel(c) + s.closeChannel(c, "read_error") return } switch t { @@ -236,18 +251,18 @@ func (s *WebsocketServer) inputLoop(c *websocketChannel) { err := json.Unmarshal(d, &req) if err != nil { glog.Error("Error parsing message from ", c.id, ", ", string(d), ", ", err) - s.closeChannel(c) + s.closeChannel(c, "protocol_error") return } go s.onRequest(c, &req) case websocket.BinaryMessage: glog.Error("Binary message received from ", c.id, ", ", c.ip) - s.closeChannel(c) + s.closeChannel(c, "protocol_error") return case websocket.PingMessage: c.conn.WriteControl(websocket.PongMessage, nil, time.Now().Add(defaultTimeout)) case websocket.CloseMessage: - s.closeChannel(c) + s.closeChannel(c, "client_close") return case websocket.PongMessage: // do nothing @@ -259,14 +274,14 @@ func (s *WebsocketServer) outputLoop(c *websocketChannel) { defer func() { if r := recover(); r != nil { glog.Error("recovered from panic: ", r, ", ", c.id) - s.closeChannel(c) + s.closeChannel(c, "panic") } }() for m := range c.out { err := c.conn.WriteJSON(m) if err != nil { glog.Error("Error sending message to ", c.id, ", ", err) - s.closeChannel(c) + s.closeChannel(c, "write_error") return } } @@ -296,7 +311,7 @@ var requestHandlers = map[string]func(*WebsocketServer, *websocketChannel, *WsRe l := len(c.getAddressInfoDescriptors) c.getAddressInfoDescriptorsMux.Unlock() if l > s.is.WsGetAccountInfoLimit { - if s.closeChannel(c) { + if s.closeChannel(c, "limit_exceeded") { glog.Info("Client ", c.id, " exceeded getAddressInfo limit, ", c.ip) s.is.AddWsLimitExceedingIP(c.ip) } @@ -493,6 +508,11 @@ var requestHandlers = map[string]func(*WebsocketServer, *websocketChannel, *WsRe func (s *WebsocketServer) onRequest(c *websocketChannel, req *WsReq) { var err error var data interface{} + f, ok := requestHandlers[req.Method] + methodLabel := req.Method + if !ok { + methodLabel = unknownMethodLabel + } defer func() { if r := recover(); r != nil { glog.Error("Client ", c.id, ", onRequest ", req.Method, " recovered from panic: ", r) @@ -508,29 +528,30 @@ func (s *WebsocketServer) onRequest(c *websocketChannel, req *WsReq) { Data: data, }) } - s.metrics.WebsocketPendingRequests.With((common.Labels{"method": req.Method})).Dec() + s.metrics.WebsocketPendingRequests.With(common.Labels{"method": methodLabel}).Dec() }() t := time.Now() - s.metrics.WebsocketPendingRequests.With((common.Labels{"method": req.Method})).Inc() + s.metrics.WebsocketPendingRequests.With(common.Labels{"method": methodLabel}).Inc() defer func() { - s.metrics.WebsocketReqDuration.With(common.Labels{"method": req.Method}).Observe(float64(time.Since(t)) / 1e3) // in microseconds + s.metrics.WebsocketReqDuration.With(common.Labels{"method": methodLabel}).Observe(float64(time.Since(t)) / 1e3) // in microseconds }() - f, ok := requestHandlers[req.Method] if ok { data, err = f(s, c, req) if err == nil { glog.V(1).Info("Client ", c.id, " onRequest ", req.Method, " success") - s.metrics.WebsocketRequests.With(common.Labels{"method": req.Method, "status": "success"}).Inc() + s.metrics.WebsocketRequests.With(common.Labels{"method": methodLabel, "status": "success"}).Inc() } else { if apiErr, ok := err.(*api.APIError); !ok || !apiErr.Public { glog.Error("Client ", c.id, " onMessage ", req.Method, ": ", errors.ErrorStack(err), ", data ", string(req.Params)) } - s.metrics.WebsocketRequests.With(common.Labels{"method": req.Method, "status": "failure"}).Inc() + s.metrics.WebsocketRequests.With(common.Labels{"method": methodLabel, "status": "failure"}).Inc() e := resultError{} e.Error.Message = err.Error() data = e } } else { + s.metrics.WebsocketUnknownMethods.With(common.Labels{"method": methodLabel}).Inc() + s.metrics.WebsocketRequests.With(common.Labels{"method": methodLabel, "status": "failure"}).Inc() glog.V(1).Info("Client ", c.id, " onMessage ", req.Method, ": unknown method, data ", string(req.Params)) } } @@ -860,7 +881,7 @@ func (s *WebsocketServer) subscribeNewBlock(c *websocketChannel, req *WsReq) (re s.newBlockSubscriptionsLock.Lock() defer s.newBlockSubscriptionsLock.Unlock() s.newBlockSubscriptions[c] = req.ID - s.metrics.WebsocketSubscribes.With((common.Labels{"method": "subscribeNewBlock"})).Set(float64(len(s.newBlockSubscriptions))) + s.metrics.WebsocketSubscribes.With(common.Labels{"method": "subscribeNewBlock"}).Set(float64(len(s.newBlockSubscriptions))) return &subscriptionResponse{true}, nil } @@ -868,7 +889,7 @@ func (s *WebsocketServer) unsubscribeNewBlock(c *websocketChannel) (res interfac s.newBlockSubscriptionsLock.Lock() defer s.newBlockSubscriptionsLock.Unlock() delete(s.newBlockSubscriptions, c) - s.metrics.WebsocketSubscribes.With((common.Labels{"method": "subscribeNewBlock"})).Set(float64(len(s.newBlockSubscriptions))) + s.metrics.WebsocketSubscribes.With(common.Labels{"method": "subscribeNewBlock"}).Set(float64(len(s.newBlockSubscriptions))) return &subscriptionResponse{false}, nil } @@ -879,7 +900,7 @@ func (s *WebsocketServer) subscribeNewTransaction(c *websocketChannel, req *WsRe return &subscriptionResponseMessage{false, "subscribeNewTransaction not enabled, use -enablesubnewtx flag to enable."}, nil } s.newTransactionSubscriptions[c] = req.ID - s.metrics.WebsocketSubscribes.With((common.Labels{"method": "subscribeNewTransaction"})).Set(float64(len(s.newTransactionSubscriptions))) + s.metrics.WebsocketSubscribes.With(common.Labels{"method": "subscribeNewTransaction"}).Set(float64(len(s.newTransactionSubscriptions))) return &subscriptionResponse{true}, nil } @@ -890,7 +911,7 @@ func (s *WebsocketServer) unsubscribeNewTransaction(c *websocketChannel) (res in return &subscriptionResponseMessage{false, "unsubscribeNewTransaction not enabled, use -enablesubnewtx flag to enable."}, nil } delete(s.newTransactionSubscriptions, c) - s.metrics.WebsocketSubscribes.With((common.Labels{"method": "subscribeNewTransaction"})).Set(float64(len(s.newTransactionSubscriptions))) + s.metrics.WebsocketSubscribes.With(common.Labels{"method": "subscribeNewTransaction"}).Set(float64(len(s.newTransactionSubscriptions))) return &subscriptionResponse{false}, nil } @@ -956,7 +977,8 @@ func (s *WebsocketServer) subscribeAddresses(c *websocketChannel, addrDesc []str } } c.addrDescs = addrDesc - s.metrics.WebsocketSubscribes.With((common.Labels{"method": "subscribeAddresses"})).Set(float64(len(s.addressSubscriptions))) + s.metrics.WebsocketSubscribes.With(common.Labels{"method": "subscribeAddresses"}).Set(float64(len(s.addressSubscriptions))) + s.metrics.WebsocketNewBlockTxsSubscriptions.Set(float64(s.newBlockTxsSubscriptionCount)) return &subscriptionResponse{true}, nil } @@ -965,7 +987,8 @@ func (s *WebsocketServer) unsubscribeAddresses(c *websocketChannel) (res interfa s.addressSubscriptionsLock.Lock() defer s.addressSubscriptionsLock.Unlock() s.doUnsubscribeAddresses(c) - s.metrics.WebsocketSubscribes.With((common.Labels{"method": "subscribeAddresses"})).Set(float64(len(s.addressSubscriptions))) + s.metrics.WebsocketSubscribes.With(common.Labels{"method": "subscribeAddresses"}).Set(float64(len(s.addressSubscriptions))) + s.metrics.WebsocketNewBlockTxsSubscriptions.Set(float64(s.newBlockTxsSubscriptionCount)) return &subscriptionResponse{false}, nil } @@ -1005,7 +1028,7 @@ func (s *WebsocketServer) subscribeFiatRates(c *websocketChannel, d *WsSubscribe if len(d.Tokens) != 0 { s.fiatRatesTokenSubscriptions[c] = d.Tokens } - s.metrics.WebsocketSubscribes.With((common.Labels{"method": "subscribeFiatRates"})).Set(float64(len(s.fiatRatesSubscriptions))) + s.metrics.WebsocketSubscribes.With(common.Labels{"method": "subscribeFiatRates"}).Set(float64(len(s.fiatRatesSubscriptions))) return &subscriptionResponse{true}, nil } @@ -1014,7 +1037,7 @@ func (s *WebsocketServer) unsubscribeFiatRates(c *websocketChannel) (res interfa s.fiatRatesSubscriptionsLock.Lock() defer s.fiatRatesSubscriptionsLock.Unlock() s.doUnsubscribeFiatRates(c) - s.metrics.WebsocketSubscribes.With((common.Labels{"method": "subscribeFiatRates"})).Set(float64(len(s.fiatRatesSubscriptions))) + s.metrics.WebsocketSubscribes.With(common.Labels{"method": "subscribeFiatRates"}).Set(float64(len(s.fiatRatesSubscriptions))) return &subscriptionResponse{false}, nil } @@ -1060,18 +1083,38 @@ func getEthereumInternalTransfers(tx *bchain.Tx) []bchain.EthereumInternalTransf // setEthereumReceiptIfAvailable adds receipt data to Ethereum txs on a // best-effort basis; failures are logged and notifications continue. -func setEthereumReceiptIfAvailable(tx *bchain.Tx, getReceipt func(string) (*bchain.RpcReceipt, error)) { +func setEthereumReceiptIfAvailable(tx *bchain.Tx, getReceipt func(string) (*bchain.RpcReceipt, error)) string { csd, ok := tx.CoinSpecificData.(bchain.EthereumSpecificData) if !ok { - return + return "skipped_non_eth" } receipt, err := getReceipt(tx.Txid) if err != nil { glog.Error("EthereumTypeGetTransactionReceipt error ", err, " for ", tx.Txid) - return + return "error" } csd.Receipt = receipt tx.CoinSpecificData = csd + return "success" +} + +func observeNewBlockTxDuration(metrics *common.Metrics, stage string, started time.Time) { + if metrics == nil { + return + } + metrics.WebsocketNewBlockTxsDuration.With(common.Labels{"stage": stage}).Observe(time.Since(started).Seconds()) +} + +func incNewBlockTxMetric(metrics *common.Metrics, stage, status string, value float64) { + if metrics == nil { + return + } + counter := metrics.WebsocketNewBlockTxs.With(common.Labels{"stage": stage, "status": status}) + if value == 1 { + counter.Inc() + } else { + counter.Add(value) + } } // populateBitcoinVinAddrDescs fills missing vin address descriptors by loading @@ -1111,11 +1154,15 @@ func (s *WebsocketServer) getBitcoinVinAddrDesc(txid string, vout uint32) (bchai // publishNewBlockTxsByAddr emits confirmed transaction notifications only for // subscribed addresses touched by transactions in the connected block. func (s *WebsocketServer) publishNewBlockTxsByAddr(block *bchain.Block) { + blockStart := time.Now() + defer observeNewBlockTxDuration(s.metrics, "per_block", blockStart) + chainType := s.chainParser.GetChainType() for _, tx := range block.Txs { + incNewBlockTxMetric(s.metrics, "scanned", "success", 1) setConfirmedBlockTxMetadata(&tx, block.Time) var tokenTransfers bchain.TokenTransfers var internalTransfers []bchain.EthereumInternalTransfer - if s.chainParser.GetChainType() == bchain.ChainEthereumType { + if chainType == bchain.ChainEthereumType { tokenTransfers, _ = s.chainParser.EthereumTypeGetTokenTransfersFromTx(&tx) internalTransfers = getEthereumInternalTransfers(&tx) } @@ -1123,23 +1170,36 @@ func (s *WebsocketServer) publishNewBlockTxsByAddr(block *bchain.Block) { for i, vin := range tx.Vin { vins[i] = bchain.MempoolVin{Vin: vin} } - if s.chainParser.GetChainType() == bchain.ChainBitcoinType { + if chainType == bchain.ChainBitcoinType { populateBitcoinVinAddrDescs(vins, s.getBitcoinVinAddrDesc) } + matchStart := time.Now() subscribed := s.getNewTxSubscriptions(vins, tx.Vout, tokenTransfers, internalTransfers) + observeNewBlockTxDuration(s.metrics, "match", matchStart) if len(subscribed) > 0 { + incNewBlockTxMetric(s.metrics, "matched", "success", 1) // Convert and publish asynchronously so heavy tx conversion does not // block processing of other transactions in the same block. go func(tx bchain.Tx, subscribed map[string]struct{}) { - setEthereumReceiptIfAvailable(&tx, s.chain.EthereumTypeGetTransactionReceipt) + if chainType == bchain.ChainEthereumType { + receiptStatus := setEthereumReceiptIfAvailable(&tx, s.chain.EthereumTypeGetTransactionReceipt) + if s.metrics != nil { + s.metrics.WebsocketEthReceipt.With(common.Labels{"status": receiptStatus}).Inc() + } + } + convertStart := time.Now() atx, err := s.api.GetTransactionFromBchainTx(&tx, int(block.Height), false, false, nil) + observeNewBlockTxDuration(s.metrics, "convert", convertStart) if err != nil { + incNewBlockTxMetric(s.metrics, "converted", "failure", 1) glog.Error("GetTransactionFromBchainTx error ", err, " for ", tx.Txid) return } + incNewBlockTxMetric(s.metrics, "converted", "success", 1) for stringAddressDescriptor := range subscribed { s.sendOnNewTxAddr(stringAddressDescriptor, atx, true) } + incNewBlockTxMetric(s.metrics, "published", "success", float64(len(subscribed))) }(tx, subscribed) } } @@ -1187,12 +1247,19 @@ func (s *WebsocketServer) sendOnNewTxAddr(stringAddressDescriptor string, tx *ap defer s.addressSubscriptionsLock.Unlock() as, ok := s.addressSubscriptions[stringAddressDescriptor] if ok { + source := "mempool" + if newBlockTx { + source = "new_block" + } for c, details := range as { // Mempool notifications go to all address subscribers; confirmed // block notifications only go to subscribers that requested them. if newBlockTx && !details.publishNewBlockTxs { continue } + if s.metrics != nil { + s.metrics.WebsocketAddrNotifications.With(common.Labels{"source": source}).Inc() + } c.DataOut(&WsRes{ ID: details.requestID, Data: &data,