Lines 17-22
Link Here
|
17 |
|
17 |
|
18 |
package org.apache.jmeter.protocol.jms.sampler; |
18 |
package org.apache.jmeter.protocol.jms.sampler; |
19 |
|
19 |
|
|
|
20 |
import java.util.Enumeration; |
21 |
import java.util.concurrent.ConcurrentLinkedQueue; |
22 |
|
20 |
import javax.jms.JMSException; |
23 |
import javax.jms.JMSException; |
21 |
import javax.jms.Message; |
24 |
import javax.jms.Message; |
22 |
import javax.jms.MessageListener; |
25 |
import javax.jms.MessageListener; |
Lines 48-58
Link Here
|
48 |
// No need to synch/ - only used by sampler and ClientPool (which does its own synch) |
51 |
// No need to synch/ - only used by sampler and ClientPool (which does its own synch) |
49 |
private transient ReceiveSubscriber SUBSCRIBER = null; |
52 |
private transient ReceiveSubscriber SUBSCRIBER = null; |
50 |
|
53 |
|
51 |
//@GuardedBy("this") |
54 |
private final ConcurrentLinkedQueue<TextMessage> queue = new ConcurrentLinkedQueue<TextMessage>(); |
52 |
private final StringBuffer BUFFER = new StringBuffer(); |
|
|
53 |
|
54 |
//@GuardedBy("this") |
55 |
private transient int counter = 0; |
56 |
|
55 |
|
57 |
private transient volatile boolean interrupted = false; |
56 |
private transient volatile boolean interrupted = false; |
58 |
|
57 |
|
Lines 102-107
Link Here
|
102 |
sub = new OnMessageSubscriber(this.getUseJNDIPropertiesAsBoolean(), this.getJNDIInitialContextFactory(), |
101 |
sub = new OnMessageSubscriber(this.getUseJNDIPropertiesAsBoolean(), this.getJNDIInitialContextFactory(), |
103 |
this.getProviderUrl(), this.getConnectionFactory(), this.getTopic(), this.isUseAuth(), this |
102 |
this.getProviderUrl(), this.getConnectionFactory(), this.getTopic(), this.isUseAuth(), this |
104 |
.getUsername(), this.getPassword()); |
103 |
.getUsername(), this.getPassword()); |
|
|
104 |
queue.clear(); |
105 |
sub.setMessageListener(this); |
105 |
sub.setMessageListener(this); |
106 |
sub.resume(); |
106 |
sub.resume(); |
107 |
ClientPool.addClient(sub); |
107 |
ClientPool.addClient(sub); |
Lines 147-160
Link Here
|
147 |
*/ |
147 |
*/ |
148 |
private SampleResult sampleWithListener() { |
148 |
private SampleResult sampleWithListener() { |
149 |
SampleResult result = new SampleResult(); |
149 |
SampleResult result = new SampleResult(); |
|
|
150 |
StringBuffer buffer = new StringBuffer(); |
151 |
StringBuffer propBuffer = new StringBuffer(); |
152 |
int cnt; |
153 |
|
150 |
result.setSampleLabel(getName()); |
154 |
result.setSampleLabel(getName()); |
151 |
initListenerClient(); |
155 |
initListenerClient(); |
152 |
|
156 |
|
153 |
int loop = this.getIterationCount(); |
157 |
int loop = this.getIterationCount(); |
154 |
|
158 |
|
155 |
result.sampleStart(); |
159 |
result.sampleStart(); |
156 |
int read; |
160 |
|
157 |
while ((read=this.count(0)) < loop && interrupted == false) { |
161 |
while (queue.size() < loop && interrupted == false) { |
158 |
try { |
162 |
try { |
159 |
Thread.sleep(0, 50); |
163 |
Thread.sleep(0, 50); |
160 |
} catch (InterruptedException e) { |
164 |
} catch (InterruptedException e) { |
Lines 162-182
Link Here
|
162 |
} |
166 |
} |
163 |
} |
167 |
} |
164 |
result.sampleEnd(); |
168 |
result.sampleEnd(); |
165 |
synchronized (this) {// Need to synch because buffer is shared with onMessageHandler |
169 |
|
166 |
if (this.getReadResponseAsBoolean()) { |
170 |
for(cnt = 0; cnt < loop ; cnt++) { |
167 |
result.setResponseData(this.BUFFER.toString().getBytes()); |
171 |
TextMessage msg = (TextMessage) queue.poll(); |
168 |
} else { |
172 |
try { |
169 |
result.setBytes(this.BUFFER.toString().getBytes().length); |
173 |
buffer.append(msg.getText()); |
170 |
} |
174 |
Enumeration props = msg.getPropertyNames(); |
171 |
read=this.count(0); |
175 |
while(props.hasMoreElements()) { |
|
|
176 |
String name = (String) props.nextElement(); |
177 |
propBuffer.append("PROPERTY: "); |
178 |
propBuffer.append(name); |
179 |
propBuffer.append("="); |
180 |
propBuffer.append(msg.getObjectProperty(name)); |
181 |
propBuffer.append("\n"); |
182 |
} |
183 |
} catch (JMSException e) { |
184 |
log.error(e.getMessage()); |
185 |
} |
186 |
} |
187 |
if (this.getReadResponseAsBoolean()) { |
188 |
result.setResponseData(buffer.toString().getBytes()); |
189 |
} else { |
190 |
result.setBytes(buffer.toString().getBytes().length); |
172 |
} |
191 |
} |
|
|
192 |
result.setResponseHeaders(propBuffer.toString()); |
193 |
result.setDataType(SampleResult.TEXT); |
173 |
result.setSuccessful(true); |
194 |
result.setSuccessful(true); |
174 |
result.setResponseCodeOK(); |
195 |
result.setResponseCodeOK(); |
175 |
result.setResponseMessage(read + " messages received"); |
196 |
result.setResponseMessage(loop + " messages received"); |
176 |
result.setSamplerData(loop + " messages expected"); |
197 |
result.setSamplerData(loop + " messages expected"); |
177 |
result.setSampleCount(read); |
198 |
result.setSampleCount(loop); |
178 |
|
199 |
|
179 |
this.resetCount(); |
|
|
180 |
return result; |
200 |
return result; |
181 |
} |
201 |
} |
182 |
|
202 |
|
Lines 188-193
Link Here
|
188 |
*/ |
208 |
*/ |
189 |
private SampleResult sampleWithReceive() { |
209 |
private SampleResult sampleWithReceive() { |
190 |
SampleResult result = new SampleResult(); |
210 |
SampleResult result = new SampleResult(); |
|
|
211 |
StringBuffer buffer = new StringBuffer(); |
212 |
StringBuffer propBuffer = new StringBuffer(); |
213 |
int cnt; |
214 |
|
215 |
|
191 |
result.setSampleLabel(getName()); |
216 |
result.setSampleLabel(getName()); |
192 |
if (this.SUBSCRIBER == null) { |
217 |
if (this.SUBSCRIBER == null) { |
193 |
this.initReceiveClient(); |
218 |
this.initReceiveClient(); |
Lines 206-222
Link Here
|
206 |
} |
231 |
} |
207 |
result.sampleEnd(); |
232 |
result.sampleEnd(); |
208 |
result.setResponseMessage(loop + " samples messages received"); |
233 |
result.setResponseMessage(loop + " samples messages received"); |
|
|
234 |
for(cnt = 0; cnt < loop ; cnt++) { |
235 |
TextMessage msg = this.SUBSCRIBER.getMessage(); |
236 |
try { |
237 |
buffer.append(msg.getText()); |
238 |
Enumeration props = msg.getPropertyNames(); |
239 |
while(props.hasMoreElements()) { |
240 |
String name = (String) props.nextElement(); |
241 |
propBuffer.append("PROPERTY: "); |
242 |
propBuffer.append(name); |
243 |
propBuffer.append("="); |
244 |
propBuffer.append(msg.getObjectProperty(name)); |
245 |
propBuffer.append("\n"); |
246 |
} |
247 |
} catch (JMSException e) { |
248 |
log.error(e.getMessage()); |
249 |
} |
250 |
} |
209 |
if (this.getReadResponseAsBoolean()) { |
251 |
if (this.getReadResponseAsBoolean()) { |
210 |
result.setResponseData(this.SUBSCRIBER.getMessage().getBytes()); |
252 |
result.setResponseData(buffer.toString().getBytes()); |
211 |
} else { |
253 |
} else { |
212 |
result.setBytes(this.SUBSCRIBER.getMessage().getBytes().length); |
254 |
result.setBytes(buffer.toString().getBytes().length); |
213 |
} |
255 |
} |
|
|
256 |
result.setResponseHeaders(propBuffer.toString()); |
214 |
result.setSuccessful(true); |
257 |
result.setSuccessful(true); |
215 |
result.setResponseCode(loop + " message(s) received successfully"); |
258 |
result.setResponseCode(loop + " message(s) received successfully"); |
216 |
result.setSamplerData("Not applicable"); |
259 |
result.setSamplerData("Not applicable"); |
217 |
result.setSampleCount(loop); |
260 |
result.setSampleCount(loop); |
218 |
|
261 |
|
219 |
this.SUBSCRIBER.reset(); |
|
|
220 |
return result; |
262 |
return result; |
221 |
} |
263 |
} |
222 |
|
264 |
|
Lines 225-262
Link Here
|
225 |
* listener with the TopicSubscriber. |
267 |
* listener with the TopicSubscriber. |
226 |
*/ |
268 |
*/ |
227 |
public synchronized void onMessage(Message message) { |
269 |
public synchronized void onMessage(Message message) { |
228 |
try { |
270 |
if (message instanceof TextMessage) { |
229 |
if (message instanceof TextMessage) { |
271 |
queue.add((TextMessage)message); |
230 |
TextMessage msg = (TextMessage) message; |
272 |
} |
231 |
String content = msg.getText(); |
|
|
232 |
if (content != null) { |
233 |
this.BUFFER.append(content); |
234 |
count(1); |
235 |
} |
236 |
} |
237 |
} catch (JMSException e) { |
238 |
log.error(e.getMessage()); |
239 |
} |
240 |
} |
241 |
|
242 |
/** |
243 |
* increment the count and return the new value. |
244 |
* |
245 |
* @param increment |
246 |
* @return the new value |
247 |
*/ |
248 |
private synchronized int count(int increment) { |
249 |
this.counter += increment; |
250 |
return this.counter; |
251 |
} |
252 |
|
253 |
/** |
254 |
* resetCount will set the counter to zero and set the length of the |
255 |
* StringBuffer to zero. |
256 |
*/ |
257 |
private synchronized void resetCount() { |
258 |
this.counter = 0; |
259 |
this.BUFFER.setLength(0); |
260 |
} |
273 |
} |
261 |
|
274 |
|
262 |
// ----------- get/set methods ------------------- // |
275 |
// ----------- get/set methods ------------------- // |