--- java/org/apache/catalina/tribes/Member.java (revision 1423146)
+++ java/org/apache/catalina/tribes/Member.java (working copy)
@@ -27,6 +27,7 @@
* since a member that has crashed and the starts up again on the same port/host is
* not guaranteed to be the same member, so no state transfers will ever be confused
* @author Filip Hanik
+ * @author Greg Turnquist
* @version $Id$
*/
@@ -81,6 +82,8 @@
*/
public long getMemberAliveTime();
+ public void setMemberAliveTime(long memberAliveTime);
+
/**
* The current state of the member
* @return boolean - true if the member is functioning correctly
@@ -111,15 +114,40 @@
*/
public byte[] getPayload();
+ public void setPayload(byte[] payload);
+
/**
* returns the command associated with this member
* @return byte[]
*/
public byte[] getCommand();
+ public void setCommand(byte[] command);
+
/**
* Domain for this cluster
* @return byte[]
*/
public byte[] getDomain();
+
+ /**
+ *
+ * @param getalive boolean - calculate memberAlive time
+ * @param reset boolean - reset the cached data package, and create a new one
+ * @return byte[]
+ */
+ public byte[] getData(boolean getalive, boolean reset);
+
+ /**
+ *
+ * @param getalive boolean - calculate memberAlive time
+ * @return byte[]
+ */
+ public byte[] getData(boolean getalive);
+
+ /**
+ * Retrieve the total size of the message.
+ * @return int - number of bytes in the message
+ */
+ public int getDataLength();
}
--- java/org/apache/catalina/tribes/group/interceptors/DomainFilterInterceptor.java (revision 1423146)
+++ java/org/apache/catalina/tribes/group/interceptors/DomainFilterInterceptor.java (working copy)
@@ -21,7 +21,6 @@
import org.apache.catalina.tribes.ChannelMessage;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.group.ChannelInterceptorBase;
-import org.apache.catalina.tribes.membership.MemberImpl;
import org.apache.catalina.tribes.membership.Membership;
/**
@@ -31,6 +30,7 @@
*
*
* @author Filip Hanik
+ * @author Greg Turnquist
* @version 1.0
*/
public class DomainFilterInterceptor extends ChannelInterceptorBase {
@@ -52,7 +52,7 @@
boolean notify = false;
synchronized (membership) {
notify = Arrays.equals(domain,member.getDomain());
- if ( notify ) notify = membership.memberAlive((MemberImpl)member);
+ if ( notify ) notify = membership.memberAlive(member);
}
if ( notify ) super.memberAdded(member);
}
@@ -63,7 +63,7 @@
boolean notify = false;
synchronized (membership) {
notify = Arrays.equals(domain,member.getDomain());
- membership.removeMember((MemberImpl)member);
+ membership.removeMember(member);
}
if ( notify ) super.memberDisappeared(member);
}
@@ -94,7 +94,7 @@
protected synchronized void setupMembership() {
if ( membership == null ) {
- membership = new Membership((MemberImpl)super.getLocalMember(true));
+ membership = new Membership(super.getLocalMember(true));
}
}
--- java/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java (revision 1423146)
+++ java/org/apache/catalina/tribes/group/interceptors/NonBlockingCoordinator.java (working copy)
@@ -116,10 +116,8 @@
* Receive an election message
*
* @author Filip Hanik
+ * @author Greg Turnquist
* @version 1.0
- *
- *
- *
*/
public class NonBlockingCoordinator extends ChannelInterceptorBase {
@@ -187,8 +185,8 @@
public void startElection(boolean force) throws ChannelException {
synchronized (electionMutex) {
- MemberImpl local = (MemberImpl)getLocalMember(false);
- MemberImpl[] others = membership.getMembers();
+ Member local = getLocalMember(false);
+ Member[] others = membership.getMembers();
fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START_ELECT,this,"Election initated"));
if ( others.length == 0 ) {
this.viewId = new UniqueId(UUIDGenerator.randomUUID(false));
@@ -212,7 +210,7 @@
return; //already have this view installed
}
int prio = AbsoluteOrder.comp.compare(local,others[0]);
- MemberImpl leader = ( prio < 0 )?local:others[0];//am I the leader in my view?
+ Member leader = ( prio < 0 )?local:others[0];//am I the leader in my view?
if ( local.equals(leader) || force ) {
CoordinationMessage msg = createElectionMsg(local, others, leader);
suggestedviewId = msg.getId();
@@ -244,21 +242,21 @@
}
}
- private CoordinationMessage createElectionMsg(MemberImpl local, MemberImpl[] others, MemberImpl leader) {
+ private CoordinationMessage createElectionMsg(Member local, Member[] others, Member leader) {
Membership m = new Membership(local,AbsoluteOrder.comp,true);
Arrays.fill(m,others);
- MemberImpl[] mbrs = m.getMembers();
+ Member[] mbrs = m.getMembers();
m.reset();
CoordinationMessage msg = new CoordinationMessage(leader, local, mbrs,new UniqueId(UUIDGenerator.randomUUID(true)), COORD_REQUEST);
return msg;
}
- protected void sendElectionMsg(MemberImpl local, MemberImpl next, CoordinationMessage msg) throws ChannelException {
+ protected void sendElectionMsg(Member local, Member next, CoordinationMessage msg) throws ChannelException {
fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_SEND_MSG,this,"Sending election message to("+next.getName()+")"));
super.sendMessage(new Member[] {next}, createData(msg, local), null);
}
- protected void sendElectionMsgToNextInline(MemberImpl local, CoordinationMessage msg) throws ChannelException {
+ protected void sendElectionMsgToNextInline(Member local, CoordinationMessage msg) throws ChannelException {
int next = Arrays.nextIndex(local,msg.getMembers());
int current = next;
msg.leader = msg.getMembers()[0];
@@ -275,7 +273,7 @@
}
}
- public ChannelData createData(CoordinationMessage msg, MemberImpl local) {
+ public ChannelData createData(CoordinationMessage msg, Member local) {
msg.write();
ChannelData data = new ChannelData(true);
data.setAddress(local);
@@ -297,13 +295,13 @@
protected Membership mergeOnArrive(CoordinationMessage msg) {
fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_PRE_MERGE,this,"Pre merge"));
- MemberImpl local = (MemberImpl)getLocalMember(false);
+ Member local = getLocalMember(false);
Membership merged = new Membership(local,AbsoluteOrder.comp,true);
Arrays.fill(merged,msg.getMembers());
Arrays.fill(merged,getMembers());
Member[] diff = Arrays.diff(merged,membership,local);
for ( int i=0; i
*
* @author Filip Hanik
+ * @author Greg Turnquist
* @version 1.0
*/
public class TcpFailureDetector extends ChannelInterceptorBase {
@@ -128,7 +128,7 @@
//if we add it here, then add it upwards too
//check to see if it is alive
if (memberAlive(member)) {
- membership.memberAlive( (MemberImpl) member);
+ membership.memberAlive(member);
notify = true;
} else {
addSuspects.put(member, Long.valueOf(System.currentTimeMillis()));
@@ -156,7 +156,7 @@
//if the payload is not a shutdown message
if (shutdown || !memberAlive(member)) {
//not correct, we need to maintain the map
- membership.removeMember( (MemberImpl) member);
+ membership.removeMember(member);
removeSuspects.remove(member);
if (member instanceof StaticMember) {
addSuspects.put(member, Long.valueOf(System.currentTimeMillis()));
@@ -226,11 +226,11 @@
Member[] members = super.getMembers();
for (int i = 0; members != null && i < members.length; i++) {
if (memberAlive(members[i])) {
- if (membership.memberAlive((MemberImpl)members[i])) super.memberAdded(members[i]);
+ if (membership.memberAlive(members[i])) super.memberAdded(members[i]);
addSuspects.remove(members[i]);
} else {
if (membership.getMember(members[i])!=null) {
- membership.removeMember((MemberImpl)members[i]);
+ membership.removeMember(members[i]);
removeSuspects.remove(members[i]);
if (members[i] instanceof StaticMember) {
addSuspects.put(members[i], Long.valueOf(System.currentTimeMillis()));
@@ -250,22 +250,22 @@
// avoid temporary adding member.
continue;
}
- if (membership.memberAlive( (MemberImpl) members[i])) {
+ if (membership.memberAlive(members[i])) {
//we don't have this one in our membership, check to see if he/she is alive
if (memberAlive(members[i])) {
log.warn("Member added, even though we werent notified:" + members[i]);
super.memberAdded(members[i]);
} else {
- membership.removeMember( (MemberImpl) members[i]);
+ membership.removeMember(members[i]);
} //end if
} //end if
} //for
//check suspect members if they are still alive,
//if not, simply issue the memberDisappeared message
- MemberImpl[] keys = removeSuspects.keySet().toArray(new MemberImpl[removeSuspects.size()]);
+ Member[] keys = removeSuspects.keySet().toArray(new Member[removeSuspects.size()]);
for (int i = 0; i < keys.length; i++) {
- MemberImpl m = keys[i];
+ Member m = keys[i];
if (membership.getMember(m) != null && (!memberAlive(m))) {
membership.removeMember(m);
super.memberDisappeared(m);
@@ -277,9 +277,9 @@
//check add suspects members if they are alive now,
//if they are, simply issue the memberAdded message
- keys = addSuspects.keySet().toArray(new MemberImpl[addSuspects.size()]);
+ keys = addSuspects.keySet().toArray(new Member[addSuspects.size()]);
for (int i = 0; i < keys.length; i++) {
- MemberImpl m = keys[i];
+ Member m = keys[i];
if ( membership.getMember(m) == null && (memberAlive(m))) {
membership.memberAlive(m);
super.memberAdded(m);
@@ -292,7 +292,7 @@
protected synchronized void setupMembership() {
if ( membership == null ) {
- membership = new Membership((MemberImpl)super.getLocalMember(true));
+ membership = new Membership(super.getLocalMember(true));
}
}
--- java/org/apache/catalina/tribes/io/ChannelData.java (revision 1423146)
+++ java/org/apache/catalina/tribes/io/ChannelData.java (working copy)
@@ -186,7 +186,7 @@
4 + //unique id length off=12
uniqueId.length+ //id data off=12+uniqueId.length
4 + //addr length off=12+uniqueId.length+4
- ((MemberImpl)address).getDataLength()+ //member data off=12+uniqueId.length+4+add.length
+ address.getDataLength()+ //member data off=12+uniqueId.length+4+add.length
4 + //message length off=12+uniqueId.length+4+add.length+4
message.getLength();
return length;
@@ -205,7 +205,7 @@
}
public byte[] getDataPackage(byte[] data, int offset) {
- byte[] addr = ((MemberImpl)address).getData(false);
+ byte[] addr = address.getData(false);
XByteBuffer.toBytes(options,data,offset);
offset += 4; //options
XByteBuffer.toBytes(timestamp,data,offset);
--- java/org/apache/catalina/tribes/membership/McastServiceImpl.java (revision 1423146)
+++ java/org/apache/catalina/tribes/membership/McastServiceImpl.java (working copy)
@@ -46,6 +46,7 @@
* Need to fix this, could use java.nio and only need one thread to send and receive, or
* just use a timeout on the receive
* @author Filip Hanik
+ * @author Greg Turnquist
* @version $Id$
*/
public class McastServiceImpl
@@ -360,7 +361,7 @@
}
private void memberDataReceived(byte[] data) {
- final MemberImpl m = MemberImpl.getMember(data);
+ final Member m = MemberImpl.getMember(data);
if (log.isTraceEnabled()) log.trace("Mcast receive ping from member " + m);
Runnable t = null;
if (Arrays.equals(m.getCommand(), Member.SHUTDOWN_PAYLOAD)) {
@@ -444,9 +445,9 @@
protected final Object expiredMutex = new Object();
protected void checkExpired() {
synchronized (expiredMutex) {
- MemberImpl[] expired = membership.expire(timeToExpiration);
+ Member[] expired = membership.expire(timeToExpiration);
for (int i = 0; i < expired.length; i++) {
- final MemberImpl member = expired[i];
+ final Member member = expired[i];
if (log.isDebugEnabled())
log.debug("Mcast expire member " + expired[i]);
try {
--- java/org/apache/catalina/tribes/membership/MemberImpl.java (revision 1423146)
+++ java/org/apache/catalina/tribes/membership/MemberImpl.java (working copy)
@@ -32,6 +32,7 @@
* Carries the host, and port of the this or other cluster nodes.
*
* @author Filip Hanik
+ * @author Greg Turnquist
* @version $Id$
*/
public class MemberImpl implements Member, java.io.Externalizable {
@@ -170,11 +171,15 @@
* @param getalive boolean
* @return byte[]
*/
+ @Override
public byte[] getData(boolean getalive) {
return getData(getalive,false);
}
-
+ /**
+ * Retrieve the total size of the message.
+ */
+ @Override
public int getDataLength() {
return TRIBES_MBR_BEGIN.length+ //start pkg
4+ //data length
@@ -200,6 +205,7 @@
* @param reset boolean - reset the cached data package, and create a new one
* @return byte[]
*/
+ @Override
public byte[] getData(boolean getalive, boolean reset) {
if ( reset ) dataPkg = null;
//look in cache first
@@ -298,11 +304,11 @@
* @param data - the bytes received
* @return a member object.
*/
- public static MemberImpl getMember(byte[] data, MemberImpl member) {
+ public static Member getMember(byte[] data, MemberImpl member) {
return getMember(data,0,data.length,member);
}
- public static MemberImpl getMember(byte[] data, int offset, int length, MemberImpl member) {
+ public static Member getMember(byte[] data, int offset, int length, MemberImpl member) {
//package looks like
//start package TRIBES_MBR_BEGIN.length
//package length - 4 bytes
@@ -408,11 +414,11 @@
return member;
}
- public static MemberImpl getMember(byte[] data) {
+ public static Member getMember(byte[] data) {
return getMember(data,new MemberImpl());
}
- public static MemberImpl getMember(byte[] data, int offset, int length) {
+ public static Member getMember(byte[] data, int offset, int length) {
return getMember(data,offset,length,new MemberImpl());
}
--- java/org/apache/catalina/tribes/membership/Membership.java (revision 1423146)
+++ java/org/apache/catalina/tribes/membership/Membership.java (working copy)
@@ -36,11 +36,12 @@
*
* @author Filip Hanik
* @author Peter Rossbach
+ * @author Greg Turnquist
* @version $Id$
*/
public class Membership implements Cloneable {
- protected static final MemberImpl[] EMPTY_MEMBERS = new MemberImpl[0];
+ protected static final Member[] EMPTY_MEMBERS = new Member[0];
private final Object membersLock = new Object();
@@ -48,17 +49,17 @@
* The name of this membership, has to be the same as the name for the local
* member
*/
- protected final MemberImpl local;
+ protected final Member local;
/**
* A map of all the members in the cluster.
*/
- protected HashMap map = new HashMap<>();
+ protected HashMap map = new HashMap<>();
/**
* A list of all the members in the cluster.
*/
- protected MemberImpl[] members = EMPTY_MEMBERS;
+ protected Member[] members = EMPTY_MEMBERS;
/**
* sort members by alive time
@@ -69,9 +70,9 @@
public Object clone() {
synchronized (membersLock) {
Membership clone = new Membership(local, memberComparator);
- final HashMap tmpclone = (HashMap) map.clone();
+ final HashMap tmpclone = (HashMap) map.clone();
clone.map = tmpclone;
- clone.members = new MemberImpl[members.length];
+ clone.members = new Member[members.length];
System.arraycopy(members,0,clone.members,0,members.length);
return clone;
}
@@ -82,19 +83,19 @@
* @param local - has to be the name of the local member. Used to filter the local member from the cluster membership
* @param includeLocal - TBA
*/
- public Membership(MemberImpl local, boolean includeLocal) {
+ public Membership(Member local, boolean includeLocal) {
this(local, new MemberComparator(), includeLocal);
}
- public Membership(MemberImpl local) {
+ public Membership(Member local) {
this(local, false);
}
- public Membership(MemberImpl local, Comparator comp) {
+ public Membership(Member local, Comparator comp) {
this(local, comp, false);
}
- public Membership(MemberImpl local, Comparator comp, boolean includeLocal) {
+ public Membership(Member local, Comparator comp, boolean includeLocal) {
this.local = local;
if ( includeLocal ) addMember(local);
this.memberComparator = comp;
@@ -115,7 +116,7 @@
* @return - true if this member is new to the cluster, false otherwise.
* - false if this member is the local member or updated.
*/
- public synchronized boolean memberAlive(MemberImpl member) {
+ public synchronized boolean memberAlive(Member member) {
boolean result = false;
//ignore ourselves
if ( member.equals(local) ) return result;
@@ -127,7 +128,7 @@
result = true;
} else {
//update the member alive time
- MemberImpl updateMember = entry.getMember() ;
+ Member updateMember = entry.getMember() ;
if(updateMember.getMemberAliveTime() != member.getMemberAliveTime()) {
//update fields that can change
updateMember.setMemberAliveTime(member.getMemberAliveTime());
@@ -144,12 +145,12 @@
* Add a member to this component and sort array with memberComparator
* @param member The member to add
*/
- public synchronized MbrEntry addMember(MemberImpl member) {
+ public synchronized MbrEntry addMember(Member member) {
synchronized (membersLock) {
MbrEntry entry = new MbrEntry(member);
if (!map.containsKey(member) ) {
map.put(member, entry);
- MemberImpl results[] = new MemberImpl[members.length + 1];
+ Member results[] = new Member[members.length + 1];
for (int i = 0; i < members.length; i++) results[i] = members[i];
results[members.length] = member;
members = results;
@@ -164,7 +165,7 @@
*
* @param member The member to remove
*/
- public void removeMember(MemberImpl member) {
+ public void removeMember(Member member) {
map.remove(member);
synchronized (membersLock) {
int n = -1;
@@ -175,7 +176,7 @@
}
}
if (n < 0) return;
- MemberImpl results[] = new MemberImpl[members.length - 1];
+ Member results[] = new Member[members.length - 1];
int j = 0;
for (int i = 0; i < members.length; i++) {
if (i != n)
@@ -192,11 +193,11 @@
* @param maxtime - the max time a member can remain unannounced before it is considered dead.
* @return the list of expired members
*/
- public synchronized MemberImpl[] expire(long maxtime) {
+ public synchronized Member[] expire(long maxtime) {
if(!hasMembers() )
return EMPTY_MEMBERS;
- ArrayList list = null;
+ ArrayList list = null;
Iterator i = map.values().iterator();
while(i.hasNext()) {
MbrEntry entry = i.next();
@@ -208,7 +209,7 @@
}
if(list != null) {
- MemberImpl[] result = new MemberImpl[list.size()];
+ Member[] result = new Member[list.size()];
list.toArray(result);
for( int j=0; j> i = map.entrySet().iterator();
+ Iterator> i = map.entrySet().iterator();
int pos = 0;
while ( i.hasNext() )
result[pos++] = i.next().getValue();
@@ -293,10 +294,10 @@
*/
protected static class MbrEntry {
- protected final MemberImpl mbr;
+ protected final Member mbr;
protected long lastHeardFrom;
- public MbrEntry(MemberImpl mbr) {
+ public MbrEntry(Member mbr) {
this.mbr = mbr;
}
@@ -310,7 +311,7 @@
/**
* Return the actual Member object
*/
- public MemberImpl getMember() {
+ public Member getMember() {
return mbr;
}
--- java/org/apache/catalina/tribes/util/Arrays.java (revision 1423146)
+++ java/org/apache/catalina/tribes/util/Arrays.java (working copy)
@@ -30,6 +30,7 @@
/**
* @author Filip Hanik
+ * @author Greg Turnquist
* @version 1.0
*/
public class Arrays {
@@ -152,12 +153,12 @@
}
public static void fill(Membership mbrship, Member[] m) {
- for (int i=0; i result = new ArrayList<>();
- MemberImpl[] comp = complete.getMembers();
+ Member[] comp = complete.getMembers();
for ( int i=0; iCompany:
*
* @author fhanik
+ * @author Greg Turnquist
* @version 1.0
*/
public class ChannelCreator {
@@ -133,7 +134,7 @@
String d = args[++i];
String h = d.substring(0,d.indexOf(":"));
String p = d.substring(h.length()+1);
- MemberImpl m = new MemberImpl(h,Integer.parseInt(p),2000);
+ Member m = new MemberImpl(h,Integer.parseInt(p),2000);
staticMembers.add(m);
} else if ("-throughput".equals(args[i])) {
throughput = true;
--- test/org/apache/catalina/tribes/membership/TestMemberImplSerialization.java (revision 1423146)
+++ test/org/apache/catalina/tribes/membership/TestMemberImplSerialization.java (working copy)
@@ -16,11 +16,12 @@
*/
package org.apache.catalina.tribes.membership;
-import java.util.Arrays;
-
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import java.util.Arrays;
+
+import org.apache.catalina.tribes.Member;
import org.junit.Before;
import org.junit.Test;
@@ -73,8 +74,8 @@
byte[] md1 = m1.getData();
byte[] md2 = m2.getData();
- MemberImpl a1 = MemberImpl.getMember(md1);
- MemberImpl a2 = MemberImpl.getMember(md2);
+ Member a1 = MemberImpl.getMember(md1);
+ Member a2 = MemberImpl.getMember(md2);
assertTrue(a1.getUdpPort()==a2.getUdpPort());
assertTrue(a1.getUdpPort()==udpPort);
@@ -82,13 +83,13 @@
@Test
public void testSerializationOne() throws Exception {
- MemberImpl m = m1;
+ Member m = m1;
byte[] md1 = m.getData(false,true);
byte[] mda1 = m.getData(false,false);
assertTrue(Arrays.equals(md1,mda1));
assertTrue(md1==mda1);
mda1 = m.getData(true,true);
- MemberImpl ma1 = MemberImpl.getMember(mda1);
+ Member ma1 = MemberImpl.getMember(mda1);
assertTrue(compareMembers(m,ma1));
mda1 = p1.getData(false);
assertFalse(Arrays.equals(md1,mda1));
@@ -98,15 +99,15 @@
md1 = m.getData(true,true);
Thread.sleep(50);
mda1 = m.getData(true,true);
- MemberImpl a1 = MemberImpl.getMember(md1);
- MemberImpl a2 = MemberImpl.getMember(mda1);
+ Member a1 = MemberImpl.getMember(md1);
+ Member a2 = MemberImpl.getMember(mda1);
assertTrue(a1.equals(a2));
assertFalse(Arrays.equals(md1,mda1));
}
- public boolean compareMembers(MemberImpl impl1, MemberImpl impl2) {
+ public boolean compareMembers(Member impl1, Member impl2) {
boolean result = true;
result = result && Arrays.equals(impl1.getHost(),impl2.getHost());
result = result && Arrays.equals(impl1.getPayload(),impl2.getPayload());
--- test/org/apache/catalina/tribes/test/NioSenderTest.java (revision 1423146)
+++ test/org/apache/catalina/tribes/test/NioSenderTest.java (working copy)
@@ -40,7 +40,7 @@
public class NioSenderTest {
private Selector selector = null;
private int counter = 0;
- MemberImpl mbr;
+ Member mbr;
private static int testOptions = Channel.SEND_OPTIONS_DEFAULT;
public NioSenderTest() {
// Default constructor