AddressService.prototype.concurrentBlockHandler = function(block, connectBlock, callback) {
  var self = this;

  var txs = block.transactions;
  var height = block.__height;

  var action = 'put';
  var reverseAction = 'del';
  if (!connectBlock) {
    action = 'del';
    reverseAction = 'put';
  }

  var operations = [];

  for(var i = 0; i < txs.length; i++) {

    var tx = txs[i];
    var txid = tx.id;
    var inputs = tx.inputs;
    var outputs = tx.outputs;

    // Subscription messages
    var txmessages = {};

    var outputLength = outputs.length;
    for (var outputIndex = 0; outputIndex < outputLength; outputIndex++) {
      var output = outputs[outputIndex];

      var script = output.script;

      if(!script) {
        log.debug('Invalid script');
        continue;
      }

      var address = self.getAddressString(script);
      if(!address) {
        continue;
      }

      var key = self._encoding.encodeAddressIndexKey(address, height, txid);
      operations.push({
        type: action,
        key: key
      });

      // Collect data for subscribers
      if (txmessages[address]) {
        txmessages[address].outputIndexes.push(outputIndex);
      } else {
        txmessages[address] = {
          tx: tx,
          height: height,
          outputIndexes: [outputIndex],
          address: address,
          timestamp: block.header.timestamp
        };
      }
    }

    if(tx.isCoinbase()) {
      continue;
    }

    //TODO deal with P2PK
    for(var inputIndex = 0; inputIndex < inputs.length; inputIndex++) {
      var input = inputs[inputIndex];

      if(!input.script) {
        log.debug('Invalid script');
        continue;
      }

      var inputAddress = self.getAddressString(input.script);

      if(!inputAddress) {
        continue;
      }

      var inputKey = self._encoding.encodeAddressIndexKey(inputAddress, height, txid);

      operations.push({
        type: action,
        key: inputKey
      });

    }
  }
  setImmediate(function() {
    callback(null, operations);
  });
};

