comments. remove async packet parsing.

This commit is contained in:
Christopher Jeffrey 2014-11-17 13:06:22 -08:00
parent c781b0c297
commit f647de9a32
2 changed files with 1 additions and 588 deletions

View File

@ -180,19 +180,6 @@ Bitcoin.prototype.start = function(options, callback) {
setInterval(function() {
var packets = bitcoindjs.hookPackets();
// bitcoindjs.hookPackets(function(err, packets) {
// if (err || !packets) {
// if (self.debug) {
// self.error('Error polling packet queue.');
// }
// return;
// }
// if (!packets.length) {
// return;
// }
// ;
// });
if (!packets) {
if (self.debug) {
self.error('Error polling packet queue.');

View File

@ -228,6 +228,7 @@ extern std::string DecodeDumpString(const std::string &str);
using namespace node;
using namespace v8;
// Need this because account names can be an empty string.
#define EMPTY ("\\x01")
/**
@ -392,14 +393,6 @@ jsblock_to_cblock(const Local<Object> jsblock, CBlock& cblock);
static inline void
jstx_to_ctx(const Local<Object> jstx, CTransaction& ctx);
#if 0
static void
async_hook_packets(uv_work_t *req);
static void
async_hook_packets_after(uv_work_t *req);
#endif
static void
hook_packets(void);
@ -589,64 +582,6 @@ struct async_dump_wallet_data {
Persistent<Function> callback;
};
/**
* async_hook_packets_data
*/
#if 0
struct async_hook_packets_data {
std::string err_msg;
Persistent<Function> callback;
// NOTE: Could use a union here
std::string strCommand;
std::string name;
int64_t received;
int64_t /*?*/ peerId;
std::string userAgent;
int64_t /*?*/ receiveVersion;
int64_t /*?*/ version;
int64_t /*?*/ height;
std::string us;
std::string address;
bool relay;
std::string services;
int64_t /*?*/ time;
int64_t /*?*/ last;
std::string ip;
int64_t /*?*/ port;
bool reachable;
packet_addresses_list addresses;
bool have;
std::string hash;
std::string type;
bool filtered;
packet_items_list items;
int64_t /*?*/ size;
std::string first;
int64_t /*?*/ fromHeight;
std::string toHash;
int64_t /*?*/ limit;
CTransaction tx;
CBlock block;
std::string nonce;
std::string expected;
int64_t /*?*/ bytes;
std::string problem;
bool finished;
std::string message;
std::string signature;
bool misbehaving;
std::string data;
bool full;
bool empty;
int64_t /*?*/ hashFuncs;
int64_t /*?*/ tweaks;
int64_t /*?*/ flags;
bool unknown;
bool connected;
};
#endif
/**
* Read Raw DB
*/
@ -2815,518 +2750,9 @@ NAN_METHOD(HookPackets) {
poll_packets_mutex.unlock();
#if 0
Local<Function> callback = Local<Function>::Cast(args[0]);
async_hook_packets_data *data = new async_hook_packets_data();
data->err_msg = std::string("");
data->callback = Persistent<Function>::New(callback);
uv_work_t *req = new uv_work_t();
req->data = data;
int status = uv_queue_work(uv_default_loop(),
req, async_hook_packets,
(uv_after_work_cb)async_hook_packets_after);
assert(status == 0);
NanReturnValue(Undefined());
#endif
NanReturnValue(obj);
}
#if 0
static void
async_hook_packets(uv_work_t *req) {
async_hook_packets_data* data = static_cast<async_hook_packets_data*>(req->data);
Local<Array> obj = NanNew<Array>();
poll_packets_list *cur = NULL;
poll_packets_list *next = NULL;
int i = 0;
poll_packets_mutex.lock();
for (cur = packets_queue_head; cur; cur = next) {
CNode *pfrom = cur->pfrom;
std::string strCommand(cur->strCommand);
CDataStream vRecv = *cur->vRecv;
int64_t nTimeReceived = cur->nTimeReceived;
async_hook_packets_data *pdata = new async_hook_packets_data();
pdata->strCommand = strCommand;
pdata->name = strCommand;
pdata->received = (int64_t)nTimeReceived;
pdata->peerId = pfrom->id;
//pdata->peerId = pfrom->GetId();
pdata->userAgent = pfrom->cleanSubVer;
if (strCommand == "version") {
// Each connection can only send one version message
if (pfrom->nVersion != 0) {
return;
}
bool fRelayTxes = false;
int nStartingHeight = 0;
int cleanSubVer = 0;
//std::string strSubVer(strdup(pfrom->strSubVer.c_str()));
std::string strSubVer = pfrom->strSubVer;
int nVersion = pfrom->nVersion;
uint64_t nServices = pfrom->nServices;
int64_t nTime;
CAddress addrMe;
CAddress addrFrom;
uint64_t nNonce = 1;
vRecv >> nVersion >> nServices >> nTime >> addrMe;
if (pfrom->nVersion < MIN_PEER_PROTO_VERSION) {
// disconnect from peers older than this proto version
return;
}
if (nVersion == 10300) {
nVersion = 300;
}
if (!vRecv.empty()) {
vRecv >> addrFrom >> nNonce;
}
if (!vRecv.empty()) {
vRecv >> LIMITED_STRING(strSubVer, 256);
//cleanSubVer = SanitizeString(strSubVer);
cleanSubVer = atoi(strSubVer.c_str());
}
if (!vRecv.empty()) {
vRecv >> nStartingHeight;
}
if (!vRecv.empty()) {
fRelayTxes = false;
} else {
fRelayTxes = true;
}
// Disconnect if we connected to ourself
if (nNonce == nLocalHostNonce && nNonce > 1) {
// XXX set pdata here
return;
}
pdata->receiveVersion = cleanSubVer;
pdata->version = nVersion;
pdata->height = nStartingHeight;
pdata->us = addrMe.ToString();
pdata->address = pfrom->addr.ToString();
pdata->relay = fRelayTxes;
} else if (pfrom->nVersion == 0) {
// Must have a version message before anything else
return;
} else if (strCommand == "verack") {
pdata->receiveVersion = min(pfrom->nVersion, PROTOCOL_VERSION);
} else if (strCommand == "addr") {
vector<CAddress> vAddr;
vRecv >> vAddr;
// Don't want addr from older versions unless seeding
if (pfrom->nVersion < CADDR_TIME_VERSION && addrman.size() > 1000) {
// XXX set pdata here
return;
}
// Bad address size
if (vAddr.size() > 1000) {
return;
}
// XXX create linked list
Local<Array> array = NanNew<Array>();
int i = 0;
// Get the new addresses
int64_t nNow = GetAdjustedTime();
BOOST_FOREACH(CAddress& addr, vAddr) {
boost::this_thread::interruption_point();
unsigned int nTime = addr.nTime;
if (nTime <= 100000000 || nTime > nNow + 10 * 60) {
nTime = nNow - 5 * 24 * 60 * 60;
}
bool fReachable = IsReachable(addr);
Local<Object> obj = NanNew<Object>();
char nServices[21] = {0};
int written = snprintf(nServices, sizeof(nServices), "%020lu", (uint64_t)addr.nServices);
assert(written == 20);
obj->services = (char *)nServices;
obj->time = (unsigned int)nTime;
obj->last = (int64_t)addr.nLastTry;
obj->ip = (std::string)addr.ToStringIP();
obj->port = (unsigned short)addr.GetPort();
obj->address = (std::string)addr.ToStringIPPort();
obj->reachable = (bool)fReachable;
array->Set(i, obj);
i++;
}
pdata->addresses = array;
} else if (strCommand == "inv") {
vector<CInv> vInv;
vRecv >> vInv;
// Bad size
if (vInv.size() > MAX_INV_SZ) {
NanReturnValue(Undefined());
}
LOCK(cs_main);
// XXX create linked list
Local<Array> array = NanNew<Array>();
int i = 0;
for (unsigned int nInv = 0; nInv < vInv.size(); nInv++) {
const CInv &inv = vInv[nInv];
boost::this_thread::interruption_point();
//bool fAlreadyHave = AlreadyHave(inv);
// Bad size
if (pfrom->nSendSize > (SendBufferSize() * 2)) {
return;
}
// XXX create linked list item
Local<Object> item = NanNew<Object>();
//item->have = fAlreadyHave;
item->hash = inv.hash.GetHex();
item->type = inv.type == MSG_BLOCK || inv.type == MSG_FILTERED_BLOCK
? "block"
: "tx";
if (inv.type == MSG_FILTERED_BLOCK) {
item->filtered = true;
} else if (inv.type == MSG_BLOCK) {
item->filtered = false;
}
array->Set(i, item);
i++;
}
pdata->items = array;
} else if (strCommand == "getdata") {
vector<CInv> vInv;
vRecv >> vInv;
// Bad size
if (vInv.size() > MAX_INV_SZ) {
return;
}
pdata->size = vInv.size();
if (vInv.size() > 0) {
pdata->first = vInv[0].ToString();
}
} else if (strCommand == "getblocks") {
CBlockLocator locator;
uint256 hashStop;
vRecv >> locator >> hashStop;
LOCK(cs_main);
// Find the last block the caller has in the main chain
CBlockIndex* pindex = FindForkInGlobalIndex(chainActive, locator);
// Send the rest of the chain
if (pindex) {
pindex = chainActive.Next(pindex);
}
pdata->fromHeight = pindex ? pindex->nHeight : -1;
pdata->toHash = hashStop == uint256(0) ? "end" : hashStop.GetHex();
pdata->limit = 500;
} else if (strCommand == "getheaders") {
CBlockLocator locator;
uint256 hashStop;
vRecv >> locator >> hashStop;
LOCK(cs_main);
CBlockIndex* pindex = NULL;
if (locator.IsNull()) {
// If locator is null, return the hashStop block
BlockMap::iterator mi = mapBlockIndex.find(hashStop);
if (mi == mapBlockIndex.end()) {
// XXX set pdata here
return;
}
pindex = (*mi).second;
} else {
// Find the last block the caller has in the main chain
pindex = FindForkInGlobalIndex(chainActive, locator);
if (pindex) {
pindex = chainActive.Next(pindex);
}
}
pdata->fromHeight = pindex ? pindex->nHeight : -1;
pdata->toHash = hashStop.GetHex();
} else if (strCommand == "tx") {
// XXX May be able to do prev_list asynchronously
// XXX Potentially check for "reject" in original code
pdata->tx = tx;
} else if (strCommand == "block" && !fImporting && !fReindex) {
// XXX May be able to do prev_list asynchronously
pdata->block = block;
} else if (strCommand == "getaddr") {
; // not much other information in getaddr as long as we know we got a getaddr
} else if (strCommand == "mempool") {
; // not much other information in getaddr as long as we know we got a getaddr
} else if (strCommand == "ping") {
if (pfrom->nVersion > BIP0031_VERSION) {
uint64_t nonce = 0;
vRecv >> nonce;
char sNonce[21] = {0};
int written = snprintf(sNonce, sizeof(sNonce), "%020lu", (uint64_t)nonce);
assert(written == 20);
pdata->nonce = sNonce;
} else {
char sNonce[21] = {0};
int written = snprintf(sNonce, sizeof(sNonce), "%020lu", (uint64_t)0);
assert(written == 20);
pdata->none = sNonce;
}
} else if (strCommand == "pong") {
int64_t pingUsecEnd = nTimeReceived;
uint64_t nonce = 0;
size_t nAvail = vRecv.in_avail();
bool bPingFinished = false;
std::string sProblem;
if (nAvail >= sizeof(nonce)) {
vRecv >> nonce;
// Only process pong message if there is an outstanding ping (old ping without nonce should never pong)
if (pfrom->nPingNonceSent != 0) {
if (nonce == pfrom->nPingNonceSent) {
// Matching pong received, this ping is no longer outstanding
bPingFinished = true;
int64_t pingUsecTime = pingUsecEnd - pfrom->nPingUsecStart;
if (pingUsecTime > 0) {
// Successful ping time measurement, replace previous
;
} else {
// This should never happen
sProblem = "Timing mishap";
}
} else {
// Nonce mismatches are normal when pings are overlapping
sProblem = "Nonce mismatch";
if (nonce == 0) {
// This is most likely a bug in another implementation somewhere, cancel this ping
bPingFinished = true;
sProblem = "Nonce zero";
}
}
} else {
sProblem = "Unsolicited pong without ping";
}
} else {
// This is most likely a bug in another implementation somewhere, cancel this ping
bPingFinished = true;
sProblem = "Short payload";
}
char sNonce[21] = {0};
int written = snprintf(sNonce, sizeof(sNonce), "%020lu", (uint64_t)nonce);
assert(written == 20);
char sPingNonceSent[21] = {0};
written = snprintf(sPingNonceSent, sizeof(sPingNonceSent), "%020lu", (uint64_t)pfrom->nPingNonceSent);
assert(written == 20);
pdata->expected = sPingNonceSent;
pdata->received = sNonce;
pdata->bytes = (unsigned int)nAvail;
if (!(sProblem.empty())) {
pdata->problem = sProblem;
}
if (bPingFinished) {
pdata->finished = true;
} else {
pdata->finished = false;
}
} else if (strCommand == "alert") {
CAlert alert;
vRecv >> alert;
uint256 alertHash = alert.GetHash();
pdata->hash = alertHash.GetHex();
if (pfrom->setKnown.count(alertHash) == 0) {
if (alert.ProcessAlert()) {
std::string vchMsg(alert.vchMsg.begin(), alert.vchMsg.end());
std::string vchSig(alert.vchSig.begin(), alert.vchSig.end());
pdata->message = vchMsg;
pdata->signature = vchSig;
pdata->misbehaving = false;
} else {
// Small DoS penalty so peers that send us lots of
// duplicate/expired/invalid-signature/whatever alerts
// eventually get banned.
// This isn't a Misbehaving(100) (immediate ban) because the
// peer might be an older or different implementation with
// a different signature key, etc.
pdata->misbehaving = true;
}
}
} else if (strCommand == "filterload") {
CBloomFilter filter;
vRecv >> filter;
if (!filter.IsWithinSizeConstraints()) {
// There is no excuse for sending a too-large filter
pdata->misbehaving = true;
} else {
LOCK(pfrom->cs_filter);
filter.UpdateEmptyFull();
//std::string svData(filter.vData.begin(), filter.vData.end());
//char *cvData = svData.c_str();
//int vDataHexLen = sizeof(char) * (strlen(cvData) * 2) + 1;
//char *vDataHex = (char *)malloc(vDataHexLen);
//int written = snprintf(vDataHex, vDataHexLen, "%x", cvData);
//uint64_t dataHex;
//sscanf(cvData, "%x", &dataHex);
//// assert(written == vDataHexLen);
//vDataHex[written] = '\0';
//pdata->data = vDataHex;
////free(vDataHex);
//pdata->full = filter.isFull;
//pdata->empty = filter.isEmpty;
//pdata->hashFuncs = filter.nHashFuncs;
//pdata->tweaks = filter.nTweak;
//pdata->flags = filter.nFlags;
pdata->misbehaving = false;
}
} else if (strCommand == "filteradd") {
vector<unsigned char> vData;
vRecv >> vData;
// Nodes must NEVER send a data item > 520 bytes (the max size for a script data object,
// and thus, the maximum size any matched object can have) in a filteradd message
if (vData.size() > MAX_SCRIPT_ELEMENT_SIZE) {
pdata->misbehaving = true;
} else {
LOCK(pfrom->cs_filter);
if (pfrom->pfilter) {
//std::string svData(vData.begin(), vData.end());
//char *cvData = svData.c_str();
//int vDataHexLen = sizeof(char) * (strlen(cvData) * 2) + 1;
//char *vDataHex = (char *)malloc(vDataHexLen);
//int written = snprintf(vDataHex, vDataHexLen, "%x", cvData);
//uint64_t dataHex;
//sscanf(cvData, "%x", &dataHex);
//// assert(written == vDataHexLen);
//vDataHex[written] = '\0';
//pdata->data = vDataHex;
////free(vDataHex);
pdata->misbehaving = false;
} else {
pdata->misbehaving = true;
}
}
} else if (strCommand == "filterclear") {
; // nothing much to grab from this packet
} else if (strCommand == "reject") {
; // nothing much to grab from this packet
} else {
pdata->unknown = true;
}
// Update the last seen time for this node's address
if (pfrom->fNetworkNode) {
if (strCommand == "version"
|| strCommand == "addr"
|| strCommand == "inv"
|| strCommand == "getdata"
|| strCommand == "ping") {
pdata->connected = true;
}
}
obj->Set(i, o);
i++;
if (cur == packets_queue_head) {
packets_queue_head = NULL;
}
if (cur == packets_queue_tail) {
packets_queue_tail = NULL;
}
next = cur->next;
// delete cur->pfrom; // cleaned up on disconnect
free(cur->strCommand);
delete cur->vRecv;
free(cur);
}
poll_packets_mutex.unlock();
// XXX set pdata here
return;
}
static void
async_hook_packets_after(uv_work_t *req) {
NanScope();
async_hook_packets_data* data = static_cast<async_hook_packets_data*>(req->data);
if (data->err_msg != "") {
Local<Value> err = Exception::Error(NanNew<String>(data->err_msg));
const unsigned argc = 1;
Local<Value> argv[argc] = { err };
TryCatch try_catch;
data->callback->Call(Context::GetCurrent()->Global(), argc, argv);
if (try_catch.HasCaught()) {
node::FatalException(try_catch);
}
} else {
Local<Array> packets = NanNew<Array>();
const unsigned argc = 2;
Local<Value> argv[argc] = {
Local<Value>::New(Null()),
Local<Value>::New(packets)
};
TryCatch try_catch;
data->callback->Call(Context::GetCurrent()->Global(), argc, argv);
if (try_catch.HasCaught()) {
node::FatalException(try_catch);
}
}
data->callback.Dispose();
delete data;
delete req;
}
#endif
static void
hook_packets(void) {
CNodeSignals& nodeSignals = GetNodeSignals();