Index: java/org/apache/catalina/tribes/group/ExtendedRpcCallback.java
===================================================================
--- java/org/apache/catalina/tribes/group/ExtendedRpcCallback.java (revision 0)
+++ java/org/apache/catalina/tribes/group/ExtendedRpcCallback.java (revision 0)
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.group;
+
+import java.io.Serializable;
+
+import org.apache.catalina.tribes.Member;
+
+/**
+ * An {@link RpcCallback} that can additionally supply an
+ * {@link RpcReplyHandler} for the reply sent in answer to a request.
+ */
+public interface ExtendedRpcCallback extends RpcCallback {
+
+    /**
+     * Returns the handler to be told whether the reply to the given request
+     * was sent successfully.
+     *
+     * @param request - the original message that requested the reply
+     * @param sender - the member that sent the request
+     * @return the handler to notify about the outcome of sending the reply,
+     *         or {@code null} if no notification is wanted
+     */
+    RpcReplyHandler getReplyHandler( Serializable request, Member sender );
+
+}
Index: java/org/apache/catalina/tribes/group/RpcChannel.java
===================================================================
--- java/org/apache/catalina/tribes/group/RpcChannel.java (revision 1063205)
+++ java/org/apache/catalina/tribes/group/RpcChannel.java (working copy)
@@ -21,10 +21,12 @@
 import java.util.Arrays;
 import java.util.HashMap;
 
 import org.apache.catalina.tribes.Channel;
 import org.apache.catalina.tribes.ChannelException;
 import org.apache.catalina.tribes.ChannelListener;
+import org.apache.catalina.tribes.ErrorHandler;
 import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.UniqueId;
 import org.apache.catalina.tribes.util.UUIDGenerator;
 import org.apache.juli.logging.Log;
 import org.apache.juli.logging.LogFactory;
@@ -127,15 +129,44 @@
                 }//synchronized
             }//end if
         } else{
+            // Capture the request now: rmsg.message is overwritten with the reply below.
+            final Serializable request = rmsg.message;
             Serializable reply = callback.replyRequest(rmsg.message,sender);
             rmsg.reply = true;
             rmsg.message = reply;
+
+            final RpcReplyHandler replyHandler = (callback instanceof ExtendedRpcCallback)
+                ? ((ExtendedRpcCallback) callback).getReplyHandler( request, sender )
+                : null;
+
+            ErrorHandler errorHandler = null;
+            if( replyHandler != null ){
+                final Serializable response = reply;
+                final Member requester = sender;
+
+                // Relay the send outcome reported to the ErrorHandler on to the reply handler.
+                errorHandler = new ErrorHandler(){
+                    @Override
+                    public void handleError( ChannelException x, UniqueId id ) {
+                        replyHandler.replyFailed( request, response, requester, x );
+                    }
+
+                    @Override
+                    public void handleCompletion( UniqueId id ) {
+                        replyHandler.replySucceeded( request, response, requester );
+                    }
+                };
+            }
+
             try {
-                channel.send(new Member[] {sender}, rmsg,
-                             replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
+                channel.send(new Member[] {sender}, rmsg,
+                             replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK,
+                             errorHandler);
             }catch ( Exception x ) {
+                // NOTE(review): whether errorHandler has already been notified when
+                // send() throws is not established here - confirm before relying on it.
                 log.error("Unable to send back reply in RpcChannel.",x);
             }
         }//end if
     }
 
Index: java/org/apache/catalina/tribes/group/RpcReplyHandler.java
===================================================================
--- java/org/apache/catalina/tribes/group/RpcReplyHandler.java (revision 0)
+++ java/org/apache/catalina/tribes/group/RpcReplyHandler.java (revision 0)
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.catalina.tribes.group;
+
+import java.io.Serializable;
+
+import org.apache.catalina.tribes.Member;
+
+/**
+ * Reports success or failure of RPC reply sending.
+ *
+ * TODO: the name is acknowledged as poor; pick a clearer one.
+ */
+public interface RpcReplyHandler {
+
+    /**
+     * Invoked when sending the reply failed.
+     *
+     * @param request - the original message that requested the reply
+     * @param response - the reply message to the original message
+     * @param sender - the member that sent the request
+     * @param reason - the reason the reply failed
+     */
+    public void replyFailed( Serializable request, Serializable response, Member sender, Exception reason);
+
+    /**
+     * Invoked when the reply was sent successfully.
+     *
+     * @param request - the original message that requested the reply
+     * @param response - the reply message to the original message
+     * @param sender - the member that sent the request
+     */
+    public void replySucceeded(Serializable request, Serializable response, Member sender);
+
+}