View | Details | Raw Unified | Return to bug 50648
Collapse All | Expand All

(-)java/org/apache/catalina/tribes/group/RpcCallback2.java (+24 lines)
Line 0 Link Here
1
package org.apache.catalina.tribes.group;
2
3
import org.apache.catalina.tribes.Member;
4
5
import java.io.Serializable;
6
7
/**
8
 * @author ocostet
9
 * @version 1.0
10
 */
11
public interface RpcCallback2
12
extends RpcCallback
13
{
14
    /**
15
     * Invoked when the response message returned by
16
     * {@link #replyRequest(java.io.Serializable, org.apache.catalina.tribes.Member)}
17
     * could not be sent to the requesting member.
18
     *
19
     * @param reply The Object returned by {@link #replyRequest(java.io.Serializable, org.apache.catalina.tribes.Member)}.
20
     * @param destination the member the response was destined for.
21
     * @param exception the error that occurred.
22
     */
23
    public void replyFailed( Serializable reply, Member destination, Exception exception );
24
}
(-)java/org/apache/catalina/tribes/group/RpcMessage.java (-14 / +81 lines)
Lines 24-29 Link Here
24
import java.io.Serializable;
24
import java.io.Serializable;
25
25
26
import org.apache.catalina.tribes.util.Arrays;
26
import org.apache.catalina.tribes.util.Arrays;
27
import org.apache.catalina.tribes.io.XByteBuffer;
27
28
28
/**
29
/**
29
 * <p>Title: </p>
30
 * <p>Title: </p>
Lines 37-55 Link Here
37
 */
38
 */
38
public class RpcMessage implements Externalizable {
39
public class RpcMessage implements Externalizable {
39
40
40
    protected Serializable message;
41
    byte[] messageContents = new byte[0];
41
    protected byte[] uuid;
42
    byte[] uuid;
42
    protected byte[] rpcId;
43
    byte[] rpcId;
43
    protected boolean reply = false;
44
    boolean reply = false;
45
    transient Serializable messageCache;
44
46
45
    public RpcMessage() {
47
    public RpcMessage() {
46
        //for serialization
48
        //for serialization
47
    }
49
    }
48
50
49
    public RpcMessage(byte[] rpcId, byte[] uuid, Serializable message) {
51
    public RpcMessage(byte[] rpcId, byte[] uuid) {
50
        this.rpcId = rpcId;
52
        this.rpcId = rpcId;
51
        this.uuid = uuid;
53
        this.uuid = uuid;
52
        this.message = message;
53
    }
54
    }
54
55
55
    public void readExternal(ObjectInput in) throws IOException,ClassNotFoundException {
56
    public void readExternal(ObjectInput in) throws IOException,ClassNotFoundException {
Lines 60-66 Link Here
60
        length = in.readInt();
61
        length = in.readInt();
61
        rpcId = new byte[length];
62
        rpcId = new byte[length];
62
        in.read(rpcId, 0, length);
63
        in.read(rpcId, 0, length);
63
        message = (Serializable)in.readObject();
64
        length = in.readInt();
65
        messageContents = new byte[ length ];
66
        in.readFully( messageContents );
64
    }
67
    }
65
68
66
    public void writeExternal(ObjectOutput out) throws IOException {
69
    public void writeExternal(ObjectOutput out) throws IOException {
Lines 69-77 Link Here
69
        out.write(uuid, 0, uuid.length);
72
        out.write(uuid, 0, uuid.length);
70
        out.writeInt(rpcId.length);
73
        out.writeInt(rpcId.length);
71
        out.write(rpcId, 0, rpcId.length);
74
        out.write(rpcId, 0, rpcId.length);
72
        out.writeObject(message);
75
        out.writeInt( messageContents.length );
76
        out.write( messageContents );
73
    }
77
    }
74
    
78
79
    public void setUuid( byte[] uuid ) {
80
        this.uuid = uuid;
81
    }
82
83
    public void setRpcId( byte[] rpcId ) {
84
        this.rpcId = rpcId;
85
    }
86
87
    public void setReply( boolean reply ) {
88
        this.reply = reply;
89
    }
90
91
    public byte[] getUuid() {
92
        return uuid;
93
    }
94
95
    public byte[] getRpcId() {
96
        return rpcId;
97
    }
98
99
    public boolean isReply() {
100
        return reply;
101
    }
102
103
    public void setMessage( Serializable message ){
104
        messageCache = message;
105
106
        if( message == null ){
107
            messageContents = new byte[0];
108
        }
109
        else {
110
            try {
111
                messageContents = XByteBuffer.serialize( message );
112
            }
113
            catch ( IOException ioex ){
114
                throw new RuntimeException(ioex);
115
            }
116
        }
117
    }
118
119
    public Serializable getMessage()
120
        throws IOException, ClassCastException, ClassNotFoundException
121
    {
122
        return getMessage( null );
123
    }
124
125
    public Serializable getMessage( ClassLoader[] externalLoaders )
126
        throws IOException, ClassCastException, ClassNotFoundException
127
    {
128
        if( messageContents.length == 0 ){
129
            return null;
130
        }
131
132
        if( messageCache != null ){
133
            return messageCache;
134
        }
135
136
        return messageCache = XByteBuffer.deserialize(messageContents, 0, messageContents.length, externalLoaders);
137
    }
138
75
    @Override
139
    @Override
76
    public String toString() {
140
    public String toString() {
77
        StringBuilder buf = new StringBuilder("RpcMessage[");
141
        StringBuilder buf = new StringBuilder("RpcMessage[");
Lines 80-87 Link Here
80
        buf.append(Arrays.toString(rpcId));
144
        buf.append(Arrays.toString(rpcId));
81
        buf.append("; uuid=");
145
        buf.append("; uuid=");
82
        buf.append(Arrays.toString(uuid));
146
        buf.append(Arrays.toString(uuid));
83
        buf.append("; msg=");
147
        buf.append(";");
84
        buf.append(message);
148
        if( messageContents.length == 0 ){
149
            buf.append("[no msg]");
150
        }
151
        else {
152
            buf.append(messageContents.length).append(" bytes");
153
        }
85
        return buf.toString();
154
        return buf.toString();
86
    }
155
    }
87
    
156
    
Lines 91-97 Link Here
91
        }
160
        }
92
161
93
        public NoRpcChannelReply(byte[] rpcid, byte[] uuid) {
162
        public NoRpcChannelReply(byte[] rpcid, byte[] uuid) {
94
            super(rpcid,uuid,null);
163
            super( rpcid, uuid );
95
            reply = true;
164
            reply = true;
96
        }
165
        }
97
166
Lines 114-119 Link Here
114
            out.write(rpcId, 0, rpcId.length);
183
            out.write(rpcId, 0, rpcId.length);
115
        }
184
        }
116
    }    
185
    }    
117
118
119
}
186
}
(-)java/org/apache/catalina/tribes/group/RpcChannel.java (-18 / +73 lines)
Lines 45-50 Link Here
45
    private RpcCallback callback;
45
    private RpcCallback callback;
46
    private byte[] rpcId;
46
    private byte[] rpcId;
47
    private int replyMessageOptions = 0;
47
    private int replyMessageOptions = 0;
48
    private ClassLoader[] externalLoaders;
48
49
49
    private HashMap<RpcCollectorKey, RpcCollector> responseMap = new HashMap<RpcCollectorKey, RpcCollector>();
50
    private HashMap<RpcCollectorKey, RpcCollector> responseMap = new HashMap<RpcCollectorKey, RpcCollector>();
50
51
Lines 54-69 Link Here
54
     * @param rpcId - the unique Id for this RPC group
55
     * @param rpcId - the unique Id for this RPC group
55
     * @param channel Channel
56
     * @param channel Channel
56
     * @param callback RpcCallback
57
     * @param callback RpcCallback
58
     * @param externalLoaders ClassLoaders used to deserialize messages.
57
     */
59
     */
58
    public RpcChannel(byte[] rpcId, Channel channel, RpcCallback callback) {
60
    public RpcChannel( byte[] rpcId, Channel channel, RpcCallback callback, ClassLoader[] externalLoaders ) {
59
        this.channel = channel;
61
        this.channel = channel;
60
        this.callback = callback;
62
        this.callback = callback;
61
        this.rpcId = rpcId;
63
        this.rpcId = rpcId;
64
        this.externalLoaders = externalLoaders;
62
        channel.addChannelListener(this);
65
        channel.addChannelListener(this);
63
    }
66
    }
64
    
67
    
65
    
66
    /**
68
    /**
69
     * Create an RPC channel. You can have several RPC channels attached to a group
70
     * all separated out by the uniqueness of their rpcId.
71
     * @param rpcId - the unique Id for this RPC group
72
     * @param channel Channel
73
     * @param callback RpcCallback
74
     */
75
    public RpcChannel( byte[] rpcId, Channel channel, RpcCallback callback ) {
76
        this( rpcId, channel, callback, null );
77
    }
78
79
80
    /**
67
     * Send a message and wait for the response.
81
     * Send a message and wait for the response.
68
     * @param destination Member[] - the destination for the message, and the members you request a reply from
82
     * @param destination Member[] - the destination for the message, and the members you request a reply from
69
     * @param message Serializable - the message you are sending out
83
     * @param message Serializable - the message you are sending out
Lines 90-101 Link Here
90
        try {
104
        try {
91
            synchronized (collector) {
105
            synchronized (collector) {
92
                if ( rpcOptions != NO_REPLY ) responseMap.put(key, collector);
106
                if ( rpcOptions != NO_REPLY ) responseMap.put(key, collector);
93
                RpcMessage rmsg = new RpcMessage(rpcId, key.id, message);
107
108
                RpcMessage rmsg = new RpcMessage(rpcId, key.id);
109
                rmsg.setMessage( message );
110
94
                channel.send(destination, rmsg, sendOptions);
111
                channel.send(destination, rmsg, sendOptions);
95
                if ( rpcOptions != NO_REPLY ) collector.wait(timeout);
112
                if ( rpcOptions != NO_REPLY ) collector.wait(timeout);
96
            }
113
            }
97
        } catch ( InterruptedException ix ) {
114
        } catch ( InterruptedException ix ) {
98
            Thread.interrupted();
115
            Thread.currentThread().interrupt();
99
            //throw new ChannelException(ix);
116
            //throw new ChannelException(ix);
100
        }finally {
117
        }finally {
101
            responseMap.remove(key);
118
            responseMap.remove(key);
Lines 106-140 Link Here
106
    @Override
123
    @Override
107
    public void messageReceived(Serializable msg, Member sender) {
124
    public void messageReceived(Serializable msg, Member sender) {
108
        RpcMessage rmsg = (RpcMessage)msg;
125
        RpcMessage rmsg = (RpcMessage)msg;
109
        RpcCollectorKey key = new RpcCollectorKey(rmsg.uuid);
126
110
        if ( rmsg.reply ) {
127
        Serializable message;
128
129
        try {
130
            message = rmsg.getMessage( externalLoaders );
131
        }
132
        catch ( Throwable t ){
133
            throw new RuntimeException(t);
134
        }
135
136
        RpcCollectorKey key = new RpcCollectorKey( rmsg.getUuid() );
137
138
        if ( rmsg.isReply() ) {
111
            RpcCollector collector = responseMap.get(key);
139
            RpcCollector collector = responseMap.get(key);
112
            if (collector == null) {
140
            if (collector == null) {
113
                callback.leftOver(rmsg.message, sender);
141
                callback.leftOver( message, sender );
114
            } else {
142
            } else {
115
                synchronized (collector) {
143
                synchronized (collector) {
116
                    //make sure it hasn't been removed
144
                    //make sure it hasn't been removed
117
                    if ( responseMap.containsKey(key) ) {
145
                    if ( responseMap.containsKey(key) ) {
118
                        if ( (rmsg instanceof RpcMessage.NoRpcChannelReply) ) 
146
                        if ( (rmsg instanceof RpcMessage.NoRpcChannelReply) ) {
119
                            collector.destcnt--;
147
                            collector.destcnt--;
120
                        else 
148
                        }
121
                            collector.addResponse(rmsg.message, sender);
149
                        else {
122
                        if (collector.isComplete()) collector.notifyAll();
150
                            collector.addResponse(message, sender);
151
                        }
152
153
                        if ( collector.isComplete() ) {
154
                            collector.notifyAll();
155
                        }
123
                    } else {
156
                    } else {
124
                        if (! (rmsg instanceof RpcMessage.NoRpcChannelReply) ) 
157
                        if (! (rmsg instanceof RpcMessage.NoRpcChannelReply) ) {
125
                            callback.leftOver(rmsg.message, sender);
158
                            callback.leftOver( message, sender );
159
                        }
126
                    }
160
                    }
127
                }//synchronized
161
                }//synchronized
128
            }//end if
162
            }//end if
129
        } else{
163
        } else{
130
            Serializable reply = callback.replyRequest(rmsg.message,sender);
164
            Serializable reply = callback.replyRequest( message, sender );
131
            rmsg.reply = true;
165
132
            rmsg.message = reply;
166
            if( reply == null ){
167
                rmsg = new RpcMessage.NoRpcChannelReply( rmsg.getRpcId(), rmsg.getUuid() );
168
            }
169
            else {
170
                rmsg.setMessage( reply );
171
            }
172
173
            rmsg.setReply( true );
174
133
            try {
175
            try {
134
                channel.send(new Member[] {sender}, rmsg,
176
                channel.send(new Member[] {sender}, rmsg,
135
                        replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
177
                        replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
136
            }catch ( Exception x )  {
178
            }
179
            catch ( Exception x )  {
137
                log.error("Unable to send back reply in RpcChannel.",x);
180
                log.error("Unable to send back reply in RpcChannel.",x);
181
182
                if( callback instanceof RpcCallback2 ){
183
                    ((RpcCallback2) callback).replyFailed( reply, sender, x );
184
                }
138
            }
185
            }
139
        }//end if
186
        }//end if
140
    }
187
    }
Lines 187-193 Link Here
187
    public void setReplyMessageOptions(int replyMessageOptions) {
234
    public void setReplyMessageOptions(int replyMessageOptions) {
188
        this.replyMessageOptions = replyMessageOptions;
235
        this.replyMessageOptions = replyMessageOptions;
189
    }
236
    }
190
        
237
238
    public ClassLoader[] getExternalLoaders() {
239
        return externalLoaders;
240
    }
241
242
    public void setExternalLoaders( ClassLoader[] externalLoaders ) {
243
        this.externalLoaders = externalLoaders;
244
    }
245
191
    /**
246
    /**
192
     * 
247
     * 
193
     * Class that holds all response.
248
     * Class that holds all response.

Return to bug 50648