Commit 5e30a45a authored by 谢宇轩's avatar 谢宇轩 😅

重新规划代码结构

parent 5bf97d8f
...@@ -45,6 +45,7 @@ func initConnect() *clientv3.Client { ...@@ -45,6 +45,7 @@ func initConnect() *clientv3.Client {
return cli return cli
} }
// 获取当前所有的任务 (目前在初始化时使用)
func GetAllConfFromEtcd() []EtcdValue { func GetAllConfFromEtcd() []EtcdValue {
ctx, cancel := context.WithTimeout(context.Background(), time.Second) ctx, cancel := context.WithTimeout(context.Background(), time.Second)
...@@ -57,7 +58,7 @@ func GetAllConfFromEtcd() []EtcdValue { ...@@ -57,7 +58,7 @@ func GetAllConfFromEtcd() []EtcdValue {
configs := make([]EtcdValue, 0) configs := make([]EtcdValue, 0)
for _, etcdResult := range resp.Kvs { for _, etcdResult := range resp.Kvs {
// 根据系统中当前全部的节点名称, 确定节点状态
etcdKey := statusPath + string(etcdResult.Key[strings.LastIndex(string(etcdResult.Key), "/")+1:]) etcdKey := statusPath + string(etcdResult.Key[strings.LastIndex(string(etcdResult.Key), "/")+1:])
ctx, cancel = context.WithTimeout(context.Background(), time.Second) ctx, cancel = context.WithTimeout(context.Background(), time.Second)
...@@ -82,7 +83,7 @@ func GetAllConfFromEtcd() []EtcdValue { ...@@ -82,7 +83,7 @@ func GetAllConfFromEtcd() []EtcdValue {
return configs return configs
} }
// 加载所有的Topic主题配置信息
func GetAllTopicFromEtcd() []EtcdValue { func GetAllTopicFromEtcd() []EtcdValue {
ctx, cancel := context.WithTimeout(context.Background(), time.Second) ctx, cancel := context.WithTimeout(context.Background(), time.Second)
......
...@@ -24,4 +24,4 @@ type Es struct { ...@@ -24,4 +24,4 @@ type Es struct {
var ( var (
APPConfig = new(LogTransferConf) APPConfig = new(LogTransferConf)
) )
\ No newline at end of file
package transfer package entity
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"regexp" "regexp"
"strings" "strings"
"sync"
"time" "time"
) )
var ( type Formater func(string, string) (Matedata, error)
contentRegexp = regexp.MustCompile(`\[(?s:(.*?))\]`)
serviceWfLogKeyWord = []string{"errno", "logId", "uri", "refer", "cookie", "ua", "host", "clientIp", "optime", "request_params", "errmsg"}
MatePool = sync.Pool{New: func() interface{} { return &Matedate{data: make(map[string]interface{})} }}
)
type Formater func(string, string) (Matedate, error)
// service错误日志的处理 // service错误日志的处理
func FormatServiceWfLog(sourceKey string, message string) (Matedate, error) { func FormatServiceWfLog(sourceKey string, message string) (Matedata, error) {
// vMateItem := MatePool.Get() // vMateItem := MatePool.Get()
mateItem := MatePool.Get().(*Matedate) mateItem := MatePool.Get().(*Matedata)
levelIndex := strings.Index(message, ":") levelIndex := strings.Index(message, ":")
if levelIndex == -1 { if levelIndex == -1 {
return *mateItem, fmt.Errorf("message format error") return *mateItem, fmt.Errorf("message format error")
} }
mateItem.Topic= sourceKey mateItem.Topic = sourceKey
mateItem.Level = message[:levelIndex] mateItem.Level = message[:levelIndex]
message = message[levelIndex:] message = message[levelIndex:]
loc, _ := time.LoadLocation("Local") loc, _ := time.LoadLocation("Local")
...@@ -42,15 +35,15 @@ func FormatServiceWfLog(sourceKey string, message string) (Matedate, error) { ...@@ -42,15 +35,15 @@ func FormatServiceWfLog(sourceKey string, message string) (Matedate, error) {
if len(curentSub) < 1 { if len(curentSub) < 1 {
continue continue
} }
if word == "errmsg"{ if word == "errmsg" {
mateItem.data["message"] = strings.Replace(contentRegexp.FindStringSubmatch(logContent)[1],`"`,"",-1) mateItem.Data["message"] = strings.Replace(contentRegexp.FindStringSubmatch(logContent)[1], `"`, "", -1)
}else{ } else {
mateItem.data[word] = contentRegexp.FindStringSubmatch(logContent)[1] mateItem.Data[word] = contentRegexp.FindStringSubmatch(logContent)[1]
} }
} }
mateItem.data["timestamp"] = mateItem.create mateItem.Data["timestamp"] = mateItem.create
result := *mateItem result := *mateItem
mateItem.reset() mateItem.reset()
MatePool.Put(mateItem) MatePool.Put(mateItem)
...@@ -58,32 +51,32 @@ func FormatServiceWfLog(sourceKey string, message string) (Matedate, error) { ...@@ -58,32 +51,32 @@ func FormatServiceWfLog(sourceKey string, message string) (Matedate, error) {
} }
// service错误日志的处理 // service错误日志的处理
func DefaultLog(sourceKey string, message string) (Matedate, error) { func DefaultLog(sourceKey string, message string) (Matedata, error) {
vMateItem := MatePool.Get() vMateItem := MatePool.Get()
mateItem := vMateItem.(*Matedate) mateItem := vMateItem.(*Matedata)
mateItem.Topic = sourceKey mateItem.Topic = sourceKey
mateItem.Index = sourceKey mateItem.Index = sourceKey
mateItem.data = map[string]interface{}{"message":message} mateItem.Data = map[string]interface{}{"message": message}
result := *mateItem result := *mateItem
MatePool.Put(vMateItem) MatePool.Put(vMateItem)
return result, nil return result, nil
} }
// Json 格式的错误日志处理 // Json 格式的错误日志处理
func DefaultJsonLog(sourceKey string, message string) (Matedate, error) { func DefaultJsonLog(sourceKey string, message string) (Matedata, error) {
vMateItem := MatePool.Get() vMateItem := MatePool.Get()
mateItem := vMateItem.(*Matedate) mateItem := vMateItem.(*Matedata)
data := mateItem.data data := mateItem.Data
err := json.Unmarshal([]byte(message), &data) err := json.Unmarshal([]byte(message), &data)
if err != nil { if err != nil {
return *mateItem, err return *mateItem, err
} }
mateItem.data = data mateItem.Data = data
result := *mateItem result := *mateItem
MatePool.Put(vMateItem) MatePool.Put(vMateItem)
......
package entity
import (
"context"
"log"
"regexp"
"sync"
"time"
elastic "github.com/olivere/elastic/v7"
"github.com/y7ut/logtransfer/conf"
)
var (
contentRegexp = regexp.MustCompile(`\[(?s:(.*?))\]`)
serviceWfLogKeyWord = []string{"errno", "logId", "uri", "refer", "cookie", "ua", "host", "clientIp", "optime", "request_params", "errmsg"}
MatePool = sync.Pool{New: func() interface{} { return &Matedata{Data: make(map[string]interface{})} }}
messages = make(chan *Matedata, conf.APPConfig.Es.BulkSize)
)
type Matedata struct {
Topic string
Index string
Level string
create time.Time
Data map[string]interface{}
}
func (m *Matedata) reset() {
m.Topic = ""
m.Index = ""
m.Level = ""
m.Data = map[string]interface{}{}
}
func HandleMessage(m *Matedata) {
messages <- m
}
func CloseMessageChan() {
close(messages)
}
func MatedateSender(ctx context.Context, esClient *elastic.Client) {
tick := time.NewTicker(3 * time.Second)
var (
SenderMu sync.Mutex
)
bulkRequest := esClient.Bulk()
for {
select {
case m := <-messages:
indexRequest := elastic.NewBulkIndexRequest().Index(m.Index).Doc(m.Data)
bulkRequest.Add(indexRequest)
case <-tick.C:
// Do sends the bulk requests to Elasticsearch
SenderMu.Lock()
count := bulkRequest.NumberOfActions()
if count > 0 {
log.Printf("Send message to Es: %d : \n", bulkRequest.NumberOfActions())
_, err := bulkRequest.Do(ctx)
if err != nil {
log.Println("Save Es Error:", err)
}
bulkRequest.Reset()
}
SenderMu.Unlock()
case <-ctx.Done():
// Do sends the bulk requests to Elasticsearch
SenderMu.Lock()
_, err := bulkRequest.Do(ctx)
if err != nil {
log.Println("Save Es Error:", err)
}
bulkRequest.Reset()
SenderMu.Unlock()
log.Println("Exiting...")
return
}
}
}
package plugin
import (
"fmt"
"log"
"github.com/y7ut/logtransfer/entity"
)
// 打印插件
type Dump Plugin
func (dump *Dump) HandleFunc(m *entity.Matedata) error {
log.Println("DUMP:")
for k, v := range (*m).Data {
fmt.Printf("%s : %s\n", k, v)
}
fmt.Println("------------")
return nil
}
func (dump *Dump) SetParams(params string) error {
return nil
}
package plugin
import (
"github.com/y7ut/logtransfer/entity"
)
// 警报与监测
type Alarm Plugin
func (alarm *Alarm) HandleFunc(m *entity.Matedata) error {
return nil
}
func (alarm *Alarm) SetParams(params string) error {
paramsValue, err := checkParams(params, "hit", "idle_time")
if err != nil {
return err
}
alarm.params = &paramsValue
return err
}
package plugin
import (
"fmt"
"log"
"github.com/y7ut/logtransfer/entity"
)
// 修改插件
type Edit Plugin
func (edit *Edit) HandleFunc(m *entity.Matedata) error {
log.Println("EDIT:")
if edit.params == nil {
return fmt.Errorf("please set params first")
}
key := (*edit.params)["key"]
value := (*edit.params)["value"]
(*m).Data[key.(string)] = value
return nil
}
func (edit *Edit) SetParams(params string) error {
paramsValue, err := checkParams(params, "key", "value")
if err != nil {
return err
}
edit.params = &paramsValue
return err
}
package plugin
import (
"encoding/json"
"errors"
"log"
"strings"
"github.com/y7ut/logtransfer/entity"
)
type Handler interface {
HandleFunc(*entity.Matedata) error
SetParams(string) error
}
type Plugin struct {
params *map[string]interface{}
// error error
}
type PipeLine struct {
pipe []*Handler
}
func (p *PipeLine) AppendPlugin(plugin Handler) {
p.pipe = append(p.pipe, &plugin)
}
func (p *PipeLine) Enter(m entity.Matedata) {
for _, plugin := range p.pipe {
err := (*plugin).HandleFunc(&m)
if err != nil {
log.Println(err)
}
}
}
func checkParams(paramsJson string, key ...string) (value map[string]interface{}, err error) {
err = json.Unmarshal([]byte(paramsJson), &value)
if err != nil {
return value, err
}
var errMessage strings.Builder
var errCheck bool
errMessage.WriteString("Plugin params errors: ")
for _, checkItem := range key {
if value[checkItem] == nil {
errCheck = true
errMessage.WriteString(checkItem)
}
}
if errCheck {
return value, errors.New(errMessage.String())
}
return value, nil
}
package plugin
import (
"fmt"
"log"
"github.com/y7ut/logtransfer/entity"
)
// 修改插件
type SaveES Plugin
func (saveEs *SaveES) HandleFunc(m *entity.Matedata) error {
log.Println("SaveES:")
m.Index = fmt.Sprintf("%s", (*saveEs.params)["index"])
m.Data["topic"] = m.Topic
m.Data["level"] = m.Level
entity.HandleMessage(m)
return nil
}
func (saveEs *SaveES) SetParams(params string) error {
paramsValue, err := checkParams(params, "index")
if err != nil {
return err
}
saveEs.params = &paramsValue
return err
}
package transfer package source
import ( import (
"encoding/json" "encoding/json"
"log" "log"
"github.com/y7ut/logtransfer/conf" "github.com/y7ut/logtransfer/conf"
"github.com/y7ut/logtransfer/entity"
"github.com/y7ut/logtransfer/plugin"
) )
type Collector struct { type Collector struct {
...@@ -16,8 +18,8 @@ type Collector struct { ...@@ -16,8 +18,8 @@ type Collector struct {
type Topic struct { type Topic struct {
Name string Name string
Label string Label string
PipeLine *PipeLine PipeLine *plugin.PipeLine
Format Formater Format entity.Formater
} }
type TopicConfig struct { type TopicConfig struct {
...@@ -34,7 +36,7 @@ type PipeLinePluginsConfig struct { ...@@ -34,7 +36,7 @@ type PipeLinePluginsConfig struct {
} }
// 加载所有的collector // 加载所有的collector
func loadCollectors() []Collector { func LoadCollectors() []Collector {
configs := conf.GetAllConfFromEtcd() configs := conf.GetAllConfFromEtcd()
collectors := make([]Collector, 0) collectors := make([]Collector, 0)
for _, v := range configs { for _, v := range configs {
...@@ -52,61 +54,48 @@ func loadCollectors() []Collector { ...@@ -52,61 +54,48 @@ func loadCollectors() []Collector {
return collectors return collectors
} }
func loadTopics() map[string]*Topic { // func loadTopics() map[string]*Topic {
configs := conf.GetAllTopicFromEtcd() // configs := conf.GetAllTopicFromEtcd()
// topics := make(map[string]*Topic)
// for _, v := range configs {
// var currentTopic TopicConfig
// err := json.Unmarshal(v, &currentTopic)
// if err != nil {
// log.Printf("json decode config(%s) err : err: %s", v, err)
// }
// log.Printf("Init Topic:%s ", currentTopic.Label)
// if currentTopic.PipelineConfig == nil {
// log.Printf("get topic setting error:%s ", currentTopic.Label)
// }
// p := plugin.PipeLine{}
// // log.Println("get config", currentTopic.PipelineConfig)
// for _, v := range currentTopic.PipelineConfig {
// currentPlugin := plugins.pluginsBoard[v.Name]
// err := currentPlugin.SetParams(v.Params)
// if err != nil {
// log.Panicln("plugin encode params error:", err)
// }
// p.AppendPlugin(currentPlugin)
// }
// var formatMethod entity.Formater
// switch currentTopic.Format {
// case 1:
// formatMethod = entity.DefaultJsonLog
// case 2:
// formatMethod = entity.FormatServiceWfLog
// default:
// formatMethod = entity.DefaultLog
// }
// topics[currentTopic.Name] = &Topic{Name: currentTopic.Name, Label: currentTopic.Label, PipeLine: &p, Format: formatMethod}
// }
// return topics
// }
topics := make(map[string]*Topic)
for _, v := range configs {
var currentTopic TopicConfig
err := json.Unmarshal(v, &currentTopic)
if err != nil {
log.Printf("json decode config(%s) err : err: %s", v, err)
}
log.Printf("Init Topic:%s ", currentTopic.Label)
if currentTopic.PipelineConfig == nil {
log.Printf("get topic setting error:%s ", currentTopic.Label)
}
p := PipeLine{}
// log.Println("get config", currentTopic.PipelineConfig)
for _, v := range currentTopic.PipelineConfig {
currentPlugin := pluginsBoard[v.Name]
err := currentPlugin.setParams(v.Params)
if err != nil {
log.Panicln("plugin encode params error:", err)
}
p.appendPlugin(currentPlugin)
}
var formatMethod Formater
switch currentTopic.Format {
case 1:
formatMethod = DefaultJsonLog
case 2:
formatMethod = FormatServiceWfLog
default:
formatMethod = DefaultLog
}
topics[currentTopic.Name] = &Topic{Name: currentTopic.Name, Label: currentTopic.Label, PipeLine: &p, Format: formatMethod}
}
return topics
}
// 收集所有需要监听的topic
func ChooseTopic() map[*Topic]bool {
collector := loadCollectors()
topics := loadTopics()
ableTopics := make(map[*Topic]bool)
for _, v := range collector {
currentTopic := topics[v.Topic]
ableTopics[currentTopic] = true
}
return ableTopics
}
package source
import (
"context"
"log"
"strings"
"sync"
"github.com/segmentio/kafka-go"
"github.com/y7ut/logtransfer/entity"
"github.com/y7ut/logtransfer/plugin"
)
var (
CustomerManger = make(map[string]*Customer)
mu sync.Mutex
)
// 消费处理器
type Customer struct {
Reader *kafka.Reader // 特定消费者的专属Kafka Reader
HandlePipeline *plugin.PipeLine // 从Topic中构建的Piepline
Format entity.Formater // 解析元数据的格式器
done chan struct{} // 结束标志
}
// 结束一个消费处理器
func (c Customer) Exit() {
go func() {
c.done <- struct{}{}
}()
}
// 结束信号监听
func (c Customer) Listen() chan struct{} {
return c.done
}
// 初始化一个消费处理器
func InitCustomer(topic *Topic) *Customer{
GroupID := topic.Name + "_group"
r := InitReader(topic.Name, GroupID)
log.Printf("Check Customer group of [%s] success!", GroupID)
return &Customer{Reader: r, done: make(chan struct{}), HandlePipeline: topic.PipeLine, Format: topic.Format}
}
// 全局的注册当前工作的消费处理器
func RegisterManger(c *Customer) {
mu.Lock()
CustomerManger[c.Reader.Config().Topic] = c
mu.Unlock()
}
// 根据topic快速获取消费处理器 目前用于关闭消费处理器
func GetCustomer(topic string) (customer *Customer, ok bool) {
mu.Lock()
customer, ok = CustomerManger[topic]
mu.Unlock()
return customer, ok
}
// 從Kafka中消費消息,注意这里会提交commit offset
func ReadingMessage(ctx context.Context, c *Customer) {
defer c.Reader.Close()
log.Printf("Start Customer Group[%s] success!", c.Reader.Config().GroupID)
// var trycount int
// var cstSh, _ = time.LoadLocation("Asia/Shanghai") //上海时区
var errMessage strings.Builder
// log.Println(c.HandlePipeline.pipe)
var matedata entity.Matedata
for {
select {
case <-c.Listen():
// 监听需要关闭的信号
log.Println("Close customer of Topic :", c.Reader.Config().Topic)
return
default:
// // 使用超时上下文, 但是这样是非阻塞,超过deadline就报错了
// // timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
// // 这里使用阻塞的上下文
m, err := c.Reader.ReadMessage(ctx)
if err != nil {
// 退出
// c.Exit()
errMessage.Reset()
errMessage.WriteString("Reader Error")
errMessage.WriteString(err.Error())
log.Println(errMessage.String())
continue
}
matedata, err = c.Format(string(c.Reader.Config().Topic), string(m.Value))
if err != nil {
errMessage.Reset()
errMessage.WriteString("Format Error")
errMessage.WriteString(err.Error())
log.Println(errMessage.String())
continue
}
// 流入pipe
c.HandlePipeline.Enter(matedata)
}
}
}
package queue package source
import ( import (
"log" "log"
...@@ -37,3 +37,4 @@ func CreateCustomerGroup(topic string, groupId string) { ...@@ -37,3 +37,4 @@ func CreateCustomerGroup(topic string, groupId string) {
log.Println("create CustomerGroup error:", err) log.Println("create CustomerGroup error:", err)
} }
} }
package transfer
import "github.com/segmentio/kafka-go"
type Customer struct {
Reader *kafka.Reader
HandlePipeline *PipeLine
Format Formater
done chan struct{}
}
func (c Customer) Exit() {
go func() {
c.done <- struct{}{}
}()
}
func (c Customer) Listen() chan struct{} {
return c.done
}
func registerManger(c *Customer) {
mu.Lock()
CustomerManger[c.Reader.Config().Topic] = c
mu.Unlock()
}
func getCustomer(topic string) (customer *Customer, ok bool) {
mu.Lock()
customer, ok = CustomerManger[topic]
mu.Unlock()
return customer, ok
}
// func uninstallCustomer(topic string) {
// mu.Lock()
// delete(CustomerManger, topic)
// mu.Unlock()
// }
...@@ -2,28 +2,29 @@ package transfer ...@@ -2,28 +2,29 @@ package transfer
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"log" "log"
"os" "os"
"os/signal" "os/signal"
"strings"
"sync" "sync"
"syscall" "syscall"
"time"
elastic "github.com/olivere/elastic/v7" elastic "github.com/olivere/elastic/v7"
"github.com/y7ut/logtransfer/conf" "github.com/y7ut/logtransfer/conf"
"github.com/y7ut/logtransfer/queue" "github.com/y7ut/logtransfer/entity"
"github.com/y7ut/logtransfer/plugin"
"github.com/y7ut/logtransfer/source"
) )
var ( var (
Start = make(chan *Customer) Start = make(chan *source.Customer)
Close = make(chan string) Close = make(chan string)
CustomerManger = make(map[string]*Customer) CustomerManger = make(map[string]*source.Customer)
MaxRetryTime = 10 MaxRetryTime = 10
mu sync.Mutex mu sync.Mutex
closeWg sync.WaitGroup closeWg sync.WaitGroup
messages = make(chan *Matedate, conf.APPConfig.Es.BulkSize)
) )
func getRegisterTopics() (topics []string) { func getRegisterTopics() (topics []string) {
...@@ -49,38 +50,34 @@ func Run(confPath string) { ...@@ -49,38 +50,34 @@ func Run(confPath string) {
// 做一个master的上下文 // 做一个master的上下文
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
// // 启动es消息发送器
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
go MatedateSender(ctx, esClient) go entity.MatedateSender(ctx, esClient)
} }
// 用于处理启动与关闭消费处理器的信号通知
go func() { go func() {
for { for {
select { select {
case customer := <-Start: case customer := <-Start:
registerManger(customer) source.RegisterManger(customer)
go ReadingMessage(ctx, customer) go source.ReadingMessage(ctx, customer)
case closer := <-Close: case closer := <-Close:
c, ok := getCustomer(closer) c, ok := source.GetCustomer(closer)
if !ok { if !ok {
log.Printf(" Customer %s unstall Failed ", closer) log.Printf(" Customer %s unstall Failed ", closer)
} }
c.Exit() c.Exit()
closeWg.Done() closeWg.Done()
} }
} }
}() }()
// TODO: 动态的注册customer,目前仅支持初始化的时候来加载
for topic := range ChooseTopic() { for topic := range ChooseTopic() {
currentCustomer := source.InitCustomer(topic)
GroupID := topic.Name + "_group"
r := queue.InitReader(topic.Name, GroupID)
currentCustomer := &Customer{Reader: r, done: make(chan struct{}), HandlePipeline: topic.PipeLine, Format: topic.Format}
log.Printf("Check Customer group of [%s] success!", GroupID)
Start <- currentCustomer Start <- currentCustomer
} }
...@@ -98,7 +95,7 @@ func Run(confPath string) { ...@@ -98,7 +95,7 @@ func Run(confPath string) {
log.Printf(" Customer %s unstalling...", topic) log.Printf(" Customer %s unstalling...", topic)
closeWg.Wait() closeWg.Wait()
} }
close(messages) entity.CloseMessageChan()
log.Printf(" Success unstall %d Transfer", len(currentTopics)) log.Printf(" Success unstall %d Transfer", len(currentTopics))
os.Exit(0) os.Exit(0)
...@@ -115,97 +112,62 @@ func sign() <-chan os.Signal { ...@@ -115,97 +112,62 @@ func sign() <-chan os.Signal {
return c return c
} }
// 從Kafka中消費消息,注意这里会提交commit offset // 收集所有需要监听的topic
func ReadingMessage(ctx context.Context, c *Customer) { func ChooseTopic() map[*source.Topic]bool {
defer c.Reader.Close() collector := source.LoadCollectors()
topics := loadTopics()
log.Printf("Start Customer Group[%s] success!", c.Reader.Config().GroupID) ableTopics := make(map[*source.Topic]bool)
for _, v := range collector {
currentTopic := topics[v.Topic]
ableTopics[currentTopic] = true
}
// var trycount int return ableTopics
// var cstSh, _ = time.LoadLocation("Asia/Shanghai") //上海时区 }
var errMessage strings.Builder
// log.Println(c.HandlePipeline.pipe)
var matedata Matedate // 解析全部的Topic并加载内部的格式器和插件pipeline
for { func loadTopics() map[string]*source.Topic {
select { configs := conf.GetAllTopicFromEtcd()
case <-c.Listen():
// 监听需要关闭的信号
log.Println("Close customer of Topic :", c.Reader.Config().Topic)
return
default:
// // 使用超时上下文, 但是这样是非阻塞,超过deadline就报错了
// // timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
// // 这里使用阻塞的上下文
m, err := c.Reader.ReadMessage(ctx) topics := make(map[string]*source.Topic)
if err != nil { for _, v := range configs {
// 退出
// c.Exit()
errMessage.Reset()
errMessage.WriteString("Reader Error")
errMessage.WriteString(err.Error())
log.Println(errMessage.String())
continue
}
matedata, err = c.Format(string(c.Reader.Config().Topic), string(m.Value)) var currentTopic source.TopicConfig
if err != nil { err := json.Unmarshal(v, &currentTopic)
errMessage.Reset() if err != nil {
errMessage.WriteString("Format Error") log.Printf("json decode config(%s) err : err: %s", v, err)
errMessage.WriteString(err.Error()) }
log.Println(errMessage.String()) log.Printf("Init Topic:%s ", currentTopic.Label)
continue if currentTopic.PipelineConfig == nil {
} log.Printf("get topic setting error:%s ", currentTopic.Label)
// 流入pipe
c.HandlePipeline.Enter(matedata)
} }
}
}
func MatedateSender(ctx context.Context, esClient *elastic.Client) {
tick := time.NewTicker(3 * time.Second)
var (
SenderMu sync.Mutex
)
bulkRequest := esClient.Bulk() p := plugin.PipeLine{}
for {
select {
case m := <-messages:
indexRequest := elastic.NewBulkIndexRequest().Index(m.Index).Doc(m.data) // log.Println("get config", currentTopic.PipelineConfig)
bulkRequest.Add(indexRequest) for _, v := range currentTopic.PipelineConfig {
currentPlugin := pluginsBoard[v.Name]
err := currentPlugin.SetParams(v.Params)
if err != nil {
log.Panicln("plugin encode params error:", err)
}
p.AppendPlugin(currentPlugin)
}
var formatMethod entity.Formater
case <-tick.C: switch currentTopic.Format {
// Do sends the bulk requests to Elasticsearch
SenderMu.Lock()
count := bulkRequest.NumberOfActions()
if count > 0 { case 1:
log.Printf("Send message to Es: %d : \n", bulkRequest.NumberOfActions()) formatMethod = entity.DefaultJsonLog
_, err := bulkRequest.Do(ctx) case 2:
if err != nil { formatMethod = entity.FormatServiceWfLog
log.Println("Save Es Error:", err) default:
} formatMethod = entity.DefaultLog
bulkRequest.Reset()
}
SenderMu.Unlock()
case <-ctx.Done():
// Do sends the bulk requests to Elasticsearch
SenderMu.Lock()
_, err := bulkRequest.Do(ctx)
if err != nil {
log.Println("Save Es Error:", err)
}
bulkRequest.Reset()
SenderMu.Unlock()
log.Println("Exiting...")
return
} }
topics[currentTopic.Name] = &source.Topic{Name: currentTopic.Name, Label: currentTopic.Label, PipeLine: &p, Format: formatMethod}
} }
return topics
} }
package transfer package transfer
import ( import(
"encoding/json" "github.com/y7ut/logtransfer/plugin"
"errors"
"fmt"
"log"
"strings"
"time"
) )
var pluginsBoard = map[string]Handler{ var pluginsBoard = map[string]plugin.Handler{
"Dump": &Dump{}, "Dump": &plugin.Dump{},
"Edit": &Edit{}, "Edit": &plugin.Edit{},
"SaveES": &SaveES{}, "SaveES": &plugin.SaveES{},
} "Alarm": &plugin.Alarm{},
type Matedate struct {
Topic string
Index string
Level string
create time.Time
data map[string]interface{}
}
func (m *Matedate) reset() {
m.Topic = ""
m.Index = ""
m.Level = ""
m.data = map[string]interface{}{}
}
// type Matedate map[string]interface{}
type Handler interface {
HandleFunc(*Matedate) error
setParams(string) error
}
type Plugin struct {
params *map[string]interface{}
// error error
}
type PipeLine struct {
pipe []*Handler
}
func (p *PipeLine) appendPlugin(plugin Handler) {
p.pipe = append(p.pipe, &plugin)
}
func (p *PipeLine) Enter(m Matedate) {
for _, plugin := range p.pipe {
err := (*plugin).HandleFunc(&m)
if err != nil {
log.Println(err)
}
}
}
// 打印插件
type Dump Plugin
func (dump *Dump) HandleFunc(m *Matedate) error {
log.Println("DUMP:")
for k, v := range (*m).data {
fmt.Printf("%s : %s\n", k, v)
}
fmt.Println("------------")
return nil
}
func (dump *Dump) setParams(params string) error {
return nil
}
// 修改插件
type Edit Plugin
func (edit *Edit) HandleFunc(m *Matedate) error {
log.Println("EDIT:")
if edit.params == nil {
return fmt.Errorf("please set params first")
}
key := (*edit.params)["key"]
value := (*edit.params)["value"]
(*m).data[key.(string)] = value
return nil
}
func (edit *Edit) setParams(params string) error {
paramsValue, err := checkParams(params, "key", "value")
if err != nil{
return err
}
edit.params = &paramsValue
return err
}
// 修改插件
type SaveES Plugin
func (saveEs *SaveES) HandleFunc(m *Matedate) error {
log.Println("SaveES:")
m.Index = fmt.Sprintf("%s", (*saveEs.params)["index"])
m.data["topic"] = m.Topic
m.data["level"] = m.Level
messages <- m
return nil
}
func (saveEs *SaveES) setParams(params string) error {
paramsValue, err := checkParams(params, "index")
if err != nil{
return err
}
saveEs.params = &paramsValue
return err
}
// 警报与监测
type Alarm Plugin
func (alarm *Alarm) HandleFunc(m *Matedate) error {
return nil
}
func (alarm *Alarm) setParams(params string) error {
paramsValue, err := checkParams(params, "hit", "idle_time")
if err != nil{
return err
}
alarm.params = &paramsValue
return err
}
func checkParams(paramsJson string, key ...string) (value map[string]interface{}, err error) {
err = json.Unmarshal([]byte(paramsJson), &value)
if err != nil {
return value, err
}
var errMessage strings.Builder
var errCheck bool
errMessage.WriteString("Plugin params errors: ")
for _, checkItem := range key {
if value[checkItem] == nil {
errCheck = true
errMessage.WriteString(checkItem)
}
}
if errCheck {
return value, errors.New(errMessage.String())
}
return value, nil
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment