diff --git a/bchain/coins/blockchain.go b/bchain/coins/blockchain.go index c2bb4d99..d17f26ef 100644 --- a/bchain/coins/blockchain.go +++ b/bchain/coins/blockchain.go @@ -5,14 +5,15 @@ import ( "blockbook/bchain/coins/btc" "blockbook/bchain/coins/zec" "blockbook/common" + "encoding/json" "fmt" + "io/ioutil" "reflect" - "time" "github.com/juju/errors" ) -type blockChainFactory func(url string, user string, password string, timeout time.Duration, parse bool, metrics *common.Metrics) (bchain.BlockChain, error) +type blockChainFactory func(config json.RawMessage, pushHandler func(*bchain.MQMessage), metrics *common.Metrics) (bchain.BlockChain, error) var blockChainFactories = make(map[string]blockChainFactory) @@ -23,10 +24,19 @@ func init() { } // NewBlockChain creates bchain.BlockChain of type defined by parameter coin -func NewBlockChain(coin string, url string, user string, password string, timeout time.Duration, parse bool, metrics *common.Metrics) (bchain.BlockChain, error) { +func NewBlockChain(coin string, configfile string, pushHandler func(*bchain.MQMessage), metrics *common.Metrics) (bchain.BlockChain, error) { bcf, ok := blockChainFactories[coin] if !ok { return nil, errors.New(fmt.Sprint("Unsupported coin ", coin, ". Must be one of ", reflect.ValueOf(blockChainFactories).MapKeys())) } - return bcf(url, user, password, timeout, parse, metrics) + data, err := ioutil.ReadFile(configfile) + if err != nil { + return nil, errors.Annotatef(err, "Error reading file %v", configfile) + } + var config json.RawMessage + err = json.Unmarshal(data, &config) + if err != nil { + return nil, errors.Annotatef(err, "Error parsing file %v", configfile) + } + return bcf(config, pushHandler, metrics) } diff --git a/bchain/coins/btc/bitcoinrpc.go b/bchain/coins/btc/bitcoinrpc.go index 30b1db76..d14c106d 100644 --- a/bchain/coins/btc/bitcoinrpc.go +++ b/bchain/coins/btc/bitcoinrpc.go @@ -30,21 +30,38 @@ type BitcoinRPC struct { Mempool *bchain.Mempool ParseBlocks bool metrics *common.Metrics + mq *bchain.MQ +} + +type configuration struct { + RPCURL string `json:"rpcURL"` + RPCUser string `json:"rpcUser"` + RPCPass string `json:"rpcPass"` + RPCTimeout int `json:"rpcTimeout"` + Parse bool `json:"parse"` + ZeroMQBinding string `json:"zeroMQBinding"` } // NewBitcoinRPC returns new BitcoinRPC instance. -func NewBitcoinRPC(url string, user string, password string, timeout time.Duration, parse bool, metrics *common.Metrics) (bchain.BlockChain, error) { +func NewBitcoinRPC(config json.RawMessage, pushHandler func(*bchain.MQMessage), metrics *common.Metrics) (bchain.BlockChain, error) { + var err error + var c configuration + err = json.Unmarshal(config, &c) + if err != nil { + return nil, errors.Annotatef(err, "Invalid configuragion file") + } transport := &http.Transport{ Dial: (&net.Dialer{KeepAlive: 600 * time.Second}).Dial, MaxIdleConns: 100, MaxIdleConnsPerHost: 100, // necessary to not to deplete ports } + s := &BitcoinRPC{ - client: http.Client{Timeout: timeout, Transport: transport}, - rpcURL: url, - user: user, - password: password, - ParseBlocks: parse, + client: http.Client{Timeout: time.Duration(c.RPCTimeout) * time.Second, Transport: transport}, + rpcURL: c.RPCURL, + user: c.RPCUser, + password: c.RPCPass, + ParseBlocks: c.Parse, metrics: metrics, } chainName, err := s.GetBlockChainInfo() @@ -66,12 +83,29 @@ func NewBitcoinRPC(url string, user string, password string, timeout time.Durati s.Network = "testnet" } + mq, err := bchain.NewMQ(c.ZeroMQBinding, pushHandler) + if err != nil { + glog.Error("mq: ", err) + return nil, err + } + s.mq = mq + s.Mempool = bchain.NewMempool(s, metrics) glog.Info("rpc: block chain ", s.Parser.Params.Name) return s, nil } +func (b *BitcoinRPC) Shutdown() error { + if b.mq != nil { + if err := b.mq.Shutdown(); err != nil { + glog.Error("MQ.Shutdown error: ", err) + return err + } + } + return nil +} + func (b *BitcoinRPC) IsTestnet() bool { return b.Testnet } diff --git a/bchain/coins/btc/btc.md b/bchain/coins/btc/btc.md index 06f091f5..89dc07ff 100644 --- a/bchain/coins/btc/btc.md +++ b/bchain/coins/btc/btc.md @@ -29,7 +29,7 @@ Create script that runs blockbook *run-btc-blockbook.sh* #!/bin/bash cd go/src/blockbook -./blockbook -path=/data/btc/blockbook/db -sync -parse -rpcurl=http://127.0.0.1:8332 -httpserver=:8335 -socketio=:8336 -certfile=server/testcert -zeromq=tcp://127.0.0.1:8334 -explorer=https://bitcore1.trezor.io/ -coin=btc $1 +./blockbook -datadir=/data/btc/blockbook/db -sync -parse -rpcurl=http://127.0.0.1:8332 -httpserver=:8335 -socketio=:8336 -certfile=server/testcert -zeromq=tcp://127.0.0.1:8334 -explorer=https://bitcore1.trezor.io/ -coin=btc $1 ``` To run blockbook with logging to file (run with nohup or daemonize or using screen) ``` diff --git a/bchain/coins/btc/btctestnet.md b/bchain/coins/btc/btctestnet.md index 5a43b379..c639d753 100644 --- a/bchain/coins/btc/btctestnet.md +++ b/bchain/coins/btc/btctestnet.md @@ -29,7 +29,7 @@ Create script that runs blockbook *run-testnet-blockbook.sh* #!/bin/bash cd go/src/blockbook -./blockbook -path=/data/testnet/blockbook/db -sync -parse -rpcurl=http://127.0.0.1:18332 -httpserver=:18335 -socketio=:18336 -certfile=server/testcert -zeromq=tcp://127.0.0.1:18334 -explorer=https://testnet-bitcore1.trezor.io -coin=btc-testnet $1 +./blockbook -datadir=/data/testnet/blockbook/db -sync -parse -rpcurl=http://127.0.0.1:18332 -httpserver=:18335 -socketio=:18336 -certfile=server/testcert -zeromq=tcp://127.0.0.1:18334 -explorer=https://testnet-bitcore1.trezor.io -coin=btc-testnet $1 ``` To run blockbook with logging to file (run with nohup or daemonize or using screen) ``` diff --git a/bchain/coins/zec/zcashrpc.go b/bchain/coins/zec/zcashrpc.go index c3539a29..4f7e38db 100644 --- a/bchain/coins/zec/zcashrpc.go +++ b/bchain/coins/zec/zcashrpc.go @@ -4,15 +4,15 @@ import ( "blockbook/bchain" "blockbook/bchain/coins/btc" "blockbook/common" - "time" + "encoding/json" ) type ZCashRPC struct { *btc.BitcoinRPC } -func NewZCashRPC(url string, user string, password string, timeout time.Duration, parse bool, metrics *common.Metrics) (bchain.BlockChain, error) { - b, err := btc.NewBitcoinRPC(url, user, password, timeout, parse, metrics) +func NewZCashRPC(config json.RawMessage, pushHandler func(*bchain.MQMessage), metrics *common.Metrics) (bchain.BlockChain, error) { + b, err := btc.NewBitcoinRPC(config, pushHandler, metrics) if err != nil { return nil, err } diff --git a/bchain/types.go b/bchain/types.go index 6dd9da46..79e9a471 100644 --- a/bchain/types.go +++ b/bchain/types.go @@ -86,6 +86,8 @@ func (e *RPCError) Error() string { } type BlockChain interface { + // cleanup + Shutdown() error // chain info IsTestnet() bool GetNetworkName() string diff --git a/blockbook.go b/blockbook.go index 90627325..8543ab22 100644 --- a/blockbook.go +++ b/blockbook.go @@ -37,12 +37,9 @@ const resyncMempoolPeriodMs = 60017 const debounceResyncMempoolMs = 1009 var ( - rpcURL = flag.String("rpcurl", "http://localhost:8332", "url of blockchain RPC service") - rpcUser = flag.String("rpcuser", "rpc", "rpc username") - rpcPass = flag.String("rpcpass", "rpc", "rpc password") - rpcTimeout = flag.Uint("rpctimeout", 25, "rpc timeout in seconds") + blockchain = flag.String("blockchaincfg", "", "path to blockchain RPC service configuration json file") - dbPath = flag.String("path", "./data", "path to address index directory") + dbPath = flag.String("datadir", "./data", "path to database directory") blockFrom = flag.Int("blockheight", -1, "height of the starting block") blockUntil = flag.Int("blockuntil", -1, "height of the final block") @@ -57,7 +54,6 @@ var ( syncChunk = flag.Int("chunk", 100, "block chunk size for processing") syncWorkers = flag.Int("workers", 8, "number of workers to process blocks") dryRun = flag.Bool("dryrun", false, "do not index blocks, only download") - parse = flag.Bool("parse", false, "use in-process block parsing") httpServerBinding = flag.String("httpserver", "", "http server binding [address]:port, (default no http server)") @@ -65,8 +61,6 @@ var ( certFiles = flag.String("certfile", "", "to enable SSL specify path to certificate files without extension, expecting .crt and .key, (default no SSL)") - zeroMQBinding = flag.String("zeromq", "", "binding to zeromq, if missing no zeromq connection") - explorerURL = flag.String("explorer", "", "address of blockchain explorer") coin = flag.String("coin", "btc", "coin name (default btc)") @@ -115,7 +109,11 @@ func main() { glog.Fatal("GetMetrics: ", err) } - if chain, err = coins.NewBlockChain(*coin, *rpcURL, *rpcUser, *rpcPass, time.Duration(*rpcTimeout)*time.Second, *parse, metrics); err != nil { + if *blockchain == "" { + glog.Fatal("Missing blockchaincfg configuration parameter") + } + + if chain, err = coins.NewBlockChain(*coin, *blockchain, pushSynchronizationHandler, metrics); err != nil { glog.Fatal("rpc: ", err) } @@ -222,19 +220,6 @@ func main() { go syncMempoolLoop() } - var mq *bchain.MQ - if *zeroMQBinding != "" { - if !*synchronize { - glog.Error("zeromq connection without synchronization does not make sense, ignoring zeromq parameter") - } else { - mq, err = bchain.NewMQ(*zeroMQBinding, mqHandler) - if err != nil { - glog.Error("mq: ", err) - return - } - } - } - if *blockFrom >= 0 { if *blockUntil < 0 { *blockUntil = *blockFrom @@ -261,8 +246,8 @@ func main() { } } - if httpServer != nil || socketIoServer != nil || mq != nil { - waitForSignalAndShutdown(httpServer, socketIoServer, mq, 5*time.Second) + if httpServer != nil || socketIoServer != nil || chain != nil { + waitForSignalAndShutdown(httpServer, socketIoServer, chain, 5*time.Second) } if *synchronize { @@ -332,7 +317,7 @@ func onNewTxAddr(txid string, addr string) { } } -func mqHandler(m *bchain.MQMessage) { +func pushSynchronizationHandler(m *bchain.MQMessage) { // TODO - is coin specific, item for abstraction body := hex.EncodeToString(m.Body) glog.V(1).Infof("MQ: %s-%d %s", m.Topic, m.Sequence, body) @@ -345,7 +330,7 @@ func mqHandler(m *bchain.MQMessage) { } } -func waitForSignalAndShutdown(https *server.HTTPServer, socketio *server.SocketIoServer, mq *bchain.MQ, timeout time.Duration) { +func waitForSignalAndShutdown(https *server.HTTPServer, socketio *server.SocketIoServer, chain bchain.BlockChain, timeout time.Duration) { sig := <-chanOsSignal ctx, cancel := context.WithTimeout(context.Background(), timeout) @@ -353,12 +338,6 @@ func waitForSignalAndShutdown(https *server.HTTPServer, socketio *server.SocketI glog.Infof("Shutdown: %v", sig) - if mq != nil { - if err := mq.Shutdown(); err != nil { - glog.Error("MQ.Shutdown error: ", err) - } - } - if https != nil { if err := https.Shutdown(ctx); err != nil { glog.Error("HttpServer.Shutdown error: ", err) @@ -370,6 +349,12 @@ func waitForSignalAndShutdown(https *server.HTTPServer, socketio *server.SocketI glog.Error("SocketIo.Shutdown error: ", err) } } + + if chain != nil { + if err := chain.Shutdown(); err != nil { + glog.Error("BlockChain.Shutdown error: ", err) + } + } } func printResult(txid string, vout uint32, isOutput bool) error {