Bug 43524 - Clustering with Slide does not work
Summary: Clustering with Slide does not work
Status: NEW
Alias: None
Product: Slide
Classification: Unclassified
Component: Stores (show other bugs)
Version: 2.1
Hardware: All other
: P2 normal (vote)
Target Milestone: ---
Assignee: Slide Developer List
URL:
Keywords:
Depends on:
Blocks:
 
Reported: 2007-10-01 07:50 UTC by Arne v.Irmer
Modified: 2007-10-01 07:50 UTC (History)
1 user (show)



Attachments

Note You need to log in before you can comment on or make changes to this bug.
Description Arne v.Irmer 2007-10-01 07:50:17 UTC
ClusterCacheRefreshing has to be enabled. Trying to do so
I found some bugs, that I fixed for Slide 2.1. (I attach a patch for the
webdavclientlib and slide-core)

Here are the found problems:
- The connection between slides in a cluster isn't established
Only the last started slide is able to send messages to the other,
because reading and parsing messages (UDP) was performed only one time.
I put it in an infinite loop. (Notifcationlistener)

- If connection is broken no reconnect is done.
I took the subscription out of the ClustercacheRefresh into a
SubscriberBean that will be instantiated in the ClusterCacheRefresher
and in the NotifcationListener if an IOException in the communication
with another Silde happens.

- Cache isn't refreshed (Maybe only with FileStores)
The cache is organized to store URIs. The ClustercacheRefresher tried to
remove URLs from cache that aren't stored there. So even if a message
was able to come to that point no refresh is performed. To have URIs the
BindingStore has to be used. It must implement the removeObjectFromCache
method and the cast in the ClustercacheRefresher has to be changed.

- The move-event isn't published
The Notificationtrigger is filtering ContensEvents for changing,
creating and removing events. This does not include the case that the
user makes a webdav move. The problem here is, that this isn't a content
event, because content isn't touched. But all the rest is done with
ContentEvents. The filter finds a WebDAVEvent and a MacroEvent in the
case of a file movement. Working with WebDAVEvents is very complicated
at this point, because an trap dependency between the modules will be
the consequence of using this class here and the WebDAVEvent has but
doesn't give source and destination Urls. Instead the MacroEvent is
working fine.
So the new CollectionFilter now has to do two things:
1. find the MacroEvent
2. convert one MacroEvent into two ContentEvents. One for the source and
one for the destination.(EventCollectionFilter). Here I don't know if my
solution works with revisioning...

- In the case the slide runs as root-servlet the url for the connection
isn't calculated correctly
The ClusterCacheRefresher just cuts the part form the url. Here the
method stripUrl is now looking at the repository domain. If it is not
"/" the cutting of the servlet name is performed.

After fixing these "bugs" now ClusterCacheRefreshing is working with
some slides on one file store. Other constellations weren't tested. But
maybe these fixes can help there, too.

diff -bur
jakarta-slide-server-src-2.1/src/share/org/apache/slide/cluster/ClusterCacheRefresher.java
jakarta-slide-server-src-2.1_fixed/src/share/org/apache/slide/cluster/ClusterCacheRefresher.java
---
jakarta-slide-server-src-2.1/src/share/org/apache/slide/cluster/ClusterCacheRefresher.java
2004-12-23 22:08:00.000000000 +0100
+++
jakarta-slide-server-src-2.1_fixed/src/share/org/apache/slide/cluster/ClusterCacheRefresher.java
2007-09-28 10:46:55.000000000 +0200
@@ -30,7 +30,7 @@
 import org.apache.slide.common.NamespaceAccessToken;
 import org.apache.slide.common.SlideTokenImpl;
 import org.apache.slide.common.Uri;
-import org.apache.slide.store.ExtendedStore;
+import org.apache.slide.store.BindingStore;
 import org.apache.slide.store.Store;
 import org.apache.slide.util.conf.Configurable;
 import org.apache.slide.util.conf.Configuration;
@@ -38,6 +38,7 @@
 import org.apache.slide.util.logger.Logger;
 import org.apache.webdav.lib.NotificationListener;
 import org.apache.webdav.lib.Subscriber;
+import org.apache.webdav.lib.SubscriberBean;
 import org.apache.webdav.lib.methods.DepthSupport;
 
 /**
@@ -178,7 +179,6 @@
 public class ClusterCacheRefresher implements EventListener, Configurable {
     protected static final String LOG_CHANNEL =
ClusterCacheRefresher.class.getName();
 
-    protected NotificationListener listener;
 
     public ClusterCacheRefresher() {
         Domain.log("Creating ClusterCacheRefresher", LOG_CHANNEL, Logger.INFO);
@@ -222,11 +222,11 @@
                         while (keys.hasNext()) {
                             String key = keys.next().toString();
                             if ("uri".equals(key)) {
-                                Uri theUri = nat.getUri(new SlideTokenImpl(new
CredentialsToken("")), stripUri(information.get(key).toString()));
+                                Uri theUri = nat.getUri(new SlideTokenImpl(new
CredentialsToken("")), stripUri(information.get(key).toString(),repositoryDomain));
                                 Store store = theUri.getStore();
-                                if (store instanceof ExtendedStore) {
+                                if (store instanceof BindingStore) {
                                     Domain.log("Resetting cache for " + theUri,
LOG_CHANNEL, Logger.INFO);
-                                    ((ExtendedStore)
store).removeObjectFromCache(theUri);
+                                    ((BindingStore)
store).removeObjectFromCache(theUri);
                                 }
                             }
                         }
@@ -249,11 +249,11 @@
 	                    while (keys.hasNext()) {
 	                        String key = keys.next().toString();
 	                        if ("uri".equals(key)) {
-	                            Uri theUri = nat.getUri(new SlideTokenImpl(new
CredentialsToken("")), stripUri(information.get(key).toString()));
+	                            Uri theUri = nat.getUri(new SlideTokenImpl(new
CredentialsToken("")), stripUri(information.get(key).toString(),repositoryDomain));
 	                            Store store = theUri.getParentUri().getStore();
-	                            if (store instanceof ExtendedStore) {
+	                            if (store instanceof BindingStore) {
 	                                Domain.log("Resetting cache for " +
theUri.getParentUri(), LOG_CHANNEL, Logger.INFO);
-	                                ((ExtendedStore)
store).removeObjectFromCache(theUri.getParentUri());
+	                                ((BindingStore)
store).removeObjectFromCache(theUri.getParentUri());
 	                            }
 	                        }
 	                    }
@@ -281,58 +281,36 @@
              *    a server is down and NotificationListener.subscribe() can't
              *    reach it.
              */
-            Thread t = new Thread(new Runnable() {
             	
-            	private boolean success;
-            	
-                public void run() {
-                	success = true;
-                    listener = new NotificationListener(host, port,
repositoryHost, repositoryPort, protocol, credentials,
-                        repositoryDomain, pollInterval, udp);
-
-                    success = listener.subscribe("Update", uri, depth,
lifetime, notificationDelay, contentSubscriber, credentials);
-                    success = listener.subscribe("Update/newmember", uri,
depth, lifetime, notificationDelay, structureSubscriber, credentials);
-                    success = listener.subscribe("Delete", uri, depth,
lifetime, notificationDelay, structureSubscriber, credentials);
-                    success = listener.subscribe("Move", uri, depth, lifetime,
notificationDelay, structureSubscriber, credentials);
-                    
-                    if ( !success ) {
-						// try again quickly
-                    	try {
-							Thread.sleep(10000);
-						} catch (InterruptedException e) {
-							// ignore
-						}
-                    } else {
-						// try again before the subscriptions expire
-                    	try {
-							Thread.sleep(lifetime*1000-60);
-						} catch (InterruptedException e) {
-							// ignore
-						}
-                    }
-                }
-            });
-            t.setDaemon(true);
-            t.start();
+           SubscriberBean sb=new SubscriberBean(new NotificationListener(host,
port, repositoryHost, repositoryPort, protocol, credentials,
+    				repositoryDomain, pollInterval, udp,uri, depth, lifetime,
notificationDelay));
+            sb.add(sb.new Sub("Update",contentSubscriber));
+            sb.add(sb.new Sub("Update/newmember",structureSubscriber));
+            sb.add(sb.new Sub("Delete",structureSubscriber));
+            sb.add(sb.new Sub("Move",structureSubscriber));
+            sb.subscribe();
         }
     }
     
     /**
      * Removes the first segment of a uri. "/slide/files/foo" becomes
      * "/files/foo".
-     * 
+     * This is intended to remove the servlet path.
      * @param uri the uri to strip
+     * @param repositoryDomain The servlet path 
      * @return the stipped uri
      */
-    private String stripUri(String uri) {
-        // FIXME: if this is intended to remove the servlet path this will 
-        // NOT work if the servlet is not default-servlet or is the root servlet
+    private String stripUri(String uri,String repositoryDomain) {
+    	//Is the remote servlet the default ROOT servlet?
+    	if(!repositoryDomain.equals("/")){
     	if ( uri.indexOf("/") == 0 ) {
     		uri = uri.substring(1);
     	}
     	if ( uri.indexOf("/") > -1 ) {
     		uri = uri.substring(uri.indexOf("/"));
     	}
+    	}
     	return uri;
     }
-}
\ Kein Zeilenumbruch am Dateiende.
+    
+ }
\ Kein Zeilenumbruch am Dateiende.
diff -bur
jakarta-slide-server-src-2.1/src/share/org/apache/slide/event/EventCollectionFilter.java
jakarta-slide-server-src-2.1_fixed/src/share/org/apache/slide/event/EventCollectionFilter.java
---
jakarta-slide-server-src-2.1/src/share/org/apache/slide/event/EventCollectionFilter.java
2004-12-23 22:07:59.000000000 +0100
+++
jakarta-slide-server-src-2.1_fixed/src/share/org/apache/slide/event/EventCollectionFilter.java
2007-09-28 09:29:56.000000000 +0200
@@ -23,9 +23,11 @@
 
 package org.apache.slide.event;
 
-import java.util.List;
-import java.util.Iterator;
 import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.slide.content.NodeRevisionDescriptors;
 
 /**
  * @version $Revision: 1.5.2.1 $
@@ -89,6 +91,54 @@
         return (ContentEvent [])changedContents.toArray(changedContentEvents);
     }
 
+    /**
+     * Filter all events, that indicate a movement of a file
+     * @param collection All events coming from the event dispatcher
+     * @return The events that are found
+     */
+    public static ContentEvent[] getMovedContents(EventCollection collection) {
+       /* The move event can only be found as a WebDAVEvent or MacroEvent not
as a ContentEvent.
+    	* The WebDAVEvent doesn't have getters for source-URL and destination-URL
+    	* so the MacroEvent will do the trick. What has to be done is to convert 
+    	* one MacroEvent for movement into two ContentEvents for source-URL and 
+    	* destination-URL. Both will be returned.*/
+        List changedContents = new ArrayList();
+        if (collection != null)
+        {
+            List collectedEvents = collection.getCollection();
+            for ( Iterator i = collectedEvents.iterator(); i.hasNext(); ) {
+                EventCollection.Event event = (EventCollection.Event)i.next();
+                if ( event.getMethod() == MacroEvent.MOVE) {
+                	//Get the MacroEvent
+                	MacroEvent w_event=(MacroEvent)event.getEvent();
+                	//Get both URLs
+                	String source=w_event.getSourceURI();
+                	String destination=w_event.getTargetURI();
+                    removeContentEvents(changedContents,source);
+                    removeContentEvents(changedContents,destination);
+                    
+                    /* Generate new ContentEvents Source,Token and Namespace 
+                     * can be taken from the MacroEvent. They do not have implicit 
+                     * information about the URL the ContentEvent is for. */
+                    changedContents.add(new
ContentEvent(event.getEvent().getSource(),
+                    		w_event.getToken(),
+                    		w_event.getNamespace(),
+                    		source, //source-URL
+                    		new NodeRevisionDescriptors() //TODO: Is this enough to
work with revisions?
+                    ));
+                    changedContents.add(new
ContentEvent(event.getEvent().getSource(),
+                    		w_event.getToken(),
+                    		w_event.getNamespace(),
+                    		destination, //destination-URL
+                    		new NodeRevisionDescriptors() //TODO: Is this enough to
work with revisions?
+                    ));
+                } 
+            }
+        }
+        ContentEvent[] changedContentEvents = new
ContentEvent[changedContents.size()];
+        return (ContentEvent [])changedContents.toArray(changedContentEvents);
+    }
+
     private static void removeContentEvents(List changedContents, String uri) {
         for ( Iterator i = changedContents.iterator(); i.hasNext(); ) {
             ContentEvent event = (ContentEvent)i.next();
diff -bur
jakarta-slide-server-src-2.1/src/share/org/apache/slide/store/BindingStore.java
jakarta-slide-server-src-2.1_fixed/src/share/org/apache/slide/store/BindingStore.java
---
jakarta-slide-server-src-2.1/src/share/org/apache/slide/store/BindingStore.java
2004-12-23 22:08:00.000000000 +0100
+++
jakarta-slide-server-src-2.1_fixed/src/share/org/apache/slide/store/BindingStore.java
2007-09-27 16:43:39.000000000 +0200
@@ -56,6 +56,20 @@
  * @version   $Revision: 1.3.2.1 $
  */
 public class BindingStore extends ExtendedStore {
+    // overwrites inherited
+	public void removeObjectFromCache( Object key ) {
+		if (key instanceof ResourceId) {
+			super.removeObjectFromCache(key);
+		}else if(key instanceof Uri){
+			ResourceId resourceId=null;
+			try {
+				resourceId = obtainResourceId((Uri) key);
+				super.removeObjectFromCache(resourceId);
+			} catch (ObjectNotFoundException e) {
+			} catch (ServiceAccessException e) {
+			}
+		}		
+	}   
     
     // overwrites inherited
     public ObjectNode retrieveObject(Uri uri)
diff -bur
jakarta-slide-server-src-2.1/src/webdav/server/org/apache/slide/webdav/event/NotificationTrigger.java
jakarta-slide-server-src-2.1_fixed/src/webdav/server/org/apache/slide/webdav/event/NotificationTrigger.java
---
jakarta-slide-server-src-2.1/src/webdav/server/org/apache/slide/webdav/event/NotificationTrigger.java
2004-12-23 22:07:59.000000000 +0100
+++
jakarta-slide-server-src-2.1_fixed/src/webdav/server/org/apache/slide/webdav/event/NotificationTrigger.java
2007-09-28 09:24:59.000000000 +0200
@@ -182,7 +182,11 @@
         for ( int i = 0; i < delete.length; i++ ) {
             matchingSubscribers.addAll(getSubscribers(Subscriber.DELETE,
delete[i]));
         }
-        // FIXME: Add methods for MOVE, and NEW_MAIL (??) to get full exchange
notification compliance
+        //Filter for MOVE-Events
+        ContentEvent[] move = EventCollectionFilter.getMovedContents(collection);
+        for ( int i = 0; i < move.length; i++ ) {
+            matchingSubscribers.addAll(getSubscribers(Subscriber.MOVE, move[i]));
+        }
         
         // notifiy subscribers
         for ( Iterator i = matchingSubscribers.iterator(); i.hasNext(); ) {


diff -Naur
jakarta-slide-webdavclient-src-2.1/clientlib/src/java/org/apache/webdav/lib/NotificationListener.java
jakarta-slide-webdavclient-src-2.1_fixed/clientlib/src/java/org/apache/webdav/lib/NotificationListener.java
---
jakarta-slide-webdavclient-src-2.1/clientlib/src/java/org/apache/webdav/lib/NotificationListener.java
2004-12-23 22:08:47.000000000 +0100
+++
jakarta-slide-webdavclient-src-2.1_fixed/clientlib/src/java/org/apache/webdav/lib/NotificationListener.java
2007-09-28 10:03:36.000000000 +0200
@@ -79,7 +79,10 @@
     
     private List subscribers = new ArrayList();
     private String subscribersAsString;
-
+    private String uri=null;
+    private int depth;
+    private int lifetime;
+    private int notificationDelay;
     /**
      * 
      * @param host The ip-address or hostname on which the udp or http-server
is running (e.g. "myhost.mydomain.mytld")
@@ -92,7 +95,19 @@
      * @param pollInterval The poll interval that will be used if no
notifications are revieved via UDP/TCP (in milliseconds)
      * @param udp If set to true, UDP server will be started, otherwise TCP
server (must match the repository notification mode)
      */
-    public NotificationListener(String host, int port, String repositoryHost,
int repositoryPort, Protocol protocol, Credentials credentials, String
repositoryDomain, int pollInterval, boolean udp) {
+    public NotificationListener(String host, 
+    		int port, 
+    		String repositoryHost,
+    		int repositoryPort, 
+    		Protocol protocol, 
+    		Credentials credentials, 
+    		String repositoryDomain, 
+    		int pollInterval, 
+    		boolean udp,
+    		String uri, 
+    		int depth, 
+    		int lifetime, 
+    		int notificationDelay) {
     	this.credentials = credentials;
     	this.notificationHost = host;
         this.notificationPort = port;
@@ -101,13 +116,18 @@
         this.protocol = protocol;
         this.repositoryDomain = repositoryDomain;
         this.udp = udp;
-        
+        this.uri=uri;
+        this.depth=depth;
+        this.lifetime=lifetime;
+        this.notificationDelay=notificationDelay;
+       
         if ( udp ) {
             Thread listenerThread = new Thread(new Runnable() {
                 public void run() {
                     DatagramSocket serverSocket = null;
                     try {
                         serverSocket = new DatagramSocket(notificationPort);
+                        //Read and parse messages unlimited 
                         while (true) {
                             byte[] buf = new byte[256];
                             DatagramPacket packet = new DatagramPacket(buf,
buf.length);
@@ -121,6 +141,7 @@
                 }
             });
             listenerThread.setDaemon(true);
+            listenerThread.setName("NL-ListeningThread");
             listenerThread.start();
         } else {
             Thread listenerThread = new Thread(new Runnable() {
@@ -136,6 +157,7 @@
                     }
                 }
             });
+            listenerThread.setName("listenerThread");
             listenerThread.setDaemon(true);
             listenerThread.start();
         }
@@ -169,7 +191,7 @@
      * @see WebdavResource#subscribeMethod
      * @see
http://msdn.microsoft.com/library/default.asp?url=/library/en-us/e2k3/e2k3/_webdav_subscribe.asp
      */
-    public boolean subscribe(String method, String uri, int depth, int
lifetime, int notificationDelay, Subscriber listener, Credentials credentials) {
+    public boolean subscribe(String method,Subscriber listener) {
         SubscribeMethod subscribeMethod = new
SubscribeMethod(repositoryDomain+uri);
         subscribeMethod.addRequestHeader(SubscribeMethod.H_NOTIFICATION_TYPE,
method);
         if ( udp ) {
@@ -192,7 +214,7 @@
                 logger.log(Level.INFO, "Received subscription
id="+subscriptionId+", listener: "+listener);
                 int id = Integer.valueOf(subscriptionId).intValue();
                 synchronized ( subscribers ) {
-                    subscribers.add(new Subscription(id, uri, listener));
+                    subscribers.add(new Subscription(id, uri, listener, method));
                 }
                 if ( subscribersAsString == null ) {
                     subscribersAsString = String.valueOf(id);
@@ -307,8 +329,20 @@
                     logger.log(Level.SEVERE, "Poll failed. State: "+state);
                 }
             } catch (IOException e) {
-                logger.log(Level.SEVERE, "Poll for subscribers
'"+subscribers+"' failed!");
-            }
+                logger.log(Level.SEVERE, "Poll for subscribers failed on host
"+this.notificationHost+":"+this.notificationPort);
+                logger.log(Level.SEVERE, "Try to reconnect.");
+            	//The other slide in cluster is gone. 
+            	//Get CCL-Subscriber and method
+            	SubscriberBean sb=new SubscriberBean(this);
+                for ( Iterator i = subscribers.iterator(); i.hasNext(); ) {
+                    Subscription subscriber = (Subscription)i.next();
+                    sb.add(sb.new
Sub(subscriber.method,subscriber.getSubscriber()));
+                	//Remove that subscriber
+                    unsubscribe(subscriber.uri, subscriber.getSubscriber(),
credentials);
+                }
+            	//Start a new thread to subscribe that node.
+                sb.subscribe();
+             }
         }
     }
     
@@ -372,11 +406,13 @@
         private int id;
 		private String uri;
         private Subscriber subscriber;
+        private String method;
 
-        public Subscription(int id, String uri, Subscriber subscriber) {
+        public Subscription(int id, String uri, Subscriber subscriber,String
method) {
             this.id = id;
             this.uri = uri;
             this.subscriber = subscriber;
+            this.method=method;
         }
 
         public void fireEvent(Map information) {
diff -Naur
jakarta-slide-webdavclient-src-2.1/clientlib/src/java/org/apache/webdav/lib/Subscription.java
jakarta-slide-webdavclient-src-2.1_fixed/clientlib/src/java/org/apache/webdav/lib/Subscription.java
---
jakarta-slide-webdavclient-src-2.1/clientlib/src/java/org/apache/webdav/lib/Subscription.java
1970-01-01 01:00:00.000000000 +0100
+++
jakarta-slide-webdavclient-src-2.1_fixed/clientlib/src/java/org/apache/webdav/lib/Subscription.java
2004-12-23 22:08:47.000000000 +0100
@@ -0,0 +1,58 @@
+// vi: set ts=3 sw=3:
+package org.apache.webdav.lib;
+
+/**
+ * Object that holds information about a single WebDAV subscription.
+ * 
+ * @see org.apache.webdav.lib.WebdavResource#subscribeMethod(String, String,
String, long, int, long)
+ */
+public class Subscription
+{
+   public static final String UPDATE_NOTIFICATION = "update";
+   public static final String NEW_MEMBER_NOTIFICATION = "update/newmember";
+   public static final String DELETE_NOTIFICATION = "delete";
+   public static final String MOVE_NOTIFICATION = "move";
+   
+   private int id;
+   private long lifetime;
+   private String callback;
+   private String contentLocation;
+   private String notificationType;
+   private String path;
+   
+   public Subscription(String path, int id, String callback, long lifetime, 
+         String contentLocation, String notificationType)
+   {
+      this.path = path;
+      this.id = id;
+      this.callback = callback;
+      this.lifetime = lifetime;
+      this.contentLocation = contentLocation;
+      this.notificationType = notificationType;
+   }
+   
+   public String getCallback()
+   {
+      return callback;
+   }
+   public String getContentLocation()
+   {
+      return contentLocation;
+   }
+   public int getId()
+   {
+      return id;
+   }
+   public long getLifetime()
+   {
+      return lifetime;
+   }
+   public String getNotificationType()
+   {
+      return notificationType;
+   }
+   public String getPath() 
+   {
+      return path;
+   }
+}