Index: test/org/apache/catalina/tribes/demos/EchoRpcTest.java =================================================================== --- test/org/apache/catalina/tribes/demos/EchoRpcTest.java (revision 1063205) +++ test/org/apache/catalina/tribes/demos/EchoRpcTest.java (working copy) @@ -23,7 +23,6 @@ import org.apache.catalina.tribes.Member; import org.apache.catalina.tribes.group.Response; import org.apache.catalina.tribes.group.RpcCallback; -import org.apache.catalina.tribes.group.RpcChannel; /** Index: java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java =================================================================== --- java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java (revision 1063205) +++ java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java (working copy) @@ -41,7 +41,6 @@ import org.apache.catalina.tribes.MembershipListener; import org.apache.catalina.tribes.group.Response; import org.apache.catalina.tribes.group.RpcCallback; -import org.apache.catalina.tribes.group.RpcChannel; import org.apache.catalina.tribes.io.XByteBuffer; import org.apache.catalina.tribes.membership.MemberImpl; import org.apache.catalina.tribes.util.Arrays; Index: java/org/apache/catalina/tribes/group/RpcMessage.java =================================================================== --- java/org/apache/catalina/tribes/group/RpcMessage.java (revision 1063205) +++ java/org/apache/catalina/tribes/group/RpcMessage.java (working copy) @@ -24,6 +24,7 @@ import java.io.Serializable; import org.apache.catalina.tribes.util.Arrays; +import org.apache.catalina.tribes.io.XByteBuffer; /** *

Title:

@@ -37,30 +38,32 @@ */ public class RpcMessage implements Externalizable { - protected Serializable message; + protected byte[] messageContents; protected byte[] uuid; protected byte[] rpcId; protected boolean reply = false; + private transient Serializable messageCache; //transient for heuristic purposes public RpcMessage() { //for serialization } - public RpcMessage(byte[] rpcId, byte[] uuid, Serializable message) { + public RpcMessage(byte[] rpcId, byte[] uuid) { this.rpcId = rpcId; this.uuid = uuid; - this.message = message; } public void readExternal(ObjectInput in) throws IOException,ClassNotFoundException { reply = in.readBoolean(); int length = in.readInt(); uuid = new byte[length]; - in.read(uuid, 0, length); + in.readFully(uuid); length = in.readInt(); rpcId = new byte[length]; - in.read(rpcId, 0, length); - message = (Serializable)in.readObject(); + in.readFully(rpcId); + length = in.readInt(); + messageContents = new byte[length]; + in.readFully(messageContents); } public void writeExternal(ObjectOutput out) throws IOException { @@ -69,9 +72,44 @@ out.write(uuid, 0, uuid.length); out.writeInt(rpcId.length); out.write(rpcId, 0, rpcId.length); - out.writeObject(message); + out.writeInt(messageContents.length); + out.write(messageContents); } - + + public void setMessage( Serializable message ){ + messageCache = message; + + if( message == null ){ + messageContents = new byte[0]; + } else { + try { + messageContents = XByteBuffer.serialize( message ); + } catch ( IOException ioex ){ + throw new RuntimeException(ioex); + } + } + } + + public Serializable getMessage() + throws IOException, ClassCastException, ClassNotFoundException + { + return getMessage( null ); + } + + public Serializable getMessage( ClassLoader[] externalLoaders ) + throws IOException, ClassCastException, ClassNotFoundException + { + if( messageContents.length == 0 ){ + return null; + } + + if( messageCache != null ){ + return messageCache; + } + + return messageCache = XByteBuffer.deserialize(messageContents, 0, messageContents.length, externalLoaders); + } + @Override public String toString() { StringBuilder buf = new StringBuilder("RpcMessage["); @@ -80,8 +118,14 @@ buf.append(Arrays.toString(rpcId)); buf.append("; uuid="); buf.append(Arrays.toString(uuid)); - buf.append("; msg="); - buf.append(message); + buf.append(";"); + if( messageContents.length == 0 ){ + buf.append("[no msg]"); + } else if( messageCache != null ) { + buf.append("msg=").append(messageCache); + } else { + buf.append(messageContents.length).append(" bytes"); + } return buf.toString(); } @@ -91,7 +135,7 @@ } public NoRpcChannelReply(byte[] rpcid, byte[] uuid) { - super(rpcid,uuid,null); + super(rpcid, uuid); reply = true; } @@ -113,7 +157,19 @@ out.writeInt(rpcId.length); out.write(rpcId, 0, rpcId.length); } - } + @Override + public Serializable getMessage() + throws IOException, ClassCastException, ClassNotFoundException + { + return null; + } + @Override + public Serializable getMessage( ClassLoader[] externalLoaders ) + throws IOException, ClassCastException, ClassNotFoundException + { + return null; + } + } } Index: java/org/apache/catalina/tribes/group/RpcChannel.java =================================================================== --- java/org/apache/catalina/tribes/group/RpcChannel.java (revision 1063205) +++ java/org/apache/catalina/tribes/group/RpcChannel.java (working copy) @@ -45,6 +45,7 @@ private RpcCallback callback; private byte[] rpcId; private int replyMessageOptions = 0; + private ClassLoader[] externalLoaders; private HashMap responseMap = new HashMap(); @@ -54,15 +55,28 @@ * @param rpcId - the unique Id for this RPC group * @param channel Channel * @param callback RpcCallback + * @param externalLoaders ClassLoaders used to deserialize messages. */ - public RpcChannel(byte[] rpcId, Channel channel, RpcCallback callback) { + public RpcChannel( byte[] rpcId, Channel channel, RpcCallback callback, ClassLoader[] externalLoaders ) { this.channel = channel; this.callback = callback; this.rpcId = rpcId; + this.externalLoaders = externalLoaders; channel.addChannelListener(this); } + + /** + * Create an RPC channel. You can have several RPC channels attached to a group + * all separated out by the uniqueness + * @param rpcId - the unique Id for this RPC group + * @param channel Channel + * @param callback RpcCallback + */ + public RpcChannel( byte[] rpcId, Channel channel, RpcCallback callback ) { + this( rpcId, channel, callback, null ); + } + - /** * Send a message and wait for the response. * @param destination Member[] - the destination for the message, and the members you request a reply from @@ -90,7 +104,8 @@ try { synchronized (collector) { if ( rpcOptions != NO_REPLY ) responseMap.put(key, collector); - RpcMessage rmsg = new RpcMessage(rpcId, key.id, message); + RpcMessage rmsg = new RpcMessage(rpcId, key.id); + rmsg.setMessage(message); channel.send(destination, rmsg, sendOptions); if ( rpcOptions != NO_REPLY ) collector.wait(timeout); } @@ -106,11 +121,19 @@ @Override public void messageReceived(Serializable msg, Member sender) { RpcMessage rmsg = (RpcMessage)msg; + + Serializable encapsulatedMessage; + try { + encapsulatedMessage = rmsg.getMessage( externalLoaders ); + } catch ( Exception x ){ + throw new RuntimeException(x); + } + RpcCollectorKey key = new RpcCollectorKey(rmsg.uuid); if ( rmsg.reply ) { RpcCollector collector = responseMap.get(key); if (collector == null) { - callback.leftOver(rmsg.message, sender); + callback.leftOver(encapsulatedMessage, sender); } else { synchronized (collector) { //make sure it hasn't been removed @@ -118,18 +141,24 @@ if ( (rmsg instanceof RpcMessage.NoRpcChannelReply) ) collector.destcnt--; else - collector.addResponse(rmsg.message, sender); + collector.addResponse(encapsulatedMessage, sender); if (collector.isComplete()) collector.notifyAll(); } else { if (! (rmsg instanceof RpcMessage.NoRpcChannelReply) ) - callback.leftOver(rmsg.message, sender); + callback.leftOver(encapsulatedMessage, sender); } }//synchronized }//end if } else{ - Serializable reply = callback.replyRequest(rmsg.message,sender); + Serializable reply = callback.replyRequest(encapsulatedMessage,sender); + if( reply == null ){ + rmsg = new RpcMessage.NoRpcChannelReply(rmsg.rpcId, rmsg.uuid); + } else { + rmsg.setMessage(reply); + } + rmsg.reply = true; - rmsg.message = reply; + try { channel.send(new Member[] {sender}, rmsg, replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK); @@ -187,7 +216,15 @@ public void setReplyMessageOptions(int replyMessageOptions) { this.replyMessageOptions = replyMessageOptions; } - + + public ClassLoader[] getExternalLoaders() { + return externalLoaders; + } + + public void setExternalLoaders( ClassLoader[] externalLoaders ) { + this.externalLoaders = externalLoaders; + } + /** * * Class that holds all response.