mirror of
https://github.com/trezor/blockbook.git
synced 2026-02-20 00:51:39 +01:00
Add parallel connect of blocks for EthereumType coins
This commit is contained in:
@@ -345,7 +345,7 @@ func mainWithExitCode() int {
|
|||||||
until := uint32(*blockUntil)
|
until := uint32(*blockUntil)
|
||||||
|
|
||||||
if !*synchronize {
|
if !*synchronize {
|
||||||
if err = syncWorker.ConnectBlocksParallel(height, until); err != nil {
|
if err = syncWorker.BulkConnectBlocks(height, until); err != nil {
|
||||||
if err != db.ErrOperationInterrupted {
|
if err != db.ErrOperationInterrupted {
|
||||||
glog.Error("connectBlocksParallel ", err)
|
glog.Error("connectBlocksParallel ", err)
|
||||||
return exitCodeFatal
|
return exitCodeFatal
|
||||||
|
|||||||
213
db/sync.go
213
db/sync.go
@@ -153,7 +153,8 @@ func (w *SyncWorker) resyncIndex(onNewBlock bchain.OnNewBlockFunc, initialSync b
|
|||||||
// if parallel operation is enabled and the number of blocks to be connected is large,
|
// if parallel operation is enabled and the number of blocks to be connected is large,
|
||||||
// use parallel routine to load majority of blocks
|
// use parallel routine to load majority of blocks
|
||||||
// use parallel sync only in case of initial sync because it puts the db to inconsistent state
|
// use parallel sync only in case of initial sync because it puts the db to inconsistent state
|
||||||
if w.syncWorkers > 1 && initialSync {
|
// or in case of ChainEthereumType if the tip is farther
|
||||||
|
if w.syncWorkers > 1 && (initialSync || w.chain.GetChainParser().GetChainType() == bchain.ChainEthereumType) {
|
||||||
remoteBestHeight, err := w.chain.GetBestBlockHeight()
|
remoteBestHeight, err := w.chain.GetBestBlockHeight()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -162,15 +163,30 @@ func (w *SyncWorker) resyncIndex(onNewBlock bchain.OnNewBlockFunc, initialSync b
|
|||||||
glog.Error("resync: error - remote best height ", remoteBestHeight, " less than sync start height ", w.startHeight)
|
glog.Error("resync: error - remote best height ", remoteBestHeight, " less than sync start height ", w.startHeight)
|
||||||
return errors.New("resync: remote best height error")
|
return errors.New("resync: remote best height error")
|
||||||
}
|
}
|
||||||
if remoteBestHeight-w.startHeight > uint32(w.syncChunk) {
|
if initialSync {
|
||||||
glog.Infof("resync: parallel sync of blocks %d-%d, using %d workers", w.startHeight, remoteBestHeight, w.syncWorkers)
|
if remoteBestHeight-w.startHeight > uint32(w.syncChunk) {
|
||||||
err = w.ConnectBlocksParallel(w.startHeight, remoteBestHeight)
|
glog.Infof("resync: bulk sync of blocks %d-%d, using %d workers", w.startHeight, remoteBestHeight, w.syncWorkers)
|
||||||
if err != nil {
|
err = w.BulkConnectBlocks(w.startHeight, remoteBestHeight)
|
||||||
return err
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// after parallel load finish the sync using standard way,
|
||||||
|
// new blocks may have been created in the meantime
|
||||||
|
return w.resyncIndex(onNewBlock, initialSync)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if w.chain.GetChainParser().GetChainType() == bchain.ChainEthereumType {
|
||||||
|
syncWorkers := uint32(4)
|
||||||
|
if remoteBestHeight-w.startHeight >= syncWorkers {
|
||||||
|
glog.Infof("resync: parallel sync of blocks %d-%d, using %d workers", w.startHeight, remoteBestHeight, syncWorkers)
|
||||||
|
err = w.ParallelConnectBlocks(onNewBlock, w.startHeight, remoteBestHeight, syncWorkers)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// after parallel load finish the sync using standard way,
|
||||||
|
// new blocks may have been created in the meantime
|
||||||
|
return w.resyncIndex(onNewBlock, initialSync)
|
||||||
}
|
}
|
||||||
// after parallel load finish the sync using standard way,
|
|
||||||
// new blocks may have been created in the meantime
|
|
||||||
return w.resyncIndex(onNewBlock, initialSync)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
err = w.connectBlocks(onNewBlock, initialSync)
|
err = w.connectBlocks(onNewBlock, initialSync)
|
||||||
@@ -184,7 +200,7 @@ func (w *SyncWorker) handleFork(localBestHeight uint32, localBestHash string, on
|
|||||||
// find forked blocks, disconnect them and then synchronize again
|
// find forked blocks, disconnect them and then synchronize again
|
||||||
var height uint32
|
var height uint32
|
||||||
hashes := []string{localBestHash}
|
hashes := []string{localBestHash}
|
||||||
for height = localBestHeight - 1; height >= 0; height-- {
|
for height = localBestHeight - 1; ; height-- {
|
||||||
local, err := w.db.GetBlockHash(height)
|
local, err := w.db.GetBlockHash(height)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -271,12 +287,140 @@ func (w *SyncWorker) connectBlocks(onNewBlock bchain.OnNewBlockFunc, initialSync
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ConnectBlocksParallel uses parallel goroutines to get data from blockchain daemon
|
type hashHeight struct {
|
||||||
func (w *SyncWorker) ConnectBlocksParallel(lower, higher uint32) error {
|
hash string
|
||||||
type hashHeight struct {
|
height uint32
|
||||||
hash string
|
}
|
||||||
height uint32
|
|
||||||
|
// ParallelConnectBlocks uses parallel goroutines to get data from blockchain daemon but keeps Blockbook in
|
||||||
|
func (w *SyncWorker) ParallelConnectBlocks(onNewBlock bchain.OnNewBlockFunc, lower, higher uint32, syncWorkers uint32) error {
|
||||||
|
var err error
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
bch := make([]chan *bchain.Block, syncWorkers)
|
||||||
|
for i := 0; i < int(syncWorkers); i++ {
|
||||||
|
bch[i] = make(chan *bchain.Block)
|
||||||
}
|
}
|
||||||
|
hch := make(chan hashHeight, syncWorkers)
|
||||||
|
hchClosed := atomic.Value{}
|
||||||
|
hchClosed.Store(false)
|
||||||
|
writeBlockDone := make(chan struct{})
|
||||||
|
terminating := make(chan struct{})
|
||||||
|
writeBlockWorker := func() {
|
||||||
|
defer close(writeBlockDone)
|
||||||
|
lastBlock := lower - 1
|
||||||
|
WriteBlockLoop:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case b := <-bch[(lastBlock+1)%syncWorkers]:
|
||||||
|
if b == nil {
|
||||||
|
// channel is closed and empty - work is done
|
||||||
|
break WriteBlockLoop
|
||||||
|
}
|
||||||
|
if b.Height != lastBlock+1 {
|
||||||
|
glog.Fatal("writeBlockWorker skipped block, expected block ", lastBlock+1, ", new block ", b.Height)
|
||||||
|
}
|
||||||
|
err := w.db.ConnectBlock(b)
|
||||||
|
if err != nil {
|
||||||
|
glog.Fatal("writeBlockWorker ", b.Height, " ", b.Hash, " error ", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if onNewBlock != nil {
|
||||||
|
onNewBlock(b.Hash, b.Height)
|
||||||
|
}
|
||||||
|
w.metrics.BlockbookBestHeight.Set(float64(b.Height))
|
||||||
|
|
||||||
|
if b.Height > 0 && b.Height%1000 == 0 {
|
||||||
|
glog.Info("connected block ", b.Height, " ", b.Hash)
|
||||||
|
}
|
||||||
|
|
||||||
|
lastBlock = b.Height
|
||||||
|
case <-terminating:
|
||||||
|
break WriteBlockLoop
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
glog.Error("sync: ParallelConnectBlocks.Close error ", err)
|
||||||
|
}
|
||||||
|
glog.Info("WriteBlock exiting...")
|
||||||
|
}
|
||||||
|
for i := 0; i < int(syncWorkers); i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go w.getBlockWorker(i, syncWorkers, &wg, hch, bch, &hchClosed, terminating)
|
||||||
|
}
|
||||||
|
go writeBlockWorker()
|
||||||
|
var hash string
|
||||||
|
ConnectLoop:
|
||||||
|
for h := lower; h <= higher; {
|
||||||
|
select {
|
||||||
|
case <-w.chanOsSignal:
|
||||||
|
glog.Info("connectBlocksParallel interrupted at height ", h)
|
||||||
|
err = ErrOperationInterrupted
|
||||||
|
// signal all workers to terminate their loops (error loops are interrupted below)
|
||||||
|
close(terminating)
|
||||||
|
break ConnectLoop
|
||||||
|
default:
|
||||||
|
hash, err = w.chain.GetBlockHash(h)
|
||||||
|
if err != nil {
|
||||||
|
glog.Error("GetBlockHash error ", err)
|
||||||
|
w.metrics.IndexResyncErrors.With(common.Labels{"error": "failure"}).Inc()
|
||||||
|
time.Sleep(time.Millisecond * 500)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
hch <- hashHeight{hash, h}
|
||||||
|
h++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
close(hch)
|
||||||
|
// signal stop to workers that are in a error loop
|
||||||
|
hchClosed.Store(true)
|
||||||
|
// wait for workers and close bch that will stop writer loop
|
||||||
|
wg.Wait()
|
||||||
|
for i := 0; i < int(syncWorkers); i++ {
|
||||||
|
close(bch[i])
|
||||||
|
}
|
||||||
|
<-writeBlockDone
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *SyncWorker) getBlockWorker(i int, syncWorkers uint32, wg *sync.WaitGroup, hch chan hashHeight, bch []chan *bchain.Block, hchClosed *atomic.Value, terminating chan struct{}) {
|
||||||
|
defer wg.Done()
|
||||||
|
var err error
|
||||||
|
var block *bchain.Block
|
||||||
|
GetBlockLoop:
|
||||||
|
for hh := range hch {
|
||||||
|
for {
|
||||||
|
block, err = w.chain.GetBlock(hh.hash, hh.height)
|
||||||
|
if err != nil {
|
||||||
|
// signal came while looping in the error loop
|
||||||
|
if hchClosed.Load() == true {
|
||||||
|
glog.Error("getBlockWorker ", i, " connect block error ", err, ". Exiting...")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err == bchain.ErrBlockNotFound {
|
||||||
|
glog.Error("getBlockWorker ", i, " connect block ", hh.height, " ", hh.hash, " error ", err, ". Retrying...")
|
||||||
|
} else {
|
||||||
|
glog.Error("getBlockWorker ", i, " connect block error ", err, ". Retrying...")
|
||||||
|
}
|
||||||
|
w.metrics.IndexResyncErrors.With(common.Labels{"error": "failure"}).Inc()
|
||||||
|
time.Sleep(time.Millisecond * 500)
|
||||||
|
} else {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if w.dryRun {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case bch[hh.height%syncWorkers] <- block:
|
||||||
|
case <-terminating:
|
||||||
|
break GetBlockLoop
|
||||||
|
}
|
||||||
|
}
|
||||||
|
glog.Info("getBlockWorker ", i, " exiting...")
|
||||||
|
}
|
||||||
|
|
||||||
|
// BulkConnectBlocks uses parallel goroutines to get data from blockchain daemon
|
||||||
|
func (w *SyncWorker) BulkConnectBlocks(lower, higher uint32) error {
|
||||||
var err error
|
var err error
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
bch := make([]chan *bchain.Block, w.syncWorkers)
|
bch := make([]chan *bchain.Block, w.syncWorkers)
|
||||||
@@ -322,45 +466,9 @@ func (w *SyncWorker) ConnectBlocksParallel(lower, higher uint32) error {
|
|||||||
}
|
}
|
||||||
glog.Info("WriteBlock exiting...")
|
glog.Info("WriteBlock exiting...")
|
||||||
}
|
}
|
||||||
getBlockWorker := func(i int) {
|
|
||||||
defer wg.Done()
|
|
||||||
var err error
|
|
||||||
var block *bchain.Block
|
|
||||||
GetBlockLoop:
|
|
||||||
for hh := range hch {
|
|
||||||
for {
|
|
||||||
block, err = w.chain.GetBlock(hh.hash, hh.height)
|
|
||||||
if err != nil {
|
|
||||||
// signal came while looping in the error loop
|
|
||||||
if hchClosed.Load() == true {
|
|
||||||
glog.Error("getBlockWorker ", i, " connect block error ", err, ". Exiting...")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err == bchain.ErrBlockNotFound {
|
|
||||||
glog.Error("getBlockWorker ", i, " connect block ", hh.height, " ", hh.hash, " error ", err, ". Retrying...")
|
|
||||||
} else {
|
|
||||||
glog.Error("getBlockWorker ", i, " connect block error ", err, ". Retrying...")
|
|
||||||
}
|
|
||||||
w.metrics.IndexResyncErrors.With(common.Labels{"error": "failure"}).Inc()
|
|
||||||
time.Sleep(time.Millisecond * 500)
|
|
||||||
} else {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if w.dryRun {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case bch[hh.height%uint32(w.syncWorkers)] <- block:
|
|
||||||
case <-terminating:
|
|
||||||
break GetBlockLoop
|
|
||||||
}
|
|
||||||
}
|
|
||||||
glog.Info("getBlockWorker ", i, " exiting...")
|
|
||||||
}
|
|
||||||
for i := 0; i < w.syncWorkers; i++ {
|
for i := 0; i < w.syncWorkers; i++ {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go getBlockWorker(i)
|
go w.getBlockWorker(i, uint32(w.syncWorkers), &wg, hch, bch, &hchClosed, terminating)
|
||||||
}
|
}
|
||||||
go writeBlockWorker()
|
go writeBlockWorker()
|
||||||
var hash string
|
var hash string
|
||||||
@@ -422,7 +530,6 @@ func (w *SyncWorker) getBlockChain(out chan blockResult, done chan struct{}) {
|
|||||||
hash := w.startHash
|
hash := w.startHash
|
||||||
height := w.startHeight
|
height := w.startHeight
|
||||||
prevHash := ""
|
prevHash := ""
|
||||||
|
|
||||||
// loop until error ErrBlockNotFound
|
// loop until error ErrBlockNotFound
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
|||||||
Reference in New Issue
Block a user