diff --git a/blockbook.go b/blockbook.go index cca19881..96cb61f2 100644 --- a/blockbook.go +++ b/blockbook.go @@ -81,6 +81,7 @@ var ( index *db.RocksDB txCache *db.TxCache syncWorker *db.SyncWorker + internalState *common.InternalState callbacksOnNewBlockHash []func(hash string) callbacksOnNewTxAddr []func(txid string, addr string) chanOsSignal chan os.Signal @@ -156,22 +157,23 @@ func main() { } defer index.Close() - common.IS, err = index.LoadInternalState(*coin) + internalState, err = newInternalState(*coin, index) if err != nil { glog.Fatal("internalState: ", err) } - if common.IS.DbState != common.DbStateClosed { - glog.Warning("internalState: database in not closed state ", common.IS.DbState, ", possibly previous ungraceful shutdown") + index.SetInternalState(internalState) + if internalState.DbState != common.DbStateClosed { + glog.Warning("internalState: database in not closed state ", internalState.DbState, ", possibly previous ungraceful shutdown") } - syncWorker, err = db.NewSyncWorker(index, chain, *syncWorkers, *syncChunk, *blockFrom, *dryRun, chanOsSignal, metrics) + syncWorker, err = db.NewSyncWorker(index, chain, *syncWorkers, *syncChunk, *blockFrom, *dryRun, chanOsSignal, metrics, internalState) if err != nil { glog.Fatalf("NewSyncWorker %v", err) } // set the DbState to open at this moment, after all important workers are initialized - common.IS.DbState = common.DbStateOpen - err = index.StoreInternalState(common.IS) + internalState.DbState = common.DbStateOpen + err = index.StoreInternalState(internalState) if err != nil { glog.Fatal("internalState: ", err) } @@ -242,7 +244,7 @@ func main() { var socketIoServer *server.SocketIoServer if *socketIoBinding != "" { socketIoServer, err = server.NewSocketIoServer( - *socketIoBinding, *certFiles, index, chain, txCache, *explorerURL, metrics) + *socketIoBinding, *certFiles, index, chain, txCache, *explorerURL, metrics, internalState) if err != nil { glog.Error("socketio: ", err) return @@ -304,6 +306,10 @@ func main() { } } +func newInternalState(coin string, d *db.RocksDB) (*common.InternalState, error) { + return d.LoadInternalState(coin) +} + func tickAndDebounce(tickTime time.Duration, debounceTime time.Duration, input chan struct{}, f func()) { timer := time.NewTimer(tickTime) var firstDebounce time.Time @@ -360,11 +366,11 @@ func syncMempoolLoop() { glog.Info("syncMempoolLoop starting") // resync mempool about every minute if there are no chanSyncMempool requests, with debounce 1 second tickAndDebounce(resyncMempoolPeriodMs*time.Millisecond, debounceResyncMempoolMs*time.Millisecond, chanSyncMempool, func() { - common.IS.StartedMempoolSync() + internalState.StartedMempoolSync() if err := chain.ResyncMempool(onNewTxAddr); err != nil { glog.Error("syncMempoolLoop ", errors.ErrorStack(err)) } else { - common.IS.FinishedMempoolSync() + internalState.FinishedMempoolSync() } }) glog.Info("syncMempoolLoop stopped") @@ -374,7 +380,7 @@ func storeInternalStateLoop() { defer close(chanStoreInternalStateDone) glog.Info("storeInternalStateLoop starting") tickAndDebounce(storeInternalStatePeriodMs*time.Millisecond, (storeInternalStatePeriodMs-1)*time.Millisecond, chanStoreInternalState, func() { - if err := index.StoreInternalState(common.IS); err != nil { + if err := index.StoreInternalState(internalState); err != nil { glog.Error("storeInternalStateLoop ", errors.ErrorStack(err)) } }) diff --git a/common/internalstate.go b/common/internalstate.go index fcbd7ccf..0b1573e8 100644 --- a/common/internalstate.go +++ b/common/internalstate.go @@ -42,9 +42,6 @@ type InternalState struct { DbColumns []InternalStateColumn `json:"dbColumns"` } -// IS is a singleton holding internal state of the application -var IS *InternalState - // StartedSync signals start of synchronization func (is *InternalState) StartedSync() { is.mux.Lock() diff --git a/db/rocksdb.go b/db/rocksdb.go index ab1b86e2..4ae710ed 100644 --- a/db/rocksdb.go +++ b/db/rocksdb.go @@ -36,6 +36,7 @@ type RocksDB struct { ro *gorocksdb.ReadOptions cfh []*gorocksdb.ColumnFamilyHandle chainParser bchain.BlockChainParser + is *common.InternalState } const ( @@ -102,7 +103,7 @@ func NewRocksDB(path string, parser bchain.BlockChainParser) (d *RocksDB, err er wo := gorocksdb.NewDefaultWriteOptions() ro := gorocksdb.NewDefaultReadOptions() ro.SetFillCache(false) - return &RocksDB{path, db, wo, ro, cfh, parser}, nil + return &RocksDB{path, db, wo, ro, cfh, parser, nil}, nil } func (d *RocksDB) closeDB() error { @@ -118,9 +119,9 @@ func (d *RocksDB) closeDB() error { func (d *RocksDB) Close() error { if d.db != nil { // store the internal state of the app - if common.IS.DbState == common.DbStateOpen { - common.IS.DbState = common.DbStateClosed - if err := d.StoreInternalState(common.IS); err != nil { + if d.is != nil && d.is.DbState == common.DbStateOpen { + d.is.DbState = common.DbStateClosed + if err := d.StoreInternalState(d.is); err != nil { glog.Infof("internalState: ", err) } } @@ -917,6 +918,11 @@ func (d *RocksDB) LoadInternalState(rpcCoin string) (*common.InternalState, erro return is, nil } +// SetInternalState sets the InternalState to be used by db to collect internal state +func (d *RocksDB) SetInternalState(is *common.InternalState) { + d.is = is +} + // StoreInternalState stores the internal state to db func (d *RocksDB) StoreInternalState(is *common.InternalState) error { buf, err := is.Pack() diff --git a/db/rocksdb_test.go b/db/rocksdb_test.go index 0c28cb80..5a281bf8 100644 --- a/db/rocksdb_test.go +++ b/db/rocksdb_test.go @@ -3,7 +3,6 @@ package db import ( "blockbook/bchain" "blockbook/bchain/coins/btc" - "blockbook/common" "encoding/hex" "fmt" "io/ioutil" @@ -19,7 +18,7 @@ import ( // simplified explanation of signed varint packing, used in many index data structures // for number n, the packing is: 2*n if n>=0 else 2*(-n)-1 -// take only 1 byte if abs(n)<127 +// takes only 1 byte if abs(n)<127 func setupRocksDB(t *testing.T, p bchain.BlockChainParser) *RocksDB { tmp, err := ioutil.TempDir("", "testdb") @@ -30,10 +29,6 @@ func setupRocksDB(t *testing.T, p bchain.BlockChainParser) *RocksDB { if err != nil { t.Fatal(err) } - common.IS, err = d.LoadInternalState("btc-testnet") - if err != nil { - t.Fatal("internalState: ", err) - } return d } diff --git a/db/sync.go b/db/sync.go index 90f70cd8..a664e94f 100644 --- a/db/sync.go +++ b/db/sync.go @@ -22,10 +22,11 @@ type SyncWorker struct { startHash string chanOsSignal chan os.Signal metrics *common.Metrics + is *common.InternalState } // NewSyncWorker creates new SyncWorker and returns its handle -func NewSyncWorker(db *RocksDB, chain bchain.BlockChain, syncWorkers, syncChunk int, minStartHeight int, dryRun bool, chanOsSignal chan os.Signal, metrics *common.Metrics) (*SyncWorker, error) { +func NewSyncWorker(db *RocksDB, chain bchain.BlockChain, syncWorkers, syncChunk int, minStartHeight int, dryRun bool, chanOsSignal chan os.Signal, metrics *common.Metrics, is *common.InternalState) (*SyncWorker, error) { if minStartHeight < 0 { minStartHeight = 0 } @@ -38,6 +39,7 @@ func NewSyncWorker(db *RocksDB, chain bchain.BlockChain, syncWorkers, syncChunk startHeight: uint32(minStartHeight), chanOsSignal: chanOsSignal, metrics: metrics, + is: is, }, nil } @@ -47,7 +49,7 @@ var errSynced = errors.New("synced") // onNewBlock is called when new block is connected, but not in initial parallel sync func (w *SyncWorker) ResyncIndex(onNewBlock func(hash string)) error { start := time.Now() - common.IS.StartedSync() + w.is.StartedSync() err := w.resyncIndex(onNewBlock) @@ -59,12 +61,12 @@ func (w *SyncWorker) ResyncIndex(onNewBlock func(hash string)) error { w.metrics.IndexDBSize.Set(float64(w.db.DatabaseSizeOnDisk())) bh, _, err := w.db.GetBestBlock() if err == nil { - common.IS.FinishedSync(bh) + w.is.FinishedSync(bh) } return nil case errSynced: // this is not actually error but flag that resync wasn't necessary - common.IS.FinishedSyncNoChange() + w.is.FinishedSyncNoChange() return nil } diff --git a/server/socketio.go b/server/socketio.go index a45530be..b1ade76a 100644 --- a/server/socketio.go +++ b/server/socketio.go @@ -33,10 +33,11 @@ type SocketIoServer struct { chainParser bchain.BlockChainParser explorerURL string metrics *common.Metrics + is *common.InternalState } // NewSocketIoServer creates new SocketIo interface to blockbook and returns its handle -func NewSocketIoServer(binding string, certFiles string, db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCache, explorerURL string, metrics *common.Metrics) (*SocketIoServer, error) { +func NewSocketIoServer(binding string, certFiles string, db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCache, explorerURL string, metrics *common.Metrics, is *common.InternalState) (*SocketIoServer, error) { server := gosocketio.NewServer(transport.GetDefaultWebsocketTransport()) server.On(gosocketio.OnConnection, func(c *gosocketio.Channel) { @@ -76,6 +77,7 @@ func NewSocketIoServer(binding string, certFiles string, db *db.RocksDB, chain b chainParser: chain.GetChainParser(), explorerURL: explorerURL, metrics: metrics, + is: is, } // support for tests of socket.io interface @@ -155,10 +157,10 @@ type resAboutBlockbookPublic struct { func (s *SocketIoServer) index(w http.ResponseWriter, r *http.Request) { vi := common.GetVersionInfo() - ss, bh, st := common.IS.GetSyncState() - ms, mt := common.IS.GetMempoolSyncState() + ss, bh, st := s.is.GetSyncState() + ms, mt := s.is.GetMempoolSyncState() a := resAboutBlockbookPublic{ - Coin: common.IS.Coin, + Coin: s.is.Coin, About: blockbookAbout, Version: vi.Version, GitCommit: vi.GitCommit,