Commit 65e7f2e2 authored by 李世星's avatar 李世星

fix(合并): 20230213

parents 154ec724 b1573403
Pipeline #16300 passed with stage
in 0 seconds
FROM golang:1.16.3-alpine3.13 as build FROM golang:1.16.3 as build
RUN set -ex \ RUN set -ex \
&& go env -w GO111MODULE=on \ && go env -w GO111MODULE=on \
......
### LogTransfer Customer # LogTransfer Customer
## 原理
# 原理
从Etcd中获取所有在注册的配置,收集需要消费的Topic和对应信息。 从Etcd中获取所有在注册的配置,收集需要消费的Topic和对应信息。
# 版本 ## 版本
### 1.0.0 ### 1.0.0
1. 从kafka消费数据 1. 从kafka消费数据
2. 管道的形式处理数据 2. 管道的形式处理数据
3. 提供数据同步到ES的插件 3. 提供数据同步到ES的插件
### 2.0.0 ### 2.0.0
1. 支持动态的安装卸载插件 1. 支持动态的安装卸载插件
### 2.1.0 ### 2.1.0
1. 添加动态的配置监控 1. 添加动态的配置监控
2. 添加ES消息存储的协程池 2. 添加ES消息存储的协程池
### 2.1.1 ### 2.1.1
1. 优化退出信号的监听 1. 优化退出信号的监听
2. 修改ES Save的Deadline 2. 修改ES Save的Deadline
# 安装 ### 2.1.3
```
1. 更好的支持插件的参数
## 安装
```shell
docker build -t logtransfer:version . docker build -t logtransfer:version .
docker run -d --name=LTF logtransfer:version docker run --rm -d -v ~/logtransfer/logtransfer.conf:/app/logtransfer.conf -v ~/logtransfer/transfer.log:/app/log/runtime.log -v /etc/localtime:/etc/localtime --name=logtransfer_ijiwei docker.ijiwei.com/logagent/logtransfer:latest
``` ```
...@@ -4,11 +4,12 @@ type LogTransferConf struct { ...@@ -4,11 +4,12 @@ type LogTransferConf struct {
Kafka `ini:"kafka"` Kafka `ini:"kafka"`
Etcd `ini:"etcd"` Etcd `ini:"etcd"`
Es `ini:"es"` Es `ini:"es"`
Log `ini:"log"`
} }
// kafka 配置 // kafka 配置
type Kafka struct { type Kafka struct {
Address string `ini:"address"` Address string `ini:"address"`
} }
// ETCD 配置 // ETCD 配置
...@@ -18,10 +19,14 @@ type Etcd struct { ...@@ -18,10 +19,14 @@ type Etcd struct {
// Es 属性 // Es 属性
type Es struct { type Es struct {
Address string `ini:"address"` Address string `ini:"address"`
BulkSize int `ini:"bulk_size"` BulkSize int `ini:"bulk_size"`
}
type Log struct {
Keywords string `ini:"service_keyword"`
} }
var ( var (
APPConfig = new(LogTransferConf) APPConfig = new(LogTransferConf)
) )
\ No newline at end of file
...@@ -6,6 +6,8 @@ import ( ...@@ -6,6 +6,8 @@ import (
"regexp" "regexp"
"strings" "strings"
"time" "time"
"github.com/y7ut/logtransfer/conf"
) )
type Formater func(string, string) (Matedata, error) type Formater func(string, string) (Matedata, error)
...@@ -24,10 +26,14 @@ func FormatServiceWfLog(sourceKey string, message string) (Matedata, error) { ...@@ -24,10 +26,14 @@ func FormatServiceWfLog(sourceKey string, message string) (Matedata, error) {
mateItem.Level = message[:levelIndex] mateItem.Level = message[:levelIndex]
message = message[levelIndex:] message = message[levelIndex:]
if !strings.Contains(message, "[") {
return *mateItem, fmt.Errorf("message format error without time")
}
// 给时间调回UTC+8 // 给时间调回UTC+8
logTime, _ := time.ParseInLocation(": 06-01-02 15:04:05 ", message[:strings.Index(message, "[")], time.FixedZone("UTC", 8*3600)) logTime, _ := time.ParseInLocation(": 06-01-02 15:04:05 ", message[:strings.Index(message, "[")], time.FixedZone("UTC", 8*3600))
mateItem.create = logTime mateItem.create = logTime
keyword := serviceWfLogKeyWord keyword := strings.Split(conf.APPConfig.Log.Keywords, ",")
for _, word := range keyword { for _, word := range keyword {
flysnowRegexp := regexp.MustCompile(fmt.Sprintf(`%s\[(?s:(.*?))\]`, word)) flysnowRegexp := regexp.MustCompile(fmt.Sprintf(`%s\[(?s:(.*?))\]`, word))
...@@ -60,7 +66,7 @@ func DefaultLog(sourceKey string, message string) (Matedata, error) { ...@@ -60,7 +66,7 @@ func DefaultLog(sourceKey string, message string) (Matedata, error) {
mateItem.Topic = sourceKey mateItem.Topic = sourceKey
mateItem.Index = sourceKey mateItem.Index = sourceKey
mateItem.Data = map[string]interface{}{"message": message} mateItem.Data["message"] = message
result := *mateItem result := *mateItem
MatePool.Put(vMateItem) MatePool.Put(vMateItem)
...@@ -78,6 +84,8 @@ func DefaultJsonLog(sourceKey string, message string) (Matedata, error) { ...@@ -78,6 +84,8 @@ func DefaultJsonLog(sourceKey string, message string) (Matedata, error) {
if err != nil { if err != nil {
return *mateItem, err return *mateItem, err
} }
mateItem.Topic = sourceKey
mateItem.Data = data mateItem.Data = data
result := *mateItem result := *mateItem
......
...@@ -13,7 +13,6 @@ import ( ...@@ -13,7 +13,6 @@ import (
var ( var (
contentRegexp = regexp.MustCompile(`\[(?s:(.*?))\]`) contentRegexp = regexp.MustCompile(`\[(?s:(.*?))\]`)
serviceWfLogKeyWord = []string{"errno", "logId", "uri", "refer", "cookie", "ua", "host", "clientIp", "optime", "request_params", "errmsg"}
MatePool = sync.Pool{New: func() interface{} { return &Matedata{Data: make(map[string]interface{})} }} MatePool = sync.Pool{New: func() interface{} { return &Matedata{Data: make(map[string]interface{})} }}
messages = make(chan *Matedata, conf.APPConfig.Es.BulkSize) messages = make(chan *Matedata, conf.APPConfig.Es.BulkSize)
) )
...@@ -55,6 +54,7 @@ func MatedateSender(ctx context.Context) { ...@@ -55,6 +54,7 @@ func MatedateSender(ctx context.Context) {
wp := &ESWorkPool{ wp := &ESWorkPool{
WorkerFunc: func(matedatas []*Matedata) bool { WorkerFunc: func(matedatas []*Matedata) bool {
bulkRequest := esClient.Bulk() bulkRequest := esClient.Bulk()
defer bulkRequest.Reset()
for _, m := range matedatas { for _, m := range matedatas {
indexRequest := elastic.NewBulkIndexRequest().Index(m.Index).Doc(m.Data) indexRequest := elastic.NewBulkIndexRequest().Index(m.Index).Doc(m.Data)
bulkRequest.Add(indexRequest) bulkRequest.Add(indexRequest)
...@@ -70,6 +70,14 @@ func MatedateSender(ctx context.Context) { ...@@ -70,6 +70,14 @@ func MatedateSender(ctx context.Context) {
return false return false
} }
for _, item := range response.Failed() {
if item.Error == nil {
continue
}
log.Printf("Find Error in ES Result in (%s): %s", item.Index, item.Error.Reason)
return false
}
for _, v := range response.Items { for _, v := range response.Items {
for _, item := range v { for _, item := range v {
if item.Error != nil { if item.Error != nil {
...@@ -78,8 +86,6 @@ func MatedateSender(ctx context.Context) { ...@@ -78,8 +86,6 @@ func MatedateSender(ctx context.Context) {
} }
} }
} }
bulkRequest.Reset()
} }
return true return true
}, },
......
...@@ -63,14 +63,17 @@ func (wp *ESWorkPool) Start() { ...@@ -63,14 +63,17 @@ func (wp *ESWorkPool) Start() {
go func() { go func() {
var scrath []*workerChan var scrath []*workerChan
for { for {
// 清理未使用的时间超过 最大空闲时间的WorkerChan // 因每个worker上存在上次工作时间的标记,因此可以得出协程空闲时间
// 不干活的就得死! // 整理时按照空闲时间为协程排序, 将空闲时间大的排到前面
// 清理未使用的时间超过最大空闲时间的Worker,
// 同时记得要清理在Pool中的注册信息, 办理离职手续,不干活的就得死!
wp.clean(&scrath) wp.clean(&scrath)
// 每隔一段时间检查一次 去进行清理操作,直到下班
select { select {
case <-stopCh: case <-stopCh:
// 工作结束,下班,停止检查摸鱼协程
return return
default: default:
// 每隔一段时间检查一次 去进行清理操作,直到结束工作,下班
time.Sleep(wp.MaxIdleWorkerDuration) time.Sleep(wp.MaxIdleWorkerDuration)
} }
} }
......
...@@ -10,7 +10,7 @@ import ( ...@@ -10,7 +10,7 @@ import (
"github.com/y7ut/logtransfer/transfer" "github.com/y7ut/logtransfer/transfer"
) )
const version = "2.1.2" const version = "2.1.5"
var c = flag.String("c", "./logtransfer.conf", "使用配置文件启动") var c = flag.String("c", "./logtransfer.conf", "使用配置文件启动")
var v = flag.Bool("v", false, "查看当前程序版本") var v = flag.Bool("v", false, "查看当前程序版本")
......
package plugin package plugin
import ( import (
"fmt"
"log"
"time"
"github.com/y7ut/logtransfer/entity" "github.com/y7ut/logtransfer/entity"
) )
// 打印插件 // 警报与监测
type Dump Plugin type Alarm Plugin
func (dump *Dump) HandleFunc(m *entity.Matedata) error {
log.Println("DUMP:")
for k, v := range (*m).Data {
if k == "timestamp" {
// 这里需要回显 假设现在是UTC-8
loc := time.FixedZone("UTC", -8*3600)
createdAt, err := time.ParseInLocation("2006-01-02 15:04:05 ", fmt.Sprintf("%s", v), loc)
if err != nil {
continue
}
// UTC时间就是+8小时
v = createdAt.UTC().Format("2006-01-02 15:04:05")
}
fmt.Printf("%s : %s\n", k, v)
}
fmt.Println("------------") func (alarm *Alarm) HandleFunc(m *entity.Matedata) error {
return nil return nil
} }
func (dump *Dump) SetParams(params string) error { func (alarm *Alarm) SetParams(params string) error {
return nil
paramsValue, err := checkParams(params, "hit", "idle_time")
if err != nil {
return err
}
alarm.params = &paramsValue
return err
} }
package plugin package plugin
import ( import (
"fmt"
"log"
"time"
"github.com/y7ut/logtransfer/entity" "github.com/y7ut/logtransfer/entity"
) )
// 警报与监测 // 打印插件
type Alarm Plugin type Dump Plugin
func (dump *Dump) HandleFunc(m *entity.Matedata) error {
log.Println("DUMP:")
for k, v := range (*m).Data {
if k == "timestamp" {
// 这里需要回显 假设现在是UTC-8
loc := time.FixedZone("UTC", -8*3600)
createdAt, err := time.ParseInLocation("2006-01-02 15:04:05 ", fmt.Sprintf("%s", v), loc)
if err != nil {
continue
}
// UTC时间就是+8小时
v = createdAt.UTC().Format("2006-01-02 15:04:05")
}
fmt.Printf("%s : %s\n", k, v)
}
func (alarm *Alarm) HandleFunc(m *entity.Matedata) error { fmt.Println("------------")
return nil return nil
} }
func (alarm *Alarm) SetParams(params string) error { func (dump *Dump) SetParams(params string) error {
return nil
paramsValue, err := checkParams(params, "hit", "idle_time")
if err != nil {
return err
}
alarm.params = &paramsValue
return err
} }
...@@ -14,6 +14,8 @@ type Handler interface { ...@@ -14,6 +14,8 @@ type Handler interface {
SetParams(string) error SetParams(string) error
} }
type HandlerConstruct func() Handler
type Plugin struct { type Plugin struct {
params *map[string]interface{} params *map[string]interface{}
// error error // error error
......
package plugin package plugin
var RegistedPlugins = map[string]Handler{ import "fmt"
"Dump": &Dump{},
"Edit": &Edit{}, var RegistedPlugins = getRegistedPlugins()
"SaveES": &SaveES{},
"Alarm": &Alarm{}, // 获取注册过的插件插件列表
"DingRobot": &Ding{}, func getRegistedPlugins() map[string]HandlerConstruct {
return map[string]HandlerConstruct{
"Dump": func() Handler {
return &Dump{}
},
"Edit": func() Handler {
return &Edit{}
},
"SaveES": func() Handler {
return &SaveES{}
},
"Alarm": func() Handler {
return &Alarm{}
},
"DingRobot": func() Handler {
return &Ding{}
},
}
}
// 加载当前注册过的插件
func LoadRegistedPlugins(plugin string) (Handler, error) {
if getRegistedPlugins()[plugin] == nil {
return nil, fmt.Errorf("not support plugin 【%s】", plugin)
}
return getRegistedPlugins()[plugin](), nil
} }
...@@ -11,7 +11,7 @@ import ( ...@@ -11,7 +11,7 @@ import (
type SaveES Plugin type SaveES Plugin
func (saveEs *SaveES) HandleFunc(m *entity.Matedata) error { func (saveEs *SaveES) HandleFunc(m *entity.Matedata) error {
log.Println("SaveES:") log.Printf("SaveES: %s", (*saveEs.params)["index"])
m.Index = fmt.Sprintf("%s", (*saveEs.params)["index"]) m.Index = fmt.Sprintf("%s", (*saveEs.params)["index"])
m.Data["topic"] = m.Topic m.Data["topic"] = m.Topic
m.Data["level"] = m.Level m.Data["level"] = m.Level
......
...@@ -90,17 +90,17 @@ func ChooseTopic() (map[*Topic]bool, error) { ...@@ -90,17 +90,17 @@ func ChooseTopic() (map[*Topic]bool, error) {
// 收集全部的agent的collector信息 // 收集全部的agent的collector信息
ableTopics := make(map[*Topic]bool) ableTopics := make(map[*Topic]bool)
// 所有当前 // 所有当前的收集任务
collectors, err := LoadCollectors() collectors, err := LoadCollectors()
if err != nil { if err != nil {
return ableTopics, fmt.Errorf("Load Collector error: %s", err) return ableTopics, fmt.Errorf("load Collector error: %s", err)
} }
// 获取所有可用的Topic,用Topic的名字作为索引
topics, err := loadTopics() topics, err := loadTopics()
if err != nil { if err != nil {
return ableTopics, fmt.Errorf("Load Topic error: %s", err) return ableTopics, fmt.Errorf("load Topic error: %s", err)
} }
// 遍历全部的收集任务,取出全部的使用的topic放入集合中
for _, v := range collectors { for _, v := range collectors {
currentTopic := topics[v.Topic] currentTopic := topics[v.Topic]
ableTopics[currentTopic] = true ableTopics[currentTopic] = true
...@@ -127,7 +127,6 @@ func loadTopics() (map[string]*Topic, error) { ...@@ -127,7 +127,6 @@ func loadTopics() (map[string]*Topic, error) {
if err != nil { if err != nil {
return topics, err return topics, err
} }
topics[currentTopicConfig.Name] = generateTopic(currentTopicConfig) topics[currentTopicConfig.Name] = generateTopic(currentTopicConfig)
} }
return topics, nil return topics, nil
...@@ -493,7 +492,7 @@ func getCollectorChangeWithEvent(confResp clientv3.WatchResponse) (different Col ...@@ -493,7 +492,7 @@ func getCollectorChangeWithEvent(confResp clientv3.WatchResponse) (different Col
func getStatusChangeWithEvent(confResp clientv3.WatchResponse) (collectors []Collector, changeType string, err error) { func getStatusChangeWithEvent(confResp clientv3.WatchResponse) (collectors []Collector, changeType string, err error) {
changeStatus := fmt.Sprintf("%s", confResp.Events[0].Kv.Value) changeStatus := string(confResp.Events[0].Kv.Value)
// 先对比一下 // 先对比一下
oldKey := confResp.Events[0].Kv.Key oldKey := confResp.Events[0].Kv.Key
......
...@@ -15,14 +15,12 @@ func generateTopic(config TopicConfig) *Topic { ...@@ -15,14 +15,12 @@ func generateTopic(config TopicConfig) *Topic {
p := plugin.PipeLine{} p := plugin.PipeLine{}
// log.Println("get config", currentTopic.PipelineConfig)
for _, v := range config.PipelineConfig { for _, v := range config.PipelineConfig {
currentPlugin, ok := plugin.RegistedPlugins[v.Name] currentPlugin, err := plugin.LoadRegistedPlugins(v.Name)
if !ok { if err != nil {
log.Printf("get RegistedPlugins error:%s ", v.Name) log.Panicln("load plugin error:", err)
continue
} }
err := currentPlugin.SetParams(v.Params) err = currentPlugin.SetParams(v.Params)
if err != nil { if err != nil {
log.Panicln("plugin encode params error:", err) log.Panicln("plugin encode params error:", err)
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment