--- java/org/apache/catalina/tribes/group/ExtendedRpcCallback.java (revision 0)
+++ java/org/apache/catalina/tribes/group/ExtendedRpcCallback.java (revision 0)
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.group;
+
+import java.io.Serializable;
+
+import org.apache.catalina.tribes.ErrorHandler;
+import org.apache.catalina.tribes.Member;
+
+/**
+ * Extension of {@link RpcCallback} whose implementations are also notified about
+ * the outcome of sending the reply back to the member that requested it.
+ */
+public interface ExtendedRpcCallback extends RpcCallback {
+
+    /**
+     * Invoked before a reply is sent when, and only when, the reply options contain
+     * the asynchronous sending option. A success/failure report will be reported on
+     * the returned error handler.
+     * @param request - the original message that requested the reply
+     * @param response - the reply message to the original message
+     * @param sender - the member that requested the reply
+     * @return the ErrorHandler object to handle the callback; if null, the reply is
+     *         sent without an error handler
+     */
+    public ErrorHandler asyncReply(Serializable request, Serializable response, Member sender);
+
+    /**
+     * Invoked when sending a reply threw an exception and the reply options do not
+     * contain the asynchronous sending option.
+     * @param request - the original message that requested the reply
+     * @param response - the reply message to the original message
+     * @param sender - the member that requested the reply
+     * @param reason - the reason the reply failed
+     * @return true if the callback would like to reattempt the reply, false otherwise;
+     *         the reply is sent again for as long as this method returns true
+     */
+    public boolean replyFailed(Serializable request, Serializable response, Member sender, Exception reason);
+
+    /**
+     * Invoked after a reply was sent without an exception when the reply options do
+     * not contain the asynchronous sending option.
+     * @param request - the original message that requested the reply
+     * @param response - the reply message to the original message
+     * @param sender - the member that requested the reply
+     */
+    public void replySucceeded(Serializable request, Serializable response, Member sender);
+}
   + native
--- java/org/apache/catalina/tribes/group/RpcChannel.java (revision 1064169)
+++ java/org/apache/catalina/tribes/group/RpcChannel.java (working copy)
@@ -24,6 +24,7 @@
 import org.apache.catalina.tribes.Channel;
 import org.apache.catalina.tribes.ChannelException;
 import org.apache.catalina.tribes.ChannelListener;
+import org.apache.catalina.tribes.ErrorHandler;
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.util.UUIDGenerator;
 import org.apache.juli.logging.Log;
@@ -126,14 +127,40 @@
                 }//synchronized
             }//end if
         } else{
+            boolean finished = false;
+            ExtendedRpcCallback excallback = (callback instanceof ExtendedRpcCallback)?((ExtendedRpcCallback)callback) : null;
+            boolean asyncReply = ((replyMessageOptions & Channel.SEND_OPTIONS_ASYNCHRONOUS) == Channel.SEND_OPTIONS_ASYNCHRONOUS);
+            // Remember the original request: rmsg.message is overwritten with the reply
+            // below, but the callbacks must still be handed the request itself.
+            Serializable request = rmsg.message;
             Serializable reply = callback.replyRequest(rmsg.message,sender);
-            rmsg.reply = true;
-            rmsg.message = reply;
-            try {
-                channel.send(new Member[] {sender}, rmsg,
-                             replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
-            }catch ( Exception x ) {
-                log.error("Unable to send back reply in RpcChannel.",x);
+            while (!finished) {
+                ErrorHandler handler = excallback!=null && asyncReply ? excallback.asyncReply(request, reply, sender) : null;
+                rmsg.reply = true;
+                rmsg.message = reply;
+                boolean sent = false;
+                try {
+                    if (handler!=null) {
+                        channel.send(new Member[] {sender}, rmsg,replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK, handler);
+                    } else {
+                        channel.send(new Member[] {sender}, rmsg,replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
+                    }
+                    sent = true;
+                    finished = true;
+                }catch ( Exception x ) {
+                    if (excallback != null && !asyncReply) {
+                        // the callback decides whether the reply is attempted again
+                        finished = !excallback.replyFailed(request, reply, sender, x);
+                    } else {
+                        finished = true;
+                        log.error("Unable to send back reply in RpcChannel.",x);
+                    }
+                }
+                // Notify outside the try block so that an exception thrown by the callback
+                // is not mistaken for a failed send, which could resend a delivered reply.
+                if (sent && excallback != null && !asyncReply) {
+                    excallback.replySucceeded(request, reply, sender);
+                }
             }
         }//end if
     }