Commit f83b9639 authored by 谢宇轩's avatar 谢宇轩

feat(Ding): 处理冲突合并Ding的内容

'
parents f3b5156b e25aa53c
/.idea
*.exe
/log
logtransfer
*.conf
\ No newline at end of file
logtransfer.conf
\ No newline at end of file
This diff is collapsed.
......@@ -49,7 +49,7 @@ func initConnect() *clientv3.Client {
func GetAllConfFromEtcd() ([]EtcdValue, error) {
configs := make([]EtcdValue, 0)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
resp, err := cli.Get(ctx, configPath, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend))
cancel()
if err != nil {
......@@ -170,7 +170,6 @@ func WatchLogConfigToEtcd() clientv3.WatchChan {
return wch
}
func WatchLogStatusToEtcd() clientv3.WatchChan {
wch := cli.Watch(context.Background(), statusPath, clientv3.WithPrefix())
......
package ding
import (
"context"
"errors"
"time"
"github.com/go-resty/resty/v2"
)
type DingRobotError struct {
ErrMsg string `json:"errmsg"`
Code int `json:"errcode"`
}
func DingDingNotice(webHook string, ctx context.Context, message DDMessage) error {
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
host := webHook
resultChannel := make(chan DingRobotError)
errorChannel := make(chan error)
go func() {
var errmsg DingRobotError
client := resty.New()
_, err := client.R().SetHeader("Content-Type", "application/json").SetBody(&message).SetResult(&errmsg).Post(host)
if err != nil {
errorChannel <- err
}
resultChannel <- errmsg
}()
for {
select {
case <-ctx.Done():
switch ctx.Err() {
case context.DeadlineExceeded:
return errors.New("钉钉机器人超时")
default:
return errors.New("钉钉机器人失败")
}
case result := <-resultChannel:
if result.Code != 0 {
return errors.New(result.ErrMsg)
}
return nil
case err := <-errorChannel:
return err
}
}
}
package ding
import (
"bytes"
"encoding/json"
"fmt"
"reflect"
"time"
"github.com/y7ut/logtransfer/entity"
)
const (
MARKTYPE = "markdown"
TEXTTYPE = "text"
OPEN_TEMPLE = "# 服务通知 已开启 \r\n> 运行环境: \r\n ###### %s"
CLOSE_TEMPLE = "# 服务通知 已关闭 \r\n> 运行环境: \r\n ###### %s"
STATUS_TEMPLE = "# 服务通知 \r\n> 环境: \r\n ###### %s"
MESSAGE_TYPE_OPEN = "open"
MESSAGE_TYPE_CLOSE = "close"
MESSAGE_TYPE_PLUGIN_NOTICE = "plugin"
)
type DDAt struct {
IsAtAll bool `json:"isAtAll"`
AtMobiles []string `json:"atMobiles"`
}
type TextContent struct {
Content string `json:"content,omitempty"`
}
type MarkdownContent struct {
Title string `json:"title,omitempty"`
Text string `json:"text,omitempty"`
}
type DDMessage struct {
MsgType string `json:"msgtype"`
Markdown MarkdownContent `json:"markdown,omitempty"`
Text TextContent `json:"text,omitempty"`
At DDAt `json:"at"`
}
/**
* 初始化消息
*
* @return string
*/
func CreateDingMessage(messageType string, msg interface{}, mobiles []string) DDMessage {
var message DDMessage
message.MsgType = MARKTYPE
message.Markdown.Title = "服务通知"
switch messageType {
case MESSAGE_TYPE_OPEN:
message.Markdown.Text = fmt.Sprintf(OPEN_TEMPLE, time.Now().Format("2006/01/02 15:04:05"))
case MESSAGE_TYPE_CLOSE:
message.Markdown.Text = fmt.Sprintf(CLOSE_TEMPLE, time.Now().Format("2006/01/02 15:04:05"))
case MESSAGE_TYPE_PLUGIN_NOTICE:
message.Markdown.Text = pluginNotice(msg)
default:
message.Markdown.Text = fmt.Sprintf(STATUS_TEMPLE, time.Now().Format("2006/01/02 15:04:05"))
}
if len(mobiles) > 0 {
message.At.IsAtAll = false
message.At.AtMobiles = mobiles
} else {
message.At.IsAtAll = true
}
return message
}
func pluginNotice(msg interface{}) string {
mateData := msg.(*entity.Matedata)
templateBuff := bytes.NewBuffer([]byte{})
templateBuff.WriteString(fmt.Sprintf("# 服务通知 \r\n > 【节点】:%s", mateData.SourceAgent))
for key, value := range mateData.Data {
t := reflect.TypeOf(value).Kind().String()
if t == "map" {
s, _ := json.Marshal(value)
value = string(s)
}
if t == "float64" {
value = fmt.Sprintf("%.0f", value)
}
if t == "slice" {
value = fmt.Sprintf("%v", value)
}
templateBuff.WriteString(fmt.Sprintf(" \r\n > 【%s】:%s", key, value))
}
templateBuff.WriteString(fmt.Sprintf(" \r\n ###### %s", time.Now().Format("2006/01/02 15:04:05")))
return templateBuff.String()
}
......@@ -21,6 +21,7 @@ type Matedata struct {
Topic string
Index string
Level string
SourceAgent string
create time.Time
Data map[string]interface{}
}
......@@ -29,6 +30,7 @@ func (m *Matedata) reset() {
m.Topic = ""
m.Index = ""
m.Level = ""
m.SourceAgent = ""
m.Data = map[string]interface{}{}
}
......
......@@ -3,28 +3,41 @@ module github.com/y7ut/logtransfer
go 1.15
require (
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403 // indirect
github.com/coreos/bbolt v1.3.4 // indirect
github.com/coreos/etcd v3.3.25+incompatible
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/envoyproxy/go-control-plane v0.9.5 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/go-resty/resty/v2 v2.7.0
github.com/google/btree v1.1.2 // indirect
github.com/google/uuid v1.2.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/jonboulle/clockwork v0.3.0 // indirect
github.com/olivere/elastic/v7 v7.0.24
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/segmentio/kafka-go v0.4.13
github.com/smartystreets/goconvey v1.7.2 // indirect
github.com/soheilhy/cmux v0.1.5 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75 // indirect
github.com/xiang90/probing v0.0.0-20221125231312-a49e3df8f510 // indirect
go.etcd.io/bbolt v1.3.6 // indirect
go.uber.org/zap v1.16.0 // indirect
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 // indirect
golang.org/x/mod v0.3.0
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 // indirect
golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57 // indirect
golang.org/x/text v0.3.6 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.1.0 // indirect
google.golang.org/grpc v1.33.2
gopkg.in/ini.v1 v1.62.0
sigs.k8s.io/yaml v1.3.0 // indirect
)
replace github.com/coreos/go-systemd => github.com/coreos/go-systemd/v22 v22.0.0
replace google.golang.org/grpc => google.golang.org/grpc v1.26.0
replace (
google.golang.org/grpc => google.golang.org/grpc v1.26.0
google.golang.org/grpc v1.29.1 => google.golang.org/grpc v1.26.0
)
replace github.com/coreos/bbolt v1.3.4 => go.etcd.io/bbolt v1.3.4
This diff is collapsed.
# Kafka配置
[kafka]
address=120.25.235.220:9092
#Etcd配置
[etcd]
address=172.18.172.159:23790,172.18.172.159:23791
#Es配置
[es]
address=http://47.106.69.239:9320
bulk_size=2
\ No newline at end of file
# Kafka配置
[kafka]
address=120.25.235.220:9092
#Etcd配置
[etcd]
address=192.168.2.129:23790,192.168.2.129:23791
#Es配置
[es]
address=http://47.106.69.239:9320
bulk_size=2
\ No newline at end of file
package plugin
import (
"context"
"log"
"strings"
"github.com/y7ut/logtransfer/ding"
"github.com/y7ut/logtransfer/entity"
)
// 钉钉通知插件
type Ding Plugin
func (d *Ding) HandleFunc(m *entity.Matedata) error {
log.Println("DINGDING:")
var dingMobileData = []string{}
var send bool
searchKey := (*d.params)["searchKey"].(string)
searchValue := (*d.params)["searchValue"].(string)
dingWebHook := (*d.params)["dingWebHook"].(string)
dingMobiles := (*d.params)["dingMobiles"].(string)
if dingMobiles != "" {
dingMobileData = strings.Split(dingMobiles, ",")
}
for k, v := range (*m).Data {
if dingWebHook != "" && k == searchKey && (searchValue == "" || searchValue == v) {
send = true
break
}
}
if send {
msg := ding.CreateDingMessage(ding.MESSAGE_TYPE_PLUGIN_NOTICE, m, dingMobileData)
err := ding.DingDingNotice(dingWebHook, context.Background(), msg)
if err != nil {
log.Println(err)
}
log.Println("sendDingDingMsg:", msg.Markdown.Text)
}
return nil
}
func (d *Ding) SetParams(params string) error {
paramsValue, err := checkParams(params, "searchKey", "searchValue", "dingWebHook", "dingMobiles")
if err != nil {
return err
}
d.params = &paramsValue
return err
}
......@@ -17,6 +17,9 @@ func getRegistedPlugins() map[string]HandlerConstruct {
"Alarm": func() Handler {
return &Alarm{}
},
"DingRobot": func () Handler {
return &Ding{}
},
}
}
......
......@@ -49,7 +49,7 @@ func RegisterManger(c *Customer) {
mu.Unlock()
}
func UnstallManger(topic string){
func UnstallManger(topic string) {
mu.Lock()
delete(CustomerManger, topic)
mu.Unlock()
......@@ -99,7 +99,7 @@ func ReadingMessage(ctx context.Context, c *Customer) {
for {
select {
case <- ctx.Done():
case <-ctx.Done():
return
default:
m, err := reader.ReadMessage(ctx)
......@@ -121,6 +121,13 @@ func ReadingMessage(ctx context.Context, c *Customer) {
}
matedata, err = c.Format(string(reader.Config().Topic), string(m.Value))
matedata.SourceAgent = "默认节点"
for _, header := range m.Headers {
if header.Key == "source_agent" {
matedata.SourceAgent = string(header.Value)
}
}
if err != nil {
errMessage.Reset()
errMessage.WriteString("Format Error")
......
......@@ -37,7 +37,11 @@ func generateTopic(config TopicConfig) *Topic {
p := plugin.PipeLine{}
for _, v := range config.PipelineConfig {
currentPlugin := plugin.LoadRegistedPlugins(v.Name)
currentPlugin, ok := plugin.LoadRegistedPlugins(v.Name)
if !ok {
log.Printf("get RegistedPlugins error:%s ", v.Name)
continue
}
err := currentPlugin.SetParams(v.Params)
if err != nil {
log.Printf("plugin encode params error: %s", err)
......
......@@ -156,7 +156,8 @@ func TopicWatcherHandle() {
func sign() <-chan os.Signal {
c := make(chan os.Signal, 2)
signals := []os.Signal{syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGUSR1, syscall.SIGUSR2}
//signals := []os.Signal{syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGUSR1, syscall.SIGUSR2}
signals := []os.Signal{syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT}
// 监听信号
if !signal.Ignored(syscall.SIGHUP) {
signals = append(signals, syscall.SIGHUP)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment