connector
This commit is contained in:
parent
c29877c1ae
commit
1f7334718c
@ -1,6 +1,7 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
import os
|
import os
|
||||||
from multiprocessing import Process
|
from multiprocessing import Process
|
||||||
|
from pybtc.functions.tools import int_to_bytes, bytes_to_int
|
||||||
from concurrent.futures import ThreadPoolExecutor
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
from setproctitle import setproctitle
|
from setproctitle import setproctitle
|
||||||
import logging
|
import logging
|
||||||
@ -9,18 +10,68 @@ import sys
|
|||||||
import aiojsonrpc
|
import aiojsonrpc
|
||||||
import traceback
|
import traceback
|
||||||
import pickle
|
import pickle
|
||||||
|
|
||||||
class BlockLoader:
|
class BlockLoader:
|
||||||
def __init__(self, parent, workers=4):
|
def __init__(self, parent, workers=4):
|
||||||
self.worker = dict()
|
self.worker = dict()
|
||||||
self.worker_busy = dict()
|
self.worker_busy = dict()
|
||||||
|
self.parent = parent
|
||||||
|
self.loading_task = None
|
||||||
self.log = parent.log
|
self.log = parent.log
|
||||||
self.loop = parent.loop
|
self.loop = parent.loop
|
||||||
self.rpc_url = parent.rpc_url
|
self.rpc_url = parent.rpc_url
|
||||||
self.rpc_timeout = parent.rpc_timeout
|
self.rpc_timeout = parent.rpc_timeout
|
||||||
self.rpc_batch_limit = parent.rpc_batch_limit
|
self.rpc_batch_limit = parent.rpc_batch_limit
|
||||||
self.loop.set_default_executor(ThreadPoolExecutor(workers * 2))
|
self.loop.set_default_executor(ThreadPoolExecutor(workers * 2))
|
||||||
|
self.watchdog_task = self.loop.create_task(self.watchdog())
|
||||||
[self.loop.create_task(self.start_worker(i)) for i in range(workers)]
|
[self.loop.create_task(self.start_worker(i)) for i in range(workers)]
|
||||||
|
|
||||||
|
async def watchdog(self):
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
if self.loading_task is None or self.loading_task.done():
|
||||||
|
if self.parent.deep_synchronization:
|
||||||
|
self.loading_task = self.loop.create_task(self.loading())
|
||||||
|
else:
|
||||||
|
pass
|
||||||
|
# clear tail
|
||||||
|
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
self.log.info("connector watchdog terminated")
|
||||||
|
break
|
||||||
|
except Exception as err:
|
||||||
|
self.log.error(str(traceback.format_exc()))
|
||||||
|
self.log.error("watchdog error %s " % err)
|
||||||
|
await asyncio.sleep(60)
|
||||||
|
|
||||||
|
|
||||||
|
async def loading(self):
|
||||||
|
target_height = self.parent.node_last_block - self.parent.self.deep_sync_limit
|
||||||
|
height = self.parent.last_block_height + 1
|
||||||
|
while height < target_height:
|
||||||
|
new_requests = 0
|
||||||
|
if self.parent.block_preload._store_size < self.parent.block_preload_cache_limit:
|
||||||
|
try:
|
||||||
|
if height <= self.parent.last_block_height:
|
||||||
|
height = self.parent.last_block_height + 1
|
||||||
|
for i in self.worker_busy:
|
||||||
|
if not self.worker_busy[i]:
|
||||||
|
self.worker_busy[i] = True
|
||||||
|
self.pipe_sent_msg(self.worker[i].writer, b'get', int_to_bytes(height))
|
||||||
|
height += self.rpc_batch_limit
|
||||||
|
new_requests += 1
|
||||||
|
if not new_requests:
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
self.log.info("Loading task terminated")
|
||||||
|
break
|
||||||
|
except Exception as err:
|
||||||
|
self.log.error("Loading task error %s " % err)
|
||||||
|
else:
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
async def start_worker(self,index):
|
async def start_worker(self,index):
|
||||||
self.log.warning('Start block loader worker %s' % index)
|
self.log.warning('Start block loader worker %s' % index)
|
||||||
# prepare pipes for communications
|
# prepare pipes for communications
|
||||||
@ -48,21 +99,6 @@ class BlockLoader:
|
|||||||
del self.worker[index]
|
del self.worker[index]
|
||||||
self.log.warning('Block loader worker %s is stopped' % index)
|
self.log.warning('Block loader worker %s is stopped' % index)
|
||||||
|
|
||||||
async def load_blocks(self, batch):
|
|
||||||
while True:
|
|
||||||
for i in self.worker_busy:
|
|
||||||
if not self.worker_busy[i]:
|
|
||||||
self.worker_busy[i] = True
|
|
||||||
try:
|
|
||||||
self.log.warning("<<<<<")
|
|
||||||
self.pipe_sent_msg(self.worker[i].writer, b'get', pickle.dumps(batch))
|
|
||||||
self.log.warning("ok<")
|
|
||||||
except:
|
|
||||||
self.log.warning(str(traceback.format_exc()))
|
|
||||||
finally:
|
|
||||||
self.worker_busy[i] = False
|
|
||||||
return None
|
|
||||||
await asyncio.sleep(1)
|
|
||||||
|
|
||||||
|
|
||||||
async def get_pipe_reader(self, fd_reader):
|
async def get_pipe_reader(self, fd_reader):
|
||||||
@ -147,20 +183,16 @@ class Worker:
|
|||||||
self.loop.run_forever()
|
self.loop.run_forever()
|
||||||
|
|
||||||
async def message_loop(self):
|
async def message_loop(self):
|
||||||
self.log.critical("xxx")
|
|
||||||
try:
|
try:
|
||||||
self.rpc = aiojsonrpc.rpc(self.rpc_url, self.loop, timeout=self.rpc_timeout)
|
self.rpc = aiojsonrpc.rpc(self.rpc_url, self.loop, timeout=self.rpc_timeout)
|
||||||
self.reader = await self.get_pipe_reader(self.in_reader)
|
self.reader = await self.get_pipe_reader(self.in_reader)
|
||||||
self.log.critical("reader")
|
|
||||||
while True:
|
while True:
|
||||||
self.log.critical("get pos")
|
|
||||||
msg_type, msg = await self.pipe_get_msg(self.reader)
|
msg_type, msg = await self.pipe_get_msg(self.reader)
|
||||||
self.log.critical(str(len(msg)))
|
|
||||||
if msg_type == b'pipe_read_error':
|
if msg_type == b'pipe_read_error':
|
||||||
return
|
return
|
||||||
|
|
||||||
if msg_type == b'get':
|
if msg_type == b'get':
|
||||||
self.log.critical(str(len(msg)))
|
self.log.critical(str(bytes_to_int(msg)))
|
||||||
continue
|
continue
|
||||||
except:
|
except:
|
||||||
self.log.critical("exc")
|
self.log.critical("exc")
|
||||||
@ -208,4 +240,58 @@ class Worker:
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
batch = list()
|
||||||
|
h_list = list()
|
||||||
|
while True:
|
||||||
|
batch.append(["getblockhash", height])
|
||||||
|
h_list.append(height)
|
||||||
|
if len(batch) >= self.rpc_batch_limit or height >= max_height:
|
||||||
|
height += 1
|
||||||
|
break
|
||||||
|
height += 1
|
||||||
|
result = await self.rpc.batch(batch)
|
||||||
|
h = list()
|
||||||
|
batch = list()
|
||||||
|
for lh, r in zip(h_list, result):
|
||||||
|
try:
|
||||||
|
self.block_hashes.set(lh, r["result"])
|
||||||
|
batch.append(["getblock", r["result"], 0])
|
||||||
|
h.append(lh)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
self.log.critical(">>>")
|
||||||
|
blocks = await self.block_loader.load_blocks(batch)
|
||||||
|
|
||||||
|
for x,y in zip(h,blocks):
|
||||||
|
try:
|
||||||
|
self.block_preload.set(x, y)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
self.log.info("connector preload_block_hashes failed")
|
||||||
|
break
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
if processed_height < self.last_block_height:
|
||||||
|
for i in range(processed_height, self.last_block_height ):
|
||||||
|
try:
|
||||||
|
self.block_preload.remove(i)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
processed_height = self.last_block_height
|
||||||
|
if self.block_preload._store and next(iter(self.block_preload._store)) < processed_height + 1:
|
||||||
|
for i in range(next(iter(self.block_preload._store)), self.last_block_height+1):
|
||||||
|
try:
|
||||||
|
self.block_preload.remove(i)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
if self.block_preload._store_size < self.block_preload_cache_limit * 0.9:
|
||||||
|
continue
|
||||||
|
|
||||||
|
await asyncio.sleep(10)
|
||||||
|
# remove unused items
|
||||||
|
|
||||||
|
"""
|
||||||
|
|||||||
@ -12,7 +12,7 @@ import zmq
|
|||||||
import zmq.asyncio
|
import zmq.asyncio
|
||||||
import asyncio
|
import asyncio
|
||||||
import time
|
import time
|
||||||
|
import pickle
|
||||||
|
|
||||||
class Connector:
|
class Connector:
|
||||||
def __init__(self, node_rpc_url, node_zerromq_url, logger,
|
def __init__(self, node_rpc_url, node_zerromq_url, logger,
|
||||||
@ -282,7 +282,7 @@ class Connector:
|
|||||||
if self.node_last_block <= self.last_block_height + self.backlog:
|
if self.node_last_block <= self.last_block_height + self.backlog:
|
||||||
d = await self.rpc.getblockcount()
|
d = await self.rpc.getblockcount()
|
||||||
if d == self.node_last_block:
|
if d == self.node_last_block:
|
||||||
self.log.info("blockchain is synchronized with backlog %s" % self.backlog)
|
self.log.info("Blockchain is synchronized with backlog %s" % self.backlog)
|
||||||
return
|
return
|
||||||
else:
|
else:
|
||||||
self.node_last_block = d
|
self.node_last_block = d
|
||||||
@ -297,21 +297,16 @@ class Connector:
|
|||||||
self.log.warning("Normal synchronization mode")
|
self.log.warning("Normal synchronization mode")
|
||||||
# clear preload caches
|
# clear preload caches
|
||||||
self.deep_synchronization = False
|
self.deep_synchronization = False
|
||||||
|
block = None
|
||||||
if self.deep_synchronization:
|
if self.deep_synchronization:
|
||||||
block = self.block_preload.pop(self.last_block_height + 1)
|
raw_block = self.block_preload.pop(self.last_block_height + 1)
|
||||||
if not block:
|
if raw_block:
|
||||||
h = self.block_hashes.pop(self.last_block_height + 1)
|
block = pickle.loads(raw_block)
|
||||||
if h is None:
|
|
||||||
h = await self.rpc.getblockhash(self.last_block_height + 1)
|
if not block:
|
||||||
if not self.block_hashes_preload_mutex:
|
|
||||||
self.loop.create_task(self.preload_blocks())
|
|
||||||
block = await self._get_block_by_hash(h)
|
|
||||||
else:
|
|
||||||
h = await self.rpc.getblockhash(self.last_block_height + 1)
|
h = await self.rpc.getblockhash(self.last_block_height + 1)
|
||||||
block = await self._get_block_by_hash(h)
|
block = await self._get_block_by_hash(h)
|
||||||
|
|
||||||
|
|
||||||
self.loop.create_task(self._new_block(block))
|
self.loop.create_task(self._new_block(block))
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
self.log.error("get next block failed %s" % str(err))
|
self.log.error("get next block failed %s" % str(err))
|
||||||
@ -656,75 +651,6 @@ class Connector:
|
|||||||
return stxo
|
return stxo
|
||||||
|
|
||||||
|
|
||||||
async def preload_blocks(self):
|
|
||||||
if self.block_hashes_preload_mutex:
|
|
||||||
return
|
|
||||||
try:
|
|
||||||
self.block_hashes_preload_mutex = True
|
|
||||||
max_height = self.node_last_block - self.deep_synchronization
|
|
||||||
height = self.last_block_height + 1
|
|
||||||
processed_height = self.last_block_height
|
|
||||||
|
|
||||||
while height < max_height:
|
|
||||||
if self.block_preload._store_size < self.block_preload_cache_limit:
|
|
||||||
try:
|
|
||||||
if height < self.last_block_height:
|
|
||||||
height = self.last_block_height + 1
|
|
||||||
batch = list()
|
|
||||||
h_list = list()
|
|
||||||
while True:
|
|
||||||
batch.append(["getblockhash", height])
|
|
||||||
h_list.append(height)
|
|
||||||
if len(batch) >= self.rpc_batch_limit or height >= max_height:
|
|
||||||
height += 1
|
|
||||||
break
|
|
||||||
height += 1
|
|
||||||
result = await self.rpc.batch(batch)
|
|
||||||
h = list()
|
|
||||||
batch = list()
|
|
||||||
for lh, r in zip(h_list, result):
|
|
||||||
try:
|
|
||||||
self.block_hashes.set(lh, r["result"])
|
|
||||||
batch.append(["getblock", r["result"], 0])
|
|
||||||
h.append(lh)
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
self.log.critical(">>>")
|
|
||||||
blocks = await self.block_loader.load_blocks(batch)
|
|
||||||
|
|
||||||
for x,y in zip(h,blocks):
|
|
||||||
try:
|
|
||||||
self.block_preload.set(x, y)
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
self.log.info("connector preload_block_hashes failed")
|
|
||||||
break
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
|
|
||||||
if processed_height < self.last_block_height:
|
|
||||||
for i in range(processed_height, self.last_block_height ):
|
|
||||||
try:
|
|
||||||
self.block_preload.remove(i)
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
processed_height = self.last_block_height
|
|
||||||
if self.block_preload._store and next(iter(self.block_preload._store)) < processed_height + 1:
|
|
||||||
for i in range(next(iter(self.block_preload._store)), self.last_block_height+1):
|
|
||||||
try:
|
|
||||||
self.block_preload.remove(i)
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
if self.block_preload._store_size < self.block_preload_cache_limit * 0.9:
|
|
||||||
continue
|
|
||||||
|
|
||||||
await asyncio.sleep(10)
|
|
||||||
# remove unused items
|
|
||||||
|
|
||||||
finally:
|
|
||||||
self.block_hashes_preload_mutex = False
|
|
||||||
|
|
||||||
|
|
||||||
async def stop(self):
|
async def stop(self):
|
||||||
self.active = False
|
self.active = False
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user