Слияние кода завершено, страница обновится автоматически
/**
* 会话服务器,保持长连接机制
* ps:以事件的形式来保持长连接,同一个连接如不关闭,除了第一次会发一个connect,其他只要客户端同一个socket发送心跳
* 服务器只触发data事件。
*/
var net = require('net');
var msgpack = require('./framedmsgpack.js');
var dateHelper = require('./helper/dateHelper.js');
var messageDAO = require('./models/message.js');
var deviceInfoDAO = require('./models/deviceInfo.js');
var qqwry = null;
// qqwry = require('./helper/qqwry/index.js').info(); // 纯真ip库,是否有必要开启,自行斟酌
var redisHelper = require('./helper/redisHelper.js');
redisHelper = new redisHelper();
// socket缓存队列:先进先出,不会内存泄露
var connections = require('./helper/limitablemap.js');
connections = new connections(888); // 最多支持1000个设备连接
var THRESHOLD = 86400000; // TOKEN过期阀值
// 捕捉全局异常
process.on('uncaughtException', function(err) {
console.error(err);
});
//----每个客户端socket连上服务器,服务器都会触发一个属于每个客户端自己的connect事件回调闭包-----
var server = net.createServer(function(connection) { // connection是一个socket实例,实现了readable和writeable接口
console.log('----------------------------------------------');
console.log('\nsocket.----------------------------------------------> ' + dateHelper.getNowFormatTime());
var ip = connection.remoteAddress;
if(qqwry) { // 开启
ip = qqwry.searchIP(ip); // 解析ip地址 eg:{ ip: '115.28.1.119', Country: '北京市', Area: '万网IDC机房' }
}
console.log('Connect:socket connected:%s %s %s', ip.ip === undefined ? ip : ip.ip,
ip.Country === undefined ? '' : ip.Country,
ip.Area === undefined ? '' : ip.Area);
var auth = false;
var corpse = false; // 同一台设备连接丢失,重新连接后队列覆盖问题
var userInfo = null; // 一个用户一条连接:客户端传消息Object原型对象userInfo {'userName':'...', 'pushToken':'...',deviceId:'',....}
// 缓存机制
var buffers = [];
var loginTime = 0; // 登录时间
var pastDue = false; // token过期
//----心跳机制:7分钟未收到心跳促发关闭事件----
connection.setTimeout(420000, function() {
console.log('7分钟没心跳:踢掉僵尸socket...');
corpse = true;
connection.destroy(); // 关闭此次socket连接,会出发close事件
});
//----传输的字节流,在这里是字节流一节节的不能保证一次就是一组完整数据,先校验数据完整性----
connection.on('data', function(trunk) { // trunk是一个默认8k长度的Buffer
// step1:authenticate
if(!auth) {
var msg = analyseWrapper(trunk, buffers);
if(!msg.isWrap) // 不够一次完整信息,已缓存到buffer
return;
userInfo = msg.message; // json即Object对象原型
if(userInfo === null) {
connection.write(msgpack.packWithType({"result": false, "code": 403, "message": "please send {userName:'.', pushToken:'.', deviceId:'.'} by this way.", "timestamp": dateHelper.getNowFormatTime1()}, 1));
connection.destroy();
return;
}
console.log(userInfo);
// userInfo = JSON.stringify(userInfo); 转成unicode编码 --> JSON.parse(str) 反序列化
userInfo = eval("(" + userInfo + ")"); // 因为是一个字符串最外层有个"",先转换成一个标准的JSON对象
console.log('pushToken\'s key:userName=%s, devceId=%s, _pushToken', userInfo.userName, userInfo.deviceId);
console.log('pushToken\'s value: %s', userInfo.pushToken);
// 验证设备是否有token签到?
redisHelper.get(userInfo.userName + userInfo.deviceId + '_pushToken', function(error, reply) {
if(error) { // 如果错了reply为undefined,没错没有数据即error=null、reply=null
connection.write(msgpack.packWithType({"result": false, "code": 500, "message": "redis server exception.",
"timestamp": dateHelper.getNowFormatTime1()}, 1));
connection.destroy();
return;
}
if(reply === null || reply.split('@')[0] !== userInfo.pushToken) { // 验证是否通过鉴权
connection.write(msgpack.packWithType({"result": false, "code": 403, "message": "please do sth before authenticate.",
"timestamp": dateHelper.getNowFormatTime1()}, 1));
connection.destroy();
return;
}
// 保存Token时间
loginTime = Date.now();
// 验证token是否过期
if(loginTime - reply.split('@')[1].valueOf() > THRESHOLD) {
pastDue = true;
connection.write(msgpack.packWithType({"result": false, "code": 403, "message": "pushToken was pasted due, please authenticate again.", "timestamp": dateHelper.getNowFormatTime1()}, 1));
connection.destroy();
return;
}
auth = true; // 鉴权通过
connections.set(userInfo.deviceId.toString(), connection); // Hash表维护socket队列
console.log("入队 --> 标识:【%s】, 队列长度:【%s】", userInfo.deviceId, connections.keys.length);
/**
* 缓存socket会话队列;是否需要用户?同一台设备上切换用户后pushToken丢失!!每次都要鉴权
*/
redisHelper.set('socket_size', connections.keys.length, function(error, reply) { // 覆盖之前
if(error) {
console.log('redis set error [socket]... %s', error);
connection.write(msgpack.packWithType({"result": false, "code": 500, "message": " redis on server exception.",
"timestamp": dateHelper.getNowFormatTime1()}, 1));
// 释放无效socket
connection.destroy();
} else {
console.log('redis save [socket]...' + reply);
connection.write(msgpack.packWithType({result: true, code: 200, message: "device check in yet.",
host: "121.199.39.92", port: 8869, timestamp: dateHelper.getNowFormatTime1()}, 1));
/**
* step2 更新长连接设备[签到]信息
*/
var conditions = {deviceId: userInfo.deviceId},
update = {
online: true,
connectTime: new Date()
};
deviceInfoDAO.findOneAndUpdate(conditions, update, function(err, deviceInfo) {
console.log('开始更新设备[签到]信息...' + userInfo.deviceId);
if(err) {
console.error('更新签到状态失败,mongodb operate exception...\n' + err);
} else {
if (deviceInfo) {
console.log('更新签到状态成功,' + deviceInfo.deviceId + ', deviceInfo.online=' + deviceInfo.online);
} else {
deviceInfoDAO.save(userInfo, function(err) {
if(!err) {
console.log('设备签到成功,deviceInfo.deviceId=' + deviceInfo.deviceId);
}
});
}
}
});
}
/**
* step3 离线消息功能,zilla不需要,因为有Push-Server负责发送离线
*/
messageDAO.findByName({target: userInfo.deviceId, receive: false}, function(err, message) {
console.log(userInfo.deviceId + '设备' + (message ? "有" : '没有') + '离线消息');
if(!err && message) {
console.log('开始发送离线信息...');
connection.write(msgpack.packWithType({offline: true}, 1)); // 发送整个消息,等待设备回执
}
});
});
});
} else {
// step4: Token过期机制
if(Date.now() - loginTime > THRESHOLD) {
pastDue = true;
connection.write(msgpack.packWithType({"result": false, "code": 403, "message": "pushToken was pasted due,
please authenticate again.", "timestamp": dateHelper.getNowFormatTime1()}, 1));
connection.destroy();
return;
}
// step5: 接收信息,心跳
var message = analyseWrapper(trunk, buffers);
if (message.isWrap && message.message == '') { // js里面的''表示一个字节,可能其他语言的" "表示一个字节
connection.write(msgpack.packWithType('', 1)); // <Buffer 00 00 00 01 01 a0>
console.log('heart router:4 min ...');
return;
}
}
});
//----当一个错误发生时触发。 'close' 事件将直接被下列时间调用----
connection.on('error', function(error) {
connections.remove(userInfo.deviceId);
console.log('%s %s %s \'s delete socket: ', ip.ip === undefined ? ip : ip.ip,
ip.Country === undefined ? '' : ip.Country, ip.Area === undefined ? '' : ip.Area);
console.log("出队 --> 标识:【%s】, 队列长度:【%s】", userInfo.deviceId, connections.keys.length);
console.log('check deleted socket isExsit? ---> ' + connections.get(userInfo.deviceId));
redisHelper.set('socket_size', connections.keys.length , function(error, reply) {
offline(userInfo.deviceId); // 更新设备[离线]状态
});
});
//----server的socket端给客户端发送Fin数据包请求关闭----
connection.on('end', function() {
// 1.同一台设备上更换用户
// 2.Token过期
// 3.应用程序退出
console.log('%s %s %s --> Fin:socket has disconnected that beceuse of client send FIN dataPackage',
ip.ip === undefined ? ip : ip.ip,
ip.Country === undefined ? '' : ip.Country,
ip.Area === undefined ? '' : ip.Area);
});
//----当套接字完全关闭时该事件被分发。参数had_error是一个布尔值,表示了套接字是否因为一个传输错误而被关闭----
connection.on('close', function(had_error) { // 客户端socket端向server发送Fin请求关闭,或者手工请求关闭
console.log('%s %s %s --> Close:connection has closed is cause by error? -> %s',
ip.ip === undefined ? ip : ip.ip,
ip.Country === undefined ? '' : ip.Country,
ip.Area === undefined ? '' : ip.Area, had_error);
// 鉴权服务器 删除此用户Token信息(token其实有个有效期)
if(pastDue) {
redisHelper.delete(userInfo.userName + userInfo.deviceId + '_pushToken', function(error, reply) {
console.log('delete pushToken.' + reply);
});
}
// 会话服务器 删除此次会话
if(auth && !had_error && !corpse) {
connections.remove(userInfo.deviceId);
console.log('%s %s %s \'s delete socket', ip.ip === undefined ? ip : ip.ip,
ip.Country === undefined ? '' : ip.Country, ip.Area === undefined ? '' : ip.Area);
console.log("出队 --> 标识:【%s】, 队列长度:【%s】", userInfo.deviceId, connections.keys.length);
console.log('check deleted socket isExsit? ---> ' + connections.get(userInfo.deviceId));
redisHelper.set('socket_size', connections.keys.length , function(error, reply) {
offline(userInfo.deviceId); // 更新设备[离线]状态
});
} else {
console.log("标识:【%s】, 长度:【%s】", userInfo.deviceId, connections.keys.length);
}
});
// setInterval(sayHello, 10000, connection); 不要这样写!
/**
* 把耗时方法拆分,重新分配到事件队列的队尾,等待最后促发
*
process.nextTick(function() {
msgpack.getWholeWrap(trunk)
sayHello(connection);
sayImage(connection);
}); */
});
/**
* 根据固定格式分析完整包,如果是就直接返回,不是就缓存(多了就返回信息再缓存,少了直接缓存)
*/
function analyseWrapper(trunk, buffers) {
// 假如有上次遗留信息
if(buffers.length > 0) {
var chunk = buffers.pop(); // 弹出队尾
var trunks = new Buffer(chunk.length + trunk.length); // 重构buffer
chunk.copy(trunks, 0);
trunk.copy(trunks, chunk.length);
buffers.splice(0, buffers.length);// 清空Buffer缓存数组
trunk = trunks;
}
// 第一次连接,没有鉴权,但只发小于5的流,过滤
if(trunk.length < 5) {
return {isWrap: true, message: null};
}
/**
* 解析完整消息*/
var message = msgpack.getWholeWrap(trunk);
if (message.isWrap && message.buffer === null) { // 完整的信息
console.log('it is Wrap.');
// console.log(JSON.stringify(message.message) + '|' + message.type);
// console.log(message.message + '|' + message.type);
} else if(message.isWrap && message.buffer !== null) { // 一条完整信息包含下一条信息的部分内容
console.log('it is Wrap but include next.');
// console.log(message.message + '|' + message.type);
// console.log(JSON.stringify(message.message) + '|' + message.type);
buffers.push(message.buffer);
} else if(!message.isWrap && message.buffer !== null) { // 不够一条完整信息
console.log('it is not Wrap.');
buffers.push(message.buffer);
}
return message;
}
/*function sayHello(connection) {
console.log('sayHello, ' + new Date());
var o = {"type" : "message", "from": "justin", "to": "what", "content": "haha", "timestamp": new Date()}; // new Object()
var framedBuffer = msgpack.packWithType(o, 31);
connection.write(framedBuffer);
// connection.pipe(connection);
}
function sayImage(connection) {
console.log('sayHello, ' + new Date());
var o = {"type" : "message", "from": "justin", "to": "what", "content": "haha", "timestamp": new Date()};
var framedBuffer = msgpack.packWithType(o, 32);
connection.write(framedBuffer);
}
*/
//-----当一个错误发生时触发。 'close' 事件将直接被下列时间调用-----
server.on('error', function(error) {
console.log('server error: %s', error);
if (error.code == 'EADDRINUSE') {
console.error('Address in use, retrying...');
/*server.listen(6878, function(){
console.log('socket server bound again, port:6878.');
});*/
}
});
//-----当服务被关闭时触发. 注意:如果当前仍有活动连接,他个事件将等到所有连接都结束后才触发-----
server.on('close', function() {
console.log('server closed....');
// 自动重启,删除所有redis缓存Socket队列
// server.listen(6868, function() {
// console.log('socket server bound again, port:8868.');
// });
});
//######## startup #################
require('./auth.js')();
// require('./dispacher.js')();
server.listen(/*process.env.PORT ||*/ 8868, function() {
console.log('socket server bound, port:8868.');
});
/**
* [writeMsg description] 推送消息
* @param {[type]} deviceId [description]
* @param {[type]} msg [description]
* @return {[type]} [description]
*/
function writeMsg(deviceId, msg) {
var flag = true;
try {
// console.log(connections.length + ", isArray=" + (connections instanceof Array));
console.log('session服务器开始向:' + deviceId + ', 推送消息:' + msg + ', ' + (typeof deviceId));
var socket = connections.get(deviceId);
if(socket) {
socket.write(msgpack.packWithType(msg, 1));
} else {
flag = false;
}
} catch(e) {
console.error(e);
flag = false;
}
return flag;
}
/**
* [offline description] 更新设备[离线]状态
* @param {[type]} devId [description]
* @param {[type]} callbcak [description]
* @return {[type]} [description]
*/
function offline(devId, callbcak) {
console.log('开始更新设备[离线]信息...');
var conditions = {deviceId: devId},
update = {
online: false,
disconnetTime: new Date()
};
deviceInfoDAO.findOneAndUpdate(conditions, update, function(err, deviceInfo) {
if(callbcak) { // 回调给调用者处理
callbcak(err, deviceInfo);
return;
}
if(err) { // 自己处理
console.error('mongodb operate exception...\n' + err);
} else {
if (deviceInfo) {
console.log('更新设备[离线]状态成功,%s, deviceInfo.online=%s, %s',
deviceInfo.deviceId, deviceInfo.online, deviceInfo.disconnetTime);
} else {
console.log('设备离线状态更新失败: ' + deviceInfo.deviceId);
}
}
});
}
/* ############ 开启API服务器 ####################*/
var APP_ID = require('./config/config.js').APP_ID;
var APP_SECRET = require('./config/config.js').APP_SECRET;
var express = require('express');
var app = express();
// 使用中间件
app.use(express.bodyParser());
/**
* [description] 全局过滤器
* @param {[type]} req [description]
* @param {[type]} res [description]
* @param {Function} next [description]
* @return {[type]} [description]
*/
app.use(function(req, res, next) {
console.log('--------------------------------------------------------');
console.log('\n--------------------------------------------------------');
console.log('all methods captured');
var ip = req.ip;
if(qqwry)
ip = qqwry.searchIP(req.ip); // 解析ip地址 eg:{ ip: '115.28.1.119', Country: '北京市', Area: '万网IDC机房' }
console.log('Request:request coming:%s, %s, %s', ip.ip === undefined ? ip : ip.ip,
ip.Country === undefined ? '' : ip.Country,
ip.Area === undefined ? '' : ip.Area);
console.log('Request:%s %s', req.method, req.url);
console.log('Request body:');
console.log(req.body);
next();
});
/**
* [description] 推送消息
* @param {[type]} req [description]
* @param {[type]} res [description]
* @return {[type]} [description]
*/
app.post('/pushMsg', function(req, res) {
var appId = req.body.appId;
var secret = req.body.secret;
// step1 获取msg
var msg = req.body.msg;
var deviceId = req.body.deviceId;
if (APP_ID !== appId || APP_SECRET !== secret || deviceId === undefined || deviceId === '') {
console.log('401:request validation fail. please input params.');
res.status(401).json({result: false, 'message': 'request validation fail. maybe no deviceId...'});
return;
}
msg = eval("(" + msg + ")"); // 转换msg对象
// msg 做个适配功能 适配成本低消息体格式,如下
// msg: '{"id":"111","title":"公告","messageType":"MODULE","extras":{"moduleIdentifer":"com.foss.announcement"}}',
// deviceId: '00000000-4df8-d963-3b64-93f651a1e20a'
var message = {
msgId: msg.id,
msgType: msg.messageType,
title: msg.title,
target: deviceId,
content: msg.content
}
if(msg.extras) { // 存在扩展信息情况,例如推送公告
message.extras = msg.extras;
}
if(msg.isWrap) { // 完整信息标识,完整就不需要拉取
message.isWrap = msg.isWrap;
}
console.log('step1 开始保存日志, id:' + message.msgId);
// mongo更新成功状态,变色龙特有 --> message.receive = true;
messageDAO.save(message, function(err) {
if (err) {
console.error('插入message对象异常:%s', err);
res.status(500).json("插入message对象异常,请联系开发人员...");
return;
}
console.log('step2 开始发送消息 --> %s: %s, 队列长度:%s', typeof deviceId, deviceId, connections.keys.length);
var flag = false;
if (msg.isWrap) {
flag = writeMsg(deviceId, msg);
} else {
flag = writeMsg(deviceId, {id: msg.id, isWrap: msg.isWrap}); // 需要拉取完整信息
}
res.status(200).json(flag ? 200 : 304);
console.log(flag ? "发送成功..." : "发送失败,此设备未保持长连接,登录session服务器后它会去平台拉取离线消息!");
});
});
/**
* [description] 拉取完整消息,不需要回执(1推送长消息没有content内容,2短消息直接推送过去;包含content字段的即长消息!)
* @param {[type]} req [description]
* @param {[type]} res [description]
* @return {[type]} [description]
*/
app.get('/v1/message/:username/:deviceId/:pushToken', function(req, res) {
// step1 接收参数
var pushToken = req.params.pushToken; // 验证token的好处,token存在有效期
var username = req.params.username;
var deviceId = req.params.deviceId;
// 获取query
var msgId = req.query.msgId;
// var timestamp = req.query.timestamp;
if (pushToken === undefined || pushToken === '' || msgId === undefined || msgId === '') {
console.log('401:request validation fail. please input params.');
res.status(401).json({result: false, message: 'request validation fail. please input params.'});
return;
}
// step2 验证授权
redisHelper.get(username + deviceId + '_pushToken', function(error, reply) { // 已最后一次鉴权用户为主,覆盖之前
reply = reply.split('@')[0]; // 切换
console.log("pushToken=" + reply);
if(!error && reply == pushToken) {
console.log('有效pushToken:' + reply);
// step3 根据msgId持久化message回执
var conditions = {msgId: msgId},
update = {
receive: true, // 回执
receiveTime: new Date()
};
messageDAO.findOneAndUpdate(conditions, update, function(err, Message) {
if(err) {
console.error('mongodb operate exception...\n' + err);
res.status(500).json({result: false, message: 'server exception. '});
} else {
console.log('更新成功,Message.receive=' + Message.receive);
res.status(200).json({result: true, message: Message});
}
});
} else if(!error && reply != pushToken) {
console.error('pushToken validation fail...' + reply);
res.status(401).json({result: false, message: 'pushToken validation fail.'});
} else {
console.error('redis set error...' + reply);
res.status(500).json({result: false, message: ' redis net exc that maybe connection disable.try it again.'});
}
});
});
/**
* [description] 拉取所有离线消息,并且当作消息回执
* @param {[type]} req [description]
* @param {[type]} res [description]
* @return {[type]} [description]
*/
app.get('/v1/messages/:username/:deviceId/:pushToken', function(req, res) {
// step1 接收参数
var pushToken = req.params.pushToken;
var username = req.params.username;
var deviceId = req.params.deviceId;
// 获取query
// var timestamp = req.query.timestamp;
if (pushToken === undefined || pushToken === '') {
console.log('401:request validation fail. please input params.');
res.status(401).json({result: false, message: 'request validation fail. please input params.'});
return;
}
// step2 验证授权
redisHelper.get(username + deviceId + '_pushToken', function(error, reply) { // 已最后一次鉴权用户为主,覆盖之前
reply = reply.split('@')[0]; // 切换
console.log("pushToken=" + reply);
if(!error && reply == pushToken) {
console.log('有效pushToken:' + reply);
// step3 查询所有离线消息,更新回执状态
messageDAO.query({receive: false}, function(err, messages) {
console.log('开始拉取离线信息...');
if(err) {
console.error('mongodb operate exception...\n' + err);
res.status(500).json({result: false, message: 'server exception. do it again...'}); // 重新拉取
return;
} else {
console.log('拉取成功,messages.length=' + messages.length);
res.status(200).json({result: true, message: messages});
// messages.forEach(function(item, index, array) {}); {multi: true}表示批量更新
// step4 更新接收状态:这样不需要回执了,可以加强安全考虑,每个需要回执
messageDAO.update({receive: false}, {receive: true, receiveTime: Date.now()}, {multi: true}, function(err, numberAffected, message) {
// if (err) return handleError(err); // 抛出异常
console.log('开始更新离线消息状态...');
if(!err) {
console.log(' %d条离线消息被更新!', numberAffected);
// console.log('The raw response from Mongo was ', message);
}
});
}
});
} else if(!error && reply != pushToken) {
console.error('pushToken validation fail...' + reply);
res.status(401).json({result: false, message: 'pushToken validation fail.'});
} else {
console.error('redis set error...' + reply);
res.status(500).json({result: false, message: ' redis net exc that maybe connection disable.try it again.'});
}
});
});
/**
* [description] 消息回执 http://ip:port/v1/message/:appId/:secret/:msgId/:timestamp
* @param {[type]} req [description]
* @param {[type]} res [description]
* @return {[type]} [description]
*/
app.put('/v1/message/:username/:deviceId/:pushToken/:msgId/:timestamp', function(req, res) {
// step1 接收参数
var pushToken = req.params.pushToken;
var username = req.params.username;
var deviceId = req.params.deviceId;
// 获取query
var msgId = req.params.msgId;
// var timestamp = req.query.timestamp;
if (pushToken === undefined || pushToken === '' || msgId === undefined || msgId === '') {
console.log('401:request validation fail. please input params.');
res.status(401).json({result: false, message: 'request validation fail. please input params.'});
return;
}
// step2 验证授权
redisHelper.get(username + deviceId + '_pushToken', function(error, reply) { // 已最后一次鉴权用户为主,覆盖之前
reply = reply.split('@')[0]; // 切换
console.log("pushToken=" + reply);
if(!error && reply == pushToken) {
console.log('有效pushToken:' + reply);
// step3 根据msgId持久化message回执
var conditions = {msgId: msgId},
update = {
receive: true,
receiveTime: new Date()
};
messageDAO.findOneAndUpdate(conditions, update, function(err, Message) {
if(err) {
console.error('mongodb operate exception...\n' + err);
res.status(500).json({result: false, message: 'server exception. '});
} else {
console.log('更新成功,Message.receive=' + Message.receive);
res.status(200).json({result: true, message: 'has recived.'});
}
});
} else if(!error && reply != pushToken) {
console.error('pushToken validation fail...' + reply);
res.status(401).json({result: false, message: 'pushToken validation fail.'});
} else {
console.error('redis set error...' + reply);
res.status(500).json({result: false, message: ' redis net exc that maybe connection disable.try it again.'});
}
});
});
/**
* 查询接口未完成:
* 第三方查询已推送消息的详情,是否收到、回执反馈等
* 消息适配等
*/
app.listen(/*process.env.PORT ||*/8869);
console.log('Push API server bound, port:8869.');
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )