static LocalTransaction makeLocalTransaction(HStoreSite hstore_site) {
long txnId = hstore_site.getTransactionIdManager(0).getNextUniqueTransactionId();
long clientHandle = -1;
CatalogContext catalogContext = hstore_site.getCatalogContext();
int base_partition = CollectionUtil.random(hstore_site.getLocalPartitionIds());
PartitionSet predict_touchedPartitions = catalogContext.getAllPartitionIds();
boolean predict_readOnly = false;
boolean predict_canAbort = true;
Procedure catalog_proc = catalogContext.procedures.getIgnoreCase("@NoOp");
ParameterSet params = new ParameterSet();
RpcCallback<ClientResponseImpl> client_callback = null;
LocalTransaction ts = new LocalTransaction(hstore_site);
long batchId = -1;
ts.init(batchId, txnId, EstTime.currentTimeMillis(), clientHandle, base_partition,
predict_touchedPartitions, predict_readOnly, predict_canAbort,
catalog_proc, params, client_callback);
EstTimeUpdater.update(System.currentTimeMillis());
return (ts);
}
java类com.google.protobuf.RpcCallback的实例源码
MockHStoreSite.java 文件源码
项目:sstore-soft
阅读 31
收藏 0
点赞 0
评论 0
HStoreCoordinator.java 文件源码
项目:sstore-soft
阅读 33
收藏 0
点赞 0
评论 0
/**
* Tell all remote partitions to start the map phase for this txn
* @param ts
*/
public void transactionMap(LocalTransaction ts, RpcCallback<TransactionMapResponse> callback) {
ByteString paramBytes = null;
try {
ByteBuffer b = ByteBuffer.wrap(FastSerializer.serialize(ts.getProcedureParameters()));
paramBytes = ByteString.copyFrom(b.array());
} catch (Exception ex) {
throw new RuntimeException("Unexpected error when serializing StoredProcedureInvocation", ex);
}
TransactionMapRequest request = TransactionMapRequest.newBuilder()
.setTransactionId(ts.getTransactionId())
.setClientHandle(ts.getClientHandle())
.setBasePartition(ts.getBasePartition())
.setProcedureId(ts.getProcedure().getId())
.setParams(paramBytes)
.build();
PartitionSet partitions = ts.getPredictTouchedPartitions();
if (debug.val){
LOG.debug(String.format("Notifying partitions %s that %s is in Map Phase", partitions, ts));
if (trace.val) LOG.trace("<HStoreCoordinator.TransactionMap> is executing to sendMessages to all partitions");
}
this.transactionMap_handler.sendMessages(ts, request, callback, partitions);
}
SecureBulkLoadEndpoint.java 文件源码
项目:ditb
阅读 32
收藏 0
点赞 0
评论 0
@Override
public void prepareBulkLoad(RpcController controller,
PrepareBulkLoadRequest request,
RpcCallback<PrepareBulkLoadResponse> done){
try {
List<BulkLoadObserver> bulkLoadObservers = getBulkLoadObservers();
if(bulkLoadObservers != null) {
ObserverContext<RegionCoprocessorEnvironment> ctx =
new ObserverContext<RegionCoprocessorEnvironment>();
ctx.prepare(env);
for(BulkLoadObserver bulkLoadObserver : bulkLoadObservers) {
bulkLoadObserver.prePrepareBulkLoad(ctx, request);
}
}
String bulkToken = createStagingDir(baseStagingDir,
getActiveUser(), ProtobufUtil.toTableName(request.getTableName())).toString();
done.run(PrepareBulkLoadResponse.newBuilder().setBulkToken(bulkToken).build());
} catch (IOException e) {
ResponseConverter.setControllerException(controller, e);
}
done.run(null);
}
HStoreSite.java 文件源码
项目:sstore-soft
阅读 30
收藏 0
点赞 0
评论 0
/**
* Convenience method for sending an error ClientResponse back to the client
* @param client_handle
* @param status
* @param message
* @param clientCallback
* @param initiateTime
*/
public void responseError(long client_handle,
Status status,
String message,
RpcCallback<ClientResponseImpl> clientCallback,
long batchId,
long initiateTime) {
ClientResponseImpl cresponse = new ClientResponseImpl(
-1,
client_handle,
-1,
status,
HStoreConstants.EMPTY_RESULT,
message);
this.responseSend(cresponse, clientCallback, batchId, initiateTime, 0);
}
MockHStoreSite.java 文件源码
项目:s-store
阅读 31
收藏 0
点赞 0
评论 0
static LocalTransaction makeLocalTransaction(HStoreSite hstore_site) {
long txnId = hstore_site.getTransactionIdManager(0).getNextUniqueTransactionId();
long clientHandle = -1;
CatalogContext catalogContext = hstore_site.getCatalogContext();
int base_partition = CollectionUtil.random(hstore_site.getLocalPartitionIds());
PartitionSet predict_touchedPartitions = catalogContext.getAllPartitionIds();
boolean predict_readOnly = false;
boolean predict_canAbort = true;
Procedure catalog_proc = catalogContext.procedures.getIgnoreCase("@NoOp");
ParameterSet params = new ParameterSet();
RpcCallback<ClientResponseImpl> client_callback = null;
LocalTransaction ts = new LocalTransaction(hstore_site);
long batchId = -1;
ts.init(batchId, txnId, EstTime.currentTimeMillis(), clientHandle, base_partition,
predict_touchedPartitions, predict_readOnly, predict_canAbort,
catalog_proc, params, client_callback);
EstTimeUpdater.update(System.currentTimeMillis());
return (ts);
}
TransactionFinishHandler.java 文件源码
项目:sstore-soft
阅读 30
收藏 0
点赞 0
评论 0
@Override
public void remoteQueue(RpcController controller, TransactionFinishRequest request,
RpcCallback<TransactionFinishResponse> callback) {
if (this.finishDispatcher != null && request.getStatus() == Status.ABORT_RESTART) {
if (debug.val)
LOG.debug(String.format("Queuing %s for txn #%d [status=%s]",
request.getClass().getSimpleName(), request.getTransactionId(), request.getStatus()));
Object o[] = { controller, request, callback };
this.finishDispatcher.queue(o);
} else {
if (debug.val)
LOG.debug(String.format("Sending %s to remote handler for txn #%d [status=%s]",
request.getClass().getSimpleName(), request.getTransactionId(), request.getStatus()));
this.remoteHandler(controller, request, callback);
}
}
HStoreCoordinator.java 文件源码
项目:s-store
阅读 36
收藏 0
点赞 0
评论 0
/**
* Send the TransactionWorkRequest to the target remote site
* @param builders
* @param callback
*/
public void transactionWork(LocalTransaction ts, int site_id, TransactionWorkRequest request, RpcCallback<TransactionWorkResponse> callback) {
if (debug.val)
LOG.debug(String.format("%s - Sending TransactionWorkRequest to remote site %d " +
"[numFragments=%d, txnId=%d]",
ts, site_id, request.getFragmentsCount(), request.getTransactionId()));
assert(request.getFragmentsCount() > 0) :
String.format("No WorkFragments for Site %d in %s", site_id, ts);
assert(site_id != this.local_site_id) :
String.format("Trying to send %s for %s to local site %d",
request.getClass().getSimpleName(), ts, site_id);
assert(ts.getTransactionId().longValue() == request.getTransactionId()) :
String.format("%s is for txn #%d but the %s has txn #%d",
ts.getClass().getSimpleName(), ts.getTransactionId(),
request.getClass().getSimpleName(), request.getTransactionId());
this.channels[site_id].transactionWork(ts.getTransactionWorkController(site_id), request, callback);
}
CoprocessorRpcChannel.java 文件源码
项目:ditb
阅读 43
收藏 0
点赞 0
评论 0
@Override
@InterfaceAudience.Private
public void callMethod(Descriptors.MethodDescriptor method,
RpcController controller,
Message request, Message responsePrototype,
RpcCallback<Message> callback) {
Message response = null;
try {
response = callExecService(controller, method, request, responsePrototype);
} catch (IOException ioe) {
LOG.warn("Call failed on IOException", ioe);
ResponseConverter.setControllerException(controller, ioe);
}
if (callback != null) {
callback.run(response);
}
}
HStoreCoordinator.java 文件源码
项目:s-store
阅读 27
收藏 0
点赞 0
评论 0
/**
* Tell all remote partitions to start the map phase for this txn
* @param ts
*/
public void transactionMap(LocalTransaction ts, RpcCallback<TransactionMapResponse> callback) {
ByteString paramBytes = null;
try {
ByteBuffer b = ByteBuffer.wrap(FastSerializer.serialize(ts.getProcedureParameters()));
paramBytes = ByteString.copyFrom(b.array());
} catch (Exception ex) {
throw new RuntimeException("Unexpected error when serializing StoredProcedureInvocation", ex);
}
TransactionMapRequest request = TransactionMapRequest.newBuilder()
.setTransactionId(ts.getTransactionId())
.setClientHandle(ts.getClientHandle())
.setBasePartition(ts.getBasePartition())
.setProcedureId(ts.getProcedure().getId())
.setParams(paramBytes)
.build();
PartitionSet partitions = ts.getPredictTouchedPartitions();
if (debug.val){
LOG.debug(String.format("Notifying partitions %s that %s is in Map Phase", partitions, ts));
if (trace.val) LOG.trace("<HStoreCoordinator.TransactionMap> is executing to sendMessages to all partitions");
}
this.transactionMap_handler.sendMessages(ts, request, callback, partitions);
}
TransactionFinishHandler.java 文件源码
项目:s-store
阅读 34
收藏 0
点赞 0
评论 0
@Override
public void remoteQueue(RpcController controller, TransactionFinishRequest request,
RpcCallback<TransactionFinishResponse> callback) {
if (this.finishDispatcher != null && request.getStatus() == Status.ABORT_RESTART) {
if (debug.val)
LOG.debug(String.format("Queuing %s for txn #%d [status=%s]",
request.getClass().getSimpleName(), request.getTransactionId(), request.getStatus()));
Object o[] = { controller, request, callback };
this.finishDispatcher.queue(o);
} else {
if (debug.val)
LOG.debug(String.format("Sending %s to remote handler for txn #%d [status=%s]",
request.getClass().getSimpleName(), request.getTransactionId(), request.getStatus()));
this.remoteHandler(controller, request, callback);
}
}
ProtoRpcController.java 文件源码
项目:sstore-soft
阅读 33
收藏 0
点赞 0
评论 0
public void startRpc(EventLoop eventLoop, Message.Builder builder, RpcCallback<Message> callback) {
if (this.callback != null) {
throw new IllegalStateException(
"ProtoRpcController already in use by another RPC call; " +
"wait for callback before reusing.");
}
if (callback == null) {
throw new NullPointerException("callback cannot be null");
}
assert this.eventLoop == null;
assert eventLoop != null;
assert this.builder == null;
assert builder != null;
this.eventLoop = eventLoop;
this.builder = builder;
this.callback = callback;
status = Protocol.Status.INVALID;
}
BaseRowProcessorEndpoint.java 文件源码
项目:ditb
阅读 26
收藏 0
点赞 0
评论 0
/**
* Pass a processor to region to process multiple rows atomically.
*
* The RowProcessor implementations should be the inner classes of your
* RowProcessorEndpoint. This way the RowProcessor can be class-loaded with
* the Coprocessor endpoint together.
*
* See {@code TestRowProcessorEndpoint} for example.
*
* The request contains information for constructing processor
* (see {@link #constructRowProcessorFromRequest}. The processor object defines
* the read-modify-write procedure.
*/
@Override
public void process(RpcController controller, ProcessRequest request,
RpcCallback<ProcessResponse> done) {
ProcessResponse resultProto = null;
try {
RowProcessor<S,T> processor = constructRowProcessorFromRequest(request);
Region region = env.getRegion();
long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
long nonce = request.hasNonce() ? request.getNonce() : HConstants.NO_NONCE;
region.processRowsWithLocks(processor, nonceGroup, nonce);
T result = processor.getResult();
ProcessResponse.Builder b = ProcessResponse.newBuilder();
b.setRowProcessorResult(result.toByteString());
resultProto = b.build();
} catch (Exception e) {
ResponseConverter.setControllerException(controller, new IOException(e));
}
done.run(resultProto);
}
HStoreCoordinator.java 文件源码
项目:sstore-soft
阅读 35
收藏 0
点赞 0
评论 0
@Override
public void transactionInit(RpcController controller, TransactionInitRequest request, RpcCallback<TransactionInitResponse> callback) {
try {
transactionInit_handler.remoteQueue(controller, request, callback);
} catch (Throwable ex) {
shutdownCluster(ex);
}
}
HStoreSite.java 文件源码
项目:s-store
阅读 30
收藏 0
点赞 0
评论 0
/**
* This is legacy method needed for using Evan's VoltProcedureListener.
*/
@Override
@Deprecated
public void invocationQueue(ByteBuffer buffer, final RpcCallback<byte[]> clientCallback) {
// XXX: This is a big hack. We should just deal with the ClientResponseImpl directly
RpcCallback<ClientResponseImpl> wrapperCallback = new RpcCallback<ClientResponseImpl>() {
@Override
public void run(ClientResponseImpl parameter) {
if (trace.val) LOG.trace("Serializing ClientResponse to byte array:\n" + parameter);
FastSerializer fs = new FastSerializer();
try {
parameter.writeExternal(fs);
clientCallback.run(fs.getBBContainer().b.array());
} catch (IOException ex) {
throw new RuntimeException(ex);
} finally {
fs.clear();
}
}
};
if (this.preProcessorQueue != null) {
this.preProcessorQueue.add(Pair.of(buffer, wrapperCallback));
} else {
this.invocationProcess(buffer, wrapperCallback);
}
}
TransactionPrepareHandler.java 文件源码
项目:s-store
阅读 26
收藏 0
点赞 0
评论 0
@Override
public void sendLocal(Long txn_id, TransactionPrepareRequest request, PartitionSet partitions, RpcCallback<TransactionPrepareResponse> callback) {
// We don't care whether we actually updated anybody locally, so we don't need to
// pass in a set to get the partitions that were updated here.
LocalTransaction ts = this.hstore_site.getTransaction(txn_id);
assert(ts != null) : "Unexpected null transaction handle for txn #" + txn_id;
this.hstore_site.transactionPrepare(ts, partitions, ts.getPrepareCallback());
}
HStoreCoordinator.java 文件源码
项目:sstore-soft
阅读 27
收藏 0
点赞 0
评论 0
@Override
public void transactionPrepare(RpcController controller, TransactionPrepareRequest request, RpcCallback<TransactionPrepareResponse> callback) {
try {
transactionPrepare_handler.remoteQueue(controller, request, callback);
} catch (Throwable ex) {
shutdownCluster(ex);
}
}
HStoreCoordinator.java 文件源码
项目:s-store
阅读 32
收藏 0
点赞 0
评论 0
@Override
public void timeSync(RpcController controller, TimeSyncRequest request, RpcCallback<TimeSyncResponse> done) {
if (debug.val)
LOG.debug(String.format("Received %s from HStoreSite %s",
request.getClass().getSimpleName(),
HStoreThreadManager.formatSiteName(request.getSenderSite())));
TimeSyncResponse.Builder builder = TimeSyncResponse.newBuilder()
.setT0R(System.currentTimeMillis())
.setT0S(request.getT0S())
.setSenderSite(local_site_id);
ThreadUtil.sleep(10);
done.run(builder.setT1S(System.currentTimeMillis()).build());
}
TimeLimitedRpcController.java 文件源码
项目:ditb
阅读 27
收藏 0
点赞 0
评论 0
@Override
public void notifyOnCancel(RpcCallback<Object> cancellationCb) {
this.cancellationCb.set(cancellationCb);
if (this.cancelled) {
cancellationCb.run(null);
}
}
HStoreCoordinator.java 文件源码
项目:sstore-soft
阅读 27
收藏 0
点赞 0
评论 0
@Override
public void heartbeat(RpcController controller, HeartbeatRequest request, RpcCallback<HeartbeatResponse> done) {
if (debug.val)
LOG.debug(String.format("Received %s from HStoreSite %s",
request.getClass().getSimpleName(),
HStoreThreadManager.formatSiteName(request.getSenderSite())));
HeartbeatResponse.Builder builder = HeartbeatResponse.newBuilder()
.setSenderSite(local_site_id)
.setStatus(Status.OK);
done.run(builder.build());
}
TransactionWorkHandler.java 文件源码
项目:s-store
阅读 27
收藏 0
点赞 0
评论 0
@Override
public void remoteQueue(RpcController controller, TransactionWorkRequest request,
RpcCallback<TransactionWorkResponse> callback) {
if (debug.val)
LOG.debug(String.format("Executing %s using remote handler for txn #%d",
request.getClass().getSimpleName(), request.getTransactionId()));
this.remoteHandler(controller, request, callback);
}
TransactionInitDispatcher.java 文件源码
项目:sstore-soft
阅读 25
收藏 0
点赞 0
评论 0
@SuppressWarnings("unchecked")
@Override
public void runImpl(Object o[]) {
RpcController controller = (RpcController)o[0];
TransactionInitRequest request = (TransactionInitRequest)o[1];
RpcCallback<TransactionInitResponse> callback = (RpcCallback<TransactionInitResponse>)o[2];
hstore_coordinator.getTransactionInitHandler().remoteHandler(controller, request, callback);
}
TransactionPreProcessor.java 文件源码
项目:sstore-soft
阅读 24
收藏 0
点赞 0
评论 0
public TransactionPreProcessor(HStoreSite hstore_site,
BlockingQueue<Pair<ByteBuffer, RpcCallback<ClientResponseImpl>>> queue) {
super(hstore_site,
HStoreConstants.THREAD_NAME_PREPROCESSOR,
queue,
hstore_site.getHStoreConf().site.status_exec_info);
}
HStoreCoordinator.java 文件源码
项目:s-store
阅读 30
收藏 0
点赞 0
评论 0
@Override
public void shutdownPrepare(RpcController controller, ShutdownPrepareRequest request, RpcCallback<ShutdownPrepareResponse> done) {
String originName = HStoreThreadManager.formatSiteName(request.getSenderSite());
// See if they gave us the original error. If they did, then we'll
// try to be helpful and print it out here
SerializableException error = null;
if (request.hasError() && request.getError().isEmpty() == false) {
error = SerializableException.deserializeFromBuffer(request.getError().asReadOnlyByteBuffer());
}
LOG.warn(String.format("Got %s from %s [hasError=%s]%s",
request.getClass().getSimpleName(), originName, (error != null),
(error != null ? "\n" + error : "")));
// Tell the HStoreSite to prepare to shutdown
HStoreCoordinator.this.hstore_site.prepareShutdown(request.hasError());
ThreadUtil.sleep(5000);
// Then send back the acknowledgment that we're good to go
ShutdownPrepareResponse response = ShutdownPrepareResponse.newBuilder()
.setSenderSite(HStoreCoordinator.this.local_site_id)
.build();
done.run(response);
LOG.warn(String.format("Sent %s back to %s",
response.getClass().getSimpleName(), originName));
}
LocalTransaction.java 文件源码
项目:sstore-soft
阅读 41
收藏 0
点赞 0
评论 0
/**
* Testing Constructor with Parameters and Callback
* @param txn_id
* @param base_partition
* @param predict_touchedPartitions
* @param catalog_proc
* @param proc_params
* @return
*/
public LocalTransaction testInit(Long txn_id,
int base_partition,
PartitionSet predict_touchedPartitions,
Procedure catalog_proc,
Object...proc_params) {
this.client_callback = new RpcCallback<ClientResponseImpl>() {
public void run(ClientResponseImpl parameter) {}
};
return this.testInit(txn_id,
base_partition,
new ParameterSet(proc_params),
predict_touchedPartitions, catalog_proc);
}
HStoreSite.java 文件源码
项目:sstore-soft
阅读 40
收藏 0
点赞 0
评论 0
protected void invocationQueue(ByteBuffer buffer, ClientInputHandler handler, Connection c) {
int messageSize = buffer.capacity();
RpcCallback<ClientResponseImpl> callback = new ClientResponseCallback(this.clientInterface, c, messageSize);
this.clientInterface.increaseBackpressure(messageSize);
if (this.preProcessorQueue != null) {
this.preProcessorQueue.add(Pair.of(buffer, callback));
} else {
this.invocationProcess(buffer, callback);
}
}
HStoreSite.java 文件源码
项目:sstore-soft
阅读 31
收藏 0
点赞 0
评论 0
/**
* This is legacy method needed for using Evan's VoltProcedureListener.
*/
@Override
@Deprecated
public void invocationQueue(ByteBuffer buffer, final RpcCallback<byte[]> clientCallback) {
// XXX: This is a big hack. We should just deal with the ClientResponseImpl directly
RpcCallback<ClientResponseImpl> wrapperCallback = new RpcCallback<ClientResponseImpl>() {
@Override
public void run(ClientResponseImpl parameter) {
if (trace.val) LOG.trace("Serializing ClientResponse to byte array:\n" + parameter);
FastSerializer fs = new FastSerializer();
try {
parameter.writeExternal(fs);
clientCallback.run(fs.getBBContainer().b.array());
} catch (IOException ex) {
throw new RuntimeException(ex);
} finally {
fs.clear();
}
}
};
if (this.preProcessorQueue != null) {
this.preProcessorQueue.add(Pair.of(buffer, wrapperCallback));
} else {
this.invocationProcess(buffer, wrapperCallback);
}
}
HStoreSite.java 文件源码
项目:sstore-soft
阅读 31
收藏 0
点赞 0
评论 0
/**
* Send the transaction request to another node for execution. We will create
* a TransactionRedirectCallback that will automatically send the ClientResponse
* generated from the remote node for this txn back to the client
* @param catalog_proc
* @param serializedRequest
* @param base_partition
* @param clientCallback
*/
public void transactionRedirect(Procedure catalog_proc,
ByteBuffer serializedRequest,
int base_partition,
RpcCallback<ClientResponseImpl> clientCallback) {
if (debug.val)
LOG.debug(String.format("Forwarding %s request to partition %d [clientHandle=%d]",
catalog_proc.getName(), base_partition,
StoredProcedureInvocation.getClientHandle(serializedRequest)));
// Make a wrapper for the original callback so that when the result comes back frm the remote partition
// we will just forward it back to the client. How sweet is that??
RedirectCallback callback = null;
try {
callback = new RedirectCallback(this);
// callback = (RedirectCallback)objectPools.CALLBACKS_TXN_REDIRECT_REQUEST.borrowObject();
callback.init(clientCallback);
} catch (Exception ex) {
throw new RuntimeException("Failed to get TransactionRedirectCallback", ex);
}
// Mark this request as having been redirected
// XXX: This sucks because we have to copy the bytes, which will then
// get copied again when we have to serialize it out to a ByteString
serializedRequest.rewind();
ByteBuffer copy = ByteBuffer.allocate(serializedRequest.capacity());
copy.put(serializedRequest);
StoredProcedureInvocation.setBasePartition(base_partition, copy);
this.hstore_coordinator.transactionRedirect(copy.array(),
callback,
base_partition);
if (hstore_conf.site.txn_counters) TransactionCounter.REDIRECTED.inc(catalog_proc);
}
TransactionMapHandler.java 文件源码
项目:sstore-soft
阅读 29
收藏 0
点赞 0
评论 0
@Override
public void remoteHandler(RpcController controller,
TransactionMapRequest request,
RpcCallback<TransactionMapResponse> callback) {
assert(request.hasTransactionId()) :
"Got " + request.getClass().getSimpleName() + " without a txn id!";
Long txn_id = Long.valueOf(request.getTransactionId());
if (debug.val)
LOG.debug(String.format("Got %s for txn #%d",
request.getClass().getSimpleName(), txn_id));
// The mr_ts handle will be null if this HStoreSite is not where the
// base partition for the original MRTransaction
MapReduceTransaction mr_ts = hstore_site.getTransaction(txn_id);
if (mr_ts == null) {
mr_ts = hstore_site.getTransactionInitializer()
.createMapReduceTransaction(txn_id,
EstTime.currentTimeMillis(),
request.getClientHandle(),
request.getBasePartition(),
request.getProcedureId(),
request.getParams().asReadOnlyByteBuffer());
}
assert(mr_ts.isMapPhase());
mr_ts.initTransactionMapWrapperCallback(callback);
/*
* Here we would like to start MapReduce Transaction on the remote partition except the base partition of it.
* This is to avoid the double invoke for remote task.
* */
for (int partition : hstore_site.getLocalPartitionIds()) {
if (partition != mr_ts.getBasePartition()) {
LocalTransaction ts = mr_ts.getLocalTransaction(partition);
hstore_site.transactionStart(ts);
}
} // FOR
}
HStoreCoordinator.java 文件源码
项目:s-store
阅读 34
收藏 0
点赞 0
评论 0
/**
* Forward a StoredProcedureInvocation request to a remote site for execution
* @param serializedRequest
* @param callback
* @param partition
*/
public void transactionRedirect(byte[] serializedRequest, RpcCallback<TransactionRedirectResponse> callback, int partition) {
int dest_site_id = catalogContext.getSiteIdForPartitionId(partition);
if (debug.val)
LOG.debug(String.format("Redirecting transaction request to partition #%d on %s",
partition, HStoreThreadManager.formatSiteName(dest_site_id)));
ByteString bs = ByteString.copyFrom(serializedRequest);
TransactionRedirectRequest mr = TransactionRedirectRequest.newBuilder()
.setSenderSite(this.local_site_id)
.setWork(bs)
.build();
this.channels[dest_site_id].transactionRedirect(new ProtoRpcController(), mr, callback);
}
TransactionWorkHandler.java 文件源码
项目:sstore-soft
阅读 37
收藏 0
点赞 0
评论 0
@Override
public void remoteQueue(RpcController controller, TransactionWorkRequest request,
RpcCallback<TransactionWorkResponse> callback) {
if (debug.val)
LOG.debug(String.format("Executing %s using remote handler for txn #%d",
request.getClass().getSimpleName(), request.getTransactionId()));
this.remoteHandler(controller, request, callback);
}