Minor refactor
This commit is contained in:
parent
ec510811cd
commit
d5e871818a
74
blockbook.go
74
blockbook.go
@ -13,7 +13,6 @@ import (
|
|||||||
"os/signal"
|
"os/signal"
|
||||||
"runtime/debug"
|
"runtime/debug"
|
||||||
"strings"
|
"strings"
|
||||||
"sync/atomic"
|
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -42,7 +41,7 @@ const exitCodeOK = 0
|
|||||||
const exitCodeFatal = 255
|
const exitCodeFatal = 255
|
||||||
|
|
||||||
var (
|
var (
|
||||||
blockchain = flag.String("blockchaincfg", "", "path to blockchain RPC service configuration json file")
|
configFile = flag.String("blockchaincfg", "", "path to blockchain RPC service configuration json file")
|
||||||
|
|
||||||
dbPath = flag.String("datadir", "./data", "path to database directory")
|
dbPath = flag.String("datadir", "./data", "path to database directory")
|
||||||
dbCache = flag.Int("dbcache", 1<<29, "size of the rocksdb cache")
|
dbCache = flag.Int("dbcache", 1<<29, "size of the rocksdb cache")
|
||||||
@ -105,7 +104,6 @@ var (
|
|||||||
callbacksOnNewTx []bchain.OnNewTxFunc
|
callbacksOnNewTx []bchain.OnNewTxFunc
|
||||||
callbacksOnNewFiatRatesTicker []fiat.OnNewFiatRatesTicker
|
callbacksOnNewFiatRatesTicker []fiat.OnNewFiatRatesTicker
|
||||||
chanOsSignal chan os.Signal
|
chanOsSignal chan os.Signal
|
||||||
inShutdown int32
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@ -151,26 +149,24 @@ func mainWithExitCode() int {
|
|||||||
return exitCodeOK
|
return exitCodeOK
|
||||||
}
|
}
|
||||||
|
|
||||||
if *blockchain == "" {
|
if *configFile == "" {
|
||||||
glog.Error("Missing blockchaincfg configuration parameter")
|
glog.Error("Missing blockchaincfg configuration parameter")
|
||||||
return exitCodeFatal
|
return exitCodeFatal
|
||||||
}
|
}
|
||||||
|
|
||||||
coin, coinShortcut, coinLabel, err := coins.GetCoinNameFromConfig(*blockchain)
|
coin, coinShortcut, coinLabel, err := coins.GetCoinNameFromConfig(*configFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Error("config: ", err)
|
glog.Error("config: ", err)
|
||||||
return exitCodeFatal
|
return exitCodeFatal
|
||||||
}
|
}
|
||||||
|
|
||||||
// gspt.SetProcTitle("blockbook-" + normalizeName(coin))
|
|
||||||
|
|
||||||
metrics, err = common.GetMetrics(coin)
|
metrics, err = common.GetMetrics(coin)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Error("metrics: ", err)
|
glog.Error("metrics: ", err)
|
||||||
return exitCodeFatal
|
return exitCodeFatal
|
||||||
}
|
}
|
||||||
|
|
||||||
if chain, mempool, err = getBlockChainWithRetry(coin, *blockchain, pushSynchronizationHandler, metrics, 120); err != nil {
|
if chain, mempool, err = getBlockChainWithRetry(coin, *configFile, pushSynchronizationHandler, metrics, 120); err != nil {
|
||||||
glog.Error("rpc: ", err)
|
glog.Error("rpc: ", err)
|
||||||
return exitCodeFatal
|
return exitCodeFatal
|
||||||
}
|
}
|
||||||
@ -347,7 +343,7 @@ func mainWithExitCode() int {
|
|||||||
|
|
||||||
if internalServer != nil || publicServer != nil || chain != nil {
|
if internalServer != nil || publicServer != nil || chain != nil {
|
||||||
// start fiat rates downloader only if not shutting down immediately
|
// start fiat rates downloader only if not shutting down immediately
|
||||||
initFiatRatesDownloader(index, *blockchain)
|
initFiatRatesDownloader(index, *configFile)
|
||||||
waitForSignalAndShutdown(internalServer, publicServer, chain, 10*time.Second)
|
waitForSignalAndShutdown(internalServer, publicServer, chain, 10*time.Second)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -362,13 +358,13 @@ func mainWithExitCode() int {
|
|||||||
return exitCodeOK
|
return exitCodeOK
|
||||||
}
|
}
|
||||||
|
|
||||||
func getBlockChainWithRetry(coin string, configfile string, pushHandler func(bchain.NotificationType), metrics *common.Metrics, seconds int) (bchain.BlockChain, bchain.Mempool, error) {
|
func getBlockChainWithRetry(coin string, configFile string, pushHandler func(bchain.NotificationType), metrics *common.Metrics, seconds int) (bchain.BlockChain, bchain.Mempool, error) {
|
||||||
var chain bchain.BlockChain
|
var chain bchain.BlockChain
|
||||||
var mempool bchain.Mempool
|
var mempool bchain.Mempool
|
||||||
var err error
|
var err error
|
||||||
timer := time.NewTimer(time.Second)
|
timer := time.NewTimer(time.Second)
|
||||||
for i := 0; ; i++ {
|
for i := 0; ; i++ {
|
||||||
if chain, mempool, err = coins.NewBlockChain(coin, configfile, pushHandler, metrics); err != nil {
|
if chain, mempool, err = coins.NewBlockChain(coin, configFile, pushHandler, metrics); err != nil {
|
||||||
if i < seconds {
|
if i < seconds {
|
||||||
glog.Error("rpc: ", err, " Retrying...")
|
glog.Error("rpc: ", err, " Retrying...")
|
||||||
select {
|
select {
|
||||||
@ -496,46 +492,11 @@ func newInternalState(coin, coinShortcut, coinLabel string, d *db.RocksDB) (*com
|
|||||||
return is, nil
|
return is, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func tickAndDebounce(tickTime time.Duration, debounceTime time.Duration, input chan struct{}, f func()) {
|
|
||||||
timer := time.NewTimer(tickTime)
|
|
||||||
var firstDebounce time.Time
|
|
||||||
Loop:
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case _, ok := <-input:
|
|
||||||
if !timer.Stop() {
|
|
||||||
<-timer.C
|
|
||||||
}
|
|
||||||
// exit loop on closed input channel
|
|
||||||
if !ok {
|
|
||||||
break Loop
|
|
||||||
}
|
|
||||||
if firstDebounce.IsZero() {
|
|
||||||
firstDebounce = time.Now()
|
|
||||||
}
|
|
||||||
// debounce for up to debounceTime period
|
|
||||||
// afterwards execute immediately
|
|
||||||
if firstDebounce.Add(debounceTime).After(time.Now()) {
|
|
||||||
timer.Reset(debounceTime)
|
|
||||||
} else {
|
|
||||||
timer.Reset(0)
|
|
||||||
}
|
|
||||||
case <-timer.C:
|
|
||||||
// do the action, if not in shutdown, then start the loop again
|
|
||||||
if atomic.LoadInt32(&inShutdown) == 0 {
|
|
||||||
f()
|
|
||||||
}
|
|
||||||
timer.Reset(tickTime)
|
|
||||||
firstDebounce = time.Time{}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func syncIndexLoop() {
|
func syncIndexLoop() {
|
||||||
defer close(chanSyncIndexDone)
|
defer close(chanSyncIndexDone)
|
||||||
glog.Info("syncIndexLoop starting")
|
glog.Info("syncIndexLoop starting")
|
||||||
// resync index about every 15 minutes if there are no chanSyncIndex requests, with debounce 1 second
|
// resync index about every 15 minutes if there are no chanSyncIndex requests, with debounce 1 second
|
||||||
tickAndDebounce(time.Duration(*resyncIndexPeriodMs)*time.Millisecond, debounceResyncIndexMs*time.Millisecond, chanSyncIndex, func() {
|
common.TickAndDebounce(time.Duration(*resyncIndexPeriodMs)*time.Millisecond, debounceResyncIndexMs*time.Millisecond, chanSyncIndex, func() {
|
||||||
if err := syncWorker.ResyncIndex(onNewBlockHash, false); err != nil {
|
if err := syncWorker.ResyncIndex(onNewBlockHash, false); err != nil {
|
||||||
glog.Error("syncIndexLoop ", errors.ErrorStack(err), ", will retry...")
|
glog.Error("syncIndexLoop ", errors.ErrorStack(err), ", will retry...")
|
||||||
// retry once in case of random network error, after a slight delay
|
// retry once in case of random network error, after a slight delay
|
||||||
@ -574,7 +535,7 @@ func syncMempoolLoop() {
|
|||||||
defer close(chanSyncMempoolDone)
|
defer close(chanSyncMempoolDone)
|
||||||
glog.Info("syncMempoolLoop starting")
|
glog.Info("syncMempoolLoop starting")
|
||||||
// resync mempool about every minute if there are no chanSyncMempool requests, with debounce 1 second
|
// resync mempool about every minute if there are no chanSyncMempool requests, with debounce 1 second
|
||||||
tickAndDebounce(time.Duration(*resyncMempoolPeriodMs)*time.Millisecond, debounceResyncMempoolMs*time.Millisecond, chanSyncMempool, func() {
|
common.TickAndDebounce(time.Duration(*resyncMempoolPeriodMs)*time.Millisecond, debounceResyncMempoolMs*time.Millisecond, chanSyncMempool, func() {
|
||||||
internalState.StartedMempoolSync()
|
internalState.StartedMempoolSync()
|
||||||
if count, err := mempool.Resync(); err != nil {
|
if count, err := mempool.Resync(); err != nil {
|
||||||
glog.Error("syncMempoolLoop ", errors.ErrorStack(err))
|
glog.Error("syncMempoolLoop ", errors.ErrorStack(err))
|
||||||
@ -604,7 +565,7 @@ func storeInternalStateLoop() {
|
|||||||
} else {
|
} else {
|
||||||
glog.Info("storeInternalStateLoop starting with db stats compute disabled")
|
glog.Info("storeInternalStateLoop starting with db stats compute disabled")
|
||||||
}
|
}
|
||||||
tickAndDebounce(storeInternalStatePeriodMs*time.Millisecond, (storeInternalStatePeriodMs-1)*time.Millisecond, chanStoreInternalState, func() {
|
common.TickAndDebounce(storeInternalStatePeriodMs*time.Millisecond, (storeInternalStatePeriodMs-1)*time.Millisecond, chanStoreInternalState, func() {
|
||||||
if (*dbStatsPeriodHours) > 0 && !computeRunning && lastCompute.Add(computePeriod).Before(time.Now()) {
|
if (*dbStatsPeriodHours) > 0 && !computeRunning && lastCompute.Add(computePeriod).Before(time.Now()) {
|
||||||
computeRunning = true
|
computeRunning = true
|
||||||
go func() {
|
go func() {
|
||||||
@ -654,7 +615,7 @@ func onNewTx(tx *bchain.MempoolTx) {
|
|||||||
|
|
||||||
func pushSynchronizationHandler(nt bchain.NotificationType) {
|
func pushSynchronizationHandler(nt bchain.NotificationType) {
|
||||||
glog.V(1).Info("MQ: notification ", nt)
|
glog.V(1).Info("MQ: notification ", nt)
|
||||||
if atomic.LoadInt32(&inShutdown) != 0 {
|
if common.IsInShutdown() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if nt == bchain.NotificationNewBlock {
|
if nt == bchain.NotificationNewBlock {
|
||||||
@ -668,7 +629,7 @@ func pushSynchronizationHandler(nt bchain.NotificationType) {
|
|||||||
|
|
||||||
func waitForSignalAndShutdown(internal *server.InternalServer, public *server.PublicServer, chain bchain.BlockChain, timeout time.Duration) {
|
func waitForSignalAndShutdown(internal *server.InternalServer, public *server.PublicServer, chain bchain.BlockChain, timeout time.Duration) {
|
||||||
sig := <-chanOsSignal
|
sig := <-chanOsSignal
|
||||||
atomic.StoreInt32(&inShutdown, 1)
|
common.SetInShutdown()
|
||||||
glog.Infof("shutdown: %v", sig)
|
glog.Infof("shutdown: %v", sig)
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||||
@ -693,17 +654,6 @@ func waitForSignalAndShutdown(internal *server.InternalServer, public *server.Pu
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func printResult(txid string, vout int32, isOutput bool) error {
|
|
||||||
glog.Info(txid, vout, isOutput)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func normalizeName(s string) string {
|
|
||||||
s = strings.ToLower(s)
|
|
||||||
s = strings.Replace(s, " ", "-", -1)
|
|
||||||
return s
|
|
||||||
}
|
|
||||||
|
|
||||||
// computeFeeStats computes fee distribution in defined blocks
|
// computeFeeStats computes fee distribution in defined blocks
|
||||||
func computeFeeStats(stopCompute chan os.Signal, blockFrom, blockTo int, db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCache, is *common.InternalState, metrics *common.Metrics) error {
|
func computeFeeStats(stopCompute chan os.Signal, blockFrom, blockTo int, db *db.RocksDB, chain bchain.BlockChain, txCache *db.TxCache, is *common.InternalState, metrics *common.Metrics) error {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|||||||
@ -4,6 +4,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -16,6 +17,8 @@ const (
|
|||||||
DbStateInconsistent
|
DbStateInconsistent
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var inShutdown int32
|
||||||
|
|
||||||
// InternalStateColumn contains the data of a db column
|
// InternalStateColumn contains the data of a db column
|
||||||
type InternalStateColumn struct {
|
type InternalStateColumn struct {
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
@ -265,3 +268,13 @@ func UnpackInternalState(buf []byte) (*InternalState, error) {
|
|||||||
}
|
}
|
||||||
return &is, nil
|
return &is, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetInShutdown sets the internal state to in shutdown state
|
||||||
|
func SetInShutdown() {
|
||||||
|
atomic.StoreInt32(&inShutdown, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsInShutdown returns true if in application shutdown state
|
||||||
|
func IsInShutdown() bool {
|
||||||
|
return atomic.LoadInt32(&inShutdown) != 0
|
||||||
|
}
|
||||||
|
|||||||
41
common/utils.go
Normal file
41
common/utils.go
Normal file
@ -0,0 +1,41 @@
|
|||||||
|
package common
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TickAndDebounce calls function f on trigger channel or with tickTime period (whatever is sooner) with debounce
|
||||||
|
func TickAndDebounce(tickTime time.Duration, debounceTime time.Duration, trigger chan struct{}, f func()) {
|
||||||
|
timer := time.NewTimer(tickTime)
|
||||||
|
var firstDebounce time.Time
|
||||||
|
Loop:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case _, ok := <-trigger:
|
||||||
|
if !timer.Stop() {
|
||||||
|
<-timer.C
|
||||||
|
}
|
||||||
|
// exit loop on closed input channel
|
||||||
|
if !ok {
|
||||||
|
break Loop
|
||||||
|
}
|
||||||
|
if firstDebounce.IsZero() {
|
||||||
|
firstDebounce = time.Now()
|
||||||
|
}
|
||||||
|
// debounce for up to debounceTime period
|
||||||
|
// afterwards execute immediately
|
||||||
|
if firstDebounce.Add(debounceTime).After(time.Now()) {
|
||||||
|
timer.Reset(debounceTime)
|
||||||
|
} else {
|
||||||
|
timer.Reset(0)
|
||||||
|
}
|
||||||
|
case <-timer.C:
|
||||||
|
// do the action, if not in shutdown, then start the loop again
|
||||||
|
if !IsInShutdown() {
|
||||||
|
f()
|
||||||
|
}
|
||||||
|
timer.Reset(tickTime)
|
||||||
|
firstDebounce = time.Time{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
135
db/fiat.go
Normal file
135
db/fiat.go
Normal file
@ -0,0 +1,135 @@
|
|||||||
|
package db
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/golang/glog"
|
||||||
|
"github.com/juju/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
// FiatRatesTimeFormat is a format string for storing FiatRates timestamps in rocksdb
|
||||||
|
const FiatRatesTimeFormat = "20060102150405" // YYYYMMDDhhmmss
|
||||||
|
|
||||||
|
// CurrencyRatesTicker contains coin ticker data fetched from API
|
||||||
|
type CurrencyRatesTicker struct {
|
||||||
|
Timestamp *time.Time // return as unix timestamp in API
|
||||||
|
Rates map[string]float64
|
||||||
|
}
|
||||||
|
|
||||||
|
// ResultTickerAsString contains formatted CurrencyRatesTicker data
|
||||||
|
type ResultTickerAsString struct {
|
||||||
|
Timestamp int64 `json:"ts,omitempty"`
|
||||||
|
Rates map[string]float64 `json:"rates"`
|
||||||
|
Error string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// ResultTickersAsString contains a formatted CurrencyRatesTicker list
|
||||||
|
type ResultTickersAsString struct {
|
||||||
|
Tickers []ResultTickerAsString `json:"tickers"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// ResultTickerListAsString contains formatted data about available currency tickers
|
||||||
|
type ResultTickerListAsString struct {
|
||||||
|
Timestamp int64 `json:"ts,omitempty"`
|
||||||
|
Tickers []string `json:"available_currencies"`
|
||||||
|
Error string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// FiatRatesConvertDate checks if the date is in correct format and returns the Time object.
|
||||||
|
// Possible formats are: YYYYMMDDhhmmss, YYYYMMDDhhmm, YYYYMMDDhh, YYYYMMDD
|
||||||
|
func FiatRatesConvertDate(date string) (*time.Time, error) {
|
||||||
|
for format := FiatRatesTimeFormat; len(format) >= 8; format = format[:len(format)-2] {
|
||||||
|
convertedDate, err := time.Parse(format, date)
|
||||||
|
if err == nil {
|
||||||
|
return &convertedDate, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
msg := "Date \"" + date + "\" does not match any of available formats. "
|
||||||
|
msg += "Possible formats are: YYYYMMDDhhmmss, YYYYMMDDhhmm, YYYYMMDDhh, YYYYMMDD"
|
||||||
|
return nil, errors.New(msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// FiatRatesStoreTicker stores ticker data at the specified time
|
||||||
|
func (d *RocksDB) FiatRatesStoreTicker(ticker *CurrencyRatesTicker) error {
|
||||||
|
if len(ticker.Rates) == 0 {
|
||||||
|
return errors.New("Error storing ticker: empty rates")
|
||||||
|
} else if ticker.Timestamp == nil {
|
||||||
|
return errors.New("Error storing ticker: empty timestamp")
|
||||||
|
}
|
||||||
|
ratesMarshalled, err := json.Marshal(ticker.Rates)
|
||||||
|
if err != nil {
|
||||||
|
glog.Error("Error marshalling ticker rates: ", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
timeFormatted := ticker.Timestamp.UTC().Format(FiatRatesTimeFormat)
|
||||||
|
err = d.db.PutCF(d.wo, d.cfh[cfFiatRates], []byte(timeFormatted), ratesMarshalled)
|
||||||
|
if err != nil {
|
||||||
|
glog.Error("Error storing ticker: ", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// FiatRatesFindTicker gets FiatRates data closest to the specified timestamp
|
||||||
|
func (d *RocksDB) FiatRatesFindTicker(tickerTime *time.Time) (*CurrencyRatesTicker, error) {
|
||||||
|
ticker := &CurrencyRatesTicker{}
|
||||||
|
tickerTimeFormatted := tickerTime.UTC().Format(FiatRatesTimeFormat)
|
||||||
|
it := d.db.NewIteratorCF(d.ro, d.cfh[cfFiatRates])
|
||||||
|
defer it.Close()
|
||||||
|
|
||||||
|
for it.Seek([]byte(tickerTimeFormatted)); it.Valid(); it.Next() {
|
||||||
|
timeObj, err := time.Parse(FiatRatesTimeFormat, string(it.Key().Data()))
|
||||||
|
if err != nil {
|
||||||
|
glog.Error("FiatRatesFindTicker time parse error: ", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
timeObj = timeObj.UTC()
|
||||||
|
ticker.Timestamp = &timeObj
|
||||||
|
err = json.Unmarshal(it.Value().Data(), &ticker.Rates)
|
||||||
|
if err != nil {
|
||||||
|
glog.Error("FiatRatesFindTicker error unpacking rates: ", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if err := it.Err(); err != nil {
|
||||||
|
glog.Error("FiatRatesFindTicker Iterator error: ", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if !it.Valid() {
|
||||||
|
return nil, nil // ticker not found
|
||||||
|
}
|
||||||
|
return ticker, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// FiatRatesFindLastTicker gets the last FiatRates record
|
||||||
|
func (d *RocksDB) FiatRatesFindLastTicker() (*CurrencyRatesTicker, error) {
|
||||||
|
ticker := &CurrencyRatesTicker{}
|
||||||
|
it := d.db.NewIteratorCF(d.ro, d.cfh[cfFiatRates])
|
||||||
|
defer it.Close()
|
||||||
|
|
||||||
|
for it.SeekToLast(); it.Valid(); it.Next() {
|
||||||
|
timeObj, err := time.Parse(FiatRatesTimeFormat, string(it.Key().Data()))
|
||||||
|
if err != nil {
|
||||||
|
glog.Error("FiatRatesFindTicker time parse error: ", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
timeObj = timeObj.UTC()
|
||||||
|
ticker.Timestamp = &timeObj
|
||||||
|
err = json.Unmarshal(it.Value().Data(), &ticker.Rates)
|
||||||
|
if err != nil {
|
||||||
|
glog.Error("FiatRatesFindTicker error unpacking rates: ", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if err := it.Err(); err != nil {
|
||||||
|
glog.Error("FiatRatesFindLastTicker Iterator error: ", err)
|
||||||
|
return ticker, err
|
||||||
|
}
|
||||||
|
if !it.Valid() {
|
||||||
|
return nil, nil // ticker not found
|
||||||
|
}
|
||||||
|
return ticker, nil
|
||||||
|
}
|
||||||
84
db/fiat_test.go
Normal file
84
db/fiat_test.go
Normal file
@ -0,0 +1,84 @@
|
|||||||
|
//go:build unittest
|
||||||
|
|
||||||
|
package db
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestRocksTickers(t *testing.T) {
|
||||||
|
d := setupRocksDB(t, &testBitcoinParser{
|
||||||
|
BitcoinParser: bitcoinTestnetParser(),
|
||||||
|
})
|
||||||
|
defer closeAndDestroyRocksDB(t, d)
|
||||||
|
|
||||||
|
// Test valid formats
|
||||||
|
for _, date := range []string{"20190130", "2019013012", "201901301250", "20190130125030"} {
|
||||||
|
_, err := FiatRatesConvertDate(date)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("%v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test invalid formats
|
||||||
|
for _, date := range []string{"01102019", "10201901", "", "abc", "20190130xxx"} {
|
||||||
|
_, err := FiatRatesConvertDate(date)
|
||||||
|
if err == nil {
|
||||||
|
t.Errorf("Wrongly-formatted date \"%v\" marked as valid!", date)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test storing & finding tickers
|
||||||
|
key, _ := time.Parse(FiatRatesTimeFormat, "20190627000000")
|
||||||
|
futureKey, _ := time.Parse(FiatRatesTimeFormat, "20190630000000")
|
||||||
|
|
||||||
|
ts1, _ := time.Parse(FiatRatesTimeFormat, "20190628000000")
|
||||||
|
ticker1 := &CurrencyRatesTicker{
|
||||||
|
Timestamp: &ts1,
|
||||||
|
Rates: map[string]float64{
|
||||||
|
"usd": 20000,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
ts2, _ := time.Parse(FiatRatesTimeFormat, "20190629000000")
|
||||||
|
ticker2 := &CurrencyRatesTicker{
|
||||||
|
Timestamp: &ts2,
|
||||||
|
Rates: map[string]float64{
|
||||||
|
"usd": 30000,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
err := d.FiatRatesStoreTicker(ticker1)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Error storing ticker! %v", err)
|
||||||
|
}
|
||||||
|
d.FiatRatesStoreTicker(ticker2)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Error storing ticker! %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ticker, err := d.FiatRatesFindTicker(&key) // should find the closest key (ticker1)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("TestRocksTickers err: %+v", err)
|
||||||
|
} else if ticker == nil {
|
||||||
|
t.Errorf("Ticker not found")
|
||||||
|
} else if ticker.Timestamp.Format(FiatRatesTimeFormat) != ticker1.Timestamp.Format(FiatRatesTimeFormat) {
|
||||||
|
t.Errorf("Incorrect ticker found. Expected: %v, found: %+v", ticker1.Timestamp, ticker.Timestamp)
|
||||||
|
}
|
||||||
|
|
||||||
|
ticker, err = d.FiatRatesFindLastTicker() // should find the last key (ticker2)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("TestRocksTickers err: %+v", err)
|
||||||
|
} else if ticker == nil {
|
||||||
|
t.Errorf("Ticker not found")
|
||||||
|
} else if ticker.Timestamp.Format(FiatRatesTimeFormat) != ticker2.Timestamp.Format(FiatRatesTimeFormat) {
|
||||||
|
t.Errorf("Incorrect ticker found. Expected: %v, found: %+v", ticker1.Timestamp, ticker.Timestamp)
|
||||||
|
}
|
||||||
|
|
||||||
|
ticker, err = d.FiatRatesFindTicker(&futureKey) // should not find anything
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("TestRocksTickers err: %+v", err)
|
||||||
|
} else if ticker != nil {
|
||||||
|
t.Errorf("Ticker found, but the timestamp is older than the last ticker entry.")
|
||||||
|
}
|
||||||
|
}
|
||||||
127
db/rocksdb.go
127
db/rocksdb.go
@ -4,7 +4,6 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/big"
|
"math/big"
|
||||||
"os"
|
"os"
|
||||||
@ -31,34 +30,6 @@ const maxAddrDescLen = 1024
|
|||||||
// when doing huge scan, it is better to close it and reopen from time to time to free the resources
|
// when doing huge scan, it is better to close it and reopen from time to time to free the resources
|
||||||
const refreshIterator = 5000000
|
const refreshIterator = 5000000
|
||||||
|
|
||||||
// FiatRatesTimeFormat is a format string for storing FiatRates timestamps in rocksdb
|
|
||||||
const FiatRatesTimeFormat = "20060102150405" // YYYYMMDDhhmmss
|
|
||||||
|
|
||||||
// CurrencyRatesTicker contains coin ticker data fetched from API
|
|
||||||
type CurrencyRatesTicker struct {
|
|
||||||
Timestamp *time.Time // return as unix timestamp in API
|
|
||||||
Rates map[string]float64
|
|
||||||
}
|
|
||||||
|
|
||||||
// ResultTickerAsString contains formatted CurrencyRatesTicker data
|
|
||||||
type ResultTickerAsString struct {
|
|
||||||
Timestamp int64 `json:"ts,omitempty"`
|
|
||||||
Rates map[string]float64 `json:"rates"`
|
|
||||||
Error string `json:"error,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// ResultTickersAsString contains a formatted CurrencyRatesTicker list
|
|
||||||
type ResultTickersAsString struct {
|
|
||||||
Tickers []ResultTickerAsString `json:"tickers"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// ResultTickerListAsString contains formatted data about available currency tickers
|
|
||||||
type ResultTickerListAsString struct {
|
|
||||||
Timestamp int64 `json:"ts,omitempty"`
|
|
||||||
Tickers []string `json:"available_currencies"`
|
|
||||||
Error string `json:"error,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// RepairRocksDB calls RocksDb db repair function
|
// RepairRocksDB calls RocksDb db repair function
|
||||||
func RepairRocksDB(name string) error {
|
func RepairRocksDB(name string) error {
|
||||||
glog.Infof("rocksdb: repair")
|
glog.Infof("rocksdb: repair")
|
||||||
@ -183,104 +154,6 @@ func (d *RocksDB) closeDB() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// FiatRatesConvertDate checks if the date is in correct format and returns the Time object.
|
|
||||||
// Possible formats are: YYYYMMDDhhmmss, YYYYMMDDhhmm, YYYYMMDDhh, YYYYMMDD
|
|
||||||
func FiatRatesConvertDate(date string) (*time.Time, error) {
|
|
||||||
for format := FiatRatesTimeFormat; len(format) >= 8; format = format[:len(format)-2] {
|
|
||||||
convertedDate, err := time.Parse(format, date)
|
|
||||||
if err == nil {
|
|
||||||
return &convertedDate, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
msg := "Date \"" + date + "\" does not match any of available formats. "
|
|
||||||
msg += "Possible formats are: YYYYMMDDhhmmss, YYYYMMDDhhmm, YYYYMMDDhh, YYYYMMDD"
|
|
||||||
return nil, errors.New(msg)
|
|
||||||
}
|
|
||||||
|
|
||||||
// FiatRatesStoreTicker stores ticker data at the specified time
|
|
||||||
func (d *RocksDB) FiatRatesStoreTicker(ticker *CurrencyRatesTicker) error {
|
|
||||||
if len(ticker.Rates) == 0 {
|
|
||||||
return errors.New("Error storing ticker: empty rates")
|
|
||||||
} else if ticker.Timestamp == nil {
|
|
||||||
return errors.New("Error storing ticker: empty timestamp")
|
|
||||||
}
|
|
||||||
ratesMarshalled, err := json.Marshal(ticker.Rates)
|
|
||||||
if err != nil {
|
|
||||||
glog.Error("Error marshalling ticker rates: ", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
timeFormatted := ticker.Timestamp.UTC().Format(FiatRatesTimeFormat)
|
|
||||||
err = d.db.PutCF(d.wo, d.cfh[cfFiatRates], []byte(timeFormatted), ratesMarshalled)
|
|
||||||
if err != nil {
|
|
||||||
glog.Error("Error storing ticker: ", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// FiatRatesFindTicker gets FiatRates data closest to the specified timestamp
|
|
||||||
func (d *RocksDB) FiatRatesFindTicker(tickerTime *time.Time) (*CurrencyRatesTicker, error) {
|
|
||||||
ticker := &CurrencyRatesTicker{}
|
|
||||||
tickerTimeFormatted := tickerTime.UTC().Format(FiatRatesTimeFormat)
|
|
||||||
it := d.db.NewIteratorCF(d.ro, d.cfh[cfFiatRates])
|
|
||||||
defer it.Close()
|
|
||||||
|
|
||||||
for it.Seek([]byte(tickerTimeFormatted)); it.Valid(); it.Next() {
|
|
||||||
timeObj, err := time.Parse(FiatRatesTimeFormat, string(it.Key().Data()))
|
|
||||||
if err != nil {
|
|
||||||
glog.Error("FiatRatesFindTicker time parse error: ", err)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
timeObj = timeObj.UTC()
|
|
||||||
ticker.Timestamp = &timeObj
|
|
||||||
err = json.Unmarshal(it.Value().Data(), &ticker.Rates)
|
|
||||||
if err != nil {
|
|
||||||
glog.Error("FiatRatesFindTicker error unpacking rates: ", err)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if err := it.Err(); err != nil {
|
|
||||||
glog.Error("FiatRatesFindTicker Iterator error: ", err)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if !it.Valid() {
|
|
||||||
return nil, nil // ticker not found
|
|
||||||
}
|
|
||||||
return ticker, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// FiatRatesFindLastTicker gets the last FiatRates record
|
|
||||||
func (d *RocksDB) FiatRatesFindLastTicker() (*CurrencyRatesTicker, error) {
|
|
||||||
ticker := &CurrencyRatesTicker{}
|
|
||||||
it := d.db.NewIteratorCF(d.ro, d.cfh[cfFiatRates])
|
|
||||||
defer it.Close()
|
|
||||||
|
|
||||||
for it.SeekToLast(); it.Valid(); it.Next() {
|
|
||||||
timeObj, err := time.Parse(FiatRatesTimeFormat, string(it.Key().Data()))
|
|
||||||
if err != nil {
|
|
||||||
glog.Error("FiatRatesFindTicker time parse error: ", err)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
timeObj = timeObj.UTC()
|
|
||||||
ticker.Timestamp = &timeObj
|
|
||||||
err = json.Unmarshal(it.Value().Data(), &ticker.Rates)
|
|
||||||
if err != nil {
|
|
||||||
glog.Error("FiatRatesFindTicker error unpacking rates: ", err)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if err := it.Err(); err != nil {
|
|
||||||
glog.Error("FiatRatesFindLastTicker Iterator error: ", err)
|
|
||||||
return ticker, err
|
|
||||||
}
|
|
||||||
if !it.Valid() {
|
|
||||||
return nil, nil // ticker not found
|
|
||||||
}
|
|
||||||
return ticker, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close releases the RocksDB environment opened in NewRocksDB.
|
// Close releases the RocksDB environment opened in NewRocksDB.
|
||||||
func (d *RocksDB) Close() error {
|
func (d *RocksDB) Close() error {
|
||||||
if d.db != nil {
|
if d.db != nil {
|
||||||
|
|||||||
@ -12,7 +12,6 @@ import (
|
|||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
|
||||||
vlq "github.com/bsm/go-vlq"
|
vlq "github.com/bsm/go-vlq"
|
||||||
"github.com/juju/errors"
|
"github.com/juju/errors"
|
||||||
@ -1482,79 +1481,3 @@ func Test_reorderUtxo(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRocksTickers(t *testing.T) {
|
|
||||||
d := setupRocksDB(t, &testBitcoinParser{
|
|
||||||
BitcoinParser: bitcoinTestnetParser(),
|
|
||||||
})
|
|
||||||
defer closeAndDestroyRocksDB(t, d)
|
|
||||||
|
|
||||||
// Test valid formats
|
|
||||||
for _, date := range []string{"20190130", "2019013012", "201901301250", "20190130125030"} {
|
|
||||||
_, err := FiatRatesConvertDate(date)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("%v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test invalid formats
|
|
||||||
for _, date := range []string{"01102019", "10201901", "", "abc", "20190130xxx"} {
|
|
||||||
_, err := FiatRatesConvertDate(date)
|
|
||||||
if err == nil {
|
|
||||||
t.Errorf("Wrongly-formatted date \"%v\" marked as valid!", date)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test storing & finding tickers
|
|
||||||
key, _ := time.Parse(FiatRatesTimeFormat, "20190627000000")
|
|
||||||
futureKey, _ := time.Parse(FiatRatesTimeFormat, "20190630000000")
|
|
||||||
|
|
||||||
ts1, _ := time.Parse(FiatRatesTimeFormat, "20190628000000")
|
|
||||||
ticker1 := &CurrencyRatesTicker{
|
|
||||||
Timestamp: &ts1,
|
|
||||||
Rates: map[string]float64{
|
|
||||||
"usd": 20000,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
ts2, _ := time.Parse(FiatRatesTimeFormat, "20190629000000")
|
|
||||||
ticker2 := &CurrencyRatesTicker{
|
|
||||||
Timestamp: &ts2,
|
|
||||||
Rates: map[string]float64{
|
|
||||||
"usd": 30000,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
err := d.FiatRatesStoreTicker(ticker1)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Error storing ticker! %v", err)
|
|
||||||
}
|
|
||||||
d.FiatRatesStoreTicker(ticker2)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Error storing ticker! %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
ticker, err := d.FiatRatesFindTicker(&key) // should find the closest key (ticker1)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("TestRocksTickers err: %+v", err)
|
|
||||||
} else if ticker == nil {
|
|
||||||
t.Errorf("Ticker not found")
|
|
||||||
} else if ticker.Timestamp.Format(FiatRatesTimeFormat) != ticker1.Timestamp.Format(FiatRatesTimeFormat) {
|
|
||||||
t.Errorf("Incorrect ticker found. Expected: %v, found: %+v", ticker1.Timestamp, ticker.Timestamp)
|
|
||||||
}
|
|
||||||
|
|
||||||
ticker, err = d.FiatRatesFindLastTicker() // should find the last key (ticker2)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("TestRocksTickers err: %+v", err)
|
|
||||||
} else if ticker == nil {
|
|
||||||
t.Errorf("Ticker not found")
|
|
||||||
} else if ticker.Timestamp.Format(FiatRatesTimeFormat) != ticker2.Timestamp.Format(FiatRatesTimeFormat) {
|
|
||||||
t.Errorf("Incorrect ticker found. Expected: %v, found: %+v", ticker1.Timestamp, ticker.Timestamp)
|
|
||||||
}
|
|
||||||
|
|
||||||
ticker, err = d.FiatRatesFindTicker(&futureKey) // should not find anything
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("TestRocksTickers err: %+v", err)
|
|
||||||
} else if ticker != nil {
|
|
||||||
t.Errorf("Ticker found, but the timestamp is older than the last ticker entry.")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user