diff --git a/src/bitcoindjs.cc b/src/bitcoindjs.cc index dd0dca05..c63941a1 100644 --- a/src/bitcoindjs.cc +++ b/src/bitcoindjs.cc @@ -103,16 +103,19 @@ using namespace v8; NAN_METHOD(StartBitcoind); static void -async_work(uv_work_t *req); +async_start_node_work(uv_work_t *req); static void -async_after(uv_work_t *req); +async_start_node_after(uv_work_t *req); static int start_node(void); -static unsigned int -parse_logs(char **); +static void +open_pipes(int **out_pipe, int **log_pipe); + +static void +parse_logs(int **out_pipe, int **log_pipe); static void async_parse_logs(uv_work_t *req); @@ -124,14 +127,28 @@ extern "C" void init(Handle); /** - * async_data + * async_node_data * Where the uv async request data resides. */ -struct async_data { +struct async_node_data { Persistent callback; bool err; - std::string err_msg; + char *err_msg; + char *result; +}; + +/** + * async_log_data + * Where the uv async request data resides. + */ + +struct async_log_data { + Persistent callback; + int **out_pipe; + int **log_pipe; + bool err; + char *err_msg; char *result; }; @@ -148,92 +165,104 @@ NAN_METHOD(StartBitcoind) { "Usage: bitcoind.start(callback)"); } - // Run on a separate thead: - // char *log_str; - // int log_fd = parse_logs(&log_str); - // handle->Set(NanNew("log"), NanNew(log_fd)); - - - // TODO: Init pipe/dup2 earlier so we set the - // FD on the object, updating log string dynamically. - // uv_work_t *req = new uv_work_t(); - // async_data* data = new async_data(); - // data->err = false; - // data->callback = Persistent::New(callback); - // req->data = data; - // int status_ = uv_queue_work(uv_default_loop(), - // req, async_parse_logs, (uv_after_work_cb)async_parse_logs_after); - // assert(status_ == 0); - - Local callback = Local::Cast(args[0]); - async_data* data = new async_data(); - data->err = false; - data->callback = Persistent::New(callback); + // + // Setup pipes to differentiate our logs from bitcoind's. + // Run in a separate thread. + // - uv_work_t *req = new uv_work_t(); - req->data = data; + int *out_pipe = (int *)malloc(2 * sizeof(int)); + int *log_pipe = (int *)malloc(2 * sizeof(int)); - int status = uv_queue_work(uv_default_loop(), - req, async_work, (uv_after_work_cb)async_after); + open_pipes(&out_pipe, &log_pipe); - assert(status == 0); + // handle->Set(NanNew("log_fd"), NanNew(log_pipe[1])); + + uv_work_t *req_parse_logs = new uv_work_t(); + async_log_data* data_parse_logs = new async_log_data(); + data_parse_logs->out_pipe = &out_pipe; + data_parse_logs->log_pipe = &log_pipe; + data_parse_logs->err = false; + data_parse_logs->callback = Persistent::New(callback); + req_parse_logs->data = data_parse_logs; + int status_parse_logs = uv_queue_work(uv_default_loop(), + req_parse_logs, async_parse_logs, + (uv_after_work_cb)async_parse_logs_after); + assert(status_parse_logs == 0); + + // + // Run bitcoind's StartNode() on a separate thread. + // + + async_node_data* data_start_node = new async_node_data(); + data_start_node->err = false; + data_start_node->callback = Persistent::New(callback); + + uv_work_t *req_start_node = new uv_work_t(); + req_start_node->data = data_start_node; + + int status_start_node = uv_queue_work(uv_default_loop(), + req_start_node, async_start_node_work, + (uv_after_work_cb)async_start_node_after); + + assert(status_start_node == 0); NanReturnValue(Undefined()); } /** - * async_work() + * async_start_node_work() * Call start_node() and start all our boost threads. */ static void -async_work(uv_work_t *req) { - async_data* data = static_cast(req->data); - start_node(); - data->result = (char *)strdup("opened"); +async_start_node_work(uv_work_t *req) { + async_node_data* node_data = static_cast(req->data); + // start_node(); + node_data->result = (char *)strdup("start_node(): bitcoind opened."); } /** - * async_after() + * async_start_node_after() * Execute our callback. */ static void -async_after(uv_work_t *req) { +async_start_node_after(uv_work_t *req) { NanScope(); - async_data* data = static_cast(req->data); + async_node_data* node_data = static_cast(req->data); - if (data->err) { - Local err = Exception::Error(String::New(data->err_msg.c_str())); + if (node_data->err) { + Local err = Exception::Error(String::New(node_data->err_msg)); + free(node_data->err_msg); const unsigned argc = 1; - Local argv[1] = { err }; + Local argv[argc] = { err }; TryCatch try_catch; - data->callback->Call(Context::GetCurrent()->Global(), argc, argv); + node_data->callback->Call(Context::GetCurrent()->Global(), argc, argv); if (try_catch.HasCaught()) { node::FatalException(try_catch); } } else { const unsigned argc = 2; - Local argv[2] = { + Local argv[argc] = { Local::New(Null()), - Local::New(String::New(data->result)) + Local::New(String::New(node_data->result)) }; TryCatch try_catch; - data->callback->Call(Context::GetCurrent()->Global(), argc, argv); + node_data->callback->Call(Context::GetCurrent()->Global(), argc, argv); if (try_catch.HasCaught()) { node::FatalException(try_catch); } } - data->callback.Dispose(); + node_data->callback.Dispose(); - if (data->result != NULL) { - free(data->result); + if (node_data->result != NULL) { + free(node_data->result); } - delete data; + delete node_data; delete req; } @@ -242,6 +271,7 @@ async_after(uv_work_t *req) { * A reimplementation of AppInit2 minus * the logging and argument parsing. */ + static int start_node(void) { boost::thread_group threadGroup; @@ -269,11 +299,9 @@ start_node(void) { } /** - * parse_logs(log_str)' - * Flow: - * - If bitcoind logs, parse, write to pfd[0]. - * - If our own logs, write to stdoutd.. - * TODO: Have this running in a separate thread. + * parse_logs(int **out_pipe, int **log_pipe) + * Differentiate our logs and bitcoind's logs. + * Send bitcoind's logs to a pipe instead. */ const char bitcoind_char[256] = { @@ -290,24 +318,16 @@ const char bitcoind_char[256] = { 0, 0, 0, 0, 0, 0, 0, }; -static unsigned int -parse_logs(char **log_str) { -#if PARSE_LOGS_ENABLED - return 0; -#endif - - int pfd[2]; - pipe(pfd); - unsigned int read_fd = pfd[0]; - unsigned int write_fd = pfd[1]; - dup2(write_fd, STDOUT_FILENO); - dup2(write_fd, STDERR_FILENO); - - int log_pipe[2]; - pipe(log_pipe); - unsigned int read_log = log_pipe[0]; - unsigned int write_log = log_pipe[1]; +static void +open_pipes(int **out_pipe, int **log_pipe) { + pipe(*out_pipe); + dup2(*out_pipe[1], STDOUT_FILENO); + dup2(*out_pipe[1], STDERR_FILENO); + pipe(*log_pipe); +} +static void +parse_logs(int **out_pipe, int **log_pipe) { unsigned int rtotal = 0; ssize_t r = 0; size_t rcount = 80 * sizeof(char); @@ -316,7 +336,7 @@ parse_logs(char **log_str) { unsigned int cp = 0; unsigned int reallocs = 0; - while ((r = read(read_fd, buf + rtotal, rcount))) { + while ((r = read(*out_pipe[0], buf + rtotal, rcount))) { unsigned int i; char *rbuf; @@ -336,8 +356,8 @@ parse_logs(char **log_str) { ssize_t w = 0; ssize_t wtotal = 0; // undo redirection - close(read_fd); - close(write_fd); + close(*out_pipe[0]); + close(*out_pipe[1]); w = write(STDOUT_FILENO, cur, cp); wtotal += w; while ((w = write(STDOUT_FILENO, rbuf + i + wtotal, wcount))) { @@ -345,14 +365,9 @@ parse_logs(char **log_str) { wtotal += w; } // reopen redirection - { - int pfd[2]; - pipe(pfd); - read_fd = pfd[0]; - write_fd = pfd[1]; - dup2(write_fd, STDOUT_FILENO); - dup2(write_fd, STDERR_FILENO); - } + pipe(*out_pipe); + dup2(*out_pipe[1], STDOUT_FILENO); + dup2(*out_pipe[1], STDERR_FILENO); break; } else if (cp == sizeof cur) { cp = 0; @@ -368,7 +383,7 @@ parse_logs(char **log_str) { size_t wcount = r; ssize_t w = 0; ssize_t wtotal = 0; - while ((w = write(write_log, rbuf + i + wtotal + 1, wcount))) { + while ((w = write(*log_pipe[1], rbuf + i + wtotal + 1, wcount))) { if (w == 0 || (size_t)wtotal == rcount) break; wtotal += w; } @@ -383,47 +398,41 @@ parse_logs(char **log_str) { } } - if (log_str) { - buf[rtotal] = '\0'; - *log_str = buf; - } else { - free(buf); - } - - return read_log; + free(buf); } static void async_parse_logs(uv_work_t *req) { - async_data* data = static_cast(req->data); - parse_logs(NULL); - data->err = true; - data->err_msg = std::string("failed"); + async_log_data* log_data = static_cast(req->data); + parse_logs(log_data->out_pipe, log_data->log_pipe); + log_data->err = true; + log_data->err_msg = (char *)strdup("parse_logs(): failed."); } static void async_parse_logs_after(uv_work_t *req) { NanScope(); - async_data* data = static_cast(req->data); + async_log_data* log_data = static_cast(req->data); - if (data->err) { - Local err = Exception::Error(String::New(data->err_msg.c_str())); + if (log_data->err) { + Local err = Exception::Error(String::New(log_data->err_msg)); + free(log_data->err_msg); const unsigned argc = 1; - Local argv[1] = { err }; + Local argv[argc] = { err }; TryCatch try_catch; - data->callback->Call(Context::GetCurrent()->Global(), argc, argv); + log_data->callback->Call(Context::GetCurrent()->Global(), argc, argv); if (try_catch.HasCaught()) { node::FatalException(try_catch); } + } else { + assert(0 && "parse_logs(): should never happen."); } - data->callback.Dispose(); - - if (data->result != NULL) { - free(data->result); + if (log_data->result != NULL) { + assert(0 && "parse_logs(): should never happen."); } - delete data; + delete log_data; delete req; }