diff --git a/db/addrcontracts_cache_state.go b/db/addrcontracts_cache_state.go index e1c46456..7431af74 100644 --- a/db/addrcontracts_cache_state.go +++ b/db/addrcontracts_cache_state.go @@ -2,6 +2,7 @@ package db import ( "sync" + "sync/atomic" "time" ) @@ -23,7 +24,7 @@ type addrContractsCacheState struct { hotEvictAfter time.Duration flushIdle time.Duration flushMaxAge time.Duration - enabled bool + enabled atomic.Bool hit uint64 miss uint64 diff --git a/db/rocksdb.go b/db/rocksdb.go index 6488c082..c181ab20 100644 --- a/db/rocksdb.go +++ b/db/rocksdb.go @@ -185,9 +185,9 @@ func NewRocksDB(path string, cacheSize, maxOpenFiles int, parser bchain.BlockCha hotEvictAfter: addrContractsHotEvictAfter, flushIdle: addrContractsCacheFlushIdle, flushMaxAge: addrContractsCacheFlushMaxAge, - enabled: true, }, } + r.addrContractsCacheState.enabled.Store(true) if chainType == bchain.ChainEthereumType { go r.periodicStoreAddrContractsCache() } @@ -2132,9 +2132,7 @@ func (d *RocksDB) SetInternalState(is *common.InternalState) { } func (d *RocksDB) setAddrContractsCacheEnabled(enabled bool) { - d.addrContractsCacheState.cacheMux.Lock() - d.addrContractsCacheState.enabled = enabled - d.addrContractsCacheState.cacheMux.Unlock() + d.addrContractsCacheState.enabled.Store(enabled) } // GetInternalState gets the InternalState diff --git a/db/rocksdb_ethereumtype.go b/db/rocksdb_ethereumtype.go index 7cf7bd8e..6e46798d 100644 --- a/db/rocksdb_ethereumtype.go +++ b/db/rocksdb_ethereumtype.go @@ -475,10 +475,14 @@ func (d *RocksDB) addToAddressesAndContractsEthereumType(addrDesc bchain.Address if ac.Packed != nil { sizeBytes = len(ac.Packed) } - if d.addrContractsCacheState.enabled { + if d.addrContractsCacheState.enabled.Load() { hotScore := d.updateAddrContractsHotness(strAddrDesc, sizeBytes, blockHeight, now) d.maybeCacheAddrContracts(strAddrDesc, ac, sizeBytes, hotScore, now) } + cacheLocked := d.lockAddrContractsCacheIfPresent(strAddrDesc) + if cacheLocked { + defer d.addrContractsCacheState.cacheMux.Unlock() + } if contract == nil { if addTxCount { if index == internalTransferFrom || index == internalTransferTo { @@ -513,8 +517,12 @@ func (d *RocksDB) addToAddressesAndContractsEthereumType(addrDesc bchain.Address if !counted { ac.TotalTxs++ } - if d.addrContractsCacheState.enabled { - d.markAddrContractsCacheDirty(strAddrDesc, sizeBytes, now) + if d.addrContractsCacheState.enabled.Load() { + if cacheLocked { + d.markAddrContractsCacheDirtyLocked(strAddrDesc, now) + } else { + d.markAddrContractsCacheDirty(strAddrDesc, now) + } } return nil } @@ -581,13 +589,16 @@ func (d *RocksDB) maybeCacheAddrContracts(addrKey string, acs *unpackedAddrContr d.addrContractsCacheState.cacheMux.Unlock() } -func (d *RocksDB) markAddrContractsCacheDirty(addrKey string, sizeBytes int, now int64) { +func (d *RocksDB) markAddrContractsCacheDirty(addrKey string, now int64) { d.addrContractsCacheState.cacheMux.Lock() - _, cached := d.addrContractsCacheState.cache[addrKey] - if !cached { - d.addrContractsCacheState.cacheMux.Unlock() + defer d.addrContractsCacheState.cacheMux.Unlock() + if _, cached := d.addrContractsCacheState.cache[addrKey]; !cached { return } + d.markAddrContractsCacheDirtyLocked(addrKey, now) +} + +func (d *RocksDB) markAddrContractsCacheDirtyLocked(addrKey string, now int64) { // Track dirty state for adaptive flush decisions. meta, ok := d.addrContractsCacheState.cacheMeta[addrKey] if !ok { @@ -602,14 +613,25 @@ func (d *RocksDB) markAddrContractsCacheDirty(addrKey string, sizeBytes int, now } meta.dirty = true meta.lastUpdateTime = now +} + +func (d *RocksDB) lockAddrContractsCacheIfPresent(addrKey string) bool { + if !d.addrContractsCacheState.enabled.Load() { + return false + } + d.addrContractsCacheState.cacheMux.Lock() + if _, cached := d.addrContractsCacheState.cache[addrKey]; cached { + return true + } d.addrContractsCacheState.cacheMux.Unlock() + return false } func (d *RocksDB) evictAddrContractsHot(now int64) { if d.addrContractsCacheState.hotEvictAfter <= 0 { return } - if !d.addrContractsCacheState.enabled { + if !d.addrContractsCacheState.enabled.Load() { return } if now <= 0 { @@ -1469,6 +1491,10 @@ func (d *RocksDB) disconnectAddress(btxID []byte, internal bool, addrDesc bchain } } if addrContracts != nil { + cacheLocked := d.lockAddrContractsCacheIfPresent(s) + if cacheLocked { + defer d.addrContractsCacheState.cacheMux.Unlock() + } if !ftx { addrContracts.TotalTxs-- } @@ -1518,6 +1544,14 @@ func (d *RocksDB) disconnectAddress(btxID []byte, internal bool, addrDesc bchain } } } + if d.addrContractsCacheState.enabled.Load() { + now := time.Now().Unix() + if cacheLocked { + d.markAddrContractsCacheDirtyLocked(s, now) + } else { + d.markAddrContractsCacheDirty(s, now) + } + } } else { if !isZeroAddress(addrDesc) { glog.Warning("AddressContracts ", addrDesc, " not found, tx ", hex.EncodeToString(btxID)) @@ -1956,7 +1990,10 @@ func (d *RocksDB) storeUnpackedAddressContracts(wb *grocksdb.WriteBatch, acm map d.addrContractsCacheState.cacheMux.Unlock() } else { // do not store large address contracts found in cache - if _, found := d.addrContractsCacheState.cache[addrDesc]; !found { + d.addrContractsCacheState.cacheMux.Lock() + _, found := d.addrContractsCacheState.cache[addrDesc] + d.addrContractsCacheState.cacheMux.Unlock() + if !found { buf := packUnpackedAddrContracts(acs) wb.PutCF(d.cfh[cfAddressContracts], bchain.AddressDescriptor(addrDesc), buf) writes++ @@ -1978,7 +2015,7 @@ func (d *RocksDB) storeUnpackedAddressContracts(wb *grocksdb.WriteBatch, acm map type cacheFlushEntry struct { addrDesc string - acs *unpackedAddrContracts + buf []byte lastUpdateTime int64 } @@ -2016,9 +2053,10 @@ func (d *RocksDB) writeContractsCache(force bool) (int, uint64) { continue } if d.shouldFlushAddrContracts(meta, now, force) { + buf := packUnpackedAddrContracts(acs) flushEntries = append(flushEntries, cacheFlushEntry{ addrDesc: addrDesc, - acs: acs, + buf: buf, lastUpdateTime: meta.lastUpdateTime, }) } @@ -2028,10 +2066,9 @@ func (d *RocksDB) writeContractsCache(force bool) (int, uint64) { return 0, 0 } for _, entry := range flushEntries { - buf := packUnpackedAddrContracts(entry.acs) - wb.PutCF(d.cfh[cfAddressContracts], bchain.AddressDescriptor(entry.addrDesc), buf) + wb.PutCF(d.cfh[cfAddressContracts], bchain.AddressDescriptor(entry.addrDesc), entry.buf) entries++ - bytes += uint64(len(buf)) + bytes += uint64(len(entry.buf)) } if err := d.WriteBatch(wb); err != nil { glog.Error("writeContractsCache: failed to store addrContractsCache: ", err) @@ -2068,7 +2105,7 @@ func (d *RocksDB) periodicStoreAddrContractsCache() { for { <-timer.C timer.Reset(period) - if !d.addrContractsCacheState.enabled { + if !d.addrContractsCacheState.enabled.Load() { continue } d.storeAddrContractsCache(false) @@ -2078,7 +2115,7 @@ func (d *RocksDB) periodicStoreAddrContractsCache() { } func (d *RocksDB) logAddrContractsCacheMetrics(period time.Duration) { - if !d.addrContractsCacheState.enabled { + if !d.addrContractsCacheState.enabled.Load() { return } hits := atomic.SwapUint64(&d.addrContractsCacheState.hit, 0)