Commit a82297ee authored by 李世星's avatar 李世星

fix(新增钉钉插件): 202301291036

parent dfceb499
...@@ -49,7 +49,7 @@ func initConnect() *clientv3.Client { ...@@ -49,7 +49,7 @@ func initConnect() *clientv3.Client {
func GetAllConfFromEtcd() ([]EtcdValue, error) { func GetAllConfFromEtcd() ([]EtcdValue, error) {
configs := make([]EtcdValue, 0) configs := make([]EtcdValue, 0)
ctx, cancel := context.WithTimeout(context.Background(), time.Second) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
resp, err := cli.Get(ctx, configPath, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend)) resp, err := cli.Get(ctx, configPath, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend))
cancel() cancel()
if err != nil { if err != nil {
...@@ -170,7 +170,6 @@ func WatchLogConfigToEtcd() clientv3.WatchChan { ...@@ -170,7 +170,6 @@ func WatchLogConfigToEtcd() clientv3.WatchChan {
return wch return wch
} }
func WatchLogStatusToEtcd() clientv3.WatchChan { func WatchLogStatusToEtcd() clientv3.WatchChan {
wch := cli.Watch(context.Background(), statusPath, clientv3.WithPrefix()) wch := cli.Watch(context.Background(), statusPath, clientv3.WithPrefix())
...@@ -192,6 +191,6 @@ func CheckAgentActive(name string) bool { ...@@ -192,6 +191,6 @@ func CheckAgentActive(name string) bool {
return false return false
} }
status := string(resp.Kvs[0].Value) status := string(resp.Kvs[0].Value)
log.Println("it is"+status) log.Println("it is" + status)
return status == "1" return status == "1"
} }
package ding
import (
"context"
"errors"
"time"
"github.com/go-resty/resty/v2"
)
type DingRobotError struct {
ErrMsg string `json:"errmsg"`
Code int `json:"errcode"`
}
func DingDingNotice(webHook string, ctx context.Context, message DDMessage) error {
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
host := webHook
resultChannel := make(chan DingRobotError)
errorChannel := make(chan error)
go func() {
var errmsg DingRobotError
client := resty.New()
_, err := client.R().SetHeader("Content-Type", "application/json").SetBody(&message).SetResult(&errmsg).Post(host)
if err != nil {
errorChannel <- err
}
resultChannel <- errmsg
}()
for {
select {
case <-ctx.Done():
switch ctx.Err() {
case context.DeadlineExceeded:
return errors.New("钉钉机器人超时")
default:
return errors.New("钉钉机器人失败")
}
case result := <-resultChannel:
if result.Code != 0 {
return errors.New(result.ErrMsg)
}
return nil
case err := <-errorChannel:
return err
}
}
}
package ding
import (
"bytes"
"fmt"
"time"
"github.com/y7ut/logtransfer/entity"
)
const (
MARKTYPE = "markdown"
TEXTTYPE = "text"
OPEN_TEMPLE = "# 服务通知 已开启 \r\n> 运行环境:%s \r\n ###### %s"
CLOSE_TEMPLE = "# 服务通知 已关闭 \r\n> 运行环境:%s \r\n ###### %s"
STATUS_TEMPLE = "# 服务通知 \r\n> 环境:%s \r\n ###### %s"
MESSAGE_TYPE_OPEN = "open"
MESSAGE_TYPE_CLOSE = "close"
MESSAGE_TYPE_PLUGIN_NOTICE = "plugin"
)
type DDAt struct {
IsAtAll bool `json:"isAtAll"`
AtMobiles []string `json:"atMobiles"`
}
type TextContent struct {
Content string `json:"content,omitempty"`
}
type MarkdownContent struct {
Title string `json:"title,omitempty"`
Text string `json:"text,omitempty"`
}
type DDMessage struct {
MsgType string `json:"msgtype"`
Markdown MarkdownContent `json:"markdown,omitempty"`
Text TextContent `json:"text,omitempty"`
At DDAt `json:"at"`
}
/**
* 初始化消息
*
* @return string
*/
func CreateDingMessage(dev string, messageType string, msg interface{}, mobiles []string) DDMessage {
var message DDMessage
message.MsgType = MARKTYPE
message.Markdown.Title = "服务通知"
switch messageType {
case MESSAGE_TYPE_OPEN:
message.Markdown.Text = fmt.Sprintf(OPEN_TEMPLE, dev, time.Now().Format("2006/01/02 15:04:05"))
case MESSAGE_TYPE_CLOSE:
message.Markdown.Text = fmt.Sprintf(CLOSE_TEMPLE, dev, time.Now().Format("2006/01/02 15:04:05"))
case MESSAGE_TYPE_PLUGIN_NOTICE:
message.Markdown.Text = pluginNotice(dev, msg)
default:
message.Markdown.Text = fmt.Sprintf(STATUS_TEMPLE, dev, time.Now().Format("2006/01/02 15:04:05"))
}
if len(mobiles) > 0 {
message.At.IsAtAll = false
message.At.AtMobiles = mobiles
} else {
message.At.IsAtAll = true
}
fmt.Println(message)
return message
}
func pluginNotice(dev string, msg interface{}) string {
mateData := msg.(*entity.Matedata)
templateBuff := bytes.NewBuffer([]byte{})
templateBuff.WriteString(fmt.Sprintf("# 服务通知 \r\n> 环境:%s", dev))
templateBuff.WriteString(fmt.Sprintf("\r\n> [节点]:%s", mateData.SourceAgent))
for key, value := range mateData.Data {
templateBuff.WriteString(fmt.Sprintf("\r\n> [%s]:%s", key, value))
}
templateBuff.WriteString(fmt.Sprintf("###### %s", time.Now().Format("2006/01/02 15:04:05")))
return templateBuff.String()
}
...@@ -22,6 +22,7 @@ type Matedata struct { ...@@ -22,6 +22,7 @@ type Matedata struct {
Topic string Topic string
Index string Index string
Level string Level string
SourceAgent string
create time.Time create time.Time
Data map[string]interface{} Data map[string]interface{}
} }
...@@ -30,6 +31,7 @@ func (m *Matedata) reset() { ...@@ -30,6 +31,7 @@ func (m *Matedata) reset() {
m.Topic = "" m.Topic = ""
m.Index = "" m.Index = ""
m.Level = "" m.Level = ""
m.SourceAgent = ""
m.Data = map[string]interface{}{} m.Data = map[string]interface{}{}
} }
...@@ -102,7 +104,7 @@ func MatedateSender(ctx context.Context) { ...@@ -102,7 +104,7 @@ func MatedateSender(ctx context.Context) {
mateDatesItems = mateDatesItems[:0] mateDatesItems = mateDatesItems[:0]
mu.Unlock() mu.Unlock()
wp.Serve(currentItems) wp.Serve(currentItems)
}else{ } else {
mu.Unlock() mu.Unlock()
} }
case <-autoTicker.C: case <-autoTicker.C:
...@@ -113,7 +115,7 @@ func MatedateSender(ctx context.Context) { ...@@ -113,7 +115,7 @@ func MatedateSender(ctx context.Context) {
mateDatesItems = mateDatesItems[:0] mateDatesItems = mateDatesItems[:0]
mu.Unlock() mu.Unlock()
wp.Serve(currentItems) wp.Serve(currentItems)
}else{ } else {
mu.Unlock() mu.Unlock()
} }
......
...@@ -3,28 +3,41 @@ module github.com/y7ut/logtransfer ...@@ -3,28 +3,41 @@ module github.com/y7ut/logtransfer
go 1.15 go 1.15
require ( require (
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403 // indirect github.com/coreos/bbolt v1.3.4 // indirect
github.com/coreos/etcd v3.3.25+incompatible github.com/coreos/etcd v3.3.25+incompatible
github.com/coreos/go-semver v0.3.0 // indirect github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf // indirect github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/envoyproxy/go-control-plane v0.9.5 // indirect github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
github.com/gogo/protobuf v1.3.2 // indirect github.com/dustin/go-humanize v1.0.1 // indirect
github.com/golang/protobuf v1.5.2 // indirect github.com/go-resty/resty/v2 v2.7.0
github.com/google/btree v1.1.2 // indirect
github.com/google/uuid v1.2.0 // indirect github.com/google/uuid v1.2.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/jonboulle/clockwork v0.3.0 // indirect
github.com/olivere/elastic/v7 v7.0.24 github.com/olivere/elastic/v7 v7.0.24
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/segmentio/kafka-go v0.4.13 github.com/segmentio/kafka-go v0.4.13
github.com/smartystreets/goconvey v1.7.2 // indirect
github.com/soheilhy/cmux v0.1.5 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75 // indirect
github.com/xiang90/probing v0.0.0-20221125231312-a49e3df8f510 // indirect
go.etcd.io/bbolt v1.3.6 // indirect
go.uber.org/zap v1.16.0 // indirect go.uber.org/zap v1.16.0 // indirect
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 // indirect golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 // indirect
golang.org/x/mod v0.3.0 golang.org/x/time v0.3.0 // indirect
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 // indirect
golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57 // indirect
golang.org/x/text v0.3.6 // indirect
golang.org/x/tools v0.1.0 // indirect golang.org/x/tools v0.1.0 // indirect
google.golang.org/grpc v1.33.2
gopkg.in/ini.v1 v1.62.0 gopkg.in/ini.v1 v1.62.0
sigs.k8s.io/yaml v1.3.0 // indirect
) )
replace github.com/coreos/go-systemd => github.com/coreos/go-systemd/v22 v22.0.0 replace github.com/coreos/go-systemd => github.com/coreos/go-systemd/v22 v22.0.0
replace google.golang.org/grpc => google.golang.org/grpc v1.26.0 replace (
google.golang.org/grpc => google.golang.org/grpc v1.26.0
google.golang.org/grpc v1.29.1 => google.golang.org/grpc v1.26.0
)
replace github.com/coreos/bbolt v1.3.4 => go.etcd.io/bbolt v1.3.4
This diff is collapsed.
package plugin
import (
"context"
"log"
"strings"
"github.com/y7ut/logtransfer/ding"
"github.com/y7ut/logtransfer/entity"
)
// 钉钉通知插件
type Ding Plugin
func (d *Ding) HandleFunc(m *entity.Matedata) error {
log.Println("DINGDING:")
var dingMobileData = []string{}
searchKey := (*d.params)["searchKey"].(string)
searchValue := (*d.params)["searchValue"].(string)
dingWebHook := (*d.params)["dingWebHook"].(string)
dingMobiles := (*d.params)["dingMobiles"].(string)
if dingMobiles != "" {
dingMobileData = strings.Split(dingMobiles, ",")
}
for k, v := range (*m).Data {
if dingWebHook != "" && k == searchKey && (searchValue == "" || searchValue == v) {
err := ding.DingDingNotice(dingWebHook, context.Background(), ding.CreateDingMessage("dev", ding.MESSAGE_TYPE_PLUGIN_NOTICE, m, dingMobileData))
if err != nil {
log.Println(err)
}
}
}
return nil
}
func (d *Ding) SetParams(params string) error {
paramsValue, err := checkParams(params, "searchKey", "searchValue", "dingWebHook", "dingMobiles")
if err != nil {
return err
}
d.params = &paramsValue
return err
}
...@@ -5,4 +5,5 @@ var RegistedPlugins = map[string]Handler{ ...@@ -5,4 +5,5 @@ var RegistedPlugins = map[string]Handler{
"Edit": &Edit{}, "Edit": &Edit{},
"SaveES": &SaveES{}, "SaveES": &SaveES{},
"Alarm": &Alarm{}, "Alarm": &Alarm{},
"Ding": &Ding{},
} }
...@@ -49,7 +49,7 @@ func RegisterManger(c *Customer) { ...@@ -49,7 +49,7 @@ func RegisterManger(c *Customer) {
mu.Unlock() mu.Unlock()
} }
func UnstallManger(topic string){ func UnstallManger(topic string) {
mu.Lock() mu.Lock()
delete(CustomerManger, topic) delete(CustomerManger, topic)
mu.Unlock() mu.Unlock()
...@@ -99,7 +99,7 @@ func ReadingMessage(ctx context.Context, c *Customer) { ...@@ -99,7 +99,7 @@ func ReadingMessage(ctx context.Context, c *Customer) {
for { for {
select { select {
case <- ctx.Done(): case <-ctx.Done():
return return
default: default:
m, err := reader.ReadMessage(ctx) m, err := reader.ReadMessage(ctx)
...@@ -121,6 +121,12 @@ func ReadingMessage(ctx context.Context, c *Customer) { ...@@ -121,6 +121,12 @@ func ReadingMessage(ctx context.Context, c *Customer) {
} }
matedata, err = c.Format(string(reader.Config().Topic), string(m.Value)) matedata, err = c.Format(string(reader.Config().Topic), string(m.Value))
for _, header := range m.Headers {
if header.Key == "source_agent" {
matedata.SourceAgent = string(header.Value)
}
}
if err != nil { if err != nil {
errMessage.Reset() errMessage.Reset()
errMessage.WriteString("Format Error") errMessage.WriteString("Format Error")
......
...@@ -156,7 +156,8 @@ func TopicWatcherHandle() { ...@@ -156,7 +156,8 @@ func TopicWatcherHandle() {
func sign() <-chan os.Signal { func sign() <-chan os.Signal {
c := make(chan os.Signal, 2) c := make(chan os.Signal, 2)
signals := []os.Signal{syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGUSR1, syscall.SIGUSR2} //signals := []os.Signal{syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGUSR1, syscall.SIGUSR2}
signals := []os.Signal{syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT}
// 监听信号 // 监听信号
if !signal.Ignored(syscall.SIGHUP) { if !signal.Ignored(syscall.SIGHUP) {
signals = append(signals, syscall.SIGHUP) signals = append(signals, syscall.SIGHUP)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment