001package var.mom.jms.log; 002 003import javax.jms.Connection; 004import javax.jms.ConnectionFactory; 005import javax.jms.Destination; 006import javax.jms.JMSException; 007import javax.jms.Message; 008import javax.jms.MessageConsumer; 009import javax.jms.MessageListener; 010import javax.jms.Session; 011import javax.jms.TextMessage; 012import javax.naming.Context; 013import javax.naming.InitialContext; 014import javax.naming.NamingException; 015 016/** 017 * asynchronous provider for the log service, flitering only messages with 018 * String property "Priority"=="high" 019 * 020 * @author Sandro Leuchter 021 * 022 */ 023 024public class ConsumerFilteredNode implements MessageListener { 025 private Connection connection; 026 private Session session; 027 private MessageConsumer consumer; 028 029 /** 030 * constructor, establishes and starts connection to JMS provider specified in 031 * JNDI (via jndi.properties), afterwards consumer is ready filtering only 032 * messages with String property "Priority" set to "high" 033 * 034 * @throws NamingException JNDI exceptions 035 * @throws JMSException JMS exceptions 036 */ 037 public ConsumerFilteredNode() throws NamingException, JMSException { 038 Context ctx = new InitialContext(); 039 ConnectionFactory factory = (ConnectionFactory) ctx.lookup("ConnectionFactory"); 040 Destination queue = (Destination) ctx.lookup(Conf.QUEUE); 041 this.connection = factory.createConnection(); 042 this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 043 this.consumer = this.session.createConsumer(queue, "Priority='high'"); 044 this.consumer.setMessageListener(this); 045 this.connection.start(); 046 } 047 048 /** 049 * asynchronous message consumption 050 * 051 * @see javax.jms.MessageListener 052 */ 053 @Override 054 public void onMessage(Message message) { 055 try { 056 if (message instanceof TextMessage) { 057 TextMessage textMessage = (TextMessage) message; 058 String messageText = textMessage.getText(); 059 String priority = textMessage.getStringProperty("Priority"); 060 System.out.println(messageText + " [Priority=" + priority + "]"); 061 } 062 } catch (JMSException e) { 063 System.err.println(e); 064 } 065 } 066 067 /** 068 * main routine and starting point of program 069 * 070 * @param args[0] time to wait in ms 071 */ 072 public static void main(String[] args) { 073 long wait = Long.parseLong(args[0]); 074 ConsumerFilteredNode node = null; 075 try { 076 node = new ConsumerFilteredNode(); 077 Thread.sleep(wait); 078 } catch (InterruptedException | NamingException | JMSException e) { 079 System.err.println(e); 080 } finally { 081 try { 082 if ((node != null) && (node.consumer != null)) { 083 node.consumer.close(); 084 } 085 if ((node != null) && (node.session != null)) { 086 node.session.close(); 087 } 088 if ((node != null) && (node.connection != null)) { 089 node.connection.close(); 090 } 091 } catch (JMSException e) { 092 System.err.println(e); 093 } 094 } 095 } 096 097}