Commit 680ac7fa authored by 谢宇轩's avatar 谢宇轩 😅

新增插件激活列表

parent 5e30a45a
package plugin
var RegistedPlugins = map[string]Handler{
"Dump": &Dump{},
"Edit": &Edit{},
"SaveES": &SaveES{},
"Alarm": &Alarm{},
}
......@@ -54,48 +54,62 @@ func LoadCollectors() []Collector {
return collectors
}
// func loadTopics() map[string]*Topic {
// configs := conf.GetAllTopicFromEtcd()
// topics := make(map[string]*Topic)
// for _, v := range configs {
// var currentTopic TopicConfig
// err := json.Unmarshal(v, &currentTopic)
// if err != nil {
// log.Printf("json decode config(%s) err : err: %s", v, err)
// }
// log.Printf("Init Topic:%s ", currentTopic.Label)
// if currentTopic.PipelineConfig == nil {
// log.Printf("get topic setting error:%s ", currentTopic.Label)
// }
// p := plugin.PipeLine{}
// // log.Println("get config", currentTopic.PipelineConfig)
// for _, v := range currentTopic.PipelineConfig {
// currentPlugin := plugins.pluginsBoard[v.Name]
// err := currentPlugin.SetParams(v.Params)
// if err != nil {
// log.Panicln("plugin encode params error:", err)
// }
// p.AppendPlugin(currentPlugin)
// }
// var formatMethod entity.Formater
// switch currentTopic.Format {
// case 1:
// formatMethod = entity.DefaultJsonLog
// case 2:
// formatMethod = entity.FormatServiceWfLog
// default:
// formatMethod = entity.DefaultLog
// }
// topics[currentTopic.Name] = &Topic{Name: currentTopic.Name, Label: currentTopic.Label, PipeLine: &p, Format: formatMethod}
// }
// return topics
// }
// 收集所有需要监听的topic
func ChooseTopic() map[*Topic]bool {
collector := LoadCollectors()
topics := loadTopics()
ableTopics := make(map[*Topic]bool)
for _, v := range collector {
currentTopic := topics[v.Topic]
ableTopics[currentTopic] = true
}
return ableTopics
}
// 解析全部的Topic并加载内部的格式器和插件pipeline
func loadTopics() map[string]*Topic {
configs := conf.GetAllTopicFromEtcd()
topics := make(map[string]*Topic)
for _, v := range configs {
var currentTopic TopicConfig
err := json.Unmarshal(v, &currentTopic)
if err != nil {
log.Printf("json decode config(%s) err : err: %s", v, err)
}
log.Printf("Init Topic:%s ", currentTopic.Label)
if currentTopic.PipelineConfig == nil {
log.Printf("get topic setting error:%s ", currentTopic.Label)
}
p := plugin.PipeLine{}
// log.Println("get config", currentTopic.PipelineConfig)
for _, v := range currentTopic.PipelineConfig {
currentPlugin := plugin.RegistedPlugins[v.Name]
err := currentPlugin.SetParams(v.Params)
if err != nil {
log.Panicln("plugin encode params error:", err)
}
p.AppendPlugin(currentPlugin)
}
var formatMethod entity.Formater
switch currentTopic.Format {
case 1:
formatMethod = entity.DefaultJsonLog
case 2:
formatMethod = entity.FormatServiceWfLog
default:
formatMethod = entity.DefaultLog
}
topics[currentTopic.Name] = &Topic{Name: currentTopic.Name, Label: currentTopic.Label, PipeLine: &p, Format: formatMethod}
}
return topics
}
......@@ -2,7 +2,6 @@ package transfer
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
......@@ -14,7 +13,6 @@ import (
"github.com/y7ut/logtransfer/conf"
"github.com/y7ut/logtransfer/entity"
"github.com/y7ut/logtransfer/plugin"
"github.com/y7ut/logtransfer/source"
)
......@@ -76,7 +74,7 @@ func Run(confPath string) {
}()
// TODO: 动态的注册customer,目前仅支持初始化的时候来加载
for topic := range ChooseTopic() {
for topic := range source.ChooseTopic() {
currentCustomer := source.InitCustomer(topic)
Start <- currentCustomer
}
......@@ -111,63 +109,3 @@ func sign() <-chan os.Signal {
signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGUSR1, syscall.SIGUSR2)
return c
}
// 收集所有需要监听的topic
func ChooseTopic() map[*source.Topic]bool {
collector := source.LoadCollectors()
topics := loadTopics()
ableTopics := make(map[*source.Topic]bool)
for _, v := range collector {
currentTopic := topics[v.Topic]
ableTopics[currentTopic] = true
}
return ableTopics
}
// 解析全部的Topic并加载内部的格式器和插件pipeline
func loadTopics() map[string]*source.Topic {
configs := conf.GetAllTopicFromEtcd()
topics := make(map[string]*source.Topic)
for _, v := range configs {
var currentTopic source.TopicConfig
err := json.Unmarshal(v, &currentTopic)
if err != nil {
log.Printf("json decode config(%s) err : err: %s", v, err)
}
log.Printf("Init Topic:%s ", currentTopic.Label)
if currentTopic.PipelineConfig == nil {
log.Printf("get topic setting error:%s ", currentTopic.Label)
}
p := plugin.PipeLine{}
// log.Println("get config", currentTopic.PipelineConfig)
for _, v := range currentTopic.PipelineConfig {
currentPlugin := pluginsBoard[v.Name]
err := currentPlugin.SetParams(v.Params)
if err != nil {
log.Panicln("plugin encode params error:", err)
}
p.AppendPlugin(currentPlugin)
}
var formatMethod entity.Formater
switch currentTopic.Format {
case 1:
formatMethod = entity.DefaultJsonLog
case 2:
formatMethod = entity.FormatServiceWfLog
default:
formatMethod = entity.DefaultLog
}
topics[currentTopic.Name] = &source.Topic{Name: currentTopic.Name, Label: currentTopic.Label, PipeLine: &p, Format: formatMethod}
}
return topics
}
package transfer
import(
"github.com/y7ut/logtransfer/plugin"
)
var pluginsBoard = map[string]plugin.Handler{
"Dump": &plugin.Dump{},
"Edit": &plugin.Edit{},
"SaveES": &plugin.SaveES{},
"Alarm": &plugin.Alarm{},
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment