1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/vector-Maggie-instant-push

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
В этом репозитории не указан файл с открытой лицензией (LICENSE). При использовании обратитесь к конкретному описанию проекта и его зависимостям в коде.
Клонировать/Скачать
connector.js 12 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
V神 Отправлено 17.11.2014 10:35 0658ad3
/**
* 会话服务器,提供设备保持长连接服务
* ps:以事件的形式来保持长连接,同一个连接如不关闭,除了第一次会发一个connect,其他只要客户端同一个socket发送心跳 */
var msgpack = require('./helper/framedmsgpack.js');
var dateHelper = require('./helper/dateHelper.js');
var redisHelper = require('./helper/redisHelper.js');
redisHelper = new redisHelper();
// var request = require('request');
var qqwry = null;
// qqwry = require('./helper/qqwry/index.js').info(); // 纯真ip库
// socket缓存队列:先进先出,不会内存泄露
var connections = require('./helper/limitablemap.js');
connections = new connections(28888); // 最多支持28888个设备连接
var THRESHOLD = 86400000; // TOKEN过期阀值
/*function sayHello(connection) {
console.log('sayHello, ' + new Date());
var o = {"type" : "message", "from": "justin", "to": "what", "content": "haha", "timestamp": new Date()}; // new Object()
var framedBuffer = msgpack.packWithType(o, 31);
connection.write(framedBuffer);
// connection.pipe(connection);
}
function sayImage(connection) {
console.log('sayHello, ' + new Date());
var o = {"type" : "message", "from": "justin", "to": "what", "content": "haha", "timestamp": new Date()};
var framedBuffer = msgpack.packWithType(o, 32);
connection.write(framedBuffer);
}*/
/**
* 根据固定格式分析完整包,如果是就直接返回,不是就缓存(多了就返回信息再缓存,少了直接缓存)
*/
function analyseWrapper(trunk, buffers) {
// 假如有上次遗留信息
if (buffers.length > 0) {
var chunk = buffers.pop(); // 弹出队尾
var trunks = new Buffer(chunk.length + trunk.length); // 重构buffer
chunk.copy(trunks, 0);
trunk.copy(trunks, chunk.length);
buffers.splice(0, buffers.length); // 清空Buffer缓存数组
trunk = trunks;
}
// 第一次连接,没有鉴权,但只发小于5的流,过滤
if (trunk.length < 5) {
return {
isWrap: true,
message: null
}
}
/** 解析完整消息*/
var message = msgpack.getWholeWrap(trunk);
if (message.isWrap && message.buffer === null) { // 完整的信息
console.log('it is Wrap.');
// console.log(JSON.stringify(message.message) + '|' + message.type);
// console.log(message.message + '|' + message.type);
} else if (message.isWrap && message.buffer !== null) { // 一条完整信息包含下一条信息的部分内容
console.log('it is Wrap but include next.');
// console.log(message.message + '|' + message.type);
// console.log(JSON.stringify(message.message) + '|' + message.type);
buffers.push(message.buffer);
} else if (!message.isWrap && message.buffer !== null) { // 不够一条完整信息
console.log('it is not Wrap.');
buffers.push(message.buffer);
}
return message;
}
/** socket 回调
* [session description]
* @param {[type]} connection [description]
* @return {[type]} [description]
*/
function session(connection) {
console.log('----------------------------------------------');
console.log('\nsocket.----------------------------------------------> ' + dateHelper.getNowFormatTime());
var ip = connection.remoteAddress;
if (qqwry) { // 开启
ip = qqwry.searchIP(ip); // 解析ip地址 eg:{ ip: '115.28.1.119', Country: '北京市', Area: '万网IDC机房' }
}
console.log('Connect:socket connected:%s %s %s', ip.ip === undefined ? ip : ip.ip,
ip.Country === undefined ? '' : ip.Country, ip.Area === undefined ? '' : ip.Area);
var auth = false;
var userInfo = null; // 一个用户一条连接:客户端传消息Object原型对象userInfo {'userName':'...', 'pushToken':'...',deviceId:'',....}
// 缓存机制
var buffers = [];
var loginTime = 0; // 登录时间
var pastDue = false; // token过期
//----心跳机制:10分钟未收到心跳促发关闭事件----
connection.setTimeout(600000, function() {
console.log('10分钟没心跳:踢掉僵尸socket...');
connection.destroy(); // 关闭此次socket连接,会出发close事件
});
//----传输的字节流,在这里是字节流一节节的不能保证一次就是一组完整数据,先校验数据完整性----
connection.on('data', function(trunk) { // trunk是一个默认8k长度的SlowBuffer(共享slab策略)
// step1:authenticate
if (!auth) {
var msg = analyseWrapper(trunk, buffers);
if (!msg.isWrap) // 不够一次完整信息,已缓存到buffer
return;
userInfo = msg.message; // json即Object对象原型
if (userInfo === null) {
connection.write(msgpack.packWithType({
"result": false,
"code": 403,
"message": "please send {userName:'.', pushToken:'.', deviceId:'.'} by this way.",
"timestamp": dateHelper.getNowFormatTime1()
}, 1));
connection.destroy();
return;
}
// userInfo = JSON.stringify(userInfo); 序列化json对象 --> JSON.parse(str) 反序列化
if (!(userInfo instanceof Object)) {
userInfo = eval("(" + userInfo + ")"); // 因为是一个字符串最外层有个"",先转换成一个标准的JSON对象
}
console.log('pushToken\'s key:userName=%s, devceId=%s, _pushToken', userInfo.userName, userInfo.deviceId);
console.log('pushToken\'s value: %s', userInfo.pushToken);
// 验证设备是否有token签到?
redisHelper.get(userInfo.userName + userInfo.deviceId + '_pushToken', function(error, reply) {
if (error) { // 如果错了reply为undefined,没错没有数据即error=null、reply=null
connection.write(msgpack.packWithType({
"result": false,
"code": 500,
"message": "redis server exception.",
"timestamp": dateHelper.getNowFormatTime1()
}, 1));
connection.destroy();
return;
}
if (reply === null || reply.split('@')[0] !== userInfo.pushToken) { // 验证token是否通过鉴权
connection.write(msgpack.packWithType({
"result": false,
"code": 403,
"message": "please do sth before authenticate.",
"timestamp": dateHelper.getNowFormatTime1()
}, 1));
connection.destroy();
return;
}
// 保存Token时间
loginTime = Date.now();
if (loginTime - reply.split('@')[1].valueOf() > THRESHOLD) { // 验证token是否过期
pastDue = true;
connection.write(msgpack.packWithType({
"result": false,
"code": 403,
"message": "pushToken was pasted due, please authenticate again.",
"timestamp": dateHelper.getNowFormatTime1()
}, 1));
connection.destroy();
return;
}
auth = true; // 鉴权通过
connections.set(userInfo.deviceId.toString(), connection); // Hash表维护socket队列
console.log("入队 --> 标识:【%s】, 队列长度:【%s】", userInfo.deviceId, connections.keys.length);
connection.write(msgpack.packWithType({
"result": true,
"code": 200,
"message": "device check in yet.",
"timestamp": dateHelper.getNowFormatTime1()
}, 1));
/** 离线消息 zilla不需要,因为有Push-Server负责发送离线 MDM不存在离线消息...*/
});
} else {
// step2: Token过期机制
if (Date.now() - loginTime > THRESHOLD) {
pastDue = true;
connection.write(msgpack.packWithType({
"result": false,
"code": 403,
"message": 'pushToken was pasted due, please authenticate again.',
'timestamp': dateHelper.getNowFormatTime1()
}, 1));
connection.destroy();
return;
}
// step3: 接收信息,心跳
var message = analyseWrapper(trunk, buffers);
if (message.isWrap && message.message == '') { // js里面的''表示一个字节,可能其他语言的" "表示一个字节
connection.write(msgpack.packWithType('', 1)); // <Buffer 00 00 00 01 01 a0>
console.log('heart router:4 min ...');
return;
}
}
});
//----当一个错误发生时触发。 'close' 事件将直接被下列时间调用----
connection.on('error', function(error) {
connections.remove(userInfo.deviceId);
console.log('%s %s %s \'s delete socket: ', ip.ip === undefined ? ip : ip.ip,
ip.Country === undefined ? '' : ip.Country, ip.Area === undefined ? '' : ip.Area);
console.log("出队 --> 标识:【%s】, 队列长度:【%s】", userInfo.deviceId, connections.keys.length);
console.log('check deleted socket isExsit? ---> ' + connections.get(userInfo.deviceId));
});
//----server的socket端给客户端发送Fin数据包请求关闭----
connection.on('end', function() {
// 1.同一台设备上更换用户 2.Token过期 3.应用程序退出
console.log('%s %s %s --> Fin:socket has disconnected that beceuse of client send FIN dataPackage',
ip.ip === undefined ? ip : ip.ip,
ip.Country === undefined ? '' : ip.Country,
ip.Area === undefined ? '' : ip.Area);
});
//----当套接字完全关闭时该事件被分发。参数had_error是一个布尔值,表示了套接字是否因为一个传输错误而被关闭----
connection.on('close', function(had_error) { // 客户端socket端向server发送Fin请求关闭,或者服务器手工请求关闭
console.log('%s %s %s --> Close:connection has closed that caused by error? --> %s',
ip.ip === undefined ? ip : ip.ip,
ip.Country === undefined ? '' : ip.Country,
ip.Area === undefined ? '' : ip.Area, had_error);
// 删除此用户Token信息(即token过期)
if (pastDue) {
redisHelper.delete(userInfo.userName + userInfo.deviceId + '_pushToken', function(error, reply) {
console.log('delete pushToken.' + reply);
});
}
if (auth && !had_error && !connections.get(userInfo.deviceId).localAddress) {
// 两种情况:
// 1,僵尸socket被踢超时内没socket连接,要移除队列key,此时队列key对应的为僵尸socket,随便调用它方法就会触发其close事件
// 2,僵尸socket被踢超时内有socket连接,不移除队列key,此时队列key对应的value被新的socket覆盖
connections.remove(userInfo.deviceId);
console.log('%s %s %s \'s delete socket', ip.ip === undefined ? ip : ip.ip,
ip.Country === undefined ? '' : ip.Country, ip.Area === undefined ? '' : ip.Area);
console.log("出队 --> 标识:【%s】, 队列长度:【%s】", userInfo.deviceId, connections.keys.length);
console.log('check deleted socket isExsit? ---> ' + connections.get(userInfo.deviceId));
// ##### mdm 子系统,标识设备签到:checkout ####
/*var checkoutUrl = 'http://115.28.0.60/mdm/api/v1/devices/' + userInfo.deviceId;
var options = {
url: checkoutUrl,
method: 'DELETE',
headers: {
'User-Agent': 'request',
'Content-type': 'application/json; charset=utf-8'
}
};
// 开始发送请求
request(options, function(error, response, body) {
if (!error && response.statusCode == 200) {
// { result: 0, desc: 'success', detail: '{"code" : 2000,"desc" : "device checkout success"}' }
var info = JSON.parse(body);
if (info.result === 0)
console.log('mdm平台设备:%s下线成功', userInfo.deviceId);
}
});*/
}
});
}
//######## commonjs interface #################
module.exports = {
connections: connections,
session: session
}
// #### 每个客户端socket连上服务器,服务器都会触发一个属于每个客户端自己的connect事件回调闭包 ####
/* var server = net.createServer(function(connection) { // connection是一个socket实例,实现了readable和writeable接口
});
当一个错误发生时触发。 'close' 事件将直接被下列时间调用。
server.on('error', function(error) {
console.log('server error: ' + error);
if (error.code == 'EADDRINUSE') {
console.log('Address in use, retrying...');
/*server.listen(6878, function(){
console.log('socket server bound again, port:6878.');
});
}
});
// 当服务被关闭时触发. 注意:如果当前仍有活动连接,他个事件将等到所有连接都结束后才触发。
server.on('close', function() {
console.log('server closed....');
// 自动重启,删除所有redis缓存Socket队列
// server.listen(6868, function() {
// console.log('socket server bound again, port:8868.');
// });
});
server.listen(process.env.PORT || 8868, function() {
console.log('socket server bound, port:8868.');
}); */

Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/vector-Maggie-instant-push.git
git@api.gitlife.ru:oschina-mirror/vector-Maggie-instant-push.git
oschina-mirror
vector-Maggie-instant-push
vector-Maggie-instant-push
master