source: Dev/branches/rest-dojo-ui/client/dojox/cometd/RestChannels.js @ 256

Last change on this file since 256 was 256, checked in by hendrikvanantwerpen, 13 years ago

Reworked project structure based on REST interaction and Dojo library. As
soon as this is stable, the old jQueryUI branch can be removed (it's
kept for reference).

File size: 16.7 KB
Line 
1dojo.provide("dojox.cometd.RestChannels");
2
3dojo.require("dojox.rpc.Client");
4dojo.require("dojo._base.url");
5dojo.requireIf(dojox.data && !!dojox.data.JsonRestStore,"dojox.data.restListener");
6
7// Note that cometd _base is _not_ required, this can run standalone, but ifyou want
8// cometd functionality, you must explicitly load/require it elsewhere, and cometd._base
9// MUST be loaded prior to RestChannels ifyou use it.
10
11// summary:
12//              REST Channels - An HTTP/REST Based approach to Comet transport with full REST messaging
13//              semantics
14//              REST Channels is a efficient, reliable duplex transport for Comet
15
16// description:
17//              This can be used:
18//              1. As a cometd transport
19//              2. As an enhancement for the REST RPC service, to enable "live" data (real-time updates directly alter the data in indexes)
20//              2a. With the JsonRestStore (which is driven by the REST RPC service), so this dojo.data has real-time data. Updates can be heard through the dojo.data notification API.
21//              3. As a standalone transport. To use it as a standalone transport looks like this:
22//      |               dojox.cometd.RestChannels.open();
23//      |               dojox.cometd.RestChannels.get("/myResource",{callback:function(){
24//      |                       // this is called when the resource is first retrieved and any time the
25//      |                       // resource is changed in the future. This provides a means for retrieving a
26//      |                       // resource and subscribing to it in a single request
27//      |               });
28//      |       dojox.cometd.RestChannels.subscribe("/anotherResource",{callback:function(){
29//      |               // this is called when the resource is changed in the future
30//      |       });
31//              Channels HTTP can be configured to a different delays:
32//      |       dojox.cometd.RestChannels.defaultInstance.autoReconnectTime = 60000; // reconnect after one minute
33//
34
35(function(){
36        dojo.declare("dojox.cometd.RestChannels", null, {
37                constructor: function(options){
38                        // summary:
39                        //              Initiates the REST Channels protocol
40                        //      options:
41                        //              Keyword arguments:
42                        //      The *autoSubscribeRoot* parameter:
43                        //              When this is set, all REST service requests that have this
44                        //              prefix will be auto-subscribed. The default is '/' (all REST requests).
45                        //  The *url* parameter:
46                        //              This is the url to connect to for server-sent messages. The default
47                        //              is "/channels".
48                        //      The *autoReconnectTime* parameter:
49                        //              This is amount time to wait to reconnect with a connection is broken
50                        // The *reloadDataOnReconnect* parameter:
51                        //              This indicates whether RestChannels should re-download data when a connection
52                        //              is restored (value of true), or if it should re-subscribe with retroactive subscriptions
53                        //              (Subscribe-Since header) using HEAD requests (value of false). The
54                        //              default is true.
55                        dojo.mixin(this,options);
56                        // If we have a Rest service available and we are auto subscribing, we will augment the Rest service
57                        if(dojox.rpc.Rest && this.autoSubscribeRoot){
58                                // override the default Rest handler so we can add subscription requests
59                                var defaultGet = dojox.rpc.Rest._get;
60                                var self = this;
61                                dojox.rpc.Rest._get = function(service, id){
62                                        // when there is a REST get, we will intercept and add our own xhr handler
63                                        var defaultXhrGet = dojo.xhrGet;
64                                        dojo.xhrGet = function(r){
65                                                var autoSubscribeRoot = self.autoSubscribeRoot;
66                                                return (autoSubscribeRoot && r.url.substring(0, autoSubscribeRoot.length) == autoSubscribeRoot) ?
67                                                        self.get(r.url,r) : // auto-subscribe
68                                                        defaultXhrGet(r); // plain XHR request
69                                        };
70
71                                        var result = defaultGet.apply(this,arguments);
72                                        dojo.xhrGet = defaultXhrGet;
73                                        return result;
74                                };
75                        }
76                },
77                absoluteUrl: function(baseUrl,relativeUrl){
78                        return new dojo._Url(baseUrl,relativeUrl)+'';
79                },
80                acceptType: "application/rest+json,application/http;q=0.9,*/*;q=0.7",
81                subscriptions: {},
82                subCallbacks: {},
83                autoReconnectTime: 3000,
84                reloadDataOnReconnect: true,
85                sendAsJson: false,
86                url: '/channels',
87                autoSubscribeRoot: '/',
88                open: function(){
89                        // summary:
90                        //              Startup the transport (connect to the "channels" resource to receive updates from the server).
91                        //
92                        // description:
93                        //              Note that if there is no connection open, this is automatically called when you do a subscription,
94                        //              it is often not necessary to call this
95                        //
96                        this.started = true;
97                        if(!this.connected){
98                                this.connectionId = dojox.rpc.Client.clientId;
99                                var clientIdHeader = this.createdClientId ? 'Client-Id' : 'Create-Client-Id';
100                                this.createdClientId = true;
101                                var headers = {Accept:this.acceptType};
102                                headers[clientIdHeader] = this.connectionId;
103                                var dfd = dojo.xhrPost({headers:headers, url: this.url, noStatus: true});
104                                var self = this;
105                                this.lastIndex = 0;
106                                var onerror, onprogress = function(data){ // get all the possible event handlers
107                                        if(typeof dojo == 'undefined'){
108                                                return null;// this can be called after dojo is unloaded, just do nothing in that case
109                                        }
110                                        if(xhr && xhr.status > 400){
111                                                return onerror(true);
112                                        }
113                                        if(typeof data == 'string'){
114                                                data = data.substring(self.lastIndex);
115                                        }
116                                        var contentType = xhr && (xhr.contentType || xhr.getResponseHeader("Content-Type")) || (typeof data != 'string' && "already json");
117                                        var error = self.onprogress(xhr,data,contentType);
118                                        if(error){
119                                                if(onerror()){
120                                                        return new Error(error);
121                                                }
122                                        }
123                                        if(!xhr || xhr.readyState==4){
124                                                xhr = null;
125                                                if(self.connected){
126                                                        self.connected = false;
127                                                        self.open();
128                                                }
129                                        }
130                                        return data;
131                                };
132                                onerror = function(error){
133                                        if(xhr && xhr.status == 409){
134                                                // a 409 indicates that there is a multiple connections, and we need to poll
135                                                console.log("multiple tabs/windows open, polling");
136                                                self.disconnected();
137                                                return null;
138                                        }
139                                        self.createdClientId = false;
140                                        self.disconnected();
141                                        return error;
142                                };
143                                dfd.addCallbacks(onprogress,onerror);
144                                var xhr = dfd.ioArgs.xhr; // this may not exist if we are not using XHR, but an alternate XHR plugin
145                                if(xhr){
146                                        // if we are doing a monitorable XHR, we want to listen to streaming events
147                                        xhr.onreadystatechange = function(){
148                                                var responseText;
149                                                try{
150                                                        if(xhr.readyState == 3){// only for progress, the deferred object will handle the finished responses
151                                                                self.readyState = 3;
152                                                                responseText = xhr.responseText;
153                                                        }
154                                                } catch(e){
155                                                }
156                                                if(typeof responseText=='string'){
157                                                        onprogress(responseText);
158                                                }
159                                        }
160                                }
161
162
163                                if(window.attachEvent){// IE needs a little help with cleanup
164                                        window.attachEvent("onunload",function(){
165                                                self.connected= false;
166                                                if(xhr){
167                                                        xhr.abort();
168                                                }
169                                        });
170                                }
171
172                                this.connected = true;
173                        }
174                },
175                _send: function(method,args,data){
176                        // fire an XHR with appropriate modification for JSON handling
177                        if(this.sendAsJson){
178                                // send use JSON Messaging
179                                args.postData = dojo.toJson({
180                                        target:args.url,
181                                        method:method,
182                                        content: data,
183                                        params:args.content,
184                                        subscribe:args.headers["Subscribe"]
185                                });
186                                args.url = this.url;
187                                method = "POST";
188                        }else{
189                                args.postData = dojo.toJson(data);
190                        }
191                        return dojo.xhr(method,args,args.postData);
192                },
193                subscribe: function(/*String*/channel, /*dojo.__XhrArgs?*/args){
194                        // summary:
195                        //              Subscribes to a channel/uri, and returns a dojo.Deferred object for the response from
196                        //              the subscription request
197                        //
198                        // channel:
199                        //              the uri for the resource you want to monitor
200                        //
201                        // args:
202                        //              See dojo.xhr
203                        //
204                        // headers:
205                        //              These are the headers to be applied to the channel subscription request
206                        //
207                        // callback:
208                        //              This will be called when a event occurs for the channel
209                        //              The callback will be called with a single argument:
210                        //      |       callback(message)
211                        //              where message is an object that follows the XHR API:
212                        //              status : Http status
213                        //              statusText : Http status text
214                        //              getAllResponseHeaders() : The response headers
215                        //              getResponseHeaders(headerName) : Retrieve a header by name
216                        //              responseText : The response body as text
217                        //                      with the following additional Bayeux properties
218                        //              data : The response body as JSON
219                        //              channel : The channel/url of the response
220                        args = args || {};
221                        args.url = this.absoluteUrl(this.url, channel);
222                        if(args.headers){
223                                // FIXME: combining Ranges with notifications is very complicated, we will save that for a future version
224                                delete args.headers.Range;
225                        }
226                        var oldSince = this.subscriptions[channel];
227                        var method = args.method || "HEAD"; // HEAD is the default for a subscription
228                        var since = args.since;
229                        var callback = args.callback;
230                        var headers = args.headers || (args.headers = {});
231                        this.subscriptions[channel] = since || oldSince || 0;
232                        var oldCallback = this.subCallbacks[channel];
233                        if(callback){
234                                this.subCallbacks[channel] = oldCallback ? function(m){
235                                        oldCallback(m);
236                                        callback(m);
237                                } : callback;
238                        }
239                        if(!this.connected){
240                                this.open();
241                        }
242                        if(oldSince === undefined || oldSince != since){
243                                headers["Cache-Control"] = "max-age=0";
244                                since = typeof since == 'number' ? new Date(since).toUTCString() : since;
245                                if(since){
246                                        headers["Subscribe-Since"] = since;
247                                }
248                                headers["Subscribe"] = args.unsubscribe ? 'none' : '*';
249                                var dfd = this._send(method,args);
250
251                                var self = this;
252                                dfd.addBoth(function(result){
253                                        var xhr = dfd.ioArgs.xhr;
254                                        if(!(result instanceof Error)){
255                                                if(args.confirmation){
256                                                        args.confirmation();
257                                                }
258                                        }
259                                        if(xhr && xhr.getResponseHeader("Subscribed")  == "OK"){
260                                                var lastMod = xhr.getResponseHeader('Last-Modified');
261
262                                                if(xhr.responseText){
263                                                        self.subscriptions[channel] = lastMod || new Date().toUTCString();
264                                                }else{
265                                                        return null; // don't process the response, the response will be received in the main channels response
266                                                }
267                                        }else if(xhr && !(result instanceof Error)){ // if the server response was successful and we have access to headers but it does indicate a subcription was successful, that means it is did not accept the subscription
268                                                delete self.subscriptions[channel];
269                                        }
270                                        if(!(result instanceof Error)){
271                                                var message = {
272                                                        responseText:xhr && xhr.responseText,
273                                                        channel:channel,
274                                                        getResponseHeader:function(name){
275                                                                return xhr.getResponseHeader(name);
276                                                        },
277                                                        getAllResponseHeaders:function(){
278                                                                return xhr.getAllResponseHeaders();
279                                                        },
280                                                        result: result
281                                                };
282                                                if(self.subCallbacks[channel]){
283                                                        self.subCallbacks[channel](message); // call with the fake xhr object
284                                                }
285                                        }else{
286                                                if(self.subCallbacks[channel]){
287                                                        self.subCallbacks[channel](xhr); // call with the actual xhr object
288                                                }
289                                        }
290                                        return result;
291                                });
292                                return dfd;
293                        }
294                        return null;
295                },
296                publish: function(channel,data){
297                        // summary:
298                        //              Publish an event.
299                        // description:
300                        //              This does a simple POST operation to the provided URL,
301                        //              POST is the semantic equivalent of publishing a message within REST/Channels
302                        // channel:
303                        //              Channel/resource path to publish to
304                        // data:
305                        //              data to publish
306                        return this._send("POST",{url:channel,contentType : 'application/json'},data);
307                },
308                _processMessage: function(message){
309                        message.event = message.event || message.getResponseHeader('Event');
310                        if(message.event=="connection-conflict"){
311                                return "conflict"; // indicate an error
312                        }
313                        try{
314                                message.result = message.result || dojo.fromJson(message.responseText);
315                        }
316                        catch(e){}
317                        var self = this;
318                        var loc = message.channel = new dojo._Url(this.url, message.source || message.getResponseHeader('Content-Location'))+'';//for cometd
319                        if(loc in this.subscriptions && message.getResponseHeader){
320                                this.subscriptions[loc] = message.getResponseHeader('Last-Modified');
321                        }
322                        if(this.subCallbacks[loc]){
323                                setTimeout(function(){ //give it it's own stack
324                                        self.subCallbacks[loc](message);
325                                },0);
326                        }
327                        this.receive(message);
328                        return null;
329                },
330                onprogress: function(xhr,data,contentType){
331                        // internal XHR progress handler
332                        if(!contentType || contentType.match(/application\/rest\+json/)){
333                                var size = data.length;
334                                data = data.replace(/^\s*[,\[]?/,'['). // must start with a opening bracket
335                                        replace(/[,\]]?\s*$/,']'); // and end with a closing bracket
336                                try{
337                                        // if this fails, it probably means we have an incomplete JSON object
338                                        var xhrs = dojo.fromJson(data);
339                                        this.lastIndex += size;
340                                }
341                                catch(e){
342                                }
343                        }else if(dojox.io && dojox.io.httpParse && contentType.match(/application\/http/)){
344                                // do HTTP tunnel parsing
345                                var topHeaders = '';
346                                if(xhr && xhr.getAllResponseHeaders){
347                                        // mixin/inherit headers from the container response
348                                        topHeaders = xhr.getAllResponseHeaders();
349                                }
350                                xhrs = dojox.io.httpParse(data,topHeaders,xhr.readyState != 4);
351                        }else if(typeof data == "object"){
352                                xhrs = data;
353                        }
354                        if(xhrs){
355                                for(var i = 0;i < xhrs.length;i++){
356                                        if(this._processMessage(xhrs[i])){
357                                                return "conflict";
358                                        }
359                                }
360                                return null;
361                        }
362                        if(!xhr){
363                                //no streaming and we didn't get any message, must be an error
364                                return "error";
365                        }
366                        if(xhr.readyState != 4){ // we only want finished responses here if we are not streaming
367                                return null;
368                        }
369                        if(xhr.__proto__){// firefox uses this property, so we create an instance to shadow this property
370                                xhr = {channel:"channel",__proto__:xhr};
371                        }
372                        return this._processMessage(xhr);
373
374                },
375
376                get: function(/*String*/channel, /*dojo.__XhrArgs?*/args){
377                        // summary:
378                        //              GET the initial value of the resource and subscribe to it
379                        //              See subscribe for parameter values
380                        (args = args || {}).method = "GET";
381                        return this.subscribe(channel,args);
382                },
383                receive: function(message){
384                        // summary:
385                        //              Called when a message is received from the server
386                        //      message:
387                        //              A cometd/XHR message
388                        if(dojox.data && dojox.data.restListener){
389                                dojox.data.restListener(message);
390                        }
391                },
392                disconnected: function(){
393                        // summary:
394                        //              called when our channel gets disconnected
395                        var self = this;
396                        if(this.connected){
397                                this.connected = false;
398                                if(this.started){ // if we are started, we shall try to reconnect
399                                        setTimeout(function(){ // auto reconnect
400                                                // resubscribe to our current subscriptions
401                                                var subscriptions = self.subscriptions;
402                                                self.subscriptions = {};
403                                                for(var i in subscriptions){
404                                                        if(self.reloadDataOnReconnect && dojox.rpc.JsonRest){
405                                                                // do a reload of the resource
406                                                                delete dojox.rpc.Rest._index[i];
407                                                                dojox.rpc.JsonRest.fetch(i);
408                                                        }else{
409                                                                self.subscribe(i,{since:subscriptions[i]});
410                                                        }
411                                                }
412                                                self.open();
413                                        }, this.autoReconnectTime);
414                                }
415                        }
416                },
417                unsubscribe: function(/*String*/channel, /*dojo.__XhrArgs?*/args){
418                        // summary:
419                        //              unsubscribes from the resource
420                        //              See subscribe for parameter values
421
422                        args = args || {};
423                        args.unsubscribe = true;
424                        this.subscribe(channel,args); // change the time frame to after 5000AD
425                },
426                disconnect: function(){
427                        // summary:
428                        //              disconnect from the server
429                        this.started = false;
430                        this.xhr.abort();
431                }
432        });
433        var Channels = dojox.cometd.RestChannels.defaultInstance = new dojox.cometd.RestChannels();
434        if(dojox.cometd.connectionTypes){
435                // register as a dojox.cometd transport and wire everything for cometd handling
436                // below are the necessary adaptions for cometd
437                Channels.startup = function(data){ // must be able to handle objects or strings
438                        Channels.open();
439                        this._cometd._deliver({channel:"/meta/connect",successful:true}); // tell cometd we are connected so it can proceed to send subscriptions, even though we aren't yet
440
441                };
442                Channels.check = function(types, version, xdomain){
443                        for(var i = 0; i< types.length; i++){
444                                if(types[i] == "rest-channels"){
445                                        return !xdomain;
446                                }
447                        }
448                        return false;
449                };
450                Channels.deliver = function(message){
451                        // nothing to do
452                };
453                dojo.connect(this,"receive",null,function(message){
454                        message.data = message.result;
455                        this._cometd._deliver(message);
456                });
457                Channels.sendMessages = function(messages){
458                        for(var i = 0; i < messages.length; i++){
459                                var message = messages[i];
460                                var channel = message.channel;
461                                var cometd = this._cometd;
462                                var args = {
463                                        confirmation: function(){ // send a confirmation back to cometd
464                                                cometd._deliver({channel:channel,successful:true});
465                                        }
466                                };
467                                if(channel == '/meta/subscribe'){
468                                        this.subscribe(message.subscription,args);
469                                }else if(channel == '/meta/unsubscribe'){
470                                        this.unsubscribe(message.subscription,args);
471                                }else if(channel == '/meta/connect'){
472                                        args.confirmation();
473                                }else if(channel == '/meta/disconnect'){
474                                        Channels.disconnect();
475                                        args.confirmation();
476                                }else if(channel.substring(0,6) != '/meta/'){
477                                        this.publish(channel,message.data);
478                                }
479                        }
480                };
481                dojox.cometd.connectionTypes.register("rest-channels", Channels.check, Channels,false,true);
482        }
483})();
Note: See TracBrowser for help on using the repository browser.