AddressService.prototype.blockHandler = function(block, connectBlock, callback) {
  var self = this;

  var txs = block.transactions;

  var action = 'put';
  var reverseAction = 'del';
  if (!connectBlock) {
    action = 'del';
    reverseAction = 'put';
  }

  var operations = [];

  async.eachSeries(txs, function(tx, next) {
    var txid = tx.id;
    var inputs = tx.inputs;
    var outputs = tx.outputs;

    var outputLength = outputs.length;
    for (var outputIndex = 0; outputIndex < outputLength; outputIndex++) {
      var output = outputs[outputIndex];

      var script = output.script;

      if(!script) {
        log.debug('Invalid script');
        continue;
      }

      var address = self.getAddressString(script);

      if(!address) {
        continue;
      }

      var key = self._encoding.encodeUtxoIndexKey(address, txid, outputIndex);
      var value = self._encoding.encodeUtxoIndexValue(block.__height, output.satoshis, output._scriptBuffer);
      operations.push({
        type: action,
        key: key,
        value: value
      });

    }

    if(tx.isCoinbase()) {
      return next();
    }

    //TODO deal with P2PK
    async.each(inputs, function(input, next) {
      if(!input.script) {
        log.debug('Invalid script');
        return next();
      }

      var inputAddress = self.getAddressString(input.script);

      if(!inputAddress) {
        return next();
      }

      var inputKey = self._encoding.encodeUtxoIndexKey(inputAddress, input.prevTxId, input.outputIndex);
      //common case is connecting blocks and deleting outputs spent by these inputs
      if (connectBlock) {
        operations.push({
          type: 'del',
          key: inputKey
        });
        next();
      } else { // uncommon and slower, this happens during a reorg
        self.node.services.transaction.getTransaction(input.prevTxId.toString('hex'), {}, function(err, tx) {
          var utxo = tx.outputs[input.outputIndex];
          var inputValue = self._encoding.encodeUtxoIndexValue(tx.__height, utxo.satoshis, utxo._scriptBuffer);
          operations.push({
            type: 'put',
            key: inputKey,
            value: inputValue
          });
          next();
        });
      }
    }, function(err) {
      if(err) {
        return next(err);
      }
      next();
    });
  }, function(err) {
    //we are aync predicated on reorg sitch
    if(err) {
      return callback(err);
    }
    callback(null, operations);
  });
};
TransactionService.prototype.blockHandler = function(block, connectBlock, callback) {
  var self = this;
  var action = 'put';
  var reverseAction = 'del';
  if (!connectBlock) {
    action = 'del';
    reverseAction = 'put';
  }

  var operations = [];

  this.currentTransactions = {};

  async.series([
    function(next) {
      self.node.services.timestamp.getTimestamp(block.hash, function(err, timestamp) {
        if(err) {
          return next(err);
        }
        block.__timestamp = timestamp;
        next();
      });
    }, function(next) {
      async.eachSeries(block.transactions, function(tx, next) {
        tx.__timestamp = block.__timestamp;
        tx.__height = block.__height;

        self._getInputValues(tx, function(err, inputValues) {
          if(err) {
            return next(err);
          }
          tx.__inputValues = inputValues;
          self.currentTransactions[tx.id] = tx;

          operations.push({
            type: action,
            key: self.encoding.encodeTransactionKey(tx.id),
            value: self.encoding.encodeTransactionValue(tx)
          });
          next();
        });
      }, function(err) {
        if(err) {
          return next(err);
        }
        next();
      });
    }], function(err) {
        if(err) {
          return callback(err);
        }
        callback(null, operations);
    });

};
MempoolService.prototype.blockHandler = function(block, connectBlock, callback) {
  var self = this;

  if (!self._handleBlocks) {
    return setImmediate(callback);
  }

  var txs = block.transactions;

  var action = 'del';
  if (!connectBlock) {
    action = 'put';
  }

  for(var i = 0; i < txs.length; i++) {

    var tx = txs[i];
    self._updateMempool(tx, action);
  }
  setImmediate(callback);
};
WalletService.prototype.blockHandler = function(block, connectBlock, callback) {

  var opts = {
    block: block,
    connectBlock: connectBlock,
    serial: true
  };
  this._blockHandler(opts, callback);

};

WalletService.prototype.concurrentBlockHandler = function(block, connectBlock, callback) {

  var opts = {
    block: block,
    connectBlock: connectBlock
  };
  this._blockHandler(opts, callback);

};

WalletService.prototype._blockHandler = function(opts, callback) {

  var self = this;

  if (!self._checkAddresses()) {
    return setImmediate(function() {
      callback(null, []);
    });
  }

  async.mapSeries(opts.block.transactions, function(tx, next) {

    self._processTransaction(opts, tx, next);

  }, function(err, operations) {

    if(err) {
      return callback(err);
    }

    var ret = _.compact(_.flattenDeep(operations));
    callback(null, ret);

  });

};

WalletService.prototype._processTransaction = function(opts, tx, callback) {
  var self = this;

  tx.outputs.forEach(function(output, index) {
    output.index = index;
  });

  var ioData = tx.inputs.concat(tx.outputs);

  async.mapSeries(ioData, function(io, next) {
    if (opts.serial) {
      self._processSerialIO(opts, tx, io, next);
    } else {
      self._processConcurrentIO(opts, tx, io, next);
    }
  }, function(err, operations) {
    if(err) {
      return callback(err);
    }
    callback(null, operations);
  });

};

WalletService.prototype._processConcurrentIO = function(opts, tx, io, callback) {

  var self = this;
  var walletIds = self._getWalletIdsFromScript(io);

  if (!walletIds) {
    return callback();
  }
  var actions = self._getActions(opts.connectBlock);

  var operations = walletIds.map(function(walletId) {
    return {
      type: actions[0],
      key: self._encoding.encodeWalletTransactionKey(walletId, opts.block.__height, tx.id)
    };
  });

  setImmediate(function() {
    callback(null, operations);
  });

};

