View Javadoc

1   /***
2    * Created on Jun 29, 2005, Copyright UC Regents
3    */
4   package org.telscenter.pas.service;
5   
6   import java.io.IOException;
7   import java.util.HashMap;
8   import java.util.Map;
9   
10  import javax.jms.JMSException;
11  import javax.jms.MessageListener;
12  import javax.jms.Session;
13  import javax.jms.Topic;
14  import javax.jms.TopicConnection;
15  import javax.jms.TopicConnectionFactory;
16  import javax.jms.TopicPublisher;
17  import javax.jms.TopicSession;
18  import javax.jms.TopicSubscriber;
19  
20  import net.sf.sail.common.messaging.jms.mantaray.MantaSupport;
21  import net.sf.sail.core.uuid.OfferingUuid;
22  
23  import org.mr.api.jms.MantaTopicConnectionFactory;
24  
25  /***
26   * @author turadg
27   */
28  public class MantarayMessagingService implements IMessagingService {
29  
30  	private TopicConnectionFactory topicConnectionFactory;
31  
32  	private Map publishers = new HashMap();
33  
34  	private Map publisherSessions = new HashMap();
35  
36  	public MantarayMessagingService(OfferingUuid offeringUuid) {
37  		try {
38  			MantaSupport.init();
39  		} catch (IOException e) {
40  			// TODO Auto-generated catch block
41  			e.printStackTrace();
42  		}
43  
44  		topicConnectionFactory = new MantaTopicConnectionFactory();
45  	}
46  
47  	public void subscribeTopic(String topicString, MessageListener listener)
48  			throws JMSException {
49  		TopicConnection connect = topicConnectionFactory
50  				.createTopicConnection();
51  		TopicSession subSession = connect.createTopicSession(false,
52  				Session.CLIENT_ACKNOWLEDGE);
53  		Topic topic = subSession.createTopic(topicString);
54  		TopicSubscriber subscriber = subSession.createSubscriber(topic);
55  		subscriber.setMessageListener(listener);
56  		// Now that setup is complete, start the Connection
57  		connect.start();
58  	}
59  
60  	public TopicSession getPublisherSession(String topicString)
61  			throws JMSException {
62  		initTopic(topicString);
63  		TopicSession pubSession = (TopicSession) publisherSessions
64  				.get(topicString);
65  		return pubSession;
66  	}
67  
68  	public TopicPublisher getPublisher(String topicString) throws JMSException {
69  		initTopic(topicString);
70  		return (TopicPublisher) publishers.get(topicString);
71  	}
72  
73  	/***
74  	 * @param topicString
75  	 * @return
76  	 * @throws JMSException
77  	 */
78  	private void initTopic(String topicString) throws JMSException {
79  		if (publishers.containsKey(topicString))
80  			return;
81  
82  		TopicSession pubSession;
83  		TopicConnection connect = topicConnectionFactory
84  				.createTopicConnection();
85  		pubSession = connect.createTopicSession(false,
86  				Session.CLIENT_ACKNOWLEDGE);
87  		Topic topic = pubSession.createTopic(topicString);
88  		TopicPublisher publisher = pubSession.createPublisher(topic);
89  		publishers.put(topicString, publisher);
90  		publisherSessions.put(topicString, pubSession);
91  		connect.start();
92  	}
93  
94  }