// socket.js 10.3 KB
var net = require('net'),
    redis = require('redis'),
    _ = require('lodash'),
    async = require('async'),
    index = require('../index.js'),
    readerConfig = require('./readerConfig.js');

var r = redis.createClient();

// Clear all data held in the redis in-memory database on startup.
r.keys('*', function(err, keys)
{
    // A failed KEYS command leaves `keys` undefined; bail out instead of
    // throwing on keys.forEach.
    if (err)
    {
        console.error(err);
        return;
    }

    // Nothing stored: the database is already clean.
    if (!keys || keys.length === 0)
    {
        console.log('redis内存数据清理成功');
        return;
    }

    // Count completed deletes rather than relying on the position of the key,
    // so the success message is printed exactly once, after every DEL returned.
    var pending = keys.length;
    keys.forEach(function (key)
    {
        r.del(key, function(delErr)
        {
            if (delErr)
            {
                console.error('没有' + key);
            }
            pending -= 1;
            if (pending === 0)
            {
                console.log('redis内存数据清理成功');
            }
        });
    });
});

// Any error emitted by the redis client is treated as fatal.
r.on('error', function(err)
{
    console.log('Redis连接错误:'+err);
    process.exit(1);
});

/**
 * Creates the socket server.
 *
 * Starts the reader connections, then listens on a hard-coded host/port.
 * Every client is sent the cross-domain policy XML on connect. Each later
 * message is either a policy-file request (answered with the policy again)
 * or a JSON command, which is answered with the tags stored in redis for the
 * requested group/state whose updateTime lies within the last 30 seconds.
 *
 * NOTE(review): each 'data' event is treated as exactly one message; TCP does
 * not guarantee that framing — confirm the clients only send small,
 * well-separated requests.
 */
function main()
{
    // Listening address (hard-coded).
    var host = '172.16.13.95',
        port = 12345;

    // Cross-domain policy document sent to connecting clients.
    var xml = '<?xml version="1.0"?>\n<!DOCTYPE cross-domain-policy SYSTEM \n"http://www.adobe.com/xml/dtds/cross-domain-policy.dtd">\n<cross-domain-policy>\n';
    xml += '<site-control permitted-cross-domain-policies="master-only"/>\n';
    xml += '<allow-access-from domain="*" to-ports="*"/>\n';
    xml += '</cross-domain-policy>\n';

    connectAllReaders();

    // Create the socket server.
    net.createServer(function(conn)
    {
        conn.setEncoding("utf8");
        var remoteIp = conn.remoteAddress;
        console.log('远程主机%s连接成功', remoteIp);

        // Without an 'error' listener a socket failure (e.g. a reset
        // connection) is thrown as an uncaught exception and stops the server.
        conn.on('error', function(err)
        {
            console.error(err);
        });

        // Data every web client needs right after connecting.
        conn.write(xml+'\0');

        conn.on('data', function(data)
        {
            if(data === '<policy-file-request/>\0')
            {
                conn.write(xml+'\0');
                return;
            }

            // Strip every NUL terminator, not only the first one.
            var params = getCommand(data.replace(/\0/g, ''));
            var tags = [];

            // Sends the collected tags back, logging them when there are any.
            function reply()
            {
                if(tags.length)
                {
                    console.log(tags);
                }

                conn.write(output(params.commandType, 100, tags)+"\0");
            }

            // Rejected or incomplete commands carry no group/state; answer with
            // an empty list without querying redis with undefined arguments.
            if(params.groupId == null || params.state == null)
            {
                reply();
                return;
            }

            r.hget(params.groupId, params.state, function(err, epcs)
            {
                if (err)
                {
                    console.error(err);
                }
                else
                {
                    try
                    {
                        // Keep only tags updated within the last 30 seconds.
                        _.forEach(JSON.parse(epcs), function(tagData){
                            if(tagData && getMillTimeDiffNow(tagData.updateTime) < 30000)
                            {
                                console.log('groupId:%s ------- %s', tagData.GroupID, tagData.EPC);
                                tags.push(tagData);
                            }
                        });
                    }
                    catch(er)
                    {
                        console.error('标签数据JSON格式错误:'+er);
                    }
                }

                reply();
            });
        });

        conn.on('end', function()
        {
            console.log('远程主机断开');
            conn.end();
        });

    }).listen(port, host, function()
    {
        console.log('Socket创建成功在%s:%d', host, port);
    });
}


