syncing/caching premetheus metrics

This commit is contained in:
pragmaxim
2026-02-11 08:13:40 +01:00
parent 9487b23db7
commit 9aed7923c6
5 changed files with 200 additions and 29 deletions

View File

@@ -8,34 +8,41 @@ import (
// Metrics holds prometheus collectors for various metrics collected by Blockbook
type Metrics struct {
SocketIORequests *prometheus.CounterVec
SocketIOSubscribes *prometheus.CounterVec
SocketIOClients prometheus.Gauge
SocketIOReqDuration *prometheus.HistogramVec
WebsocketRequests *prometheus.CounterVec
WebsocketSubscribes *prometheus.GaugeVec
WebsocketClients prometheus.Gauge
WebsocketReqDuration *prometheus.HistogramVec
IndexResyncDuration prometheus.Histogram
MempoolResyncDuration prometheus.Histogram
TxCacheEfficiency *prometheus.CounterVec
RPCLatency *prometheus.HistogramVec
IndexResyncErrors *prometheus.CounterVec
IndexDBSize prometheus.Gauge
ExplorerViews *prometheus.CounterVec
MempoolSize prometheus.Gauge
EstimatedFee *prometheus.GaugeVec
AvgBlockPeriod prometheus.Gauge
DbColumnRows *prometheus.GaugeVec
DbColumnSize *prometheus.GaugeVec
BlockbookAppInfo *prometheus.GaugeVec
BackendBestHeight prometheus.Gauge
BlockbookBestHeight prometheus.Gauge
ExplorerPendingRequests *prometheus.GaugeVec
WebsocketPendingRequests *prometheus.GaugeVec
SocketIOPendingRequests *prometheus.GaugeVec
XPubCacheSize prometheus.Gauge
CoingeckoRequests *prometheus.CounterVec
SocketIORequests *prometheus.CounterVec
SocketIOSubscribes *prometheus.CounterVec
SocketIOClients prometheus.Gauge
SocketIOReqDuration *prometheus.HistogramVec
WebsocketRequests *prometheus.CounterVec
WebsocketSubscribes *prometheus.GaugeVec
WebsocketClients prometheus.Gauge
WebsocketReqDuration *prometheus.HistogramVec
IndexResyncDuration prometheus.Histogram
MempoolResyncDuration prometheus.Histogram
TxCacheEfficiency *prometheus.CounterVec
RPCLatency *prometheus.HistogramVec
IndexResyncErrors *prometheus.CounterVec
IndexDBSize prometheus.Gauge
ExplorerViews *prometheus.CounterVec
MempoolSize prometheus.Gauge
EstimatedFee *prometheus.GaugeVec
AvgBlockPeriod prometheus.Gauge
SyncBlockStats *prometheus.GaugeVec
SyncHotnessStats *prometheus.GaugeVec
AddrContractsCacheEntries prometheus.Gauge
AddrContractsCacheBytes prometheus.Gauge
AddrContractsCacheHits prometheus.Counter
AddrContractsCacheMisses prometheus.Counter
AddrContractsCacheFlushes *prometheus.CounterVec
DbColumnRows *prometheus.GaugeVec
DbColumnSize *prometheus.GaugeVec
BlockbookAppInfo *prometheus.GaugeVec
BackendBestHeight prometheus.Gauge
BlockbookBestHeight prometheus.Gauge
ExplorerPendingRequests *prometheus.GaugeVec
WebsocketPendingRequests *prometheus.GaugeVec
SocketIOPendingRequests *prometheus.GaugeVec
XPubCacheSize prometheus.Gauge
CoingeckoRequests *prometheus.CounterVec
}
// Labels represents a collection of label name -> value mappings.
@@ -187,6 +194,58 @@ func GetMetrics(coin string) (*Metrics, error) {
ConstLabels: Labels{"coin": coin},
},
)
metrics.SyncBlockStats = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "blockbook_sync_block_stats",
Help: "Per-interval block stats for bulk sync and per-block stats at chain tip",
ConstLabels: Labels{"coin": coin},
},
[]string{"scope", "kind"},
)
metrics.SyncHotnessStats = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "blockbook_sync_hotness_stats",
Help: "Hot address stats for bulk sync intervals and per-block chain tip processing (Ethereum-type only)",
ConstLabels: Labels{"coin": coin},
},
[]string{"scope", "kind"},
)
metrics.AddrContractsCacheEntries = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "blockbook_addr_contracts_cache_entries",
Help: "Number of cached addressContracts entries",
ConstLabels: Labels{"coin": coin},
},
)
metrics.AddrContractsCacheBytes = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "blockbook_addr_contracts_cache_bytes",
Help: "Estimated bytes in the addressContracts cache",
ConstLabels: Labels{"coin": coin},
},
)
metrics.AddrContractsCacheHits = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "blockbook_addr_contracts_cache_hits_total",
Help: "Total number of addressContracts cache hits",
ConstLabels: Labels{"coin": coin},
},
)
metrics.AddrContractsCacheMisses = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "blockbook_addr_contracts_cache_misses_total",
Help: "Total number of addressContracts cache misses",
ConstLabels: Labels{"coin": coin},
},
)
metrics.AddrContractsCacheFlushes = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "blockbook_addr_contracts_cache_flush_total",
Help: "Total number of addressContracts cache flushes by reason",
ConstLabels: Labels{"coin": coin},
},
[]string{"reason"},
)
metrics.DbColumnRows = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "blockbook_dbcolumn_rows",

