1 package org.yajul.jms; 2 3 import org.slf4j.Logger; 4 import org.slf4j.LoggerFactory; 5 6 import javax.jms.*; 7 import javax.naming.InitialContext; 8 import javax.naming.NamingException; 9 import java.lang.IllegalStateException; 10 11 /*** 12 * Generic JMS producer or consumer. 13 * <br>User: Joshua Davis 14 * Date: Sep 18, 2007 15 * Time: 7:07:35 AM 16 */ 17 public class Endpoint 18 { 19 private static Logger log = LoggerFactory.getLogger(Endpoint.class); 20 21 private InitialContext ic; 22 private ConnectionFactory connectionFactory; 23 private Session session; 24 private Destination destination; 25 private Connection connection; 26 27 private MessageConsumer consumer; 28 private MessageProducer producer; 29 private final boolean transacted = false; 30 private final int acknowledgeMode = Session.AUTO_ACKNOWLEDGE; 31 private boolean consumerStarted; 32 private boolean queueEndpoint; 33 private String messageSelector; 34 35 public Endpoint(InitialContext ic, String factoryJndiName, String destinationName, String messageSelector) 36 { 37 try 38 { 39 initialize(ic, factoryJndiName,messageSelector); 40 this.destination = (Destination) ic.lookup(destinationName); 41 } 42 catch (NamingException e) 43 { 44 log.error(e.getMessage(), e); 45 throw new RuntimeException(e); 46 } 47 } 48 49 private void initialize(InitialContext ic, String factoryJndiName, String messageSelector) throws NamingException { 50 if (ic == null) 51 this.ic = new InitialContext(); 52 else 53 this.ic = ic; 54 55 this.connectionFactory = (ConnectionFactory) this.ic.lookup(factoryJndiName); 56 this.messageSelector = messageSelector; 57 } 58 59 public Endpoint(InitialContext ic,String factoryJndiName, Destination destination, String messageSelector) 60 { 61 try 62 { 63 initialize(ic, factoryJndiName,messageSelector); 64 this.destination = destination; 65 } 66 catch (NamingException e) 67 { 68 log.error(e.getMessage(), e); 69 throw new RuntimeException(e); 70 } 71 } 72 73 public boolean isQueueEndpoint() 74 { 75 return queueEndpoint; 76 } 77 78 public void setQueueEndpoint(boolean queueEndpoint) 79 { 80 this.queueEndpoint = queueEndpoint; 81 } 82 83 public void close() 84 { 85 consumerStarted = false; 86 if (consumer != null) 87 { 88 JmsHelper.close(consumer, session, connection); 89 consumer = null; 90 session = null; 91 connection = null; 92 } 93 else if (producer != null) 94 { 95 JmsHelper.close(producer, session, connection); 96 producer = null; 97 session = null; 98 connection = null; 99 } 100 } 101 102 103 public void startConsumer() throws JMSException 104 { 105 if (!consumerStarted) 106 { 107 getConsumer(); 108 connection.start(); 109 consumerStarted = true; 110 } 111 } 112 113 public Message receive(long timeout) throws JMSException 114 { 115 if (consumer == null) 116 throw new IllegalStateException("No consumer!"); 117 return consumer.receive(timeout); 118 } 119 120 public Message receiveNowait() throws JMSException 121 { 122 if (consumer == null) 123 throw new IllegalStateException("No consumer!"); 124 return consumer.receiveNoWait(); 125 } 126 127 protected boolean hasConsumer() 128 { 129 return consumer != null; 130 } 131 132 protected MessageConsumer getConsumer() throws JMSException 133 { 134 if (producer != null) 135 throw new IllegalStateException("This is already a consumer!"); 136 if (consumer == null) 137 { 138 consumer = getSession().createConsumer(destination, messageSelector); 139 } 140 return consumer; 141 } 142 143 public MessageProducer getProducer() throws JMSException 144 { 145 if (consumer != null) 146 throw new IllegalStateException("This is already a consumer!"); 147 if (producer == null) 148 { 149 producer = getSession().createProducer(destination); 150 } 151 return producer; 152 } 153 154 public Session getSession() throws JMSException 155 { 156 if (session == null) 157 { 158 if (queueEndpoint) 159 { 160 session = ((QueueConnection)getConnection()).createQueueSession(transacted,acknowledgeMode); 161 } 162 else 163 { 164 session = getConnection().createSession(transacted, acknowledgeMode); 165 } 166 } 167 return session; 168 } 169 170 public Connection getConnection() throws JMSException 171 { 172 if (connection == null) 173 { 174 if (queueEndpoint) 175 connection = ((QueueConnectionFactory)connectionFactory).createQueueConnection(); 176 else 177 connection = connectionFactory.createConnection(); 178 onConnectionCreated(connection); 179 } 180 return connection; 181 } 182 183 protected void onConnectionCreated(Connection connection) throws JMSException 184 { 185 } 186 187 protected void finalize() throws Throwable 188 { 189 super.finalize(); 190 close(); 191 } 192 193 protected Destination getDestination() 194 { 195 return destination; 196 } 197 }