Commit a9e90bb1 authored by 谢宇轩's avatar 谢宇轩 😅

添加topic 和 agent config的监控

parent 42d45662
...@@ -14,7 +14,7 @@ import ( ...@@ -14,7 +14,7 @@ import (
var ( var (
configPath = "/logagent/config/" configPath = "/logagent/config/"
statusPath = "/logagent/active/" statusPath = "/logagent/active/"
topicPath = "/logagent/topic/" topicPath = "/logagent/topic/"
) )
type EtcdValue []byte type EtcdValue []byte
...@@ -34,7 +34,7 @@ func initConnect() *clientv3.Client { ...@@ -34,7 +34,7 @@ func initConnect() *clientv3.Client {
addressList := strings.Split(APPConfig.Etcd.Address, ",") addressList := strings.Split(APPConfig.Etcd.Address, ",")
cli, err := clientv3.New(clientv3.Config{ cli, err := clientv3.New(clientv3.Config{
Endpoints: addressList, Endpoints: addressList,
DialTimeout: 5 * time.Second, DialTimeout: 5 * time.Second,
}) })
if err != nil { if err != nil {
...@@ -45,66 +45,160 @@ func initConnect() *clientv3.Client { ...@@ -45,66 +45,160 @@ func initConnect() *clientv3.Client {
return cli return cli
} }
// 获取当前所有的任务 (目前在初始化时使用) // 获取当前开启的Agent所有的任务 (目前在初始化时使用)
func GetAllConfFromEtcd() []EtcdValue { func GetAllConfFromEtcd() ([]EtcdValue, error) {
configs := make([]EtcdValue, 0)
ctx, cancel := context.WithTimeout(context.Background(), time.Second) ctx, cancel := context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, configPath, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend)) resp, err := cli.Get(ctx, configPath, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend))
cancel() cancel()
if err != nil { if err != nil {
panic(fmt.Sprintf("get failed, err:%s \n", err)) return configs, err
} }
configs := make([]EtcdValue, 0)
for _, etcdResult := range resp.Kvs { for _, etcdResult := range resp.Kvs {
// 根据系统中当前全部的节点名称, 确定节点状态 // 根据系统中当前全部的节点名称, 确定节点状态
etcdKey := statusPath + string(etcdResult.Key[strings.LastIndex(string(etcdResult.Key), "/")+1:]) etcdKey := statusPath + string(etcdResult.Key[strings.LastIndex(string(etcdResult.Key), "/")+1:])
ctx, cancel = context.WithTimeout(context.Background(), time.Second) ctx, cancel = context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, etcdKey) resp, err := cli.Get(ctx, etcdKey)
cancel() cancel()
if err != nil { if err != nil {
panic(fmt.Sprintf("Get Etcd config failed, err:%s \n", err)) return configs, fmt.Errorf("get Etcd config failed, err:%s", err)
} }
if len(resp.Kvs) != 0 { if len(resp.Kvs) != 0 {
status := string(resp.Kvs[0].Value) status := string(resp.Kvs[0].Value)
if status == "1" { if status == "1" {
log.Printf("load config from:%s ", etcdResult.Key) log.Printf("load config from:%s ", etcdResult.Key)
configs = append(configs, etcdResult.Value) configs = append(configs, etcdResult.Value)
} }
} }
} }
return configs return configs, nil
} }
// 加载所有的Topic主题配置信息 func GetDelRevValueFromEtcd(key string, rev int64) (EtcdValue, error) {
func GetAllTopicFromEtcd() []EtcdValue { var value EtcdValue
ctx, cancel := context.WithTimeout(context.Background(), time.Second) ctx, cancel := context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, topicPath, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend)) resp, err := cli.Get(ctx, key, clientv3.WithRev(rev))
cancel()
if err != nil {
log.Println(fmt.Sprintf("Get Etcd config failed, err:%s \n", err))
}
if len(resp.Kvs) == 0 {
return value, fmt.Errorf("config get error")
}
return resp.Kvs[0].Value, nil
}
// 获取特定的配置
func GetConfFromEtcd(name string) (EtcdValue, error) {
var value EtcdValue
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, statusPath+name)
cancel()
if err != nil {
log.Println(fmt.Sprintf("Get Etcd config failed, err:%s \n", err))
}
if len(resp.Kvs) == 0 {
return value, fmt.Errorf("status error")
}
status := string(resp.Kvs[0].Value)
if status != "1" {
return value, fmt.Errorf("status error")
}
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
resp, err = cli.Get(ctx, configPath+name)
cancel() cancel()
if err != nil { if err != nil {
panic(fmt.Sprintf("get failed, err:%s \n", err)) return value, err
}
if len(resp.Kvs) == 0 {
return value, fmt.Errorf("config get error")
} }
return resp.Kvs[0].Value, nil
}
// 加载所有的Topic主题配置信息
func GetAllTopicFromEtcd() ([]EtcdValue, error) {
configs := make([]EtcdValue, 0) configs := make([]EtcdValue, 0)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, topicPath, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend))
cancel()
if err != nil {
return configs, err
}
for _, etcdResult := range resp.Kvs { for _, etcdResult := range resp.Kvs {
configs = append(configs, etcdResult.Value) configs = append(configs, etcdResult.Value)
} }
return configs return configs, nil
}
// 获取特定的Topic
func GetTopicFromEtcd(name string) (EtcdValue, error) {
var value EtcdValue
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, topicPath+name)
cancel()
if err != nil {
log.Println(fmt.Sprintf("Get Etcd config failed, err:%s \n", err))
}
if len(resp.Kvs) == 0 {
return value, fmt.Errorf("config get error")
}
return resp.Kvs[0].Value, nil
}
func WatchLogTopicToEtcd() clientv3.WatchChan {
wch := cli.Watch(context.Background(), topicPath, clientv3.WithPrefix())
return wch
} }
func WatchLogConfToEtcd() clientv3.WatchChan { func WatchLogConfigToEtcd() clientv3.WatchChan {
wch := cli.Watch(context.Background(), configPath, clientv3.WithPrefix()) wch := cli.Watch(context.Background(), configPath, clientv3.WithPrefix())
return wch return wch
} }
func CheckAgentActive(name string) bool {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, statusPath+name)
log.Println("looking for", statusPath+name)
cancel()
if err != nil {
log.Println(fmt.Sprintf("Get Etcd config failed, err:%s \n", err))
}
if len(resp.Kvs) == 0 {
return false
}
status := string(resp.Kvs[0].Value)
log.Println("it is"+status)
return status == "1"
}
...@@ -36,12 +36,19 @@ func (m *Matedata) reset() { ...@@ -36,12 +36,19 @@ func (m *Matedata) reset() {
func HandleMessage(m *Matedata) { func HandleMessage(m *Matedata) {
messages <- m messages <- m
} }
func CloseMessageChan() { func CloseMessageChan() {
close(messages) close(messages)
} }
func MatedateSender(ctx context.Context, esClient *elastic.Client) { func MatedateSender(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// 初始化ES客户端
esClient, err := elastic.NewClient(elastic.SetSniff(false), elastic.SetURL(conf.APPConfig.Es.Address))
if err != nil {
panic(err)
}
wp := &ESWorkPool{ wp := &ESWorkPool{
WorkerFunc: func(matedatas []*Matedata) bool { WorkerFunc: func(matedatas []*Matedata) bool {
...@@ -53,8 +60,9 @@ func MatedateSender(ctx context.Context, esClient *elastic.Client) { ...@@ -53,8 +60,9 @@ func MatedateSender(ctx context.Context, esClient *elastic.Client) {
count := bulkRequest.NumberOfActions() count := bulkRequest.NumberOfActions()
if count > 0 { if count > 0 {
log.Printf("Send messages to Index: %d : \n", bulkRequest.NumberOfActions()) log.Printf("Send messages to Index: %d : \n", bulkRequest.NumberOfActions())
response, err := bulkRequest.Do(ctx) timectx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
response, err := bulkRequest.Do(timectx)
cancel()
if err != nil { if err != nil {
log.Println("Save Es Error:", err) log.Println("Save Es Error:", err)
return false return false
...@@ -73,11 +81,12 @@ func MatedateSender(ctx context.Context, esClient *elastic.Client) { ...@@ -73,11 +81,12 @@ func MatedateSender(ctx context.Context, esClient *elastic.Client) {
} }
return true return true
}, },
MaxWorkerCount: 51, MaxWorkerCount: 50,
MaxIdleWorkerDuration: 5 * time.Second, MaxIdleWorkerDuration: 5 * time.Second,
} }
wp.Start()
wp.Start()
defer wp.Stop()
var mateDatesItems []*Matedata var mateDatesItems []*Matedata
var mu sync.Mutex var mu sync.Mutex
...@@ -106,8 +115,6 @@ func MatedateSender(ctx context.Context, esClient *elastic.Client) { ...@@ -106,8 +115,6 @@ func MatedateSender(ctx context.Context, esClient *elastic.Client) {
mu.Unlock() mu.Unlock()
wp.Serve(currentItems) wp.Serve(currentItems)
wp.Stop()
return return
} }
} }
......
...@@ -9,7 +9,6 @@ import ( ...@@ -9,7 +9,6 @@ import (
"github.com/y7ut/logtransfer/entity" "github.com/y7ut/logtransfer/entity"
) )
type Handler interface { type Handler interface {
HandleFunc(*entity.Matedata) error HandleFunc(*entity.Matedata) error
SetParams(string) error SetParams(string) error
...@@ -24,6 +23,10 @@ type PipeLine struct { ...@@ -24,6 +23,10 @@ type PipeLine struct {
pipe []*Handler pipe []*Handler
} }
func (p *PipeLine) Length() int {
return len(p.pipe)
}
func (p *PipeLine) AppendPlugin(plugin Handler) { func (p *PipeLine) AppendPlugin(plugin Handler) {
p.pipe = append(p.pipe, &plugin) p.pipe = append(p.pipe, &plugin)
} }
......
...@@ -2,6 +2,7 @@ package plugin ...@@ -2,6 +2,7 @@ package plugin
import ( import (
"fmt" "fmt"
"log"
"github.com/y7ut/logtransfer/entity" "github.com/y7ut/logtransfer/entity"
) )
...@@ -10,7 +11,7 @@ import ( ...@@ -10,7 +11,7 @@ import (
type SaveES Plugin type SaveES Plugin
func (saveEs *SaveES) HandleFunc(m *entity.Matedata) error { func (saveEs *SaveES) HandleFunc(m *entity.Matedata) error {
// log.Println("SaveES:") log.Println("SaveES:")
m.Index = fmt.Sprintf("%s", (*saveEs.params)["index"]) m.Index = fmt.Sprintf("%s", (*saveEs.params)["index"])
m.Data["topic"] = m.Topic m.Data["topic"] = m.Topic
m.Data["level"] = m.Level m.Data["level"] = m.Level
......
...@@ -2,8 +2,13 @@ package source ...@@ -2,8 +2,13 @@ package source
import ( import (
"encoding/json" "encoding/json"
"fmt"
"log" "log"
"strings"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/y7ut/logtransfer/conf" "github.com/y7ut/logtransfer/conf"
"github.com/y7ut/logtransfer/entity" "github.com/y7ut/logtransfer/entity"
"github.com/y7ut/logtransfer/plugin" "github.com/y7ut/logtransfer/plugin"
...@@ -35,10 +40,19 @@ type PipeLinePluginsConfig struct { ...@@ -35,10 +40,19 @@ type PipeLinePluginsConfig struct {
Params string `json:"params"` Params string `json:"params"`
} }
// 加载所有的collector var watchTopicChannel = make(chan *Topic)
func LoadCollectors() []Collector {
configs := conf.GetAllConfFromEtcd() var startTopicChannel = make(chan *Topic)
var deleteTopicChannel = make(chan string)
// 加载所有的可用的collector
func LoadCollectors() ([]Collector, error) {
collectors := make([]Collector, 0) collectors := make([]Collector, 0)
configs, err := conf.GetAllConfFromEtcd()
if err != nil {
return collectors, err
}
for _, v := range configs { for _, v := range configs {
var currentCollector []Collector var currentCollector []Collector
...@@ -46,70 +60,345 @@ func LoadCollectors() []Collector { ...@@ -46,70 +60,345 @@ func LoadCollectors() []Collector {
if err != nil { if err != nil {
log.Printf("json decode config(%s) err : err: %s", v, err) log.Printf("json decode config(%s) err : err: %s", v, err)
} }
if currentCollector != nil { if currentCollector != nil {
log.Printf("Init config:%s ", v) log.Printf("Init config:%s ", v)
collectors = append(collectors, currentCollector...) collectors = append(collectors, currentCollector...)
} }
} }
return collectors return collectors, nil
} }
// 收集所有需要监听的topic // 加载所有的collector
func ChooseTopic() map[*Topic]bool { func LoadCollector(name string) Collector {
collector := LoadCollectors() config, err := conf.GetConfFromEtcd(name)
topics := loadTopics() if err != nil {
log.Printf("get etcd config err : err: %s", err)
}
var collector Collector
err = json.Unmarshal(config, &collector)
if err != nil {
log.Printf("json decode config(%s) err : err: %s", collector, err)
}
return collector
}
// 收集所有需要监听的topic
func ChooseTopic() (map[*Topic]bool, error) {
// 收集全部的agent的collector信息
ableTopics := make(map[*Topic]bool) ableTopics := make(map[*Topic]bool)
for _, v := range collector {
// 所有当前
collectors, err := LoadCollectors()
if err != nil {
return ableTopics, fmt.Errorf("Load Collector error: %s", err)
}
topics, err := loadTopics()
if err != nil {
return ableTopics, fmt.Errorf("Load Topic error: %s", err)
}
for _, v := range collectors {
currentTopic := topics[v.Topic] currentTopic := topics[v.Topic]
ableTopics[currentTopic] = true ableTopics[currentTopic] = true
} }
return ableTopics return ableTopics, nil
} }
// 解析全部的Topic并加载内部的格式器和插件pipeline // 解析全部的Topic并加载内部的格式器和插件pipeline
func loadTopics() map[string]*Topic { func loadTopics() (map[string]*Topic, error) {
configs := conf.GetAllTopicFromEtcd()
topics := make(map[string]*Topic) topics := make(map[string]*Topic)
configs, err := conf.GetAllTopicFromEtcd()
if err != nil {
return topics, err
}
for _, v := range configs { for _, v := range configs {
var currentTopic TopicConfig var currentTopicConfig TopicConfig
err := json.Unmarshal(v, &currentTopic) err := json.Unmarshal(v, &currentTopicConfig)
if err != nil { if err != nil {
log.Printf("json decode config(%s) err : err: %s", v, err) return topics, err
} }
log.Printf("Init Topic:%s ", currentTopic.Label)
if currentTopic.PipelineConfig == nil { topics[currentTopicConfig.Name] = generateTopic(currentTopicConfig)
log.Printf("get topic setting error:%s ", currentTopic.Label) }
return topics, nil
}
func TopicChangeListener() <-chan *Topic {
return watchTopicChannel
}
func TopicDeleteListener() <-chan string{
return deleteTopicChannel
}
func TopicStartListener() <-chan *Topic {
return startTopicChannel
}
func WatchTopics() {
for confResp := range conf.WatchLogTopicToEtcd() {
for _, event := range confResp.Events {
switch event.Type {
case mvccpb.PUT:
// 有PUT操作才进行通知
var newTopicCondfig TopicConfig
if len(confResp.Events) == 0 {
continue
}
changedConf := confResp.Events[0].Kv.Value
err := json.Unmarshal(changedConf, &newTopicCondfig)
if err != nil {
log.Println("Unmarshal New Topic Config Error:", err)
}
log.Println("load New Topic success!")
watchTopicChannel <- generateTopic(newTopicCondfig)
case mvccpb.DELETE:
// 获取旧版本数据 来进行对比
// 要清空这个register 中全部这个topic的customer, 不过也应该没有了,应该都会被config watcher 给捕获到
oldTopic, err := getHistoryTopicWithEvent(confResp)
if err != nil {
log.Println("Get HIstory Collector Error:", err)
continue
}
log.Println("some Topic remove", oldTopic.Name)
// TODO: 以防万一查一下
}
time.Sleep(2*time.Second)
}
}
}
func WatchConfigs() {
for confResp := range conf.WatchLogConfigToEtcd() {
// 如果是关闭的Agent 这些可以忽略的!!!
agentKey := string(confResp.Events[0].Kv.Key)
currentChangedkey := agentKey[strings.LastIndex(agentKey, "/")+1:]
if !conf.CheckAgentActive(currentChangedkey){
continue
} }
// 只有开启的才去更改配置
for _, event := range confResp.Events {
switch event.Type {
case mvccpb.PUT:
if len(confResp.Events) == 0 {
continue
}
// 有PUT操作才进行通知
// agent的PUT有多种情况
// 一: 通过REV查询 发现是Agent新增了一个Collector (len > 0)
// 然后再去查询内存中 的registed customer 就可以了 判断这个Collector所使用的topic是否是第一次出现(安装对应的Topic)
// 二: 通过REV查询 发现出删除了了一个Collector (len >= 0)
// 然后要再去获取config prefix 全部collector,判断是否是最后一个用这个topic的collector(需要卸载这个collector所使用的Topic)
// 三: 通过REV查询 发现之前不存在的 (len = 0)
diff, status, err := getCollectorChangeWithEvent(confResp)
if err != nil {
log.Println("Get History Collector Change Error:", err)
continue
}
switch status {
case "CREATED":
// 情况三 可以不进行操作
// 不会出现,因为初始化的时候一定是关闭的。
log.Println("Add Agent", currentChangedkey)
case "PUT":
log.Println("有collector加入")
_, ok := GetCustomer(diff.Topic)
// 如果首次出现 那就初始化这个Topic了
// 根据topicname去获取TopicConfig
if !ok {
changedConf, err := conf.GetTopicFromEtcd(diff.Topic)
if err != nil {
log.Println("Load Agent New Puted Topic Config Error:", err)
continue
}
// 有PUT操作才进行通知
var newPutTopicCondfig TopicConfig
err = json.Unmarshal(changedConf, &newPutTopicCondfig)
if err != nil {
log.Println("Unmarshal Agent New Puted Topic Config Error:", err)
continue
}
log.Println("load Agent New Puted Topic success!")
startTopicChannel <- generateTopic(newPutTopicCondfig)
log.Println(currentChangedkey, "Agent Add Collector And Init Topic", diff)
}
log.Println(currentChangedkey, "Agent Add Collector", diff)
case "DEL":
log.Println("有collector离开")
// 获取config prefix 全部collector,判断是否是最后一个用这个topic的collector
currentAbleCollector, err := LoadCollectors()
if err != nil {
log.Println("Get Current Able Collector Error:", err)
continue
}
var set = make(map[string]bool)
for _, v := range currentAbleCollector {
set[v.Topic] = true
}
p := plugin.PipeLine{} if !set[diff.Topic] {
// 可以删了
deleteTopicChannel <-diff.Topic
}
// log.Println("get config", currentTopic.PipelineConfig) log.Println(currentChangedkey, "Agent Delete Collector", diff)
for _, v := range currentTopic.PipelineConfig { default:
currentPlugin := plugin.RegistedPlugins[v.Name] log.Println("Get History Collector Change Unkonw Error!")
err := currentPlugin.SetParams(v.Params) }
if err != nil {
log.Panicln("plugin encode params error:", err) case mvccpb.DELETE:
// 获取旧版本数据 来进行对比
// 通常这个地方应该知识空数组了
// 因为agent不允许在有collector的时候删除
// 也不允许 在开启的时候删除~
// 所以这个地方没有用
if len(confResp.Events) == 0 {
continue
}
oldCollector, err := getHistoryCollectorWithEvent(confResp)
if err != nil {
log.Println("Get History Collector Error:", err)
continue
}
if len(oldCollector) != 0 {
log.Printf("Get History Collector Error: Agent(%s) die with collector.", confResp.Events[0].Kv.Key)
continue
}
log.Printf("Agent(%s) has uninstall complete.", confResp.Events[0].Kv.Key)
} }
p.AppendPlugin(currentPlugin)
} }
var formatMethod entity.Formater
switch currentTopic.Format { }
}
func getHistoryTopicWithEvent(confResp clientv3.WatchResponse) (TopicConfig, error) {
var oldTopic TopicConfig
oldKey := confResp.Events[0].Kv.Key
rev := confResp.Events[0].Kv.ModRevision - 1
oldValue, err := conf.GetDelRevValueFromEtcd(string(oldKey), rev)
if err != nil {
return oldTopic, err
}
err = json.Unmarshal(oldValue, &oldTopic)
if err != nil {
return oldTopic, err
}
return oldTopic, nil
}
func getHistoryCollectorWithEvent(confResp clientv3.WatchResponse) ([]Collector, error) {
var oldCollector []Collector
oldKey := confResp.Events[0].Kv.Key
rev := confResp.Events[0].Kv.ModRevision - 1
oldValue, err := conf.GetDelRevValueFromEtcd(string(oldKey), rev)
if err != nil {
return oldCollector, err
}
err = json.Unmarshal(oldValue, &oldCollector)
if err != nil {
return oldCollector, err
}
return oldCollector, nil
}
// 获取Agent 中 Collector 的变更 有三种类型 CREATED 新增Agent PUT 新增Collector DEL 删除 Collector
func getCollectorChangeWithEvent(confResp clientv3.WatchResponse) (different Collector, changeType string, err error) {
var currentCollectors []Collector
changedConf := confResp.Events[0].Kv.Value
err = json.Unmarshal(changedConf, &currentCollectors)
if err != nil {
return different, changeType, err
}
var oldCollector []Collector
oldKey := confResp.Events[0].Kv.Key
rev := confResp.Events[0].Kv.ModRevision - 1
oldValue, err := conf.GetDelRevValueFromEtcd(string(oldKey), rev)
if err != nil {
if len(currentCollectors) == 0 {
changeType = "CREATED"
err = nil
}
return different, changeType, err
}
err = json.Unmarshal(oldValue, &oldCollector)
if err != nil {
return different, changeType, err
}
var set = make(map[Collector]bool)
if len(oldCollector)-len(currentCollectors) < 0 {
changeType = "PUT"
for _, item := range oldCollector {
set[item] = true
}
case 1: for _, item := range currentCollectors {
formatMethod = entity.DefaultJsonLog if !set[item] {
case 2: different = item
formatMethod = entity.FormatServiceWfLog }
default: }
formatMethod = entity.DefaultLog } else {
changeType = "DEL"
for _, item := range currentCollectors {
set[item] = true
}
for _, item := range oldCollector {
if !set[item] {
different = item
}
} }
topics[currentTopic.Name] = &Topic{Name: currentTopic.Name, Label: currentTopic.Label, PipeLine: &p, Format: formatMethod}
} }
return topics
return different, changeType, err
} }
...@@ -31,15 +31,14 @@ func (c Customer) Exit() { ...@@ -31,15 +31,14 @@ func (c Customer) Exit() {
} }
// 结束信号监听 // 结束信号监听
func (c Customer) Listen() chan struct{} { func (c Customer) Listen() <-chan struct{} {
return c.done return c.done
} }
// 初始化一个消费处理器 // 初始化一个消费处理器
func InitCustomer(topic *Topic) *Customer { func InitCustomer(topic *Topic) *Customer {
GroupID := topic.Name + "_group" r := InitReader(topic.Name)
r := InitReader(topic.Name, GroupID) log.Printf("Check Customer group of [%s] success!", topic.Name)
log.Printf("Check Customer group of [%s] success!", GroupID)
return &Customer{Topic: topic, Readers: r, done: make(chan struct{}), HandlePipeline: topic.PipeLine, Format: topic.Format} return &Customer{Topic: topic, Readers: r, done: make(chan struct{}), HandlePipeline: topic.PipeLine, Format: topic.Format}
} }
...@@ -50,6 +49,12 @@ func RegisterManger(c *Customer) { ...@@ -50,6 +49,12 @@ func RegisterManger(c *Customer) {
mu.Unlock() mu.Unlock()
} }
func UnstallManger(topic string){
mu.Lock()
delete(CustomerManger, topic)
mu.Unlock()
}
// 根据topic快速获取消费处理器 目前用于关闭消费处理器 // 根据topic快速获取消费处理器 目前用于关闭消费处理器
func GetCustomer(topic string) (customer *Customer, ok bool) { func GetCustomer(topic string) (customer *Customer, ok bool) {
mu.Lock() mu.Lock()
...@@ -75,34 +80,44 @@ func ReadingMessage(ctx context.Context, c *Customer) { ...@@ -75,34 +80,44 @@ func ReadingMessage(ctx context.Context, c *Customer) {
readyToRead := make(chan *kafka.Reader) readyToRead := make(chan *kafka.Reader)
go func(ctx context.Context, c *Customer) { go func(ctx context.Context, c *Customer) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
for { for {
select { select {
case <-c.Listen(): case <-c.Listen():
return return
case <-ctx.Done(): case <-ctx.Done():
c.Exit() c.Exit()
return
case reader := <-readyToRead: case reader := <-readyToRead:
defer reader.Close()
go func(ctx context.Context, c *Customer) { go func(ctx context.Context, c *Customer) {
var errMessage strings.Builder defer reader.Close()
var errMessage strings.Builder
var matedata entity.Matedata var matedata entity.Matedata
for { for {
select { select {
case <-ctx.Done(): case <- ctx.Done():
c.Exit() return
default: default:
m, err := reader.ReadMessage(ctx) m, err := reader.ReadMessage(ctx)
if err != nil { if err != nil {
// 退出 switch err {
// c.Exit() case context.Canceled:
errMessage.Reset() // 监听主上下文信号
errMessage.WriteString("Reader Error") log.Println("Closing Kafka Conection!")
errMessage.WriteString(err.Error()) return
log.Println(errMessage.String()) default:
errMessage.Reset()
continue errMessage.WriteString("Reader Error")
errMessage.WriteString(err.Error())
log.Println(errMessage.String())
continue
}
} }
matedata, err = c.Format(string(reader.Config().Topic), string(m.Value)) matedata, err = c.Format(string(reader.Config().Topic), string(m.Value))
...@@ -129,6 +144,5 @@ func ReadingMessage(ctx context.Context, c *Customer) { ...@@ -129,6 +144,5 @@ func ReadingMessage(ctx context.Context, c *Customer) {
log.Printf("Start Customer Group[%s][%d] success!", p.Config().GroupID, p.Config().Partition) log.Printf("Start Customer Group[%s][%d] success!", p.Config().GroupID, p.Config().Partition)
readyToRead <- p readyToRead <- p
} }
} }
...@@ -8,7 +8,9 @@ import ( ...@@ -8,7 +8,9 @@ import (
"github.com/y7ut/logtransfer/conf" "github.com/y7ut/logtransfer/conf"
) )
func InitReader(topic string, groupId string) []*kafka.Reader { const GroupSuffix = "_group"
func InitReader(topic string) []*kafka.Reader {
// // 先去创建一下这个分组 // // 先去创建一下这个分组
// make a writer that produces to topic-A, using the least-bytes distribution // make a writer that produces to topic-A, using the least-bytes distribution
var readers []*kafka.Reader var readers []*kafka.Reader
...@@ -16,7 +18,7 @@ func InitReader(topic string, groupId string) []*kafka.Reader { ...@@ -16,7 +18,7 @@ func InitReader(topic string, groupId string) []*kafka.Reader {
readers = append(readers, kafka.NewReader(kafka.ReaderConfig{ readers = append(readers, kafka.NewReader(kafka.ReaderConfig{
Brokers: strings.Split(conf.APPConfig.Kafka.Address, ","), Brokers: strings.Split(conf.APPConfig.Kafka.Address, ","),
Topic: topic, Topic: topic,
GroupID: groupId, GroupID: topic+GroupSuffix,
// Partition: 0, // Partition: 0,
MinBytes: 10e3, // 10KB MinBytes: 10e3, // 10KB
MaxBytes: 10e6, // 10MB MaxBytes: 10e6, // 10MB
...@@ -33,15 +35,17 @@ func InitReader(topic string, groupId string) []*kafka.Reader { ...@@ -33,15 +35,17 @@ func InitReader(topic string, groupId string) []*kafka.Reader {
return readers return readers
} }
func CreateCustomerGroup(topic string, groupId string) { func CreateCustomerGroup(topic string) error {
config := kafka.ConsumerGroupConfig{ config := kafka.ConsumerGroupConfig{
ID: groupId, ID: topic+GroupSuffix,
Brokers: strings.Split(conf.APPConfig.Kafka.Address, ","), Brokers: strings.Split(conf.APPConfig.Kafka.Address, ","),
Topics: []string{topic}, Topics: []string{topic},
StartOffset: kafka.LastOffset, StartOffset: kafka.LastOffset,
} }
_, err := kafka.NewConsumerGroup(config) _, err := kafka.NewConsumerGroup(config)
log.Printf("Customer group [%s] created success!", topic+GroupSuffix)
if err != nil { if err != nil {
log.Println("create CustomerGroup error:", err) return err
} }
return nil
} }
package source
import (
"log"
"github.com/y7ut/logtransfer/entity"
"github.com/y7ut/logtransfer/plugin"
)
func generateTopic(config TopicConfig) *Topic {
if config.PipelineConfig == nil {
log.Printf("get topic setting error:%s ", config.Label)
}
p := plugin.PipeLine{}
// log.Println("get config", currentTopic.PipelineConfig)
for _, v := range config.PipelineConfig {
currentPlugin := plugin.RegistedPlugins[v.Name]
err := currentPlugin.SetParams(v.Params)
if err != nil {
log.Panicln("plugin encode params error:", err)
}
p.AppendPlugin(currentPlugin)
}
var formatMethod entity.Formater
switch config.Format {
case 1:
formatMethod = entity.DefaultJsonLog
case 2:
formatMethod = entity.FormatServiceWfLog
default:
formatMethod = entity.DefaultLog
}
return &Topic{Name: config.Name, Label: config.Label, PipeLine: &p, Format: formatMethod}
}
...@@ -6,10 +6,8 @@ import ( ...@@ -6,10 +6,8 @@ import (
"log" "log"
"os" "os"
"os/signal" "os/signal"
"sync"
"syscall" "syscall"
"time"
elastic "github.com/olivere/elastic/v7"
"github.com/y7ut/logtransfer/conf" "github.com/y7ut/logtransfer/conf"
"github.com/y7ut/logtransfer/entity" "github.com/y7ut/logtransfer/entity"
...@@ -17,61 +15,42 @@ import ( ...@@ -17,61 +15,42 @@ import (
) )
var ( var (
Start = make(chan *source.Customer) Start = make(chan *source.Customer)
Close = make(chan string) Close = make(chan string)
closeWg sync.WaitGroup
) )
// 核心启动 // 核心启动
func Run(confPath string) { func Run(confPath string) {
// 加载配置 // 加载配置
conf.Init(confPath) conf.Init(confPath)
// 初始化ES客户端
esClient, err := elastic.NewClient(elastic.SetSniff(false), elastic.SetURL(conf.APPConfig.Es.Address))
if err != nil {
fmt.Println("connect es error", err)
panic(err)
}
// 做一个master的上下文 // 做一个master的上下文
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
// 启动es消息发送器 go entity.MatedateSender(ctx)
// for i := 0; i < 20; i++ {
// go entity.MatedateSender(ctx, esClient)
// }
go entity.MatedateSender(ctx, esClient)
// 用于处理启动与关闭消费处理器的信号通知 // 用于处理启动与关闭消费处理器的信号通知
go func() { go CollectorRegister(ctx)
for {
select {
case customer := <-Start:
source.RegisterManger(customer)
go source.ReadingMessage(ctx, customer)
case closer := <-Close:
c, ok := source.GetCustomer(closer)
if !ok {
log.Printf(" Customer %s unstall Failed ", closer)
} initTopic, err := source.ChooseTopic()
c.Exit() if err != nil {
closeWg.Done() panic(fmt.Sprintf("init topic fail: %s", err))
} }
}
}()
// TODO: 动态的注册customer,目前仅支持初始化的时候来加载 for topic := range initTopic {
for topic := range source.ChooseTopic() {
currentCustomer := source.InitCustomer(topic) currentCustomer := source.InitCustomer(topic)
Start <- currentCustomer Start <- currentCustomer
} }
// TODO: 还要监听Topic的配置变更 go TopicWatcherHandle()
// 目前是通过topic的name来注册所有的消费处理器
// 所以直接给对应的topic中的customer重启就可以杀了就可以了 // 监听Agent Collector变更
go source.WatchConfigs()
// 还要监听Topic的配置变更
go source.WatchTopics()
// TODO: 监听Agent 启动状态变更
for sign := range sign() { for sign := range sign() {
switch sign { switch sign {
...@@ -82,13 +61,13 @@ func Run(confPath string) { ...@@ -82,13 +61,13 @@ func Run(confPath string) {
for _, topic := range currentTopics { for _, topic := range currentTopics {
closeWg.Add(1)
Close <- topic Close <- topic
log.Printf(" Customer %s unstalling...", topic) log.Printf(" Customer %s unstalling...", topic)
closeWg.Wait()
} }
entity.CloseMessageChan()
cancel() cancel()
entity.CloseMessageChan()
time.Sleep(1 * time.Second)
log.Printf(" Success unstall %d Transfer", len(currentTopics)) log.Printf(" Success unstall %d Transfer", len(currentTopics))
os.Exit(0) os.Exit(0)
} }
...@@ -97,6 +76,83 @@ func Run(confPath string) { ...@@ -97,6 +76,83 @@ func Run(confPath string) {
} }
func CollectorRegister(ctx context.Context) {
for {
select {
case customer := <-Start:
source.RegisterManger(customer)
go source.ReadingMessage(ctx, customer)
case closer := <-Close:
c, ok := source.GetCustomer(closer)
if !ok {
log.Printf(" Customer %s unstall Failed ", closer)
break
}
source.UnstallManger(closer)
c.Exit()
// closeWg.Done()
}
}
}
// 监控topic的变动, 只处理更新, 若删除topic的话,不会触发配置的重新载入行为
func TopicWatcherHandle() {
// restart
go func() {
for topic := range source.TopicChangeListener() {
var checkUsed bool
collectors, err := source.LoadCollectors()
if err != nil {
log.Println("Load Collector error:", err)
continue
}
// 检查是否使用
for _, item := range collectors {
if item.Topic == topic.Name {
checkUsed = true
}
}
if !checkUsed {
log.Println("Put topic but not used")
err := source.CreateCustomerGroup(topic.Name)
if err != nil {
log.Printf(" Create Topic Kafka customer group Failed : %s", err)
continue
}
continue
}
Close <- topic.Name
log.Printf(" Customer %s restart...", topic.Name)
currentCustomer := source.InitCustomer(topic)
Start <- currentCustomer
}
}()
// close
go func() {
for deleteTopic := range source.TopicDeleteListener() {
// closeWg.Add(1)
Close <- deleteTopic
log.Printf(" Customer %s deleting...", deleteTopic)
// closeWg.Wait()
}
}()
// start
go func() {
for topic := range source.TopicStartListener() {
currentCustomer := source.InitCustomer(topic)
Start <- currentCustomer
}
}()
}
func sign() <-chan os.Signal { func sign() <-chan os.Signal {
c := make(chan os.Signal, 2) c := make(chan os.Signal, 2)
// 监听信号 // 监听信号
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment