View | Details | Raw Unified | Return to bug 47949
Collapse All | Expand All

(-)SubscriberSampler.java.orig (-52 / +65 lines)
Lines 17-22 Link Here
17
17
18
package org.apache.jmeter.protocol.jms.sampler;
18
package org.apache.jmeter.protocol.jms.sampler;
19
19
20
import java.util.Enumeration;
21
import java.util.concurrent.ConcurrentLinkedQueue;
22
20
import javax.jms.JMSException;
23
import javax.jms.JMSException;
21
import javax.jms.Message;
24
import javax.jms.Message;
22
import javax.jms.MessageListener;
25
import javax.jms.MessageListener;
Lines 48-58 Link Here
48
    // No need to synch/ - only used by sampler and ClientPool (which does its own synch)
51
    // No need to synch/ - only used by sampler and ClientPool (which does its own synch)
49
    private transient ReceiveSubscriber SUBSCRIBER = null;
52
    private transient ReceiveSubscriber SUBSCRIBER = null;
50
53
51
    //@GuardedBy("this")
54
    private final ConcurrentLinkedQueue<TextMessage> queue = new ConcurrentLinkedQueue<TextMessage>();
52
    private final StringBuffer BUFFER = new StringBuffer();
53
54
    //@GuardedBy("this")
55
    private transient int counter = 0;
56
    
55
    
57
    private transient volatile boolean interrupted = false;
56
    private transient volatile boolean interrupted = false;
58
57
Lines 102-107 Link Here
102
            sub = new OnMessageSubscriber(this.getUseJNDIPropertiesAsBoolean(), this.getJNDIInitialContextFactory(),
101
            sub = new OnMessageSubscriber(this.getUseJNDIPropertiesAsBoolean(), this.getJNDIInitialContextFactory(),
103
                    this.getProviderUrl(), this.getConnectionFactory(), this.getTopic(), this.isUseAuth(), this
102
                    this.getProviderUrl(), this.getConnectionFactory(), this.getTopic(), this.isUseAuth(), this
104
                            .getUsername(), this.getPassword());
103
                            .getUsername(), this.getPassword());
104
            queue.clear();
105
            sub.setMessageListener(this);
105
            sub.setMessageListener(this);
106
            sub.resume();
106
            sub.resume();
107
            ClientPool.addClient(sub);
107
            ClientPool.addClient(sub);
Lines 147-160 Link Here
147
     */
147
     */
148
    private SampleResult sampleWithListener() {
148
    private SampleResult sampleWithListener() {
149
        SampleResult result = new SampleResult();
149
        SampleResult result = new SampleResult();
150
        StringBuffer buffer = new StringBuffer();
151
        StringBuffer propBuffer = new StringBuffer();
152
        int cnt;
153
        
150
        result.setSampleLabel(getName());
154
        result.setSampleLabel(getName());
151
        initListenerClient();
155
        initListenerClient();
152
156
153
        int loop = this.getIterationCount();
157
        int loop = this.getIterationCount();
154
158
155
        result.sampleStart();
159
        result.sampleStart();
156
        int read;
160
        
157
        while ((read=this.count(0)) < loop && interrupted == false) {
161
        while (queue.size() < loop && interrupted == false) {
158
            try {
162
            try {
159
                Thread.sleep(0, 50);
163
                Thread.sleep(0, 50);
160
            } catch (InterruptedException e) {
164
            } catch (InterruptedException e) {
Lines 162-182 Link Here
162
            }
166
            }
163
        }
167
        }
164
        result.sampleEnd();
168
        result.sampleEnd();
165
        synchronized (this) {// Need to synch because buffer is shared with onMessageHandler
169
       
166
            if (this.getReadResponseAsBoolean()) {
170
        for(cnt = 0; cnt < loop ; cnt++) {
167
                result.setResponseData(this.BUFFER.toString().getBytes());
171
        	TextMessage msg = (TextMessage) queue.poll();
168
            } else {
172
        	try {
169
                result.setBytes(this.BUFFER.toString().getBytes().length);
173
        		buffer.append(msg.getText());
170
            }
174
        		Enumeration props = msg.getPropertyNames();
171
            read=this.count(0);
175
        		while(props.hasMoreElements()) {
176
        			String name = (String) props.nextElement();
177
        			propBuffer.append("PROPERTY: ");
178
        			propBuffer.append(name);
179
        			propBuffer.append("=");
180
        			propBuffer.append(msg.getObjectProperty(name));
181
        			propBuffer.append("\n");
182
        		}
183
        	} catch (JMSException e) {
184
        		log.error(e.getMessage());
185
        	}
186
        }
187
        if (this.getReadResponseAsBoolean()) {
188
        	result.setResponseData(buffer.toString().getBytes());
189
        } else {
190
        	result.setBytes(buffer.toString().getBytes().length);
172
        }
191
        }
192
        result.setResponseHeaders(propBuffer.toString());
193
        result.setDataType(SampleResult.TEXT);
173
        result.setSuccessful(true);
194
        result.setSuccessful(true);
174
        result.setResponseCodeOK();
195
        result.setResponseCodeOK();
175
        result.setResponseMessage(read + " messages received");
196
        result.setResponseMessage(loop + " messages received");
176
        result.setSamplerData(loop + " messages expected");
197
        result.setSamplerData(loop + " messages expected");
177
        result.setSampleCount(read);
198
        result.setSampleCount(loop);
178
199
179
        this.resetCount();
180
        return result;
200
        return result;
181
    }
201
    }
