Lines 29-34
Link Here
|
29 |
import javax.jms.MessageConsumer; |
29 |
import javax.jms.MessageConsumer; |
30 |
import javax.jms.MessageListener; |
30 |
import javax.jms.MessageListener; |
31 |
import javax.jms.Session; |
31 |
import javax.jms.Session; |
|
|
32 |
import javax.jms.Topic; |
32 |
import javax.naming.Context; |
33 |
import javax.naming.Context; |
33 |
import javax.naming.NamingException; |
34 |
import javax.naming.NamingException; |
34 |
|
35 |
|
Lines 81-94
Link Here
|
81 |
*/ |
82 |
*/ |
82 |
public ReceiveSubscriber(boolean useProps, |
83 |
public ReceiveSubscriber(boolean useProps, |
83 |
String initialContextFactory, String providerUrl, String connfactory, String destinationName, |
84 |
String initialContextFactory, String providerUrl, String connfactory, String destinationName, |
84 |
boolean useAuth, |
85 |
String durableSubscriptionId, boolean useAuth, |
85 |
String securityPrincipal, String securityCredentials) throws NamingException, JMSException { |
86 |
String securityPrincipal, String securityCredentials) throws NamingException, JMSException { |
86 |
Context ctx = InitialContextFactory.getContext(useProps, |
87 |
Context ctx = InitialContextFactory.getContext(useProps, |
87 |
initialContextFactory, providerUrl, useAuth, securityPrincipal, securityCredentials); |
88 |
initialContextFactory, providerUrl, useAuth, securityPrincipal, securityCredentials); |
88 |
CONN = Utils.getConnection(ctx, connfactory); |
89 |
CONN = Utils.getConnection(ctx, connfactory); |
89 |
SESSION = CONN.createSession(false, Session.AUTO_ACKNOWLEDGE); |
90 |
SESSION = CONN.createSession(false, Session.AUTO_ACKNOWLEDGE); |
90 |
Destination dest = Utils.lookupDestination(ctx, destinationName); |
91 |
Destination dest = Utils.lookupDestination(ctx, destinationName); |
91 |
SUBSCRIBER = SESSION.createConsumer(dest); |
92 |
SUBSCRIBER = createSubscriber(SESSION, dest, durableSubscriptionId); |
92 |
queue = null; |
93 |
queue = null; |
93 |
log.debug("<init> complete"); |
94 |
log.debug("<init> complete"); |
94 |
} |
95 |
} |
Lines 114-127
Link Here
|
114 |
*/ |
115 |
*/ |
115 |
public ReceiveSubscriber(int queueSize, boolean useProps, |
116 |
public ReceiveSubscriber(int queueSize, boolean useProps, |
116 |
String initialContextFactory, String providerUrl, String connfactory, String destinationName, |
117 |
String initialContextFactory, String providerUrl, String connfactory, String destinationName, |
117 |
boolean useAuth, |
118 |
String durableSubscriptionId, boolean useAuth, |
118 |
String securityPrincipal, String securityCredentials) throws NamingException, JMSException { |
119 |
String securityPrincipal, String securityCredentials) throws NamingException, JMSException { |
119 |
Context ctx = InitialContextFactory.getContext(useProps, |
120 |
Context ctx = InitialContextFactory.getContext(useProps, |
120 |
initialContextFactory, providerUrl, useAuth, securityPrincipal, securityCredentials); |
121 |
initialContextFactory, providerUrl, useAuth, securityPrincipal, securityCredentials); |
121 |
CONN = Utils.getConnection(ctx, connfactory); |
122 |
CONN = Utils.getConnection(ctx, connfactory); |
122 |
SESSION = CONN.createSession(false, Session.AUTO_ACKNOWLEDGE); |
123 |
SESSION = CONN.createSession(false, Session.AUTO_ACKNOWLEDGE); |
123 |
Destination dest = Utils.lookupDestination(ctx, destinationName); |
124 |
Destination dest = Utils.lookupDestination(ctx, destinationName); |
124 |
SUBSCRIBER = SESSION.createConsumer(dest); |
125 |
SUBSCRIBER = createSubscriber(SESSION, dest, durableSubscriptionId); |
125 |
if (queueSize <=0) { |
126 |
if (queueSize <=0) { |
126 |
queue = new LinkedBlockingQueue<Message>(); |
127 |
queue = new LinkedBlockingQueue<Message>(); |
127 |
} else { |
128 |
} else { |
Lines 130-135
Link Here
|
130 |
SUBSCRIBER.setMessageListener(this); |
131 |
SUBSCRIBER.setMessageListener(this); |
131 |
log.debug("<init> complete"); |
132 |
log.debug("<init> complete"); |
132 |
} |
133 |
} |
|
|
134 |
|
135 |
/** |
136 |
* Return a simple MessageConsumer or a TopicSubscriber (as a durable subscription) |
137 |
* @param session |
138 |
* JMS session |
139 |
* @param destination |
140 |
* JMS destination, can be either topic or queue |
141 |
* @param durableSubscriptionId |
142 |
* If neither empty nor null, this means that a durable |
143 |
* subscription will be used |
144 |
* @return |
145 |
* @throws JMSException |
146 |
*/ |
147 |
private MessageConsumer createSubscriber(Session session, |
148 |
Destination destination, String durableSubscriptionId) throws JMSException { |
149 |
if (isEmpty(durableSubscriptionId)) { |
150 |
return session.createConsumer(destination); |
151 |
} else { |
152 |
return session.createDurableSubscriber((Topic) destination, durableSubscriptionId); |
153 |
} |
154 |
} |
133 |
|
155 |
|
134 |
/** |
156 |
/** |
135 |
* Calls Connection.start() to begin receiving inbound messages. |
157 |
* Calls Connection.start() to begin receiving inbound messages. |
Lines 204-207
Link Here
|
204 |
log.warn("Could not add message to queue"); |
226 |
log.warn("Could not add message to queue"); |
205 |
} |
227 |
} |
206 |
} |
228 |
} |
|
|
229 |
|
230 |
|
231 |
/** |
232 |
* Checks whether string is empty |
233 |
* |
234 |
* @param s1 |
235 |
* @return True if input is null, an empty string, |
236 |
* or a white space-only string |
237 |
*/ |
238 |
private boolean isEmpty(String s1) { |
239 |
return (s1 == null || s1.trim().equals("")); |
240 |
} |
207 |
} |
241 |
} |