xinyb
2024-09-19 18ffbca9acaccd5099a7a63652f52210f59a7e40
提交 | 用户 | age
a6a76f 1 /**
F 2  *
3  * Licensed to the Apache Software Foundation (ASF) under one or more
4  * contributor license agreements.  See the NOTICE file distributed with
5  * this work for additional information regarding copyright ownership.
6  * The ASF licenses this file to You under the Apache License, Version 2.0
7  * (the "License"); you may not use this file except in compliance with
8  * the License.  You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18
19 // AMQ Ajax handler
20 // This class provides the main API for using the Ajax features of AMQ. It
21 // allows JMS messages to be sent and received from javascript when used
22 // with the org.apache.activemq.web.MessageListenerServlet.
23 //
24 // This version of the file provides an adapter interface for the jquery library
25 // and a namespace for the Javascript file, private/public variables and
26 // methods, and other scripting niceties. -- jim cook 2007/08/28
27
28 var org = org || {};
29 org.activemq = org.activemq || {};
30
31 org.activemq.Amq = function() {
32     var connectStatusHandler;
33
34     // Just a shortcut to eliminate some redundant typing.
35     var adapter = org.activemq.AmqAdapter;
36
37     if (typeof adapter == 'undefined') {
38         throw 'An org.activemq.AmqAdapter must be declared before the amq.js script file.'
39     }
40
41     // The URI of the AjaxServlet.
42     var uri;
43
44     // The number of seconds that the long-polling socket will stay connected.
45     // Best to keep this to a value less than one minute.
46     var timeout;
47
48     // Poll delay. if set to positive integer, this is the time to wait in ms
49     // before sending the next poll after the last completes.
50     var pollDelay;
51
52     // Inidicates whether logging is active or not. Not by default.
53     var logging = false;
54
55     // 5 second delay if an error occurs during poll. This could be due to
56     // server capacity problems or a timeout condition.
57     var pollErrorDelay = 5000;
58
59     // Map of handlers that will respond to message receipts. The id used during
60     // addListener(id, destination, handler) is used to key the callback
61     // handler.  
62     var messageHandlers = {};
63
64     // Indicates whether an AJAX post call is in progress.
65     var batchInProgress = false;
66
67     // A collection of pending messages that accumulate when an AJAX call is in
68     // progress. These messages will be delivered as soon as the current call
69     // completes. The array contains objects in the format { destination,
70     // message, messageType }.
71     var messageQueue = [];
72
73   // String to distinguish this client from others sharing the same session.
74   // This can occur when multiple browser windows or tabs using amq.js simultaneously.
75   // All windows share the same JESSIONID, but need to consume messages independently.
76   var clientId = null;
77   
78     /**
79      * Iterate over the returned XML and for each message in the response, 
80      * invoke the handler with the matching id.
81      */
82     var messageHandler = function(data) {
83         var response = data.getElementsByTagName("ajax-response");
84         if (response != null && response.length == 1) {
85             connectStatusHandler(true);
86             var responses = response[0].childNodes;    // <response>
87             for (var i = 0; i < responses.length; i++) {
88                 var responseElement = responses[i];
89
90                 // only process nodes of type element.....
91                 if (responseElement.nodeType != 1) continue;
92
93                 var id = responseElement.getAttribute('id');
94
95                 var handler = messageHandlers[id];
96
97                 if (logging && handler == null) {
98                     adapter.log('No handler found to match message with id = ' + id);
99                     continue;
100                 }
101
102                 // Loop thru and handle each <message>
103                 for (var j = 0; j < responseElement.childNodes.length; j++) {
104                     handler(responseElement.childNodes[j]);
105                 }
106             }
107         }
108     };
109
110     var errorHandler = function(xhr, status, ex) {
111         connectStatusHandler(false);
112         if (logging) adapter.log('Error occurred in ajax call. HTTP result: ' +
113                                  xhr.status + ', status: ' + status);
114     }
115
116     var pollErrorHandler = function(xhr, status, ex) {
117         connectStatusHandler(false);
118         if (status === 'error' && xhr.status === 0) {
119             if (logging) adapter.log('Server connection dropped.');
120             setTimeout(function() { sendPoll(); }, pollErrorDelay);
121             return;
122         }
123         if (logging) adapter.log('Error occurred in poll. HTTP result: ' +
124                                  xhr.status + ', status: ' + status);
125         setTimeout(function() { sendPoll(); }, pollErrorDelay);
126     }
127
128     var pollHandler = function(data) {
129         try {
130             messageHandler(data);
131         } catch(e) {
132             if (logging) adapter.log('Exception in the poll handler: ' + data, e);
133             throw(e);
134         } finally {
135             setTimeout(sendPoll, pollDelay);
136         }
137     };
138
139     var sendPoll = function() {
140         // Workaround IE6 bug where it caches the response
141         // Generate a unique query string with date and random
142         var now = new Date();
143         var data = 'timeout=' + timeout * 1000
144                  + '&d=' + now.getTime()
145                  + '&r=' + Math.random();
146         var options = { method: 'get',
147             data: addClientId( data ),
148             success: pollHandler,
149             error: pollErrorHandler};
150         adapter.ajax(uri, options);
151     };
152
153     var sendJmsMessage = function(destination, message, type, headers) {
154         var message = {
155             destination: destination,
156             message: message,
157             messageType: type
158         };
159         // Add message to outbound queue
160         if (batchInProgress) {
161             messageQueue[messageQueue.length] = {message:message, headers:headers};
162         } else {
163             org.activemq.Amq.startBatch();
164             adapter.ajax(uri, { method: 'post',
165                 data: addClientId( buildParams( [message] ) ),
166                 error: errorHandler,
167                 headers: headers,
168                 success: org.activemq.Amq.endBatch});
169         }
170     };
171
172     var buildParams = function(msgs) {
173         var s = [];
174         for (var i = 0, c = msgs.length; i < c; i++) {
175             if (i != 0) s[s.length] = '&';
176             s[s.length] = ((i == 0) ? 'destination' : 'd' + i);
177             s[s.length] = '=';
178             s[s.length] = msgs[i].destination;
179             s[s.length] = ((i == 0) ? '&message' : '&m' + i);
180             s[s.length] = '=';
181             s[s.length] = msgs[i].message;
182             s[s.length] = ((i == 0) ? '&type' : '&t' + i);
183             s[s.length] = '=';
184             s[s.length] = msgs[i].messageType;
185         }
186         return s.join('');
187     }
188     
189     // add clientId to data if it exists, before passing data to ajax connection adapter.
190     var addClientId = function( data ) {
191         var output = data || '';
192         if( clientId ) {
193             if( output.length > 0 ) {
194                 output += '&';
195             }
196             output += 'clientId='+clientId;
197         }
198         return output;
199     }
200
201     return {
202         // optional clientId can be supplied to allow multiple clients (browser windows) within the same session.
203         init : function(options) {
204             connectStatusHandler = options.connectStatusHandler || function(connected){};
205             uri = options.uri || '/amq';
206             pollDelay = typeof options.pollDelay == 'number' ? options.pollDelay : 0;
207             timeout = typeof options.timeout == 'number' ? options.timeout : 25;
208             logging = options.logging;
209             clientId = options.clientId;
210             adapter.init(options);
211             sendPoll();
212             
213         },
214             
215         startBatch : function() {
216             batchInProgress = true;
217         },
218
219         endBatch : function() {
220             if (messageQueue.length > 0) {
221                 var messagesToSend = [];
222                 var messagesToQueue = [];
223                 var outgoingHeaders = null;
224                 
225                 // we need to ensure that messages which set headers are sent by themselves.
226                 // if 2 'listen' messages were sent together, and a 'selector' header were added to one of them,
227                 // AMQ would add the selector to both 'listen' commands.
228                 for(i=0;i<messageQueue.length;i++) {
229                     // a message with headers should always be sent by itself.    if other messages have been added, send this one later.
230                     if ( messageQueue[ i ].headers && messagesToSend.length == 0 ) {
231                         messagesToSend[ messagesToSend.length ] = messageQueue[ i ].message;
232                         outgoingHeaders = messageQueue[ i ].headers;
233                     } else if ( ! messageQueue[ i ].headers && ! outgoingHeaders ) {
234                         messagesToSend[ messagesToSend.length ] = messageQueue[ i ].message;
235                     } else {
236                         messagesToQueue[ messagesToQueue.length ] = messageQueue[ i ];
237                     }
238                 }
239                 var body = buildParams(messagesToSend);
240                 messageQueue = messagesToQueue;
241                 org.activemq.Amq.startBatch();
242                 adapter.ajax(uri, {
243                     method: 'post',
244                     headers: outgoingHeaders,
245                     data: addClientId( body ),
246                     success: org.activemq.Amq.endBatch, 
247                     error: errorHandler});
248             } else {
249                 batchInProgress = false;
250             }
251         },
252
253         // Send a JMS message to a destination (eg topic://MY.TOPIC).  Message
254         // should be xml or encoded xml content.
255         sendMessage : function(destination, message) {
256             sendJmsMessage(destination, message, 'send');
257         },
258
259         // Listen on a channel or topic.
260         // handler must be a function taking a message argument
261         //
262         // Supported options:
263         //  selector: If supplied, it should be a SQL92 string like "property-name='value'"
264         //            http://activemq.apache.org/selectors.html
265         //
266         // Example: addListener( 'handler', 'topic://test-topic', function(msg) { return msg; }, { selector: "property-name='property-value'" } )
267         addListener : function(id, destination, handler, options) {
268             messageHandlers[id] = handler;
269             var headers = options && options.selector ? {selector:options.selector} : null;
270             sendJmsMessage(destination, id, 'listen', headers);
271         },
272
273         // remove Listener from channel or topic.
274         removeListener : function(id, destination) {
275             messageHandlers[id] = null;
276             sendJmsMessage(destination, id, 'unlisten');
277         },
278         
279         // for unit testing
280         getMessageQueue: function() {
281             return messageQueue;
282         },
283         testPollHandler: function( data ) {
284             return pollHandler( data );
285         }
286     };
287 }();