socket.js 9.63 KB
var net = require('net'),
    redis = require('redis'),
    _ = require('lodash'),
    async = require('async'),
    index = require('../index.js'),
    readerConfig = require('./readerConfig.js'),
    WebSocketServer = require('ws').Server;

var r = redis.createClient();
// 清除redis内存数据库数据
r.keys('*', function(err, keys){
    keys.forEach(function (key, pos)
    {
        r.del(key, function(err, o)
        {
            if (err)
            {
                console.error('没有' + key);
            }
            if (pos === (keys.length - 1))
            {
                console.log('redis内存数据清理成功');
            }
        });
    });
});
r.on('error', function(err)
{
    console.log('Redis连接错误:'+err);
    process.exit(1);
});

/**
 * socket服务创建模块
 */
function main()
{
    // 读取配置文件
    var host = '172.16.13.95',
        port = 12345;

    connectAllReaders();

    var wss = new WebSocketServer({ host: host, port: port }, function(){
        console.log('Socket创建成功在%s:%d', host, port);
    });
    wss.on('connection', function(ws) {
        var remoteIp = ws._socket.remoteAddress;
        console.log('远程主机%s连接成功', remoteIp);
        ws.on('message', function(data) {
            var tags = [];
            // 处理传过来的数据
            var params = getCommand(data);
            r.hget(params.groupId, params.state, function(err, epcs)
            {
                if (err) {
                    console.error(err);
                } else {
                    try
                    {
                        epcs = JSON.parse(epcs);
                        _.forEach(epcs, function(tagData){
                            if(tagData && getMillTimeDiffNow(tagData.updateTime) < 30000)
                            {
                                console.log('groupId:%s ------- %s', tagData.GroupID, tagData.EPC);
                                tags.push(tagData);
                            }
                        });
                    }
                    catch(er)
                    {
                        console.error('标签数据JSON格式错误:'+er);
                    }
                }

                // 有离架标签则记录日志
                if(tags.length)
                {
                    console.log(tags);
                }

                ws.send(output(params.commandType, 100, tags));
            });
        });

        //连接关闭
        ws.onclose = function() {
            console.log('远程主机%s断开连接', remoteIp);
        }
    });
    wss.on('error', function(err){
        console.log(err);
        wss.close();
    });
}


/**
 * 连接所有读写器
 */
function connectAllReaders()
{
    var readers = readerConfig.getReaderConfig();
    for(var ip in readers)
    {
        var reader = readers[ip];
        var filter = {
            'on': '1',
            rssi: reader.rssi
        };

        // 初次连接最多连接5次
        async.retry(5, function(cb, results){
            connectReader(ip, 9761, filter, reader.frequency, reader.inventory_time, reader.stay_time, reader.ants, reader.group, cb);

        }, function(err, results){
            if(err){
                console.error(err);
            }
        });
    }
}

/**
 * 连接并获取读写器的数据,然后传递到客户端
 *
 * @param host 读写器的的IP
 * @param port 读写器的的端口
 * @param filter 读写器的RSSI过滤有关数据
 * @param frequency 读写器的定频有关数据
 * @param inventoryTime 盘点时间
 * @param antStayTime 天线驻留时间
 * @param power 各天线的功率
 * @param group 读写器所在组
 * @param callback 回调函数
 */
function connectReader(host, port, filter, frequency, inventoryTime, antStayTime, power, group, callback)
{
    index.start({
        host: host,
        port: port,
        filter: filter,
        frequency: frequency,
        inventoryTime: inventoryTime,
        antStayTime: antStayTime,
        power: power,
        group: group
    }, function (err, connected, onTags, goneTags) {

        var onEpcs = [],
            offEpcs = [];

        // 获取redis中指定组在架标签数据
        r.hget(group, 'on', function(err, epcs)
        {
            if (err)
            {
                console.error(err);
            }
            else
            {
                try
                {
                    var redisEpcs = JSON.parse(epcs);
                    // 处理在架标签
                    _.forEach(redisEpcs, function(item){
                        if(item.host != host && _.findWhere(onTags, {epc: item.epc}) === undefined){
                            onEpcs.push(item);
                        }
                    });
                    _.forEach(onTags, function(item){
                        onEpcs.push({
                            GroupID: item.group,
                            EPC: item.epc,
                            host: item.host,
                            speed: item.speed,
                            updateTime: item.updateTime
                        });
                    });

                    r.hset(group, 'on', JSON.stringify(onEpcs), function(){});

                    // 处理离架标签
                    _.forEach(goneTags, function(goneEpc){
                        if(!goneEpc || (_.findWhere(redisEpcs, {EPC: goneEpc.epc}) === undefined && _.findWhere(onTags, {epc: goneEpc.epc}) === undefined)){
                            offEpcs.push({
                                GroupID: goneEpc.group,
                                EPC: goneEpc.epc,
                                host: goneEpc.host,
                                speed: goneEpc.speed,
                                rssi: goneEpc.rssi,
                                updateTime: goneEpc.updateTime
                            });
                        }
                    });

                    r.hset(group, 'off', JSON.stringify(offEpcs), function(){});

                    if(connected){
                        callback(null, connected);
                    }else{
                        callback(new Error('连接读写器'+host+'错误'), connected);
                    }
                }
                catch(er)
                {
                    console.error('标签数据JSON格式错误:'+er);
                }
            }
        });

    });
}

function processOnTags()
{
    // 获取redis中指定组在架标签数据
    r.hget(group, 'on', function(err, epcs)
    {
        if (err)
        {
            console.error(err);
        }
        else
        {
            try
            {
                var redisEpcs = JSON.parse(epcs);
                // 处理在架标签
                _.forEach(redisEpcs, function(item){
                    if(item.host != host && _.findWhere(onTags, {epc: item.epc}) === undefined){
                        onEpcs.push(item);
                    }
                });
                _.forEach(onTags, function(item){
                    onEpcs.push({
                        GroupID: item.group,
                        EPC: item.epc,
                        host: item.host,
                        speed: item.speed,
                        updateTime: item.updateTime
                    });
                });
                r.hset(group, 'on', JSON.stringify(onEpcs), function(){});

                // 处理离架标签
                _.forEach(goneTags, function(goneEpc){
                    if(!goneEpc || _.findWhere(redisEpcs, {EPC: goneEpc.epc}) === undefined){
                        offEpcs.push({
                            GroupID: goneEpc.group,
                            EPC: goneEpc.epc,
                            host: goneEpc.host,
                            speed: goneEpc.speed,
                            rssi: goneEpc.rssi,
                            updateTime: goneEpc.updateTime
                        });
                    }
                });

                r.hset(group, 'off', JSON.stringify(offEpcs), function(){});

                if(connected){
                    callback(null, connected);
                }else{
                    callback(new Error('连接读写器'+host+'错误'), connected);
                }
            }
            catch(er)
            {
                console.error('标签数据JSON格式错误:'+er);
            }
        }
    });
}

/**
 * 获取指定时间和当前时间的差异
 * @param val 指定的时间
 * @returns int 时间差(单位:毫秒)
 */
function getMillTimeDiffNow(val)
{
    var end = new Date();
    return end.getTime() - val;
}

/**
 * 读取命令结果
 * @param data 传入的数据
 * @returns 解析出来的数据对象
 */
function getCommand(data)
{
    try
    {
        data = JSON.parse(data);
    }
    catch(er)
    {
        console.log('JSON格式错误:'+er);
    }
    var params = {};

    if(data.Key != '123456')
    {
        params.commandType = 'error';
        return params;
    }

    params.commandType = data.CommandType;
    if('GroupID' in data)
    {
        params.groupId = data.GroupID;
    }

    if('State' in data)
    {
        var state = data.State;
        if(state == 'ShelfOn' || state == 'ShelfOnAndNear')
        {
            state = 'on'
        }
        else
        {
            state = 'off'
        }
        params.state = state
    }

    return params;
}

/**
 * 格式化输出内容
 * @param commandType 命令类型
 * @param code 状态码
 * @param data 数据
 * @returns string
 */
function output (commandType, code, data)
{
    var res = {
        CommandType: commandType,
        Code: code,
        Data: data
    };

    return JSON.stringify(res);
}

main();