diff --git a/blockbook.go b/blockbook.go index f253d828..83a71ee7 100644 --- a/blockbook.go +++ b/blockbook.go @@ -149,11 +149,26 @@ func main() { } defer index.Close() + common.IS, err = index.LoadInternalState() + if err != nil { + glog.Fatal("internalState: ", err) + } + if common.IS.DbState != common.DbStateClosed { + glog.Warning("internalState: database in not closed state ", common.IS.DbState, ", possibly previous ungraceful shutdown") + } + syncWorker, err = db.NewSyncWorker(index, chain, *syncWorkers, *syncChunk, *blockFrom, *dryRun, chanOsSignal, metrics) if err != nil { glog.Fatalf("NewSyncWorker %v", err) } + // set the DbState to open at this moment, after all important workers are initialized + common.IS.DbState = common.DbStateOpen + err = index.StoreInternalState(common.IS) + if err != nil { + glog.Fatal("internalState: ", err) + } + if *rollbackHeight >= 0 { bestHeight, bestHash, err := index.GetBestBlock() if err != nil { diff --git a/common/internalstate.go b/common/internalstate.go new file mode 100644 index 00000000..5827d8e6 --- /dev/null +++ b/common/internalstate.go @@ -0,0 +1,106 @@ +package common + +import ( + "encoding/json" + "sync" + "time" +) + +const ( + // DbStateClosed means db was closed gracefully + DbStateClosed = uint32(iota) + // DbStateOpen means db is open or application died without closing the db + DbStateOpen +) + +// InternalStateColumn contains the data of a db column +type InternalStateColumn struct { + Name string `json:"name"` + Version uint32 `json:"version"` + Rows int64 `json:"rows"` + KeysSum int64 `json:"keysSum"` + ValuesSum int64 `json:"valuesSum"` +} + +// InternalState contains the data of the internal state +type InternalState struct { + mux sync.Mutex + + DbState uint32 `json:"dbState"` + + LastStore time.Time `json:"lastStore"` + + IsSynchronized bool `json:"isSynchronized"` + BestHeight uint32 `json:"bestHeight"` + LastSync time.Time `json:"lastSync"` + + IsMempoolSynchronized bool `json:"isMempoolSynchronized"` + LastMempoolSync time.Time `json:"lastMempoolSync"` + + DbColumns []InternalStateColumn `json:"dbColumns"` +} + +// IS is a singleton holding internal state of the application +var IS *InternalState + +func (is *InternalState) StartedSync() { + is.mux.Lock() + defer is.mux.Unlock() + is.IsSynchronized = false +} + +func (is *InternalState) FinishedSync(bestHeight uint32) { + is.mux.Lock() + defer is.mux.Unlock() + is.IsSynchronized = true + is.BestHeight = bestHeight + is.LastSync = time.Now() +} + +func (is *InternalState) GetSyncState() (bool, uint32, time.Time) { + is.mux.Lock() + defer is.mux.Unlock() + return is.IsSynchronized, is.BestHeight, is.LastSync +} + +func (is *InternalState) StartedMempoolSync() { + is.mux.Lock() + defer is.mux.Unlock() + is.IsMempoolSynchronized = false +} + +func (is *InternalState) FinishedMempoolSync() { + is.mux.Lock() + defer is.mux.Unlock() + is.IsMempoolSynchronized = true + is.LastMempoolSync = time.Now() +} + +func (is *InternalState) GetMempoolSyncState() (bool, time.Time) { + is.mux.Lock() + defer is.mux.Unlock() + return is.IsMempoolSynchronized, is.LastMempoolSync +} + +func (is *InternalState) AddDBColumnStats(c int, rowsDiff int64, keysSumDiff int64, valuesSumDiff int64) { + is.mux.Lock() + defer is.mux.Unlock() + is.DbColumns[c].Rows += rowsDiff + is.DbColumns[c].KeysSum += keysSumDiff + is.DbColumns[c].ValuesSum += valuesSumDiff +} + +func (is *InternalState) Pack() ([]byte, error) { + is.mux.Lock() + defer is.mux.Unlock() + is.LastStore = time.Now() + return json.Marshal(is) +} + +func UnpackInternalState(buf []byte) (*InternalState, error) { + var is InternalState + if err := json.Unmarshal(buf, &is); err != nil { + return nil, err + } + return &is, nil +} diff --git a/db/rocksdb.go b/db/rocksdb.go index 12432834..97e8fb84 100644 --- a/db/rocksdb.go +++ b/db/rocksdb.go @@ -2,6 +2,7 @@ package db import ( "blockbook/bchain" + "blockbook/common" "bytes" "encoding/binary" "encoding/hex" @@ -109,15 +110,25 @@ func (d *RocksDB) closeDB() error { h.Destroy() } d.db.Close() + d.db = nil return nil } // Close releases the RocksDB environment opened in NewRocksDB. func (d *RocksDB) Close() error { - glog.Infof("rocksdb: close") - d.closeDB() - d.wo.Destroy() - d.ro.Destroy() + if d.db != nil { + // store the internal state of the app + if common.IS.DbState == common.DbStateOpen { + common.IS.DbState = common.DbStateClosed + if err := d.StoreInternalState(common.IS); err != nil { + glog.Infof("internalState: ", err) + } + } + glog.Infof("rocksdb: close") + d.closeDB() + d.wo.Destroy() + d.ro.Destroy() + } return nil } @@ -860,6 +871,54 @@ func (d *RocksDB) DeleteTx(txid string) error { return d.db.DeleteCF(d.wo, d.cfh[cfTransactions], key) } +// internal state +const internalStateKey = "internalState" + +// LoadInternalState loads from db internal state or initializes a new one if not yet stored +func (d *RocksDB) LoadInternalState() (*common.InternalState, error) { + val, err := d.db.GetCF(d.ro, d.cfh[cfDefault], []byte(internalStateKey)) + if err != nil { + return nil, err + } + defer val.Free() + data := val.Data() + var is *common.InternalState + if len(data) == 0 { + is = &common.InternalState{} + } else { + is, err = common.UnpackInternalState(data) + if err != nil { + return nil, err + } + } + // make sure that column stats match the columns + sc := is.DbColumns + nc := make([]common.InternalStateColumn, len(cfNames)) + for i := 0; i < len(nc); i++ { + nc[i].Name = cfNames[i] + for j := 0; j < len(sc); j++ { + if sc[j].Name == nc[i].Name { + nc[i].Version = sc[j].Version + nc[i].Rows = sc[j].Rows + nc[i].KeysSum = sc[j].KeysSum + nc[i].ValuesSum = sc[j].ValuesSum + break + } + } + } + is.DbColumns = nc + return is, nil +} + +// StoreInternalState stores the internal state to db +func (d *RocksDB) StoreInternalState(is *common.InternalState) error { + buf, err := is.Pack() + if err != nil { + return err + } + return d.db.PutCF(d.wo, d.cfh[cfDefault], []byte(internalStateKey), buf) +} + // Helpers func packAddressKey(addrID []byte, height uint32) []byte {