ASF Bugzilla – Attachment 23005 Details for
Bug 46284
Add flag to DeltaManager that blocks processing cluster messages until local applicaiton initialization is completed
Home
|
New
|
Browse
|
Search
|
[?]
|
Reports
|
Help
|
New Account
|
Log In
Remember
[x]
|
Forgot Password
Login:
[x]
[patch]
A multi-file patch that includes changes that allow DeltaManager to block processing of cluster messages until local application initialization is complete
delay_cluster.patch (text/plain), 12.47 KB, created by
Jason A. Lunn
on 2008-12-09 06:49:48 UTC
(
hide
)
Description:
A multi-file patch that includes changes that allow DeltaManager to block processing of cluster messages until local application initialization is complete
Filename:
MIME Type:
Creator:
Jason A. Lunn
Created:
2008-12-09 06:49:48 UTC
Size:
12.47 KB
patch
obsolete
>Index: java/org/apache/catalina/Globals.java >=================================================================== >--- java/org/apache/catalina/Globals.java (revision 719433) >+++ java/org/apache/catalina/Globals.java (working copy) >@@ -36,6 +36,14 @@ > "org.apache.catalina.deploy.alt_dd"; > > /** >+ * The servlet context attribute under which we store the concurrent latch >+ * used to block processing cluster messages until after local application >+ * initialization >+ */ >+ public static final String CLUSTER_DELAY = >+ "org.apache.catalina.ha.delay"; >+ >+ /** > * The request attribute under which we store the array of X509Certificate > * objects representing the certificate chain presented by our client, > * if any. >Index: java/org/apache/catalina/ha/session/DeltaManager.java >=================================================================== >--- java/org/apache/catalina/ha/session/DeltaManager.java (revision 719433) >+++ java/org/apache/catalina/ha/session/DeltaManager.java (working copy) >@@ -26,11 +26,15 @@ > import java.util.ArrayList; > import java.util.Date; > import java.util.Iterator; >+import java.util.concurrent.CountDownLatch; >+ >+import javax.servlet.ServletContext; > > import org.apache.catalina.Cluster; > import org.apache.catalina.Container; > import org.apache.catalina.Context; > import org.apache.catalina.Engine; >+import org.apache.catalina.Globals; > import org.apache.catalina.Host; > import org.apache.catalina.LifecycleException; > import org.apache.catalina.LifecycleListener; >@@ -38,13 +42,13 @@ > import org.apache.catalina.Valve; > import org.apache.catalina.core.StandardContext; > import org.apache.catalina.ha.CatalinaCluster; >+import org.apache.catalina.ha.ClusterManager; > import org.apache.catalina.ha.ClusterMessage; > import org.apache.catalina.ha.tcp.ReplicationValve; > import org.apache.catalina.tribes.Member; > import org.apache.catalina.tribes.io.ReplicationStream; > import org.apache.catalina.util.LifecycleSupport; > import org.apache.catalina.util.StringManager; >-import org.apache.catalina.ha.ClusterManager; > > /** > * The DeltaManager manages replicated sessions by only replicating the deltas >@@ -62,10 +66,11 @@ > * @author Craig R. McClanahan > * @author Jean-Francois Arcand > * @author Peter Rossbach >+ * @author Jason Lunn > * @version $Revision$ $Date$ > */ > >-public class DeltaManager extends ClusterManagerBase{ >+public class DeltaManager extends ClusterManagerBase { > > // ---------------------------------------------------- Security Classes > public static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory.getLog(DeltaManager.class); >@@ -106,6 +111,18 @@ > protected LifecycleSupport lifecycle = new LifecycleSupport(this); > > /** >+ * Flag indicating that messageReceived(ClusterMessage) should block until >+ * all local applications have completed initialization >+ */ >+ protected boolean delay = false; >+ >+ /** >+ * Barrier used to receive notification from other threads that it is okay >+ * to process incoming messages from the cluster >+ */ >+ private volatile CountDownLatch gate = null; >+ >+ /** > * The maximum number of active Sessions allowed, or -1 for no limit. > */ > private int maxActiveSessions = -1; >@@ -121,8 +138,8 @@ > /** > * wait time between send session block (default 2 sec) > */ >- private int sendAllSessionsWaitTime = 2 * 1000 ; >- private ArrayList receivedMessageQueue = new ArrayList() ; >+ private int sendAllSessionsWaitTime = 2 * 1000; >+ private ArrayList<SessionMessage> receivedMessageQueue = new ArrayList<SessionMessage>() ; > private boolean receiverQueue = false ; > private boolean stateTimestampDrop = true ; > private long stateTransferCreateSendTime; >@@ -175,6 +192,27 @@ > public String getName() { > return name; > } >+ >+ /** >+ * Set the member that indicates processing messages should wait for local >+ * initialization of applications to complete >+ * >+ * @param delayed If true, processing messageReceived(ClusterMessage) will >+ * block until local web applications have completed initialization. >+ */ >+ public void setDelay ( boolean delay ) { >+ this.delay = delay; >+ } >+ >+ /** >+ * Gets the member the delayed flag >+ * @return delayed - boolean flag indicating that >+ * messageReceived(ClusterMessage) should block until local initialization >+ * of applications has completed >+ */ >+ public boolean getDelay () { >+ return delay; >+ } > > /** > * @return Returns the counterSend_EVT_GET_ALL_SESSIONS. >@@ -781,6 +819,24 @@ > lifecycle.removeLifecycleListener(listener); > } > >+ @Override >+ /** >+ * If this.delay is true, create a gate that will be opened when >+ * local application initialization is complete >+ */ >+ public void init () { >+ if (delay && container != null) { >+ if (container instanceof StandardContext) { >+ ServletContext servletContext = ((StandardContext) container).getServletContext(); >+ if (servletContext != null) { >+ gate = new CountDownLatch(1); >+ servletContext.setAttribute(Globals.CLUSTER_DELAY, gate); >+ } >+ } >+ } >+ super.init(); >+ } >+ > /** > * Prepare for the beginning of active use of the public methods of this > * component. This method should be called after <code>configure()</code>, >@@ -885,8 +941,8 @@ > waitForSendAllSessions(beforeSendTime); > } finally { > synchronized(receivedMessageQueue) { >- for (Iterator iter = receivedMessageQueue.iterator(); iter.hasNext();) { >- SessionMessage smsg = (SessionMessage) iter.next(); >+ for (Iterator<SessionMessage> iter = receivedMessageQueue.iterator(); iter.hasNext();) { >+ SessionMessage smsg = iter.next(); > if (!stateTimestampDrop) { > messageReceived(smsg, smsg.getAddress() != null ? (Member) smsg.getAddress() : null); > } else { >@@ -1068,6 +1124,20 @@ > * the message received. > */ > public void messageDataReceived(ClusterMessage cmsg) { >+ // Block processing until local application initialization has >+ // completed, if a gate has been erected >+ synchronized (this) { >+ if(gate != null) { >+ try { >+ gate.await(); >+ gate = null; >+ } >+ catch(InterruptedException e) { >+ log.error(e, e); >+ } >+ } >+ } >+ > if (cmsg != null && cmsg instanceof SessionMessage) { > SessionMessage msg = (SessionMessage) cmsg; > switch (msg.getEventType()) { >@@ -1535,7 +1605,8 @@ > result.sendAllSessionsWaitTime = sendAllSessionsWaitTime ; > result.receiverQueue = receiverQueue ; > result.stateTimestampDrop = stateTimestampDrop ; >- result.stateTransferCreateSendTime = stateTransferCreateSendTime; >+ result.stateTransferCreateSendTime = stateTransferCreateSendTime; >+ result.delay = delay; > return result; > } > } >Index: java/org/apache/catalina/ha/session/mbeans-descriptors.xml >=================================================================== >--- java/org/apache/catalina/ha/session/mbeans-descriptors.xml (revision 719433) >+++ java/org/apache/catalina/ha/session/mbeans-descriptors.xml (working copy) >@@ -268,7 +268,7 @@ > <attribute > name="expireSessionsOnShutdown" > is="true" >- description="exipre all sessions cluster wide as one node goes down" >+ description="expire all sessions cluster wide as one node goes down" > type="boolean"/> > <attribute > name="notifyListenersOnReplication" >@@ -293,6 +293,10 @@ > name="sendAllSessionsWaitTime" > description="wait time between send session block (default 2 sec)" > type="int"/> >+ <attribute >+ name="delay" >+ description="wait until local applications have initialized before processing cluster messages" >+ type="boolean"/> > <operation > name="listSessionIds" > description="Return the list of active session ids" >Index: java/org/apache/catalina/startup/Catalina.java >=================================================================== >--- java/org/apache/catalina/startup/Catalina.java (revision 719433) >+++ java/org/apache/catalina/startup/Catalina.java (working copy) >@@ -28,11 +28,18 @@ > import java.util.ArrayList; > import java.util.HashMap; > import java.util.List; >+import java.util.concurrent.CountDownLatch; >+ >+import javax.servlet.ServletContext; > > import org.apache.catalina.Container; >+import org.apache.catalina.Globals; > import org.apache.catalina.Lifecycle; > import org.apache.catalina.LifecycleException; > import org.apache.catalina.Server; >+import org.apache.catalina.Service; >+import org.apache.catalina.core.ContainerBase; >+import org.apache.catalina.core.StandardContext; > import org.apache.catalina.core.StandardServer; > import org.apache.tomcat.util.digester.Digester; > import org.apache.tomcat.util.digester.Rule; >@@ -56,6 +63,7 @@ > * > * @author Craig R. McClanahan > * @author Remy Maucherat >+ * @author Jason Lunn > * @version $Revision$ $Date$ > */ > >@@ -579,7 +587,18 @@ > log.error("Catalina.start: ", e); > } > } >- >+ >+ // Open any gates stored in ServletContexts to allow the processing of >+ // cluster messages once local applications have been initialized >+ Service [] services = server.findServices(); >+ if ( services != null ) { >+ for ( Service service : services ) { >+ if ( service != null ) { >+ openContainerGates( service.getContainer() ); >+ } >+ } >+ } >+ > long t2 = System.nanoTime(); > if(log.isInfoEnabled()) > log.info("Server startup in " + ((t2 - t1) / 1000000) + " ms"); >@@ -601,7 +620,6 @@ > await(); > stop(); > } >- > } > > >@@ -609,7 +627,6 @@ > * Stop an existing server instance. > */ > public void stop() { >- > try { > // Remove the ShutdownHook first so that server.stop() > // doesn't get invoked twice >@@ -629,7 +646,6 @@ > log.error("Catalina.stop", e); > } > } >- > } > > >@@ -637,9 +653,7 @@ > * Await and shutdown. > */ > public void await() { >- > server.await(); >- > } > > >@@ -647,15 +661,50 @@ > * Print usage information for this application. > */ > protected void usage() { >- > System.out.println > ("usage: java org.apache.catalina.startup.Catalina" > + " [ -config {pathname} ]" > + " [ -nonaming ] { start | stop }"); >- > } > > >+ /** >+ * Traverses the argument container and decrements the CountDownLatch >+ * found in the servlet context with attribute key Globals.CLUSTER_DELAY >+ * if found >+ * @param container Possibly null Container instance >+ */ >+ protected void openContainerGates ( Container container ) { >+ if (container == null) { >+ return; >+ } >+ >+ if ( container instanceof StandardContext ) { >+ StandardContext context = >+ (StandardContext)container; >+ ServletContext servletContext = >+ context.getServletContext(); >+ if (servletContext != null) { >+ Object contextAttribute = servletContext >+ .getAttribute(Globals.CLUSTER_DELAY); >+ if (contextAttribute != null && >+ contextAttribute instanceof CountDownLatch) { >+ CountDownLatch gate = >+ (CountDownLatch) contextAttribute; >+ gate.countDown(); >+ } >+ } >+ } else if ( container instanceof ContainerBase ) { >+ ContainerBase base = (ContainerBase)container; >+ Container [] containers = base.findChildren(); >+ if ( containers != null ) { >+ for ( Container childContainer : containers ) { >+ openContainerGates( childContainer ); >+ } >+ } >+ } >+ } >+ > // --------------------------------------- CatalinaShutdownHook Inner Class > > // XXX Should be moved to embedded ! >@@ -709,6 +758,4 @@ > top.setParentClassLoader(parentClassLoader); > > } >- >- > }
You cannot view the attachment while viewing its details because your browser does not support IFRAMEs.
View the attachment on a separate page
.
View Attachment As Diff
View Attachment As Raw
Actions:
View
|
Diff
Attachments on
bug 46284
:
22928
| 23005