mirror of
https://github.com/trezor/blockbook.git
synced 2026-02-20 00:51:39 +01:00
caching improvements cleanup
This commit is contained in:
@@ -2,6 +2,7 @@ package db
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -23,7 +24,7 @@ type addrContractsCacheState struct {
|
||||
hotEvictAfter time.Duration
|
||||
flushIdle time.Duration
|
||||
flushMaxAge time.Duration
|
||||
enabled bool
|
||||
enabled atomic.Bool
|
||||
|
||||
hit uint64
|
||||
miss uint64
|
||||
|
||||
@@ -185,9 +185,9 @@ func NewRocksDB(path string, cacheSize, maxOpenFiles int, parser bchain.BlockCha
|
||||
hotEvictAfter: addrContractsHotEvictAfter,
|
||||
flushIdle: addrContractsCacheFlushIdle,
|
||||
flushMaxAge: addrContractsCacheFlushMaxAge,
|
||||
enabled: true,
|
||||
},
|
||||
}
|
||||
r.addrContractsCacheState.enabled.Store(true)
|
||||
if chainType == bchain.ChainEthereumType {
|
||||
go r.periodicStoreAddrContractsCache()
|
||||
}
|
||||
@@ -2132,9 +2132,7 @@ func (d *RocksDB) SetInternalState(is *common.InternalState) {
|
||||
}
|
||||
|
||||
func (d *RocksDB) setAddrContractsCacheEnabled(enabled bool) {
|
||||
d.addrContractsCacheState.cacheMux.Lock()
|
||||
d.addrContractsCacheState.enabled = enabled
|
||||
d.addrContractsCacheState.cacheMux.Unlock()
|
||||
d.addrContractsCacheState.enabled.Store(enabled)
|
||||
}
|
||||
|
||||
// GetInternalState gets the InternalState
|
||||
|
||||
@@ -475,10 +475,14 @@ func (d *RocksDB) addToAddressesAndContractsEthereumType(addrDesc bchain.Address
|
||||
if ac.Packed != nil {
|
||||
sizeBytes = len(ac.Packed)
|
||||
}
|
||||
if d.addrContractsCacheState.enabled {
|
||||
if d.addrContractsCacheState.enabled.Load() {
|
||||
hotScore := d.updateAddrContractsHotness(strAddrDesc, sizeBytes, blockHeight, now)
|
||||
d.maybeCacheAddrContracts(strAddrDesc, ac, sizeBytes, hotScore, now)
|
||||
}
|
||||
cacheLocked := d.lockAddrContractsCacheIfPresent(strAddrDesc)
|
||||
if cacheLocked {
|
||||
defer d.addrContractsCacheState.cacheMux.Unlock()
|
||||
}
|
||||
if contract == nil {
|
||||
if addTxCount {
|
||||
if index == internalTransferFrom || index == internalTransferTo {
|
||||
@@ -513,8 +517,12 @@ func (d *RocksDB) addToAddressesAndContractsEthereumType(addrDesc bchain.Address
|
||||
if !counted {
|
||||
ac.TotalTxs++
|
||||
}
|
||||
if d.addrContractsCacheState.enabled {
|
||||
d.markAddrContractsCacheDirty(strAddrDesc, sizeBytes, now)
|
||||
if d.addrContractsCacheState.enabled.Load() {
|
||||
if cacheLocked {
|
||||
d.markAddrContractsCacheDirtyLocked(strAddrDesc, now)
|
||||
} else {
|
||||
d.markAddrContractsCacheDirty(strAddrDesc, now)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -581,13 +589,16 @@ func (d *RocksDB) maybeCacheAddrContracts(addrKey string, acs *unpackedAddrContr
|
||||
d.addrContractsCacheState.cacheMux.Unlock()
|
||||
}
|
||||
|
||||
func (d *RocksDB) markAddrContractsCacheDirty(addrKey string, sizeBytes int, now int64) {
|
||||
func (d *RocksDB) markAddrContractsCacheDirty(addrKey string, now int64) {
|
||||
d.addrContractsCacheState.cacheMux.Lock()
|
||||
_, cached := d.addrContractsCacheState.cache[addrKey]
|
||||
if !cached {
|
||||
d.addrContractsCacheState.cacheMux.Unlock()
|
||||
defer d.addrContractsCacheState.cacheMux.Unlock()
|
||||
if _, cached := d.addrContractsCacheState.cache[addrKey]; !cached {
|
||||
return
|
||||
}
|
||||
d.markAddrContractsCacheDirtyLocked(addrKey, now)
|
||||
}
|
||||
|
||||
func (d *RocksDB) markAddrContractsCacheDirtyLocked(addrKey string, now int64) {
|
||||
// Track dirty state for adaptive flush decisions.
|
||||
meta, ok := d.addrContractsCacheState.cacheMeta[addrKey]
|
||||
if !ok {
|
||||
@@ -602,14 +613,25 @@ func (d *RocksDB) markAddrContractsCacheDirty(addrKey string, sizeBytes int, now
|
||||
}
|
||||
meta.dirty = true
|
||||
meta.lastUpdateTime = now
|
||||
}
|
||||
|
||||
func (d *RocksDB) lockAddrContractsCacheIfPresent(addrKey string) bool {
|
||||
if !d.addrContractsCacheState.enabled.Load() {
|
||||
return false
|
||||
}
|
||||
d.addrContractsCacheState.cacheMux.Lock()
|
||||
if _, cached := d.addrContractsCacheState.cache[addrKey]; cached {
|
||||
return true
|
||||
}
|
||||
d.addrContractsCacheState.cacheMux.Unlock()
|
||||
return false
|
||||
}
|
||||
|
||||
func (d *RocksDB) evictAddrContractsHot(now int64) {
|
||||
if d.addrContractsCacheState.hotEvictAfter <= 0 {
|
||||
return
|
||||
}
|
||||
if !d.addrContractsCacheState.enabled {
|
||||
if !d.addrContractsCacheState.enabled.Load() {
|
||||
return
|
||||
}
|
||||
if now <= 0 {
|
||||
@@ -1469,6 +1491,10 @@ func (d *RocksDB) disconnectAddress(btxID []byte, internal bool, addrDesc bchain
|
||||
}
|
||||
}
|
||||
if addrContracts != nil {
|
||||
cacheLocked := d.lockAddrContractsCacheIfPresent(s)
|
||||
if cacheLocked {
|
||||
defer d.addrContractsCacheState.cacheMux.Unlock()
|
||||
}
|
||||
if !ftx {
|
||||
addrContracts.TotalTxs--
|
||||
}
|
||||
@@ -1518,6 +1544,14 @@ func (d *RocksDB) disconnectAddress(btxID []byte, internal bool, addrDesc bchain
|
||||
}
|
||||
}
|
||||
}
|
||||
if d.addrContractsCacheState.enabled.Load() {
|
||||
now := time.Now().Unix()
|
||||
if cacheLocked {
|
||||
d.markAddrContractsCacheDirtyLocked(s, now)
|
||||
} else {
|
||||
d.markAddrContractsCacheDirty(s, now)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if !isZeroAddress(addrDesc) {
|
||||
glog.Warning("AddressContracts ", addrDesc, " not found, tx ", hex.EncodeToString(btxID))
|
||||
@@ -1956,7 +1990,10 @@ func (d *RocksDB) storeUnpackedAddressContracts(wb *grocksdb.WriteBatch, acm map
|
||||
d.addrContractsCacheState.cacheMux.Unlock()
|
||||
} else {
|
||||
// do not store large address contracts found in cache
|
||||
if _, found := d.addrContractsCacheState.cache[addrDesc]; !found {
|
||||
d.addrContractsCacheState.cacheMux.Lock()
|
||||
_, found := d.addrContractsCacheState.cache[addrDesc]
|
||||
d.addrContractsCacheState.cacheMux.Unlock()
|
||||
if !found {
|
||||
buf := packUnpackedAddrContracts(acs)
|
||||
wb.PutCF(d.cfh[cfAddressContracts], bchain.AddressDescriptor(addrDesc), buf)
|
||||
writes++
|
||||
@@ -1978,7 +2015,7 @@ func (d *RocksDB) storeUnpackedAddressContracts(wb *grocksdb.WriteBatch, acm map
|
||||
|
||||
type cacheFlushEntry struct {
|
||||
addrDesc string
|
||||
acs *unpackedAddrContracts
|
||||
buf []byte
|
||||
lastUpdateTime int64
|
||||
}
|
||||
|
||||
@@ -2016,9 +2053,10 @@ func (d *RocksDB) writeContractsCache(force bool) (int, uint64) {
|
||||
continue
|
||||
}
|
||||
if d.shouldFlushAddrContracts(meta, now, force) {
|
||||
buf := packUnpackedAddrContracts(acs)
|
||||
flushEntries = append(flushEntries, cacheFlushEntry{
|
||||
addrDesc: addrDesc,
|
||||
acs: acs,
|
||||
buf: buf,
|
||||
lastUpdateTime: meta.lastUpdateTime,
|
||||
})
|
||||
}
|
||||
@@ -2028,10 +2066,9 @@ func (d *RocksDB) writeContractsCache(force bool) (int, uint64) {
|
||||
return 0, 0
|
||||
}
|
||||
for _, entry := range flushEntries {
|
||||
buf := packUnpackedAddrContracts(entry.acs)
|
||||
wb.PutCF(d.cfh[cfAddressContracts], bchain.AddressDescriptor(entry.addrDesc), buf)
|
||||
wb.PutCF(d.cfh[cfAddressContracts], bchain.AddressDescriptor(entry.addrDesc), entry.buf)
|
||||
entries++
|
||||
bytes += uint64(len(buf))
|
||||
bytes += uint64(len(entry.buf))
|
||||
}
|
||||
if err := d.WriteBatch(wb); err != nil {
|
||||
glog.Error("writeContractsCache: failed to store addrContractsCache: ", err)
|
||||
@@ -2068,7 +2105,7 @@ func (d *RocksDB) periodicStoreAddrContractsCache() {
|
||||
for {
|
||||
<-timer.C
|
||||
timer.Reset(period)
|
||||
if !d.addrContractsCacheState.enabled {
|
||||
if !d.addrContractsCacheState.enabled.Load() {
|
||||
continue
|
||||
}
|
||||
d.storeAddrContractsCache(false)
|
||||
@@ -2078,7 +2115,7 @@ func (d *RocksDB) periodicStoreAddrContractsCache() {
|
||||
}
|
||||
|
||||
func (d *RocksDB) logAddrContractsCacheMetrics(period time.Duration) {
|
||||
if !d.addrContractsCacheState.enabled {
|
||||
if !d.addrContractsCacheState.enabled.Load() {
|
||||
return
|
||||
}
|
||||
hits := atomic.SwapUint64(&d.addrContractsCacheState.hit, 0)
|
||||
|
||||
Reference in New Issue
Block a user