Add AddrContractsCache to speed up indexing

This commit is contained in:
Martin Boehm
2025-09-01 20:03:15 +02:00
committed by Martin
parent a1f7bacbc1
commit db2d8cd248
3 changed files with 78 additions and 18 deletions

View File

@@ -504,7 +504,7 @@ func (b *EthereumRPC) getBestHeader() (bchain.EVMHeader, error) {
// UpdateBestHeader keeps track of the latest block header confirmed on chain
func (b *EthereumRPC) UpdateBestHeader(h bchain.EVMHeader) {
glog.V(2).Info("rpc: new block header ", h.Number())
glog.V(2).Info("rpc: new block header ", h.Number().Uint64())
b.bestHeaderLock.Lock()
b.bestHeader = h
b.bestHeaderTime = time.Now()

View File

@@ -57,21 +57,25 @@ const (
addressBalanceDetailUTXOIndexed = 2
)
const addrContractsCacheMinSize = 300_000 // limit for caching address contracts in memory to speed up indexing
// RocksDB handle
//
// NOTE: the source view was a merged diff showing both the old and the new
// field lists, which would declare every field twice; this is the post-change
// struct with each field declared exactly once.
type RocksDB struct {
	path          string
	db            *grocksdb.DB
	wo            *grocksdb.WriteOptions
	ro            *grocksdb.ReadOptions
	cfh           []*grocksdb.ColumnFamilyHandle
	chainParser   bchain.BlockChainParser
	is            *common.InternalState
	metrics       *common.Metrics
	cache         *grocksdb.Cache
	maxOpenFiles  int
	cbs           connectBlockStats
	extendedIndex bool
	// connectBlockMux serializes block connection
	connectBlockMux sync.Mutex
	// addrContractsCacheMux guards addrContractsCache; the map is read and
	// written concurrently by indexing goroutines and the periodic flusher
	addrContractsCacheMux sync.Mutex
	// addrContractsCache keeps large (packed size above
	// addrContractsCacheMinSize) address contracts in memory to speed up
	// indexing; keys are string(AddressDescriptor)
	addrContractsCache map[string]*unpackedAddrContracts
}
const (
@@ -150,7 +154,11 @@ func NewRocksDB(path string, cacheSize, maxOpenFiles int, parser bchain.BlockCha
}
wo := grocksdb.NewDefaultWriteOptions()
ro := grocksdb.NewDefaultReadOptions()
return &RocksDB{path, db, wo, ro, cfh, parser, nil, metrics, c, maxOpenFiles, connectBlockStats{}, extendedIndex, sync.Mutex{}}, nil
r := &RocksDB{path, db, wo, ro, cfh, parser, nil, metrics, c, maxOpenFiles, connectBlockStats{}, extendedIndex, sync.Mutex{}, sync.Mutex{}, make(map[string]*unpackedAddrContracts)}
if chainType == bchain.ChainEthereumType {
go r.periodicStoreAddrContractsCache()
}
return r, nil
}
func (d *RocksDB) closeDB() error {
@@ -165,6 +173,10 @@ func (d *RocksDB) closeDB() error {
// Close releases the RocksDB environment opened in NewRocksDB.
func (d *RocksDB) Close() error {
if d.db != nil {
// store cached address contracts
if d.chainParser.GetChainType() == bchain.ChainEthereumType {
d.storeAddrContractsCache()
}
// store the internal state of the app
if d.is != nil && d.is.DbState == common.DbStateOpen {
d.is.DbState = common.DbStateClosed

View File

@@ -7,6 +7,7 @@ import (
"os"
"sort"
"sync"
"time"
vlq "github.com/bsm/go-vlq"
"github.com/golang/glog"
@@ -1660,6 +1661,12 @@ func (s *unpackedMultiTokenValues) upsert(m bchain.MultiTokenValue, index int32,
// getUnpackedAddrDescContracts returns partially unpacked AddrContracts for given addrDesc
func (d *RocksDB) getUnpackedAddrDescContracts(addrDesc bchain.AddressDescriptor) (*unpackedAddrContracts, error) {
d.addrContractsCacheMux.Lock()
rv, found := d.addrContractsCache[string(addrDesc)]
d.addrContractsCacheMux.Unlock()
if found && rv != nil {
return rv, nil
}
val, err := d.db.GetCF(d.ro, d.cfh[cfAddressContracts], addrDesc)
if err != nil {
return nil, err
@@ -1669,7 +1676,13 @@ func (d *RocksDB) getUnpackedAddrDescContracts(addrDesc bchain.AddressDescriptor
if len(buf) == 0 {
return nil, nil
}
return partiallyUnpackAddrContracts(buf)
rv, err = partiallyUnpackAddrContracts(buf)
if err == nil && rv != nil && len(buf) > addrContractsCacheMinSize {
d.addrContractsCacheMux.Lock()
d.addrContractsCache[string(addrDesc)] = rv
d.addrContractsCacheMux.Unlock()
}
return rv, err
}
// to speed up import of blocks, the unpacking of big ints is deferred to time when they are needed
@@ -1797,9 +1810,44 @@ func (d *RocksDB) storeUnpackedAddressContracts(wb *grocksdb.WriteBatch, acm map
if acs == nil || (acs.NonContractTxs == 0 && acs.InternalTxs == 0 && len(acs.Contracts) == 0) {
wb.DeleteCF(d.cfh[cfAddressContracts], bchain.AddressDescriptor(addrDesc))
} else {
buf := packUnpackedAddrContracts(acs)
wb.PutCF(d.cfh[cfAddressContracts], bchain.AddressDescriptor(addrDesc), buf)
// do not store large address contracts found in cache
if _, found := d.addrContractsCache[addrDesc]; !found {
buf := packUnpackedAddrContracts(acs)
wb.PutCF(d.cfh[cfAddressContracts], bchain.AddressDescriptor(addrDesc), buf)
}
}
}
return nil
}
// writeContractsCache persists every entry of the in-memory address contracts
// cache to the cfAddressContracts column family in a single write batch.
// Errors from the batch write are logged, not returned.
func (d *RocksDB) writeContractsCache() {
	batch := grocksdb.NewWriteBatch()
	defer batch.Destroy()
	// hold the mutex while iterating so the map is not mutated mid-range
	d.addrContractsCacheMux.Lock()
	for addrDesc, acs := range d.addrContractsCache {
		packed := packUnpackedAddrContracts(acs)
		batch.PutCF(d.cfh[cfAddressContracts], bchain.AddressDescriptor(addrDesc), packed)
	}
	d.addrContractsCacheMux.Unlock()
	err := d.WriteBatch(batch)
	if err != nil {
		glog.Error("writeContractsCache: failed to store addrContractsCache: ", err)
	}
}
// storeAddrContractsCache flushes the in-memory address contracts cache to the
// database (if non-empty) and logs the entry count and elapsed time.
// It is called periodically and on Close.
func (d *RocksDB) storeAddrContractsCache() {
	start := time.Now()
	// fix: read the map size under addrContractsCacheMux - the map is
	// concurrently written by getUnpackedAddrDescContracts, so the original
	// unguarded len() calls were a data race
	d.addrContractsCacheMux.Lock()
	count := len(d.addrContractsCache)
	d.addrContractsCacheMux.Unlock()
	if count > 0 {
		d.writeContractsCache()
	}
	glog.Info("storeAddrContractsCache: store ", count, " entries in ", time.Since(start))
}
// periodicStoreAddrContractsCache flushes the address contracts cache to the
// database every 5 minutes. It is started as a goroutine from NewRocksDB for
// Ethereum type chains.
//
// NOTE(review): there is no way to stop this goroutine; it outlives Close and
// may flush into a closed DB - consider a quit channel wired into Close.
func (d *RocksDB) periodicStoreAddrContractsCache() {
	const period = 5 * time.Minute
	// use a ticker instead of manually resetting a timer - the idiomatic
	// construct for fixed-period work
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for range ticker.C {
		d.storeAddrContractsCache()
	}
}