1 /***
2 * Created on Jun 29, 2005, Copyright UC Regents
3 */
4 package org.telscenter.pas.service;
5
6 import java.io.IOException;
7 import java.util.HashMap;
8 import java.util.Map;
9
10 import javax.jms.JMSException;
11 import javax.jms.MessageListener;
12 import javax.jms.Session;
13 import javax.jms.Topic;
14 import javax.jms.TopicConnection;
15 import javax.jms.TopicConnectionFactory;
16 import javax.jms.TopicPublisher;
17 import javax.jms.TopicSession;
18 import javax.jms.TopicSubscriber;
19
20 import net.sf.sail.common.messaging.jms.mantaray.MantaSupport;
21 import net.sf.sail.core.uuid.OfferingUuid;
22
23 import org.mr.api.jms.MantaTopicConnectionFactory;
24
25 /***
26 * @author turadg
27 */
28 public class MantarayMessagingService implements IMessagingService {
29
30 private TopicConnectionFactory topicConnectionFactory;
31
32 private Map publishers = new HashMap();
33
34 private Map publisherSessions = new HashMap();
35
36 public MantarayMessagingService(OfferingUuid offeringUuid) {
37 try {
38 MantaSupport.init();
39 } catch (IOException e) {
40
41 e.printStackTrace();
42 }
43
44 topicConnectionFactory = new MantaTopicConnectionFactory();
45 }
46
47 public void subscribeTopic(String topicString, MessageListener listener)
48 throws JMSException {
49 TopicConnection connect = topicConnectionFactory
50 .createTopicConnection();
51 TopicSession subSession = connect.createTopicSession(false,
52 Session.CLIENT_ACKNOWLEDGE);
53 Topic topic = subSession.createTopic(topicString);
54 TopicSubscriber subscriber = subSession.createSubscriber(topic);
55 subscriber.setMessageListener(listener);
56
57 connect.start();
58 }
59
60 public TopicSession getPublisherSession(String topicString)
61 throws JMSException {
62 initTopic(topicString);
63 TopicSession pubSession = (TopicSession) publisherSessions
64 .get(topicString);
65 return pubSession;
66 }
67
68 public TopicPublisher getPublisher(String topicString) throws JMSException {
69 initTopic(topicString);
70 return (TopicPublisher) publishers.get(topicString);
71 }
72
73 /***
74 * @param topicString
75 * @return
76 * @throws JMSException
77 */
78 private void initTopic(String topicString) throws JMSException {
79 if (publishers.containsKey(topicString))
80 return;
81
82 TopicSession pubSession;
83 TopicConnection connect = topicConnectionFactory
84 .createTopicConnection();
85 pubSession = connect.createTopicSession(false,
86 Session.CLIENT_ACKNOWLEDGE);
87 Topic topic = pubSession.createTopic(topicString);
88 TopicPublisher publisher = pubSession.createPublisher(topic);
89 publishers.put(topicString, publisher);
90 publisherSessions.put(topicString, pubSession);
91 connect.start();
92 }
93
94 }