rabbitmq_send.go
4.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
package main
import (
"bytes"
"encoding/json"
"fmt"
"github.com/BurntSushi/toml"
"github.com/julienschmidt/httprouter"
"github.com/streadway/amqp"
"io/ioutil"
"net/http"
"net/url"
"os"
"strings"
)
var (
msgchan chan string
config tomlConfig
)
//配置文件信息
type tomlConfig struct {
Server serverInfo `toml:"server"`
ListenPort string
}
type serverInfo struct {
Username string
Password string
IP string
Port string
}
//使用tag中自定义的名称,输出到json串
type addr struct {
IP string `json:"ip"`
Hostname string `json:"hostname"`
}
type message struct {
Group string `json:"group"`
Tags addr `json:"tags"`
Data map[string]interface{} `json:"data"`
}
//避免HTMLEscape(< -> \u003c and > -> \u003e and & -> \u0026)
func JSONMarshal(v interface{}, unescape bool) ([]byte, error) {
b, err := json.Marshal(v)
if unescape {
b = bytes.Replace(b, []byte("\\u003c"), []byte("<"), -1)
b = bytes.Replace(b, []byte("\\u003e"), []byte(">"), -1)
b = bytes.Replace(b, []byte("\\u0026"), []byte("&"), -1)
}
return b, err
}
//构造消息结构体
func newMessage() message {
return message{Group: "info", Tags: addr{Hostname: "", IP: ""}, Data: make(map[string]interface{})}
}
//发送JSON串到channel
func sendMsgToChan(data map[string]interface{}) {
m := newMessage()
if v, ok := data["ip"]; ok {
m.Tags.IP = v.(string)
delete(data, "ip")
}
if v, ok := data["hostname"]; ok {
m.Tags.Hostname = v.(string)
delete(data, "hostname")
}
if v, ok := data["type"]; ok {
m.Group = v.(string)
delete(data, "type")
}
if len(data) > 0 {
for k, v := range data {
m.Data[k] = v
}
}
//生成json
b, err := JSONMarshal(m, true)
if err != nil {
fmt.Println("json marshal error, detail info:", err.Error())
return
}
msgchan <- string(b)
}
//处理get请求
func doGet(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
data := make(map[string]interface{})
r.ParseForm()
for k, v := range r.Form {
data[k] = v[0]
}
go sendMsgToChan(data)
}
//处理post请求
func doPost(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
data := make(map[string]interface{})
dataBytes, err := ioutil.ReadAll(r.Body)
if err != nil {
fmt.Println("read post data fail, detail info:", err.Error())
return
}
queryUnescapeData, err := url.QueryUnescape(string(dataBytes))
if err != nil {
fmt.Println("query unescape error, detail info:", err.Error())
return
}
splitString := strings.Split(queryUnescapeData, "&")
for _, v := range splitString {
filed := strings.SplitN(v, "=", 2)
data[filed[0]] = filed[1]
}
go sendMsgToChan(data)
}
//读取配置文件
func readTomlConfigFile() {
_, err := toml.DecodeFile("custom_option.toml", &config)
if err != nil {
fmt.Println("decode toml file error, detail info:", err.Error())
os.Exit(-1)
}
}
//从channel读取消息,发送至rabbitmq
func publicMsgToRabbitmq() {
//address example : amqp://guest:guest@localhost:5672/
rabbitmqServerAddress := "amqp://" + config.Server.Username + ":" + config.Server.Password + "@" + config.Server.IP + ":" + config.Server.Port + "/"
conn, err := amqp.Dial(rabbitmqServerAddress)
if err != nil {
fmt.Println("connect to rabbitmq server error, detail info:", err.Error())
os.Exit(-1)
}
//退出之前关闭
defer conn.Close()
//创建channel
channel, err := conn.Channel()
if err != nil {
fmt.Println("create channel error, detail info:", err.Error())
os.Exit(-1)
}
//退出之前关闭
defer channel.Close()
//创建队列
queue, err := channel.QueueDeclare("log", false, false, false, false, nil)
if err != nil {
fmt.Println("create queue error, detail info:", err.Error())
os.Exit(-1)
}
for {
select {
case msg := <-msgchan:
//推送消息到队列
fmt.Println("get message:", msg)
err = channel.Publish("", queue.Name, false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte(msg)})
if err != nil {
fmt.Println("push message to queue fail, detail info:", err.Error())
}
default:
}
}
}
func main() {
//decode toml file
readTomlConfigFile()
msgchan = make(chan string, 10)
go publicMsgToRabbitmq()
router := httprouter.New()
router.Handle("GET", "/", doGet)
router.Handle("POST", "/post", doPost)
http.ListenAndServe(":"+config.ListenPort, router)
}