1 package org.yajul.jms;
2
3 import org.slf4j.Logger;
4 import org.slf4j.LoggerFactory;
5
6 import javax.jms.*;
7 import javax.naming.InitialContext;
8 import javax.naming.NamingException;
9 import java.lang.IllegalStateException;
10
11 /***
12 * Generic JMS producer or consumer.
13 * <br>User: Joshua Davis
14 * Date: Sep 18, 2007
15 * Time: 7:07:35 AM
16 */
17 public class Endpoint
18 {
19 private static Logger log = LoggerFactory.getLogger(Endpoint.class);
20
21 private InitialContext ic;
22 private ConnectionFactory connectionFactory;
23 private Session session;
24 private Destination destination;
25 private Connection connection;
26
27 private MessageConsumer consumer;
28 private MessageProducer producer;
29 private final boolean transacted = false;
30 private final int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
31 private boolean consumerStarted;
32 private boolean queueEndpoint;
33 private String messageSelector;
34
35 public Endpoint(InitialContext ic, String factoryJndiName, String destinationName, String messageSelector)
36 {
37 try
38 {
39 initialize(ic, factoryJndiName,messageSelector);
40 this.destination = (Destination) ic.lookup(destinationName);
41 }
42 catch (NamingException e)
43 {
44 log.error(e.getMessage(), e);
45 throw new RuntimeException(e);
46 }
47 }
48
49 private void initialize(InitialContext ic, String factoryJndiName, String messageSelector) throws NamingException {
50 if (ic == null)
51 this.ic = new InitialContext();
52 else
53 this.ic = ic;
54
55 this.connectionFactory = (ConnectionFactory) this.ic.lookup(factoryJndiName);
56 this.messageSelector = messageSelector;
57 }
58
59 public Endpoint(InitialContext ic,String factoryJndiName, Destination destination, String messageSelector)
60 {
61 try
62 {
63 initialize(ic, factoryJndiName,messageSelector);
64 this.destination = destination;
65 }
66 catch (NamingException e)
67 {
68 log.error(e.getMessage(), e);
69 throw new RuntimeException(e);
70 }
71 }
72
73 public boolean isQueueEndpoint()
74 {
75 return queueEndpoint;
76 }
77
78 public void setQueueEndpoint(boolean queueEndpoint)
79 {
80 this.queueEndpoint = queueEndpoint;
81 }
82
83 public void close()
84 {
85 consumerStarted = false;
86 if (consumer != null)
87 {
88 JmsHelper.close(consumer, session, connection);
89 consumer = null;
90 session = null;
91 connection = null;
92 }
93 else if (producer != null)
94 {
95 JmsHelper.close(producer, session, connection);
96 producer = null;
97 session = null;
98 connection = null;
99 }
100 }
101
102
103 public void startConsumer() throws JMSException
104 {
105 if (!consumerStarted)
106 {
107 getConsumer();
108 connection.start();
109 consumerStarted = true;
110 }
111 }
112
113 public Message receive(long timeout) throws JMSException
114 {
115 if (consumer == null)
116 throw new IllegalStateException("No consumer!");
117 return consumer.receive(timeout);
118 }
119
120 public Message receiveNowait() throws JMSException
121 {
122 if (consumer == null)
123 throw new IllegalStateException("No consumer!");
124 return consumer.receiveNoWait();
125 }
126
127 protected boolean hasConsumer()
128 {
129 return consumer != null;
130 }
131
132 protected MessageConsumer getConsumer() throws JMSException
133 {
134 if (producer != null)
135 throw new IllegalStateException("This is already a consumer!");
136 if (consumer == null)
137 {
138 consumer = getSession().createConsumer(destination, messageSelector);
139 }
140 return consumer;
141 }
142
143 public MessageProducer getProducer() throws JMSException
144 {
145 if (consumer != null)
146 throw new IllegalStateException("This is already a consumer!");
147 if (producer == null)
148 {
149 producer = getSession().createProducer(destination);
150 }
151 return producer;
152 }
153
154 public Session getSession() throws JMSException
155 {
156 if (session == null)
157 {
158 if (queueEndpoint)
159 {
160 session = ((QueueConnection)getConnection()).createQueueSession(transacted,acknowledgeMode);
161 }
162 else
163 {
164 session = getConnection().createSession(transacted, acknowledgeMode);
165 }
166 }
167 return session;
168 }
169
170 public Connection getConnection() throws JMSException
171 {
172 if (connection == null)
173 {
174 if (queueEndpoint)
175 connection = ((QueueConnectionFactory)connectionFactory).createQueueConnection();
176 else
177 connection = connectionFactory.createConnection();
178 onConnectionCreated(connection);
179 }
180 return connection;
181 }
182
183 protected void onConnectionCreated(Connection connection) throws JMSException
184 {
185 }
186
187 protected void finalize() throws Throwable
188 {
189 super.finalize();
190 close();
191 }
192
193 protected Destination getDestination()
194 {
195 return destination;
196 }
197 }