View | Details | Raw Unified | Return to bug 50670
Collapse All | Expand All

(-)test/org/apache/catalina/tribes/demos/EchoRpcTest.java (-1 lines)
Lines 23-29 Link Here
23
import org.apache.catalina.tribes.Member;
23
import org.apache.catalina.tribes.Member;
24
import org.apache.catalina.tribes.group.Response;
24
import org.apache.catalina.tribes.group.Response;
25
import org.apache.catalina.tribes.group.RpcCallback;
25
import org.apache.catalina.tribes.group.RpcCallback;
26
import org.apache.catalina.tribes.group.RpcChannel;
27
26
28
27
29
/**
28
/**
(-)java/org/apache/catalina/tribes/tipis/AbstractReplicatedMap.java (-1 lines)
Lines 41-47 Link Here
41
import org.apache.catalina.tribes.MembershipListener;
41
import org.apache.catalina.tribes.MembershipListener;
42
import org.apache.catalina.tribes.group.Response;
42
import org.apache.catalina.tribes.group.Response;
43
import org.apache.catalina.tribes.group.RpcCallback;
43
import org.apache.catalina.tribes.group.RpcCallback;
44
import org.apache.catalina.tribes.group.RpcChannel;
45
import org.apache.catalina.tribes.io.XByteBuffer;
44
import org.apache.catalina.tribes.io.XByteBuffer;
46
import org.apache.catalina.tribes.membership.MemberImpl;
45
import org.apache.catalina.tribes.membership.MemberImpl;
47
import org.apache.catalina.tribes.util.Arrays;
46
import org.apache.catalina.tribes.util.Arrays;
(-)java/org/apache/catalina/tribes/group/RpcMessage.java (-12 / +68 lines)
Lines 24-29 Link Here
24
import java.io.Serializable;
24
import java.io.Serializable;
25
25
26
import org.apache.catalina.tribes.util.Arrays;
26
import org.apache.catalina.tribes.util.Arrays;
27
import org.apache.catalina.tribes.io.XByteBuffer;
27
28
28
/**
29
/**
29
 * <p>Title: </p>
30
 * <p>Title: </p>
Lines 37-66 Link Here
37
 */
38
 */
38
public class RpcMessage implements Externalizable {
39
public class RpcMessage implements Externalizable {
39
40
40
    protected Serializable message;
41
    protected byte[] messageContents;
41
    protected byte[] uuid;
42
    protected byte[] uuid;
42
    protected byte[] rpcId;
43
    protected byte[] rpcId;
43
    protected boolean reply = false;
44
    protected boolean reply = false;
45
    private transient Serializable messageCache; //transient for heuristic purposes
44
46
45
    public RpcMessage() {
47
    public RpcMessage() {
46
        //for serialization
48
        //for serialization
47
    }
49
    }
48
50
49
    public RpcMessage(byte[] rpcId, byte[] uuid, Serializable message) {
51
    public RpcMessage(byte[] rpcId, byte[] uuid) {
50
        this.rpcId = rpcId;
52
        this.rpcId = rpcId;
51
        this.uuid = uuid;
53
        this.uuid = uuid;
52
        this.message = message;
53
    }
54
    }
54
55
55
    public void readExternal(ObjectInput in) throws IOException,ClassNotFoundException {
56
    public void readExternal(ObjectInput in) throws IOException,ClassNotFoundException {
56
        reply = in.readBoolean();
57
        reply = in.readBoolean();
57
        int length = in.readInt();
58
        int length = in.readInt();
58
        uuid = new byte[length];
59
        uuid = new byte[length];
59
        in.read(uuid, 0, length);
60
        in.readFully(uuid);
60
        length = in.readInt();
61
        length = in.readInt();
61
        rpcId = new byte[length];
62
        rpcId = new byte[length];
62
        in.read(rpcId, 0, length);
63
        in.readFully(rpcId);
63
        message = (Serializable)in.readObject();
64
        length = in.readInt();
65
        messageContents = new byte[length];
66
        in.readFully(messageContents);
64
    }
67
    }
65
68
66
    public void writeExternal(ObjectOutput out) throws IOException {
69
    public void writeExternal(ObjectOutput out) throws IOException {
Lines 69-77 Link Here
69
        out.write(uuid, 0, uuid.length);
72
        out.write(uuid, 0, uuid.length);
70
        out.writeInt(rpcId.length);
73
        out.writeInt(rpcId.length);
71
        out.write(rpcId, 0, rpcId.length);
74
        out.write(rpcId, 0, rpcId.length);
72
        out.writeObject(message);
75
        out.writeInt(messageContents.length);
76
        out.write(messageContents);
73
    }
77
    }
74
    
78
79
    public void setMessage( Serializable message ){
80
        messageCache = message;
81
82
        if( message == null ){
83
            messageContents = new byte[0];
84
        } else {
85
            try {
86
                messageContents = XByteBuffer.serialize( message );
87
            } catch ( IOException ioex ){
88
                throw new RuntimeException(ioex);
89
            }
90
        }
91
    }
92
93
    public Serializable getMessage()
94
        throws IOException, ClassCastException, ClassNotFoundException
95
    {
96
        return getMessage( null );
97
    }
98
99
    public Serializable getMessage( ClassLoader[] externalLoaders )
100
        throws IOException, ClassCastException, ClassNotFoundException
101
    {
102
        if( messageContents.length == 0 ){
103
            return null;
104
        }
105
106
        if( messageCache != null ){
107
            return messageCache;
108
        }
109
110
        return messageCache = XByteBuffer.deserialize(messageContents, 0, messageContents.length, externalLoaders);
111
    }
112
75
    @Override
113
    @Override
76
    public String toString() {
114
    public String toString() {
77
        StringBuilder buf = new StringBuilder("RpcMessage[");
115
        StringBuilder buf = new StringBuilder("RpcMessage[");
Lines 80-87 Link Here
80
        buf.append(Arrays.toString(rpcId));
118
        buf.append(Arrays.toString(rpcId));
81
        buf.append("; uuid=");
119
        buf.append("; uuid=");
82
        buf.append(Arrays.toString(uuid));
120
        buf.append(Arrays.toString(uuid));
83
        buf.append("; msg=");
121
        buf.append(";");
84
        buf.append(message);
122
        if( messageContents.length == 0 ){
123
            buf.append("[no msg]");
124
        } else if( messageCache != null ) {
125
            buf.append("msg=").append(messageCache);
126
        } else {
127
            buf.append(messageContents.length).append(" bytes");
128
        }
85
        return buf.toString();
129
        return buf.toString();
86
    }
130
    }
87
    
131
    
Lines 91-97 Link Here
91
        }
135
        }
92
136
93
        public NoRpcChannelReply(byte[] rpcid, byte[] uuid) {
137
        public NoRpcChannelReply(byte[] rpcid, byte[] uuid) {
94
            super(rpcid,uuid,null);
138
            super(rpcid, uuid);
95
            reply = true;
139
            reply = true;
96
        }
140
        }
97
141
Lines 113-119 Link Here
113
            out.writeInt(rpcId.length);
157
            out.writeInt(rpcId.length);
114
            out.write(rpcId, 0, rpcId.length);
158
            out.write(rpcId, 0, rpcId.length);
115
        }
159
        }
116
    }    
117
160
161
        @Override
162
        public Serializable getMessage() 
163
            throws IOException, ClassCastException, ClassNotFoundException
164
        {
165
            return null;
166
        }
118
167
168
        @Override
169
        public Serializable getMessage( ClassLoader[] externalLoaders )
170
            throws IOException, ClassCastException, ClassNotFoundException
171
        {
172
            return null;
173
        }
174
    }
119
}
175
}
(-)java/org/apache/catalina/tribes/group/RpcChannel.java (-9 / +46 lines)
Lines 45-50 Link Here
45
    private RpcCallback callback;
45
    private RpcCallback callback;
46
    private byte[] rpcId;
46
    private byte[] rpcId;
47
    private int replyMessageOptions = 0;
47
    private int replyMessageOptions = 0;
48
    private ClassLoader[] externalLoaders;
48
49
49
    private HashMap<RpcCollectorKey, RpcCollector> responseMap = new HashMap<RpcCollectorKey, RpcCollector>();
50
    private HashMap<RpcCollectorKey, RpcCollector> responseMap = new HashMap<RpcCollectorKey, RpcCollector>();
50
51
Lines 54-68 Link Here
54
     * @param rpcId - the unique Id for this RPC group
55
     * @param rpcId - the unique Id for this RPC group
55
     * @param channel Channel
56
     * @param channel Channel
56
     * @param callback RpcCallback
57
     * @param callback RpcCallback
58
     * @param externalLoaders ClassLoaders used to deserialize messages.
57
     */
59
     */
58
    public RpcChannel(byte[] rpcId, Channel channel, RpcCallback callback) {
60
    public RpcChannel( byte[] rpcId, Channel channel, RpcCallback callback, ClassLoader[] externalLoaders ) {
59
        this.channel = channel;
61
        this.channel = channel;
60
        this.callback = callback;
62
        this.callback = callback;
61
        this.rpcId = rpcId;
63
        this.rpcId = rpcId;
64
        this.externalLoaders = externalLoaders;
62
        channel.addChannelListener(this);
65
        channel.addChannelListener(this);
63
    }
66
    }
67
68
    /**
69
     * Create an RPC channel. You can have several RPC channels attached to a group
70
     * all separated out by the uniqueness
71
     * @param rpcId - the unique Id for this RPC group
72
     * @param channel Channel
73
     * @param callback RpcCallback
74
     */
75
    public RpcChannel( byte[] rpcId, Channel channel, RpcCallback callback ) {
76
        this( rpcId, channel, callback, null );
77
    }