View File

@@ -122,6 +122,13 @@ func (h *addressHotness) LogSuffix() string {
h.blockEligibleLookups, h.blockLRUHits, h.blockPromotions, h.blockEvictions, hitRate)
}
func (h *addressHotness) Stats() (eligible, hits, promotions, evictions uint64) {
if h == nil {
return 0, 0, 0, 0
}
return h.blockEligibleLookups, h.blockLRUHits, h.blockPromotions, h.blockEvictions
}
type hotAddressLRU struct {
capacity int
order *list.List

View File

@@ -7,6 +7,7 @@ import (
"github.com/golang/glog"
"github.com/linxGnu/grocksdb"
"github.com/trezor/blockbook/bchain"
"github.com/trezor/blockbook/common"
)
// bulk connect
@@ -33,6 +34,7 @@ type BulkConnect struct {
addressContracts map[string]*unpackedAddrContracts
height uint32
bulkStats bulkConnectStats
bulkHotness bulkHotnessStats
}
const (
@@ -47,6 +49,7 @@ const (
)
type bulkConnectStats struct {
blocks uint64
txs uint64
tokenTransfers uint64
internalTransfers uint64
@@ -54,6 +57,13 @@ type bulkConnectStats struct {
vout uint64
}
type bulkHotnessStats struct {
eligible uint64
hits uint64
promotions uint64
evictions uint64
}
// InitBulkConnect initializes bulk connect and switches DB to inconsistent state
func (d *RocksDB) InitBulkConnect() (*BulkConnect, error) {
b := &BulkConnect{
@@ -194,6 +204,7 @@ func (b *BulkConnect) storeBulkBlockFilters(wb *grocksdb.WriteBatch) error {
}
func (b *BulkConnect) addEthereumStats(blockTxs []ethBlockTx) {
b.bulkStats.blocks++
b.bulkStats.txs += uint64(len(blockTxs))
for i := range blockTxs {
b.bulkStats.tokenTransfers += uint64(len(blockTxs[i].contracts))
@@ -201,9 +212,17 @@ func (b *BulkConnect) addEthereumStats(blockTxs []ethBlockTx) {
b.bulkStats.internalTransfers += uint64(len(blockTxs[i].internalData.transfers))
}
}
if b.d.hotAddrTracker != nil {
eligible, hits, promotions, evictions := b.d.hotAddrTracker.Stats()
b.bulkHotness.eligible += eligible
b.bulkHotness.hits += hits
b.bulkHotness.promotions += promotions
b.bulkHotness.evictions += evictions
}
}
func (b *BulkConnect) addBitcoinStats(block *bchain.Block) {
b.bulkStats.blocks++
b.bulkStats.txs += uint64(len(block.Txs))
for i := range block.Txs {
b.bulkStats.vin += uint64(len(block.Txs[i].Vin))
@@ -211,6 +230,22 @@ func (b *BulkConnect) addBitcoinStats(block *bchain.Block) {
}
}
func (b *BulkConnect) updateSyncMetrics(scope string) {
if b.d.metrics == nil {
return
}
b.d.metrics.SyncBlockStats.With(common.Labels{"scope": scope, "kind": "blocks"}).Set(float64(b.bulkStats.blocks))
b.d.metrics.SyncBlockStats.With(common.Labels{"scope": scope, "kind": "txs"}).Set(float64(b.bulkStats.txs))
b.d.metrics.SyncBlockStats.With(common.Labels{"scope": scope, "kind": "token_transfers"}).Set(float64(b.bulkStats.tokenTransfers))
b.d.metrics.SyncBlockStats.With(common.Labels{"scope": scope, "kind": "internal_transfers"}).Set(float64(b.bulkStats.internalTransfers))
b.d.metrics.SyncBlockStats.With(common.Labels{"scope": scope, "kind": "vin"}).Set(float64(b.bulkStats.vin))
b.d.metrics.SyncBlockStats.With(common.Labels{"scope": scope, "kind": "vout"}).Set(float64(b.bulkStats.vout))
b.d.metrics.SyncHotnessStats.With(common.Labels{"scope": scope, "kind": "eligible_lookups"}).Set(float64(b.bulkHotness.eligible))
b.d.metrics.SyncHotnessStats.With(common.Labels{"scope": scope, "kind": "lru_hits"}).Set(float64(b.bulkHotness.hits))
b.d.metrics.SyncHotnessStats.With(common.Labels{"scope": scope, "kind": "promotions"}).Set(float64(b.bulkHotness.promotions))
b.d.metrics.SyncHotnessStats.With(common.Labels{"scope": scope, "kind": "evictions"}).Set(float64(b.bulkHotness.evictions))
}
func (b *BulkConnect) statsLogSuffix() string {
if b.bulkStats.txs == 0 && b.bulkStats.tokenTransfers == 0 && b.bulkStats.internalTransfers == 0 && b.bulkStats.vin == 0 && b.bulkStats.vout == 0 {
return ""
@@ -231,6 +266,7 @@ func (b *BulkConnect) statsLogSuffix() string {
func (b *BulkConnect) resetStats() {
b.bulkStats = bulkConnectStats{}
b.bulkHotness = bulkHotnessStats{}
}
func (b *BulkConnect) connectBlockBitcoinType(block *bchain.Block, storeBlockTxs bool) error {
@@ -303,6 +339,7 @@ func (b *BulkConnect) connectBlockBitcoinType(block *bchain.Block, storeBlockTxs
suffix += b.d.hotAddrTracker.LogSuffix()
}
glog.Info("rocksdb: height ", b.height, ", stored ", bac, " addresses, done in ", time.Since(start), suffix)
b.updateSyncMetrics("bulk")
b.resetStats()
}
}
@@ -417,6 +454,7 @@ func (b *BulkConnect) connectBlockEthereumType(block *bchain.Block, storeBlockTx
suffix += b.d.hotAddrTracker.LogSuffix()
}
glog.Info("rocksdb: height ", b.height, ", stored ", bac, " addresses, done in ", time.Since(start), suffix)
b.updateSyncMetrics("bulk")
b.resetStats()
}
} else {
@@ -489,6 +527,7 @@ func (b *BulkConnect) Close() error {
suffix += b.d.hotAddrTracker.LogSuffix()
}
glog.Info("rocksdb: height ", b.height, ", stored ", bac, " addresses, done in ", time.Since(start), suffix)
b.updateSyncMetrics("bulk")
b.resetStats()
if storeTxAddressesChan != nil {
if err := <-storeTxAddressesChan; err != nil {

View File

@@ -389,6 +389,12 @@ func (d *RocksDB) ConnectBlock(block *bchain.Block) error {
wb := grocksdb.NewWriteBatch()
defer wb.Destroy()
var tipTxs uint64
var tipTokenTransfers uint64
var tipInternalTransfers uint64
var tipVin uint64
var tipVout uint64
if glog.V(2) {
glog.Infof("rocksdb: insert %d %s", block.Height, block.Hash)
}
@@ -412,6 +418,13 @@ func (d *RocksDB) ConnectBlock(block *bchain.Block) error {
if err := d.processAddressesBitcoinType(block, addresses, txAddressesMap, balances, gf); err != nil {
return err
}
if d.metrics != nil {
tipTxs = uint64(len(block.Txs))
for i := range block.Txs {
tipVin += uint64(len(block.Txs[i].Vin))
tipVout += uint64(len(block.Txs[i].Vout))
}
}
if err := d.storeTxAddresses(wb, txAddressesMap); err != nil {
return err
}
@@ -433,6 +446,15 @@ func (d *RocksDB) ConnectBlock(block *bchain.Block) error {
if err != nil {
return err
}
if d.metrics != nil {
for i := range blockTxs {
tipTokenTransfers += uint64(len(blockTxs[i].contracts))
if blockTxs[i].internalData != nil {
tipInternalTransfers += uint64(len(blockTxs[i].internalData.transfers))
}
}
tipTxs = uint64(len(blockTxs))
}
if err := d.storeUnpackedAddressContracts(wb, addressContracts); err != nil {
return err
}
@@ -458,6 +480,24 @@ func (d *RocksDB) ConnectBlock(block *bchain.Block) error {
if d.metrics != nil {
d.metrics.AvgBlockPeriod.Set(float64(avg))
}
if d.metrics != nil {
if chainType == bchain.ChainBitcoinType {
d.metrics.SyncBlockStats.With(common.Labels{"scope": "tip", "kind": "txs"}).Set(float64(tipTxs))
d.metrics.SyncBlockStats.With(common.Labels{"scope": "tip", "kind": "vin"}).Set(float64(tipVin))
d.metrics.SyncBlockStats.With(common.Labels{"scope": "tip", "kind": "vout"}).Set(float64(tipVout))
} else if chainType == bchain.ChainEthereumType {
d.metrics.SyncBlockStats.With(common.Labels{"scope": "tip", "kind": "txs"}).Set(float64(tipTxs))
d.metrics.SyncBlockStats.With(common.Labels{"scope": "tip", "kind": "token_transfers"}).Set(float64(tipTokenTransfers))
d.metrics.SyncBlockStats.With(common.Labels{"scope": "tip", "kind": "internal_transfers"}).Set(float64(tipInternalTransfers))
if d.hotAddrTracker != nil {
eligible, hits, promotions, evictions := d.hotAddrTracker.Stats()
d.metrics.SyncHotnessStats.With(common.Labels{"scope": "tip", "kind": "eligible_lookups"}).Set(float64(eligible))
d.metrics.SyncHotnessStats.With(common.Labels{"scope": "tip", "kind": "lru_hits"}).Set(float64(hits))
d.metrics.SyncHotnessStats.With(common.Labels{"scope": "tip", "kind": "promotions"}).Set(float64(promotions))
d.metrics.SyncHotnessStats.With(common.Labels{"scope": "tip", "kind": "evictions"}).Set(float64(evictions))
}
}
}
return nil
}

View File

@@ -15,6 +15,7 @@ import (
"github.com/linxGnu/grocksdb"
"github.com/trezor/blockbook/bchain"
"github.com/trezor/blockbook/bchain/coins/eth"
"github.com/trezor/blockbook/common"
)
const InternalTxIndexOffset = 1
@@ -1750,8 +1751,14 @@ func (d *RocksDB) getUnpackedAddrDescContracts(addrDesc bchain.AddressDescriptor
rv, found := d.addrContractsCache[string(addrDesc)]
d.addrContractsCacheMux.Unlock()
if found && rv != nil {
if d.metrics != nil {
d.metrics.AddrContractsCacheHits.Inc()
}
return rv, nil
}
if d.metrics != nil {
d.metrics.AddrContractsCacheMisses.Inc()
}
val, err := d.db.GetCF(d.ro, d.cfh[cfAddressContracts], addrDesc)
if err != nil {
return nil, err
@@ -1767,6 +1774,8 @@ func (d *RocksDB) getUnpackedAddrDescContracts(addrDesc bchain.AddressDescriptor
minSize = addrContractsCacheMinSize
}
if err == nil && rv != nil && len(buf) > minSize {
var cacheEntries int
var cacheBytes int64
shouldFlush := false
d.addrContractsCacheMux.Lock()
key := string(addrDesc)
@@ -1778,7 +1787,13 @@ func (d *RocksDB) getUnpackedAddrDescContracts(addrDesc bchain.AddressDescriptor
shouldFlush = true
}
}
cacheEntries = len(d.addrContractsCache)
cacheBytes = d.addrContractsCacheBytes
d.addrContractsCacheMux.Unlock()
if d.metrics != nil {
d.metrics.AddrContractsCacheEntries.Set(float64(cacheEntries))
d.metrics.AddrContractsCacheBytes.Set(float64(cacheBytes))
}
if shouldFlush {
// Flush early when we exceed the cap to avoid unbounded memory growth.
d.flushAddrContractsCache()
@@ -1956,6 +1971,13 @@ func (d *RocksDB) flushAddrContractsCache() {
d.addrContractsCache = make(map[string]*unpackedAddrContracts)
d.addrContractsCacheBytes = 0
d.addrContractsCacheMux.Unlock()
if d.metrics != nil {
d.metrics.AddrContractsCacheEntries.Set(0)
d.metrics.AddrContractsCacheBytes.Set(0)
if count > 0 {
d.metrics.AddrContractsCacheFlushes.With(common.Labels{"reason": "cap"}).Inc()
}
}
if count > 0 {
d.writeContractsCacheSnapshot(cache)
}
@@ -1964,9 +1986,13 @@ func (d *RocksDB) flushAddrContractsCache() {
func (d *RocksDB) storeAddrContractsCache() {
start := time.Now()
if len(d.addrContractsCache) > 0 {
count := len(d.addrContractsCache)
if count > 0 {
d.writeContractsCache()
}
if d.metrics != nil && count > 0 {
d.metrics.AddrContractsCacheFlushes.With(common.Labels{"reason": "timer"}).Inc()
}
glog.Info("storeAddrContractsCache: store ", len(d.addrContractsCache), " entries in ", time.Since(start))
}