ASF Bugzilla – Attachment 24485 Details for
Bug 47949
JMS Subscriber never receives all the messages
Home
|
New
|
Browse
|
Search
|
[?]
|
Reports
|
Help
|
New Account
|
Log In
Remember
[x]
|
Forgot Password
Login:
[x]
[patch]
Patch to SubscriberSampler.java
SubscriberSampler.java.diff (text/plain), 6.75 KB, created by
Bob Yetman
on 2009-11-04 17:53:18 UTC
(
hide
)
Description:
Patch to SubscriberSampler.java
Filename:
MIME Type:
Creator:
Bob Yetman
Created:
2009-11-04 17:53:18 UTC
Size:
6.75 KB
patch
obsolete
>--- SubscriberSampler.java.orig 2009-11-04 20:17:55.000000000 -0500 >+++ SubscriberSampler.java 2009-11-04 20:45:08.000000000 -0500 >@@ -17,6 +17,9 @@ > > package org.apache.jmeter.protocol.jms.sampler; > >+import java.util.Enumeration; >+import java.util.concurrent.ConcurrentLinkedQueue; >+ > import javax.jms.JMSException; > import javax.jms.Message; > import javax.jms.MessageListener; >@@ -48,11 +51,7 @@ > // No need to synch/ - only used by sampler and ClientPool (which does its own synch) > private transient ReceiveSubscriber SUBSCRIBER = null; > >- //@GuardedBy("this") >- private final StringBuffer BUFFER = new StringBuffer(); >- >- //@GuardedBy("this") >- private transient int counter = 0; >+ private final ConcurrentLinkedQueue<TextMessage> queue = new ConcurrentLinkedQueue<TextMessage>(); > > private transient volatile boolean interrupted = false; > >@@ -102,6 +101,7 @@ > sub = new OnMessageSubscriber(this.getUseJNDIPropertiesAsBoolean(), this.getJNDIInitialContextFactory(), > this.getProviderUrl(), this.getConnectionFactory(), this.getTopic(), this.isUseAuth(), this > .getUsername(), this.getPassword()); >+ queue.clear(); > sub.setMessageListener(this); > sub.resume(); > ClientPool.addClient(sub); >@@ -147,14 +147,18 @@ > */ > private SampleResult sampleWithListener() { > SampleResult result = new SampleResult(); >+ StringBuffer buffer = new StringBuffer(); >+ StringBuffer propBuffer = new StringBuffer(); >+ int cnt; >+ > result.setSampleLabel(getName()); > initListenerClient(); > > int loop = this.getIterationCount(); > > result.sampleStart(); >- int read; >- while ((read=this.count(0)) < loop && interrupted == false) { >+ >+ while (queue.size() < loop && interrupted == false) { > try { > Thread.sleep(0, 50); > } catch (InterruptedException e) { >@@ -162,21 +166,37 @@ > } > } > result.sampleEnd(); >- synchronized (this) {// Need to synch because buffer is shared with onMessageHandler >- if (this.getReadResponseAsBoolean()) { >- 
result.setResponseData(this.BUFFER.toString().getBytes()); >- } else { >- result.setBytes(this.BUFFER.toString().getBytes().length); >- } >- read=this.count(0); >+ >+ for(cnt = 0; cnt < loop ; cnt++) { >+ TextMessage msg = (TextMessage) queue.poll(); >+ try { >+ buffer.append(msg.getText()); >+ Enumeration props = msg.getPropertyNames(); >+ while(props.hasMoreElements()) { >+ String name = (String) props.nextElement(); >+ propBuffer.append("PROPERTY: "); >+ propBuffer.append(name); >+ propBuffer.append("="); >+ propBuffer.append(msg.getObjectProperty(name)); >+ propBuffer.append("\n"); >+ } >+ } catch (JMSException e) { >+ log.error(e.getMessage()); >+ } >+ } >+ if (this.getReadResponseAsBoolean()) { >+ result.setResponseData(buffer.toString().getBytes()); >+ } else { >+ result.setBytes(buffer.toString().getBytes().length); > } >+ result.setResponseHeaders(propBuffer.toString()); >+ result.setDataType(SampleResult.TEXT); > result.setSuccessful(true); > result.setResponseCodeOK(); >- result.setResponseMessage(read + " messages received"); >+ result.setResponseMessage(loop + " messages received"); > result.setSamplerData(loop + " messages expected"); >- result.setSampleCount(read); >+ result.setSampleCount(loop); > >- this.resetCount(); > return result; > } > >@@ -188,6 +208,11 @@ > */ > private SampleResult sampleWithReceive() { > SampleResult result = new SampleResult(); >+ StringBuffer buffer = new StringBuffer(); >+ StringBuffer propBuffer = new StringBuffer(); >+ int cnt; >+ >+ > result.setSampleLabel(getName()); > if (this.SUBSCRIBER == null) { > this.initReceiveClient(); >@@ -206,17 +231,34 @@ > } > result.sampleEnd(); > result.setResponseMessage(loop + " samples messages received"); >+ for(cnt = 0; cnt < loop ; cnt++) { >+ TextMessage msg = this.SUBSCRIBER.getMessage(); >+ try { >+ buffer.append(msg.getText()); >+ Enumeration props = msg.getPropertyNames(); >+ while(props.hasMoreElements()) { >+ String name = (String) props.nextElement(); >+ 
propBuffer.append("PROPERTY: "); >+ propBuffer.append(name); >+ propBuffer.append("="); >+ propBuffer.append(msg.getObjectProperty(name)); >+ propBuffer.append("\n"); >+ } >+ } catch (JMSException e) { >+ log.error(e.getMessage()); >+ } >+ } > if (this.getReadResponseAsBoolean()) { >- result.setResponseData(this.SUBSCRIBER.getMessage().getBytes()); >+ result.setResponseData(buffer.toString().getBytes()); > } else { >- result.setBytes(this.SUBSCRIBER.getMessage().getBytes().length); >+ result.setBytes(buffer.toString().getBytes().length); > } >+ result.setResponseHeaders(propBuffer.toString()); > result.setSuccessful(true); > result.setResponseCode(loop + " message(s) received successfully"); > result.setSamplerData("Not applicable"); > result.setSampleCount(loop); > >- this.SUBSCRIBER.reset(); > return result; > } > >@@ -225,38 +267,9 @@ > * listener with the TopicSubscriber. > */ > public synchronized void onMessage(Message message) { >- try { >- if (message instanceof TextMessage) { >- TextMessage msg = (TextMessage) message; >- String content = msg.getText(); >- if (content != null) { >- this.BUFFER.append(content); >- count(1); >- } >- } >- } catch (JMSException e) { >- log.error(e.getMessage()); >- } >- } >- >- /** >- * increment the count and return the new value. >- * >- * @param increment >- * @return the new value >- */ >- private synchronized int count(int increment) { >- this.counter += increment; >- return this.counter; >- } >- >- /** >- * resetCount will set the counter to zero and set the length of the >- * StringBuffer to zero. >- */ >- private synchronized void resetCount() { >- this.counter = 0; >- this.BUFFER.setLength(0); >+ if (message instanceof TextMessage) { >+ queue.add((TextMessage)message); >+ } > } > > // ----------- get/set methods ------------------- //
You cannot view the attachment while viewing its details because your browser does not support IFRAMEs.
View the attachment on a separate page
.
View Attachment As Diff
View Attachment As Raw
Actions:
View
|
Diff
Attachments on
bug 47949
:
24484
| 24485