/**
 * Connects to every reader listed in the reader configuration.
 *
 * The configuration is read as an object keyed by reader IP. Each reader gets
 * its own async.retry run with up to 5 attempts.
 */
function connectAllReaders()
{
    // Tolerate a missing configuration: nothing to connect to.
    var readers = readerConfig.getReaderConfig() || {};
    var frequency = 10;

    // forEach gives every reader its own function scope. With `var` inside a
    // for...in loop the retry task closed over the loop variables, so any
    // attempt that ran after the loop had finished used the LAST reader's
    // ip/config instead of its own.
    Object.keys(readers).forEach(function(ip)
    {
        var reader = readers[ip];
        var filter = {
            'on': '1',
            rssi: reader.rssi
        };

        // At most 5 connection attempts on initial connect.
        async.retry(5, function(cb){
            connectReader(ip, 9761, filter, frequency, reader.inventory_time, reader.stay_time, reader.ants, reader.group, cb);
        }, function(err){
            if(err){
                console.error(err);
            }
        });
    });
}

/**
 * Connects to a reader and keeps the redis tag lists of its group up to date.
 *
 * Starts the reader through index.start and passes every report it delivers
 * to processOnTags. The connection result is handed to `callback` once per
 * call of this function.
 *
 * @param host reader IP
 * @param port reader port
 * @param filter RSSI filter settings for the reader
 * @param frequency fixed-frequency setting for the reader
 * @param inventoryTime inventory time
 * @param antStayTime antenna dwell time
 * @param power power of each antenna
 * @param group group the reader belongs to
 * @param callback function(err, connected) receiving the connection result
 */
function connectReader(host, port, filter, frequency, inventoryTime, antStayTime, power, group, callback)
{
    // index.start may deliver more than one report (not visible in this file —
    // TODO confirm). The retry callback must only be answered once per
    // attempt, so later reports update redis without calling it again.
    var reported = false;
    function reportOnce(err, connected)
    {
        if(reported)
        {
            return;
        }
        reported = true;
        callback(err, connected);
    }

    index.start({
        host: host,
        port: port,
        filter: filter,
        frequency: frequency,
        inventoryTime: inventoryTime,
        antStayTime: antStayTime,
        power: power,
        group: group
    }, function (err, connected, onTags, goneTags) {
        // Surface the reader error instead of silently dropping it.
        if(err)
        {
            console.error(err);
        }

        processOnTags(group, host, onTags, goneTags, connected, reportOnce);
    });
}

/**
 * Merges one reader report into the redis 'on' and 'off' lists of a group.
 *
 * Every input is passed explicitly; the function reads no outer variables
 * besides the redis client `r` and lodash `_`.
 *
 * 'on' list: stored entries from other readers are kept unless this report
 * contains the same EPC, then every tag of this report is appended.
 * 'off' list: replaced by the gone tags that are in neither the stored 'on'
 * list nor this report.
 *
 * NOTE(review): the hget/hset sequence is not atomic; concurrent reports for
 * the same group can overwrite each other — confirm this is acceptable.
 *
 * @param group redis hash key of the reader group
 * @param host IP of the reader that produced the report
 * @param onTags tags currently seen by the reader
 * @param goneTags tags the reader no longer sees
 * @param connected connection state to report through `callback`
 * @param callback optional function(err, connected)
 */
function processOnTags(group, host, onTags, goneTags, connected, callback)
{
    // Fetch the stored on-shelf tags of the group.
    r.hget(group, 'on', function(err, epcs)
    {
        if (err)
        {
            console.error(err);
        }
        else
        {
            try
            {
                var redisEpcs = JSON.parse(epcs);
                var onEpcs = [];
                var offEpcs = [];

                // On-shelf tags. Stored records use the upper-case `EPC` key,
                // fresh reader tags the lower-case `epc` key.
                _.forEach(redisEpcs, function(item){
                    if(item && item.host != host && _.findWhere(onTags, {epc: item.EPC}) === undefined){
                        onEpcs.push(item);
                    }
                });
                _.forEach(onTags, function(item){
                    onEpcs.push({
                        GroupID: item.group,
                        EPC: item.epc,
                        host: item.host,
                        speed: item.speed,
                        updateTime: item.updateTime
                    });
                });

                r.hset(group, 'on', JSON.stringify(onEpcs), function(){});

                // Off-shelf tags. Empty entries are skipped; they used to be
                // dereferenced, which aborted the whole update.
                _.forEach(goneTags, function(goneEpc){
                    if(goneEpc && _.findWhere(redisEpcs, {EPC: goneEpc.epc}) === undefined && _.findWhere(onTags, {epc: goneEpc.epc}) === undefined){
                        offEpcs.push({
                            GroupID: goneEpc.group,
                            EPC: goneEpc.epc,
                            host: goneEpc.host,
                            speed: goneEpc.speed,
                            rssi: goneEpc.rssi,
                            updateTime: goneEpc.updateTime
                        });
                    }
                });

                r.hset(group, 'off', JSON.stringify(offEpcs), function(){});
            }
            catch(er)
            {
                console.error('标签数据JSON格式错误:'+er);
            }
        }

        // The connection result does not depend on the redis bookkeeping, so
        // it is reported even when the read or the merge failed; otherwise the
        // caller's retry would wait forever.
        if(typeof callback !== 'function')
        {
            return;
        }
        if(connected){
            callback(null, connected);
        }else{
            callback(new Error('连接读写器'+host+'错误'), connected);
        }
    });
}

/**
 * Returns how far the given timestamp lies behind the current time.
 * @param val the timestamp to compare against (milliseconds since the epoch)
 * @returns int elapsed time in milliseconds
 */
function getMillTimeDiffNow(val)
{
    return Date.now() - val;
}

/**
 * Parses a client command.
 *
 * The payload must be a JSON object whose `Key` equals '123456'. Anything
 * else (invalid JSON, a non-object value such as `null`, a wrong key) yields
 * `{ commandType: 'error' }`.
 *
 * @param data raw JSON text received from the client
 * @returns object with `commandType`, plus `groupId` when the payload has
 *          `GroupID` and `state` ('on' or 'off') when it has `State`
 */
function getCommand(data)
{
    var params = {};

    try
    {
        data = JSON.parse(data);
    }
    catch(er)
    {
        console.log('JSON格式错误:'+er);
        data = null;
    }

    // Valid JSON is not necessarily an object: a literal `null` used to throw
    // on `data.Key`. The loose comparison is kept so a numeric key (123456)
    // is still accepted.
    if(data === null || typeof data !== 'object' || data.Key != '123456')
    {
        params.commandType = 'error';
        return params;
    }

    params.commandType = data.CommandType;
    if('GroupID' in data)
    {
        params.groupId = data.GroupID;
    }

    if('State' in data)
    {
        // 'ShelfOn' selects the on list; every other value selects the off list.
        params.state = (data.State === 'ShelfOn') ? 'on' : 'off';
    }

    return params;
}

/**
 * Serializes a response for the client.
 * @param commandType command type echoed back as `CommandType`
 * @param code status code sent as `Code`
 * @param data payload sent as `Data`
 * @returns string JSON text of the response
 */
function output (commandType, code, data)
{
    return JSON.stringify({ CommandType: commandType, Code: code, Data: data });
}

// Entry point: start the reader connections and the socket server when this file is loaded.
main();