Unverified Commit d10f0729 authored by y7ut's avatar y7ut Committed by GitHub

Merge pull request #1 from y7ut/AddWatchConfig

重构代码结构,确定问题
parents 5bf97d8f fc26f103
......@@ -9,3 +9,11 @@
2. 管道的形式处理数据
3. 提供数据同步到ES的插件
### 2.0.0
1. 支持动态的安装卸载插件
# 安装
```
docker build -t logtransfer:2.0 .
docker run -d --name=LTF logtransfer:2.0
```
......@@ -45,6 +45,7 @@ func initConnect() *clientv3.Client {
return cli
}
// 获取当前所有的任务 (目前在初始化时使用)
func GetAllConfFromEtcd() []EtcdValue {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
......@@ -57,7 +58,7 @@ func GetAllConfFromEtcd() []EtcdValue {
configs := make([]EtcdValue, 0)
for _, etcdResult := range resp.Kvs {
// 根据系统中当前全部的节点名称, 确定节点状态
etcdKey := statusPath + string(etcdResult.Key[strings.LastIndex(string(etcdResult.Key), "/")+1:])
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
......@@ -82,7 +83,7 @@ func GetAllConfFromEtcd() []EtcdValue {
return configs
}
// 加载所有的Topic主题配置信息
func GetAllTopicFromEtcd() []EtcdValue {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
......
......@@ -24,4 +24,4 @@ type Es struct {
var (
APPConfig = new(LogTransferConf)
)
)
\ No newline at end of file
package transfer
package entity
import (
"encoding/json"
"fmt"
"regexp"
"strings"
"sync"
"time"
)
var (
contentRegexp = regexp.MustCompile(`\[(?s:(.*?))\]`)
serviceWfLogKeyWord = []string{"errno", "logId", "uri", "refer", "cookie", "ua", "host", "clientIp", "optime", "request_params", "errmsg"}
MatePool = sync.Pool{New: func() interface{} { return &Matedate{data: make(map[string]interface{})} }}
)
type Formater func(string, string) (Matedate, error)
type Formater func(string, string) (Matedata, error)
// service错误日志的处理
func FormatServiceWfLog(sourceKey string, message string) (Matedate, error) {
func FormatServiceWfLog(sourceKey string, message string) (Matedata, error) {
// vMateItem := MatePool.Get()
mateItem := MatePool.Get().(*Matedate)
mateItem := MatePool.Get().(*Matedata)
levelIndex := strings.Index(message, ":")
if levelIndex == -1 {
return *mateItem, fmt.Errorf("message format error")
}
mateItem.Topic= sourceKey
mateItem.Topic = sourceKey
mateItem.Level = message[:levelIndex]
message = message[levelIndex:]
loc, _ := time.LoadLocation("Local")
......@@ -42,15 +35,15 @@ func FormatServiceWfLog(sourceKey string, message string) (Matedate, error) {
if len(curentSub) < 1 {
continue
}
if word == "errmsg"{
mateItem.data["message"] = strings.Replace(contentRegexp.FindStringSubmatch(logContent)[1],`"`,"",-1)
}else{
mateItem.data[word] = contentRegexp.FindStringSubmatch(logContent)[1]
if word == "errmsg" {
mateItem.Data["message"] = strings.Replace(contentRegexp.FindStringSubmatch(logContent)[1], `"`, "", -1)
} else {
mateItem.Data[word] = contentRegexp.FindStringSubmatch(logContent)[1]
}
}
mateItem.data["timestamp"] = mateItem.create
mateItem.Data["timestamp"] = mateItem.create
result := *mateItem
mateItem.reset()
MatePool.Put(mateItem)
......@@ -58,32 +51,32 @@ func FormatServiceWfLog(sourceKey string, message string) (Matedate, error) {
}
// service错误日志的处理
func DefaultLog(sourceKey string, message string) (Matedate, error) {
func DefaultLog(sourceKey string, message string) (Matedata, error) {
vMateItem := MatePool.Get()
mateItem := vMateItem.(*Matedate)
mateItem := vMateItem.(*Matedata)
mateItem.Topic = sourceKey
mateItem.Index = sourceKey
mateItem.data = map[string]interface{}{"message":message}
mateItem.Data = map[string]interface{}{"message": message}
result := *mateItem
MatePool.Put(vMateItem)
return result, nil
}
// Json 格式的错误日志处理
func DefaultJsonLog(sourceKey string, message string) (Matedate, error) {
func DefaultJsonLog(sourceKey string, message string) (Matedata, error) {
vMateItem := MatePool.Get()
mateItem := vMateItem.(*Matedate)
mateItem := vMateItem.(*Matedata)
data := mateItem.data
data := mateItem.Data
err := json.Unmarshal([]byte(message), &data)
if err != nil {
return *mateItem, err
}
mateItem.data = data
mateItem.Data = data
result := *mateItem
MatePool.Put(vMateItem)
......
package entity
import (
"context"
"log"
"regexp"
"sync"
"time"
elastic "github.com/olivere/elastic/v7"
"github.com/y7ut/logtransfer/conf"
)
var (
contentRegexp = regexp.MustCompile(`\[(?s:(.*?))\]`)
serviceWfLogKeyWord = []string{"errno", "logId", "uri", "refer", "cookie", "ua", "host", "clientIp", "optime", "request_params", "errmsg"}
MatePool = sync.Pool{New: func() interface{} { return &Matedata{Data: make(map[string]interface{})} }}
messages = make(chan *Matedata, conf.APPConfig.Es.BulkSize)
)
type Matedata struct {
Topic string
Index string
Level string
create time.Time
Data map[string]interface{}
}
func (m *Matedata) reset() {
m.Topic = ""
m.Index = ""
m.Level = ""
m.Data = map[string]interface{}{}
}
func HandleMessage(m *Matedata) {
messages <- m
}
func CloseMessageChan() {
close(messages)
}
func MatedateSender(ctx context.Context, esClient *elastic.Client) {
tick := time.NewTicker(3 * time.Second)
var (
SenderMu sync.Mutex
)
bulkRequest := esClient.Bulk()
for {
select {
case m := <-messages:
indexRequest := elastic.NewBulkIndexRequest().Index(m.Index).Doc(m.Data)
bulkRequest.Add(indexRequest)
case <-tick.C:
// Do sends the bulk requests to Elasticsearch
SenderMu.Lock()
count := bulkRequest.NumberOfActions()
if count > 0 {
log.Printf("Send message to Es: %d : \n", bulkRequest.NumberOfActions())
_, err := bulkRequest.Do(ctx)
if err != nil {
log.Println("Save Es Error:", err)
}
bulkRequest.Reset()
}
SenderMu.Unlock()
case <-ctx.Done():
// Do sends the bulk requests to Elasticsearch
SenderMu.Lock()
_, err := bulkRequest.Do(ctx)
if err != nil {
log.Println("Save Es Error:", err)
}
bulkRequest.Reset()
SenderMu.Unlock()
log.Println("Exiting...")
return
}
}
}
package plugin
import (
"fmt"
"log"
"github.com/y7ut/logtransfer/entity"
)
// 打印插件
type Dump Plugin
func (dump *Dump) HandleFunc(m *entity.Matedata) error {
log.Println("DUMP:")
for k, v := range (*m).Data {
fmt.Printf("%s : %s\n", k, v)
}
fmt.Println("------------")
return nil
}
func (dump *Dump) SetParams(params string) error {
return nil
}
package plugin
import (
"github.com/y7ut/logtransfer/entity"
)
// 警报与监测
type Alarm Plugin
func (alarm *Alarm) HandleFunc(m *entity.Matedata) error {
return nil
}
func (alarm *Alarm) SetParams(params string) error {
paramsValue, err := checkParams(params, "hit", "idle_time")
if err != nil {
return err
}
alarm.params = &paramsValue
return err
}
package plugin
import (
"fmt"
"log"
"github.com/y7ut/logtransfer/entity"
)
// 修改插件
type Edit Plugin
func (edit *Edit) HandleFunc(m *entity.Matedata) error {
log.Println("EDIT:")
if edit.params == nil {
return fmt.Errorf("please set params first")
}
key := (*edit.params)["key"]
value := (*edit.params)["value"]
(*m).Data[key.(string)] = value
return nil
}
func (edit *Edit) SetParams(params string) error {
paramsValue, err := checkParams(params, "key", "value")
if err != nil {
return err
}
edit.params = &paramsValue
return err
}
package plugin
import (
"encoding/json"
"errors"
"log"
"strings"
"github.com/y7ut/logtransfer/entity"
)
type Handler interface {
HandleFunc(*entity.Matedata) error
SetParams(string) error
}
type Plugin struct {
params *map[string]interface{}
// error error
}
type PipeLine struct {
pipe []*Handler
}
func (p *PipeLine) AppendPlugin(plugin Handler) {
p.pipe = append(p.pipe, &plugin)
}
func (p *PipeLine) Enter(m entity.Matedata) {
for _, plugin := range p.pipe {
err := (*plugin).HandleFunc(&m)
if err != nil {
log.Println(err)
}
}
}
func checkParams(paramsJson string, key ...string) (value map[string]interface{}, err error) {
err = json.Unmarshal([]byte(paramsJson), &value)
if err != nil {
return value, err
}
var errMessage strings.Builder
var errCheck bool
errMessage.WriteString("Plugin params errors: ")
for _, checkItem := range key {
if value[checkItem] == nil {
errCheck = true
errMessage.WriteString(checkItem)
}
}
if errCheck {
return value, errors.New(errMessage.String())
}
return value, nil
}
package plugin
var RegistedPlugins = map[string]Handler{
"Dump": &Dump{},
"Edit": &Edit{},
"SaveES": &SaveES{},
"Alarm": &Alarm{},
}
package plugin
import (
"fmt"
"log"
"github.com/y7ut/logtransfer/entity"
)
// 修改插件
type SaveES Plugin
func (saveEs *SaveES) HandleFunc(m *entity.Matedata) error {
log.Println("SaveES:")
m.Index = fmt.Sprintf("%s", (*saveEs.params)["index"])
m.Data["topic"] = m.Topic
m.Data["level"] = m.Level
entity.HandleMessage(m)
return nil
}
func (saveEs *SaveES) SetParams(params string) error {
paramsValue, err := checkParams(params, "index")
if err != nil {
return err
}
saveEs.params = &paramsValue
return err
}
package transfer
package source
import (
"encoding/json"
"log"
"github.com/y7ut/logtransfer/conf"
"github.com/y7ut/logtransfer/entity"
"github.com/y7ut/logtransfer/plugin"
)
type Collector struct {
......@@ -16,8 +18,8 @@ type Collector struct {
type Topic struct {
Name string
Label string
PipeLine *PipeLine
Format Formater
PipeLine *plugin.PipeLine
Format entity.Formater
}
type TopicConfig struct {
......@@ -34,7 +36,7 @@ type PipeLinePluginsConfig struct {
}
// 加载所有的collector
func loadCollectors() []Collector {
func LoadCollectors() []Collector {
configs := conf.GetAllConfFromEtcd()
collectors := make([]Collector, 0)
for _, v := range configs {
......@@ -52,6 +54,21 @@ func loadCollectors() []Collector {
return collectors
}
// 收集所有需要监听的topic
func ChooseTopic() map[*Topic]bool {
collector := LoadCollectors()
topics := loadTopics()
ableTopics := make(map[*Topic]bool)
for _, v := range collector {
currentTopic := topics[v.Topic]
ableTopics[currentTopic] = true
}
return ableTopics
}
// 解析全部的Topic并加载内部的格式器和插件pipeline
func loadTopics() map[string]*Topic {
configs := conf.GetAllTopicFromEtcd()
......@@ -69,44 +86,30 @@ func loadTopics() map[string]*Topic {
log.Printf("get topic setting error:%s ", currentTopic.Label)
}
p := PipeLine{}
p := plugin.PipeLine{}
// log.Println("get config", currentTopic.PipelineConfig)
for _, v := range currentTopic.PipelineConfig {
currentPlugin := pluginsBoard[v.Name]
err := currentPlugin.setParams(v.Params)
currentPlugin := plugin.RegistedPlugins[v.Name]
err := currentPlugin.SetParams(v.Params)
if err != nil {
log.Panicln("plugin encode params error:", err)
}
p.appendPlugin(currentPlugin)
p.AppendPlugin(currentPlugin)
}
var formatMethod Formater
var formatMethod entity.Formater
switch currentTopic.Format {
case 1:
formatMethod = DefaultJsonLog
formatMethod = entity.DefaultJsonLog
case 2:
formatMethod = FormatServiceWfLog
formatMethod = entity.FormatServiceWfLog
default:
formatMethod = DefaultLog
formatMethod = entity.DefaultLog
}
topics[currentTopic.Name] = &Topic{Name: currentTopic.Name, Label: currentTopic.Label, PipeLine: &p, Format: formatMethod}
}
return topics
}
// 收集所有需要监听的topic
func ChooseTopic() map[*Topic]bool {
collector := loadCollectors()
topics := loadTopics()
ableTopics := make(map[*Topic]bool)
for _, v := range collector {
currentTopic := topics[v.Topic]
ableTopics[currentTopic] = true
}
return ableTopics
}
package source
import (
"context"
"log"
"strings"
"sync"
"github.com/segmentio/kafka-go"
"github.com/y7ut/logtransfer/entity"
"github.com/y7ut/logtransfer/plugin"
)
var (
CustomerManger = make(map[string]*Customer)
mu sync.Mutex
)
// 消费处理器
type Customer struct {
Reader *kafka.Reader // 特定消费者的专属Kafka Reader (我从哪里来)
HandlePipeline *plugin.PipeLine // 从Topic中构建的Piepline (要到那里去)
Format entity.Formater // 解析元数据的格式器 (变形记。。)
done chan struct{} // 结束标志
}
// 结束一个消费处理器
func (c Customer) Exit() {
go func() {
c.done <- struct{}{}
}()
}
// 结束信号监听
func (c Customer) Listen() chan struct{} {
return c.done
}
// 初始化一个消费处理器
func InitCustomer(topic *Topic) *Customer{
GroupID := topic.Name + "_group"
r := InitReader(topic.Name, GroupID)
log.Printf("Check Customer group of [%s] success!", GroupID)
return &Customer{Reader: r, done: make(chan struct{}), HandlePipeline: topic.PipeLine, Format: topic.Format}
}
// 全局的注册当前工作的消费处理器
func RegisterManger(c *Customer) {
mu.Lock()
CustomerManger[c.Reader.Config().Topic] = c
mu.Unlock()
}
// 根据topic快速获取消费处理器 目前用于关闭消费处理器
func GetCustomer(topic string) (customer *Customer, ok bool) {
mu.Lock()
customer, ok = CustomerManger[topic]
mu.Unlock()
return customer, ok
}
// 获取全部的注册过的消费处理器 所使用的的Topic名字
func GetRegisterTopics() (topics []string) {
mu.Lock()
for topic := range CustomerManger {
topics = append(topics, topic)
}
mu.Unlock()
return topics
}
// 從Kafka中消費消息,注意这里会提交commit offset
func ReadingMessage(ctx context.Context, c *Customer) {
defer c.Reader.Close()
log.Printf("Start Customer Group[%s] success!", c.Reader.Config().GroupID)
// var trycount int
// var cstSh, _ = time.LoadLocation("Asia/Shanghai") //上海时区
var errMessage strings.Builder
// log.Println(c.HandlePipeline.pipe)
var matedata entity.Matedata
for {
select {
case <-c.Listen():
// 监听需要关闭的信号
c.Exit()
log.Println("Close customer of Topic :", c.Reader.Config().Topic)
return
default:
// // 使用超时上下文, 但是这样是非阻塞,超过deadline就报错了
// // timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
// // 这里使用阻塞的上下文
m, err := c.Reader.ReadMessage(ctx)
if err != nil {
// 退出
// c.Exit()
errMessage.Reset()
errMessage.WriteString("Reader Error")
errMessage.WriteString(err.Error())
log.Println(errMessage.String())
continue
}
matedata, err = c.Format(string(c.Reader.Config().Topic), string(m.Value))
if err != nil {
errMessage.Reset()
errMessage.WriteString("Format Error")
errMessage.WriteString(err.Error())
log.Println(errMessage.String())
continue
}
// 流入pipe
c.HandlePipeline.Enter(matedata)
}
}
}
package queue
package source
import (
"log"
......@@ -37,3 +37,4 @@ func CreateCustomerGroup(topic string, groupId string) {
log.Println("create CustomerGroup error:", err)
}
}
package transfer
import "github.com/segmentio/kafka-go"
type Customer struct {
Reader *kafka.Reader
HandlePipeline *PipeLine
Format Formater
done chan struct{}
}
func (c Customer) Exit() {
go func() {
c.done <- struct{}{}
}()
}
func (c Customer) Listen() chan struct{} {
return c.done
}
func registerManger(c *Customer) {
mu.Lock()
CustomerManger[c.Reader.Config().Topic] = c
mu.Unlock()
}
func getCustomer(topic string) (customer *Customer, ok bool) {
mu.Lock()
customer, ok = CustomerManger[topic]
mu.Unlock()
return customer, ok
}
// func uninstallCustomer(topic string) {
// mu.Lock()
// delete(CustomerManger, topic)
// mu.Unlock()
// }
......@@ -6,35 +6,22 @@ import (
"log"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"
elastic "github.com/olivere/elastic/v7"
"github.com/y7ut/logtransfer/conf"
"github.com/y7ut/logtransfer/queue"
"github.com/y7ut/logtransfer/entity"
"github.com/y7ut/logtransfer/source"
)
var (
Start = make(chan *Customer)
Start = make(chan *source.Customer)
Close = make(chan string)
CustomerManger = make(map[string]*Customer)
MaxRetryTime = 10
mu sync.Mutex
closeWg sync.WaitGroup
messages = make(chan *Matedate, conf.APPConfig.Es.BulkSize)
)
func getRegisterTopics() (topics []string) {
mu.Lock()
for topic := range CustomerManger {
topics = append(topics, topic)
}
mu.Unlock()
return topics
}
// 核心启动
func Run(confPath string) {
// 加载配置
......@@ -49,47 +36,47 @@ func Run(confPath string) {
// 做一个master的上下文
ctx, cancel := context.WithCancel(context.Background())
//
// 启动es消息发送器
for i := 0; i < 3; i++ {
go MatedateSender(ctx, esClient)
go entity.MatedateSender(ctx, esClient)
}
// 用于处理启动与关闭消费处理器的信号通知
go func() {
for {
select {
case customer := <-Start:
registerManger(customer)
go ReadingMessage(ctx, customer)
source.RegisterManger(customer)
go source.ReadingMessage(ctx, customer)
case closer := <-Close:
c, ok := getCustomer(closer)
c, ok := source.GetCustomer(closer)
if !ok {
log.Printf(" Customer %s unstall Failed ", closer)
}
c.Exit()
closeWg.Done()
}
}
}()
for topic := range ChooseTopic() {
GroupID := topic.Name + "_group"
r := queue.InitReader(topic.Name, GroupID)
currentCustomer := &Customer{Reader: r, done: make(chan struct{}), HandlePipeline: topic.PipeLine, Format: topic.Format}
log.Printf("Check Customer group of [%s] success!", GroupID)
// TODO: 动态的注册customer,目前仅支持初始化的时候来加载
for topic := range source.ChooseTopic() {
currentCustomer := source.InitCustomer(topic)
Start <- currentCustomer
}
// TODO: 还要监听Topic的配置变更
// 目前是通过topic的name来注册所有的消费处理器
// 所以直接给对应的topic中的customer重启就可以杀了就可以了
for sign := range sign() {
switch sign {
case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM:
log.Println("Safe Exit with:", sign)
currentTopics := getRegisterTopics()
currentTopics := source.GetRegisterTopics()
for _, topic := range currentTopics {
......@@ -98,7 +85,7 @@ func Run(confPath string) {
log.Printf(" Customer %s unstalling...", topic)
closeWg.Wait()
}
close(messages)
entity.CloseMessageChan()
log.Printf(" Success unstall %d Transfer", len(currentTopics))
os.Exit(0)
......@@ -114,98 +101,3 @@ func sign() <-chan os.Signal {
signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGUSR1, syscall.SIGUSR2)
return c
}
// 從Kafka中消費消息,注意这里会提交commit offset
func ReadingMessage(ctx context.Context, c *Customer) {
defer c.Reader.Close()
log.Printf("Start Customer Group[%s] success!", c.Reader.Config().GroupID)
// var trycount int
// var cstSh, _ = time.LoadLocation("Asia/Shanghai") //上海时区
var errMessage strings.Builder
// log.Println(c.HandlePipeline.pipe)
var matedata Matedate
for {
select {
case <-c.Listen():
// 监听需要关闭的信号
log.Println("Close customer of Topic :", c.Reader.Config().Topic)
return
default:
// // 使用超时上下文, 但是这样是非阻塞,超过deadline就报错了
// // timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
// // 这里使用阻塞的上下文
m, err := c.Reader.ReadMessage(ctx)
if err != nil {
// 退出
// c.Exit()
errMessage.Reset()
errMessage.WriteString("Reader Error")
errMessage.WriteString(err.Error())
log.Println(errMessage.String())
continue
}
matedata, err = c.Format(string(c.Reader.Config().Topic), string(m.Value))
if err != nil {
errMessage.Reset()
errMessage.WriteString("Format Error")
errMessage.WriteString(err.Error())
log.Println(errMessage.String())
continue
}
// 流入pipe
c.HandlePipeline.Enter(matedata)
}
}
}
func MatedateSender(ctx context.Context, esClient *elastic.Client) {
tick := time.NewTicker(3 * time.Second)
var (
SenderMu sync.Mutex
)
bulkRequest := esClient.Bulk()
for {
select {
case m := <-messages:
indexRequest := elastic.NewBulkIndexRequest().Index(m.Index).Doc(m.data)
bulkRequest.Add(indexRequest)
case <-tick.C:
// Do sends the bulk requests to Elasticsearch
SenderMu.Lock()
count := bulkRequest.NumberOfActions()
if count > 0 {
log.Printf("Send message to Es: %d : \n", bulkRequest.NumberOfActions())
_, err := bulkRequest.Do(ctx)
if err != nil {
log.Println("Save Es Error:", err)
}
bulkRequest.Reset()
}
SenderMu.Unlock()
case <-ctx.Done():
// Do sends the bulk requests to Elasticsearch
SenderMu.Lock()
_, err := bulkRequest.Do(ctx)
if err != nil {
log.Println("Save Es Error:", err)
}
bulkRequest.Reset()
SenderMu.Unlock()
log.Println("Exiting...")
return
}
}
}
package transfer
import (
"encoding/json"
"errors"
"fmt"
"log"
"strings"
"time"
)
var pluginsBoard = map[string]Handler{
"Dump": &Dump{},
"Edit": &Edit{},
"SaveES": &SaveES{},
}
type Matedate struct {
Topic string
Index string
Level string
create time.Time
data map[string]interface{}
}
func (m *Matedate) reset() {
m.Topic = ""
m.Index = ""
m.Level = ""
m.data = map[string]interface{}{}
}
// type Matedate map[string]interface{}
type Handler interface {
HandleFunc(*Matedate) error
setParams(string) error
}
type Plugin struct {
params *map[string]interface{}
// error error
}
type PipeLine struct {
pipe []*Handler
}
func (p *PipeLine) appendPlugin(plugin Handler) {
p.pipe = append(p.pipe, &plugin)
}
func (p *PipeLine) Enter(m Matedate) {
for _, plugin := range p.pipe {
err := (*plugin).HandleFunc(&m)
if err != nil {
log.Println(err)
}
}
}
// 打印插件
type Dump Plugin
func (dump *Dump) HandleFunc(m *Matedate) error {
log.Println("DUMP:")
for k, v := range (*m).data {
fmt.Printf("%s : %s\n", k, v)
}
fmt.Println("------------")
return nil
}
func (dump *Dump) setParams(params string) error {
return nil
}
// 修改插件
type Edit Plugin
func (edit *Edit) HandleFunc(m *Matedate) error {
log.Println("EDIT:")
if edit.params == nil {
return fmt.Errorf("please set params first")
}
key := (*edit.params)["key"]
value := (*edit.params)["value"]
(*m).data[key.(string)] = value
return nil
}
func (edit *Edit) setParams(params string) error {
paramsValue, err := checkParams(params, "key", "value")
if err != nil{
return err
}
edit.params = &paramsValue
return err
}
// 修改插件
type SaveES Plugin
func (saveEs *SaveES) HandleFunc(m *Matedate) error {
log.Println("SaveES:")
m.Index = fmt.Sprintf("%s", (*saveEs.params)["index"])
m.data["topic"] = m.Topic
m.data["level"] = m.Level
messages <- m
return nil
}
func (saveEs *SaveES) setParams(params string) error {
paramsValue, err := checkParams(params, "index")
if err != nil{
return err
}
saveEs.params = &paramsValue
return err
}
// 警报与监测
type Alarm Plugin
func (alarm *Alarm) HandleFunc(m *Matedate) error {
return nil
}
func (alarm *Alarm) setParams(params string) error {
paramsValue, err := checkParams(params, "hit", "idle_time")
if err != nil{
return err
}
alarm.params = &paramsValue
return err
}
func checkParams(paramsJson string, key ...string) (value map[string]interface{}, err error) {
err = json.Unmarshal([]byte(paramsJson), &value)
if err != nil {
return value, err
}
var errMessage strings.Builder
var errCheck bool
errMessage.WriteString("Plugin params errors: ")
for _, checkItem := range key {
if value[checkItem] == nil {
errCheck = true
errMessage.WriteString(checkItem)
}
}
if errCheck {
return value, errors.New(errMessage.String())
}
return value, nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment