diff --git a/blockbook.go b/blockbook.go index 6675aec3..fa0cfdd7 100644 --- a/blockbook.go +++ b/blockbook.go @@ -345,7 +345,7 @@ func mainWithExitCode() int { until := uint32(*blockUntil) if !*synchronize { - if err = syncWorker.ConnectBlocksParallel(height, until); err != nil { + if err = syncWorker.BulkConnectBlocks(height, until); err != nil { if err != db.ErrOperationInterrupted { glog.Error("connectBlocksParallel ", err) return exitCodeFatal diff --git a/db/sync.go b/db/sync.go index 5c3edc52..e0ba75fc 100644 --- a/db/sync.go +++ b/db/sync.go @@ -153,7 +153,8 @@ func (w *SyncWorker) resyncIndex(onNewBlock bchain.OnNewBlockFunc, initialSync b // if parallel operation is enabled and the number of blocks to be connected is large, // use parallel routine to load majority of blocks // use parallel sync only in case of initial sync because it puts the db to inconsistent state - if w.syncWorkers > 1 && initialSync { + // or, for ChainEthereumType, also on later resyncs when the remote tip is at least 4 blocks ahead of the sync start height + if w.syncWorkers > 1 && (initialSync || w.chain.GetChainParser().GetChainType() == bchain.ChainEthereumType) { remoteBestHeight, err := w.chain.GetBestBlockHeight() if err != nil { return err @@ -162,15 +163,30 @@ func (w *SyncWorker) resyncIndex(onNewBlock bchain.OnNewBlockFunc, initialSync b glog.Error("resync: error - remote best height ", remoteBestHeight, " less than sync start height ", w.startHeight) return errors.New("resync: remote best height error") } - if remoteBestHeight-w.startHeight > uint32(w.syncChunk) { - glog.Infof("resync: parallel sync of blocks %d-%d, using %d workers", w.startHeight, remoteBestHeight, w.syncWorkers) - err = w.ConnectBlocksParallel(w.startHeight, remoteBestHeight) - if err != nil { - return err + if initialSync { + if remoteBestHeight-w.startHeight > uint32(w.syncChunk) { + glog.Infof("resync: bulk sync of blocks %d-%d, using %d workers", w.startHeight, remoteBestHeight, w.syncWorkers) + err = w.BulkConnectBlocks(w.startHeight, remoteBestHeight) + if err 
!= nil { + return err + } + // after parallel load finish the sync using standard way, + // new blocks may have been created in the meantime + return w.resyncIndex(onNewBlock, initialSync) + } + } + if w.chain.GetChainParser().GetChainType() == bchain.ChainEthereumType { + syncWorkers := uint32(4) + if remoteBestHeight-w.startHeight >= syncWorkers { + glog.Infof("resync: parallel sync of blocks %d-%d, using %d workers", w.startHeight, remoteBestHeight, syncWorkers) + err = w.ParallelConnectBlocks(onNewBlock, w.startHeight, remoteBestHeight, syncWorkers) + if err != nil { + return err + } + // after parallel load finish the sync using standard way, + // new blocks may have been created in the meantime + return w.resyncIndex(onNewBlock, initialSync) } - // after parallel load finish the sync using standard way, - // new blocks may have been created in the meantime - return w.resyncIndex(onNewBlock, initialSync) } } err = w.connectBlocks(onNewBlock, initialSync) @@ -184,7 +200,7 @@ func (w *SyncWorker) handleFork(localBestHeight uint32, localBestHash string, on // find forked blocks, disconnect them and then synchronize again var height uint32 hashes := []string{localBestHash} - for height = localBestHeight - 1; height >= 0; height-- { + for height = localBestHeight - 1; ; height-- { local, err := w.db.GetBlockHash(height) if err != nil { return err @@ -271,12 +287,140 @@ func (w *SyncWorker) connectBlocks(onNewBlock bchain.OnNewBlockFunc, initialSync return nil } -// ConnectBlocksParallel uses parallel goroutines to get data from blockchain daemon -func (w *SyncWorker) ConnectBlocksParallel(lower, higher uint32) error { - type hashHeight struct { - hash string - height uint32 +type hashHeight struct { + hash string + height uint32 +} + +// ParallelConnectBlocks uses parallel goroutines to get data from blockchain daemon but connects the blocks to the db one at a time, in height order, calling onNewBlock (if set) after each block +func (w *SyncWorker) ParallelConnectBlocks(onNewBlock bchain.OnNewBlockFunc, lower, higher uint32, syncWorkers uint32) error 
{ + var err error + var wg sync.WaitGroup + bch := make([]chan *bchain.Block, syncWorkers) + for i := 0; i < int(syncWorkers); i++ { + bch[i] = make(chan *bchain.Block) } + hch := make(chan hashHeight, syncWorkers) + hchClosed := atomic.Value{} + hchClosed.Store(false) + writeBlockDone := make(chan struct{}) + terminating := make(chan struct{}) + writeBlockWorker := func() { + defer close(writeBlockDone) + lastBlock := lower - 1 + WriteBlockLoop: + for { + select { + case b := <-bch[(lastBlock+1)%syncWorkers]: + if b == nil { + // channel is closed and empty - work is done + break WriteBlockLoop + } + if b.Height != lastBlock+1 { + glog.Fatal("writeBlockWorker skipped block, expected block ", lastBlock+1, ", new block ", b.Height) + } + err := w.db.ConnectBlock(b) + if err != nil { + glog.Fatal("writeBlockWorker ", b.Height, " ", b.Hash, " error ", err) + } + + if onNewBlock != nil { + onNewBlock(b.Hash, b.Height) + } + w.metrics.BlockbookBestHeight.Set(float64(b.Height)) + + if b.Height > 0 && b.Height%1000 == 0 { + glog.Info("connected block ", b.Height, " ", b.Hash) + } + + lastBlock = b.Height + case <-terminating: + break WriteBlockLoop + } + } + if err != nil { + glog.Error("sync: ParallelConnectBlocks.Close error ", err) + } + glog.Info("WriteBlock exiting...") + } + for i := 0; i < int(syncWorkers); i++ { + wg.Add(1) + go w.getBlockWorker(i, syncWorkers, &wg, hch, bch, &hchClosed, terminating) + } + go writeBlockWorker() + var hash string +ConnectLoop: + for h := lower; h <= higher; { + select { + case <-w.chanOsSignal: + glog.Info("connectBlocksParallel interrupted at height ", h) + err = ErrOperationInterrupted + // signal all workers to terminate their loops (error loops are interrupted below) + close(terminating) + break ConnectLoop + default: + hash, err = w.chain.GetBlockHash(h) + if err != nil { + glog.Error("GetBlockHash error ", err) + w.metrics.IndexResyncErrors.With(common.Labels{"error": "failure"}).Inc() + time.Sleep(time.Millisecond * 500) + 
continue + } + hch <- hashHeight{hash, h} + h++ + } + } + close(hch) + // signal stop to workers that are in a error loop + hchClosed.Store(true) + // wait for workers and close bch that will stop writer loop + wg.Wait() + for i := 0; i < int(syncWorkers); i++ { + close(bch[i]) + } + <-writeBlockDone + return err +} + +func (w *SyncWorker) getBlockWorker(i int, syncWorkers uint32, wg *sync.WaitGroup, hch chan hashHeight, bch []chan *bchain.Block, hchClosed *atomic.Value, terminating chan struct{}) { + defer wg.Done() + var err error + var block *bchain.Block +GetBlockLoop: + for hh := range hch { + for { + block, err = w.chain.GetBlock(hh.hash, hh.height) + if err != nil { + // signal came while looping in the error loop + if hchClosed.Load() == true { + glog.Error("getBlockWorker ", i, " connect block error ", err, ". Exiting...") + return + } + if err == bchain.ErrBlockNotFound { + glog.Error("getBlockWorker ", i, " connect block ", hh.height, " ", hh.hash, " error ", err, ". Retrying...") + } else { + glog.Error("getBlockWorker ", i, " connect block error ", err, ". 
Retrying...") + } + w.metrics.IndexResyncErrors.With(common.Labels{"error": "failure"}).Inc() + time.Sleep(time.Millisecond * 500) + } else { + break + } + } + if w.dryRun { + continue + } + select { + case bch[hh.height%syncWorkers] <- block: + case <-terminating: + break GetBlockLoop + } + } + glog.Info("getBlockWorker ", i, " exiting...") +} + +// BulkConnectBlocks uses parallel goroutines to get data from blockchain daemon +func (w *SyncWorker) BulkConnectBlocks(lower, higher uint32) error { var err error var wg sync.WaitGroup bch := make([]chan *bchain.Block, w.syncWorkers) @@ -322,45 +466,9 @@ func (w *SyncWorker) ConnectBlocksParallel(lower, higher uint32) error { } glog.Info("WriteBlock exiting...") } - getBlockWorker := func(i int) { - defer wg.Done() - var err error - var block *bchain.Block - GetBlockLoop: - for hh := range hch { - for { - block, err = w.chain.GetBlock(hh.hash, hh.height) - if err != nil { - // signal came while looping in the error loop - if hchClosed.Load() == true { - glog.Error("getBlockWorker ", i, " connect block error ", err, ". Exiting...") - return - } - if err == bchain.ErrBlockNotFound { - glog.Error("getBlockWorker ", i, " connect block ", hh.height, " ", hh.hash, " error ", err, ". Retrying...") - } else { - glog.Error("getBlockWorker ", i, " connect block error ", err, ". 
Retrying...") - } - w.metrics.IndexResyncErrors.With(common.Labels{"error": "failure"}).Inc() - time.Sleep(time.Millisecond * 500) - } else { - break - } - } - if w.dryRun { - continue - } - select { - case bch[hh.height%uint32(w.syncWorkers)] <- block: - case <-terminating: - break GetBlockLoop - } - } - glog.Info("getBlockWorker ", i, " exiting...") - } for i := 0; i < w.syncWorkers; i++ { wg.Add(1) - go getBlockWorker(i) + go w.getBlockWorker(i, uint32(w.syncWorkers), &wg, hch, bch, &hchClosed, terminating) } go writeBlockWorker() var hash string @@ -422,7 +530,6 @@ func (w *SyncWorker) getBlockChain(out chan blockResult, done chan struct{}) { hash := w.startHash height := w.startHeight prevHash := "" - // loop until error ErrBlockNotFound for { select {