Commit c0487713 authored by 谢宇轩's avatar 谢宇轩

修复了插件实例的问题

parent dfceb499
...@@ -24,6 +24,10 @@ func FormatServiceWfLog(sourceKey string, message string) (Matedata, error) { ...@@ -24,6 +24,10 @@ func FormatServiceWfLog(sourceKey string, message string) (Matedata, error) {
mateItem.Level = message[:levelIndex] mateItem.Level = message[:levelIndex]
message = message[levelIndex:] message = message[levelIndex:]
if !strings.Contains(message, "[") {
return *mateItem, fmt.Errorf("message format error without time")
}
// 给时间调回UTC+8 // 给时间调回UTC+8
logTime, _ := time.ParseInLocation(": 06-01-02 15:04:05 ", message[:strings.Index(message, "[")], time.FixedZone("UTC", 8*3600)) logTime, _ := time.ParseInLocation(": 06-01-02 15:04:05 ", message[:strings.Index(message, "[")], time.FixedZone("UTC", 8*3600))
mateItem.create = logTime mateItem.create = logTime
...@@ -60,7 +64,7 @@ func DefaultLog(sourceKey string, message string) (Matedata, error) { ...@@ -60,7 +64,7 @@ func DefaultLog(sourceKey string, message string) (Matedata, error) {
mateItem.Topic = sourceKey mateItem.Topic = sourceKey
mateItem.Index = sourceKey mateItem.Index = sourceKey
mateItem.Data = map[string]interface{}{"message": message} mateItem.Data["message"] = message
result := *mateItem result := *mateItem
MatePool.Put(vMateItem) MatePool.Put(vMateItem)
...@@ -78,6 +82,8 @@ func DefaultJsonLog(sourceKey string, message string) (Matedata, error) { ...@@ -78,6 +82,8 @@ func DefaultJsonLog(sourceKey string, message string) (Matedata, error) {
if err != nil { if err != nil {
return *mateItem, err return *mateItem, err
} }
mateItem.Topic = sourceKey
mateItem.Data = data mateItem.Data = data
result := *mateItem result := *mateItem
......
...@@ -53,6 +53,7 @@ func MatedateSender(ctx context.Context) { ...@@ -53,6 +53,7 @@ func MatedateSender(ctx context.Context) {
wp := &ESWorkPool{ wp := &ESWorkPool{
WorkerFunc: func(matedatas []*Matedata) bool { WorkerFunc: func(matedatas []*Matedata) bool {
bulkRequest := esClient.Bulk() bulkRequest := esClient.Bulk()
defer bulkRequest.Reset()
for _, m := range matedatas { for _, m := range matedatas {
indexRequest := elastic.NewBulkIndexRequest().Index(m.Index).Doc(m.Data) indexRequest := elastic.NewBulkIndexRequest().Index(m.Index).Doc(m.Data)
bulkRequest.Add(indexRequest) bulkRequest.Add(indexRequest)
...@@ -68,6 +69,14 @@ func MatedateSender(ctx context.Context) { ...@@ -68,6 +69,14 @@ func MatedateSender(ctx context.Context) {
return false return false
} }
for _, item := range response.Failed() {
if item.Error == nil {
continue
}
log.Printf("Find Error in ES Result in (%s): %s", item.Index, item.Error.Reason)
return false
}
for _, v := range response.Items { for _, v := range response.Items {
for _, item := range v { for _, item := range v {
if item.Error != nil { if item.Error != nil {
...@@ -76,8 +85,6 @@ func MatedateSender(ctx context.Context) { ...@@ -76,8 +85,6 @@ func MatedateSender(ctx context.Context) {
} }
} }
} }
bulkRequest.Reset()
} }
return true return true
}, },
......
...@@ -63,14 +63,17 @@ func (wp *ESWorkPool) Start() { ...@@ -63,14 +63,17 @@ func (wp *ESWorkPool) Start() {
go func() { go func() {
var scrath []*workerChan var scrath []*workerChan
for { for {
// 清理未使用的时间超过 最大空闲时间的WorkerChan // 因每个worker上存在上次工作时间的标记,因此可以得出协程空闲时间
// 不干活的就得死! // 整理时按照空闲时间为协程排序, 将空闲时间大的排到前面
// 清理未使用的时间超过最大空闲时间的Worker,
// 同时记得要清理在Pool中的注册信息, 办理离职手续,不干活的就得死!
wp.clean(&scrath) wp.clean(&scrath)
// 每隔一段时间检查一次 去进行清理操作,直到下班
select { select {
case <-stopCh: case <-stopCh:
// 工作结束,下班,停止检查摸鱼协程
return return
default: default:
// 每隔一段时间检查一次 去进行清理操作,直到结束工作,下班
time.Sleep(wp.MaxIdleWorkerDuration) time.Sleep(wp.MaxIdleWorkerDuration)
} }
} }
......
...@@ -10,7 +10,7 @@ import ( ...@@ -10,7 +10,7 @@ import (
"github.com/y7ut/logtransfer/transfer" "github.com/y7ut/logtransfer/transfer"
) )
const version = "2.1.2" const version = "2.1.3"
var c = flag.String("c", "./logtransfer.conf", "使用配置文件启动") var c = flag.String("c", "./logtransfer.conf", "使用配置文件启动")
var v = flag.Bool("v", false, "查看当前程序版本") var v = flag.Bool("v", false, "查看当前程序版本")
......
package plugin package plugin
var RegistedPlugins = map[string]Handler{ type HandlerConstruct func() Handler
"Dump": &Dump{},
"Edit": &Edit{}, var RegistedPlugins = map[string]HandlerConstruct{
"SaveES": &SaveES{}, "Dump": func() Handler {
"Alarm": &Alarm{}, return &Dump{}
},
"Edit": func() Handler {
return &Edit{}
},
"SaveES": func() Handler {
return &SaveES{}
},
"Alarm": func() Handler {
return &Alarm{}
},
} }
...@@ -11,7 +11,7 @@ import ( ...@@ -11,7 +11,7 @@ import (
type SaveES Plugin type SaveES Plugin
func (saveEs *SaveES) HandleFunc(m *entity.Matedata) error { func (saveEs *SaveES) HandleFunc(m *entity.Matedata) error {
log.Println("SaveES:") log.Printf("SaveES: %s", (*saveEs.params)["index"])
m.Index = fmt.Sprintf("%s", (*saveEs.params)["index"]) m.Index = fmt.Sprintf("%s", (*saveEs.params)["index"])
m.Data["topic"] = m.Topic m.Data["topic"] = m.Topic
m.Data["level"] = m.Level m.Data["level"] = m.Level
......
...@@ -90,17 +90,17 @@ func ChooseTopic() (map[*Topic]bool, error) { ...@@ -90,17 +90,17 @@ func ChooseTopic() (map[*Topic]bool, error) {
// 收集全部的agent的collector信息 // 收集全部的agent的collector信息
ableTopics := make(map[*Topic]bool) ableTopics := make(map[*Topic]bool)
// 所有当前 // 所有当前的收集任务
collectors, err := LoadCollectors() collectors, err := LoadCollectors()
if err != nil { if err != nil {
return ableTopics, fmt.Errorf("Load Collector error: %s", err) return ableTopics, fmt.Errorf("load Collector error: %s", err)
} }
// 获取所有可用的Topic,用Topic的名字作为索引
topics, err := loadTopics() topics, err := loadTopics()
if err != nil { if err != nil {
return ableTopics, fmt.Errorf("Load Topic error: %s", err) return ableTopics, fmt.Errorf("load Topic error: %s", err)
} }
// 遍历全部的收集任务,取出全部的使用的topic放入集合中
for _, v := range collectors { for _, v := range collectors {
currentTopic := topics[v.Topic] currentTopic := topics[v.Topic]
ableTopics[currentTopic] = true ableTopics[currentTopic] = true
...@@ -127,7 +127,6 @@ func loadTopics() (map[string]*Topic, error) { ...@@ -127,7 +127,6 @@ func loadTopics() (map[string]*Topic, error) {
if err != nil { if err != nil {
return topics, err return topics, err
} }
topics[currentTopicConfig.Name] = generateTopic(currentTopicConfig) topics[currentTopicConfig.Name] = generateTopic(currentTopicConfig)
} }
return topics, nil return topics, nil
......
...@@ -15,9 +15,8 @@ func generateTopic(config TopicConfig) *Topic { ...@@ -15,9 +15,8 @@ func generateTopic(config TopicConfig) *Topic {
p := plugin.PipeLine{} p := plugin.PipeLine{}
// log.Println("get config", currentTopic.PipelineConfig)
for _, v := range config.PipelineConfig { for _, v := range config.PipelineConfig {
currentPlugin := plugin.RegistedPlugins[v.Name] currentPlugin := plugin.RegistedPlugins[v.Name]()
err := currentPlugin.SetParams(v.Params) err := currentPlugin.SetParams(v.Params)
if err != nil { if err != nil {
log.Panicln("plugin encode params error:", err) log.Panicln("plugin encode params error:", err)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment