diff --git a/db/addrcontracts_cache_config.go b/db/addrcontracts_cache_config.go new file mode 100644 index 00000000..e772b806 --- /dev/null +++ b/db/addrcontracts_cache_config.go @@ -0,0 +1,153 @@ +package db + +import ( + "os" + "strconv" + "strings" + "time" + + "github.com/golang/glog" +) + +func (d *RocksDB) loadAddrContractsCacheConfigFromEnv() { + if d.is == nil { + return + } + network := strings.ToUpper(d.is.GetNetwork()) + if network == "" { + return + } + + if v, ok := lookupEnvInt(network+"_ADDR_CONTRACTS_CACHE_MIN_SIZE", d.addrContractsCacheMinSizeBytes); ok { + d.addrContractsCacheMinSizeBytes = v + } + if v, ok := lookupEnvInt(network+"_ADDR_CONTRACTS_CACHE_ALWAYS_SIZE", d.addrContractsCacheAlwaysBytes); ok { + d.addrContractsCacheAlwaysBytes = v + } + if v, ok := lookupEnvFloat(network+"_ADDR_CONTRACTS_CACHE_HOT_MIN_SCORE", d.addrContractsCacheHotMinScore); ok { + d.addrContractsCacheHotMinScore = v + } + if v, ok := lookupEnvDuration(network+"_ADDR_CONTRACTS_CACHE_HOT_HALF_LIFE", d.addrContractsHotHalfLife); ok { + d.addrContractsHotHalfLife = v + } + if v, ok := lookupEnvDuration(network+"_ADDR_CONTRACTS_CACHE_HOT_EVICT_AFTER", d.addrContractsHotEvictAfter); ok { + d.addrContractsHotEvictAfter = v + } +} + +func lookupEnvInt(key string, current int) (int, bool) { + raw := strings.TrimSpace(os.Getenv(key)) + if raw == "" { + return current, false + } + if v, ok := parseSizeBytes(raw); ok { + glog.Infof("address cache: env %s=%s", key, raw) + return v, true + } + glog.Warningf("address cache: invalid %s=%s (expected bytes or K/M/G suffix)", key, raw) + return current, false +} + +func lookupEnvFloat(key string, current float64) (float64, bool) { + raw := strings.TrimSpace(os.Getenv(key)) + if raw == "" { + return current, false + } + v, err := strconv.ParseFloat(raw, 64) + if err != nil { + glog.Warningf("address cache: invalid %s=%s (expected float)", key, raw) + return current, false + } + glog.Infof("address cache: env %s=%s", key, raw) + return v, true +} + +func lookupEnvDuration(key string, current time.Duration) (time.Duration, bool) { + raw := strings.TrimSpace(os.Getenv(key)) + if raw == "" { + return current, false + } + if v, ok := parseDuration(raw); ok { + glog.Infof("address cache: env %s=%s", key, raw) + return v, true + } + glog.Warningf("address cache: invalid %s=%s (expected duration, e.g. 30m)", key, raw) + return current, false +} + +func parseDuration(raw string) (time.Duration, bool) { + if d, err := time.ParseDuration(raw); err == nil { + return d, true + } + if v, err := strconv.Atoi(raw); err == nil { + return time.Duration(v) * time.Minute, true + } + return 0, false +} + +func parseSizeBytes(raw string) (int, bool) { + s := strings.TrimSpace(raw) + if s == "" { + return 0, false + } + mult := int64(1) + upper := strings.ToUpper(s) + switch { + case strings.HasSuffix(upper, "KIB"): + mult = 1 << 10 + s = s[:len(s)-3] + case strings.HasSuffix(upper, "MIB"): + mult = 1 << 20 + s = s[:len(s)-3] + case strings.HasSuffix(upper, "GIB"): + mult = 1 << 30 + s = s[:len(s)-3] + case strings.HasSuffix(upper, "TIB"): + mult = 1 << 40 + s = s[:len(s)-3] + case strings.HasSuffix(upper, "KB"): + mult = 1 << 10 + s = s[:len(s)-2] + case strings.HasSuffix(upper, "MB"): + mult = 1 << 20 + s = s[:len(s)-2] + case strings.HasSuffix(upper, "GB"): + mult = 1 << 30 + s = s[:len(s)-2] + case strings.HasSuffix(upper, "TB"): + mult = 1 << 40 + s = s[:len(s)-2] + default: + last := upper[len(upper)-1] + switch last { + case 'K': + mult = 1 << 10 + s = s[:len(s)-1] + case 'M': + mult = 1 << 20 + s = s[:len(s)-1] + case 'G': + mult = 1 << 30 + s = s[:len(s)-1] + case 'T': + mult = 1 << 40 + s = s[:len(s)-1] + } + } + s = strings.TrimSpace(s) + if s == "" { + return 0, false + } + v, err := strconv.ParseFloat(s, 64) + if err != nil { + return 0, false + } + bytes := int64(v * float64(mult)) + if bytes <= 0 { + return 0, false + } + if bytes > int64(^uint(0)>>1) { + return 0, false + } + return int(bytes), true +} diff --git a/db/rocksdb.go b/db/rocksdb.go index c7e7de86..e33f97b8 100644 --- a/db/rocksdb.go +++ b/db/rocksdb.go @@ -58,6 +58,10 @@ const ( ) const addrContractsCacheMinSize = 300_000 // limit for caching address contracts in memory to speed up indexing +const addrContractsCacheAlwaysSize = 1_000_000 +const addrContractsCacheHotMinScore = 2.0 +const addrContractsHotHalfLife = 30 * time.Minute +const addrContractsHotEvictAfter = 6 * time.Hour // RocksDB handle type RocksDB struct { @@ -76,6 +80,16 @@ type RocksDB struct { connectBlockMux sync.Mutex addrContractsCacheMux sync.Mutex addrContractsCache map[string]*unpackedAddrContracts + addrContractsHotMux sync.Mutex + addrContractsHot map[string]*addrContractsHotEntry + addrContractsHotSeen map[string]struct{} + addrContractsHotBlock uint32 + addrContractsHotLastTime int64 + addrContractsCacheMinSizeBytes int + addrContractsCacheAlwaysBytes int + addrContractsCacheHotMinScore float64 + addrContractsHotHalfLife time.Duration + addrContractsHotEvictAfter time.Duration addrContractsCacheHit uint64 addrContractsCacheMiss uint64 addrContractsCacheSkipped uint64 @@ -162,7 +176,31 @@ func NewRocksDB(path string, cacheSize, maxOpenFiles int, parser bchain.BlockCha } wo := grocksdb.NewDefaultWriteOptions() ro := grocksdb.NewDefaultReadOptions() - r := &RocksDB{path, db, wo, ro, cfh, parser, nil, metrics, c, maxOpenFiles, connectBlockStats{}, extendedIndex, sync.Mutex{}, sync.Mutex{}, make(map[string]*unpackedAddrContracts)} + r := &RocksDB{ + path: path, + db: db, + wo: wo, + ro: ro, + cfh: cfh, + chainParser: parser, + is: nil, + metrics: metrics, + cache: c, + maxOpenFiles: maxOpenFiles, + cbs: connectBlockStats{}, + extendedIndex: extendedIndex, + connectBlockMux: sync.Mutex{}, + addrContractsCacheMux: sync.Mutex{}, + addrContractsCache: make(map[string]*unpackedAddrContracts), + addrContractsHotMux: sync.Mutex{}, + addrContractsHot: make(map[string]*addrContractsHotEntry), + addrContractsHotSeen: make(map[string]struct{}), + addrContractsCacheMinSizeBytes: addrContractsCacheMinSize, + addrContractsCacheAlwaysBytes: addrContractsCacheAlwaysSize, + addrContractsCacheHotMinScore: addrContractsCacheHotMinScore, + addrContractsHotHalfLife: addrContractsHotHalfLife, + addrContractsHotEvictAfter: addrContractsHotEvictAfter, + } if chainType == bchain.ChainEthereumType { go r.periodicStoreAddrContractsCache() } @@ -2103,6 +2141,7 @@ func (d *RocksDB) SetInconsistentState(inconsistent bool) error { // SetInternalState sets the InternalState to be used by db to collect internal state func (d *RocksDB) SetInternalState(is *common.InternalState) { d.is = is + d.loadAddrContractsCacheConfigFromEnv() } // GetInternalState gets the InternalState diff --git a/db/rocksdb_ethereumtype.go b/db/rocksdb_ethereumtype.go index 4fcb4920..13b72e0f 100644 --- a/db/rocksdb_ethereumtype.go +++ b/db/rocksdb_ethereumtype.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/hex" "fmt" + "math" "math/big" "os" "sort" @@ -64,6 +65,11 @@ func (s *Ids) remove(id big.Int) { } } +type addrContractsHotEntry struct { + score float64 + lastUpdateTime int64 +} + type MultiTokenValues []bchain.MultiTokenValue func (s *MultiTokenValues) sort() bool { @@ -438,7 +444,7 @@ func addToContract(c *unpackedAddrContract, contractIndex int, index int32, cont return index } -func (d *RocksDB) addToAddressesAndContractsEthereumType(addrDesc bchain.AddressDescriptor, btxID []byte, index int32, contract bchain.AddressDescriptor, transfer *bchain.TokenTransfer, addTxCount bool, addresses addressesMap, addressContracts map[string]*unpackedAddrContracts) error { +func (d *RocksDB) addToAddressesAndContractsEthereumType(addrDesc bchain.AddressDescriptor, btxID []byte, index int32, contract bchain.AddressDescriptor, transfer *bchain.TokenTransfer, addTxCount bool, addresses addressesMap, addressContracts map[string]*unpackedAddrContracts, blockHeight uint32, blockTime int64) error { var err error strAddrDesc := string(addrDesc) ac, e := addressContracts[strAddrDesc] @@ -455,6 +461,12 @@ func (d *RocksDB) addToAddressesAndContractsEthereumType(addrDesc bchain.Address } else { d.cbs.balancesHit++ } + sizeBytes := 0 + if ac.Packed != nil { + sizeBytes = len(ac.Packed) + } + hotScore := d.updateAddrContractsHotness(strAddrDesc, sizeBytes, blockHeight, blockTime) + d.maybeCacheAddrContracts(strAddrDesc, ac, sizeBytes, hotScore) if contract == nil { if addTxCount { if index == internalTransferFrom || index == internalTransferTo { @@ -492,6 +504,90 @@ func (d *RocksDB) addToAddressesAndContractsEthereumType(addrDesc bchain.Address return nil } +func (d *RocksDB) updateAddrContractsHotness(addrKey string, sizeBytes int, blockHeight uint32, blockTime int64) float64 { + now := blockTime + if now <= 0 { + now = time.Now().Unix() + } + atomic.StoreInt64(&d.addrContractsHotLastTime, now) + if sizeBytes < d.addrContractsCacheMinSizeBytes { + return 0 + } + + d.addrContractsHotMux.Lock() + defer d.addrContractsHotMux.Unlock() + + if d.addrContractsHotBlock != blockHeight { + d.addrContractsHotBlock = blockHeight + d.addrContractsHotSeen = make(map[string]struct{}) + } + if _, seen := d.addrContractsHotSeen[addrKey]; seen { + if entry, ok := d.addrContractsHot[addrKey]; ok { + return entry.score + } + return 0 + } + d.addrContractsHotSeen[addrKey] = struct{}{} + + entry, ok := d.addrContractsHot[addrKey] + if !ok { + entry = &addrContractsHotEntry{score: 0, lastUpdateTime: now} + d.addrContractsHot[addrKey] = entry + } + + if entry.lastUpdateTime > 0 && now > entry.lastUpdateTime { + delta := float64(now - entry.lastUpdateTime) + halfLife := d.addrContractsHotHalfLife.Seconds() + if halfLife > 0 { + decay := math.Exp(-math.Ln2 * delta / halfLife) + entry.score *= decay + } + } + if entry.lastUpdateTime > 0 && now < entry.lastUpdateTime { + now = entry.lastUpdateTime + } + entry.score++ + entry.lastUpdateTime = now + return entry.score +} + +func (d *RocksDB) maybeCacheAddrContracts(addrKey string, acs *unpackedAddrContracts, sizeBytes int, hotScore float64) { + if acs == nil || sizeBytes == 0 { + return + } + if sizeBytes < d.addrContractsCacheAlwaysBytes && (sizeBytes < d.addrContractsCacheMinSizeBytes || hotScore < d.addrContractsCacheHotMinScore) { + return + } + d.addrContractsCacheMux.Lock() + if _, found := d.addrContractsCache[addrKey]; !found { + d.addrContractsCache[addrKey] = acs + } + d.addrContractsCacheMux.Unlock() +} + +func (d *RocksDB) evictAddrContractsHot(now int64) { + if d.addrContractsHotEvictAfter <= 0 { + return + } + if now <= 0 { + now = atomic.LoadInt64(&d.addrContractsHotLastTime) + if now <= 0 { + now = time.Now().Unix() + } + } + cutoff := now - int64(d.addrContractsHotEvictAfter.Seconds()) + if cutoff <= 0 { + return + } + d.addrContractsHotMux.Lock() + for key, entry := range d.addrContractsHot { + if entry.lastUpdateTime > 0 && entry.lastUpdateTime < cutoff { + delete(d.addrContractsHot, key) + } + } + d.addrContractsHotMux.Unlock() +} + type ethBlockTxContract struct { from, to, contract bchain.AddressDescriptor transferStandard bchain.TokenStandard @@ -519,7 +615,7 @@ type ethBlockTx struct { internalData *ethInternalData } -func (d *RocksDB) processBaseTxData(blockTx *ethBlockTx, tx *bchain.Tx, addresses addressesMap, addressContracts map[string]*unpackedAddrContracts) error { +func (d *RocksDB) processBaseTxData(blockTx *ethBlockTx, tx *bchain.Tx, addresses addressesMap, addressContracts map[string]*unpackedAddrContracts, blockHeight uint32, blockTime int64) error { var from, to bchain.AddressDescriptor var err error // there is only one output address in EthereumType transaction, store it in format txid 0 @@ -531,7 +627,7 @@ func (d *RocksDB) processBaseTxData(blockTx *ethBlockTx, tx *bchain.Tx, addresse glog.Warningf("rocksdb: processBaseTxData: %v, tx %v, output", err, tx.Txid) } } else { - if err = d.addToAddressesAndContractsEthereumType(to, blockTx.btxID, transferTo, nil, nil, true, addresses, addressContracts); err != nil { + if err = d.addToAddressesAndContractsEthereumType(to, blockTx.btxID, transferTo, nil, nil, true, addresses, addressContracts, blockHeight, blockTime); err != nil { return err } blockTx.to = to @@ -545,7 +641,7 @@ func (d *RocksDB) processBaseTxData(blockTx *ethBlockTx, tx *bchain.Tx, addresse glog.Warningf("rocksdb: processBaseTxData: %v, tx %v, input", err, tx.Txid) } } else { - if err = d.addToAddressesAndContractsEthereumType(from, blockTx.btxID, transferFrom, nil, nil, !bytes.Equal(from, to), addresses, addressContracts); err != nil { + if err = d.addToAddressesAndContractsEthereumType(from, blockTx.btxID, transferFrom, nil, nil, !bytes.Equal(from, to), addresses, addressContracts, blockHeight, blockTime); err != nil { return err } blockTx.from = from @@ -570,7 +666,7 @@ func (d *RocksDB) setAddressTxIndexesToAddressMap(addrDesc bchain.AddressDescrip } // existingBlock signals that internal data are reconnected to already indexed block after they failed during standard sync -func (d *RocksDB) processInternalData(blockTx *ethBlockTx, tx *bchain.Tx, id *bchain.EthereumInternalData, addresses addressesMap, addressContracts map[string]*unpackedAddrContracts, existingBlock bool) error { +func (d *RocksDB) processInternalData(blockTx *ethBlockTx, tx *bchain.Tx, id *bchain.EthereumInternalData, addresses addressesMap, addressContracts map[string]*unpackedAddrContracts, existingBlock bool, blockHeight uint32, blockTime int64) error { blockTx.internalData = ðInternalData{ internalType: id.Type, errorMsg: id.Error, @@ -591,7 +687,7 @@ func (d *RocksDB) processInternalData(blockTx *ethBlockTx, tx *bchain.Tx, id *bc return err } } - if err = d.addToAddressesAndContractsEthereumType(to, blockTx.btxID, internalTransferTo, nil, nil, true, addresses, addressContracts); err != nil { + if err = d.addToAddressesAndContractsEthereumType(to, blockTx.btxID, internalTransferTo, nil, nil, true, addresses, addressContracts, blockHeight, blockTime); err != nil { return err } } @@ -614,7 +710,7 @@ func (d *RocksDB) processInternalData(blockTx *ethBlockTx, tx *bchain.Tx, id *bc return err } } - if err = d.addToAddressesAndContractsEthereumType(to, blockTx.btxID, internalTransferTo, nil, nil, true, addresses, addressContracts); err != nil { + if err = d.addToAddressesAndContractsEthereumType(to, blockTx.btxID, internalTransferTo, nil, nil, true, addresses, addressContracts, blockHeight, blockTime); err != nil { return err } ito.to = to @@ -630,7 +726,7 @@ func (d *RocksDB) processInternalData(blockTx *ethBlockTx, tx *bchain.Tx, id *bc return err } } - if err = d.addToAddressesAndContractsEthereumType(from, blockTx.btxID, internalTransferFrom, nil, nil, !bytes.Equal(from, to), addresses, addressContracts); err != nil { + if err = d.addToAddressesAndContractsEthereumType(from, blockTx.btxID, internalTransferFrom, nil, nil, !bytes.Equal(from, to), addresses, addressContracts, blockHeight, blockTime); err != nil { return err } ito.from = from @@ -642,7 +738,7 @@ func (d *RocksDB) processInternalData(blockTx *ethBlockTx, tx *bchain.Tx, id *bc return nil } -func (d *RocksDB) processContractTransfers(blockTx *ethBlockTx, tx *bchain.Tx, addresses addressesMap, addressContracts map[string]*unpackedAddrContracts) error { +func (d *RocksDB) processContractTransfers(blockTx *ethBlockTx, tx *bchain.Tx, addresses addressesMap, addressContracts map[string]*unpackedAddrContracts, blockHeight uint32, blockTime int64) error { tokenTransfers, err := d.chainParser.EthereumTypeGetTokenTransfersFromTx(tx) if err != nil { glog.Warningf("rocksdb: processContractTransfers %v, tx %v", err, tx.Txid) @@ -661,11 +757,11 @@ func (d *RocksDB) processContractTransfers(blockTx *ethBlockTx, tx *bchain.Tx, a glog.Warningf("rocksdb: processContractTransfers %v, tx %v, transfer %v", err, tx.Txid, t) continue } - if err = d.addToAddressesAndContractsEthereumType(to, blockTx.btxID, int32(i), contract, t, true, addresses, addressContracts); err != nil { + if err = d.addToAddressesAndContractsEthereumType(to, blockTx.btxID, int32(i), contract, t, true, addresses, addressContracts, blockHeight, blockTime); err != nil { return err } eq := bytes.Equal(from, to) - if err = d.addToAddressesAndContractsEthereumType(from, blockTx.btxID, ^int32(i), contract, t, !eq, addresses, addressContracts); err != nil { + if err = d.addToAddressesAndContractsEthereumType(from, blockTx.btxID, ^int32(i), contract, t, !eq, addresses, addressContracts, blockHeight, blockTime); err != nil { return err } bc := &blockTx.contracts[i] @@ -689,18 +785,18 @@ func (d *RocksDB) processAddressesEthereumType(block *bchain.Block, addresses ad } blockTx := &blockTxs[txi] blockTx.btxID = btxID - if err = d.processBaseTxData(blockTx, tx, addresses, addressContracts); err != nil { + if err = d.processBaseTxData(blockTx, tx, addresses, addressContracts, block.Height, block.Time); err != nil { return nil, err } // process internal data eid, _ := tx.CoinSpecificData.(bchain.EthereumSpecificData) if eid.InternalData != nil { - if err = d.processInternalData(blockTx, tx, eid.InternalData, addresses, addressContracts, false); err != nil { + if err = d.processInternalData(blockTx, tx, eid.InternalData, addresses, addressContracts, false, block.Height, block.Time); err != nil { return nil, err } } // store contract transfers - if err = d.processContractTransfers(blockTx, tx, addresses, addressContracts); err != nil { + if err = d.processContractTransfers(blockTx, tx, addresses, addressContracts, block.Height, block.Time); err != nil { return nil, err } } @@ -734,7 +830,7 @@ func (d *RocksDB) ReconnectInternalDataToBlockEthereumType(block *bchain.Block) blockTx := &blockTxs[txi] blockTx.btxID = btxID tx.BlockHeight = block.Height - if err = d.processInternalData(blockTx, tx, eid.InternalData, addresses, addressContracts, true); err != nil { + if err = d.processInternalData(blockTx, tx, eid.InternalData, addresses, addressContracts, true, block.Height, block.Time); err != nil { return err } } @@ -1681,11 +1777,6 @@ func (d *RocksDB) getUnpackedAddrDescContracts(addrDesc bchain.AddressDescriptor return nil, nil } rv, err = partiallyUnpackAddrContracts(buf) - if err == nil && rv != nil && len(buf) > addrContractsCacheMinSize { - d.addrContractsCacheMux.Lock() - d.addrContractsCache[string(addrDesc)] = rv - d.addrContractsCacheMux.Unlock() - } return rv, err } @@ -1878,6 +1969,7 @@ func (d *RocksDB) periodicStoreAddrContractsCache() { timer.Reset(period) d.storeAddrContractsCache() d.logAddrContractsCacheMetrics(period) + d.evictAddrContractsHot(0) } } diff --git a/docs/env.md b/docs/env.md index da924ba3..272b92e1 100644 --- a/docs/env.md +++ b/docs/env.md @@ -10,6 +10,22 @@ Some behavior of Blockbook can be modified by environment variables. The variabl - `_ALLOWED_RPC_CALL_TO` - Addresses to which `rpcCall` websocket requests can be made, as a comma-separated list. If omitted, `rpcCall` is enabled for all addresses. +- `_ADDR_CONTRACTS_CACHE_MIN_SIZE` + Default: `300000` + Description: Minimum packed size (bytes) to consider addressContracts hotness/caching. Accepts bytes or `K/M/G/T` suffixes (e.g. `300000`, `300K`, `1MiB`). +- `_ADDR_CONTRACTS_CACHE_ALWAYS_SIZE` + Default: `1000000` + Description: Always cache addressContracts above this packed size (bytes). Accepts bytes or `K/M/G/T` suffixes. +- `_ADDR_CONTRACTS_CACHE_HOT_MIN_SCORE` + Default: `2` + Description: Hotness score threshold for caching (float). +- `_ADDR_CONTRACTS_CACHE_HOT_HALF_LIFE` + Default: `30m` + Description: EWMA half‑life for hotness decay (duration, e.g. `30m`, `2h`). +- `_ADDR_CONTRACTS_CACHE_HOT_EVICT_AFTER` + Default: `6h` + Description: Evict hotness entries not updated for this duration (e.g. `6h`). + ## Build-time variables - `BB_RPC_URL_HTTP_` - Overrides `ipc.rpc_url_template` during package/config generation so build and