From c0cd45225271766b0c2cac0fe77406453519c522 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 9 Jan 2017 20:07:53 +0900 Subject: [PATCH 1/4] Clarify README; no need to resync. --- README.rst | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/README.rst b/README.rst index cacb62a..cad07c1 100644 --- a/README.rst +++ b/README.rst @@ -151,8 +151,8 @@ Version 0.10.3 Version 0.10.2 -------------- -* Note the **NETWORK** environment variable was renamed **NET** to - bring it into line with lib/coins.py. +* The **NETWORK** environment variable was renamed **NET** to bring it + into line with lib/coins.py. * The genesis hash is now compared with the genesis hash expected by **COIN** and **NET**. This sanity check was not done previously, so you could easily be syncing to a network daemon different to what @@ -162,9 +162,11 @@ Version 0.10.2 versions of ElectrumX as long as you used an older bitcoind too, such as 0.13.0 or Bitcoin Unlimited. - **Note**: for testnet, you need to set *NET** to *testnet-segwit* if - using recent RPC incompatible core bitcoinds, or *testnet* if using - older RPC compatible bitcoinds. + **Note**: for testnet, you need to set **NET** to *testnet-segwit* + if using a recent Core bitcoind that broke RPC compatibility, or + *testnet* if using a bitcoind that maintains RPC compatibility. + Changing **NET** for Bitcoin testnet can be done dynamically; it is + not necessary to resync from scratch. Version 0.10.1 -------------- From 13aa2cc731a2138cecad1832819b1cc5383f8f66 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Tue, 10 Jan 2017 07:50:52 +0900 Subject: [PATCH 2/4] Update HOWTO --- docs/HOWTO.rst | 252 ++++++++++++++++++++++++++----------------------- 1 file changed, 135 insertions(+), 117 deletions(-) diff --git a/docs/HOWTO.rst b/docs/HOWTO.rst index 00cdb49..fc38967 100644 --- a/docs/HOWTO.rst +++ b/docs/HOWTO.rst @@ -1,12 +1,13 @@ +============= Prerequisites ============= -ElectrumX should run on any flavour of unix. I have run it +**ElectrumX** should run on any flavour of unix. I have run it successfully on MaxOSX and DragonFlyBSD. It won't run out-of-the-box on Windows, but the changes required to make it do so should be -small - patches welcome. +small - pull requests are welcome. -+ Python3: ElectrumX uses asyncio. Python version >=3.5 is required. ++ Python3: ElectrumX uses asyncio. Python version >= 3.5 is **required**. + plyvel: Python interface to LevelDB. I am using plyvel-0.9. + pylru: Python LRU cache package. I'm using 1.0.9. + aiohttp: Python library for asynchronous HTTP. ElectrumX uses it for @@ -18,30 +19,30 @@ small - patches welcome. + x11_hash: Python X11 Hash package. Only required if you use ElectrumX with Dash Mainnet or Testnet. Version 1.4 tested. -While not requirements for running ElectrumX, it is intended to be run -with supervisor software such as Daniel Bernstein's daemontools, -Gerald Pape's runit package or systemd. These make administration of secure -unix servers very easy, and I strongly recommend you install one of these -and familiarise yourself with them. The instructions below and sample -run scripts assume daemontools; adapting to runit should be trivial -for someone used to either. +While not a requirement for running ElectrumX, it is intended to be +run with supervisor software such as Daniel Bernstein's +`daemontools`_, Gerald Pape's `runit`_ package or `systemd`. 
These +make administration of secure unix servers very easy, and I strongly +recommend you install one of these and familiarise yourself with them. +The instructions below and sample run scripts assume `daemontools`; +adapting to `runit` should be trivial for someone used to either. When building the database form the genesis block, ElectrumX has to -flush large quantities of data to disk and to leveldb. You will have -a much nicer experience if the database directory is on an SSD than on -an HDD. Currently to around height 439,800 of the Bitcoin blockchain -the final size of the leveldb database, and other ElectrumX file -metadata comes to just over 18GB. Leveldb needs a bit more for brief -periods, and the block chain is only getting longer, so I would -recommend having at least 30-40GB free space. +flush large quantities of data to disk and its DB. You will have a +better experience if the database directory is on an SSD than on an +HDD. Currently to around height 447,100 of the Bitcoin blockchain the +final size of the leveldb database, and other ElectrumX file metadata +comes to just over 18.7GB (17.5 GiB). LevelDB needs a bit more for +brief periods, and the block chain is only getting longer, so I would +recommend having at least 30-40GB of free space before starting. Database Engine =============== You can choose from RocksDB, LevelDB or LMDB to store transaction information on disk. Currently, the fastest seems to be RocksDB with -LevelDB being about 10% slower. LMDB is slowest but that is because it -is not yet efficiently abstracted. +LevelDB being a few percent slower. LMDB is slowest but that is +because the code does not currently suit the LMDB design. You will need to install one of: @@ -59,14 +60,14 @@ Check out the code from Github:: git clone https://github.com/kyuupichan/electrumx.git cd electrumx -You can install with setup.py, or run the code from the source tree or -a copy of it. +You can install with `setup.py` or run the code from the source tree +or a copy of it. You should create a standard user account to run the server under; your own is probably adequate unless paranoid. The paranoid might also want to create another user account for the daemontools logging process. The sample scripts and these instructions assume it is all -under one account which I have called 'electrumx'. +under one account which I have called *electrumx*. Next create a directory where the database will be stored and make it writeable by the electrumx account. I recommend this directory live @@ -75,42 +76,43 @@ on an SSD:: mkdir /path/to/db_directory chown electrumx /path/to/db_directory + Process limits -------------- -You should ensure the ElectrumX process has a large open file limit. +You must ensure the ElectrumX process has a large open file limit. During sync it should not need more than about 1,024 open files. When serving it will use approximately 256 for LevelDB plus the number of incoming connections. It is not unusual to have 1,000 to 2,000 connections being served, so I suggest you set your open files limit to at least 2,500. -Note that setting the limit in your shell does NOT affect ElectrumX +Note that setting the limit in your shell does *NOT* affect ElectrumX unless you are invoking ElectrumX directly from your shell. If you -are using systemd, you need to set it in the .service file (see -samples/systemd/electrumx.service in the ElectrumX source). +are using `systemd`, you need to set it in the `.service` file (see +`samples/systemd/electrumx.service`_). 
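+
+You can check the limit the process actually sees from Python itself.
+This is an illustrative sketch using only the standard library, not
+part of ElectrumX; run it under the same account and supervisor that
+will run ElectrumX, as limits are inherited from the parent process::
+
+  import resource
+
+  # Open-file limits for the current process (inherited by children)
+  soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
+  print('open file limit: soft={} hard={}'.format(soft, hard))
+
+  # Try to raise the soft limit towards the 2,500 suggested above;
+  # only root can raise the hard limit
+  if soft < 2500:
+      resource.setrlimit(resource.RLIMIT_NOFILE, (min(2500, hard), hard))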
Using daemontools ----------------- Next create a daemontools service directory; this only holds symlinks -(see daemontools documentation). The 'svscan' program will ensure the -servers in the directory are running by launching a 'supervise' +(see daemontools documentation). The `svscan` program will ensure the +servers in the directory are running by launching a `supervise` supervisor for the server and another for its logging process. You -can run 'svscan' under the electrumx account if that is the only one +can run `svscan` under the *electrumx* account if that is the only one involved (server and logger) otherwise it will need to run as root so that the user can be switched to electrumx. -Assuming this directory is called service, you would do one of:: +Assuming this directory is called `service`, you would do one of:: mkdir /service # If running svscan as root mkdir ~/service # As electrumx if running svscan as that a/c -Next create a directory to hold the scripts that the 'supervise' -process spawned by 'svscan' will run - this directory must be readable -by the 'svscan' process. Suppose this directory is called scripts, you -might do:: +Next create a directory to hold the scripts that the `supervise` +process spawned by `svscan` will run - this directory must be readable +by the `svscan` process. Suppose this directory is called *scripts*, +you might do:: mkdir -p ~/scripts/electrumx @@ -122,7 +124,7 @@ This copies 3 things: the top level server run script, a log/ directory with the logger run script, an env/ directory. You need to configure the environment variables under env/ to your -setup, as explained in docs/ENV-NOTES. ElectrumX server currently +setup, as explained in `ENVIRONMENT.rst`_. ElectrumX server currently takes no command line arguments; all of its configuration is taken from its environment which is set up according to env/ directory (see 'envdir' man page). Finally you need to change the log/run script to @@ -159,7 +161,7 @@ The sample unit file assumes that the repository is located at change the unit file accordingly. You need to set a few configuration variables in :code:`/etc/electrumx.conf`, -see `docs/ENV-NOTES` for the list of required variables. +see `ENVIRONMENT.rst`_ for the list of required variables. Now you can start ElectrumX using :code:`systemctl`:: @@ -169,51 +171,54 @@ You can use :code:`journalctl` to check the log output:: journalctl -u electrumx -f -Once configured, you may want to start ElectrumX at boot:: +Once configured you may want to start ElectrumX at boot:: systemctl enable electrumx -systemd is aggressive in shutting down processes. ElectrumX can need -several minutes to flush cached data to disk during sync. You should -set TimeoutStopSec to at least 10 mins in your .service file. +**Warning**: systemd is aggressive in forcibly shutting down +processes. Depending on your hardware, ElectrumX can need several +minutes to flush cached data to disk during initial sync. You should +set TimeoutStopSec to *at least* 10 mins in your `.service` file. Sync Progress ============= -Speed indexing the blockchain depends on your hardware of course. As -Python is single-threaded most of the time only 1 core is kept busy. -ElectrumX uses Python's asyncio to prefill a cache of future blocks -asynchronously; this keeps the CPU busy processing the chain and not -waiting for blocks to be delivered. 
I therefore doubt there will be
-much boost in performance if the daemon is on the same host: indeed it
-may even be beneficial to have the daemon on a separate machine so the
-machine doing the indexing is focussing on the one task and not the
-wider network.
+Time taken to index the blockchain depends on your hardware, of course.
+As Python is single-threaded most of the time only 1 core is kept
+busy. ElectrumX uses Python's `asyncio` to prefill a cache of future
+blocks asynchronously to keep the CPU busy processing the chain
+without pausing.

-The HIST_MB and CACHE_MB environment variables control cache sizes
-before they spill to disk; see the ENV-NOTES file under docs/.
+Consequently there will probably be only a minor boost in performance
+if the daemon is on the same host. It may even be beneficial to have
+the daemon on a *separate* machine so the machine doing the indexing
+has its caches and disk I/O tuned to that task only.
+
+The **CACHE_MB** environment variable controls the total cache size
+ElectrumX uses; see `ENVIRONMENT.rst`_ for caveats.

 Here is my experience with the current codebase, to given heights and
-rough wall-time::
+rough wall-time. The period from heights 363,000 to 378,000 is the
+most sluggish::

-             Machine A     Machine B    DB + Metadata
-  181,000    7m 09s                     0.4 GiB
-  255,000    1h 02m                     2.7 GiB
-  289,000    1h 46m                     3.3 GiB
-  317,000    2h 33m
-  351,000    3h 58m
-  377,000    6h 06m                     6.5 GiB
-  403,400    8h 51m
-  436,196    14h 03m                    17.3 GiB
+             Machine A     Machine B
+  181,000    25m 00s       5m 30s
+  283,500                  1h 00m
+  321,800                  1h 40m
+  357,000    12h 32m       2h 41m
+  386,000    21h 56m       4h 25m
+  414,200    1d 6h 30m
+  447,168                  9h 47m

-Machine A: a low-spec 2011 1.6GHz AMD E-350 dual-core fanless CPU, 8GB
-RAM and a DragonFlyBSD HAMMER fileystem on an SSD. It requests blocks
-over the LAN from a bitcoind on machine B.
+*Machine A*: a low-spec 2011 1.6GHz AMD E-350 dual-core fanless CPU,
+8GB RAM and a DragonFlyBSD UFS filesystem on an SSD. It requests
+blocks over the LAN from a bitcoind on machine B. **CACHE_MB** left
+at the default of 1,200. LevelDB.

-Machine B: a late 2012 iMac running El-Capitan 10.11.6, 2.9GHz
-quad-core Intel i5 CPU with an HDD and 24GB RAM. Running bitcoind on
-the same machine. HIST_MB of 350, UTXO_MB of 1,600. LevelDB.
+*Machine B*: a late 2012 iMac running Sierra 10.12.2, 2.9GHz quad-core
+Intel i5 CPU with an HDD and 24GB RAM. Running bitcoind on the same
+machine. **CACHE_MB** set to 1,800. LevelDB.

 For chains other than bitcoin-mainnet sychronization should be much
 faster.

@@ -223,26 +228,22 @@ Terminating ElectrumX
 =====================

 The preferred way to terminate the server process is to send it the
-TERM signal. For a daemontools supervised process this is best done
-by bringing it down like so::
+INT or TERM signals. For a daemontools supervised process this is best
+done by bringing it down like so::

   svc -d ~/service/electrumx

-If processing the blockchain the server will start the process of
-flushing to disk. Once that is complete the server will exit. Be
-patient as disk flushing can take many minutes.
+ElectrumX will note receipt of the signals in the logs, and ensure the
+block chain index is flushed to disk before terminating. You should
+be patient as flushing data to disk can take many minutes.

-ElectrumX flushes to leveldb using its transaction functionality. The
-plyvel documentation claims this is atomic.
I have written ElectrumX
-with the intent that, to the extent this atomicity guarantee holds,
-the database should not get corrupted even if the ElectrumX process if
-forcibly killed or there is loss of power. The worst case is losing
-unflushed in-memory blockchain processing and having to restart from
-the state as of the prior successfully completed UTXO flush.
-
-If you do have any database corruption as a result of terminating the
-process (without modifying the code) I would be interested in the
-details.
+ElectrumX uses the databases' transaction functionality, with fsync
+enabled. I have written it with the intent that, to the extent the
+atomicity guarantees are upheld by the DB software, the operating
+system, and the hardware, the database should not get corrupted even
+if the ElectrumX process is forcibly killed or there is loss of power.
+The worst case should be having to restart indexing from the most
+recent UTXO flush.

Once the process has terminated, you can start it up again with::

  svc -u ~/service/electrumx

You can see the status of a running service with::

  svstat ~/service/electrumx

-Of course, svscan can handle multiple services simultaneously from the
-same service directory, such as a testnet or altcoin server. See the
-man pages of these various commands for more information.
+`svscan` can of course handle multiple services simultaneously from
+the same service directory, such as a testnet or altcoin server. See
+the man pages of these various commands for more information.


Understanding the Logs
======================

You can see the logs usefully like so::

  tail -F /path/to/log/dir/current | tai64nlocal

Here is typical log output on startup::

-  2016-10-14 20:22:10.747808500 Launching ElectrumX server...
-  2016-10-14 20:22:13.032415500 INFO:root:ElectrumX server starting
-  2016-10-14 20:22:13.032633500 INFO:root:switching current directory to /Users/neil/server-btc
-  2016-10-14 20:22:13.038495500 INFO:DB:created new database Bitcoin-mainnet
-  2016-10-14 20:22:13.038892500 INFO:DB:Bitcoin/mainnet height: -1 tx count: 0 flush count: 0 utxo flush count: 0 sync time: 0d 00h 00m 00s
-  2016-10-14 20:22:13.038935500 INFO:DB:flushing all after cache reaches 2,000 MB
-  2016-10-14 20:22:13.038978500 INFO:DB:flushing history cache at 400 MB
-  2016-10-14 20:22:13.039076500 INFO:BlockCache:using RPC URL http://user:password@192.168.0.2:8332/
-  2016-10-14 20:22:13.039796500 INFO:BlockCache:catching up, block cache limit 10MB...
-  2016-10-14 20:22:14.092192500 INFO:DB:cache stats at height 0 daemon height: 434,293
-  2016-10-14 20:22:14.092243500 INFO:DB: entries: UTXO: 1 DB: 0 hist count: 1 hist size: 1
-  2016-10-14 20:22:14.092288500 INFO:DB: size: 0MB (UTXOs 0MB hist 0MB)
-  2016-10-14 20:22:32.302394500 INFO:UTXO:duplicate tx hash d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599
-  2016-10-14 20:22:32.310441500 INFO:UTXO:duplicate tx hash e3bf3d07d4b0375638d5f1db5255fe07ba2c4cb067cd81b84ee974b6585fb468
-  2016-10-14 20:23:14.094855500 INFO:DB:cache stats at height 125,278 daemon height: 434,293
-  2016-10-14 20:23:14.095026500 INFO:DB: entries: UTXO: 191,155 DB: 0 hist count: 543,455 hist size: 1,394,187
-  2016-10-14 20:23:14.095028500 INFO:DB: size: 172MB (UTXOs 44MB hist 128MB)
+  INFO:BlockProcessor:switching current directory to /crucial/server-good
+  INFO:BlockProcessor:using leveldb for DB backend
+  INFO:BlockProcessor:created new database
+  INFO:BlockProcessor:creating metadata directory
+  INFO:BlockProcessor:software version: ElectrumX 0.10.2
+  INFO:BlockProcessor:DB version: 5
+  INFO:BlockProcessor:coin: Bitcoin
+  INFO:BlockProcessor:network: mainnet
+  INFO:BlockProcessor:height: -1
+  INFO:BlockProcessor:tip: 0000000000000000000000000000000000000000000000000000000000000000
+  INFO:BlockProcessor:tx count: 0
+  INFO:BlockProcessor:sync time so far: 0d 00h 00m 00s
+  INFO:BlockProcessor:reorg limit is 200 blocks
+  INFO:Daemon:daemon at 192.168.0.2:8332/
+  INFO:BlockProcessor:flushing DB cache at 1,200 MB
+  INFO:Controller:RPC server listening on localhost:8000
+  INFO:Prefetcher:catching up to daemon height 447,187...
+  INFO:Prefetcher:verified genesis block with hash 000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f
+  INFO:BlockProcessor:our height: 9 daemon: 447,187 UTXOs 0MB hist 0MB
+  INFO:BlockProcessor:our height: 52,509 daemon: 447,187 UTXOs 9MB hist 14MB
+  INFO:BlockProcessor:our height: 85,009 daemon: 447,187 UTXOs 12MB hist 31MB
+  INFO:BlockProcessor:our height: 102,384 daemon: 447,187 UTXOs 15MB hist 47MB
+  [...]
+  INFO:BlockProcessor:our height: 133,375 daemon: 447,187 UTXOs 80MB hist 222MB
+  INFO:BlockProcessor:our height: 134,692 daemon: 447,187 UTXOs 96MB hist 250MB
+  INFO:BlockProcessor:flushed to FS in 0.7s
+  INFO:BlockProcessor:flushed history in 16.3s for 1,124,512 addrs
+  INFO:BlockProcessor:flush #1 took 18.7s. Height 134,692 txs: 941,963
+  INFO:BlockProcessor:tx/sec since genesis: 2,399, since last flush: 2,400
+  INFO:BlockProcessor:sync time: 0d 00h 06m 32s ETA: 1d 13h 03m 42s

-Under normal operation these cache stats repeat roughly every minute.
-Flushes can take many minutes and look like this::
+Under normal operation these cache stats repeat once or twice a
+minute. UTXO flushes can take several minutes and look like this::

-  2016-10-14 21:30:29.085479500 INFO:DB:flushing UTXOs: 22,910,848 txs and 254,753 blocks
-  2016-10-14 21:32:05.383413500 INFO:UTXO:UTXO cache adds: 55,647,862 spends: 48,751,219
-  2016-10-14 21:32:05.383460500 INFO:UTXO:UTXO DB adds: 6,875,315 spends: 0. Collisions: hash168: 268 UTXO: 0
-  2016-10-14 21:32:07.056008500 INFO:DB:6,982,386 history entries in 1,708,991 addrs
-  2016-10-14 21:32:08.169468500 INFO:DB:committing transaction...
-  2016-10-14 21:33:17.644296500 INFO:DB:flush #11 to height 254,752 took 168s
-  2016-10-14 21:33:17.644357500 INFO:DB:txs: 22,910,848 tx/sec since genesis: 5,372, since last flush: 3,447
-  2016-10-14 21:33:17.644536500 INFO:DB:sync time: 0d 01h 11m 04s ETA: 0d 11h 22m 42s
+  INFO:BlockProcessor:our height: 378,745 daemon: 447,332 UTXOs 1,013MB hist 184MB
+  INFO:BlockProcessor:our height: 378,787 daemon: 447,332 UTXOs 1,014MB hist 194MB
+  INFO:BlockProcessor:flushed to FS in 0.3s
+  INFO:BlockProcessor:flushed history in 13.4s for 934,933 addrs
+  INFO:BlockProcessor:flushed 6,403 blocks with 5,879,440 txs, 2,920,524 UTXO adds, 3,646,572 spends in 93.1s, committing...
+  INFO:BlockProcessor:flush #120 took 226.4s. Height 378,787 txs: 87,695,588
+  INFO:BlockProcessor:tx/sec since genesis: 1,280, since last flush: 359
+  INFO:BlockProcessor:sync time: 0d 19h 01m 06s ETA: 3d 21h 17m 52s
+  INFO:BlockProcessor:our height: 378,812 daemon: 447,334 UTXOs 10MB hist 10MB

-After flush-to-disk you may see an aiohttp error; this is the daemon
-timing out the connection while the disk flush was in progress. This
-is harmless.
+The ETA shown is just a rough guide and in the short term can be quite
+volatile. It tends to be a little optimistic at first; once you get
+to height 280,000 it should be fairly accurate.

-The ETA is just a guide and can be quite volatile around flushes.
+.. _`ENVIRONMENT.rst`: https://github.com/kyuupichan/electrumx/blob/master/docs/ENVIRONMENT.rst
+.. _`samples/systemd/electrumx.service`: https://github.com/kyuupichan/electrumx/blob/master/samples/systemd/electrumx.service
+.. _`daemontools`: http://cr.yp.to/daemontools.html
+.. _`runit`: http://smarden.org/runit/index.html

From 057ec09b9ef4a3d8b120b2a5bf0767b36d8becbe Mon Sep 17 00:00:00 2001
From: Neil Booth
Date: Tue, 10 Jan 2017 14:55:30 +0900
Subject: [PATCH 3/4] Update docs

---
 README.rst     |  7 +++----
 docs/HOWTO.rst | 40 +++++++++++++++++++++++++---------------
 2 files changed, 28 insertions(+), 19 deletions(-)

diff --git a/README.rst b/README.rst
index cad07c1..acefa9a 100644
--- a/README.rst
+++ b/README.rst
@@ -6,11 +6,10 @@
 ===============================================
 ElectrumX - Reimplementation of electrum-server
 ===============================================
-::
-  Licence: MIT
-  Author: Neil Booth
-  Language: Python (>=3.5)
+  :Licence: MIT
+  :Language: Python (>= 3.5)
+  :Author: Neil Booth

 Getting Started
 ===============

diff --git a/docs/HOWTO.rst b/docs/HOWTO.rst
index fc38967..11875cd 100644
--- a/docs/HOWTO.rst
+++ b/docs/HOWTO.rst
@@ -7,17 +7,23 @@ successfully on MaxOSX and DragonFlyBSD. It won't run out-of-the-box
 on Windows, but the changes required to make it do so should be
 small - pull requests are welcome.

-+ Python3: ElectrumX uses asyncio. Python version >= 3.5 is **required**.
-+ plyvel: Python interface to LevelDB. I am using plyvel-0.9.
-+ pylru: Python LRU cache package. I'm using 1.0.9.
-+ aiohttp: Python library for asynchronous HTTP. ElectrumX uses it for
-          communication with the daemon. Version >= 1.0 required; I am
-          using 1.0.5.
-+ irc: Python IRC package. Only required if you enable IRC; ElectrumX
-       will happily serve clients that try to connect directly.
-       I use 15.0.4 but older versions likely are fine.
-+ x11_hash: Python X11 Hash package. Only required if you use ElectrumX
-            with Dash Mainnet or Testnet. Version 1.4 tested.
+================ ========================
+Package          Notes
+================ ========================
+Python3          ElectrumX uses asyncio.
+                 Python version >= 3.5 is **required**.
+`aiohttp`_       Python library for asynchronous HTTP. Version >=
+                 1.0 required; I am using 1.0.5.
+`pylru`_         Python LRU cache package. I'm using 1.0.9.
+DB Engine        I use `plyvel`_ 0.9, a Python interface to LevelDB.
+                 A database engine package is required but others
+                 are supported (see **Database Engine** below).
+`IRC`_           Python IRC package. Only required if you enable
+                 IRC; ElectrumX will happily serve clients that
+                 try to connect directly. I use 15.0.4 but
+                 older versions likely are fine.
+`x11_hash`_      Python X11 Hash package. Only required if you use
+                 ElectrumX with Dash Mainnet or Testnet. Version
+                 1.4 tested.
+================ ========================

 While not a requirement for running ElectrumX, it is intended to be
 run with supervisor software such as Daniel Bernstein's

@@ -41,8 +47,8 @@ Database Engine

 You can choose from RocksDB, LevelDB or LMDB to store transaction
 information on disk. Currently, the fastest seems to be RocksDB with
-LevelDB being a few percent slower. LMDB is slowest but that is
-because the code does not currently suit the LMDB design.
+LevelDB being slightly slower. LMDB is slowest but that is because the
+code needs reworking to use LMDB more efficiently.

 You will need to install one of:

@@ -208,7 +214,7 @@ most sluggish::

   321,800                  1h 40m
   357,000    12h 32m       2h 41m
   386,000    21h 56m       4h 25m
-  414,200    1d 6h 30m
+  414,200    1d 12h 29m    6h 30m
   447,168                  9h 47m

 *Machine A*: a low-spec 2011 1.6GHz AMD E-350 dual-core fanless CPU,

@@ -308,7 +314,7 @@ minute. UTXO flushes can take several minutes and look like this::

   INFO:BlockProcessor:flushed 6,403 blocks with 5,879,440 txs, 2,920,524 UTXO adds, 3,646,572 spends in 93.1s, committing...
   INFO:BlockProcessor:flush #120 took 226.4s. Height 378,787 txs: 87,695,588
   INFO:BlockProcessor:tx/sec since genesis: 1,280, since last flush: 359
-  INFO:BlockProcessor:sync time: 0d 19h 01m 06s ETA: 3d 21h 17m 52s
+  INFO:BlockProcessor:sync time: 0d 19h 01m 06s  ETA: 3d 21h 17m 52s
   INFO:BlockProcessor:our height: 378,812 daemon: 447,334 UTXOs 10MB hist 10MB

 The ETA shown is just a rough guide and in the short term can be quite
 volatile. It tends to be a little optimistic at first; once you get
 to height 280,000 it should be fairly accurate.

 .. _`ENVIRONMENT.rst`: https://github.com/kyuupichan/electrumx/blob/master/docs/ENVIRONMENT.rst
 .. _`samples/systemd/electrumx.service`: https://github.com/kyuupichan/electrumx/blob/master/samples/systemd/electrumx.service
 .. _`daemontools`: http://cr.yp.to/daemontools.html
 .. _`runit`: http://smarden.org/runit/index.html
+.. _`aiohttp`: https://pypi.python.org/pypi/aiohttp
+.. _`pylru`: https://pypi.python.org/pypi/pylru
+.. _`IRC`: https://pypi.python.org/pypi/irc
+.. _`x11_hash`: https://pypi.python.org/pypi/x11_hash

From 4eed43accb8a8e0fb1c59332e808725feeb52cf9 Mon Sep 17 00:00:00 2001
From: Neil Booth
Date: Wed, 11 Jan 2017 06:05:31 +0900
Subject: [PATCH 4/4] Add named-argument handling as per JSON RPC 2.0

This involved a rewrite of the JSON RPC layer.

I took the opportunity to clean up the handling of requests in
general. It should now be easy to return nice help from the
docstrings.
Closes issue #99 --- electrumx_rpc.py | 2 + lib/jsonrpc.py | 296 +++++++++++++++++------------- server/controller.py | 418 +++++++++++++++++++++++++++++++++++++------ server/session.py | 368 ++++++------------------------------- 4 files changed, 589 insertions(+), 495 deletions(-) diff --git a/electrumx_rpc.py b/electrumx_rpc.py index abaa330..7cc2fbb 100755 --- a/electrumx_rpc.py +++ b/electrumx_rpc.py @@ -33,6 +33,8 @@ class RPCClient(JSONRPC): async def send_and_wait(self, method, params, timeout=None): # Raise incoming buffer size - presumably connection is trusted self.max_buffer_size = 5000000 + if params: + params = [params] payload = self.request_payload(method, id_=method, params=params) self.encode_and_send_payload(payload) diff --git a/lib/jsonrpc.py b/lib/jsonrpc.py index ccf46e1..91cb405 100644 --- a/lib/jsonrpc.py +++ b/lib/jsonrpc.py @@ -8,19 +8,30 @@ '''Class for handling JSON RPC 2.0 connections, server or client.''' import asyncio +import inspect import json import numbers import time +import traceback from lib.util import LoggedClass +class RPCError(Exception): + '''RPC handlers raise this error.''' + def __init__(self, msg, code=-1, **kw_args): + super().__init__(**kw_args) + self.msg = msg + self.code = code + + class RequestBase(object): '''An object that represents a queued request.''' def __init__(self, remaining): self.remaining = remaining + class SingleRequest(RequestBase): '''An object that represents a single request.''' @@ -62,7 +73,8 @@ class BatchRequest(RequestBase): self.parts.append(part) total_len = sum(len(part) + 2 for part in self.parts) - session.check_oversized_request(total_len) + if session.is_oversized_request(total_len): + raise RPCError('request too large', JSONRPC.INVALID_REQUEST) if not self.remaining: if self.parts: @@ -83,34 +95,31 @@ class JSONRPC(asyncio.Protocol, LoggedClass): Derived classes may want to override connection_made() and connection_lost() but should be sure to call the implementation in - this base class first. They will also want to implement some or - all of the asynchronous functions handle_notification(), - handle_response() and handle_request(). + this base class first. They may also want to implement the asynchronous + function handle_response() which by default does nothing. - handle_request() returns the result to pass over the network, and - must raise an RPCError if there is an error. - handle_notification() and handle_response() should not return - anything or raise any exceptions. All three functions have - default "ignore" implementations supplied by this class. + The functions request_handler() and notification_handler() are + passed an RPC method name, and should return an asynchronous + function to call to handle it. The functions' docstrings are used + for help, and the arguments are what can be used as JSONRPC 2.0 + named arguments (and thus become part of the external interface). + If the method is unknown return None. + + Request handlers should return a Python object to return to the + caller, or raise an RPCError on error. Notification handlers + should not return a value or raise any exceptions. 
''' # See http://www.jsonrpc.org/specification PARSE_ERROR = -32700 INVALID_REQUEST = -32600 METHOD_NOT_FOUND = -32601 - INVALID_PARAMS = -32602 + INVALID_ARGS = -32602 INTERNAL_ERROR = -32603 ID_TYPES = (type(None), str, numbers.Number) NEXT_SESSION_ID = 0 - class RPCError(Exception): - '''RPC handlers raise this error.''' - def __init__(self, msg, code=-1, **kw_args): - super().__init__(**kw_args) - self.msg = msg - self.code = code - @classmethod def request_payload(cls, method, id_, params=None): payload = {'jsonrpc': '2.0', 'id': id_, 'method': method} @@ -120,8 +129,6 @@ class JSONRPC(asyncio.Protocol, LoggedClass): @classmethod def response_payload(cls, result, id_): - # We should not respond to notifications - assert id_ is not None return {'jsonrpc': '2.0', 'result': result, 'id': id_} @classmethod @@ -133,9 +140,29 @@ class JSONRPC(asyncio.Protocol, LoggedClass): error = {'message': message, 'code': code} return {'jsonrpc': '2.0', 'error': error, 'id': id_} + @classmethod + def check_payload_id(cls, payload): + '''Extract and return the ID from the payload. + + Raises an RPCError if it is missing or invalid.''' + if not 'id' in payload: + raise RPCError('missing id', JSONRPC.INVALID_REQUEST) + + id_ = payload['id'] + if not isinstance(id_, JSONRPC.ID_TYPES): + raise RPCError('invalid id: {}'.format(id_), + JSONRPC.INVALID_REQUEST) + return id_ + @classmethod def payload_id(cls, payload): - return payload.get('id') if isinstance(payload, dict) else None + '''Extract and return the ID from the payload. + + Returns None if it is missing or invalid.''' + try: + return cls.check_payload_id(payload) + except RPCError: + return None def __init__(self): super().__init__() @@ -157,6 +184,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): self.send_count = 0 self.send_size = 0 self.error_count = 0 + self.close_after_send = False self.peer_info = None # Sends longer than max_send are prevented, instead returning # an oversized request error to other end of the network @@ -260,20 +288,20 @@ class JSONRPC(asyncio.Protocol, LoggedClass): message = message.decode() except UnicodeDecodeError as e: msg = 'cannot decode binary bytes: {}'.format(e) - self.send_json_error(msg, self.PARSE_ERROR, close=True) + self.send_json_error(msg, JSONRPC.PARSE_ERROR) return try: message = json.loads(message) except json.JSONDecodeError as e: msg = 'cannot decode JSON: {}'.format(e) - self.send_json_error(msg, self.PARSE_ERROR, close=True) + self.send_json_error(msg, JSONRPC.PARSE_ERROR) return if isinstance(message, list): - # Batches must have at least one request. + # Batches must have at least one object. 
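+            # A JSON RPC 2.0 batch is a JSON array of request objects,
+            # e.g. [{"jsonrpc": "2.0", "method": "a", "id": 0},
+            #       {"jsonrpc": "2.0", "method": "b", "id": 1}]
+            # (method names here are illustrative).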
if not message:
-                self.send_json_error('empty batch', self.INVALID_REQUEST)
+                self.send_json_error('empty batch', JSONRPC.INVALID_REQUEST)
                 return
             request = BatchRequest(message)
         else:
@@ -284,35 +312,43 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
         if self.log_me:
             self.log_info('queued {}'.format(message))

+    def send_json_error(self, message, code, id_=None):
+        '''Send a JSON error.'''
+        self._send_bytes(self.json_error_bytes(message, code, id_))
+
     def encode_payload(self, payload):
+        assert isinstance(payload, dict)
+
         try:
             binary = json.dumps(payload).encode()
         except TypeError:
             msg = 'JSON encoding failure: {}'.format(payload)
             self.log_error(msg)
-            return self.send_json_error(msg, self.INTERNAL_ERROR,
-                                        self.payload_id(payload))
+            binary = self.json_error_bytes(msg, JSONRPC.INTERNAL_ERROR,
+                                           payload.get('id'))

-        self.check_oversized_request(len(binary))
+        if self.is_oversized_request(len(binary)):
+            binary = self.json_error_bytes('request too large',
+                                           JSONRPC.INVALID_REQUEST,
+                                           payload.get('id'))
         self.send_count += 1
         self.send_size += len(binary)
         self.using_bandwidth(len(binary))
         return binary

-    def _send_bytes(self, binary, close=False):
+    def is_oversized_request(self, total_len):
+        return total_len > max(1000, self.max_send)
+
+    def _send_bytes(self, binary):
-        '''Send JSON text over the transport.  Close it if close is True.'''
+        '''Send JSON text over the transport.  Close the connection if
+        close_after_send is set.'''
         # Confirmed this happens, sometimes a lot
         if self.transport.is_closing():
             return
         self.transport.write(binary)
         self.transport.write(b'\n')
-        if close or self.error_count > 10:
+        if self.close_after_send:
             self.close_connection()

-    def send_json_error(self, message, code, id_=None, close=False):
-        '''Send a JSON error and close the connection by default.'''
-        self._send_bytes(self.json_error_bytes(message, code, id_), close)
-
     def encode_and_send_payload(self, payload):
         '''Encode the payload and send it.'''
         self._send_bytes(self.encode_payload(payload))
@@ -330,124 +366,134 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
         return self.encode_payload(self.response_payload(result, id_))

     def json_error_bytes(self, message, code, id_=None):
-        '''Return the bytes of a JSON error.'''
+        '''Return the bytes of a JSON error.
+
+        Flag the connection to close on a fatal error or too many errors.'''
         self.error_count += 1
+        if (code in (JSONRPC.PARSE_ERROR, JSONRPC.INVALID_REQUEST)
+                or self.error_count > 10):
+            self.close_after_send = True
         return self.encode_payload(self.error_payload(message, code, id_))

     async def process_single_payload(self, payload):
-        '''Return the binary JSON result of a single JSON request, response or
-        notification.
-
-        The result is empty if nothing is to be sent.
-        '''
+        '''Handle a single JSON request, notification or response.
+        If it is a request, return the binary response, otherwise None.'''
         if not isinstance(payload, dict):
             return self.json_error_bytes('request must be a dict',
-                                         self.INVALID_REQUEST)
+                                         JSONRPC.INVALID_REQUEST)

+        # Requests and notifications must have a method.
+        # Notifications are distinguished by having no 'id'.
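+        # For example {"jsonrpc": "2.0", "method": "m", "id": 0} is a
+        # request expecting a response, whereas the same payload without
+        # the "id" member is a notification and gets no reply
+        # (illustrative method name).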
+ if 'method' in payload: + if 'id' in payload: + return await self.process_single_request(payload) + else: + await self.process_single_notification(payload) + else: + await self.process_single_response(payload) + + return None + + async def process_single_request(self, payload): + '''Handle a single JSON request and return the binary response.''' try: - if not 'id' in payload: - return await self.process_json_notification(payload) - - id_ = payload['id'] - if not isinstance(id_, self.ID_TYPES): - return self.json_error_bytes('invalid id: {}'.format(id_), - self.INVALID_REQUEST) - - if 'method' in payload: - return await self.process_json_request(payload) - - return await self.process_json_response(payload) - except self.RPCError as e: + result = await self.handle_payload(payload, self.request_handler) + return self.json_response_bytes(result, payload['id']) + except RPCError as e: return self.json_error_bytes(e.msg, e.code, self.payload_id(payload)) + except Exception: + self.log_error(traceback.format_exc()) + return self.json_error_bytes('internal error processing request', + JSONRPC.INTERNAL_ERROR, + self.payload_id(payload)) - @classmethod - def method_and_params(cls, payload): + async def process_single_notification(self, payload): + '''Handle a single JSON notification.''' + try: + await self.handle_payload(payload, self.notification_handler) + except RPCError: + pass + except Exception: + self.log_error(traceback.format_exc()) + + async def process_single_response(self, payload): + '''Handle a single JSON response.''' + try: + id_ = self.check_payload_id(payload) + # Only one of result and error should exist + if 'error' in payload: + error = payload['error'] + if (not 'result' in payload and isinstance(error, dict) + and 'code' in error and 'message' in error): + await self.handle_response(None, error, id_) + elif 'result' in payload: + await self.handle_response(payload['result'], None, id_) + except RPCError: + pass + except Exception: + self.log_error(traceback.format_exc()) + + async def handle_payload(self, payload, get_handler): + '''Handle a request or notification payload given the handlers.''' + # An argument is the value passed to a function parameter... 
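+        # Positional arguments arrive as a JSON array and are matched
+        # to parameter names in order; named arguments arrive as a
+        # JSON object.  Illustration: for a handler f(x, y=1), params
+        # [5] gives x=5 and y its default, and params {"x": 5} is
+        # equivalent.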
+ args = payload.get('params', []) method = payload.get('method') - params = payload.get('params', []) if not isinstance(method, str): - raise cls.RPCError('invalid method: {}'.format(method), - cls.INVALID_REQUEST) + raise RPCError("invalid method: '{}'".format(method), + JSONRPC.INVALID_REQUEST) - if not isinstance(params, list): - raise cls.RPCError('params should be an array', - cls.INVALID_REQUEST) + handler = get_handler(method) + if not handler: + raise RPCError("unknown method: '{}'".format(method), + JSONRPC.METHOD_NOT_FOUND) - return method, params + if not isinstance(args, (list, dict)): + raise RPCError('arguments should be an array or a dict', + JSONRPC.INVALID_REQUEST) - async def process_json_notification(self, payload): - try: - method, params = self.method_and_params(payload) - except self.RPCError: - pass + params = inspect.signature(handler).parameters + names = list(params) + min_args = sum(p.default is p.empty for p in params.values()) + + if len(args) < min_args: + raise RPCError('too few arguments: expected {:d} got {:d}' + .format(min_args, len(args)), JSONRPC.INVALID_ARGS) + + if len(args) > len(params): + raise RPCError('too many arguments: expected {:d} got {:d}' + .format(len(params), len(args)), + JSONRPC.INVALID_ARGS) + + if isinstance(args, list): + kw_args = {name: arg for name, arg in zip(names, args)} else: - await self.handle_notification(method, params) - return b'' - - async def process_json_request(self, payload): - method, params = self.method_and_params(payload) - result = await self.handle_request(method, params) - return self.json_response_bytes(result, payload['id']) - - async def process_json_response(self, payload): - # Only one of result and error should exist; we go with 'error' - # if both are supplied. 
- if 'error' in payload: - await self.handle_response(None, payload['error'], payload['id']) - elif 'result' in payload: - await self.handle_response(payload['result'], None, payload['id']) - return b'' - - def check_oversized_request(self, total_len): - if total_len > max(1000, self.max_send): - raise self.RPCError('request too large', self.INVALID_REQUEST) - - def raise_unknown_method(self, method): - '''Respond to a request with an unknown method.''' - raise self.RPCError("unknown method: '{}'".format(method), - self.METHOD_NOT_FOUND) - - # Common parameter verification routines - @classmethod - def param_to_non_negative_integer(cls, param): - '''Return param if it is or can be converted to a non-negative - integer, otherwise raise an RPCError.''' - try: - param = int(param) - if param >= 0: - return param - except ValueError: - pass - - raise cls.RPCError('param {} should be a non-negative integer' - .format(param)) - - @classmethod - def params_to_non_negative_integer(cls, params): - if len(params) == 1: - return cls.param_to_non_negative_integer(params[0]) - raise cls.RPCError('params {} should contain one non-negative integer' - .format(params)) - - @classmethod - def require_empty_params(cls, params): - if params: - raise cls.RPCError('params {} should be empty'.format(params)) + kw_args = args + bad_names = ['<{}>'.format(name) for name in args + if name not in names] + if bad_names: + raise RPCError('invalid parameter names: {}' + .format(', '.join(bad_names))) + return await handler(**kw_args) # --- derived classes are intended to override these functions def enqueue_request(self, request): '''Enqueue a request for later asynchronous processing.''' raise NotImplementedError - async def handle_notification(self, method, params): - '''Handle a notification.''' + async def handle_response(self, result, error, id_): + '''Handle a JSON response. - async def handle_request(self, method, params): - '''Handle a request.''' + Should not raise an exception. Return values are ignored. + ''' + + def notification_handler(self, method): + '''Return the async handler for the given notification method.''' return None - async def handle_response(self, result, error, id_): - '''Handle a response.''' + def request_handler(self, method): + '''Return the async handler for the given request method.''' + return None diff --git a/server/controller.py b/server/controller.py index d1c4462..73585d3 100644 --- a/server/controller.py +++ b/server/controller.py @@ -6,6 +6,7 @@ # and warranty status of this software. 
import asyncio
+import codecs
 import json
 import os
 import ssl
@@ -16,12 +17,14 @@ from functools import partial

 import pylru

-from lib.jsonrpc import JSONRPC, RequestBase
+from lib.jsonrpc import JSONRPC, RPCError, RequestBase
+from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash
 import lib.util as util
 from server.block_processor import BlockProcessor
+from server.daemon import DaemonError
 from server.irc import IRC
 from server.session import LocalRPC, ElectrumX
 from server.mempool import MemPool
+from server.version import VERSION


 class Controller(util.LoggedClass):
@@ -48,7 +51,9 @@ class Controller(util.LoggedClass):
         super().__init__()
         self.loop = asyncio.get_event_loop()
         self.start = time.time()
+        self.coin = env.coin
         self.bp = BlockProcessor(env)
+        self.daemon = self.bp.daemon
         self.mempool = MemPool(self.bp)
         self.irc = IRC(env)
         self.env = env
@@ -69,10 +74,27 @@ class Controller(util.LoggedClass):
         self.queue = asyncio.PriorityQueue()
         self.delayed_sessions = []
         self.next_queue_id = 0
-        self.height = 0
+        self.cache_height = 0
         self.futures = []
         env.max_send = max(350000, env.max_send)
         self.setup_bands()
+        # Set up the RPC request handlers
+        cmds = 'disconnect getinfo groups log peers reorg sessions'.split()
+        self.rpc_handlers = {cmd: getattr(self, 'rpc_' + cmd) for cmd in cmds}
+        # Set up the ElectrumX request handlers
+        rpcs = [
+            ('blockchain',
+             'address.get_balance address.get_history address.get_mempool '
+             'address.get_proof address.listunspent '
+             'block.get_header block.get_chunk estimatefee relayfee '
+             'transaction.get transaction.get_merkle utxo.get_address'),
+            ('server',
+             'banner donation_address peers.subscribe version'),
+        ]
+        self.electrumx_handlers = {'.'.join([prefix, suffix]):
+                                   getattr(self, suffix.replace('.', '_'))
+                                   for prefix, suffixes in rpcs
+                                   for suffix in suffixes.split()}

     async def mempool_transactions(self, hashX):
         '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
@@ -167,7 +189,7 @@ class Controller(util.LoggedClass):
             await session.serve_requests()

     async def main_loop(self):
-        '''Server manager main loop.'''
+        '''Controller main loop.'''
         def add_future(coro):
             self.futures.append(asyncio.ensure_future(coro))
@@ -259,8 +281,8 @@ class Controller(util.LoggedClass):
         hc = self.history_cache
         for hashX in set(hc).intersection(touched):
             del hc[hashX]
-        if self.bp.db_height != self.height:
-            self.height = self.bp.db_height
+        if self.bp.db_height != self.cache_height:
+            self.cache_height = self.bp.db_height
             self.header_cache.clear()

         for session in self.sessions:
@@ -280,32 +302,14 @@ class Controller(util.LoggedClass):
     def electrum_header(self, height):
         '''Return the binary header at the given height.'''
         if not 0 <= height <= self.bp.db_height:
-            raise JSONRPC.RPCError('height {:,d} out of range'.format(height))
+            raise RPCError('height {:,d} out of range'.format(height))
         if height in self.header_cache:
             return self.header_cache[height]
         header = self.bp.read_headers(height, 1)
-        header = self.env.coin.electrum_header(header, height)
+        header = self.coin.electrum_header(header, height)
         self.header_cache[height] = header
         return header

-    async def async_get_history(self, hashX):
-        '''Get history asynchronously to reduce latency.'''
-        if hashX in self.history_cache:
-            return self.history_cache[hashX]
-
-        def job():
-            # History DoS limit.  Each element of history is about 99
-            # bytes when encoded as JSON.  This limits resource usage
-            # on bloated history requests, and uses a smaller divisor
-            # so large requests are logged before refusing them.
-            limit = self.env.max_send // 97
-            return list(self.bp.get_history(hashX, limit=limit))
-
-        loop = asyncio.get_event_loop()
-        history = await loop.run_in_executor(None, job)
-        self.history_cache[hashX] = history
-        return history
-
     async def shutdown(self):
         '''Call to shutdown everything.  Returns when done.'''
         self.state = self.SHUTTING_DOWN
@@ -400,15 +404,6 @@ class Controller(util.LoggedClass):
             self.sessions[session] = new_gid
             self.groups[new_gid] = sessions

-    def new_subscription(self):
-        if self.subscription_count >= self.max_subs:
-            raise JSONRPC.RPCError('server subscription limit {:,d} reached'
-                                   .format(self.max_subs))
-        self.subscription_count += 1
-
-    def irc_peers(self):
-        return self.irc.peers
-
     def session_count(self):
         '''The number of connections that we've sent something to.'''
         return len(self.sessions)
@@ -416,7 +411,7 @@ class Controller(util.LoggedClass):
     def server_summary(self):
         '''A one-line summary of server state.'''
         return {
-            'daemon_height': self.bp.daemon.cached_height(),
+            'daemon_height': self.daemon.cached_height(),
             'db_height': self.bp.db_height,
             'closing': len([s for s in self.sessions if s.is_closing()]),
             'errors': sum(s.error_count for s in self.sessions),
@@ -522,49 +517,360 @@ class Controller(util.LoggedClass):
                 now - session.start)
                for session in sessions]

-    def lookup_session(self, param):
+    def lookup_session(self, session_id):
         try:
-            id_ = int(param)
+            session_id = int(session_id)
         except:
             pass
         else:
             for session in self.sessions:
-                if session.id_ == id_:
+                if session.id_ == session_id:
                     return session
         return None

-    def for_each_session(self, params, operation):
+    def for_each_session(self, session_ids, operation):
+        if not isinstance(session_ids, list):
+            raise RPCError('expected a list of session IDs')
+
         result = []
-        for param in params:
-            session = self.lookup_session(param)
+        for session_id in session_ids:
+            session = self.lookup_session(session_id)
             if session:
                 result.append(operation(session))
             else:
-                result.append('unknown session: {}'.format(param))
+                result.append('unknown session: {}'.format(session_id))
         return result

-    async def rpc_disconnect(self, params):
-        return self.for_each_session(params, self.close_session)
+    # Local RPC command handlers

-    async def rpc_log(self, params):
-        return self.for_each_session(params, self.toggle_logging)
+    async def rpc_disconnect(self, session_ids):
+        '''Disconnect sessions.

-    async def rpc_getinfo(self, params):
+        session_ids: array of session IDs
+        '''
+        return self.for_each_session(session_ids, self.close_session)
+
+    async def rpc_log(self, session_ids):
+        '''Toggle logging of sessions.
+        session_ids: array of session IDs
+        '''
+        return self.for_each_session(session_ids, self.toggle_logging)
+
+    async def rpc_getinfo(self):
+        '''Return summary information about the server process.'''
         return self.server_summary()

-    async def rpc_groups(self, params):
+    async def rpc_groups(self):
+        '''Return statistics about the session groups.'''
         return self.group_data()

-    async def rpc_sessions(self, params):
+    async def rpc_sessions(self):
+        '''Return statistics about connected sessions.'''
         return self.session_data(for_log=False)

-    async def rpc_peers(self, params):
+    async def rpc_peers(self):
+        '''Return a list of server peers, currently taken from IRC.'''
         return self.irc.peers

-    async def rpc_reorg(self, params):
-        '''Force a reorg of the given number of blocks, 3 by default.'''
-        count = 3
-        if params:
-            count = JSONRPC.params_to_non_negative_integer(params)
+    async def rpc_reorg(self, count=3):
+        '''Force a reorg of the given number of blocks.
+
+        count: number of blocks to reorg (default 3)
+        '''
+        count = self.non_negative_integer(count)
         if not self.bp.force_chain_reorg(count):
-            raise JSONRPC.RPCError('still catching up with daemon')
+            raise RPCError('still catching up with daemon')
+        return 'scheduled a reorg of {:,d} blocks'.format(count)
+
+    # Helpers for RPC "blockchain" command handlers
+
+    def address_to_hashX(self, address):
+        if isinstance(address, str):
+            try:
+                return self.coin.address_to_hashX(address)
+            except:
+                pass
+        raise RPCError('{} is not a valid address'.format(address))
+
+    def to_tx_hash(self, value):
+        '''Raise an RPCError if the value is not a valid transaction
+        hash.'''
+        if isinstance(value, str) and len(value) == 64:
+            try:
+                bytes.fromhex(value)
+                return value
+            except ValueError:
+                pass
+        raise RPCError('{} should be a transaction hash'.format(value))
+
+    def non_negative_integer(self, value):
+        '''Return the value if it is, or can be converted to, a
+        non-negative integer; otherwise raise an RPCError.'''
+        try:
+            value = int(value)
+            if value >= 0:
+                return value
+        except ValueError:
+            pass
+        raise RPCError('{} should be a non-negative integer'.format(value))
+
+    async def daemon_request(self, method, *args):
+        '''Catch a DaemonError and convert it to an RPCError.'''
+        try:
+            return await getattr(self.daemon, method)(*args)
+        except DaemonError as e:
+            raise RPCError('daemon error: {}'.format(e))
+
+    async def new_subscription(self, address):
+        if self.subscription_count >= self.max_subs:
+            raise RPCError('server subscription limit {:,d} reached'
+                           .format(self.max_subs))
+        self.subscription_count += 1
+        hashX = self.address_to_hashX(address)
+        status = await self.address_status(hashX)
+        return hashX, status
+
+    async def tx_merkle(self, tx_hash, height):
+        '''tx_hash is a hex string.'''
+        hex_hashes = await self.daemon_request('block_hex_hashes', height, 1)
+        block = await self.daemon_request('deserialised_block', hex_hashes[0])
+        tx_hashes = block['tx']
+        try:
+            pos = tx_hashes.index(tx_hash)
+        except ValueError:
+            raise RPCError('tx hash {} not in block {} at height {:,d}'
+                           .format(tx_hash, hex_hashes[0], height))
+
+        idx = pos
+        hashes = [hex_str_to_hash(txh) for txh in tx_hashes]
+        merkle_branch = []
+        while len(hashes) > 1:
+            if len(hashes) & 1:
+                hashes.append(hashes[-1])
+            idx = idx - 1 if (idx & 1) else idx + 1
+            merkle_branch.append(hash_to_str(hashes[idx]))
+            idx //= 2
+            hashes = [double_sha256(hashes[n] + hashes[n + 1])
+                      for n in range(0, len(hashes), 2)]
+
+        return {"block_height": height, "merkle": merkle_branch, "pos": pos}
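+    # Worked example of the merkle loop above (illustrative): for a
+    # 3-transaction block [a, b, c] and tx_hash == b (pos = idx = 1):
+    #   level 0: odd count so pad to [a, b, c, c]; the sibling of b is
+    #            a, giving branch [a]; idx becomes 0
+    #   level 1: hashes = [H(a+b), H(c+c)] with H = double_sha256; the
+    #            sibling is H(c+c), giving branch [a, H(c+c)]
+    # A client verifies by hashing b with each branch element in turn,
+    # using "pos" to pick left/right order at each level, and comparing
+    # the result with the block header's merkle root.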
async def get_balance(self, hashX): + utxos = await self.get_utxos(hashX) + confirmed = sum(utxo.value for utxo in utxos) + unconfirmed = self.mempool_value(hashX) + return {'confirmed': confirmed, 'unconfirmed': unconfirmed} + + async def unconfirmed_history(self, hashX): + # Note unconfirmed history is unordered in electrum-server + # Height is -1 if unconfirmed txins, otherwise 0 + mempool = await self.mempool_transactions(hashX) + return [{'tx_hash': tx_hash, 'height': -unconfirmed, 'fee': fee} + for tx_hash, fee, unconfirmed in mempool] + + async def get_history(self, hashX): + '''Get history asynchronously to reduce latency.''' + if hashX in self.history_cache: + return self.history_cache[hashX] + + def job(): + # History DoS limit. Each element of history is about 99 + # bytes when encoded as JSON. This limits resource usage + # on bloated history requests, and uses a smaller divisor + # so large requests are logged before refusing them. + limit = self.env.max_send // 97 + return list(self.bp.get_history(hashX, limit=limit)) + + loop = asyncio.get_event_loop() + history = await loop.run_in_executor(None, job) + self.history_cache[hashX] = history + return history + + async def confirmed_and_unconfirmed_history(self, hashX): + # Note history is ordered but unconfirmed is unordered in e-s + history = await self.get_history(hashX) + conf = [{'tx_hash': hash_to_str(tx_hash), 'height': height} + for tx_hash, height in history] + return conf + await self.unconfirmed_history(hashX) + + async def address_status(self, hashX): + '''Returns status as 32 bytes.''' + # Note history is ordered and mempool unordered in electrum-server + # For mempool, height is -1 if unconfirmed txins, otherwise 0 + history = await self.get_history(hashX) + mempool = await self.mempool_transactions(hashX) + + status = ''.join('{}:{:d}:'.format(hash_to_str(tx_hash), height) + for tx_hash, height in history) + status += ''.join('{}:{:d}:'.format(hex_hash, -unconfirmed) + for hex_hash, tx_fee, unconfirmed in mempool) + if status: + return sha256(status.encode()).hex() + return None + + async def get_utxos(self, hashX): + '''Get UTXOs asynchronously to reduce latency.''' + def job(): + return list(self.bp.get_utxos(hashX, limit=None)) + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, job) + + def get_chunk(self, index): + '''Return header chunk as hex. 
Index is a non-negative integer.'''
+        chunk_size = self.coin.CHUNK_SIZE
+        next_height = self.bp.db_height + 1
+        start_height = min(index * chunk_size, next_height)
+        count = min(next_height - start_height, chunk_size)
+        return self.bp.read_headers(start_height, count).hex()
+
+    # Client RPC "blockchain" command handlers
+
+    async def address_get_balance(self, address):
+        '''Return the confirmed and unconfirmed balance of an address.'''
+        hashX = self.address_to_hashX(address)
+        return await self.get_balance(hashX)
+
+    async def address_get_history(self, address):
+        '''Return the confirmed and unconfirmed history of an address.'''
+        hashX = self.address_to_hashX(address)
+        return await self.confirmed_and_unconfirmed_history(hashX)
+
+    async def address_get_mempool(self, address):
+        '''Return the mempool transactions touching an address.'''
+        hashX = self.address_to_hashX(address)
+        return await self.unconfirmed_history(hashX)
+
+    async def address_get_proof(self, address):
+        '''Return the UTXO proof of an address.'''
+        hashX = self.address_to_hashX(address)
+        raise RPCError('address.get_proof is not yet implemented')
+
+    async def address_listunspent(self, address):
+        '''Return the list of UTXOs of an address.'''
+        hashX = self.address_to_hashX(address)
+        return [{'tx_hash': hash_to_str(utxo.tx_hash), 'tx_pos': utxo.tx_pos,
+                 'height': utxo.height, 'value': utxo.value}
+                for utxo in sorted(await self.get_utxos(hashX))]
+
+    async def block_get_chunk(self, index):
+        '''Return a chunk of block headers.
+
+        index: the chunk index'''
+        index = self.non_negative_integer(index)
+        return self.get_chunk(index)
+
+    async def block_get_header(self, height):
+        '''The deserialized header at a given height.
+
+        height: the header's height'''
+        height = self.non_negative_integer(height)
+        return self.electrum_header(height)
+
+    async def estimatefee(self, number):
+        '''The estimated transaction fee per kilobyte to be paid for a
+        transaction to be included within a certain number of blocks.
+
+        number: the number of blocks
+        '''
+        number = self.non_negative_integer(number)
+        return await self.daemon_request('estimatefee', [number])
+
+    async def relayfee(self):
+        '''The minimum fee a low-priority tx must pay in order to be accepted
+        to the daemon's memory pool.'''
+        return await self.daemon_request('relayfee')
+
+    async def transaction_get(self, tx_hash, height=None):
+        '''Return the serialized raw transaction given its hash
+
+        tx_hash: the transaction hash as a hexadecimal string
+        height: ignored, do not use
+        '''
+        # For some reason Electrum passes a height.  We don't require
+        # it in anticipation it might be dropped in the future.
+        tx_hash = self.to_tx_hash(tx_hash)
+        return await self.daemon_request('getrawtransaction', tx_hash)
+
+    async def transaction_get_merkle(self, tx_hash, height):
+        '''Return the merkle branch to a confirmed transaction given its hash
+        and height.
+
+        tx_hash: the transaction hash as a hexadecimal string
+        height: the height of the block it is in
+        '''
+        tx_hash = self.to_tx_hash(tx_hash)
+        height = self.non_negative_integer(height)
+        return await self.tx_merkle(tx_hash, height)
+
+    async def utxo_get_address(self, tx_hash, index):
+        '''Returns the address sent to in a UTXO, or null if the UTXO
+        cannot be found.
+
+        tx_hash: the transaction hash of the UTXO
+        index: the index of the UTXO in the transaction'''
+        # Used only for electrum client command-line requests.  We no
+        # longer index by address, so need to request the raw
+        # transaction.
+    async def utxo_get_address(self, tx_hash, index):
+        '''Returns the address sent to in a UTXO, or null if the UTXO
+        cannot be found.
+
+        tx_hash: the transaction hash of the UTXO
+        index: the index of the UTXO in the transaction'''
+        # Used only for electrum client command-line requests.  We no
+        # longer index by address, so need to request the raw
+        # transaction.  So it works for any TXO, not just UTXOs.
+        tx_hash = self.to_tx_hash(tx_hash)
+        index = self.non_negative_integer(index)
+        raw_tx = await self.daemon_request('getrawtransaction', tx_hash)
+        if not raw_tx:
+            return None
+        raw_tx = bytes.fromhex(raw_tx)
+        deserializer = self.coin.deserializer()
+        tx, tx_hash = deserializer(raw_tx).read_tx()
+        if index >= len(tx.outputs):
+            return None
+        return self.coin.address_from_script(tx.outputs[index].pk_script)
+
+    # Client RPC "server" command handlers
+
+    async def banner(self):
+        '''Return the server banner text.'''
+        banner = 'Welcome to Electrum!'
+        if self.env.banner_file:
+            try:
+                with codecs.open(self.env.banner_file, 'r', 'utf-8') as f:
+                    banner = f.read()
+            except Exception as e:
+                self.log_error('reading banner file {}: {}'
+                               .format(self.env.banner_file, e))
+            else:
+                network_info = await self.daemon_request('getnetworkinfo')
+                # Decode the daemon's packed version number,
+                # e.g. 140200 -> '0.14.2'
+                version = network_info['version']
+                major, minor = divmod(version, 1000000)
+                minor, revision = divmod(minor, 10000)
+                revision //= 100
+                version = '{:d}.{:d}.{:d}'.format(major, minor, revision)
+                for pair in [
+                    ('$VERSION', VERSION),
+                    ('$DAEMON_VERSION', version),
+                    ('$DAEMON_SUBVERSION', network_info['subversion']),
+                    ('$DONATION_ADDRESS', self.env.donation_address),
+                ]:
+                    banner = banner.replace(*pair)
+
+        return banner
+
+    async def donation_address(self):
+        '''Return the donation address as a string, empty if there is
+        none.'''
+        return self.env.donation_address
+
+    async def peers_subscribe(self):
+        '''Returns the server peers as a list of (ip, host, ports) tuples.
+
+        Despite the name this is not currently treated as a
+        subscription.'''
+        return list(self.irc.peers.values())
+
+    async def version(self, client_name=None, protocol_version=None):
+        '''Returns the server version as a string.
+
+        client_name: a string identifying the client
+        protocol_version: the protocol version spoken by the client
+        '''
+        if client_name:
+            self.client = str(client_name)[:15]
+        if protocol_version is not None:
+            self.protocol_version = protocol_version
+        return VERSION
diff --git a/server/session.py b/server/session.py
index 2f5d30a..d646c4f 100644
--- a/server/session.py
+++ b/server/session.py
@@ -9,13 +9,10 @@
 
 import asyncio
-import codecs
 import traceback
 
-from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash
-from lib.jsonrpc import JSONRPC
+from lib.jsonrpc import JSONRPC, RPCError
 from server.daemon import DaemonError
-from server.version import VERSION
 
 
 class Session(JSONRPC):
@@ -26,13 +23,12 @@ class Session(JSONRPC):
     long-running requests should yield.
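+    Yielding allows other sessions to make progress while one session's
+    slow request, such as a large history lookup, is being served.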
''' - def __init__(self, manager, bp, env, kind): + def __init__(self, controller, bp, env, kind): super().__init__() - self.manager = manager + self.controller = controller self.bp = bp self.env = env self.daemon = bp.daemon - self.coin = bp.coin self.kind = kind self.client = 'unknown' self.anon_logs = env.anon_logs @@ -53,7 +49,7 @@ class Session(JSONRPC): status += 'C' if self.log_me: status += 'L' - status += str(self.manager.session_priority(self)) + status += str(self.controller.session_priority(self)) return status def requests_remaining(self): @@ -63,7 +59,7 @@ class Session(JSONRPC): '''Add a request to the session's list.''' self.requests.append(request) if len(self.requests) == 1: - self.manager.enqueue_session(self) + self.controller.enqueue_session(self) async def serve_requests(self): '''Serve requests in batches.''' @@ -90,68 +86,27 @@ class Session(JSONRPC): self.requests = [req for req in self.requests if req.remaining and not req in errs] if self.requests: - self.manager.enqueue_session(self) + self.controller.enqueue_session(self) def connection_made(self, transport): '''Handle an incoming client connection.''' super().connection_made(transport) - self.manager.add_session(self) + self.controller.add_session(self) def connection_lost(self, exc): '''Handle client disconnection.''' super().connection_lost(exc) - if (self.pause or self.manager.is_deprioritized(self) + if (self.pause or self.controller.is_deprioritized(self) or self.send_size >= 1024*1024 or self.error_count): self.log_info('disconnected. Sent {:,d} bytes in {:,d} messages ' '{:,d} errors' .format(self.send_size, self.send_count, self.error_count)) - self.manager.remove_session(self) - - async def handle_request(self, method, params): - '''Handle a request.''' - handler = self.handlers.get(method) - if not handler: - self.raise_unknown_method(method) - - return await handler(params) + self.controller.remove_session(self) def sub_count(self): return 0 - async def daemon_request(self, method, *args): - '''Catch a DaemonError and convert it to an RPCError.''' - try: - return await getattr(self.daemon, method)(*args) - except DaemonError as e: - raise self.RPCError('daemon error: {}'.format(e)) - - def param_to_tx_hash(self, param): - '''Raise an RPCError if the parameter is not a valid transaction - hash.''' - if isinstance(param, str) and len(param) == 64: - try: - bytes.fromhex(param) - return param - except ValueError: - pass - raise self.RPCError('parameter should be a transaction hash: {}' - .format(param)) - - def param_to_hashX(self, param): - if isinstance(param, str): - try: - return self.coin.address_to_hashX(param) - except: - pass - raise self.RPCError('param {} is not a valid address'.format(param)) - - def params_to_hashX(self, params): - if len(params) == 1: - return self.param_to_hashX(params[0]) - raise self.RPCError('params {} should contain a single address' - .format(params)) - class ElectrumX(Session): '''A TCP server that handles incoming Electrum connections.''' @@ -163,20 +118,12 @@ class ElectrumX(Session): self.notified_height = None self.max_subs = self.env.max_session_subs self.hashX_subs = {} - rpcs = [ - ('blockchain', - 'address.get_balance address.get_history address.get_mempool ' - 'address.get_proof address.listunspent address.subscribe ' - 'block.get_header block.get_chunk estimatefee headers.subscribe ' - 'numblocks.subscribe relayfee transaction.broadcast ' - 'transaction.get transaction.get_merkle utxo.get_address'), - ('server', - 'banner donation_address peers.subscribe 
version'), - ] - self.handlers = {'.'.join([prefix, suffix]): - getattr(self, suffix.replace('.', '_')) - for prefix, suffixes in rpcs - for suffix in suffixes.split()} + self.electrumx_handlers = { + 'blockchain.address.subscribe': self.address_subscribe, + 'blockchain.headers.subscribe': self.headers_subscribe, + 'blockchain.numblocks.subscribe': self.numblocks_subscribe, + 'blockchain.transaction.broadcast': self.transaction_broadcast, + } def sub_count(self): return len(self.hashX_subs) @@ -191,7 +138,7 @@ class ElectrumX(Session): if self.subscribe_headers: payload = self.notification_payload( 'blockchain.headers.subscribe', - (self.manager.electrum_header(height), ), + (self.controller.electrum_header(height), ), ) self.encode_and_send_payload(payload) @@ -205,7 +152,7 @@ class ElectrumX(Session): matches = touched.intersection(self.hashX_subs) for hashX in matches: address = self.hashX_subs[hashX] - status = await self.address_status(hashX) + status = await self.controller.address_status(hashX) payload = self.notification_payload( 'blockchain.address.subscribe', (address, status)) self.encode_and_send_payload(payload) @@ -219,162 +166,44 @@ class ElectrumX(Session): def current_electrum_header(self): '''Used as response to a headers subscription request.''' - return self.manager.electrum_header(self.height()) + return self.controller.electrum_header(self.height()) - async def address_status(self, hashX): - '''Returns status as 32 bytes.''' - # Note history is ordered and mempool unordered in electrum-server - # For mempool, height is -1 if unconfirmed txins, otherwise 0 - history = await self.manager.async_get_history(hashX) - mempool = await self.manager.mempool_transactions(hashX) - - status = ''.join('{}:{:d}:'.format(hash_to_str(tx_hash), height) - for tx_hash, height in history) - status += ''.join('{}:{:d}:'.format(hex_hash, -unconfirmed) - for hex_hash, tx_fee, unconfirmed in mempool) - if status: - return sha256(status.encode()).hex() - return None - - async def tx_merkle(self, tx_hash, height): - '''tx_hash is a hex string.''' - hex_hashes = await self.daemon_request('block_hex_hashes', height, 1) - block = await self.daemon_request('deserialised_block', hex_hashes[0]) - tx_hashes = block['tx'] - try: - pos = tx_hashes.index(tx_hash) - except ValueError: - raise self.RPCError('tx hash {} not in block {} at height {:,d}' - .format(tx_hash, hex_hashes[0], height)) - - idx = pos - hashes = [hex_str_to_hash(txh) for txh in tx_hashes] - merkle_branch = [] - while len(hashes) > 1: - if len(hashes) & 1: - hashes.append(hashes[-1]) - idx = idx - 1 if (idx & 1) else idx + 1 - merkle_branch.append(hash_to_str(hashes[idx])) - idx //= 2 - hashes = [double_sha256(hashes[n] + hashes[n + 1]) - for n in range(0, len(hashes), 2)] - - return {"block_height": height, "merkle": merkle_branch, "pos": pos} - - async def unconfirmed_history(self, hashX): - # Note unconfirmed history is unordered in electrum-server - # Height is -1 if unconfirmed txins, otherwise 0 - mempool = await self.manager.mempool_transactions(hashX) - return [{'tx_hash': tx_hash, 'height': -unconfirmed, 'fee': fee} - for tx_hash, fee, unconfirmed in mempool] - - async def get_history(self, hashX): - # Note history is ordered but unconfirmed is unordered in e-s - history = await self.manager.async_get_history(hashX) - conf = [{'tx_hash': hash_to_str(tx_hash), 'height': height} - for tx_hash, height in history] - - return conf + await self.unconfirmed_history(hashX) - - def get_chunk(self, index): - '''Return header chunk as 
hex. Index is a non-negative integer.''' - chunk_size = self.coin.CHUNK_SIZE - next_height = self.height() + 1 - start_height = min(index * chunk_size, next_height) - count = min(next_height - start_height, chunk_size) - return self.bp.read_headers(start_height, count).hex() - - async def get_utxos(self, hashX): - '''Get UTXOs asynchronously to reduce latency.''' - def job(): - return list(self.bp.get_utxos(hashX, limit=None)) - loop = asyncio.get_event_loop() - return await loop.run_in_executor(None, job) - - async def get_balance(self, hashX): - utxos = await self.get_utxos(hashX) - confirmed = sum(utxo.value for utxo in utxos) - unconfirmed = self.manager.mempool_value(hashX) - return {'confirmed': confirmed, 'unconfirmed': unconfirmed} - - async def list_unspent(self, hashX): - return [{'tx_hash': hash_to_str(utxo.tx_hash), 'tx_pos': utxo.tx_pos, - 'height': utxo.height, 'value': utxo.value} - for utxo in sorted(await self.get_utxos(hashX))] - - # --- blockchain commands - - async def address_get_balance(self, params): - hashX = self.params_to_hashX(params) - return await self.get_balance(hashX) - - async def address_get_history(self, params): - hashX = self.params_to_hashX(params) - return await self.get_history(hashX) - - async def address_get_mempool(self, params): - hashX = self.params_to_hashX(params) - return await self.unconfirmed_history(hashX) - - async def address_get_proof(self, params): - hashX = self.params_to_hashX(params) - raise self.RPCError('get_proof is not yet implemented') - - async def address_listunspent(self, params): - hashX = self.params_to_hashX(params) - return await self.list_unspent(hashX) - - async def address_subscribe(self, params): - hashX = self.params_to_hashX(params) - if len(self.hashX_subs) >= self.max_subs: - raise self.RPCError('your address subscription limit {:,d} reached' - .format(self.max_subs)) - result = await self.address_status(hashX) - # add_subscription can raise so call it before adding - self.manager.new_subscription() - self.hashX_subs[hashX] = params[0] - return result - - async def block_get_chunk(self, params): - index = self.params_to_non_negative_integer(params) - return self.get_chunk(index) - - async def block_get_header(self, params): - height = self.params_to_non_negative_integer(params) - return self.manager.electrum_header(height) - - async def estimatefee(self, params): - return await self.daemon_request('estimatefee', params) - - async def headers_subscribe(self, params): - self.require_empty_params(params) + async def headers_subscribe(self): + '''Subscribe to get headers of new blocks.''' self.subscribe_headers = True return self.current_electrum_header() - async def numblocks_subscribe(self, params): - self.require_empty_params(params) + async def numblocks_subscribe(self): + '''Subscribe to get height of new blocks.''' self.subscribe_height = True return self.height() - async def relayfee(self, params): - '''The minimum fee a low-priority tx must pay in order to be accepted - to the daemon's memory pool.''' - self.require_empty_params(params) - return await self.daemon_request('relayfee') + async def address_subscribe(self, address): + '''Subscribe to an address. - async def transaction_broadcast(self, params): - '''Pass through the parameters to the daemon. + address: the address to subscribe to''' + # First check our limit. 
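+        # Two caps apply: the per-session cap checked here, and a
+        # server-wide cap the controller enforces when we call its
+        # new_subscription() below.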
+ if len(self.hashX_subs) >= self.max_subs: + raise RPCError('your address subscription limit {:,d} reached' + .format(self.max_subs)) + # Now let the controller check its limit + hashX, status = await self.controller.new_subscription(address) + self.hashX_subs[hashX] = address + return status - An ugly API: current Electrum clients only pass the raw - transaction in hex and expect error messages to be returned in - the result field. And the server shouldn't be doing the client's - user interface job here. - ''' + async def transaction_broadcast(self, raw_tx): + '''Broadcast a raw transaction to the network. + + raw_tx: the raw transaction as a hexadecimal string''' + # An ugly API: current Electrum clients only pass the raw + # transaction in hex and expect error messages to be returned in + # the result field. And the server shouldn't be doing the client's + # user interface job here. try: - tx_hash = await self.daemon.sendrawtransaction(params) + tx_hash = await self.daemon.sendrawtransaction([raw_tx]) self.txs_sent += 1 self.log_info('sent tx: {}'.format(tx_hash)) - self.manager.sent_tx(tx_hash) + self.controller.sent_tx(tx_hash) return tx_hash except DaemonError as e: error = e.args[0] @@ -390,105 +219,15 @@ class ElectrumX(Session): return ( 'The transaction was rejected by network rules. ({})\n[{}]' - .format(message, params[0]) + .format(message, raw_tx) ) - async def transaction_get(self, params): - '''Return the serialized raw transaction.''' - # For some reason Electrum passes a height. Don't require it - # in anticipation it might be dropped in the future. - if 1 <= len(params) <= 2: - tx_hash = self.param_to_tx_hash(params[0]) - return await self.daemon_request('getrawtransaction', tx_hash) - - raise self.RPCError('params wrong length: {}'.format(params)) - - async def transaction_get_merkle(self, params): - if len(params) == 2: - tx_hash = self.param_to_tx_hash(params[0]) - height = self.param_to_non_negative_integer(params[1]) - return await self.tx_merkle(tx_hash, height) - - raise self.RPCError('params should contain a transaction hash ' - 'and height') - - async def utxo_get_address(self, params): - '''Returns the address for a TXO. - - Used only for electrum client command-line requests. We no - longer index by address, so need to request the raw - transaction. So it works for any TXO not just UTXOs. - ''' - if len(params) == 2: - tx_hash = self.param_to_tx_hash(params[0]) - index = self.param_to_non_negative_integer(params[1]) - raw_tx = await self.daemon_request('getrawtransaction', tx_hash) - if not raw_tx: - return None - raw_tx = bytes.fromhex(raw_tx) - deserializer = self.coin.deserializer() - tx, tx_hash = deserializer(raw_tx).read_tx() - if index >= len(tx.outputs): - return None - return self.coin.address_from_script(tx.outputs[index].pk_script) - - raise self.RPCError('params should contain a transaction hash ' - 'and index') - - # --- server commands - - async def banner(self, params): - '''Return the server banner.''' - self.require_empty_params(params) - banner = 'Welcome to Electrum!' 
- if self.env.banner_file: - try: - with codecs.open(self.env.banner_file, 'r', 'utf-8') as f: - banner = f.read() - except Exception as e: - self.log_error('reading banner file {}: {}' - .format(self.env.banner_file, e)) - else: - network_info = await self.daemon.getnetworkinfo() - version = network_info['version'] - major, minor = divmod(version, 1000000) - minor, revision = divmod(minor, 10000) - revision //= 100 - version = '{:d}.{:d}.{:d}'.format(major, minor, revision) - for pair in [ - ('$VERSION', VERSION), - ('$DAEMON_VERSION', version), - ('$DAEMON_SUBVERSION', network_info['subversion']), - ('$DONATION_ADDRESS', self.env.donation_address), - ]: - banner = banner.replace(*pair) - - return banner - - async def donation_address(self, params): - '''Return the donation address as a string. - - If none is specified return the empty string. - ''' - self.require_empty_params(params) - return self.env.donation_address - - async def peers_subscribe(self, params): - '''Returns the peer (ip, host, ports) tuples. - - Despite the name electrum-server does not treat this as a - subscription. - ''' - self.require_empty_params(params) - return list(self.manager.irc_peers().values()) - - async def version(self, params): - '''Return the server version as a string.''' - if params: - self.client = str(params[0])[:15] - if len(params) > 1: - self.protocol_version = params[1] - return VERSION + def request_handler(self, method): + '''Return the async handler for the given request method.''' + handler = self.electrumx_handlers.get(method) + if not handler: + handler = self.controller.electrumx_handlers.get(method) + return handler class LocalRPC(Session): @@ -496,8 +235,9 @@ class LocalRPC(Session): def __init__(self, *args): super().__init__(*args) - cmds = 'disconnect getinfo groups log peers reorg sessions'.split() - self.handlers = {cmd: getattr(self.manager, 'rpc_{}'.format(cmd)) - for cmd in cmds} self.client = 'RPC' self.max_send = 5000000 + + def request_handler(self, method): + '''Return the async handler for the given request method.''' + return self.controller.rpc_handlers.get(method)
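A closing note on the dispatch change above: the `request_handler`
methods route a method name to a coroutine, with an ElectrumX session
consulting its own handler table first and falling back to the
controller's shared table.  A minimal, self-contained sketch of that
pattern (all names invented for illustration; this is not the actual
ElectrumX code)::

    import asyncio

    class Controller(object):
        '''Holds handlers shared by every session.'''
        def __init__(self):
            self.electrumx_handlers = {'server.version': self.version}

        async def version(self):
            return 'ExampleServer 0.0'

    class Session(object):
        '''Per-connection state; its handlers shadow the controller's.'''
        def __init__(self, controller):
            self.controller = controller
            self.electrumx_handlers = {}   # e.g. subscription methods

        def request_handler(self, method):
            # Per-session handlers win; fall back to the shared table.
            # None signals an unknown method to the caller.
            handler = self.electrumx_handlers.get(method)
            if not handler:
                handler = self.controller.electrumx_handlers.get(method)
            return handler

    session = Session(Controller())
    handler = session.request_handler('server.version')
    loop = asyncio.get_event_loop()
    print(loop.run_until_complete(handler()))  # ExampleServer 0.0

Keeping subscription handlers on the session while stateless handlers
live on the controller means per-connection state stays with the
connection, and the shared handlers are created only once.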