fixed bug that removed session index information unintentionally

This commit is contained in:
Ritchie Martori
2012-10-23 12:39:35 -07:00
parent cbffa5955e
commit 768165c09f
2 changed files with 76 additions and 58 deletions

View File

@@ -52,6 +52,10 @@ Cluster.prototype.emitToAll = function (ev, data) {
});
}
Cluster.prototype.emitToUsers = function (users, ev, data) {
}
Cluster.prototype.emitToServer = function(host, ev, data) {
var body = {
event: ev,
@@ -67,8 +71,15 @@ Cluster.prototype.emitToServer = function(host, ev, data) {
}
Cluster.prototype.handleProxy = function (req, res) {
if(req.body && req.body.event) {
process.server.sockets.emit(req.body.event, req.body.data);
var cmd = req.body
, sessions = process.server.sessions;
if(cmd.sessions) {
// only emit to sockets that match this query
sessions.emitToSessions(cmd.sessions, cmd.event, cmd.data);
} else {
// emit to all
sessions.emitToAll(req.body.event, req.body.data);
}
res.end();

View File

@@ -65,6 +65,8 @@ SessionStore.prototype.createSession = function(sid, fn) {
sid = undefined;
}
if(sid) {
debug('building existing %s', sid);
this.find({id: sid}, function(err, s) {
if(err) return fn(err);
if(!s) {
@@ -77,11 +79,14 @@ SessionStore.prototype.createSession = function(sid, fn) {
// index sessions by user
if(s && s.uid) {
userSessionIndex[s.uid] = sess;
} else {
debug('not indexing user session', s);
}
fn(err, sess);
});
} else {
sid = this.createUniqueIdentifier();
debug('creating new %s', sid);
var sess = sessionIndex[sid] = new Session({id: sid}, this, socketIndex, store.sockets);
fn(null, sess);
this.insert({id: sid}, function(err, s) {
@@ -90,6 +95,36 @@ SessionStore.prototype.createSession = function(sid, fn) {
}
};
/**
* Emit data to all the sockets listening to the given event.
*
* @param {String} ev
* @param {Object} data
*/
SessionStore.prototype.emitToAll = function (ev, data) {
this.sockets.emit(ev, data);
};
/**
* Emit data to all the sockets bound to the given sessions.
*
* @param {Array} sessions
* @param {String} event
* @param {Object} data
*/
SessionStore.prototype.emitToSessions = function (sessions, ev, data) {
sessions.forEach(function (sid) {
var session = sessionIndex[sid]
, socket = session && session.socket;
if(socket) {
socket.emit(ev, data);
}
});
};
/**
* An in memory representation of a client or user connection that can be saved to disk.
* Data will be passed around via a `Context` to resources.
@@ -106,91 +141,61 @@ SessionStore.prototype.createSession = function(sid, fn) {
*/
function Session(data, store, sockets, rawSockets) {
<<<<<<< HEAD
var sess = this;
=======
var sid;
>>>>>>> 929285c8b89118aa505a218b939cec34d431e38e
this.data = data;
if(data && data.id) this.sid = sid = data.id;
this.store = store;
// create faux socket, to queue any events until
// a real socket is available
var socketWrapper = this.socket = {
on: function () {
var s = sockets[sid];
// if we have a real socket, use it
if(s) {
s.on.apply(s, arguments);
} else {
// otherwise add to bind queue
var queue = this._bindQueue = this._bindQueue || [];
queue.push(arguments);
}
},
emit: function (ev) {
var s = sockets[sid];
// if we have a real socket, use it
if(s) {
s.emit.apply(s, arguments);
} else {
// otherwise add to bind queue
var queue = this._emitQueue = this._bindQueue || [];
queue.push(arguments);
}
}
};
this.emitToUsers = function(collection, query, event, data) {
collection.get(query, function(users) {
var userSession;
var notConnectedUsers = [];
if(users && users.id) {
<<<<<<< HEAD
userSession = userSessionIndex[err.id];
=======
userSession = userSessionIndex[users.id];
>>>>>>> 929285c8b89118aa505a218b939cec34d431e38e
if(userSession && userSession.socket) {
userSession.socket.emit(event, data);
}
return;
var u = users;
users = [u];
}
debug('emit to users:');
debug(' query: %j', query);
debug(' event: %s', event);
debug(' data: %j', data);
// process.server.cluster.emitToUsers(users, event, data);
users.forEach(function(u) {
userSession = userSessionIndex[u.id];
// emit to sessions online
if(userSession && userSession.socket) {
debug('using a connected socket');
userSession.socket.emit(event, data);
} else {
debug('user is offline:', u, userSession);
debug('all user sessions:', userSessionIndex);
notConnectedUsers.push(u.id);
}
});
debug('not connected users: %j', notConnectedUsers);
});
};
this.emitToAll = function(ev, data) {
// emit to other servers in the cluster
process.server.cluster.emitToAll(ev, data);
rawSockets.emit.apply(rawSockets, arguments);
// emit to the current server
store.emitToAll(ev, data);
};
// resolve queue once a socket is ready
store.socketQueue.once(this.sid, function (socket) {
address(function (err, add) {
debug('setting host %s for %s', add, sess.id);
sess.set({host: add + ':' + process.server.options.port}).save();
});
// drain bind queue
if(socketWrapper._bindQueue && socketWrapper._bindQueue.length) {
socketWrapper._bindQueue.forEach(function (args) {
socket.on.apply(socket, args);
});
}
// drain emit queue
if(socketWrapper._emitQueue && socketWrapper._emitQueue.length) {
socketWrapper._emitQueue.forEach(function (args) {
socket.emit.apply(socket, args);
});
}
debug('bound socket to', sess);
sess.socket = socket;
});
}
@@ -223,7 +228,9 @@ Session.prototype.save = function(fn) {
, data = this.data
, query = {id: data.id};
session.remove(function (err) {
debug('saving %s', data.id);
session.store.remove({id: data.id}, function (err) {
if(err) return fn(err);
session.store.insert(data, function (err, res) {
fn && fn(err, res);
@@ -257,7 +264,7 @@ Session.prototype.fetch = function(fn) {
Session.prototype.remove = function(fn) {
var session = this;
debug('Removing %s', this.data.id);
debug('removing %s', this.data.id);
delete sessionIndex[this.data.id];
delete userSessionIndex[this.data.uid]; // TODO: Don't delete all of a user's sessions