time-based hotness address caching

This commit is contained in:
pragmaxim
2026-02-05 10:33:46 +01:00
parent d69f82523f
commit 5c59146e0b
4 changed files with 321 additions and 21 deletions

View File

@@ -0,0 +1,153 @@
package db
import (
"os"
"strconv"
"strings"
"time"
"github.com/golang/glog"
)
func (d *RocksDB) loadAddrContractsCacheConfigFromEnv() {
if d.is == nil {
return
}
network := strings.ToUpper(d.is.GetNetwork())
if network == "" {
return
}
if v, ok := lookupEnvInt(network+"_ADDR_CONTRACTS_CACHE_MIN_SIZE", d.addrContractsCacheMinSizeBytes); ok {
d.addrContractsCacheMinSizeBytes = v
}
if v, ok := lookupEnvInt(network+"_ADDR_CONTRACTS_CACHE_ALWAYS_SIZE", d.addrContractsCacheAlwaysBytes); ok {
d.addrContractsCacheAlwaysBytes = v
}
if v, ok := lookupEnvFloat(network+"_ADDR_CONTRACTS_CACHE_HOT_MIN_SCORE", d.addrContractsCacheHotMinScore); ok {
d.addrContractsCacheHotMinScore = v
}
if v, ok := lookupEnvDuration(network+"_ADDR_CONTRACTS_CACHE_HOT_HALF_LIFE", d.addrContractsHotHalfLife); ok {
d.addrContractsHotHalfLife = v
}
if v, ok := lookupEnvDuration(network+"_ADDR_CONTRACTS_CACHE_HOT_EVICT_AFTER", d.addrContractsHotEvictAfter); ok {
d.addrContractsHotEvictAfter = v
}
}
func lookupEnvInt(key string, current int) (int, bool) {
raw := strings.TrimSpace(os.Getenv(key))
if raw == "" {
return current, false
}
if v, ok := parseSizeBytes(raw); ok {
glog.Infof("address cache: env %s=%s", key, raw)
return v, true
}
glog.Warningf("address cache: invalid %s=%s (expected bytes or K/M/G suffix)", key, raw)
return current, false
}
func lookupEnvFloat(key string, current float64) (float64, bool) {
raw := strings.TrimSpace(os.Getenv(key))
if raw == "" {
return current, false
}
v, err := strconv.ParseFloat(raw, 64)
if err != nil {
glog.Warningf("address cache: invalid %s=%s (expected float)", key, raw)
return current, false
}
glog.Infof("address cache: env %s=%s", key, raw)
return v, true
}
func lookupEnvDuration(key string, current time.Duration) (time.Duration, bool) {
raw := strings.TrimSpace(os.Getenv(key))
if raw == "" {
return current, false
}
if v, ok := parseDuration(raw); ok {
glog.Infof("address cache: env %s=%s", key, raw)
return v, true
}
glog.Warningf("address cache: invalid %s=%s (expected duration, e.g. 30m)", key, raw)
return current, false
}
func parseDuration(raw string) (time.Duration, bool) {
if d, err := time.ParseDuration(raw); err == nil {
return d, true
}
if v, err := strconv.Atoi(raw); err == nil {
return time.Duration(v) * time.Minute, true
}
return 0, false
}
func parseSizeBytes(raw string) (int, bool) {
s := strings.TrimSpace(raw)
if s == "" {
return 0, false
}
mult := int64(1)
upper := strings.ToUpper(s)
switch {
case strings.HasSuffix(upper, "KIB"):
mult = 1 << 10
s = s[:len(s)-3]
case strings.HasSuffix(upper, "MIB"):
mult = 1 << 20
s = s[:len(s)-3]
case strings.HasSuffix(upper, "GIB"):
mult = 1 << 30
s = s[:len(s)-3]
case strings.HasSuffix(upper, "TIB"):
mult = 1 << 40
s = s[:len(s)-3]
case strings.HasSuffix(upper, "KB"):
mult = 1 << 10
s = s[:len(s)-2]
case strings.HasSuffix(upper, "MB"):
mult = 1 << 20
s = s[:len(s)-2]
case strings.HasSuffix(upper, "GB"):
mult = 1 << 30
s = s[:len(s)-2]
case strings.HasSuffix(upper, "TB"):
mult = 1 << 40
s = s[:len(s)-2]
default:
last := upper[len(upper)-1]
switch last {
case 'K':
mult = 1 << 10
s = s[:len(s)-1]
case 'M':
mult = 1 << 20
s = s[:len(s)-1]
case 'G':
mult = 1 << 30
s = s[:len(s)-1]
case 'T':
mult = 1 << 40
s = s[:len(s)-1]
}
}
s = strings.TrimSpace(s)
if s == "" {
return 0, false
}
v, err := strconv.ParseFloat(s, 64)
if err != nil {
return 0, false
}
bytes := int64(v * float64(mult))
if bytes <= 0 {
return 0, false
}
if bytes > int64(^uint(0)>>1) {
return 0, false
}
return int(bytes), true
}

View File

@@ -58,6 +58,10 @@ const (
)
const addrContractsCacheMinSize = 300_000 // limit for caching address contracts in memory to speed up indexing
const addrContractsCacheAlwaysSize = 1_000_000
const addrContractsCacheHotMinScore = 2.0
const addrContractsHotHalfLife = 30 * time.Minute
const addrContractsHotEvictAfter = 6 * time.Hour
// RocksDB handle
type RocksDB struct {
@@ -76,6 +80,16 @@ type RocksDB struct {
connectBlockMux sync.Mutex
addrContractsCacheMux sync.Mutex
addrContractsCache map[string]*unpackedAddrContracts
addrContractsHotMux sync.Mutex
addrContractsHot map[string]*addrContractsHotEntry
addrContractsHotSeen map[string]struct{}
addrContractsHotBlock uint32
addrContractsHotLastTime int64
addrContractsCacheMinSizeBytes int
addrContractsCacheAlwaysBytes int
addrContractsCacheHotMinScore float64
addrContractsHotHalfLife time.Duration
addrContractsHotEvictAfter time.Duration
addrContractsCacheHit uint64
addrContractsCacheMiss uint64
addrContractsCacheSkipped uint64
@@ -162,7 +176,31 @@ func NewRocksDB(path string, cacheSize, maxOpenFiles int, parser bchain.BlockCha
}
wo := grocksdb.NewDefaultWriteOptions()
ro := grocksdb.NewDefaultReadOptions()
r := &RocksDB{path, db, wo, ro, cfh, parser, nil, metrics, c, maxOpenFiles, connectBlockStats{}, extendedIndex, sync.Mutex{}, sync.Mutex{}, make(map[string]*unpackedAddrContracts)}
r := &RocksDB{
path: path,
db: db,
wo: wo,
ro: ro,
cfh: cfh,
chainParser: parser,
is: nil,
metrics: metrics,
cache: c,
maxOpenFiles: maxOpenFiles,
cbs: connectBlockStats{},
extendedIndex: extendedIndex,
connectBlockMux: sync.Mutex{},
addrContractsCacheMux: sync.Mutex{},
addrContractsCache: make(map[string]*unpackedAddrContracts),
addrContractsHotMux: sync.Mutex{},
addrContractsHot: make(map[string]*addrContractsHotEntry),
addrContractsHotSeen: make(map[string]struct{}),
addrContractsCacheMinSizeBytes: addrContractsCacheMinSize,
addrContractsCacheAlwaysBytes: addrContractsCacheAlwaysSize,
addrContractsCacheHotMinScore: addrContractsCacheHotMinScore,
addrContractsHotHalfLife: addrContractsHotHalfLife,
addrContractsHotEvictAfter: addrContractsHotEvictAfter,
}
if chainType == bchain.ChainEthereumType {
go r.periodicStoreAddrContractsCache()
}
@@ -2103,6 +2141,7 @@ func (d *RocksDB) SetInconsistentState(inconsistent bool) error {
// SetInternalState sets the InternalState to be used by db to collect internal state
func (d *RocksDB) SetInternalState(is *common.InternalState) {
d.is = is
d.loadAddrContractsCacheConfigFromEnv()
}
// GetInternalState gets the InternalState

View File

@@ -4,6 +4,7 @@ import (
"bytes"
"encoding/hex"
"fmt"
"math"
"math/big"
"os"
"sort"
@@ -64,6 +65,11 @@ func (s *Ids) remove(id big.Int) {
}
}
type addrContractsHotEntry struct {
score float64
lastUpdateTime int64
}
type MultiTokenValues []bchain.MultiTokenValue
func (s *MultiTokenValues) sort() bool {
@@ -438,7 +444,7 @@ func addToContract(c *unpackedAddrContract, contractIndex int, index int32, cont
return index
}
func (d *RocksDB) addToAddressesAndContractsEthereumType(addrDesc bchain.AddressDescriptor, btxID []byte, index int32, contract bchain.AddressDescriptor, transfer *bchain.TokenTransfer, addTxCount bool, addresses addressesMap, addressContracts map[string]*unpackedAddrContracts) error {
func (d *RocksDB) addToAddressesAndContractsEthereumType(addrDesc bchain.AddressDescriptor, btxID []byte, index int32, contract bchain.AddressDescriptor, transfer *bchain.TokenTransfer, addTxCount bool, addresses addressesMap, addressContracts map[string]*unpackedAddrContracts, blockHeight uint32, blockTime int64) error {
var err error
strAddrDesc := string(addrDesc)
ac, e := addressContracts[strAddrDesc]
@@ -455,6 +461,12 @@ func (d *RocksDB) addToAddressesAndContractsEthereumType(addrDesc bchain.Address
} else {
d.cbs.balancesHit++
}
sizeBytes := 0
if ac.Packed != nil {
sizeBytes = len(ac.Packed)
}
hotScore := d.updateAddrContractsHotness(strAddrDesc, sizeBytes, blockHeight, blockTime)
d.maybeCacheAddrContracts(strAddrDesc, ac, sizeBytes, hotScore)
if contract == nil {
if addTxCount {
if index == internalTransferFrom || index == internalTransferTo {
@@ -492,6 +504,90 @@ func (d *RocksDB) addToAddressesAndContractsEthereumType(addrDesc bchain.Address
return nil
}
func (d *RocksDB) updateAddrContractsHotness(addrKey string, sizeBytes int, blockHeight uint32, blockTime int64) float64 {
now := blockTime
if now <= 0 {
now = time.Now().Unix()
}
atomic.StoreInt64(&d.addrContractsHotLastTime, now)
if sizeBytes < d.addrContractsCacheMinSizeBytes {
return 0
}
d.addrContractsHotMux.Lock()
defer d.addrContractsHotMux.Unlock()
if d.addrContractsHotBlock != blockHeight {
d.addrContractsHotBlock = blockHeight
d.addrContractsHotSeen = make(map[string]struct{})
}
if _, seen := d.addrContractsHotSeen[addrKey]; seen {
if entry, ok := d.addrContractsHot[addrKey]; ok {
return entry.score
}
return 0
}
d.addrContractsHotSeen[addrKey] = struct{}{}
entry, ok := d.addrContractsHot[addrKey]
if !ok {
entry = &addrContractsHotEntry{score: 0, lastUpdateTime: now}
d.addrContractsHot[addrKey] = entry
}
if entry.lastUpdateTime > 0 && now > entry.lastUpdateTime {
delta := float64(now - entry.lastUpdateTime)
halfLife := d.addrContractsHotHalfLife.Seconds()
if halfLife > 0 {
decay := math.Exp(-math.Ln2 * delta / halfLife)
entry.score *= decay
}
}
if entry.lastUpdateTime > 0 && now < entry.lastUpdateTime {
now = entry.lastUpdateTime
}
entry.score++
entry.lastUpdateTime = now
return entry.score
}
func (d *RocksDB) maybeCacheAddrContracts(addrKey string, acs *unpackedAddrContracts, sizeBytes int, hotScore float64) {
if acs == nil || sizeBytes == 0 {
return
}
if sizeBytes < d.addrContractsCacheAlwaysBytes && (sizeBytes < d.addrContractsCacheMinSizeBytes || hotScore < d.addrContractsCacheHotMinScore) {
return
}
d.addrContractsCacheMux.Lock()
if _, found := d.addrContractsCache[addrKey]; !found {
d.addrContractsCache[addrKey] = acs
}
d.addrContractsCacheMux.Unlock()
}
func (d *RocksDB) evictAddrContractsHot(now int64) {
if d.addrContractsHotEvictAfter <= 0 {
return
}
if now <= 0 {
now = atomic.LoadInt64(&d.addrContractsHotLastTime)
if now <= 0 {
now = time.Now().Unix()
}
}
cutoff := now - int64(d.addrContractsHotEvictAfter.Seconds())
if cutoff <= 0 {
return
}
d.addrContractsHotMux.Lock()
for key, entry := range d.addrContractsHot {
if entry.lastUpdateTime > 0 && entry.lastUpdateTime < cutoff {
delete(d.addrContractsHot, key)
}
}
d.addrContractsHotMux.Unlock()
}
type ethBlockTxContract struct {
from, to, contract bchain.AddressDescriptor
transferStandard bchain.TokenStandard
@@ -519,7 +615,7 @@ type ethBlockTx struct {
internalData *ethInternalData
}
func (d *RocksDB) processBaseTxData(blockTx *ethBlockTx, tx *bchain.Tx, addresses addressesMap, addressContracts map[string]*unpackedAddrContracts) error {
func (d *RocksDB) processBaseTxData(blockTx *ethBlockTx, tx *bchain.Tx, addresses addressesMap, addressContracts map[string]*unpackedAddrContracts, blockHeight uint32, blockTime int64) error {
var from, to bchain.AddressDescriptor
var err error
// there is only one output address in EthereumType transaction, store it in format txid 0
@@ -531,7 +627,7 @@ func (d *RocksDB) processBaseTxData(blockTx *ethBlockTx, tx *bchain.Tx, addresse
glog.Warningf("rocksdb: processBaseTxData: %v, tx %v, output", err, tx.Txid)
}
} else {
if err = d.addToAddressesAndContractsEthereumType(to, blockTx.btxID, transferTo, nil, nil, true, addresses, addressContracts); err != nil {
if err = d.addToAddressesAndContractsEthereumType(to, blockTx.btxID, transferTo, nil, nil, true, addresses, addressContracts, blockHeight, blockTime); err != nil {
return err
}
blockTx.to = to
@@ -545,7 +641,7 @@ func (d *RocksDB) processBaseTxData(blockTx *ethBlockTx, tx *bchain.Tx, addresse
glog.Warningf("rocksdb: processBaseTxData: %v, tx %v, input", err, tx.Txid)
}
} else {
if err = d.addToAddressesAndContractsEthereumType(from, blockTx.btxID, transferFrom, nil, nil, !bytes.Equal(from, to), addresses, addressContracts); err != nil {
if err = d.addToAddressesAndContractsEthereumType(from, blockTx.btxID, transferFrom, nil, nil, !bytes.Equal(from, to), addresses, addressContracts, blockHeight, blockTime); err != nil {
return err
}
blockTx.from = from
@@ -570,7 +666,7 @@ func (d *RocksDB) setAddressTxIndexesToAddressMap(addrDesc bchain.AddressDescrip
}
// existingBlock signals that internal data are reconnected to already indexed block after they failed during standard sync
func (d *RocksDB) processInternalData(blockTx *ethBlockTx, tx *bchain.Tx, id *bchain.EthereumInternalData, addresses addressesMap, addressContracts map[string]*unpackedAddrContracts, existingBlock bool) error {
func (d *RocksDB) processInternalData(blockTx *ethBlockTx, tx *bchain.Tx, id *bchain.EthereumInternalData, addresses addressesMap, addressContracts map[string]*unpackedAddrContracts, existingBlock bool, blockHeight uint32, blockTime int64) error {
blockTx.internalData = &ethInternalData{
internalType: id.Type,
errorMsg: id.Error,
@@ -591,7 +687,7 @@ func (d *RocksDB) processInternalData(blockTx *ethBlockTx, tx *bchain.Tx, id *bc
return err
}
}
if err = d.addToAddressesAndContractsEthereumType(to, blockTx.btxID, internalTransferTo, nil, nil, true, addresses, addressContracts); err != nil {
if err = d.addToAddressesAndContractsEthereumType(to, blockTx.btxID, internalTransferTo, nil, nil, true, addresses, addressContracts, blockHeight, blockTime); err != nil {
return err
}
}
@@ -614,7 +710,7 @@ func (d *RocksDB) processInternalData(blockTx *ethBlockTx, tx *bchain.Tx, id *bc
return err
}
}
if err = d.addToAddressesAndContractsEthereumType(to, blockTx.btxID, internalTransferTo, nil, nil, true, addresses, addressContracts); err != nil {
if err = d.addToAddressesAndContractsEthereumType(to, blockTx.btxID, internalTransferTo, nil, nil, true, addresses, addressContracts, blockHeight, blockTime); err != nil {
return err
}
ito.to = to
@@ -630,7 +726,7 @@ func (d *RocksDB) processInternalData(blockTx *ethBlockTx, tx *bchain.Tx, id *bc
return err
}
}
if err = d.addToAddressesAndContractsEthereumType(from, blockTx.btxID, internalTransferFrom, nil, nil, !bytes.Equal(from, to), addresses, addressContracts); err != nil {
if err = d.addToAddressesAndContractsEthereumType(from, blockTx.btxID, internalTransferFrom, nil, nil, !bytes.Equal(from, to), addresses, addressContracts, blockHeight, blockTime); err != nil {
return err
}
ito.from = from
@@ -642,7 +738,7 @@ func (d *RocksDB) processInternalData(blockTx *ethBlockTx, tx *bchain.Tx, id *bc
return nil
}
func (d *RocksDB) processContractTransfers(blockTx *ethBlockTx, tx *bchain.Tx, addresses addressesMap, addressContracts map[string]*unpackedAddrContracts) error {
func (d *RocksDB) processContractTransfers(blockTx *ethBlockTx, tx *bchain.Tx, addresses addressesMap, addressContracts map[string]*unpackedAddrContracts, blockHeight uint32, blockTime int64) error {
tokenTransfers, err := d.chainParser.EthereumTypeGetTokenTransfersFromTx(tx)
if err != nil {
glog.Warningf("rocksdb: processContractTransfers %v, tx %v", err, tx.Txid)
@@ -661,11 +757,11 @@ func (d *RocksDB) processContractTransfers(blockTx *ethBlockTx, tx *bchain.Tx, a
glog.Warningf("rocksdb: processContractTransfers %v, tx %v, transfer %v", err, tx.Txid, t)
continue
}
if err = d.addToAddressesAndContractsEthereumType(to, blockTx.btxID, int32(i), contract, t, true, addresses, addressContracts); err != nil {
if err = d.addToAddressesAndContractsEthereumType(to, blockTx.btxID, int32(i), contract, t, true, addresses, addressContracts, blockHeight, blockTime); err != nil {
return err
}
eq := bytes.Equal(from, to)
if err = d.addToAddressesAndContractsEthereumType(from, blockTx.btxID, ^int32(i), contract, t, !eq, addresses, addressContracts); err != nil {
if err = d.addToAddressesAndContractsEthereumType(from, blockTx.btxID, ^int32(i), contract, t, !eq, addresses, addressContracts, blockHeight, blockTime); err != nil {
return err
}
bc := &blockTx.contracts[i]
@@ -689,18 +785,18 @@ func (d *RocksDB) processAddressesEthereumType(block *bchain.Block, addresses ad
}
blockTx := &blockTxs[txi]
blockTx.btxID = btxID
if err = d.processBaseTxData(blockTx, tx, addresses, addressContracts); err != nil {
if err = d.processBaseTxData(blockTx, tx, addresses, addressContracts, block.Height, block.Time); err != nil {
return nil, err
}
// process internal data
eid, _ := tx.CoinSpecificData.(bchain.EthereumSpecificData)
if eid.InternalData != nil {
if err = d.processInternalData(blockTx, tx, eid.InternalData, addresses, addressContracts, false); err != nil {
if err = d.processInternalData(blockTx, tx, eid.InternalData, addresses, addressContracts, false, block.Height, block.Time); err != nil {
return nil, err
}
}
// store contract transfers
if err = d.processContractTransfers(blockTx, tx, addresses, addressContracts); err != nil {
if err = d.processContractTransfers(blockTx, tx, addresses, addressContracts, block.Height, block.Time); err != nil {
return nil, err
}
}
@@ -734,7 +830,7 @@ func (d *RocksDB) ReconnectInternalDataToBlockEthereumType(block *bchain.Block)
blockTx := &blockTxs[txi]
blockTx.btxID = btxID
tx.BlockHeight = block.Height
if err = d.processInternalData(blockTx, tx, eid.InternalData, addresses, addressContracts, true); err != nil {
if err = d.processInternalData(blockTx, tx, eid.InternalData, addresses, addressContracts, true, block.Height, block.Time); err != nil {
return err
}
}
@@ -1681,11 +1777,6 @@ func (d *RocksDB) getUnpackedAddrDescContracts(addrDesc bchain.AddressDescriptor
return nil, nil
}
rv, err = partiallyUnpackAddrContracts(buf)
if err == nil && rv != nil && len(buf) > addrContractsCacheMinSize {
d.addrContractsCacheMux.Lock()
d.addrContractsCache[string(addrDesc)] = rv
d.addrContractsCacheMux.Unlock()
}
return rv, err
}
@@ -1878,6 +1969,7 @@ func (d *RocksDB) periodicStoreAddrContractsCache() {
timer.Reset(period)
d.storeAddrContractsCache()
d.logAddrContractsCacheMetrics(period)
d.evictAddrContractsHot(0)
}
}

View File

@@ -10,6 +10,22 @@ Some behavior of Blockbook can be modified by environment variables. The variabl
- `<coin shortcut>_ALLOWED_RPC_CALL_TO` - Addresses to which `rpcCall` websocket requests can be made, as a comma-separated list. If omitted, `rpcCall` is enabled for all addresses.
- `<coin shortcut>_ADDR_CONTRACTS_CACHE_MIN_SIZE`
Default: `300000`
Description: Minimum packed size (bytes) to consider addressContracts hotness/caching. Accepts bytes or `K/M/G/T` suffixes (e.g. `300000`, `300K`, `1MiB`).
- `<coin shortcut>_ADDR_CONTRACTS_CACHE_ALWAYS_SIZE`
Default: `1000000`
Description: Always cache addressContracts above this packed size (bytes). Accepts bytes or `K/M/G/T` suffixes.
- `<coin shortcut>_ADDR_CONTRACTS_CACHE_HOT_MIN_SCORE`
Default: `2`
Description: Hotness score threshold for caching (float).
- `<coin shortcut>_ADDR_CONTRACTS_CACHE_HOT_HALF_LIFE`
Default: `30m`
Description: EWMA halflife for hotness decay (duration, e.g. `30m`, `2h`).
- `<coin shortcut>_ADDR_CONTRACTS_CACHE_HOT_EVICT_AFTER`
Default: `6h`
Description: Evict hotness entries not updated for this duration (e.g. `6h`).
## Build-time variables
- `BB_RPC_URL_HTTP_<coin alias>` - Overrides `ipc.rpc_url_template` during package/config generation so build and