182
202
Lines 188-193 Link Here
188
     */
208
     */
189
    private SampleResult sampleWithReceive() {
209
    private SampleResult sampleWithReceive() {
190
        SampleResult result = new SampleResult();
210
        SampleResult result = new SampleResult();
211
        StringBuffer buffer = new StringBuffer();
212
        StringBuffer propBuffer = new StringBuffer();
213
        int cnt;
214
        
215
        
191
        result.setSampleLabel(getName());
216
        result.setSampleLabel(getName());
192
        if (this.SUBSCRIBER == null) {
217
        if (this.SUBSCRIBER == null) {
193
            this.initReceiveClient();
218
            this.initReceiveClient();
Lines 206-222 Link Here
206
        }
231
        }
207
        result.sampleEnd();
232
        result.sampleEnd();
208
        result.setResponseMessage(loop + " samples messages received");
233
        result.setResponseMessage(loop + " samples messages received");
234
        for(cnt = 0; cnt < loop ; cnt++) {
235
        	TextMessage msg = this.SUBSCRIBER.getMessage();
236
        	try {
237
        		buffer.append(msg.getText());
238
        		Enumeration props = msg.getPropertyNames();
239
        		while(props.hasMoreElements()) {
240
        			String name = (String) props.nextElement();
241
        			propBuffer.append("PROPERTY: ");
242
        			propBuffer.append(name);
243
        			propBuffer.append("=");
244
        			propBuffer.append(msg.getObjectProperty(name));
245
        			propBuffer.append("\n");
246
        		}
247
        	} catch (JMSException e) {
248
        		log.error(e.getMessage());
249
        	}
250
        }
209
        if (this.getReadResponseAsBoolean()) {
251
        if (this.getReadResponseAsBoolean()) {
210
            result.setResponseData(this.SUBSCRIBER.getMessage().getBytes());
252
            result.setResponseData(buffer.toString().getBytes());
211
        } else {
253
        } else {
212
            result.setBytes(this.SUBSCRIBER.getMessage().getBytes().length);
254
            result.setBytes(buffer.toString().getBytes().length);
213
        }
255
        }
256
        result.setResponseHeaders(propBuffer.toString());
214
        result.setSuccessful(true);
257
        result.setSuccessful(true);
215
        result.setResponseCode(loop + " message(s) received successfully");
258
        result.setResponseCode(loop + " message(s) received successfully");
216
        result.setSamplerData("Not applicable");
259
        result.setSamplerData("Not applicable");
217
        result.setSampleCount(loop);
260
        result.setSampleCount(loop);
218
261
219
        this.SUBSCRIBER.reset();
220
        return result;
262
        return result;
221
    }
263
    }
222
264
Lines 225-262 Link Here
225
     * listener with the TopicSubscriber.
267
     * listener with the TopicSubscriber.
226
     */
268
     */
227
    public synchronized void onMessage(Message message) {
269
    public synchronized void onMessage(Message message) {
228
        try {
270
    	if (message instanceof TextMessage) {
229
            if (message instanceof TextMessage) {
271
    		queue.add((TextMessage)message);
230
                TextMessage msg = (TextMessage) message;
272
    	}
231
                String content = msg.getText();
232
                if (content != null) {
233
                    this.BUFFER.append(content);
234
                    count(1);
235
                }
236
            }
237
        } catch (JMSException e) {
238
            log.error(e.getMessage());
239
        }
240
    }
241
242
    /**
243
     * increment the count and return the new value.
244
     *
245
     * @param increment
246
     * @return the new value
247
     */
248
    private synchronized int count(int increment) {
249
        this.counter += increment;
250
        return this.counter;
251
    }
252
253
    /**
254
     * resetCount will set the counter to zero and set the length of the
255
     * StringBuffer to zero.
256
     */
257
    private synchronized void resetCount() {
258
        this.counter = 0;
259
        this.BUFFER.setLength(0);
260
    }
273
    }
261
274
262
    // ----------- get/set methods ------------------- //
275
    // ----------- get/set methods ------------------- //

Return to bug 47949