diff --git a/common/metrics.go b/common/metrics.go index 0cb1ec45..922364fb 100644 --- a/common/metrics.go +++ b/common/metrics.go @@ -8,34 +8,41 @@ import ( // Metrics holds prometheus collectors for various metrics collected by Blockbook type Metrics struct { - SocketIORequests *prometheus.CounterVec - SocketIOSubscribes *prometheus.CounterVec - SocketIOClients prometheus.Gauge - SocketIOReqDuration *prometheus.HistogramVec - WebsocketRequests *prometheus.CounterVec - WebsocketSubscribes *prometheus.GaugeVec - WebsocketClients prometheus.Gauge - WebsocketReqDuration *prometheus.HistogramVec - IndexResyncDuration prometheus.Histogram - MempoolResyncDuration prometheus.Histogram - TxCacheEfficiency *prometheus.CounterVec - RPCLatency *prometheus.HistogramVec - IndexResyncErrors *prometheus.CounterVec - IndexDBSize prometheus.Gauge - ExplorerViews *prometheus.CounterVec - MempoolSize prometheus.Gauge - EstimatedFee *prometheus.GaugeVec - AvgBlockPeriod prometheus.Gauge - DbColumnRows *prometheus.GaugeVec - DbColumnSize *prometheus.GaugeVec - BlockbookAppInfo *prometheus.GaugeVec - BackendBestHeight prometheus.Gauge - BlockbookBestHeight prometheus.Gauge - ExplorerPendingRequests *prometheus.GaugeVec - WebsocketPendingRequests *prometheus.GaugeVec - SocketIOPendingRequests *prometheus.GaugeVec - XPubCacheSize prometheus.Gauge - CoingeckoRequests *prometheus.CounterVec + SocketIORequests *prometheus.CounterVec + SocketIOSubscribes *prometheus.CounterVec + SocketIOClients prometheus.Gauge + SocketIOReqDuration *prometheus.HistogramVec + WebsocketRequests *prometheus.CounterVec + WebsocketSubscribes *prometheus.GaugeVec + WebsocketClients prometheus.Gauge + WebsocketReqDuration *prometheus.HistogramVec + IndexResyncDuration prometheus.Histogram + MempoolResyncDuration prometheus.Histogram + TxCacheEfficiency *prometheus.CounterVec + RPCLatency *prometheus.HistogramVec + IndexResyncErrors *prometheus.CounterVec + IndexDBSize prometheus.Gauge + ExplorerViews *prometheus.CounterVec + MempoolSize prometheus.Gauge + EstimatedFee *prometheus.GaugeVec + AvgBlockPeriod prometheus.Gauge + SyncBlockStats *prometheus.GaugeVec + SyncHotnessStats *prometheus.GaugeVec + AddrContractsCacheEntries prometheus.Gauge + AddrContractsCacheBytes prometheus.Gauge + AddrContractsCacheHits prometheus.Counter + AddrContractsCacheMisses prometheus.Counter + AddrContractsCacheFlushes *prometheus.CounterVec + DbColumnRows *prometheus.GaugeVec + DbColumnSize *prometheus.GaugeVec + BlockbookAppInfo *prometheus.GaugeVec + BackendBestHeight prometheus.Gauge + BlockbookBestHeight prometheus.Gauge + ExplorerPendingRequests *prometheus.GaugeVec + WebsocketPendingRequests *prometheus.GaugeVec + SocketIOPendingRequests *prometheus.GaugeVec + XPubCacheSize prometheus.Gauge + CoingeckoRequests *prometheus.CounterVec } // Labels represents a collection of label name -> value mappings. @@ -187,6 +194,58 @@ func GetMetrics(coin string) (*Metrics, error) { ConstLabels: Labels{"coin": coin}, }, ) + metrics.SyncBlockStats = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "blockbook_sync_block_stats", + Help: "Per-interval block stats for bulk sync and per-block stats at chain tip", + ConstLabels: Labels{"coin": coin}, + }, + []string{"scope", "kind"}, + ) + metrics.SyncHotnessStats = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "blockbook_sync_hotness_stats", + Help: "Hot address stats for bulk sync intervals and per-block chain tip processing (Ethereum-type only)", + ConstLabels: Labels{"coin": coin}, + }, + []string{"scope", "kind"}, + ) + metrics.AddrContractsCacheEntries = prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "blockbook_addr_contracts_cache_entries", + Help: "Number of cached addressContracts entries", + ConstLabels: Labels{"coin": coin}, + }, + ) + metrics.AddrContractsCacheBytes = prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "blockbook_addr_contracts_cache_bytes", + Help: "Estimated bytes in the addressContracts cache", + ConstLabels: Labels{"coin": coin}, + }, + ) + metrics.AddrContractsCacheHits = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "blockbook_addr_contracts_cache_hits_total", + Help: "Total number of addressContracts cache hits", + ConstLabels: Labels{"coin": coin}, + }, + ) + metrics.AddrContractsCacheMisses = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "blockbook_addr_contracts_cache_misses_total", + Help: "Total number of addressContracts cache misses", + ConstLabels: Labels{"coin": coin}, + }, + ) + metrics.AddrContractsCacheFlushes = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "blockbook_addr_contracts_cache_flush_total", + Help: "Total number of addressContracts cache flushes by reason", + ConstLabels: Labels{"coin": coin}, + }, + []string{"reason"}, + ) metrics.DbColumnRows = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "blockbook_dbcolumn_rows", diff --git a/db/address_hotness.go b/db/address_hotness.go index 7ffa18eb..68005705 100644 --- a/db/address_hotness.go +++ b/db/address_hotness.go @@ -122,6 +122,13 @@ func (h *addressHotness) LogSuffix() string { h.blockEligibleLookups, h.blockLRUHits, h.blockPromotions, h.blockEvictions, hitRate) } +func (h *addressHotness) Stats() (eligible, hits, promotions, evictions uint64) { + if h == nil { + return 0, 0, 0, 0 + } + return h.blockEligibleLookups, h.blockLRUHits, h.blockPromotions, h.blockEvictions +} + type hotAddressLRU struct { capacity int order *list.List diff --git a/db/bulkconnect.go b/db/bulkconnect.go index ab40d9fe..fa9849c2 100644 --- a/db/bulkconnect.go +++ b/db/bulkconnect.go @@ -7,6 +7,7 @@ import ( "github.com/golang/glog" "github.com/linxGnu/grocksdb" "github.com/trezor/blockbook/bchain" + "github.com/trezor/blockbook/common" ) // bulk connect @@ -33,6 +34,7 @@ type BulkConnect struct { addressContracts map[string]*unpackedAddrContracts height uint32 bulkStats bulkConnectStats + bulkHotness bulkHotnessStats } const ( @@ -47,6 +49,7 @@ const ( ) type bulkConnectStats struct { + blocks uint64 txs uint64 tokenTransfers uint64 internalTransfers uint64 @@ -54,6 +57,13 @@ type bulkConnectStats struct { vout uint64 } +type bulkHotnessStats struct { + eligible uint64 + hits uint64 + promotions uint64 + evictions uint64 +} + // InitBulkConnect initializes bulk connect and switches DB to inconsistent state func (d *RocksDB) InitBulkConnect() (*BulkConnect, error) { b := &BulkConnect{ @@ -194,6 +204,7 @@ func (b *BulkConnect) storeBulkBlockFilters(wb *grocksdb.WriteBatch) error { } func (b *BulkConnect) addEthereumStats(blockTxs []ethBlockTx) { + b.bulkStats.blocks++ b.bulkStats.txs += uint64(len(blockTxs)) for i := range blockTxs { b.bulkStats.tokenTransfers += uint64(len(blockTxs[i].contracts)) @@ -201,9 +212,17 @@ func (b *BulkConnect) addEthereumStats(blockTxs []ethBlockTx) { b.bulkStats.internalTransfers += uint64(len(blockTxs[i].internalData.transfers)) } } + if b.d.hotAddrTracker != nil { + eligible, hits, promotions, evictions := b.d.hotAddrTracker.Stats() + b.bulkHotness.eligible += eligible + b.bulkHotness.hits += hits + b.bulkHotness.promotions += promotions + b.bulkHotness.evictions += evictions + } } func (b *BulkConnect) addBitcoinStats(block *bchain.Block) { + b.bulkStats.blocks++ b.bulkStats.txs += uint64(len(block.Txs)) for i := range block.Txs { b.bulkStats.vin += uint64(len(block.Txs[i].Vin)) @@ -211,6 +230,22 @@ func (b *BulkConnect) addBitcoinStats(block *bchain.Block) { } } +func (b *BulkConnect) updateSyncMetrics(scope string) { + if b.d.metrics == nil { + return + } + b.d.metrics.SyncBlockStats.With(common.Labels{"scope": scope, "kind": "blocks"}).Set(float64(b.bulkStats.blocks)) + b.d.metrics.SyncBlockStats.With(common.Labels{"scope": scope, "kind": "txs"}).Set(float64(b.bulkStats.txs)) + b.d.metrics.SyncBlockStats.With(common.Labels{"scope": scope, "kind": "token_transfers"}).Set(float64(b.bulkStats.tokenTransfers)) + b.d.metrics.SyncBlockStats.With(common.Labels{"scope": scope, "kind": "internal_transfers"}).Set(float64(b.bulkStats.internalTransfers)) + b.d.metrics.SyncBlockStats.With(common.Labels{"scope": scope, "kind": "vin"}).Set(float64(b.bulkStats.vin)) + b.d.metrics.SyncBlockStats.With(common.Labels{"scope": scope, "kind": "vout"}).Set(float64(b.bulkStats.vout)) + b.d.metrics.SyncHotnessStats.With(common.Labels{"scope": scope, "kind": "eligible_lookups"}).Set(float64(b.bulkHotness.eligible)) + b.d.metrics.SyncHotnessStats.With(common.Labels{"scope": scope, "kind": "lru_hits"}).Set(float64(b.bulkHotness.hits)) + b.d.metrics.SyncHotnessStats.With(common.Labels{"scope": scope, "kind": "promotions"}).Set(float64(b.bulkHotness.promotions)) + b.d.metrics.SyncHotnessStats.With(common.Labels{"scope": scope, "kind": "evictions"}).Set(float64(b.bulkHotness.evictions)) +} + func (b *BulkConnect) statsLogSuffix() string { if b.bulkStats.txs == 0 && b.bulkStats.tokenTransfers == 0 && b.bulkStats.internalTransfers == 0 && b.bulkStats.vin == 0 && b.bulkStats.vout == 0 { return "" @@ -231,6 +266,7 @@ func (b *BulkConnect) statsLogSuffix() string { func (b *BulkConnect) resetStats() { b.bulkStats = bulkConnectStats{} + b.bulkHotness = bulkHotnessStats{} } func (b *BulkConnect) connectBlockBitcoinType(block *bchain.Block, storeBlockTxs bool) error { @@ -303,6 +339,7 @@ func (b *BulkConnect) connectBlockBitcoinType(block *bchain.Block, storeBlockTxs suffix += b.d.hotAddrTracker.LogSuffix() } glog.Info("rocksdb: height ", b.height, ", stored ", bac, " addresses, done in ", time.Since(start), suffix) + b.updateSyncMetrics("bulk") b.resetStats() } } @@ -417,6 +454,7 @@ func (b *BulkConnect) connectBlockEthereumType(block *bchain.Block, storeBlockTx suffix += b.d.hotAddrTracker.LogSuffix() } glog.Info("rocksdb: height ", b.height, ", stored ", bac, " addresses, done in ", time.Since(start), suffix) + b.updateSyncMetrics("bulk") b.resetStats() } } else { @@ -489,6 +527,7 @@ func (b *BulkConnect) Close() error { suffix += b.d.hotAddrTracker.LogSuffix() } glog.Info("rocksdb: height ", b.height, ", stored ", bac, " addresses, done in ", time.Since(start), suffix) + b.updateSyncMetrics("bulk") b.resetStats() if storeTxAddressesChan != nil { if err := <-storeTxAddressesChan; err != nil { diff --git a/db/rocksdb.go b/db/rocksdb.go index 5b9d865c..c1d496ca 100644 --- a/db/rocksdb.go +++ b/db/rocksdb.go @@ -389,6 +389,12 @@ func (d *RocksDB) ConnectBlock(block *bchain.Block) error { wb := grocksdb.NewWriteBatch() defer wb.Destroy() + var tipTxs uint64 + var tipTokenTransfers uint64 + var tipInternalTransfers uint64 + var tipVin uint64 + var tipVout uint64 + if glog.V(2) { glog.Infof("rocksdb: insert %d %s", block.Height, block.Hash) } @@ -412,6 +418,13 @@ func (d *RocksDB) ConnectBlock(block *bchain.Block) error { if err := d.processAddressesBitcoinType(block, addresses, txAddressesMap, balances, gf); err != nil { return err } + if d.metrics != nil { + tipTxs = uint64(len(block.Txs)) + for i := range block.Txs { + tipVin += uint64(len(block.Txs[i].Vin)) + tipVout += uint64(len(block.Txs[i].Vout)) + } + } if err := d.storeTxAddresses(wb, txAddressesMap); err != nil { return err } @@ -433,6 +446,15 @@ func (d *RocksDB) ConnectBlock(block *bchain.Block) error { if err != nil { return err } + if d.metrics != nil { + for i := range blockTxs { + tipTokenTransfers += uint64(len(blockTxs[i].contracts)) + if blockTxs[i].internalData != nil { + tipInternalTransfers += uint64(len(blockTxs[i].internalData.transfers)) + } + } + tipTxs = uint64(len(blockTxs)) + } if err := d.storeUnpackedAddressContracts(wb, addressContracts); err != nil { return err } @@ -458,6 +480,24 @@ func (d *RocksDB) ConnectBlock(block *bchain.Block) error { if d.metrics != nil { d.metrics.AvgBlockPeriod.Set(float64(avg)) } + if d.metrics != nil { + if chainType == bchain.ChainBitcoinType { + d.metrics.SyncBlockStats.With(common.Labels{"scope": "tip", "kind": "txs"}).Set(float64(tipTxs)) + d.metrics.SyncBlockStats.With(common.Labels{"scope": "tip", "kind": "vin"}).Set(float64(tipVin)) + d.metrics.SyncBlockStats.With(common.Labels{"scope": "tip", "kind": "vout"}).Set(float64(tipVout)) + } else if chainType == bchain.ChainEthereumType { + d.metrics.SyncBlockStats.With(common.Labels{"scope": "tip", "kind": "txs"}).Set(float64(tipTxs)) + d.metrics.SyncBlockStats.With(common.Labels{"scope": "tip", "kind": "token_transfers"}).Set(float64(tipTokenTransfers)) + d.metrics.SyncBlockStats.With(common.Labels{"scope": "tip", "kind": "internal_transfers"}).Set(float64(tipInternalTransfers)) + if d.hotAddrTracker != nil { + eligible, hits, promotions, evictions := d.hotAddrTracker.Stats() + d.metrics.SyncHotnessStats.With(common.Labels{"scope": "tip", "kind": "eligible_lookups"}).Set(float64(eligible)) + d.metrics.SyncHotnessStats.With(common.Labels{"scope": "tip", "kind": "lru_hits"}).Set(float64(hits)) + d.metrics.SyncHotnessStats.With(common.Labels{"scope": "tip", "kind": "promotions"}).Set(float64(promotions)) + d.metrics.SyncHotnessStats.With(common.Labels{"scope": "tip", "kind": "evictions"}).Set(float64(evictions)) + } + } + } return nil } diff --git a/db/rocksdb_ethereumtype.go b/db/rocksdb_ethereumtype.go index 71ffce7f..1a4fb93e 100644 --- a/db/rocksdb_ethereumtype.go +++ b/db/rocksdb_ethereumtype.go @@ -15,6 +15,7 @@ import ( "github.com/linxGnu/grocksdb" "github.com/trezor/blockbook/bchain" "github.com/trezor/blockbook/bchain/coins/eth" + "github.com/trezor/blockbook/common" ) const InternalTxIndexOffset = 1 @@ -1750,8 +1751,14 @@ func (d *RocksDB) getUnpackedAddrDescContracts(addrDesc bchain.AddressDescriptor rv, found := d.addrContractsCache[string(addrDesc)] d.addrContractsCacheMux.Unlock() if found && rv != nil { + if d.metrics != nil { + d.metrics.AddrContractsCacheHits.Inc() + } return rv, nil } + if d.metrics != nil { + d.metrics.AddrContractsCacheMisses.Inc() + } val, err := d.db.GetCF(d.ro, d.cfh[cfAddressContracts], addrDesc) if err != nil { return nil, err @@ -1767,6 +1774,8 @@ func (d *RocksDB) getUnpackedAddrDescContracts(addrDesc bchain.AddressDescriptor minSize = addrContractsCacheMinSize } if err == nil && rv != nil && len(buf) > minSize { + var cacheEntries int + var cacheBytes int64 shouldFlush := false d.addrContractsCacheMux.Lock() key := string(addrDesc) @@ -1778,7 +1787,13 @@ func (d *RocksDB) getUnpackedAddrDescContracts(addrDesc bchain.AddressDescriptor shouldFlush = true } } + cacheEntries = len(d.addrContractsCache) + cacheBytes = d.addrContractsCacheBytes d.addrContractsCacheMux.Unlock() + if d.metrics != nil { + d.metrics.AddrContractsCacheEntries.Set(float64(cacheEntries)) + d.metrics.AddrContractsCacheBytes.Set(float64(cacheBytes)) + } if shouldFlush { // Flush early when we exceed the cap to avoid unbounded memory growth. d.flushAddrContractsCache() @@ -1956,6 +1971,13 @@ func (d *RocksDB) flushAddrContractsCache() { d.addrContractsCache = make(map[string]*unpackedAddrContracts) d.addrContractsCacheBytes = 0 d.addrContractsCacheMux.Unlock() + if d.metrics != nil { + d.metrics.AddrContractsCacheEntries.Set(0) + d.metrics.AddrContractsCacheBytes.Set(0) + if count > 0 { + d.metrics.AddrContractsCacheFlushes.With(common.Labels{"reason": "cap"}).Inc() + } + } if count > 0 { d.writeContractsCacheSnapshot(cache) } @@ -1964,9 +1986,13 @@ func (d *RocksDB) flushAddrContractsCache() { func (d *RocksDB) storeAddrContractsCache() { start := time.Now() - if len(d.addrContractsCache) > 0 { + count := len(d.addrContractsCache) + if count > 0 { d.writeContractsCache() } + if d.metrics != nil && count > 0 { + d.metrics.AddrContractsCacheFlushes.With(common.Labels{"reason": "timer"}).Inc() + } glog.Info("storeAddrContractsCache: store ", len(d.addrContractsCache), " entries in ", time.Since(start)) }