View Javadoc

1   package org.yajul.jms;
2   
3   import org.slf4j.Logger;
4   import org.slf4j.LoggerFactory;
5   
6   import javax.jms.*;
7   import javax.naming.InitialContext;
8   import javax.naming.NamingException;
9   import java.lang.IllegalStateException;
10  
11  /***
12   * Generic JMS producer or consumer.
13   * <br>User: Joshua Davis
14   * Date: Sep 18, 2007
15   * Time: 7:07:35 AM
16   */
17  public class Endpoint
18  {
19      private static Logger log = LoggerFactory.getLogger(Endpoint.class);
20  
21      private InitialContext ic;
22      private ConnectionFactory connectionFactory;
23      private Session session;
24      private Destination destination;
25      private Connection connection;
26  
27      private MessageConsumer consumer;
28      private MessageProducer producer;
29      private final boolean transacted = false;
30      private final int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
31      private boolean consumerStarted;
32      private boolean queueEndpoint;
33      private String messageSelector;
34  
35      public Endpoint(InitialContext ic, String factoryJndiName, String destinationName, String messageSelector)
36      {
37          try
38          {
39              initialize(ic, factoryJndiName,messageSelector);
40              this.destination = (Destination) ic.lookup(destinationName);
41          }
42          catch (NamingException e)
43          {
44              log.error(e.getMessage(), e);
45              throw new RuntimeException(e);
46          }
47      }
48  
49      private void initialize(InitialContext ic, String factoryJndiName, String messageSelector) throws NamingException {
50          if (ic == null)
51              this.ic = new InitialContext();
52          else
53              this.ic = ic;
54  
55          this.connectionFactory = (ConnectionFactory) this.ic.lookup(factoryJndiName);
56          this.messageSelector = messageSelector;
57      }
58  
59      public Endpoint(InitialContext ic,String factoryJndiName, Destination destination, String messageSelector)
60      {
61          try
62          {
63              initialize(ic, factoryJndiName,messageSelector);
64              this.destination = destination;
65          }
66          catch (NamingException e)
67          {
68              log.error(e.getMessage(), e);
69              throw new RuntimeException(e);
70          }
71      }
72  
73      public boolean isQueueEndpoint()
74      {
75          return queueEndpoint;
76      }
77  
78      public void setQueueEndpoint(boolean queueEndpoint)
79      {
80          this.queueEndpoint = queueEndpoint;
81      }
82  
83      public void close()
84      {
85          consumerStarted = false;
86          if (consumer != null)
87          {
88              JmsHelper.close(consumer, session, connection);
89              consumer = null;
90              session = null;
91              connection = null;
92          }
93          else if (producer != null)
94          {
95              JmsHelper.close(producer, session, connection);
96              producer = null;
97              session = null;
98              connection = null;
99          }
100     }
101 
102 
103     public void startConsumer() throws JMSException
104     {
105         if (!consumerStarted)
106         {
107             getConsumer();
108             connection.start();
109             consumerStarted = true;
110         }
111     }
112 
113     public Message receive(long timeout) throws JMSException
114     {
115         if (consumer == null)
116             throw new IllegalStateException("No consumer!");
117         return consumer.receive(timeout);
118     }
119 
120     public Message receiveNowait() throws JMSException
121     {
122         if (consumer == null)
123             throw new IllegalStateException("No consumer!");
124         return consumer.receiveNoWait();
125     }
126 
127     protected boolean hasConsumer()
128     {
129         return consumer != null;
130     }
131 
132     protected MessageConsumer getConsumer() throws JMSException
133     {
134         if (producer != null)
135             throw new IllegalStateException("This is already a consumer!");
136         if (consumer == null)
137         {
138             consumer = getSession().createConsumer(destination, messageSelector);
139         }
140         return consumer;
141     }
142 
143     public MessageProducer getProducer() throws JMSException
144     {
145         if (consumer != null)
146             throw new IllegalStateException("This is already a consumer!");
147         if (producer == null)
148         {
149             producer = getSession().createProducer(destination);
150         }
151         return producer;
152     }
153 
154     public Session getSession() throws JMSException
155     {
156         if (session == null)
157         {
158             if (queueEndpoint)
159             {
160                 session = ((QueueConnection)getConnection()).createQueueSession(transacted,acknowledgeMode);
161             }
162             else
163             {
164                 session = getConnection().createSession(transacted, acknowledgeMode);
165             }
166         }
167         return session;
168     }
169 
170     public Connection getConnection() throws JMSException
171     {
172         if (connection == null)
173         {
174             if (queueEndpoint)
175                 connection = ((QueueConnectionFactory)connectionFactory).createQueueConnection();
176             else
177                 connection = connectionFactory.createConnection();
178             onConnectionCreated(connection);
179         }
180         return connection;
181     }
182 
183     protected void onConnectionCreated(Connection connection) throws JMSException
184     {
185     }
186 
187     protected void finalize() throws Throwable
188     {
189         super.finalize();
190         close();
191     }
192 
193     protected Destination getDestination()
194     {
195         return destination;
196     }
197 }