potentially make packet parsing async. still can't figure out what is making blockchain download block the main thread.

This commit is contained in:
Christopher Jeffrey 2014-11-17 12:53:55 -08:00
parent 3f7c206bb9
commit c781b0c297

View File

@ -2840,6 +2840,456 @@ NAN_METHOD(HookPackets) {
static void
async_hook_packets(uv_work_t *req) {
async_hook_packets_data* data = static_cast<async_hook_packets_data*>(req->data);
Local<Array> obj = NanNew<Array>();
poll_packets_list *cur = NULL;
poll_packets_list *next = NULL;
int i = 0;
poll_packets_mutex.lock();
for (cur = packets_queue_head; cur; cur = next) {
CNode *pfrom = cur->pfrom;
std::string strCommand(cur->strCommand);
CDataStream vRecv = *cur->vRecv;
int64_t nTimeReceived = cur->nTimeReceived;
async_hook_packets_data *pdata = new async_hook_packets_data();
pdata->strCommand = strCommand;
pdata->name = strCommand;
pdata->received = (int64_t)nTimeReceived;
pdata->peerId = pfrom->id;
//pdata->peerId = pfrom->GetId();
pdata->userAgent = pfrom->cleanSubVer;
if (strCommand == "version") {
// Each connection can only send one version message
if (pfrom->nVersion != 0) {
return;
}
bool fRelayTxes = false;
int nStartingHeight = 0;
int cleanSubVer = 0;
//std::string strSubVer(strdup(pfrom->strSubVer.c_str()));
std::string strSubVer = pfrom->strSubVer;
int nVersion = pfrom->nVersion;
uint64_t nServices = pfrom->nServices;
int64_t nTime;
CAddress addrMe;
CAddress addrFrom;
uint64_t nNonce = 1;
vRecv >> nVersion >> nServices >> nTime >> addrMe;
if (pfrom->nVersion < MIN_PEER_PROTO_VERSION) {
// disconnect from peers older than this proto version
return;
}
if (nVersion == 10300) {
nVersion = 300;
}
if (!vRecv.empty()) {
vRecv >> addrFrom >> nNonce;
}
if (!vRecv.empty()) {
vRecv >> LIMITED_STRING(strSubVer, 256);
//cleanSubVer = SanitizeString(strSubVer);
cleanSubVer = atoi(strSubVer.c_str());
}
if (!vRecv.empty()) {
vRecv >> nStartingHeight;
}
if (!vRecv.empty()) {
fRelayTxes = false;
} else {
fRelayTxes = true;
}
// Disconnect if we connected to ourself
if (nNonce == nLocalHostNonce && nNonce > 1) {
// XXX set pdata here
return;
}
pdata->receiveVersion = cleanSubVer;
pdata->version = nVersion;
pdata->height = nStartingHeight;
pdata->us = addrMe.ToString();
pdata->address = pfrom->addr.ToString();
pdata->relay = fRelayTxes;
} else if (pfrom->nVersion == 0) {
// Must have a version message before anything else
return;
} else if (strCommand == "verack") {
pdata->receiveVersion = min(pfrom->nVersion, PROTOCOL_VERSION);
} else if (strCommand == "addr") {
vector<CAddress> vAddr;
vRecv >> vAddr;
// Don't want addr from older versions unless seeding
if (pfrom->nVersion < CADDR_TIME_VERSION && addrman.size() > 1000) {
// XXX set pdata here
return;
}
// Bad address size
if (vAddr.size() > 1000) {
return;
}
// XXX create linked list
Local<Array> array = NanNew<Array>();
int i = 0;
// Get the new addresses
int64_t nNow = GetAdjustedTime();
BOOST_FOREACH(CAddress& addr, vAddr) {
boost::this_thread::interruption_point();
unsigned int nTime = addr.nTime;
if (nTime <= 100000000 || nTime > nNow + 10 * 60) {
nTime = nNow - 5 * 24 * 60 * 60;
}
bool fReachable = IsReachable(addr);
Local<Object> obj = NanNew<Object>();
char nServices[21] = {0};
int written = snprintf(nServices, sizeof(nServices), "%020lu", (uint64_t)addr.nServices);
assert(written == 20);
obj->services = (char *)nServices;
obj->time = (unsigned int)nTime;
obj->last = (int64_t)addr.nLastTry;
obj->ip = (std::string)addr.ToStringIP();
obj->port = (unsigned short)addr.GetPort();
obj->address = (std::string)addr.ToStringIPPort();
obj->reachable = (bool)fReachable;
array->Set(i, obj);
i++;
}
pdata->addresses = array;
} else if (strCommand == "inv") {
vector<CInv> vInv;
vRecv >> vInv;
// Bad size
if (vInv.size() > MAX_INV_SZ) {
NanReturnValue(Undefined());
}
LOCK(cs_main);
// XXX create linked list
Local<Array> array = NanNew<Array>();
int i = 0;
for (unsigned int nInv = 0; nInv < vInv.size(); nInv++) {
const CInv &inv = vInv[nInv];
boost::this_thread::interruption_point();
//bool fAlreadyHave = AlreadyHave(inv);
// Bad size
if (pfrom->nSendSize > (SendBufferSize() * 2)) {
return;
}
// XXX create linked list item
Local<Object> item = NanNew<Object>();
//item->have = fAlreadyHave;
item->hash = inv.hash.GetHex();
item->type = inv.type == MSG_BLOCK || inv.type == MSG_FILTERED_BLOCK
? "block"
: "tx";
if (inv.type == MSG_FILTERED_BLOCK) {
item->filtered = true;
} else if (inv.type == MSG_BLOCK) {
item->filtered = false;
}
array->Set(i, item);
i++;
}
pdata->items = array;
} else if (strCommand == "getdata") {
vector<CInv> vInv;
vRecv >> vInv;
// Bad size
if (vInv.size() > MAX_INV_SZ) {
return;
}
pdata->size = vInv.size();
if (vInv.size() > 0) {
pdata->first = vInv[0].ToString();
}
} else if (strCommand == "getblocks") {
CBlockLocator locator;
uint256 hashStop;
vRecv >> locator >> hashStop;
LOCK(cs_main);
// Find the last block the caller has in the main chain
CBlockIndex* pindex = FindForkInGlobalIndex(chainActive, locator);
// Send the rest of the chain
if (pindex) {
pindex = chainActive.Next(pindex);
}
pdata->fromHeight = pindex ? pindex->nHeight : -1;
pdata->toHash = hashStop == uint256(0) ? "end" : hashStop.GetHex();
pdata->limit = 500;
} else if (strCommand == "getheaders") {
CBlockLocator locator;
uint256 hashStop;
vRecv >> locator >> hashStop;
LOCK(cs_main);
CBlockIndex* pindex = NULL;
if (locator.IsNull()) {
// If locator is null, return the hashStop block
BlockMap::iterator mi = mapBlockIndex.find(hashStop);
if (mi == mapBlockIndex.end()) {
// XXX set pdata here
return;
}
pindex = (*mi).second;
} else {
// Find the last block the caller has in the main chain
pindex = FindForkInGlobalIndex(chainActive, locator);
if (pindex) {
pindex = chainActive.Next(pindex);
}
}
pdata->fromHeight = pindex ? pindex->nHeight : -1;
pdata->toHash = hashStop.GetHex();
} else if (strCommand == "tx") {
// XXX May be able to do prev_list asynchronously
// XXX Potentially check for "reject" in original code
pdata->tx = tx;
} else if (strCommand == "block" && !fImporting && !fReindex) {
// XXX May be able to do prev_list asynchronously
pdata->block = block;
} else if (strCommand == "getaddr") {
; // not much other information in getaddr as long as we know we got a getaddr
} else if (strCommand == "mempool") {
; // not much other information in getaddr as long as we know we got a getaddr
} else if (strCommand == "ping") {
if (pfrom->nVersion > BIP0031_VERSION) {
uint64_t nonce = 0;
vRecv >> nonce;
char sNonce[21] = {0};
int written = snprintf(sNonce, sizeof(sNonce), "%020lu", (uint64_t)nonce);
assert(written == 20);
pdata->nonce = sNonce;
} else {
char sNonce[21] = {0};
int written = snprintf(sNonce, sizeof(sNonce), "%020lu", (uint64_t)0);
assert(written == 20);
pdata->none = sNonce;
}
} else if (strCommand == "pong") {
int64_t pingUsecEnd = nTimeReceived;
uint64_t nonce = 0;
size_t nAvail = vRecv.in_avail();
bool bPingFinished = false;
std::string sProblem;
if (nAvail >= sizeof(nonce)) {
vRecv >> nonce;
// Only process pong message if there is an outstanding ping (old ping without nonce should never pong)
if (pfrom->nPingNonceSent != 0) {
if (nonce == pfrom->nPingNonceSent) {
// Matching pong received, this ping is no longer outstanding
bPingFinished = true;
int64_t pingUsecTime = pingUsecEnd - pfrom->nPingUsecStart;
if (pingUsecTime > 0) {
// Successful ping time measurement, replace previous
;
} else {
// This should never happen
sProblem = "Timing mishap";
}
} else {
// Nonce mismatches are normal when pings are overlapping
sProblem = "Nonce mismatch";
if (nonce == 0) {
// This is most likely a bug in another implementation somewhere, cancel this ping
bPingFinished = true;
sProblem = "Nonce zero";
}
}
} else {
sProblem = "Unsolicited pong without ping";
}
} else {
// This is most likely a bug in another implementation somewhere, cancel this ping
bPingFinished = true;
sProblem = "Short payload";
}
char sNonce[21] = {0};
int written = snprintf(sNonce, sizeof(sNonce), "%020lu", (uint64_t)nonce);
assert(written == 20);
char sPingNonceSent[21] = {0};
written = snprintf(sPingNonceSent, sizeof(sPingNonceSent), "%020lu", (uint64_t)pfrom->nPingNonceSent);
assert(written == 20);
pdata->expected = sPingNonceSent;
pdata->received = sNonce;
pdata->bytes = (unsigned int)nAvail;
if (!(sProblem.empty())) {
pdata->problem = sProblem;
}
if (bPingFinished) {
pdata->finished = true;
} else {
pdata->finished = false;
}
} else if (strCommand == "alert") {
CAlert alert;
vRecv >> alert;
uint256 alertHash = alert.GetHash();
pdata->hash = alertHash.GetHex();
if (pfrom->setKnown.count(alertHash) == 0) {
if (alert.ProcessAlert()) {
std::string vchMsg(alert.vchMsg.begin(), alert.vchMsg.end());
std::string vchSig(alert.vchSig.begin(), alert.vchSig.end());
pdata->message = vchMsg;
pdata->signature = vchSig;
pdata->misbehaving = false;
} else {
// Small DoS penalty so peers that send us lots of
// duplicate/expired/invalid-signature/whatever alerts
// eventually get banned.
// This isn't a Misbehaving(100) (immediate ban) because the
// peer might be an older or different implementation with
// a different signature key, etc.
pdata->misbehaving = true;
}
}
} else if (strCommand == "filterload") {
CBloomFilter filter;
vRecv >> filter;
if (!filter.IsWithinSizeConstraints()) {
// There is no excuse for sending a too-large filter
pdata->misbehaving = true;
} else {
LOCK(pfrom->cs_filter);
filter.UpdateEmptyFull();
//std::string svData(filter.vData.begin(), filter.vData.end());
//char *cvData = svData.c_str();
//int vDataHexLen = sizeof(char) * (strlen(cvData) * 2) + 1;
//char *vDataHex = (char *)malloc(vDataHexLen);
//int written = snprintf(vDataHex, vDataHexLen, "%x", cvData);
//uint64_t dataHex;
//sscanf(cvData, "%x", &dataHex);
//// assert(written == vDataHexLen);
//vDataHex[written] = '\0';
//pdata->data = vDataHex;
////free(vDataHex);
//pdata->full = filter.isFull;
//pdata->empty = filter.isEmpty;
//pdata->hashFuncs = filter.nHashFuncs;
//pdata->tweaks = filter.nTweak;
//pdata->flags = filter.nFlags;
pdata->misbehaving = false;
}
} else if (strCommand == "filteradd") {
vector<unsigned char> vData;
vRecv >> vData;
// Nodes must NEVER send a data item > 520 bytes (the max size for a script data object,
// and thus, the maximum size any matched object can have) in a filteradd message
if (vData.size() > MAX_SCRIPT_ELEMENT_SIZE) {
pdata->misbehaving = true;
} else {
LOCK(pfrom->cs_filter);
if (pfrom->pfilter) {
//std::string svData(vData.begin(), vData.end());
//char *cvData = svData.c_str();
//int vDataHexLen = sizeof(char) * (strlen(cvData) * 2) + 1;
//char *vDataHex = (char *)malloc(vDataHexLen);
//int written = snprintf(vDataHex, vDataHexLen, "%x", cvData);
//uint64_t dataHex;
//sscanf(cvData, "%x", &dataHex);
//// assert(written == vDataHexLen);
//vDataHex[written] = '\0';
//pdata->data = vDataHex;
////free(vDataHex);
pdata->misbehaving = false;
} else {
pdata->misbehaving = true;
}
}
} else if (strCommand == "filterclear") {
; // nothing much to grab from this packet
} else if (strCommand == "reject") {
; // nothing much to grab from this packet
} else {
pdata->unknown = true;
}
// Update the last seen time for this node's address
if (pfrom->fNetworkNode) {
if (strCommand == "version"
|| strCommand == "addr"
|| strCommand == "inv"
|| strCommand == "getdata"
|| strCommand == "ping") {
pdata->connected = true;
}
}
obj->Set(i, o);
i++;
if (cur == packets_queue_head) {
packets_queue_head = NULL;
}
if (cur == packets_queue_tail) {
packets_queue_tail = NULL;
}
next = cur->next;
// delete cur->pfrom; // cleaned up on disconnect
free(cur->strCommand);
delete cur->vRecv;
free(cur);
}
poll_packets_mutex.unlock();
// XXX set pdata here
return;
}
static void