diff --git a/bchain/coins/eth/ethrpc.go b/bchain/coins/eth/ethrpc.go index c9c235b6..c7963357 100644 --- a/bchain/coins/eth/ethrpc.go +++ b/bchain/coins/eth/ethrpc.go @@ -504,7 +504,7 @@ func (b *EthereumRPC) getBestHeader() (bchain.EVMHeader, error) { // UpdateBestHeader keeps track of the latest block header confirmed on chain func (b *EthereumRPC) UpdateBestHeader(h bchain.EVMHeader) { - glog.V(2).Info("rpc: new block header ", h.Number()) + glog.V(2).Info("rpc: new block header ", h.Number().Uint64()) b.bestHeaderLock.Lock() b.bestHeader = h b.bestHeaderTime = time.Now() diff --git a/db/rocksdb.go b/db/rocksdb.go index f13bf29c..9fa6517f 100644 --- a/db/rocksdb.go +++ b/db/rocksdb.go @@ -57,21 +57,25 @@ const ( addressBalanceDetailUTXOIndexed = 2 ) +const addrContractsCacheMinSize = 300_000 // limit for caching address contracts in memory to speed up indexing + // RocksDB handle type RocksDB struct { - path string - db *grocksdb.DB - wo *grocksdb.WriteOptions - ro *grocksdb.ReadOptions - cfh []*grocksdb.ColumnFamilyHandle - chainParser bchain.BlockChainParser - is *common.InternalState - metrics *common.Metrics - cache *grocksdb.Cache - maxOpenFiles int - cbs connectBlockStats - extendedIndex bool - connectBlockMux sync.Mutex + path string + db *grocksdb.DB + wo *grocksdb.WriteOptions + ro *grocksdb.ReadOptions + cfh []*grocksdb.ColumnFamilyHandle + chainParser bchain.BlockChainParser + is *common.InternalState + metrics *common.Metrics + cache *grocksdb.Cache + maxOpenFiles int + cbs connectBlockStats + extendedIndex bool + connectBlockMux sync.Mutex + addrContractsCacheMux sync.Mutex + addrContractsCache map[string]*unpackedAddrContracts } const ( @@ -150,7 +154,11 @@ func NewRocksDB(path string, cacheSize, maxOpenFiles int, parser bchain.BlockCha } wo := grocksdb.NewDefaultWriteOptions() ro := grocksdb.NewDefaultReadOptions() - return &RocksDB{path, db, wo, ro, cfh, parser, nil, metrics, c, maxOpenFiles, connectBlockStats{}, extendedIndex, sync.Mutex{}}, nil + r := &RocksDB{path, db, wo, ro, cfh, parser, nil, metrics, c, maxOpenFiles, connectBlockStats{}, extendedIndex, sync.Mutex{}, sync.Mutex{}, make(map[string]*unpackedAddrContracts)} + if chainType == bchain.ChainEthereumType { + go r.periodicStoreAddrContractsCache() + } + return r, nil } func (d *RocksDB) closeDB() error { @@ -165,6 +173,10 @@ func (d *RocksDB) closeDB() error { // Close releases the RocksDB environment opened in NewRocksDB. func (d *RocksDB) Close() error { if d.db != nil { + // store cached address contracts + if d.chainParser.GetChainType() == bchain.ChainEthereumType { + d.storeAddrContractsCache() + } // store the internal state of the app if d.is != nil && d.is.DbState == common.DbStateOpen { d.is.DbState = common.DbStateClosed diff --git a/db/rocksdb_ethereumtype.go b/db/rocksdb_ethereumtype.go index bb2798e0..7d59825e 100644 --- a/db/rocksdb_ethereumtype.go +++ b/db/rocksdb_ethereumtype.go @@ -7,6 +7,7 @@ import ( "os" "sort" "sync" + "time" vlq "github.com/bsm/go-vlq" "github.com/golang/glog" @@ -1660,6 +1661,12 @@ func (s *unpackedMultiTokenValues) upsert(m bchain.MultiTokenValue, index int32, // getUnpackedAddrDescContracts returns partially unpacked AddrContracts for given addrDesc func (d *RocksDB) getUnpackedAddrDescContracts(addrDesc bchain.AddressDescriptor) (*unpackedAddrContracts, error) { + d.addrContractsCacheMux.Lock() + rv, found := d.addrContractsCache[string(addrDesc)] + d.addrContractsCacheMux.Unlock() + if found && rv != nil { + return rv, nil + } val, err := d.db.GetCF(d.ro, d.cfh[cfAddressContracts], addrDesc) if err != nil { return nil, err @@ -1669,7 +1676,13 @@ func (d *RocksDB) getUnpackedAddrDescContracts(addrDesc bchain.AddressDescriptor if len(buf) == 0 { return nil, nil } - return partiallyUnpackAddrContracts(buf) + rv, err = partiallyUnpackAddrContracts(buf) + if err == nil && rv != nil && len(buf) > addrContractsCacheMinSize { + d.addrContractsCacheMux.Lock() + d.addrContractsCache[string(addrDesc)] = rv + d.addrContractsCacheMux.Unlock() + } + return rv, err } // to speed up import of blocks, the unpacking of big ints is deferred to time when they are needed @@ -1797,9 +1810,44 @@ func (d *RocksDB) storeUnpackedAddressContracts(wb *grocksdb.WriteBatch, acm map if acs == nil || (acs.NonContractTxs == 0 && acs.InternalTxs == 0 && len(acs.Contracts) == 0) { wb.DeleteCF(d.cfh[cfAddressContracts], bchain.AddressDescriptor(addrDesc)) } else { - buf := packUnpackedAddrContracts(acs) - wb.PutCF(d.cfh[cfAddressContracts], bchain.AddressDescriptor(addrDesc), buf) + // do not store large address contracts found in cache + if _, found := d.addrContractsCache[addrDesc]; !found { + buf := packUnpackedAddrContracts(acs) + wb.PutCF(d.cfh[cfAddressContracts], bchain.AddressDescriptor(addrDesc), buf) + } } } return nil } + +func (d *RocksDB) writeContractsCache() { + wb := grocksdb.NewWriteBatch() + defer wb.Destroy() + d.addrContractsCacheMux.Lock() + for addrDesc, acs := range d.addrContractsCache { + buf := packUnpackedAddrContracts(acs) + wb.PutCF(d.cfh[cfAddressContracts], bchain.AddressDescriptor(addrDesc), buf) + } + d.addrContractsCacheMux.Unlock() + if err := d.WriteBatch(wb); err != nil { + glog.Error("writeContractsCache: failed to store addrContractsCache: ", err) + } +} + +func (d *RocksDB) storeAddrContractsCache() { + start := time.Now() + if len(d.addrContractsCache) > 0 { + d.writeContractsCache() + } + glog.Info("storeAddrContractsCache: store ", len(d.addrContractsCache), " entries in ", time.Since(start)) +} + +func (d *RocksDB) periodicStoreAddrContractsCache() { + period := time.Duration(5) * time.Minute + timer := time.NewTimer(period) + for { + <-timer.C + timer.Reset(period) + d.storeAddrContractsCache() + } +}