78
64
    
79
    
65
    
66
    /**
80
    /**
67
     * Send a message and wait for the response.
81
     * Send a message and wait for the response.
68
     * @param destination Member[] - the destination for the message, and the members you request a reply from
82
     * @param destination Member[] - the destination for the message, and the members you request a reply from
Lines 90-96 Link Here
90
        try {
104
        try {
91
            synchronized (collector) {
105
            synchronized (collector) {
92
                if ( rpcOptions != NO_REPLY ) responseMap.put(key, collector);
106
                if ( rpcOptions != NO_REPLY ) responseMap.put(key, collector);
93
                RpcMessage rmsg = new RpcMessage(rpcId, key.id, message);
107
                RpcMessage rmsg = new RpcMessage(rpcId, key.id);
108
                rmsg.setMessage(message);
94
                channel.send(destination, rmsg, sendOptions);
109
                channel.send(destination, rmsg, sendOptions);
95
                if ( rpcOptions != NO_REPLY ) collector.wait(timeout);
110
                if ( rpcOptions != NO_REPLY ) collector.wait(timeout);
96
            }
111
            }
Lines 106-116 Link Here
106
    @Override
121
    @Override
107
    public void messageReceived(Serializable msg, Member sender) {
122
    public void messageReceived(Serializable msg, Member sender) {
108
        RpcMessage rmsg = (RpcMessage)msg;
123
        RpcMessage rmsg = (RpcMessage)msg;
124
125
        Serializable encapsulatedMessage;
126
        try {
127
            encapsulatedMessage = rmsg.getMessage( externalLoaders );
128
        } catch ( Exception x ){
129
            throw new RuntimeException(x);
130
        }
131
109
        RpcCollectorKey key = new RpcCollectorKey(rmsg.uuid);
132
        RpcCollectorKey key = new RpcCollectorKey(rmsg.uuid);
110
        if ( rmsg.reply ) {
133
        if ( rmsg.reply ) {
111
            RpcCollector collector = responseMap.get(key);
134
            RpcCollector collector = responseMap.get(key);
112
            if (collector == null) {
135
            if (collector == null) {
113
                callback.leftOver(rmsg.message, sender);
136
                callback.leftOver(encapsulatedMessage, sender);
114
            } else {
137
            } else {
115
                synchronized (collector) {
138
                synchronized (collector) {
116
                    //make sure it hasn't been removed
139
                    //make sure it hasn't been removed
Lines 118-135 Link Here
118
                        if ( (rmsg instanceof RpcMessage.NoRpcChannelReply) ) 
141
                        if ( (rmsg instanceof RpcMessage.NoRpcChannelReply) ) 
119
                            collector.destcnt--;
142
                            collector.destcnt--;
120
                        else 
143
                        else 
121
                            collector.addResponse(rmsg.message, sender);
144
                            collector.addResponse(encapsulatedMessage, sender);
122
                        if (collector.isComplete()) collector.notifyAll();
145
                        if (collector.isComplete()) collector.notifyAll();
123
                    } else {
146
                    } else {
124
                        if (! (rmsg instanceof RpcMessage.NoRpcChannelReply) ) 
147
                        if (! (rmsg instanceof RpcMessage.NoRpcChannelReply) ) 
125
                            callback.leftOver(rmsg.message, sender);
148
                            callback.leftOver(encapsulatedMessage, sender);
126
                    }
149
                    }
127
                }//synchronized
150
                }//synchronized
128
            }//end if
151
            }//end if
129
        } else{
152
        } else{
130
            Serializable reply = callback.replyRequest(rmsg.message,sender);
153
            Serializable reply = callback.replyRequest(encapsulatedMessage,sender);
154
            if( reply == null ){
155
                rmsg = new RpcMessage.NoRpcChannelReply(rmsg.rpcId, rmsg.uuid);
156
            } else {
157
                rmsg.setMessage(reply);
158
            }
159
131
            rmsg.reply = true;
160
            rmsg.reply = true;
132
            rmsg.message = reply;
161
133
            try {
162
            try {
134
                channel.send(new Member[] {sender}, rmsg,
163
                channel.send(new Member[] {sender}, rmsg,
135
                        replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
164
                        replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
Lines 187-193 Link Here
187
    public void setReplyMessageOptions(int replyMessageOptions) {
216
    public void setReplyMessageOptions(int replyMessageOptions) {
188
        this.replyMessageOptions = replyMessageOptions;
217
        this.replyMessageOptions = replyMessageOptions;
189
    }
218
    }
190
        
219
220
    public ClassLoader[] getExternalLoaders() {
221
        return externalLoaders;
222
    }
223
224
    public void setExternalLoaders( ClassLoader[] externalLoaders ) {
225
        this.externalLoaders = externalLoaders;
226
    }
227
191
    /**
228
    /**
192
     * 
229
     * 
193
     * Class that holds all response.
230
     * Class that holds all response.

Return to bug 50670