Sync using indexv2 - WIP

This commit is contained in:
Martin Boehm 2018-04-29 21:35:45 +02:00
parent 0a55ca61f6
commit df2a6b5551

View File

@ -175,6 +175,9 @@ func (w *SyncWorker) connectBlocks(onNewBlock func(hash string)) error {
if onNewBlock != nil { if onNewBlock != nil {
onNewBlock(res.block.Hash) onNewBlock(res.block.Hash)
} }
if res.block.Height > 0 && res.block.Height%1000 == 0 {
glog.Info("connected block ", res.block.Height, " ", res.block.Hash)
}
} }
if lastRes.block != nil { if lastRes.block != nil {
@ -191,15 +194,27 @@ func (w *SyncWorker) ConnectBlocksParallel(lower, higher uint32) error {
} }
var err error var err error
var wg sync.WaitGroup var wg sync.WaitGroup
bch := make(chan *bchain.Block, w.syncWorkers)
hch := make(chan hashHeight, w.syncWorkers) hch := make(chan hashHeight, w.syncWorkers)
hchClosed := atomic.Value{} hchClosed := atomic.Value{}
hchClosed.Store(false) hchClosed.Store(false)
lastConnectedBlock := int(lower) - 1 var getBlockMux sync.Mutex
connectedCh := make([]chan struct{}, w.syncWorkers) getBlockCond := sync.NewCond(&getBlockMux)
var connectedMux sync.Mutex
totalWaitDuration := time.Duration(0) totalWaitDuration := time.Duration(0)
totalWaitCount := 0 lastConnectedBlock := int(lower) - 1
work := func(i int) { writeBlockDone := make(chan struct{})
writeBlock := func() {
defer close(writeBlockDone)
for b := range bch {
// glog.Info("WriteBlock ", b.Height)
err = w.db.ConnectBlock(b)
if err != nil {
glog.Error("WriteBlock worker ", b.Height, " ", b.Hash, " error ", err)
}
}
glog.Info("WriteBlock exiting...")
}
getBlock := func(i int) {
defer wg.Done() defer wg.Done()
var err error var err error
var block *bchain.Block var block *bchain.Block
@ -222,59 +237,33 @@ func (w *SyncWorker) ConnectBlocksParallel(lower, higher uint32) error {
if w.dryRun { if w.dryRun {
continue continue
} }
// check if the block is the next in line to be connected start := time.Now()
// if not, wait for the previous block connect to complete getBlockMux.Lock()
chi := int(hh.height) % w.syncWorkers for {
waitForBlock := false if uint32(lastConnectedBlock+1) == hh.height {
waitDuration := time.Duration(0) lastConnectedBlock = int(hh.height)
glog.Info(i, " Going to connect block ", hh.height) // get data to writeBlock routine
connectedMux.Lock() // glog.Info("Worker ", i, " have block ", hh.height, ". Sending.")
if uint32(lastConnectedBlock+1) != hh.height { bch <- block
if connectedCh[chi] != nil { totalWaitDuration += time.Since(start)
glog.Fatal("Channel ", chi, " is not nil!") getBlockCond.Broadcast()
getBlockMux.Unlock()
break
} }
connectedCh[chi] = make(chan struct{}) // glog.Info("Worker ", i, " have block ", hh.height, ". Waiting.")
waitForBlock = true getBlockCond.Wait()
}
connectedMux.Unlock()
if waitForBlock {
start := time.Now()
glog.Info(i, " Waiting for block ", hh.height, " ", chi)
<-connectedCh[chi]
if hchClosed.Load() == true { if hchClosed.Load() == true {
glog.Error("Worker ", i, " connect block error ", err, ". Exiting...") break
return
} }
waitDuration = time.Since(start)
connectedCh[chi] = nil
} }
err = w.db.ConnectBlock(block)
if err != nil {
glog.Error("Worker ", i, " connect block ", hh.height, " ", hh.hash, " error ", err)
}
connectedMux.Lock()
if lastConnectedBlock < int(hh.height) {
lastConnectedBlock = int(hh.height)
}
chi = (chi + 1) % w.syncWorkers
if connectedCh[chi] != nil {
glog.Info(i, " closing channel ", chi)
close(connectedCh[chi])
connectedCh[chi] = nil
}
totalWaitDuration += waitDuration
if waitDuration > 0 {
totalWaitCount++
}
glog.Info("Connected block ", hh.height)
connectedMux.Unlock()
} }
glog.Info("Worker ", i, " exiting...") glog.Info("Worker ", i, " exiting...")
} }
for i := 0; i < w.syncWorkers; i++ { for i := 0; i < w.syncWorkers; i++ {
wg.Add(1) wg.Add(1)
go work(i) go getBlock(i)
} }
go writeBlock()
var hash string var hash string
ConnectLoop: ConnectLoop:
@ -293,7 +282,7 @@ ConnectLoop:
} }
hch <- hashHeight{hash, h} hch <- hashHeight{hash, h}
if h > 0 && h%1000 == 0 { if h > 0 && h%1000 == 0 {
glog.Info("connecting block ", h, " ", hash, " block wait time ", totalWaitDuration, " wait count ", totalWaitCount) glog.Info("connecting block ", h, " ", hash, " wait for writeBlock ", totalWaitDuration)
} }
h++ h++
} }
@ -301,70 +290,14 @@ ConnectLoop:
close(hch) close(hch)
// signal stop to workers that are in w.chain.GetBlockWithoutHeader error loop // signal stop to workers that are in w.chain.GetBlockWithoutHeader error loop
hchClosed.Store(true) hchClosed.Store(true)
connectedMux.Lock() // first wait for the getBlock routines to finish and then close bch channel
for _, ch := range connectedCh { getBlockCond.Broadcast()
if ch != nil {
close(ch)
}
}
connectedMux.Unlock()
wg.Wait() wg.Wait()
close(bch)
<-writeBlockDone
return err return err
} }
func (w *SyncWorker) connectBlockChunk(lower, higher uint32) error {
connected, err := w.isBlockConnected(higher)
if err != nil || connected {
// if higher is over the best block, continue with lower block, otherwise return error
if err != bchain.ErrBlockNotFound {
return err
}
}
height := lower
hash, err := w.chain.GetBlockHash(lower)
if err != nil {
return err
}
for height <= higher {
block, err := w.chain.GetBlock(hash, height)
if err != nil {
return err
}
hash = block.Next
height = block.Height + 1
if w.dryRun {
continue
}
err = w.db.ConnectBlock(block)
if err != nil {
return err
}
if block.Height%1000 == 0 {
glog.Info("connected block ", block.Height, " ", block.Hash)
go w.metrics.IndexDBSize.Set(float64(w.db.DatabaseSizeOnDisk()))
}
}
return nil
}
func (w *SyncWorker) isBlockConnected(height uint32) (bool, error) {
local, err := w.db.GetBlockHash(height)
if err != nil {
return false, err
}
remote, err := w.chain.GetBlockHash(height)
if err != nil {
return false, err
}
if local != remote {
return false, nil
}
return true, nil
}
type blockResult struct { type blockResult struct {
block *bchain.Block block *bchain.Block
err error err error