Commit b1573403 authored by 谢宇轩's avatar 谢宇轩

fix: 修复pluginParams调用的异常

parent c0487713
FROM golang:1.16.3-alpine3.13 as build FROM golang:1.16.3 as build
RUN set -ex \ RUN set -ex \
&& go env -w GO111MODULE=on \ && go env -w GO111MODULE=on \
......
### LogTransfer Customer # LogTransfer Customer
## 原理
# 原理
从Etcd中获取所有在注册的配置,收集需要消费的Topic和对应信息。 从Etcd中获取所有在注册的配置,收集需要消费的Topic和对应信息。
# 版本 ## 版本
### 1.0.0 ### 1.0.0
1. 从kafka消费数据 1. 从kafka消费数据
2. 管道的形式处理数据 2. 管道的形式处理数据
3. 提供数据同步到ES的插件 3. 提供数据同步到ES的插件
### 2.0.0 ### 2.0.0
1. 支持动态的安装卸载插件 1. 支持动态的安装卸载插件
### 2.1.0 ### 2.1.0
1. 添加动态的配置监控 1. 添加动态的配置监控
2. 添加ES消息存储的协程池 2. 添加ES消息存储的协程池
### 2.1.1 ### 2.1.1
1. 优化退出信号的监听 1. 优化退出信号的监听
2. 修改ES Save的Deadline 2. 修改ES Save的Deadline
# 安装 ### 2.1.3
```
1. 更好的支持插件的参数
## 安装
```shell
docker build -t logtransfer:version . docker build -t logtransfer:version .
docker run -d --name=LTF logtransfer:version docker run --rm -d -v ~/logtransfer/logtransfer.conf:/app/logtransfer.conf -v ~/logtransfer/transfer.log:/app/log/runtime.log -v /etc/localtime:/etc/localtime --name=logtransfer_ijiwei docker.ijiwei.com/logagent/logtransfer:latest
``` ```
...@@ -192,6 +192,6 @@ func CheckAgentActive(name string) bool { ...@@ -192,6 +192,6 @@ func CheckAgentActive(name string) bool {
return false return false
} }
status := string(resp.Kvs[0].Value) status := string(resp.Kvs[0].Value)
log.Println("it is"+status) log.Println("it is" + status)
return status == "1" return status == "1"
} }
...@@ -4,11 +4,12 @@ type LogTransferConf struct { ...@@ -4,11 +4,12 @@ type LogTransferConf struct {
Kafka `ini:"kafka"` Kafka `ini:"kafka"`
Etcd `ini:"etcd"` Etcd `ini:"etcd"`
Es `ini:"es"` Es `ini:"es"`
Log `ini:"log"`
} }
// kafka 配置 // kafka 配置
type Kafka struct { type Kafka struct {
Address string `ini:"address"` Address string `ini:"address"`
} }
// ETCD 配置 // ETCD 配置
...@@ -18,10 +19,14 @@ type Etcd struct { ...@@ -18,10 +19,14 @@ type Etcd struct {
// Es 属性 // Es 属性
type Es struct { type Es struct {
Address string `ini:"address"` Address string `ini:"address"`
BulkSize int `ini:"bulk_size"` BulkSize int `ini:"bulk_size"`
}
type Log struct {
Keywords string `ini:"service_keyword"`
} }
var ( var (
APPConfig = new(LogTransferConf) APPConfig = new(LogTransferConf)
) )
\ No newline at end of file
...@@ -6,6 +6,8 @@ import ( ...@@ -6,6 +6,8 @@ import (
"regexp" "regexp"
"strings" "strings"
"time" "time"
"github.com/y7ut/logtransfer/conf"
) )
type Formater func(string, string) (Matedata, error) type Formater func(string, string) (Matedata, error)
...@@ -31,7 +33,7 @@ func FormatServiceWfLog(sourceKey string, message string) (Matedata, error) { ...@@ -31,7 +33,7 @@ func FormatServiceWfLog(sourceKey string, message string) (Matedata, error) {
// 给时间调回UTC+8 // 给时间调回UTC+8
logTime, _ := time.ParseInLocation(": 06-01-02 15:04:05 ", message[:strings.Index(message, "[")], time.FixedZone("UTC", 8*3600)) logTime, _ := time.ParseInLocation(": 06-01-02 15:04:05 ", message[:strings.Index(message, "[")], time.FixedZone("UTC", 8*3600))
mateItem.create = logTime mateItem.create = logTime
keyword := serviceWfLogKeyWord keyword := strings.Split(conf.APPConfig.Log.Keywords, ",")
for _, word := range keyword { for _, word := range keyword {
flysnowRegexp := regexp.MustCompile(fmt.Sprintf(`%s\[(?s:(.*?))\]`, word)) flysnowRegexp := regexp.MustCompile(fmt.Sprintf(`%s\[(?s:(.*?))\]`, word))
......
...@@ -13,7 +13,6 @@ import ( ...@@ -13,7 +13,6 @@ import (
var ( var (
contentRegexp = regexp.MustCompile(`\[(?s:(.*?))\]`) contentRegexp = regexp.MustCompile(`\[(?s:(.*?))\]`)
serviceWfLogKeyWord = []string{"errno", "logId", "uri", "refer", "cookie", "ua", "host", "clientIp", "optime", "request_params", "errmsg"}
MatePool = sync.Pool{New: func() interface{} { return &Matedata{Data: make(map[string]interface{})} }} MatePool = sync.Pool{New: func() interface{} { return &Matedata{Data: make(map[string]interface{})} }}
messages = make(chan *Matedata, conf.APPConfig.Es.BulkSize) messages = make(chan *Matedata, conf.APPConfig.Es.BulkSize)
) )
...@@ -109,18 +108,18 @@ func MatedateSender(ctx context.Context) { ...@@ -109,18 +108,18 @@ func MatedateSender(ctx context.Context) {
mateDatesItems = mateDatesItems[:0] mateDatesItems = mateDatesItems[:0]
mu.Unlock() mu.Unlock()
wp.Serve(currentItems) wp.Serve(currentItems)
}else{ } else {
mu.Unlock() mu.Unlock()
} }
case <-autoTicker.C: case <-autoTicker.C:
mu.Lock() mu.Lock()
currentItems := mateDatesItems currentItems := mateDatesItems
if len(currentItems) > 0 { if len(currentItems) > 0 {
mateDatesItems = mateDatesItems[:0] mateDatesItems = mateDatesItems[:0]
mu.Unlock() mu.Unlock()
wp.Serve(currentItems) wp.Serve(currentItems)
}else{ } else {
mu.Unlock() mu.Unlock()
} }
......
...@@ -10,7 +10,7 @@ import ( ...@@ -10,7 +10,7 @@ import (
"github.com/y7ut/logtransfer/transfer" "github.com/y7ut/logtransfer/transfer"
) )
const version = "2.1.3" const version = "2.1.4"
var c = flag.String("c", "./logtransfer.conf", "使用配置文件启动") var c = flag.String("c", "./logtransfer.conf", "使用配置文件启动")
var v = flag.Bool("v", false, "查看当前程序版本") var v = flag.Bool("v", false, "查看当前程序版本")
......
package plugin package plugin
import ( import (
"fmt"
"log"
"time"
"github.com/y7ut/logtransfer/entity" "github.com/y7ut/logtransfer/entity"
) )
// 打印插件 // 警报与监测
type Dump Plugin type Alarm Plugin
func (dump *Dump) HandleFunc(m *entity.Matedata) error {
log.Println("DUMP:")
for k, v := range (*m).Data {
if k == "timestamp" {
// 这里需要回显 假设现在是UTC-8
loc := time.FixedZone("UTC", -8*3600)
createdAt, err := time.ParseInLocation("2006-01-02 15:04:05 ", fmt.Sprintf("%s", v), loc)
if err != nil {
continue
}
// UTC时间就是+8小时
v = createdAt.UTC().Format("2006-01-02 15:04:05")
}
fmt.Printf("%s : %s\n", k, v)
}
fmt.Println("------------") func (alarm *Alarm) HandleFunc(m *entity.Matedata) error {
return nil return nil
} }
func (dump *Dump) SetParams(params string) error { func (alarm *Alarm) SetParams(params string) error {
return nil
paramsValue, err := checkParams(params, "hit", "idle_time")
if err != nil {
return err
}
alarm.params = &paramsValue
return err
} }
package plugin package plugin
import ( import (
"fmt"
"log"
"time"
"github.com/y7ut/logtransfer/entity" "github.com/y7ut/logtransfer/entity"
) )
// 警报与监测 // 打印插件
type Alarm Plugin type Dump Plugin
func (dump *Dump) HandleFunc(m *entity.Matedata) error {
log.Println("DUMP:")
for k, v := range (*m).Data {
if k == "timestamp" {
// 这里需要回显 假设现在是UTC-8
loc := time.FixedZone("UTC", -8*3600)
createdAt, err := time.ParseInLocation("2006-01-02 15:04:05 ", fmt.Sprintf("%s", v), loc)
if err != nil {
continue
}
// UTC时间就是+8小时
v = createdAt.UTC().Format("2006-01-02 15:04:05")
}
fmt.Printf("%s : %s\n", k, v)
}
func (alarm *Alarm) HandleFunc(m *entity.Matedata) error { fmt.Println("------------")
return nil return nil
} }
func (alarm *Alarm) SetParams(params string) error { func (dump *Dump) SetParams(params string) error {
return nil
paramsValue, err := checkParams(params, "hit", "idle_time")
if err != nil {
return err
}
alarm.params = &paramsValue
return err
} }
...@@ -14,6 +14,8 @@ type Handler interface { ...@@ -14,6 +14,8 @@ type Handler interface {
SetParams(string) error SetParams(string) error
} }
type HandlerConstruct func() Handler
type Plugin struct { type Plugin struct {
params *map[string]interface{} params *map[string]interface{}
// error error // error error
......
package plugin package plugin
type HandlerConstruct func() Handler var RegistedPlugins = getRegistedPlugins()
var RegistedPlugins = map[string]HandlerConstruct{ // 获取注册过的插件插件列表
"Dump": func() Handler { func getRegistedPlugins() map[string]HandlerConstruct {
return &Dump{} return map[string]HandlerConstruct{
}, "Dump": func() Handler {
"Edit": func() Handler { return &Dump{}
return &Edit{} },
}, "Edit": func() Handler {
"SaveES": func() Handler { return &Edit{}
return &SaveES{} },
}, "SaveES": func() Handler {
"Alarm": func() Handler { return &SaveES{}
return &Alarm{} },
}, "Alarm": func() Handler {
return &Alarm{}
},
}
} }
// 加载当前注册过的插件
func LoadRegistedPlugins(plugin string) Handler {
return getRegistedPlugins()[plugin]()
}
\ No newline at end of file
...@@ -492,7 +492,7 @@ func getCollectorChangeWithEvent(confResp clientv3.WatchResponse) (different Col ...@@ -492,7 +492,7 @@ func getCollectorChangeWithEvent(confResp clientv3.WatchResponse) (different Col
func getStatusChangeWithEvent(confResp clientv3.WatchResponse) (collectors []Collector, changeType string, err error) { func getStatusChangeWithEvent(confResp clientv3.WatchResponse) (collectors []Collector, changeType string, err error) {
changeStatus := fmt.Sprintf("%s", confResp.Events[0].Kv.Value) changeStatus := string(confResp.Events[0].Kv.Value)
// 先对比一下 // 先对比一下
oldKey := confResp.Events[0].Kv.Key oldKey := confResp.Events[0].Kv.Key
......
...@@ -16,7 +16,7 @@ func generateTopic(config TopicConfig) *Topic { ...@@ -16,7 +16,7 @@ func generateTopic(config TopicConfig) *Topic {
p := plugin.PipeLine{} p := plugin.PipeLine{}
for _, v := range config.PipelineConfig { for _, v := range config.PipelineConfig {
currentPlugin := plugin.RegistedPlugins[v.Name]() currentPlugin := plugin.LoadRegistedPlugins(v.Name)
err := currentPlugin.SetParams(v.Params) err := currentPlugin.SetParams(v.Params)
if err != nil { if err != nil {
log.Panicln("plugin encode params error:", err) log.Panicln("plugin encode params error:", err)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment