rabbitmq_send.go 4.29 KB
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"github.com/BurntSushi/toml"
	"github.com/julienschmidt/httprouter"
	"github.com/streadway/amqp"
	"io/ioutil"
	"net/http"
	"net/url"
	"os"
	"strings"
)

var (
	msgchan chan string
	config  tomlConfig
)

//配置文件信息
type tomlConfig struct {
	Server     serverInfo `toml:"server"`
	ListenPort string
}

type serverInfo struct {
	Username string
	Password string
	IP       string
	Port     string
}

//使用tag中自定义的名称,输出到json串
type addr struct {
	IP       string `json:"ip"`
	Hostname string `json:"hostname"`
}

type message struct {
	Group string                 `json:"group"`
	Tags  addr                   `json:"tags"`
	Data  map[string]interface{} `json:"data"`
}

//避免HTMLEscape(< -> \u003c and > -> \u003e and & -> \u0026)
func JSONMarshal(v interface{}, unescape bool) ([]byte, error) {
	b, err := json.Marshal(v)
	if unescape {
		b = bytes.Replace(b, []byte("\\u003c"), []byte("<"), -1)
		b = bytes.Replace(b, []byte("\\u003e"), []byte(">"), -1)
		b = bytes.Replace(b, []byte("\\u0026"), []byte("&"), -1)
	}
	return b, err
}

//构造消息结构体
func newMessage() message {
	return message{Group: "info", Tags: addr{Hostname: "", IP: ""}, Data: make(map[string]interface{})}
}

//发送JSON串到channel
func sendMsgToChan(data map[string]interface{}) {
	m := newMessage()
	if v, ok := data["ip"]; ok {
		m.Tags.IP = v.(string)
		delete(data, "ip")
	}

	if v, ok := data["hostname"]; ok {
		m.Tags.Hostname = v.(string)
		delete(data, "hostname")
	}

	if v, ok := data["type"]; ok {
		m.Group = v.(string)
		delete(data, "type")
	}

	if len(data) > 0 {
		for k, v := range data {
			m.Data[k] = v
		}
	}

	//生成json
	b, err := JSONMarshal(m, true)
	if err != nil {
		fmt.Println("json marshal error, detail info:", err.Error())
		return
	}
	msgchan <- string(b)
}

//处理get请求
func doGet(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
	data := make(map[string]interface{})
	r.ParseForm()
	for k, v := range r.Form {
		data[k] = v[0]
	}

	go sendMsgToChan(data)
}

//处理post请求
func doPost(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
	data := make(map[string]interface{})
	dataBytes, err := ioutil.ReadAll(r.Body)
	if err != nil {
		fmt.Println("read post data fail, detail info:", err.Error())
		return
	}

	queryUnescapeData, err := url.QueryUnescape(string(dataBytes))
	if err != nil {
		fmt.Println("query unescape error, detail info:", err.Error())
		return
	}

	splitString := strings.Split(queryUnescapeData, "&")
	for _, v := range splitString {
		filed := strings.SplitN(v, "=", 2)
		data[filed[0]] = filed[1]
	}

	go sendMsgToChan(data)
}

//读取配置文件
func readTomlConfigFile() {
	_, err := toml.DecodeFile("custom_option.toml", &config)
	if err != nil {
		fmt.Println("decode toml file error, detail info:", err.Error())
		os.Exit(-1)
	}
}

//从channel读取消息,发送至rabbitmq
func publicMsgToRabbitmq() {
	//address example : amqp://guest:guest@localhost:5672/
	rabbitmqServerAddress := "amqp://" + config.Server.Username + ":" + config.Server.Password + "@" + config.Server.IP + ":" + config.Server.Port + "/"
	conn, err := amqp.Dial(rabbitmqServerAddress)
	if err != nil {
		fmt.Println("connect to rabbitmq server error, detail info:", err.Error())
		os.Exit(-1)
	}

	//退出之前关闭
	defer conn.Close()

	//创建channel
	channel, err := conn.Channel()
	if err != nil {
		fmt.Println("create channel error, detail info:", err.Error())
		os.Exit(-1)
	}

	//退出之前关闭
	defer channel.Close()

	//创建队列
	queue, err := channel.QueueDeclare("log", false, false, false, false, nil)
	if err != nil {
		fmt.Println("create queue error, detail info:", err.Error())
		os.Exit(-1)
	}

	for {
		select {
		case msg := <-msgchan:
			//推送消息到队列
			fmt.Println("get message:", msg)
			err = channel.Publish("", queue.Name, false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte(msg)})
			if err != nil {
				fmt.Println("push message to queue fail, detail info:", err.Error())
			}
		default:
		}
	}
}

func main() {
	//decode toml file
	readTomlConfigFile()
	msgchan = make(chan string, 10)
	go publicMsgToRabbitmq()
	router := httprouter.New()
	router.Handle("GET", "/", doGet)
	router.Handle("POST", "/post", doPost)
	http.ListenAndServe(":"+config.ListenPort, router)
}