--- SubscriberSampler.java.orig 2009-11-04 20:17:55.000000000 -0500 +++ SubscriberSampler.java 2009-11-04 20:45:08.000000000 -0500 @@ -17,6 +17,9 @@ package org.apache.jmeter.protocol.jms.sampler; +import java.util.Enumeration; +import java.util.concurrent.ConcurrentLinkedQueue; + import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; @@ -48,11 +51,7 @@ // No need to synch/ - only used by sampler and ClientPool (which does its own synch) private transient ReceiveSubscriber SUBSCRIBER = null; - //@GuardedBy("this") - private final StringBuffer BUFFER = new StringBuffer(); - - //@GuardedBy("this") - private transient int counter = 0; + private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue(); private transient volatile boolean interrupted = false; @@ -102,6 +101,7 @@ sub = new OnMessageSubscriber(this.getUseJNDIPropertiesAsBoolean(), this.getJNDIInitialContextFactory(), this.getProviderUrl(), this.getConnectionFactory(), this.getTopic(), this.isUseAuth(), this .getUsername(), this.getPassword()); + queue.clear(); sub.setMessageListener(this); sub.resume(); ClientPool.addClient(sub); @@ -147,14 +147,18 @@ */ private SampleResult sampleWithListener() { SampleResult result = new SampleResult(); + StringBuffer buffer = new StringBuffer(); + StringBuffer propBuffer = new StringBuffer(); + int cnt; + result.setSampleLabel(getName()); initListenerClient(); int loop = this.getIterationCount(); result.sampleStart(); - int read; - while ((read=this.count(0)) < loop && interrupted == false) { + + while (queue.size() < loop && interrupted == false) { try { Thread.sleep(0, 50); } catch (InterruptedException e) { @@ -162,21 +166,37 @@ } } result.sampleEnd(); - synchronized (this) {// Need to synch because buffer is shared with onMessageHandler - if (this.getReadResponseAsBoolean()) { - result.setResponseData(this.BUFFER.toString().getBytes()); - } else { - result.setBytes(this.BUFFER.toString().getBytes().length); - } - read=this.count(0); + + for(cnt = 0; cnt < loop ; cnt++) { + TextMessage msg = (TextMessage) queue.poll(); + try { + buffer.append(msg.getText()); + Enumeration props = msg.getPropertyNames(); + while(props.hasMoreElements()) { + String name = (String) props.nextElement(); + propBuffer.append("PROPERTY: "); + propBuffer.append(name); + propBuffer.append("="); + propBuffer.append(msg.getObjectProperty(name)); + propBuffer.append("\n"); + } + } catch (JMSException e) { + log.error(e.getMessage()); + } + } + if (this.getReadResponseAsBoolean()) { + result.setResponseData(buffer.toString().getBytes()); + } else { + result.setBytes(buffer.toString().getBytes().length); } + result.setResponseHeaders(propBuffer.toString()); + result.setDataType(SampleResult.TEXT); result.setSuccessful(true); result.setResponseCodeOK(); - result.setResponseMessage(read + " messages received"); + result.setResponseMessage(loop + " messages received"); result.setSamplerData(loop + " messages expected"); - result.setSampleCount(read); + result.setSampleCount(loop); - this.resetCount(); return result; } @@ -188,6 +208,11 @@ */ private SampleResult sampleWithReceive() { SampleResult result = new SampleResult(); + StringBuffer buffer = new StringBuffer(); + StringBuffer propBuffer = new StringBuffer(); + int cnt; + + result.setSampleLabel(getName()); if (this.SUBSCRIBER == null) { this.initReceiveClient(); @@ -206,17 +231,34 @@ } result.sampleEnd(); result.setResponseMessage(loop + " samples messages received"); + for(cnt = 0; cnt < loop ; cnt++) { + TextMessage msg = this.SUBSCRIBER.getMessage(); + try { + buffer.append(msg.getText()); + Enumeration props = msg.getPropertyNames(); + while(props.hasMoreElements()) { + String name = (String) props.nextElement(); + propBuffer.append("PROPERTY: "); + propBuffer.append(name); + propBuffer.append("="); + propBuffer.append(msg.getObjectProperty(name)); + propBuffer.append("\n"); + } + } catch (JMSException e) { + log.error(e.getMessage()); + } + } if (this.getReadResponseAsBoolean()) { - result.setResponseData(this.SUBSCRIBER.getMessage().getBytes()); + result.setResponseData(buffer.toString().getBytes()); } else { - result.setBytes(this.SUBSCRIBER.getMessage().getBytes().length); + result.setBytes(buffer.toString().getBytes().length); } + result.setResponseHeaders(propBuffer.toString()); result.setSuccessful(true); result.setResponseCode(loop + " message(s) received successfully"); result.setSamplerData("Not applicable"); result.setSampleCount(loop); - this.SUBSCRIBER.reset(); return result; } @@ -225,38 +267,9 @@ * listener with the TopicSubscriber. */ public synchronized void onMessage(Message message) { - try { - if (message instanceof TextMessage) { - TextMessage msg = (TextMessage) message; - String content = msg.getText(); - if (content != null) { - this.BUFFER.append(content); - count(1); - } - } - } catch (JMSException e) { - log.error(e.getMessage()); - } - } - - /** - * increment the count and return the new value. - * - * @param increment - * @return the new value - */ - private synchronized int count(int increment) { - this.counter += increment; - return this.counter; - } - - /** - * resetCount will set the counter to zero and set the length of the - * StringBuffer to zero. - */ - private synchronized void resetCount() { - this.counter = 0; - this.BUFFER.setLength(0); + if (message instanceof TextMessage) { + queue.add((TextMessage)message); + } } // ----------- get/set methods ------------------- //