ASF Bugzilla – Attachment 26545 Details for
Bug 50648
RpcChannel improvements
Home
|
New
|
Browse
|
Search
|
[?]
|
Reports
|
Help
|
New Account
|
Log In
Remember
[x]
|
Forgot Password
Login:
[x]
[patch]
svn diff
rpcchannel_et_al.diff (text/plain), 11.09 KB, created by
Olivier Costet
on 2011-01-25 09:38:16 UTC
(
hide
)
Description:
svn diff
Filename:
MIME Type:
Creator:
Olivier Costet
Created:
2011-01-25 09:38:16 UTC
Size:
11.09 KB
patch
obsolete
>Index: java/org/apache/catalina/tribes/group/RpcCallback2.java >=================================================================== >--- java/org/apache/catalina/tribes/group/RpcCallback2.java (revision 0) >+++ java/org/apache/catalina/tribes/group/RpcCallback2.java (revision 0) >@@ -0,0 +1,24 @@ >+package org.apache.catalina.tribes.group; >+ >+import org.apache.catalina.tribes.Member; >+ >+import java.io.Serializable; >+ >+/** >+ * @author ocostet >+ * @version 1.0 >+ */ >+public interface RpcCallback2 >+extends RpcCallback >+{ >+ /** >+ * Invoked when the response message returned by >+ * {@link #replyRequest(java.io.Serializable, org.apache.catalina.tribes.Member)} >+ * could not be sent to the requesting member. >+ * >+ * @param reply The Object returned by {@link #replyRequest(java.io.Serializable, org.apache.catalina.tribes.Member)}. >+ * @param destination the member the response was destined for. >+ * @param exception the error that occured. >+ */ >+ public void replyFailed( Serializable reply, Member destination, Exception exception ); >+} >Index: java/org/apache/catalina/tribes/group/RpcMessage.java >=================================================================== >--- java/org/apache/catalina/tribes/group/RpcMessage.java (revision 1063205) >+++ java/org/apache/catalina/tribes/group/RpcMessage.java (working copy) >@@ -24,6 +24,7 @@ > import java.io.Serializable; > > import org.apache.catalina.tribes.util.Arrays; >+import org.apache.catalina.tribes.io.XByteBuffer; > > /** > * <p>Title: </p> >@@ -37,19 +38,19 @@ > */ > public class RpcMessage implements Externalizable { > >- protected Serializable message; >- protected byte[] uuid; >- protected byte[] rpcId; >- protected boolean reply = false; >+ byte[] messageContents = new byte[0]; >+ byte[] uuid; >+ byte[] rpcId; >+ boolean reply = false; >+ transient Serializable messageCache; > > public RpcMessage() { > //for serialization > } > >- public RpcMessage(byte[] rpcId, byte[] uuid, Serializable message) { >+ public RpcMessage(byte[] rpcId, byte[] uuid) { > this.rpcId = rpcId; > this.uuid = uuid; >- this.message = message; > } > > public void readExternal(ObjectInput in) throws IOException,ClassNotFoundException { >@@ -60,7 +61,9 @@ > length = in.readInt(); > rpcId = new byte[length]; > in.read(rpcId, 0, length); >- message = (Serializable)in.readObject(); >+ length = in.readInt(); >+ messageContents = new byte[ length ]; >+ in.readFully( messageContents ); > } > > public void writeExternal(ObjectOutput out) throws IOException { >@@ -69,9 +72,70 @@ > out.write(uuid, 0, uuid.length); > out.writeInt(rpcId.length); > out.write(rpcId, 0, rpcId.length); >- out.writeObject(message); >+ out.writeInt( messageContents.length ); >+ out.write( messageContents ); > } >- >+ >+ public void setUuid( byte[] uuid ) { >+ this.uuid = uuid; >+ } >+ >+ public void setRpcId( byte[] rpcId ) { >+ this.rpcId = rpcId; >+ } >+ >+ public void setReply( boolean reply ) { >+ this.reply = reply; >+ } >+ >+ public byte[] getUuid() { >+ return uuid; >+ } >+ >+ public byte[] getRpcId() { >+ return rpcId; >+ } >+ >+ public boolean isReply() { >+ return reply; >+ } >+ >+ public void setMessage( Serializable message ){ >+ messageCache = message; >+ >+ if( message == null ){ >+ messageContents = new byte[0]; >+ } >+ else { >+ try { >+ messageContents = XByteBuffer.serialize( message ); >+ } >+ catch ( IOException ioex ){ >+ throw new RuntimeException(ioex); >+ } >+ } >+ } >+ >+ public Serializable getMessage() >+ throws IOException, ClassCastException, ClassNotFoundException >+ { >+ return getMessage( null ); >+ } >+ >+ public Serializable getMessage( ClassLoader[] externalLoaders ) >+ throws IOException, ClassCastException, ClassNotFoundException >+ { >+ if( messageContents.length == 0 ){ >+ return null; >+ } >+ >+ if( messageCache != null ){ >+ return messageCache; >+ } >+ >+ return messageCache = XByteBuffer.deserialize(messageContents, 0, messageContents.length, externalLoaders); >+ } >+ > @Override > public String toString() { > StringBuilder buf = new StringBuilder("RpcMessage["); >@@ -80,8 +144,13 @@ > buf.append(Arrays.toString(rpcId)); > buf.append("; uuid="); > buf.append(Arrays.toString(uuid)); >- buf.append("; msg="); >- buf.append(message); >+ buf.append(";"); >+ if( messageContents.length == 0 ){ >+ buf.append("[no msg]"); >+ } >+ else { >+ buf.append(messageContents.length).append(" bytes"); >+ } > return buf.toString(); > } > >@@ -91,7 +160,7 @@ > } > > public NoRpcChannelReply(byte[] rpcid, byte[] uuid) { >- super(rpcid,uuid,null); >+ super( rpcid, uuid ); > reply = true; > } > >@@ -114,6 +183,4 @@ > out.write(rpcId, 0, rpcId.length); > } > } >- >- > } >Index: java/org/apache/catalina/tribes/group/RpcChannel.java >=================================================================== >--- java/org/apache/catalina/tribes/group/RpcChannel.java (revision 1063205) >+++ java/org/apache/catalina/tribes/group/RpcChannel.java (working copy) >@@ -45,6 +45,7 @@ > private RpcCallback callback; > private byte[] rpcId; > private int replyMessageOptions = 0; >+ private ClassLoader[] externalLoaders; > > private HashMap<RpcCollectorKey, RpcCollector> responseMap = new HashMap<RpcCollectorKey, RpcCollector>(); > >@@ -54,16 +55,29 @@ > * @param rpcId - the unique Id for this RPC group > * @param channel Channel > * @param callback RpcCallback >+ * @param externalLoaders ClassLoaders used to deserialize messages. > */ >- public RpcChannel(byte[] rpcId, Channel channel, RpcCallback callback) { >+ public RpcChannel( byte[] rpcId, Channel channel, RpcCallback callback, ClassLoader[] externalLoaders ) { > this.channel = channel; > this.callback = callback; > this.rpcId = rpcId; >+ this.externalLoaders = externalLoaders; > channel.addChannelListener(this); > } > >- > /** >+ * Create an RPC channel. You can have several RPC channels attached to a group >+ * all separated out by the uniqueness >+ * @param rpcId - the unique Id for this RPC group >+ * @param channel Channel >+ * @param callback RpcCallback >+ */ >+ public RpcChannel( byte[] rpcId, Channel channel, RpcCallback callback ) { >+ this( rpcId, channel, callback, null ); >+ } >+ >+ >+ /** > * Send a message and wait for the response. > * @param destination Member[] - the destination for the message, and the members you request a reply from > * @param message Serializable - the message you are sending out >@@ -90,12 +104,15 @@ > try { > synchronized (collector) { > if ( rpcOptions != NO_REPLY ) responseMap.put(key, collector); >- RpcMessage rmsg = new RpcMessage(rpcId, key.id, message); >+ >+ RpcMessage rmsg = new RpcMessage(rpcId, key.id); >+ rmsg.setMessage( message ); >+ > channel.send(destination, rmsg, sendOptions); > if ( rpcOptions != NO_REPLY ) collector.wait(timeout); > } > } catch ( InterruptedException ix ) { >- Thread.interrupted(); >+ Thread.currentThread().interrupt(); > //throw new ChannelException(ix); > }finally { > responseMap.remove(key); >@@ -106,35 +123,65 @@ > @Override > public void messageReceived(Serializable msg, Member sender) { > RpcMessage rmsg = (RpcMessage)msg; >- RpcCollectorKey key = new RpcCollectorKey(rmsg.uuid); >- if ( rmsg.reply ) { >+ >+ Serializable message; >+ >+ try { >+ message = rmsg.getMessage( externalLoaders ); >+ } >+ catch ( Throwable t ){ >+ throw new RuntimeException(t); >+ } >+ >+ RpcCollectorKey key = new RpcCollectorKey( rmsg.getUuid() ); >+ >+ if ( rmsg.isReply() ) { > RpcCollector collector = responseMap.get(key); > if (collector == null) { >- callback.leftOver(rmsg.message, sender); >+ callback.leftOver( message, sender ); > } else { > synchronized (collector) { > //make sure it hasn't been removed > if ( responseMap.containsKey(key) ) { >- if ( (rmsg instanceof RpcMessage.NoRpcChannelReply) ) >+ if ( (rmsg instanceof RpcMessage.NoRpcChannelReply) ) { > collector.destcnt--; >- else >- collector.addResponse(rmsg.message, sender); >- if (collector.isComplete()) collector.notifyAll(); >+ } >+ else { >+ collector.addResponse(message, sender); >+ } >+ >+ if ( collector.isComplete() ) { >+ collector.notifyAll(); >+ } > } else { >- if (! (rmsg instanceof RpcMessage.NoRpcChannelReply) ) >- callback.leftOver(rmsg.message, sender); >+ if (! (rmsg instanceof RpcMessage.NoRpcChannelReply) ) { >+ callback.leftOver( message, sender ); >+ } > } > }//synchronized > }//end if > } else{ >- Serializable reply = callback.replyRequest(rmsg.message,sender); >- rmsg.reply = true; >- rmsg.message = reply; >+ Serializable reply = callback.replyRequest( message, sender ); >+ >+ if( reply == null ){ >+ rmsg = new RpcMessage.NoRpcChannelReply( rmsg.getRpcId(), rmsg.getUuid() ); >+ } >+ else { >+ rmsg.setMessage( reply ); >+ } >+ >+ rmsg.setReply( true ); >+ > try { > channel.send(new Member[] {sender}, rmsg, > replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK); >- }catch ( Exception x ) { >+ } >+ catch ( Exception x ) { > log.error("Unable to send back reply in RpcChannel.",x); >+ >+ if( callback instanceof RpcCallback2 ){ >+ ((RpcCallback2) callback).replyFailed( reply, sender, x ); >+ } > } > }//end if > } >@@ -187,7 +234,15 @@ > public void setReplyMessageOptions(int replyMessageOptions) { > this.replyMessageOptions = replyMessageOptions; > } >- >+ >+ public ClassLoader[] getExternalLoaders() { >+ return externalLoaders; >+ } >+ >+ public void setExternalLoaders( ClassLoader[] externalLoaders ) { >+ this.externalLoaders = externalLoaders; >+ } >+ > /** > * > * Class that holds all response.
You cannot view the attachment while viewing its details because your browser does not support IFRAMEs.
View the attachment on a separate page
.
View Attachment As Diff
View Attachment As Raw
Actions:
View
|
Diff
Attachments on
bug 50648
: 26545