log: deal with log pipes better to differentiate node and bitcoind logs.

This commit is contained in:
Christopher Jeffrey 2014-09-02 19:00:31 -07:00
parent 9d99174c9f
commit c0c523f11a

View File

@ -103,16 +103,19 @@ using namespace v8;
NAN_METHOD(StartBitcoind);
static void
async_work(uv_work_t *req);
async_start_node_work(uv_work_t *req);
static void
async_after(uv_work_t *req);
async_start_node_after(uv_work_t *req);
static int
start_node(void);
static unsigned int
parse_logs(char **);
static void
open_pipes(int **out_pipe, int **log_pipe);
static void
parse_logs(int **out_pipe, int **log_pipe);
static void
async_parse_logs(uv_work_t *req);
@ -124,14 +127,28 @@ extern "C" void
init(Handle<Object>);
/**
* async_data
* async_node_data
* Where the uv async request data resides.
*/
struct async_data {
struct async_node_data {
Persistent<Function> callback;
bool err;
std::string err_msg;
char *err_msg;
char *result;
};
/**
* async_log_data
* Where the uv async request data resides.
*/
struct async_log_data {
Persistent<Function> callback;
int **out_pipe;
int **log_pipe;
bool err;
char *err_msg;
char *result;
};
@ -148,92 +165,104 @@ NAN_METHOD(StartBitcoind) {
"Usage: bitcoind.start(callback)");
}
// Run on a separate thead:
// char *log_str;
// int log_fd = parse_logs(&log_str);
// handle->Set(NanNew<String>("log"), NanNew<Number>(log_fd));
// TODO: Init pipe/dup2 earlier so we set the
// FD on the object, updating log string dynamically.
// uv_work_t *req = new uv_work_t();
// async_data* data = new async_data();
// data->err = false;
// data->callback = Persistent<Function>::New(callback);
// req->data = data;
// int status_ = uv_queue_work(uv_default_loop(),
// req, async_parse_logs, (uv_after_work_cb)async_parse_logs_after);
// assert(status_ == 0);
Local<Function> callback = Local<Function>::Cast(args[0]);
async_data* data = new async_data();
data->err = false;
data->callback = Persistent<Function>::New(callback);
//
// Setup pipes to differentiate our logs from bitcoind's.
// Run in a separate thread.
//
uv_work_t *req = new uv_work_t();
req->data = data;
int *out_pipe = (int *)malloc(2 * sizeof(int));
int *log_pipe = (int *)malloc(2 * sizeof(int));
int status = uv_queue_work(uv_default_loop(),
req, async_work, (uv_after_work_cb)async_after);
open_pipes(&out_pipe, &log_pipe);
assert(status == 0);
// handle->Set(NanNew<String>("log_fd"), NanNew<Number>(log_pipe[1]));
uv_work_t *req_parse_logs = new uv_work_t();
async_log_data* data_parse_logs = new async_log_data();
data_parse_logs->out_pipe = &out_pipe;
data_parse_logs->log_pipe = &log_pipe;
data_parse_logs->err = false;
data_parse_logs->callback = Persistent<Function>::New(callback);
req_parse_logs->data = data_parse_logs;
int status_parse_logs = uv_queue_work(uv_default_loop(),
req_parse_logs, async_parse_logs,
(uv_after_work_cb)async_parse_logs_after);
assert(status_parse_logs == 0);
//
// Run bitcoind's StartNode() on a separate thread.
//
async_node_data* data_start_node = new async_node_data();
data_start_node->err = false;
data_start_node->callback = Persistent<Function>::New(callback);
uv_work_t *req_start_node = new uv_work_t();
req_start_node->data = data_start_node;
int status_start_node = uv_queue_work(uv_default_loop(),
req_start_node, async_start_node_work,
(uv_after_work_cb)async_start_node_after);
assert(status_start_node == 0);
NanReturnValue(Undefined());
}
/**
* async_work()
* async_start_node_work()
* Call start_node() and start all our boost threads.
*/
static void
async_work(uv_work_t *req) {
async_data* data = static_cast<async_data*>(req->data);
start_node();
data->result = (char *)strdup("opened");
async_start_node_work(uv_work_t *req) {
async_node_data* node_data = static_cast<async_node_data*>(req->data);
// start_node();
node_data->result = (char *)strdup("start_node(): bitcoind opened.");
}
/**
* async_after()
* async_start_node_after()
* Execute our callback.
*/
static void
async_after(uv_work_t *req) {
async_start_node_after(uv_work_t *req) {
NanScope();
async_data* data = static_cast<async_data*>(req->data);
async_node_data* node_data = static_cast<async_node_data*>(req->data);
if (data->err) {
Local<Value> err = Exception::Error(String::New(data->err_msg.c_str()));
if (node_data->err) {
Local<Value> err = Exception::Error(String::New(node_data->err_msg));
free(node_data->err_msg);
const unsigned argc = 1;
Local<Value> argv[1] = { err };
Local<Value> argv[argc] = { err };
TryCatch try_catch;
data->callback->Call(Context::GetCurrent()->Global(), argc, argv);
node_data->callback->Call(Context::GetCurrent()->Global(), argc, argv);
if (try_catch.HasCaught()) {
node::FatalException(try_catch);
}
} else {
const unsigned argc = 2;
Local<Value> argv[2] = {
Local<Value> argv[argc] = {
Local<Value>::New(Null()),
Local<Value>::New(String::New(data->result))
Local<Value>::New(String::New(node_data->result))
};
TryCatch try_catch;
data->callback->Call(Context::GetCurrent()->Global(), argc, argv);
node_data->callback->Call(Context::GetCurrent()->Global(), argc, argv);
if (try_catch.HasCaught()) {
node::FatalException(try_catch);
}
}
data->callback.Dispose();
node_data->callback.Dispose();
if (data->result != NULL) {
free(data->result);
if (node_data->result != NULL) {
free(node_data->result);
}
delete data;
delete node_data;
delete req;
}
@ -242,6 +271,7 @@ async_after(uv_work_t *req) {
* A reimplementation of AppInit2 minus
* the logging and argument parsing.
*/
static int
start_node(void) {
boost::thread_group threadGroup;
@ -269,11 +299,9 @@ start_node(void) {
}
/**
* parse_logs(log_str)'
* Flow:
* - If bitcoind logs, parse, write to pfd[0].
* - If our own logs, write to stdoutd..
* TODO: Have this running in a separate thread.
* parse_logs(int **out_pipe, int **log_pipe)
* Differentiate our logs and bitcoind's logs.
* Send bitcoind's logs to a pipe instead.
*/
const char bitcoind_char[256] = {
@ -290,24 +318,16 @@ const char bitcoind_char[256] = {
0, 0, 0, 0, 0, 0, 0,
};
static unsigned int
parse_logs(char **log_str) {
#if PARSE_LOGS_ENABLED
return 0;
#endif
int pfd[2];
pipe(pfd);
unsigned int read_fd = pfd[0];
unsigned int write_fd = pfd[1];
dup2(write_fd, STDOUT_FILENO);
dup2(write_fd, STDERR_FILENO);
int log_pipe[2];
pipe(log_pipe);
unsigned int read_log = log_pipe[0];
unsigned int write_log = log_pipe[1];
static void
open_pipes(int **out_pipe, int **log_pipe) {
pipe(*out_pipe);
dup2(*out_pipe[1], STDOUT_FILENO);
dup2(*out_pipe[1], STDERR_FILENO);
pipe(*log_pipe);
}
static void
parse_logs(int **out_pipe, int **log_pipe) {
unsigned int rtotal = 0;
ssize_t r = 0;
size_t rcount = 80 * sizeof(char);
@ -316,7 +336,7 @@ parse_logs(char **log_str) {
unsigned int cp = 0;
unsigned int reallocs = 0;
while ((r = read(read_fd, buf + rtotal, rcount))) {
while ((r = read(*out_pipe[0], buf + rtotal, rcount))) {
unsigned int i;
char *rbuf;
@ -336,8 +356,8 @@ parse_logs(char **log_str) {
ssize_t w = 0;
ssize_t wtotal = 0;
// undo redirection
close(read_fd);
close(write_fd);
close(*out_pipe[0]);
close(*out_pipe[1]);
w = write(STDOUT_FILENO, cur, cp);
wtotal += w;
while ((w = write(STDOUT_FILENO, rbuf + i + wtotal, wcount))) {
@ -345,14 +365,9 @@ parse_logs(char **log_str) {
wtotal += w;
}
// reopen redirection
{
int pfd[2];
pipe(pfd);
read_fd = pfd[0];
write_fd = pfd[1];
dup2(write_fd, STDOUT_FILENO);
dup2(write_fd, STDERR_FILENO);
}
pipe(*out_pipe);
dup2(*out_pipe[1], STDOUT_FILENO);
dup2(*out_pipe[1], STDERR_FILENO);
break;
} else if (cp == sizeof cur) {
cp = 0;
@ -368,7 +383,7 @@ parse_logs(char **log_str) {
size_t wcount = r;
ssize_t w = 0;
ssize_t wtotal = 0;
while ((w = write(write_log, rbuf + i + wtotal + 1, wcount))) {
while ((w = write(*log_pipe[1], rbuf + i + wtotal + 1, wcount))) {
if (w == 0 || (size_t)wtotal == rcount) break;
wtotal += w;
}
@ -383,47 +398,41 @@ parse_logs(char **log_str) {
}
}
if (log_str) {
buf[rtotal] = '\0';
*log_str = buf;
} else {
free(buf);
}
return read_log;
free(buf);
}
static void
async_parse_logs(uv_work_t *req) {
async_data* data = static_cast<async_data*>(req->data);
parse_logs(NULL);
data->err = true;
data->err_msg = std::string("failed");
async_log_data* log_data = static_cast<async_log_data*>(req->data);
parse_logs(log_data->out_pipe, log_data->log_pipe);
log_data->err = true;
log_data->err_msg = (char *)strdup("parse_logs(): failed.");
}
static void
async_parse_logs_after(uv_work_t *req) {
NanScope();
async_data* data = static_cast<async_data*>(req->data);
async_log_data* log_data = static_cast<async_log_data*>(req->data);
if (data->err) {
Local<Value> err = Exception::Error(String::New(data->err_msg.c_str()));
if (log_data->err) {
Local<Value> err = Exception::Error(String::New(log_data->err_msg));
free(log_data->err_msg);
const unsigned argc = 1;
Local<Value> argv[1] = { err };
Local<Value> argv[argc] = { err };
TryCatch try_catch;
data->callback->Call(Context::GetCurrent()->Global(), argc, argv);
log_data->callback->Call(Context::GetCurrent()->Global(), argc, argv);
if (try_catch.HasCaught()) {
node::FatalException(try_catch);
}
} else {
assert(0 && "parse_logs(): should never happen.");
}
data->callback.Dispose();
if (data->result != NULL) {
free(data->result);
if (log_data->result != NULL) {
assert(0 && "parse_logs(): should never happen.");
}
delete data;
delete log_data;
delete req;
}