generalize store

This commit is contained in:
Maximilian Hils
2014-12-09 18:55:16 +01:00
parent 14a8d2f5b8
commit 05bc7e8cd8
5 changed files with 259 additions and 241 deletions

View File

@@ -15,19 +15,34 @@ class WebFlowView(flow.FlowView):
def _add(self, f):
super(WebFlowView, self)._add(f)
app.ClientConnection.broadcast("add_flow", f.get_state(short=True))
app.ClientConnection.broadcast(
type="flows",
cmd="add",
data=f.get_state(short=True)
)
def _update(self, f):
super(WebFlowView, self)._update(f)
app.ClientConnection.broadcast("update_flow", f.get_state(short=True))
app.ClientConnection.broadcast(
type="flows",
cmd="update",
data=f.get_state(short=True)
)
def _remove(self, f):
super(WebFlowView, self)._remove(f)
app.ClientConnection.broadcast("remove_flow", f.get_state(short=True))
app.ClientConnection.broadcast(
type="flows",
cmd="remove",
data=f.get_state(short=True)
)
def _recalculate(self, flows):
super(WebFlowView, self)._recalculate(flows)
app.ClientConnection.broadcast("reset_flows", None)
app.ClientConnection.broadcast(
type="flows",
cmd="reset"
)
class WebState(flow.State):
@@ -120,7 +135,8 @@ class WebMaster(flow.FlowMaster):
def handle_log(self, l):
self.last_log_id += 1
app.ClientConnection.broadcast(
"add_event", {
type="add_event",
data={
"id": self.last_log_id,
"message": l.msg,
"level": l.level

View File

@@ -23,13 +23,8 @@ class WebSocketEventBroadcaster(tornado.websocket.WebSocketHandler):
self.connections.remove(self)
@classmethod
def broadcast(cls, type, data):
message = json.dumps(
{
"type": type,
"data": data
}
)
def broadcast(cls, **kwargs):
message = json.dumps(kwargs)
for conn in cls.connections:
try:
conn.write_message(message)

View File

@@ -102,10 +102,10 @@ AppDispatcher.dispatchServerAction = function (action) {
};
var ActionTypes = {
// Channel
CHANNEL_OPEN: "channel_open",
CHANNEL_CLOSE: "channel_close",
CHANNEL_ERROR: "channel_error",
// Connection
CONNECTION_OPEN: "connection_open",
CONNECTION_CLOSE: "connection_close",
CONNECTION_ERROR: "connection_error",
// Settings
UPDATE_SETTINGS: "update_settings",
@@ -123,17 +123,17 @@ var ActionTypes = {
var ConnectionActions = {
open: function () {
AppDispatcher.dispatchViewAction({
type: ActionTypes.CHANNEL_OPEN
type: ActionTypes.CONNECTION_OPEN
});
},
close: function () {
AppDispatcher.dispatchViewAction({
type: ActionTypes.CHANNEL_CLOSE
type: ActionTypes.CONNECTION_CLOSE
});
},
error: function () {
AppDispatcher.dispatchViewAction({
type: ActionTypes.CHANNEL_ERROR
type: ActionTypes.CONNECTION_ERROR
});
}
};
@@ -241,6 +241,117 @@ EventEmitter.prototype.removeListener = function (events, f) {
}.bind(this));
};
function Store() {
this._views = [];
this.reset();
}
_.extend(Store.prototype, {
add: function (elem) {
if (elem.id in this._pos_map) {
return;
}
this._pos_map[elem.id] = this._list.length;
this._list.push(elem);
for (var i = 0; i < this._views.length; i++) {
this._views[i].add(elem);
}
},
update: function (elem) {
if (!(elem.id in this._pos_map)) {
return;
}
this._list[this._pos_map[elem.id]] = elem;
for (var i = 0; i < this._views.length; i++) {
this._views[i].update(elem);
}
},
remove: function (elem_id) {
if (!(elem.id in this._pos_map)) {
return;
}
this._list.splice(this._pos_map[elem_id], 1);
this._build_map();
for (var i = 0; i < this._views.length; i++) {
this._views[i].remove(elem_id);
}
},
reset: function (elems) {
this._list = elems || [];
this._build_map();
for (var i = 0; i < this._views.length; i++) {
this._views[i].recalculate(this._list);
}
},
_build_map: function () {
this._pos_map = {};
for (var i = 0; i < this._list.length; i++) {
var elem = this._list[i];
this._pos_map[elem.id] = i;
}
},
get: function (elem_id) {
return this._list[this._pos_map[elem_id]];
}
});
function LiveStore(type) {
Store.call(this);
this.type = type;
this._updates_before_fetch = undefined;
this._fetchxhr = false;
this.handle = this.handle.bind(this);
AppDispatcher.register(this.handle);
// Avoid double-fetch on startup.
if (!(window.ws && window.ws.readyState === WebSocket.CONNECTING)) {
this.fetch();
}
}
_.extend(LiveStore.prototype, Store.prototype, {
handle: function (event) {
if (event.type === ActionTypes.CONNECTION_OPEN) {
return this.fetch();
}
if (event.type === this.type) {
if (event.cmd === "reset") {
this.fetch();
} else if (this._updates_before_fetch) {
console.log("defer update", event);
this._updates_before_fetch.push(event);
} else {
this[event.cmd](event.data);
}
}
},
close: function () {
AppDispatcher.unregister(this.handle);
},
fetch: function () {
console.log("fetch " + this.type);
if (this._fetchxhr) {
this._fetchxhr.abort();
}
this._fetchxhr = $.getJSON("/" + this.type, this.handle_fetch.bind(this));
this._updates_before_fetch = []; // (JS: empty array is true)
},
handle_fetch: function (data) {
this._fetchxhr = false;
console.log(this.type + " fetched.", this._updates_before_fetch);
this.reset(data.flows);
var updates = this._updates_before_fetch;
this._updates_before_fetch = false;
for (var i = 0; i < updates.length; i++) {
this.handle(updates[i]);
}
},
});
function _SettingsStore() {
EventEmitter.call(this);
@@ -369,117 +480,9 @@ _.extend(_EventLogStore.prototype, EventEmitter.prototype, {
var EventLogStore = new _EventLogStore();
AppDispatcher.register(EventLogStore.handle.bind(EventLogStore));
function FlowStore() {
this._views = [];
this.reset();
}
_.extend(FlowStore.prototype, {
add: function (flow) {
this._pos_map[flow.id] = this._flow_list.length;
this._flow_list.push(flow);
for (var i = 0; i < this._views.length; i++) {
this._views[i].add(flow);
}
},
update: function (flow) {
this._flow_list[this._pos_map[flow.id]] = flow;
for (var i = 0; i < this._views.length; i++) {
this._views[i].update(flow);
}
},
remove: function (flow_id) {
this._flow_list.splice(this._pos_map[flow_id], 1);
this._build_map();
for (var i = 0; i < this._views.length; i++) {
this._views[i].remove(flow_id);
}
},
reset: function (flows) {
this._flow_list = flows || [];
this._build_map();
for (var i = 0; i < this._views.length; i++) {
this._views[i].recalculate(this._flow_list);
}
},
_build_map: function () {
this._pos_map = {};
for (var i = 0; i < this._flow_list.length; i++) {
var flow = this._flow_list[i];
this._pos_map[flow.id] = i;
}
},
get: function (flow_id) {
return this._flow_list[this._pos_map[flow_id]];
}
});
function LiveFlowStore() {
FlowStore.call(this);
this.updates_before_fetch = undefined;
this._fetchxhr = false;
this.handle = this.handle.bind(this);
AppDispatcher.register(this.handle);
// Avoid double-fetch on startup.
if(!(window.ws && window.ws.readyState === WebSocket.CONNECTING)) {
this.fetch();
}
return new LiveStore("flows");
}
_.extend(LiveFlowStore.prototype, FlowStore.prototype, {
handle: function (event) {
switch (event.type) {
case ActionTypes.CHANNEL_OPEN:
case ActionTypes.RESET_FLOWS:
this.fetch();
break;
case ActionTypes.ADD_FLOW:
case ActionTypes.UPDATE_FLOW:
case ActionTypes.REMOVE_FLOW:
if (this.updates_before_fetch) {
console.log("defer update", type, data);
this.updates_before_fetch.push(event);
} else {
if(event.type === ActionTypes.ADD_FLOW){
this.add(event.data);
} else if (event.type === ActionTypes.UPDATE_FLOW){
this.update(event.data);
} else {
this.remove(event.data);
}
}
break;
}
},
close: function () {
AppDispatcher.unregister(this.handle);
},
add: function (flow) {
// Make sure that deferred adds don't add an element twice.
if (!(flow.id in this._pos_map)) {
FlowStore.prototype.add.call(this, flow);
}
},
fetch: function () {
console.log("fetch");
if (this._fetchxhr) {
this._fetchxhr.abort();
}
this._fetchxhr = $.getJSON("/flows", this.handle_fetch.bind(this));
this.updates_before_fetch = []; // (JS: empty array is true)
},
handle_fetch: function (data) {
this._fetchxhr = false;
console.log("Flows fetched.", this.updates_before_fetch);
this.reset(data.flows);
var updates = this.updates_before_fetch;
this.updates_before_fetch = false;
for (var i = 0; i < updates.length; i++) {
this.handle(updates[i]);
}
},
});
function SortByInsertionOrder() {
this.i = 0;
@@ -505,7 +508,7 @@ function FlowView(store, filt, sortfun) {
this.store = store;
this.store._views.push(this);
this.recalculate(this.store._flow_list, filt, sortfun);
this.recalculate(this.store._list, filt, sortfun);
}
_.extend(FlowView.prototype, EventEmitter.prototype, {
@@ -582,7 +585,7 @@ _.extend(FlowView.prototype, EventEmitter.prototype, {
}
}
});
function Channel(url) {
function Connection(url) {
if (url[0] === "/") {
url = location.origin.replace("http", "ws") + url;
@@ -1859,7 +1862,7 @@ var routes = (
)
);
$(function () {
window.ws = new Channel("/updates");
window.ws = new Connection("/updates");
ReactRouter.run(routes, function (Handler) {
React.render(React.createElement(Handler, null), document.body);