WalletService.prototype._processSerialIO = function(opts, tx, io, callback) {
  var fn = this._processSerialOutput;
  if (io instanceof Input) {
    fn = this._processSerialInput;
  }
  fn.call(this, opts, tx, io, callback);
};

WalletService.prototype._getWalletIdsFromScript = function(io) {

  if(!io.script) {
    log.debug('Invalid script');
    return;
  }

  return this._addressMap[this.getAddressString(io)];

};

WalletService.prototype._getActions = function(connect) {
  var action = 'put';
  var reverseAction = 'del';
  if (!connect) {
    action = 'del';
    reverseAction = 'put';
  }
  return [action, reverseAction];
};

WalletService.prototype._processSerialOutput = function(opts, tx, output, callback) {

  var self = this;
  var walletIds = self._getWalletIdsFromScript(output);

  if (!walletIds) {
    return callback();
  }

  var actions = self._getActions(opts.connectBlock);

  async.mapSeries(walletIds, function(walletId, next) {

    self.balances[walletId] = self.balances[walletId] || 0;
    self.balances[walletId] += opts.connectBlock ?  output.satoshis : (-1 * output.satoshis);

    var operations = [
      {
        type: actions[0],
        key: self._encoding.encodeWalletUtxoKey(walletId, tx.id, output.index),
        value: self._encoding.encodeWalletUtxoValue(opts.block.__height, output.satoshis, output._scriptBuffer)
      },
      {
        type: actions[0],
        key: self._encoding.encodeWalletUtxoSatoshisKey(walletId, output.satoshis, tx.id, output.index),
        value: self._encoding.encodeWalletUtxoSatoshisValue(opts.block.__height, output._scriptBuffer)
      },
      {
        type: 'put',
        key: self._encoding.encodeWalletBalanceKey(walletId),
        value: self._encoding.encodeWalletBalanceValue(self.balances[walletId])
      }
    ];

    next(null, operations);

  }, function(err, operations) {

    if(err) {
      return callback(err);
    }

    callback(null, operations);

  });

};

WalletService.prototype._processSerialInput = function(opts, tx, input, callback) {

  var self = this;

  var walletIds = input.script && input.script.isPublicKeyIn() ?
    ['p2pk'] :
    self._getWalletIdsFromScript(input);

  if (!walletIds) {
    return callback();
  }

  var actions = self._getActions(opts.connectBlock);

  async.mapSeries(walletIds, function(walletId, next) {

    self.node.services.transaction.getTransaction(input.prevTxId.toString('hex'), {}, function(err, tx) {

      if(err) {
        return next(err);
      }

      var utxo = tx.outputs[input.outputIndex];

      if (walletId === 'p2pk') {

        var pubKey = utxo.script.getPublicKey().toString('hex');
        walletId = self._addressMap[pubKey];

        if (!walletId) {
          return next(null, []);
        }

      }

      self.balances[walletId] = self.balances[walletId] || 0;
      self.balances[walletId] += opts.connectBlock ? (-1 * utxo.satoshis) : utxo.satoshis;

      var operations = [
        {
          type: actions[1],
          key: self._encoding.encodeWalletUtxoKey(walletId, input.prevTxId, input.outputIndex),
          value: self._encoding.encodeWalletUtxoValue(tx.__height, utxo.satoshis, utxo._scriptBuffer)
        },
        {
          type: actions[1],
          key: self._encoding.encodeWalletUtxoSatoshisKey(walletId, utxo.satoshis, tx.id, input.outputIndex),
          value: self._encoding.encodeWalletUtxoSatoshisValue(tx.__height, utxo._scriptBuffer)
        },
        {
          type: 'put',
          key: self._encoding.encodeWalletBalanceKey(walletId),
          value: self._encoding.encodeWalletBalanceValue(self.balances[walletId])
        }
      ];

      next(null, operations);

    });
  }, function(err, operations) {

    if(err) {
      return callback(err);
    }

    callback(null, operations);

  });

};

WalletService.prototype._loadAllAddresses = function(callback) {

