Commit 3468a4ee authored by 谢宇轩's avatar 谢宇轩 😅

重构plugin部分

parent e68f9d41
...@@ -9,8 +9,8 @@ import ( ...@@ -9,8 +9,8 @@ import (
) )
func InitReader(topic string, groupId string) *kafka.Reader { func InitReader(topic string, groupId string) *kafka.Reader {
// 先去创建一下这个分组 // // 先去创建一下这个分组
CreateCustomerGroup(topic, groupId) // CreateCustomerGroup(topic, groupId)
// make a writer that produces to topic-A, using the least-bytes distribution // make a writer that produces to topic-A, using the least-bytes distribution
r := kafka.NewReader(kafka.ReaderConfig{ r := kafka.NewReader(kafka.ReaderConfig{
......
package storage
import (
"fmt"
"github.com/olivere/elastic/v7"
"github.com/y7ut/logtransfer/conf"
)
func GetClient() *elastic.Client {
var err error
client, err := elastic.NewClient(elastic.SetSniff(false), elastic.SetURL(conf.APPConfig.Es.Address))
if err != nil {
fmt.Println("connect es error", err)
return nil
}
return client
}
...@@ -71,11 +71,14 @@ func loadTopics() map[string]*Topic { ...@@ -71,11 +71,14 @@ func loadTopics() map[string]*Topic {
p := PipeLine{} p := PipeLine{}
// log.Println("get config", currentTopic.PipelineConfig)
for _, v := range currentTopic.PipelineConfig { for _, v := range currentTopic.PipelineConfig {
currentPlugin := pluginsBoard[v.Name]
makePiepline(&p,v) err := currentPlugin.setParams(v.Params)
if err != nil {
log.Panicln("plugin encode params error:", err)
}
p.appendPlugin(currentPlugin)
} }
var formatMethod Formater var formatMethod Formater
...@@ -107,19 +110,3 @@ func ChooseTopic() map[*Topic]bool { ...@@ -107,19 +110,3 @@ func ChooseTopic() map[*Topic]bool {
return ableTopics return ableTopics
} }
func makePiepline(p *PipeLine ,config PipeLinePluginsConfig){
if p.Current == nil {
current := pluginsBoard[config.Name]
if config.Params != "" {
current.setParams(config.Params)
}
p.Current = &current
p.Next = &PipeLine{}
log.Printf("install plugin :%s", config.Label,)
return
}
makePiepline(p.Next, config)
}
\ No newline at end of file
...@@ -33,8 +33,8 @@ func getCustomer(topic string) (customer *Customer, ok bool) { ...@@ -33,8 +33,8 @@ func getCustomer(topic string) (customer *Customer, ok bool) {
return customer, ok return customer, ok
} }
func uninstallCustomer(topic string) { // func uninstallCustomer(topic string) {
mu.Lock() // mu.Lock()
delete(CustomerManger, topic) // delete(CustomerManger, topic)
mu.Unlock() // mu.Unlock()
} // }
...@@ -5,36 +5,35 @@ import ( ...@@ -5,36 +5,35 @@ import (
"fmt" "fmt"
"regexp" "regexp"
"strings" "strings"
"sync"
"time" "time"
) )
var ( var (
contentRegexp = regexp.MustCompile(`\[(?s:(.*?))\]`) contentRegexp = regexp.MustCompile(`\[(?s:(.*?))\]`)
serviceWfLogKeyWord = []string{"errno", "logId", "uri", "refer", "cookie", "ua", "host", "clientIp", "optime", "request_params", "errmsg"} serviceWfLogKeyWord = []string{"errno", "logId", "uri", "refer", "cookie", "ua", "host", "clientIp", "optime", "request_params", "errmsg"}
MatePool = sync.Pool{New: func() interface{} { return &Matedate{data: make(map[string]interface{})} }}
) )
type Formater func(string, string) (Matedate, error) type Formater func(string, string) (Matedate, error)
// service错误日志的处理 // service错误日志的处理
func FormatServiceWfLog(sourceKey string, message string) (Matedate, error) { func FormatServiceWfLog(sourceKey string, message string) (Matedate, error) {
result := make(Matedate) // vMateItem := MatePool.Get()
mateItem := MatePool.Get().(*Matedate)
levelIndex := strings.Index(message, ":") levelIndex := strings.Index(message, ":")
if levelIndex == -1 { if levelIndex == -1 {
return result, fmt.Errorf("message format error") return *mateItem, fmt.Errorf("message format error")
} }
// log.Println(message)
result["topic"] = sourceKey
result["level"] = message[:levelIndex]
result["project"] = "jiwei-service"
message = message[levelIndex:]
mateItem.Topic= sourceKey
mateItem.Level = message[:levelIndex]
message = message[levelIndex:]
loc, _ := time.LoadLocation("Local") loc, _ := time.LoadLocation("Local")
logTime, _ := time.ParseInLocation(": 06-01-02 15:04:05 ", message[:strings.Index(message, "[")], loc) logTime, _ := time.ParseInLocation(": 06-01-02 15:04:05 ", message[:strings.Index(message, "[")], loc)
mateItem.create = logTime
result["created_at"] = logTime
keyword := serviceWfLogKeyWord keyword := serviceWfLogKeyWord
for _, word := range keyword { for _, word := range keyword {
flysnowRegexp := regexp.MustCompile(fmt.Sprintf(`%s\[(?s:(.*?))\]`, word)) flysnowRegexp := regexp.MustCompile(fmt.Sprintf(`%s\[(?s:(.*?))\]`, word))
logContent := flysnowRegexp.FindString(message) logContent := flysnowRegexp.FindString(message)
...@@ -42,27 +41,44 @@ func FormatServiceWfLog(sourceKey string, message string) (Matedate, error) { ...@@ -42,27 +41,44 @@ func FormatServiceWfLog(sourceKey string, message string) (Matedate, error) {
if len(curentSub) < 1 { if len(curentSub) < 1 {
continue continue
} }
result[word] = contentRegexp.FindStringSubmatch(logContent)[1] mateItem.data[word] = contentRegexp.FindStringSubmatch(logContent)[1]
} }
result := *mateItem
mateItem.reset()
MatePool.Put(mateItem)
return result, nil return result, nil
} }
// service错误日志的处理 // service错误日志的处理
func DefaultLog(sourceKey string, message string) (Matedate, error) { func DefaultLog(sourceKey string, message string) (Matedate, error) {
result := make(Matedate)
result["topic"] = sourceKey
result["message"] = message
vMateItem := MatePool.Get()
mateItem := vMateItem.(*Matedate)
mateItem.Topic = sourceKey
mateItem.Index = sourceKey
mateItem.data = map[string]interface{}{"message":message}
result := *mateItem
MatePool.Put(vMateItem)
return result, nil return result, nil
} }
// Json 格式的错误日志处理 // Json 格式的错误日志处理
func DefaultJsonLog(sourceKey string, message string) (Matedate, error) { func DefaultJsonLog(sourceKey string, message string) (Matedate, error) {
result := make(Matedate)
err := json.Unmarshal([]byte(message), &result) vMateItem := MatePool.Get()
mateItem := vMateItem.(*Matedate)
data := mateItem.data
err := json.Unmarshal([]byte(message), &data)
if err != nil { if err != nil {
return nil, err return *mateItem, err
} }
mateItem.data = data
result := *mateItem
MatePool.Put(vMateItem)
return result, nil return result, nil
} }
package transfer
import (
"encoding/json"
"fmt"
"log"
)
var pluginsBoard = map[string]Plugin{
"Dump":&Dump{},
"Edit":&Edit{},
"SaveES":&SaveES{},
}
type Matedate map[string]interface{}
type Plugin interface {
Handle(Matedate) Matedate
setParams(string)
}
type PipeLine struct {
Next *PipeLine
Current *Plugin
}
func (p PipeLine) Enter(m Matedate) Matedate {
if p.Current == nil {
return m
}
if p.Next == nil {
return (*p.Current).Handle(m)
}
return p.Next.Enter((*p.Current).Handle(m))
}
// 修改插件
type Edit struct {
Params []map[string]string
}
func (p Edit) Handle(m Matedate) Matedate {
for _, item := range p.Params {
currentKey := item["key"]
currentParams := item["value"]
m[currentKey] = currentParams
}
return m
}
func (p *Edit) setParams(params string) {
var paramsValue []map[string]string
err := json.Unmarshal([]byte(params), &paramsValue)
if err != nil {
log.Printf("Edit Plugin json decode params(%s) err : err: %s", params, err)
}
p.Params = paramsValue
}
// 持久化到ES插件
type SaveES struct {
Params map[string]interface{}
}
func (p SaveES) Handle(m Matedate) Matedate {
messages <- m
return m
}
func (p *SaveES) setParams(params string) {}
// 打印
type Dump struct {
Params map[string]interface{}
}
func (p *Dump) Handle(m Matedate) Matedate {
for k, v := range m {
fmt.Printf("%s : %s\n", k, v)
}
fmt.Println("------------")
return m
}
func (p Dump) setParams(params string) {
var paramsValue map[string]interface{}
err := json.Unmarshal([]byte(params), &paramsValue)
if err != nil {
log.Printf("Dump Plugin json decode params(%s) err : err: %s", params, err)
}
p.Params = paramsValue
}
...@@ -23,7 +23,7 @@ var ( ...@@ -23,7 +23,7 @@ var (
MaxRetryTime = 10 MaxRetryTime = 10
mu sync.Mutex mu sync.Mutex
closeWg sync.WaitGroup closeWg sync.WaitGroup
messages = make(chan Matedate, conf.APPConfig.Es.BulkSize) messages = make(chan *Matedate, conf.APPConfig.Es.BulkSize)
) )
func getRegisterTopics() (topics []string) { func getRegisterTopics() (topics []string) {
...@@ -45,10 +45,11 @@ func Run(confPath string) { ...@@ -45,10 +45,11 @@ func Run(confPath string) {
fmt.Println("connect es error", err) fmt.Println("connect es error", err)
panic(err) panic(err)
} }
// 做一个master的上下文
// 做一个master的上下文
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
// //
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
go MatedateSender(ctx, esClient) go MatedateSender(ctx, esClient)
} }
...@@ -75,7 +76,7 @@ func Run(confPath string) { ...@@ -75,7 +76,7 @@ func Run(confPath string) {
}() }()
for topic := range ChooseTopic() { for topic := range ChooseTopic() {
GroupID := topic.Name + "_group" GroupID := topic.Name + "_group"
r := queue.InitReader(topic.Name, GroupID) r := queue.InitReader(topic.Name, GroupID)
currentCustomer := &Customer{Reader: r, done: make(chan struct{}), HandlePipeline: topic.PipeLine, Format: topic.Format} currentCustomer := &Customer{Reader: r, done: make(chan struct{}), HandlePipeline: topic.PipeLine, Format: topic.Format}
...@@ -108,7 +109,7 @@ func Run(confPath string) { ...@@ -108,7 +109,7 @@ func Run(confPath string) {
} }
func sign() <-chan os.Signal { func sign() <-chan os.Signal {
c := make(chan os.Signal,2) c := make(chan os.Signal, 2)
// 监听信号 // 监听信号
signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGUSR1, syscall.SIGUSR2) signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGUSR1, syscall.SIGUSR2)
return c return c
...@@ -123,6 +124,9 @@ func ReadingMessage(ctx context.Context, c *Customer) { ...@@ -123,6 +124,9 @@ func ReadingMessage(ctx context.Context, c *Customer) {
// var trycount int // var trycount int
// var cstSh, _ = time.LoadLocation("Asia/Shanghai") //上海时区 // var cstSh, _ = time.LoadLocation("Asia/Shanghai") //上海时区
var errMessage strings.Builder var errMessage strings.Builder
// log.Println(c.HandlePipeline.pipe)
var matedata Matedate
for { for {
select { select {
case <-c.Listen(): case <-c.Listen():
...@@ -136,7 +140,6 @@ func ReadingMessage(ctx context.Context, c *Customer) { ...@@ -136,7 +140,6 @@ func ReadingMessage(ctx context.Context, c *Customer) {
m, err := c.Reader.ReadMessage(ctx) m, err := c.Reader.ReadMessage(ctx)
if err != nil { if err != nil {
// 退出 // 退出
// c.Exit() // c.Exit()
...@@ -144,17 +147,16 @@ func ReadingMessage(ctx context.Context, c *Customer) { ...@@ -144,17 +147,16 @@ func ReadingMessage(ctx context.Context, c *Customer) {
errMessage.WriteString("Reader Error") errMessage.WriteString("Reader Error")
errMessage.WriteString(err.Error()) errMessage.WriteString(err.Error())
log.Println(errMessage.String()) log.Println(errMessage.String())
continue continue
} }
matedata, err := c.Format(string(c.Reader.Config().Topic), string(m.Value)) matedata, err = c.Format(string(c.Reader.Config().Topic), string(m.Value))
if err != nil { if err != nil {
errMessage.Reset() errMessage.Reset()
errMessage.WriteString("Format Error") errMessage.WriteString("Format Error")
errMessage.WriteString(err.Error()) errMessage.WriteString(err.Error())
log.Println(errMessage.String()) log.Println(errMessage.String())
continue continue
} }
// 流入pipe // 流入pipe
...@@ -174,7 +176,7 @@ func MatedateSender(ctx context.Context, esClient *elastic.Client) { ...@@ -174,7 +176,7 @@ func MatedateSender(ctx context.Context, esClient *elastic.Client) {
for { for {
select { select {
case m := <-messages: case m := <-messages:
indexRequest := elastic.NewBulkIndexRequest().Index("logs").Doc(m) indexRequest := elastic.NewBulkIndexRequest().Index(m.Index).Doc(m.data)
bulkRequest.Add(indexRequest) bulkRequest.Add(indexRequest)
case <-tick.C: case <-tick.C:
......
package transfer
import (
"encoding/json"
"fmt"
"log"
"time"
)
var pluginsBoard = map[string]Handler{
"Dump": &Dump{},
"Edit": &Edit{},
"SaveES": &SaveES{},
}
type Matedate struct {
Topic string
Index string
Level string
create time.Time
data map[string]interface{}
}
func (m *Matedate) reset() {
m.Topic = ""
m.Index = ""
m.Level = ""
m.data = map[string]interface{}{}
}
// type Matedate map[string]interface{}
type Handler interface {
HandleFunc(*Matedate) error
setParams(string) error
}
type Plugin struct {
params *map[string]interface{}
// error error
}
type PipeLine struct {
pipe []*Handler
}
func (p *PipeLine) appendPlugin(plugin Handler) {
p.pipe = append(p.pipe, &plugin)
}
func (p *PipeLine) Enter(m Matedate) {
for _, plugin := range p.pipe {
err := (*plugin).HandleFunc(&m)
if err != nil {
log.Println(err)
}
}
}
// 打印插件
type Dump Plugin
func (dump *Dump) HandleFunc(m *Matedate) error {
log.Println("DUMP:")
for k, v := range (*m).data {
fmt.Printf("%s : %s\n", k, v)
}
fmt.Println("------------")
return nil
}
func (dump *Dump) setParams(params string) error {
return nil
}
// 修改插件
type Edit Plugin
func (edit *Edit) HandleFunc(m *Matedate) error {
log.Println("EDIT:")
if edit.params == nil {
return fmt.Errorf("please set params first")
}
key := (*edit.params)["key"]
value := (*edit.params)["value"]
(*m).data[key.(string)] = value
(*m).data["eidt"] = 1
return nil
}
func (edit *Edit) setParams(params string) error {
var paramsValue map[string]interface{}
var checkKeyString bool
var checkValueString bool
err := json.Unmarshal([]byte(params), &paramsValue)
for key := range paramsValue {
if key == "key" {
checkKeyString = true
}
if key == "value" {
checkValueString = true
}
}
if !(checkKeyString && checkValueString) {
return fmt.Errorf("please set params true")
}
edit.params = &paramsValue
return err
}
// 修改插件
type SaveES Plugin
func (saveEs *SaveES) HandleFunc(m *Matedate) error {
log.Println("SaveES:")
messages <- m
return nil
}
func (saveEs *SaveES) setParams(params string) error {
return nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment