--- java/org/apache/catalina/tribes/Member.java (revision 1423146) +++ java/org/apache/catalina/tribes/Member.java (working copy) @@ -27,6 +27,7 @@ * since a member that has crashed and the starts up again on the same port/host is * not guaranteed to be the same member, so no state transfers will ever be confused * @author Filip Hanik + * @author Greg Turnquist * @version $Id$ */ @@ -81,6 +82,8 @@ */ public long getMemberAliveTime(); + public void setMemberAliveTime(long memberAliveTime); + /** * The current state of the member * @return boolean - true if the member is functioning correctly @@ -111,15 +114,40 @@ */ public byte[] getPayload(); + public void setPayload(byte[] payload); + /** * returns the command associated with this member * @return byte[] */ public byte[] getCommand(); + public void setCommand(byte[] command); + /** * Domain for this cluster * @return byte[] */ public byte[] getDomain(); + + /** + * + * @param getalive boolean - calculate memberAlive time + * @param reset boolean - reset the cached data package, and create a new one + * @return byte[] + */ + public byte[] getData(boolean getalive, boolean reset); + + /** + * + * @param getalive boolean - calculate memberAlive time + * @return byte[] + */ + public byte[] getData(boolean getalive); + + /** + * Retrieve the total size of the message. + * @return int - number of bytes in the message + */ + public int getDataLength(); } --- java/org/apache/catalina/tribes/group/interceptors/DomainFilterInterceptor.java (revision 1423146) +++ java/org/apache/catalina/tribes/group/interceptors/DomainFilterInterceptor.java (working copy) @@ -21,7 +21,6 @@ import org.apache.catalina.tribes.ChannelMessage; import org.apache.catalina.tribes.Member; import org.apache.catalina.tribes.group.ChannelInterceptorBase; -import org.apache.catalina.tribes.membership.MemberImpl; import org.apache.catalina.tribes.membership.Membership; /** @@ -31,6 +30,7 @@ *

* * @author Filip Hanik + * @author Greg Turnquist * @version 1.0 */ public class DomainFilterInterceptor extends ChannelInterceptorBase { @@ -52,7 +52,7 @@ boolean notify = false; synchronized (membership) { notify = Arrays.equals(domain,member.getDomain()); - if ( notify ) notify = membership.memberAlive((MemberImpl)member); + if ( notify ) notify = membership.memberAlive(member); } if ( notify ) super.memberAdded(member); } @@ -63,7 +63,7 @@ boolean notify = false; synchronized (membership) { notify = Arrays.equals(domain,member.getDomain()); - membership.removeMember((MemberImpl)member); + membership.removeMember(member); } if ( notify ) super.memberDisappeared(member); } @@ -94,7 +94,7 @@ protected synchronized void setupMembership() { if ( membership == null ) { - membership = new Membership((MemberImpl)super.getLocalMember(true)); + membership = new Membership(super.getLocalMember(true)); } } --- java/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java (revision 1423146) +++ java/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java (working copy) @@ -116,10 +116,8 @@ * Receive an election message

* * @author Filip Hanik + * @author Greg Turnquist * @version 1.0 - * - * - * */ public class NonBlockingCoordinator extends ChannelInterceptorBase { @@ -187,8 +185,8 @@ public void startElection(boolean force) throws ChannelException { synchronized (electionMutex) { - MemberImpl local = (MemberImpl)getLocalMember(false); - MemberImpl[] others = membership.getMembers(); + Member local = getLocalMember(false); + Member[] others = membership.getMembers(); fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START_ELECT,this,"Election initated")); if ( others.length == 0 ) { this.viewId = new UniqueId(UUIDGenerator.randomUUID(false)); @@ -212,7 +210,7 @@ return; //already have this view installed } int prio = AbsoluteOrder.comp.compare(local,others[0]); - MemberImpl leader = ( prio < 0 )?local:others[0];//am I the leader in my view? + Member leader = ( prio < 0 )?local:others[0];//am I the leader in my view? if ( local.equals(leader) || force ) { CoordinationMessage msg = createElectionMsg(local, others, leader); suggestedviewId = msg.getId(); @@ -244,21 +242,21 @@ } } - private CoordinationMessage createElectionMsg(MemberImpl local, MemberImpl[] others, MemberImpl leader) { + private CoordinationMessage createElectionMsg(Member local, Member[] others, Member leader) { Membership m = new Membership(local,AbsoluteOrder.comp,true); Arrays.fill(m,others); - MemberImpl[] mbrs = m.getMembers(); + Member[] mbrs = m.getMembers(); m.reset(); CoordinationMessage msg = new CoordinationMessage(leader, local, mbrs,new UniqueId(UUIDGenerator.randomUUID(true)), COORD_REQUEST); return msg; } - protected void sendElectionMsg(MemberImpl local, MemberImpl next, CoordinationMessage msg) throws ChannelException { + protected void sendElectionMsg(Member local, Member next, CoordinationMessage msg) throws ChannelException { fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_SEND_MSG,this,"Sending election message to("+next.getName()+")")); super.sendMessage(new Member[] {next}, createData(msg, local), null); } - protected void sendElectionMsgToNextInline(MemberImpl local, CoordinationMessage msg) throws ChannelException { + protected void sendElectionMsgToNextInline(Member local, CoordinationMessage msg) throws ChannelException { int next = Arrays.nextIndex(local,msg.getMembers()); int current = next; msg.leader = msg.getMembers()[0]; @@ -275,7 +273,7 @@ } } - public ChannelData createData(CoordinationMessage msg, MemberImpl local) { + public ChannelData createData(CoordinationMessage msg, Member local) { msg.write(); ChannelData data = new ChannelData(true); data.setAddress(local); @@ -297,13 +295,13 @@ protected Membership mergeOnArrive(CoordinationMessage msg) { fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_PRE_MERGE,this,"Pre merge")); - MemberImpl local = (MemberImpl)getLocalMember(false); + Member local = getLocalMember(false); Membership merged = new Membership(local,AbsoluteOrder.comp,true); Arrays.fill(merged,msg.getMembers()); Arrays.fill(merged,getMembers()); Member[] diff = Arrays.diff(merged,membership,local); for ( int i=0; i * * @author Filip Hanik + * @author Greg Turnquist * @version 1.0 */ public class TcpFailureDetector extends ChannelInterceptorBase { @@ -128,7 +128,7 @@ //if we add it here, then add it upwards too //check to see if it is alive if (memberAlive(member)) { - membership.memberAlive( (MemberImpl) member); + membership.memberAlive(member); notify = true; } else { addSuspects.put(member, Long.valueOf(System.currentTimeMillis())); @@ -156,7 +156,7 @@ //if the payload is not a shutdown message if (shutdown || !memberAlive(member)) { //not correct, we need to maintain the map - membership.removeMember( (MemberImpl) member); + membership.removeMember(member); removeSuspects.remove(member); if (member instanceof StaticMember) { addSuspects.put(member, Long.valueOf(System.currentTimeMillis())); @@ -226,11 +226,11 @@ Member[] members = super.getMembers(); for (int i = 0; members != null && i < members.length; i++) { if (memberAlive(members[i])) { - if (membership.memberAlive((MemberImpl)members[i])) super.memberAdded(members[i]); + if (membership.memberAlive(members[i])) super.memberAdded(members[i]); addSuspects.remove(members[i]); } else { if (membership.getMember(members[i])!=null) { - membership.removeMember((MemberImpl)members[i]); + membership.removeMember(members[i]); removeSuspects.remove(members[i]); if (members[i] instanceof StaticMember) { addSuspects.put(members[i], Long.valueOf(System.currentTimeMillis())); @@ -250,22 +250,22 @@ // avoid temporary adding member. continue; } - if (membership.memberAlive( (MemberImpl) members[i])) { + if (membership.memberAlive(members[i])) { //we don't have this one in our membership, check to see if he/she is alive if (memberAlive(members[i])) { log.warn("Member added, even though we werent notified:" + members[i]); super.memberAdded(members[i]); } else { - membership.removeMember( (MemberImpl) members[i]); + membership.removeMember(members[i]); } //end if } //end if } //for //check suspect members if they are still alive, //if not, simply issue the memberDisappeared message - MemberImpl[] keys = removeSuspects.keySet().toArray(new MemberImpl[removeSuspects.size()]); + Member[] keys = removeSuspects.keySet().toArray(new Member[removeSuspects.size()]); for (int i = 0; i < keys.length; i++) { - MemberImpl m = keys[i]; + Member m = keys[i]; if (membership.getMember(m) != null && (!memberAlive(m))) { membership.removeMember(m); super.memberDisappeared(m); @@ -277,9 +277,9 @@ //check add suspects members if they are alive now, //if they are, simply issue the memberAdded message - keys = addSuspects.keySet().toArray(new MemberImpl[addSuspects.size()]); + keys = addSuspects.keySet().toArray(new Member[addSuspects.size()]); for (int i = 0; i < keys.length; i++) { - MemberImpl m = keys[i]; + Member m = keys[i]; if ( membership.getMember(m) == null && (memberAlive(m))) { membership.memberAlive(m); super.memberAdded(m); @@ -292,7 +292,7 @@ protected synchronized void setupMembership() { if ( membership == null ) { - membership = new Membership((MemberImpl)super.getLocalMember(true)); + membership = new Membership(super.getLocalMember(true)); } } --- java/org/apache/catalina/tribes/io/ChannelData.java (revision 1423146) +++ java/org/apache/catalina/tribes/io/ChannelData.java (working copy) @@ -186,7 +186,7 @@ 4 + //unique id length off=12 uniqueId.length+ //id data off=12+uniqueId.length 4 + //addr length off=12+uniqueId.length+4 - ((MemberImpl)address).getDataLength()+ //member data off=12+uniqueId.length+4+add.length + address.getDataLength()+ //member data off=12+uniqueId.length+4+add.length 4 + //message length off=12+uniqueId.length+4+add.length+4 message.getLength(); return length; @@ -205,7 +205,7 @@ } public byte[] getDataPackage(byte[] data, int offset) { - byte[] addr = ((MemberImpl)address).getData(false); + byte[] addr = address.getData(false); XByteBuffer.toBytes(options,data,offset); offset += 4; //options XByteBuffer.toBytes(timestamp,data,offset); --- java/org/apache/catalina/tribes/membership/McastServiceImpl.java (revision 1423146) +++ java/org/apache/catalina/tribes/membership/McastServiceImpl.java (working copy) @@ -46,6 +46,7 @@ * Need to fix this, could use java.nio and only need one thread to send and receive, or * just use a timeout on the receive * @author Filip Hanik + * @author Greg Turnquist * @version $Id$ */ public class McastServiceImpl @@ -360,7 +361,7 @@ } private void memberDataReceived(byte[] data) { - final MemberImpl m = MemberImpl.getMember(data); + final Member m = MemberImpl.getMember(data); if (log.isTraceEnabled()) log.trace("Mcast receive ping from member " + m); Runnable t = null; if (Arrays.equals(m.getCommand(), Member.SHUTDOWN_PAYLOAD)) { @@ -444,9 +445,9 @@ protected final Object expiredMutex = new Object(); protected void checkExpired() { synchronized (expiredMutex) { - MemberImpl[] expired = membership.expire(timeToExpiration); + Member[] expired = membership.expire(timeToExpiration); for (int i = 0; i < expired.length; i++) { - final MemberImpl member = expired[i]; + final Member member = expired[i]; if (log.isDebugEnabled()) log.debug("Mcast expire member " + expired[i]); try { --- java/org/apache/catalina/tribes/membership/MemberImpl.java (revision 1423146) +++ java/org/apache/catalina/tribes/membership/MemberImpl.java (working copy) @@ -32,6 +32,7 @@ * Carries the host, and port of the this or other cluster nodes. * * @author Filip Hanik + * @author Greg Turnquist * @version $Id$ */ public class MemberImpl implements Member, java.io.Externalizable { @@ -170,11 +171,15 @@ * @param getalive boolean * @return byte[] */ + @Override public byte[] getData(boolean getalive) { return getData(getalive,false); } - + /** + * Retrieve the total size of the message. + */ + @Override public int getDataLength() { return TRIBES_MBR_BEGIN.length+ //start pkg 4+ //data length @@ -200,6 +205,7 @@ * @param reset boolean - reset the cached data package, and create a new one * @return byte[] */ + @Override public byte[] getData(boolean getalive, boolean reset) { if ( reset ) dataPkg = null; //look in cache first @@ -298,11 +304,11 @@ * @param data - the bytes received * @return a member object. */ - public static MemberImpl getMember(byte[] data, MemberImpl member) { + public static Member getMember(byte[] data, MemberImpl member) { return getMember(data,0,data.length,member); } - public static MemberImpl getMember(byte[] data, int offset, int length, MemberImpl member) { + public static Member getMember(byte[] data, int offset, int length, MemberImpl member) { //package looks like //start package TRIBES_MBR_BEGIN.length //package length - 4 bytes @@ -408,11 +414,11 @@ return member; } - public static MemberImpl getMember(byte[] data) { + public static Member getMember(byte[] data) { return getMember(data,new MemberImpl()); } - public static MemberImpl getMember(byte[] data, int offset, int length) { + public static Member getMember(byte[] data, int offset, int length) { return getMember(data,offset,length,new MemberImpl()); } --- java/org/apache/catalina/tribes/membership/Membership.java (revision 1423146) +++ java/org/apache/catalina/tribes/membership/Membership.java (working copy) @@ -36,11 +36,12 @@ * * @author Filip Hanik * @author Peter Rossbach + * @author Greg Turnquist * @version $Id$ */ public class Membership implements Cloneable { - protected static final MemberImpl[] EMPTY_MEMBERS = new MemberImpl[0]; + protected static final Member[] EMPTY_MEMBERS = new Member[0]; private final Object membersLock = new Object(); @@ -48,17 +49,17 @@ * The name of this membership, has to be the same as the name for the local * member */ - protected final MemberImpl local; + protected final Member local; /** * A map of all the members in the cluster. */ - protected HashMap map = new HashMap<>(); + protected HashMap map = new HashMap<>(); /** * A list of all the members in the cluster. */ - protected MemberImpl[] members = EMPTY_MEMBERS; + protected Member[] members = EMPTY_MEMBERS; /** * sort members by alive time @@ -69,9 +70,9 @@ public Object clone() { synchronized (membersLock) { Membership clone = new Membership(local, memberComparator); - final HashMap tmpclone = (HashMap) map.clone(); + final HashMap tmpclone = (HashMap) map.clone(); clone.map = tmpclone; - clone.members = new MemberImpl[members.length]; + clone.members = new Member[members.length]; System.arraycopy(members,0,clone.members,0,members.length); return clone; } @@ -82,19 +83,19 @@ * @param local - has to be the name of the local member. Used to filter the local member from the cluster membership * @param includeLocal - TBA */ - public Membership(MemberImpl local, boolean includeLocal) { + public Membership(Member local, boolean includeLocal) { this(local, new MemberComparator(), includeLocal); } - public Membership(MemberImpl local) { + public Membership(Member local) { this(local, false); } - public Membership(MemberImpl local, Comparator comp) { + public Membership(Member local, Comparator comp) { this(local, comp, false); } - public Membership(MemberImpl local, Comparator comp, boolean includeLocal) { + public Membership(Member local, Comparator comp, boolean includeLocal) { this.local = local; if ( includeLocal ) addMember(local); this.memberComparator = comp; @@ -115,7 +116,7 @@ * @return - true if this member is new to the cluster, false otherwise.
* - false if this member is the local member or updated. */ - public synchronized boolean memberAlive(MemberImpl member) { + public synchronized boolean memberAlive(Member member) { boolean result = false; //ignore ourselves if ( member.equals(local) ) return result; @@ -127,7 +128,7 @@ result = true; } else { //update the member alive time - MemberImpl updateMember = entry.getMember() ; + Member updateMember = entry.getMember() ; if(updateMember.getMemberAliveTime() != member.getMemberAliveTime()) { //update fields that can change updateMember.setMemberAliveTime(member.getMemberAliveTime()); @@ -144,12 +145,12 @@ * Add a member to this component and sort array with memberComparator * @param member The member to add */ - public synchronized MbrEntry addMember(MemberImpl member) { + public synchronized MbrEntry addMember(Member member) { synchronized (membersLock) { MbrEntry entry = new MbrEntry(member); if (!map.containsKey(member) ) { map.put(member, entry); - MemberImpl results[] = new MemberImpl[members.length + 1]; + Member results[] = new Member[members.length + 1]; for (int i = 0; i < members.length; i++) results[i] = members[i]; results[members.length] = member; members = results; @@ -164,7 +165,7 @@ * * @param member The member to remove */ - public void removeMember(MemberImpl member) { + public void removeMember(Member member) { map.remove(member); synchronized (membersLock) { int n = -1; @@ -175,7 +176,7 @@ } } if (n < 0) return; - MemberImpl results[] = new MemberImpl[members.length - 1]; + Member results[] = new Member[members.length - 1]; int j = 0; for (int i = 0; i < members.length; i++) { if (i != n) @@ -192,11 +193,11 @@ * @param maxtime - the max time a member can remain unannounced before it is considered dead. * @return the list of expired members */ - public synchronized MemberImpl[] expire(long maxtime) { + public synchronized Member[] expire(long maxtime) { if(!hasMembers() ) return EMPTY_MEMBERS; - ArrayList list = null; + ArrayList list = null; Iterator i = map.values().iterator(); while(i.hasNext()) { MbrEntry entry = i.next(); @@ -208,7 +209,7 @@ } if(list != null) { - MemberImpl[] result = new MemberImpl[list.size()]; + Member[] result = new Member[list.size()]; list.toArray(result); for( int j=0; j> i = map.entrySet().iterator(); + Iterator> i = map.entrySet().iterator(); int pos = 0; while ( i.hasNext() ) result[pos++] = i.next().getValue(); @@ -293,10 +294,10 @@ */ protected static class MbrEntry { - protected final MemberImpl mbr; + protected final Member mbr; protected long lastHeardFrom; - public MbrEntry(MemberImpl mbr) { + public MbrEntry(Member mbr) { this.mbr = mbr; } @@ -310,7 +311,7 @@ /** * Return the actual Member object */ - public MemberImpl getMember() { + public Member getMember() { return mbr; } --- java/org/apache/catalina/tribes/util/Arrays.java (revision 1423146) +++ java/org/apache/catalina/tribes/util/Arrays.java (working copy) @@ -30,6 +30,7 @@ /** * @author Filip Hanik + * @author Greg Turnquist * @version 1.0 */ public class Arrays { @@ -152,12 +153,12 @@ } public static void fill(Membership mbrship, Member[] m) { - for (int i=0; i result = new ArrayList<>(); - MemberImpl[] comp = complete.getMembers(); + Member[] comp = complete.getMembers(); for ( int i=0; iCompany:

* * @author fhanik + * @author Greg Turnquist * @version 1.0 */ public class ChannelCreator { @@ -133,7 +134,7 @@ String d = args[++i]; String h = d.substring(0,d.indexOf(":")); String p = d.substring(h.length()+1); - MemberImpl m = new MemberImpl(h,Integer.parseInt(p),2000); + Member m = new MemberImpl(h,Integer.parseInt(p),2000); staticMembers.add(m); } else if ("-throughput".equals(args[i])) { throughput = true; --- test/org/apache/catalina/tribes/membership/TestMemberImplSerialization.java (revision 1423146) +++ test/org/apache/catalina/tribes/membership/TestMemberImplSerialization.java (working copy) @@ -16,11 +16,12 @@ */ package org.apache.catalina.tribes.membership; -import java.util.Arrays; - import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.util.Arrays; + +import org.apache.catalina.tribes.Member; import org.junit.Before; import org.junit.Test; @@ -73,8 +74,8 @@ byte[] md1 = m1.getData(); byte[] md2 = m2.getData(); - MemberImpl a1 = MemberImpl.getMember(md1); - MemberImpl a2 = MemberImpl.getMember(md2); + Member a1 = MemberImpl.getMember(md1); + Member a2 = MemberImpl.getMember(md2); assertTrue(a1.getUdpPort()==a2.getUdpPort()); assertTrue(a1.getUdpPort()==udpPort); @@ -82,13 +83,13 @@ @Test public void testSerializationOne() throws Exception { - MemberImpl m = m1; + Member m = m1; byte[] md1 = m.getData(false,true); byte[] mda1 = m.getData(false,false); assertTrue(Arrays.equals(md1,mda1)); assertTrue(md1==mda1); mda1 = m.getData(true,true); - MemberImpl ma1 = MemberImpl.getMember(mda1); + Member ma1 = MemberImpl.getMember(mda1); assertTrue(compareMembers(m,ma1)); mda1 = p1.getData(false); assertFalse(Arrays.equals(md1,mda1)); @@ -98,15 +99,15 @@ md1 = m.getData(true,true); Thread.sleep(50); mda1 = m.getData(true,true); - MemberImpl a1 = MemberImpl.getMember(md1); - MemberImpl a2 = MemberImpl.getMember(mda1); + Member a1 = MemberImpl.getMember(md1); + Member a2 = MemberImpl.getMember(mda1); assertTrue(a1.equals(a2)); assertFalse(Arrays.equals(md1,mda1)); } - public boolean compareMembers(MemberImpl impl1, MemberImpl impl2) { + public boolean compareMembers(Member impl1, Member impl2) { boolean result = true; result = result && Arrays.equals(impl1.getHost(),impl2.getHost()); result = result && Arrays.equals(impl1.getPayload(),impl2.getPayload()); --- test/org/apache/catalina/tribes/test/NioSenderTest.java (revision 1423146) +++ test/org/apache/catalina/tribes/test/NioSenderTest.java (working copy) @@ -40,7 +40,7 @@ public class NioSenderTest { private Selector selector = null; private int counter = 0; - MemberImpl mbr; + Member mbr; private static int testOptions = Channel.SEND_OPTIONS_DEFAULT; public NioSenderTest() { // Default constructor