Commit 4decf249 authored by 谢宇轩's avatar 谢宇轩 😅

修改了一些插件的规则

parent 3468a4ee
...@@ -35,15 +35,22 @@ func FormatServiceWfLog(sourceKey string, message string) (Matedate, error) { ...@@ -35,15 +35,22 @@ func FormatServiceWfLog(sourceKey string, message string) (Matedate, error) {
mateItem.create = logTime mateItem.create = logTime
keyword := serviceWfLogKeyWord keyword := serviceWfLogKeyWord
for _, word := range keyword { for _, word := range keyword {
flysnowRegexp := regexp.MustCompile(fmt.Sprintf(`%s\[(?s:(.*?))\]`, word)) flysnowRegexp := regexp.MustCompile(fmt.Sprintf(`%s\[(?s:(.*?))\]`, word))
logContent := flysnowRegexp.FindString(message) logContent := flysnowRegexp.FindString(message)
curentSub := contentRegexp.FindStringSubmatch(logContent) curentSub := contentRegexp.FindStringSubmatch(logContent)
if len(curentSub) < 1 { if len(curentSub) < 1 {
continue continue
} }
mateItem.data[word] = contentRegexp.FindStringSubmatch(logContent)[1] if word == "errmsg"{
mateItem.data["message"] = strings.Replace(contentRegexp.FindStringSubmatch(logContent)[1],`"`,"",-1)
}else{
mateItem.data[word] = contentRegexp.FindStringSubmatch(logContent)[1]
}
} }
mateItem.data["timestamp"] = mateItem.create
result := *mateItem result := *mateItem
mateItem.reset() mateItem.reset()
MatePool.Put(mateItem) MatePool.Put(mateItem)
......
...@@ -176,6 +176,8 @@ func MatedateSender(ctx context.Context, esClient *elastic.Client) { ...@@ -176,6 +176,8 @@ func MatedateSender(ctx context.Context, esClient *elastic.Client) {
for { for {
select { select {
case m := <-messages: case m := <-messages:
log.Printf("created message : %s : \n", m.Index)
log.Println(m.data)
indexRequest := elastic.NewBulkIndexRequest().Index(m.Index).Doc(m.data) indexRequest := elastic.NewBulkIndexRequest().Index(m.Index).Doc(m.data)
bulkRequest.Add(indexRequest) bulkRequest.Add(indexRequest)
...@@ -183,6 +185,7 @@ func MatedateSender(ctx context.Context, esClient *elastic.Client) { ...@@ -183,6 +185,7 @@ func MatedateSender(ctx context.Context, esClient *elastic.Client) {
// Do sends the bulk requests to Elasticsearch // Do sends the bulk requests to Elasticsearch
SenderMu.Lock() SenderMu.Lock()
count := bulkRequest.NumberOfActions() count := bulkRequest.NumberOfActions()
if count > 0 { if count > 0 {
log.Printf("Send message to Es: %d : \n", bulkRequest.NumberOfActions()) log.Printf("Send message to Es: %d : \n", bulkRequest.NumberOfActions())
_, err := bulkRequest.Do(ctx) _, err := bulkRequest.Do(ctx)
......
...@@ -2,14 +2,16 @@ package transfer ...@@ -2,14 +2,16 @@ package transfer
import ( import (
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"log" "log"
"strings"
"time" "time"
) )
var pluginsBoard = map[string]Handler{ var pluginsBoard = map[string]Handler{
"Dump": &Dump{}, "Dump": &Dump{},
"Edit": &Edit{}, "Edit": &Edit{},
"SaveES": &SaveES{}, "SaveES": &SaveES{},
} }
...@@ -85,30 +87,15 @@ func (edit *Edit) HandleFunc(m *Matedate) error { ...@@ -85,30 +87,15 @@ func (edit *Edit) HandleFunc(m *Matedate) error {
key := (*edit.params)["key"] key := (*edit.params)["key"]
value := (*edit.params)["value"] value := (*edit.params)["value"]
(*m).data[key.(string)] = value (*m).data[key.(string)] = value
(*m).data["eidt"] = 1
return nil return nil
} }
func (edit *Edit) setParams(params string) error { func (edit *Edit) setParams(params string) error {
var paramsValue map[string]interface{}
var checkKeyString bool
var checkValueString bool
err := json.Unmarshal([]byte(params), &paramsValue)
for key := range paramsValue {
if key == "key" {
checkKeyString = true
}
if key == "value" {
checkValueString = true
}
}
if !(checkKeyString && checkValueString) {
return fmt.Errorf("please set params true")
}
paramsValue, err := checkParams(params, "key", "value")
if err != nil{
return err
}
edit.params = &paramsValue edit.params = &paramsValue
return err return err
} }
...@@ -118,10 +105,60 @@ type SaveES Plugin ...@@ -118,10 +105,60 @@ type SaveES Plugin
func (saveEs *SaveES) HandleFunc(m *Matedate) error { func (saveEs *SaveES) HandleFunc(m *Matedate) error {
log.Println("SaveES:") log.Println("SaveES:")
m.Index = fmt.Sprintf("%s", (*saveEs.params)["index"])
m.data["topic"] = m.Topic
m.data["level"] = m.Level
messages <- m messages <- m
return nil return nil
} }
func (saveEs *SaveES) setParams(params string) error { func (saveEs *SaveES) setParams(params string) error {
paramsValue, err := checkParams(params, "index")
if err != nil{
return err
}
saveEs.params = &paramsValue
return err
}
// 警报与监测
type Alarm Plugin
func (alarm *Alarm) HandleFunc(m *Matedate) error {
return nil return nil
} }
func (alarm *Alarm) setParams(params string) error {
paramsValue, err := checkParams(params, "hit", "idle_time")
if err != nil{
return err
}
alarm.params = &paramsValue
return err
}
func checkParams(paramsJson string, key ...string) (value map[string]interface{}, err error) {
err = json.Unmarshal([]byte(paramsJson), &value)
if err != nil {
return value, err
}
var errMessage strings.Builder
var errCheck bool
errMessage.WriteString("Plugin params errors: ")
for _, checkItem := range key {
if value[checkItem] == nil {
errCheck = true
errMessage.WriteString(checkItem)
}
}
if errCheck {
return value, errors.New(errMessage.String())
}
return value, nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment