Commit 37a9bead authored by 谢宇轩's avatar 谢宇轩 😅

添加基础的插件功能

parent 77ec5f8f
......@@ -14,9 +14,10 @@ import (
var (
configPath = "/logagent/config/"
statusPath = "/logagent/active/"
topicPath = "/logagent/topic/"
)
type LogagentConfig []byte
type EtcdValue []byte
var cli *clientv3.Client
......@@ -31,8 +32,9 @@ func Init(confPath string) {
func initConnect() *clientv3.Client {
addressList := strings.Split(APPConfig.Etcd.Address, ",")
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{APPConfig.Etcd.Address},
Endpoints: addressList,
DialTimeout: 5 * time.Second,
})
if err != nil {
......@@ -43,7 +45,7 @@ func initConnect() *clientv3.Client {
return cli
}
func GetAllConfFromEtcd() []LogagentConfig {
func GetAllConfFromEtcd() []EtcdValue {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, configPath, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend))
......@@ -52,12 +54,10 @@ func GetAllConfFromEtcd() []LogagentConfig {
panic(fmt.Sprintf("get failed, err:%s \n", err))
}
configs := make([]LogagentConfig, 0)
configs := make([]EtcdValue, 0)
for _, etcdResult := range resp.Kvs {
etcdKey := statusPath + string(etcdResult.Key[strings.LastIndex(string(etcdResult.Key), "/")+1:])
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
......@@ -82,6 +82,25 @@ func GetAllConfFromEtcd() []LogagentConfig {
return configs
}
func GetAllTopicFromEtcd() []EtcdValue {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, topicPath, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend))
cancel()
if err != nil {
panic(fmt.Sprintf("get failed, err:%s \n", err))
}
configs := make([]EtcdValue, 0)
for _, etcdResult := range resp.Kvs {
configs = append(configs, etcdResult.Value)
}
return configs
}
func WatchLogConfToEtcd() clientv3.WatchChan {
wch := cli.Watch(context.Background(), configPath, clientv3.WithPrefix())
......
#### LogTransfer
# Kafka配置
[kafka]
address=
# Kafka配置
[etcd]
address=
# ES 配置
[es]
address=
bulk_size=50
......@@ -13,6 +13,26 @@ type Collector struct {
Topic string `json:"topic"`
}
type Topic struct {
Name string
Label string
PipeLine *PipeLine
Format Formater
}
type TopicConfig struct {
Format int `json:"format"`
Label string `json:"label"`
Name string `json:"name"`
PipelineConfig []PipeLinePluginsConfig `json:"piepline"`
}
type PipeLinePluginsConfig struct {
Label string `json:"label"`
Name string `json:"name"`
Params string `json:"params"`
}
// 加载所有的collector
func loadCollectors() []Collector {
configs := conf.GetAllConfFromEtcd()
......@@ -24,7 +44,7 @@ func loadCollectors() []Collector {
if err != nil {
log.Printf("json decode config(%s) err : err: %s", v, err)
}
if currentCollector !=nil {
if currentCollector != nil {
log.Printf("Init config:%s ", v)
collectors = append(collectors, currentCollector...)
}
......@@ -32,12 +52,74 @@ func loadCollectors() []Collector {
return collectors
}
func loadTopics() map[string]*Topic {
configs := conf.GetAllTopicFromEtcd()
topics := make(map[string]*Topic)
for _, v := range configs {
var currentTopic TopicConfig
err := json.Unmarshal(v, &currentTopic)
if err != nil {
log.Printf("json decode config(%s) err : err: %s", v, err)
}
log.Printf("Init Topic:%s ", currentTopic.Label)
if currentTopic.PipelineConfig == nil {
log.Printf("get topic setting error:%s ", currentTopic.Label)
}
p := PipeLine{}
for _, v := range currentTopic.PipelineConfig {
makePiepline(&p,v)
}
var formatMethod Formater
switch currentTopic.Format {
case 1:
formatMethod = DefaultJsonLog
case 2:
formatMethod = FormatServiceWfLog
default:
formatMethod = DefaultLog
}
topics[currentTopic.Name] = &Topic{Name: currentTopic.Name, Label: currentTopic.Label, PipeLine: &p, Format: formatMethod}
}
return topics
}
// 收集所有需要监听的topic
func ChooseTopic() map[string]bool {
func ChooseTopic() map[*Topic]bool {
collector := loadCollectors()
topics := make(map[string]bool, 0)
topics := loadTopics()
ableTopics := make(map[*Topic]bool)
for _, v := range collector {
topics[v.Topic] = true
currentTopic := topics[v.Topic]
ableTopics[currentTopic] = true
}
return topics
return ableTopics
}
func makePiepline(p *PipeLine ,config PipeLinePluginsConfig){
if p.Current == nil {
current := pluginsBoard[config.Name]
if config.Params != "" {
current.setParams(config.Params)
}
p.Current = &current
p.Next = &PipeLine{}
log.Printf("install plugin :%s", config.Label,)
return
}
makePiepline(p.Next, config)
}
\ No newline at end of file
package transfer
import (
"encoding/json"
"fmt"
"regexp"
"strings"
......@@ -16,10 +17,10 @@ type Formater func(string, string) (Matedate, error)
// service错误日志的处理
func FormatServiceWfLog(sourceKey string, message string) (Matedate, error) {
result := make(Matedate, 0)
result := make(Matedate)
levelIndex := strings.Index(message, ":")
if levelIndex == -1 {
return result, fmt.Errorf("message format error.")
return result, fmt.Errorf("message format error")
}
// log.Println(message)
result["topic"] = sourceKey
......@@ -49,9 +50,19 @@ func FormatServiceWfLog(sourceKey string, message string) (Matedate, error) {
// service错误日志的处理
func DefaultLog(sourceKey string, message string) (Matedate, error) {
result := make(Matedate, 0)
result := make(Matedate)
result["topic"] = sourceKey
result["message"] = message
return result, nil
}
// Json 格式的错误日志处理
func DefaultJsonLog(sourceKey string, message string) (Matedate, error) {
result := make(Matedate)
err := json.Unmarshal([]byte(message), &result)
if err != nil {
return nil, err
}
return result, nil
}
package transfer
import (
"encoding/json"
"fmt"
"log"
)
var pluginsBoard = map[string]Plugin{
"Dump":&Dump{},
"Edit":&Edit{},
"SaveES":&SaveES{},
}
type Matedate map[string]interface{}
type Handler func(Matedate) Matedate
type Plugin interface {
Handle(Matedate) Matedate
setParams(string)
}
type PipeLine struct {
Next *PipeLine
Current Handler
Current *Plugin
}
func (p PipeLine) Enter(m Matedate) Matedate {
......@@ -18,30 +29,74 @@ func (p PipeLine) Enter(m Matedate) Matedate {
return m
}
if p.Next == nil {
return p.Current(m)
return (*p.Current).Handle(m)
}
return p.Next.Enter(p.Current(m))
return p.Next.Enter((*p.Current).Handle(m))
}
// 修改插件
type Edit struct {
Params map[string]interface{}
}
// 持久化到ES
func Edit(m Matedate) Matedate {
m["source"] = "logtranfers"
func (p Edit) Handle(m Matedate) Matedate {
key := fmt.Sprintf("%s", p.Params["key"])
log.Println(key)
log.Println(p.Params["value"])
m[key] = p.Params["value"]
m["EDIT"] = "ok"
return m
}
// 持久化到ES
func SaveES(m Matedate) Matedate {
func (p *Edit) setParams(params string) {
var paramsValue map[string]interface{}
err := json.Unmarshal([]byte(params), &paramsValue)
if err != nil {
log.Printf("Edit Plugin json decode params(%s) err : err: %s", params, err)
}
p.Params = paramsValue
}
// 持久化到ES插件
type SaveES struct {
Params map[string]interface{}
}
func (p SaveES) Handle(m Matedate) Matedate {
messages <- m
return m
}
func (p *SaveES) setParams(params string) {
var paramsValue map[string]interface{}
err := json.Unmarshal([]byte(params), &paramsValue)
if err != nil {
log.Printf("SaveES Plugin json decode params(%s) err : err: %s", params, err)
}
p.Params = paramsValue
}
// 打印
func Dump(m Matedate) Matedate {
type Dump struct {
Params map[string]interface{}
}
func (p *Dump) Handle(m Matedate) Matedate {
for k, v := range m {
fmt.Printf("%s : %s\n", k, v)
}
fmt.Println("------------")
return m
}
func (p Dump) setParams(params string) {
var paramsValue map[string]interface{}
err := json.Unmarshal([]byte(params), &paramsValue)
if err != nil {
log.Printf("Dump Plugin json decode params(%s) err : err: %s", params, err)
}
p.Params = paramsValue
}
......@@ -19,7 +19,7 @@ import (
var (
Start = make(chan *Customer)
Close = make(chan string)
CustomerManger = make(map[string]*Customer, 0)
CustomerManger = make(map[string]*Customer)
MaxRetryTime = 10
mu sync.Mutex
closeWg sync.WaitGroup
......@@ -37,15 +37,18 @@ func getRegisterTopics() (topics []string) {
// 核心启动
func Run(confPath string) {
// 加载配置
conf.Init(confPath)
// 初始化ES客户端
esClient, err := elastic.NewClient(elastic.SetSniff(false), elastic.SetURL(conf.APPConfig.Es.Address))
if err != nil {
fmt.Println("connect es error", err)
panic(err)
}
// 做一个master的上下文
ctx, cancel := context.WithCancel(context.Background())
//
for i := 0; i < 3; i++ {
go MatedateSender(ctx, esClient)
}
......@@ -72,20 +75,10 @@ func Run(confPath string) {
}()
for topic := range ChooseTopic() {
GroupID := topic + "_group"
r := queue.InitReader(topic, GroupID)
currentFormater := DefaultLog
// TODO:使用ETCD下发需要的pipe handler
if strings.Contains(topic, "service") {
currentFormater = FormatServiceWfLog
}
// Edit->Save->Dump 这个顺序
// pipe := &PipeLine{Current: Edit, Next: &PipeLine{Current: Dump, Next: &PipeLine{Current: SaveES, Next: nil}}}
pipe := &PipeLine{Current: Edit, Next: &PipeLine{Current: SaveES, Next: nil}}
currentCustomer := &Customer{Reader: r, done: make(chan struct{}), HandlePipeline: pipe, Format: currentFormater}
GroupID := topic.Name + "_group"
r := queue.InitReader(topic.Name, GroupID)
currentCustomer := &Customer{Reader: r, done: make(chan struct{}), HandlePipeline: topic.PipeLine, Format: topic.Format}
log.Printf("Check Customer group of [%s] success!", GroupID)
Start <- currentCustomer
}
......@@ -129,7 +122,7 @@ func ReadingMessage(ctx context.Context, c *Customer) {
// var trycount int
// var cstSh, _ = time.LoadLocation("Asia/Shanghai") //上海时区
var errMessage strings.Builder
for {
select {
case <-c.Listen():
......@@ -143,16 +136,25 @@ func ReadingMessage(ctx context.Context, c *Customer) {
m, err := c.Reader.ReadMessage(ctx)
if err != nil {
// 退出
// c.Exit()
log.Println(err)
errMessage.Reset()
errMessage.WriteString("Reader Error")
errMessage.WriteString(err.Error())
log.Println(errMessage.String())
continue
}
matedata, err := c.Format(string(c.Reader.Config().Topic), string(m.Value))
if err != nil {
log.Println(err)
errMessage.Reset()
errMessage.WriteString("Format Error")
errMessage.WriteString(err.Error())
log.Println(errMessage.String())
continue
}
// 流入pipe
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment