Lines 26-36
Link Here
|
26 |
import java.util.ArrayList; |
26 |
import java.util.ArrayList; |
27 |
import java.util.Date; |
27 |
import java.util.Date; |
28 |
import java.util.Iterator; |
28 |
import java.util.Iterator; |
|
|
29 |
import java.util.concurrent.CountDownLatch; |
30 |
|
31 |
import javax.servlet.ServletContext; |
29 |
|
32 |
|
30 |
import org.apache.catalina.Cluster; |
33 |
import org.apache.catalina.Cluster; |
31 |
import org.apache.catalina.Container; |
34 |
import org.apache.catalina.Container; |
32 |
import org.apache.catalina.Context; |
35 |
import org.apache.catalina.Context; |
33 |
import org.apache.catalina.Engine; |
36 |
import org.apache.catalina.Engine; |
|
|
37 |
import org.apache.catalina.Globals; |
34 |
import org.apache.catalina.Host; |
38 |
import org.apache.catalina.Host; |
35 |
import org.apache.catalina.LifecycleException; |
39 |
import org.apache.catalina.LifecycleException; |
36 |
import org.apache.catalina.LifecycleListener; |
40 |
import org.apache.catalina.LifecycleListener; |
Lines 38-50
Link Here
|
38 |
import org.apache.catalina.Valve; |
42 |
import org.apache.catalina.Valve; |
39 |
import org.apache.catalina.core.StandardContext; |
43 |
import org.apache.catalina.core.StandardContext; |
40 |
import org.apache.catalina.ha.CatalinaCluster; |
44 |
import org.apache.catalina.ha.CatalinaCluster; |
|
|
45 |
import org.apache.catalina.ha.ClusterManager; |
41 |
import org.apache.catalina.ha.ClusterMessage; |
46 |
import org.apache.catalina.ha.ClusterMessage; |
42 |
import org.apache.catalina.ha.tcp.ReplicationValve; |
47 |
import org.apache.catalina.ha.tcp.ReplicationValve; |
43 |
import org.apache.catalina.tribes.Member; |
48 |
import org.apache.catalina.tribes.Member; |
44 |
import org.apache.catalina.tribes.io.ReplicationStream; |
49 |
import org.apache.catalina.tribes.io.ReplicationStream; |
45 |
import org.apache.catalina.util.LifecycleSupport; |
50 |
import org.apache.catalina.util.LifecycleSupport; |
46 |
import org.apache.catalina.util.StringManager; |
51 |
import org.apache.catalina.util.StringManager; |
47 |
import org.apache.catalina.ha.ClusterManager; |
|
|
48 |
|
52 |
|
49 |
/** |
53 |
/** |
50 |
* The DeltaManager manages replicated sessions by only replicating the deltas |
54 |
* The DeltaManager manages replicated sessions by only replicating the deltas |
Lines 62-71
Link Here
|
62 |
* @author Craig R. McClanahan |
66 |
* @author Craig R. McClanahan |
63 |
* @author Jean-Francois Arcand |
67 |
* @author Jean-Francois Arcand |
64 |
* @author Peter Rossbach |
68 |
* @author Peter Rossbach |
|
|
69 |
* @author Jason Lunn |
65 |
* @version $Revision$ $Date$ |
70 |
* @version $Revision$ $Date$ |
66 |
*/ |
71 |
*/ |
67 |
|
72 |
|
68 |
public class DeltaManager extends ClusterManagerBase{ |
73 |
public class DeltaManager extends ClusterManagerBase { |
69 |
|
74 |
|
70 |
// ---------------------------------------------------- Security Classes |
75 |
// ---------------------------------------------------- Security Classes |
71 |
public static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(DeltaManager.class); |
76 |
public static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(DeltaManager.class); |
Lines 106-111
Link Here
|
106 |
protected LifecycleSupport lifecycle = new LifecycleSupport(this); |
111 |
protected LifecycleSupport lifecycle = new LifecycleSupport(this); |
107 |
|
112 |
|
108 |
/** |
113 |
/** |
|
|
114 |
* Flag indicating that messageReceived(ClusterMessage) should block until |
115 |
* all local applications have completed initialization |
116 |
*/ |
117 |
protected boolean delay = false; |
118 |
|
119 |
/** |
120 |
* Barrier used to receive notification from other threads that it is okay |
121 |
* to process incoming messages from the cluster |
122 |
*/ |
123 |
private volatile CountDownLatch gate = null; |
124 |
|
125 |
/** |
109 |
* The maximum number of active Sessions allowed, or -1 for no limit. |
126 |
* The maximum number of active Sessions allowed, or -1 for no limit. |
110 |
*/ |
127 |
*/ |
111 |
private int maxActiveSessions = -1; |
128 |
private int maxActiveSessions = -1; |
Lines 121-128
Link Here
|
121 |
/** |
138 |
/** |
122 |
* wait time between send session block (default 2 sec) |
139 |
* wait time between send session block (default 2 sec) |
123 |
*/ |
140 |
*/ |
124 |
private int sendAllSessionsWaitTime = 2 * 1000 ; |
141 |
private int sendAllSessionsWaitTime = 2 * 1000; |
125 |
private ArrayList receivedMessageQueue = new ArrayList() ; |
142 |
private ArrayList<SessionMessage> receivedMessageQueue = new ArrayList<SessionMessage>() ; |
126 |
private boolean receiverQueue = false ; |
143 |
private boolean receiverQueue = false ; |
127 |
private boolean stateTimestampDrop = true ; |
144 |
private boolean stateTimestampDrop = true ; |
128 |
private long stateTransferCreateSendTime; |
145 |
private long stateTransferCreateSendTime; |
Lines 175-180
Link Here
|
175 |
public String getName() { |
192 |
public String getName() { |
176 |
return name; |
193 |
return name; |
177 |
} |
194 |
} |
|
|
195 |
|
196 |
/** |
197 |
* Set the member that indicates processing messages should wait for local |
198 |
* initialization of applications to complete |
199 |
* |
200 |
* @param delayed If true, processing messageReceived(ClusterMessage) will |
201 |
* block until local web applications have completed initialization. |
202 |
*/ |
203 |
public void setDelay ( boolean delay ) { |
204 |
this.delay = delay; |
205 |
} |
206 |
|
207 |
/** |
208 |
* Gets the member the delayed flag |
209 |
* @return delayed - boolean flag indicating that |
210 |
* messageReceived(ClusterMessage) should block until local initialization |
211 |
* of applications has completed |
212 |
*/ |
213 |
public boolean getDelay () { |
214 |
return delay; |
215 |
} |
178 |
|
216 |
|
179 |
/** |
217 |
/** |
180 |
* @return Returns the counterSend_EVT_GET_ALL_SESSIONS. |
218 |
* @return Returns the counterSend_EVT_GET_ALL_SESSIONS. |
Lines 781-786
Link Here
|
781 |
lifecycle.removeLifecycleListener(listener); |
819 |
lifecycle.removeLifecycleListener(listener); |
782 |
} |
820 |
} |
783 |
|
821 |
|
|
|
822 |
@Override |
823 |
/** |
824 |
* If this.delay is true, create a gate that will be opened when |
825 |
* local application initialization is complete |
826 |
*/ |
827 |
public void init () { |
828 |
if (delay && container != null) { |
829 |
if (container instanceof StandardContext) { |
830 |
ServletContext servletContext = ((StandardContext) container).getServletContext(); |
831 |
if (servletContext != null) { |
832 |
gate = new CountDownLatch(1); |
833 |
servletContext.setAttribute(Globals.CLUSTER_DELAY, gate); |
834 |
} |
835 |
} |
836 |
} |
837 |
super.init(); |
838 |
} |
839 |
|
784 |
/** |
840 |
/** |
785 |
* Prepare for the beginning of active use of the public methods of this |
841 |
* Prepare for the beginning of active use of the public methods of this |
786 |
* component. This method should be called after <code>configure()</code>, |
842 |
* component. This method should be called after <code>configure()</code>, |
Lines 885-892
Link Here
|
885 |
waitForSendAllSessions(beforeSendTime); |
941 |
waitForSendAllSessions(beforeSendTime); |
886 |
} finally { |
942 |
} finally { |
887 |
synchronized(receivedMessageQueue) { |
943 |
synchronized(receivedMessageQueue) { |
888 |
for (Iterator iter = receivedMessageQueue.iterator(); iter.hasNext();) { |
944 |
for (Iterator<SessionMessage> iter = receivedMessageQueue.iterator(); iter.hasNext();) { |
889 |
SessionMessage smsg = (SessionMessage) iter.next(); |
945 |
SessionMessage smsg = iter.next(); |
890 |
if (!stateTimestampDrop) { |
946 |
if (!stateTimestampDrop) { |
891 |
messageReceived(smsg, smsg.getAddress() != null ? (Member) smsg.getAddress() : null); |
947 |
messageReceived(smsg, smsg.getAddress() != null ? (Member) smsg.getAddress() : null); |
892 |
} else { |
948 |
} else { |
Lines 1068-1073
Link Here
|
1068 |
* the message received. |
1124 |
* the message received. |
1069 |
*/ |
1125 |
*/ |
1070 |
public void messageDataReceived(ClusterMessage cmsg) { |
1126 |
public void messageDataReceived(ClusterMessage cmsg) { |
|
|
1127 |
// Block processing until local application initialization has |
1128 |
// completed, if a gate has been erected |
1129 |
synchronized (this) { |
1130 |
if(gate != null) { |
1131 |
try { |
1132 |
gate.await(); |
1133 |
gate = null; |
1134 |
} |
1135 |
catch(InterruptedException e) { |
1136 |
log.error(e, e); |
1137 |
} |
1138 |
} |
1139 |
} |
1140 |
|
1071 |
if (cmsg != null && cmsg instanceof SessionMessage) { |
1141 |
if (cmsg != null && cmsg instanceof SessionMessage) { |
1072 |
SessionMessage msg = (SessionMessage) cmsg; |
1142 |
SessionMessage msg = (SessionMessage) cmsg; |
1073 |
switch (msg.getEventType()) { |
1143 |
switch (msg.getEventType()) { |
Lines 1535-1541
Link Here
|
1535 |
result.sendAllSessionsWaitTime = sendAllSessionsWaitTime ; |
1605 |
result.sendAllSessionsWaitTime = sendAllSessionsWaitTime ; |
1536 |
result.receiverQueue = receiverQueue ; |
1606 |
result.receiverQueue = receiverQueue ; |
1537 |
result.stateTimestampDrop = stateTimestampDrop ; |
1607 |
result.stateTimestampDrop = stateTimestampDrop ; |
1538 |
result.stateTransferCreateSendTime = stateTransferCreateSendTime; |
1608 |
result.stateTransferCreateSendTime = stateTransferCreateSendTime; |
|
|
1609 |
result.delay = delay; |
1539 |
return result; |
1610 |
return result; |
1540 |
} |
1611 |
} |
1541 |
} |
1612 |
} |