Commit 2b1e2fe0 authored by 谢宇轩

Merge branch 'AddWatchConfig' into main

parents d10f0729 1dd58221
......@@ -45,18 +45,17 @@ func initConnect() *clientv3.Client {
return cli
}
// Get all current jobs (currently used at initialization)
func GetAllConfFromEtcd() []EtcdValue {
// Get all jobs of the currently enabled Agents (currently used at initialization)
func GetAllConfFromEtcd() ([]EtcdValue, error) {
configs := make([]EtcdValue, 0)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, configPath, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend))
cancel()
if err != nil {
panic(fmt.Sprintf("get failed, err:%s \n", err))
return configs, err
}
configs := make([]EtcdValue, 0)
for _, etcdResult := range resp.Kvs {
// Determine node status based on all current node names in the system
etcdKey := statusPath + string(etcdResult.Key[strings.LastIndex(string(etcdResult.Key), "/")+1:])
......@@ -65,7 +64,7 @@ func GetAllConfFromEtcd() []EtcdValue {
resp, err := cli.Get(ctx, etcdKey)
cancel()
if err != nil {
panic(fmt.Sprintf("Get Etcd config failed, err:%s \n", err))
return configs, fmt.Errorf("get Etcd config failed, err:%s", err)
}
if len(resp.Kvs) != 0 {
......@@ -77,34 +76,122 @@ func GetAllConfFromEtcd() []EtcdValue {
}
}
return configs, nil
}
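// GetDelRevValueFromEtcd reads the value a key held at an earlier revision (used
// below with ModRevision-1 to recover the value just before a delete). This only
// works while etcd has not compacted past that revision; otherwise the Get fails.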
func GetDelRevValueFromEtcd(key string, rev int64) (EtcdValue, error) {
var value EtcdValue
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, key, clientv3.WithRev(rev))
cancel()
if err != nil {
return value, fmt.Errorf("get Etcd config failed: %s", err)
}
if len(resp.Kvs) == 0 {
return value, fmt.Errorf("config get error")
}
return configs
return resp.Kvs[0].Value, nil
}
// Load all Topic configuration info
func GetAllTopicFromEtcd() []EtcdValue {
// Get a specific config
func GetConfFromEtcd(name string) (EtcdValue, error) {
var value EtcdValue
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, topicPath, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend))
resp, err := cli.Get(ctx, configPath+name)
cancel()
if err != nil {
panic(fmt.Sprintf("get failed, err:%s \n", err))
return value, err
}
if len(resp.Kvs) == 0 {
return value, fmt.Errorf("config get error")
}
return resp.Kvs[0].Value, nil
}
// Load all Topic configuration info
func GetAllTopicFromEtcd() ([]EtcdValue, error) {
configs := make([]EtcdValue, 0)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, topicPath, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend))
cancel()
if err != nil {
return configs, err
}
for _, etcdResult := range resp.Kvs {
configs = append(configs, etcdResult.Value)
}
return configs
return configs, nil
}
// Get a specific Topic
func GetTopicFromEtcd(name string) (EtcdValue, error) {
var value EtcdValue
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, topicPath+name)
cancel()
if err != nil {
return value, fmt.Errorf("get Etcd config failed: %s", err)
}
if len(resp.Kvs) == 0 {
return value, fmt.Errorf("config get error")
}
return resp.Kvs[0].Value, nil
}
func WatchLogConfToEtcd() clientv3.WatchChan {
func WatchLogTopicToEtcd() clientv3.WatchChan {
wch := cli.Watch(context.Background(), topicPath, clientv3.WithPrefix())
return wch
}
func WatchLogConfigToEtcd() clientv3.WatchChan {
wch := cli.Watch(context.Background(), configPath, clientv3.WithPrefix())
return wch
}
func WatchLogStatusToEtcd() clientv3.WatchChan {
wch := cli.Watch(context.Background(), statusPath, clientv3.WithPrefix())
return wch
}
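// Each watcher above uses clientv3.WithPrefix, so a single WatchChan delivers
// events for every key under the corresponding prefix (topics, configs, status).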
func CheckAgentActive(name string) bool {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, statusPath+name)
log.Println("looking for", statusPath+name)
cancel()
if err != nil {
log.Printf("Get Etcd config failed, err: %s", err)
return false
}
if len(resp.Kvs) == 0 {
return false
}
status := string(resp.Kvs[0].Value)
log.Println("it is"+status)
return status == "1"
}
......@@ -43,7 +43,7 @@ func FormatServiceWfLog(sourceKey string, message string) (Matedata, error) {
}
}
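// "2006-01-02 15:04:05" is Go's reference-time layout: each component of the
// reference time (Mon Jan 2 15:04:05 MST 2006) stands in for the field to format.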
mateItem.Data["timestamp"] = mateItem.create
mateItem.Data["timestamp"] = mateItem.create.Format("2006-01-02 15:04:05")
result := *mateItem
mateItem.reset()
MatePool.Put(mateItem)
......
......@@ -41,46 +41,80 @@ func CloseMessageChan() {
close(messages)
}
func MatedateSender(ctx context.Context, esClient *elastic.Client) {
tick := time.NewTicker(3 * time.Second)
var (
SenderMu sync.Mutex
)
func MatedateSender(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Initialize the ES client
esClient, err := elastic.NewClient(elastic.SetSniff(false), elastic.SetURL(conf.APPConfig.Es.Address))
if err != nil {
panic(err)
}
wp := &ESWorkPool{
WorkerFunc: func(matedatas []*Matedata) bool {
bulkRequest := esClient.Bulk()
for {
select {
case m := <-messages:
for _, m := range matedatas {
indexRequest := elastic.NewBulkIndexRequest().Index(m.Index).Doc(m.Data)
bulkRequest.Add(indexRequest)
case <-tick.C:
// Do sends the bulk requests to Elasticsearch
SenderMu.Lock()
}
count := bulkRequest.NumberOfActions()
if count > 0 {
log.Printf("Send message to Es: %d : \n", bulkRequest.NumberOfActions())
_, err := bulkRequest.Do(ctx)
log.Printf("Send messages to Index: %d : \n", bulkRequest.NumberOfActions())
timectx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
response, err := bulkRequest.Do(timectx)
cancel()
if err != nil {
log.Println("Save Es Error:", err)
return false
}
for _, v := range response.Items {
for _, item := range v {
if item.Error != nil {
log.Printf("Find Error in ES Result in (%s): %s", item.Index, item.Error.Reason)
return false
}
}
}
bulkRequest.Reset()
}
SenderMu.Unlock()
return true
},
MaxWorkerCount: 50,
MaxIdleWorkerDuration: 5 * time.Second,
}
case <-ctx.Done():
// Do sends the bulk requests to Elasticsearch
SenderMu.Lock()
_, err := bulkRequest.Do(ctx)
if err != nil {
log.Println("Save Es Error:", err)
wp.Start()
defer wp.Stop()
var mateDatesItems []*Matedata
var mu sync.Mutex
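// Batch incoming Matedata items and hand them to the worker pool once more than
// 10 have accumulated; on shutdown, flush whatever is still buffered.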
for {
select {
case m := <-messages:
mu.Lock()
mateDatesItems = append(mateDatesItems, m)
currentItems := mateDatesItems
mu.Unlock()
if len(currentItems) > 10 {
wp.Serve(currentItems)
mu.Lock()
mateDatesItems = mateDatesItems[:0]
mu.Unlock()
}
bulkRequest.Reset()
SenderMu.Unlock()
case <-ctx.Done():
log.Println("Exiting...")
mu.Lock()
currentItems := mateDatesItems
mu.Unlock()
wp.Serve(currentItems)
return
}
}
......
package entity
import (
"log"
"runtime"
"sync"
"time"
)
var life sync.Map
// Maximum number of pending jobs each worker channel can hold
var workerChanCap = func() int {
// Use blocking workerChan if GOMAXPROCS=1.
// This immediately switches Serve to WorkerFunc, which results
// in higher performance (under go1.5 at least).
if runtime.GOMAXPROCS(0) == 1 {
return 0
}
// Use non-blocking workerChan if GOMAXPROCS>1,
// since otherwise the Serve caller (Acceptor) may lag accepting
// new connections if WorkerFunc is CPU-bound.
return 1
}()
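// With capacity 0 the send in Serve blocks until a worker receives the batch;
// with capacity 1 Serve can hand off one batch without waiting.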
type ESWorkPool struct {
WorkerFunc func(matedatas []*Matedata) bool // the job-processing logic each worker runs
MaxWorkerCount int // maximum number of workers
MaxIdleWorkerDuration time.Duration // maximum idle time before a worker is reaped
Lock sync.Mutex
WorkerCount int // current number of workers
mustStop bool // stop flag used to notify the worker channels (a busy worker only sees it after finishing its job)
readyToWork []*workerChan // stack-like LIFO slice of idle workerChan pointers
stopChannel chan struct{} // channel for sending/receiving the stop signal
workChanPool sync.Pool // object pool for reusing workerChans
}
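// Note: this pool appears to follow the same pattern as fasthttp's internal
// workerPool (LIFO slice of idle worker channels, sync.Pool reuse, idle reaping),
// adapted here to carry batches of *Matedata instead of connections.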
type workerChan struct {
lastUseTime time.Time // last time this worker was used
ch chan []*Matedata // channel that receives batches of work
}
var Wait sync.WaitGroup
func (wp *ESWorkPool) Start() {
// Create a stop-signal channel
wp.stopChannel = make(chan struct{})
stopCh := wp.stopChannel
wp.workChanPool.New = func() interface{} {
// New is called when the pool has nothing cached:
// allocate a fresh workerChan (Get still returns it as an interface{})
return &workerChan{
ch: make(chan []*Matedata, workerChanCap),
}
}
// Start the cleaner goroutine
go func() {
var scratch []*workerChan
for {
// Clean up workerChans that have been idle longer than MaxIdleWorkerDuration
wp.clean(&scratch)
// Re-check after each sleep interval until the pool is stopped
select {
case <-stopCh:
return
default:
time.Sleep(wp.MaxIdleWorkerDuration)
}
}
}()
}
func (wp *ESWorkPool) Stop() {
// Close and discard the stopChannel
close(wp.stopChannel)
wp.stopChannel = nil
// Notify every workerChan in the ready slice, then clear it
wp.Lock.Lock()
ready := wp.readyToWork // snapshot of all currently idle workerChans
// Tell each idle worker to exit
for i, ch := range ready {
ch.ch <- nil
ready[i] = nil
}
// Clear the ready slice
wp.readyToWork = ready[:0]
// Mark the pool as stopped so busy workers exit on release
wp.mustStop = true
wp.Lock.Unlock()
}
func (wp *ESWorkPool) Serve(matedates []*Matedata) bool {
// Get an available workerChan
ch := wp.getCh()
// If none is available, report failure
if ch == nil {
return false
}
// Send the batch to the workerChan
ch.ch <- matedates
return true
}
// Get an available workerChan
func (wp *ESWorkPool) getCh() *workerChan {
var ch *workerChan
// By default no new worker needs to be created
createWorker := false
wp.Lock.Lock()
// Take the slice of idle workerChans
ready := wp.readyToWork
// Index of the last idle workerChan
n := len(ready) - 1
if n < 0 {
// No idle workerChan: there are two possibilities
if wp.WorkerCount < wp.MaxWorkerCount {
// (1) the number of workers has not reached MaxWorkerCount yet;
// all registered workers are busy, so a new one may be created.
// This is the first way ch can end up nil.
createWorker = true
wp.WorkerCount++
}
// (2) the pool is already full and every workerChan is handed out.
// This is the second way ch can end up nil.
} else {
// An idle workerChan is available: take the last one in the slice
// and remove it from readyToWork
// (workers that are out working are no longer in the readyToWork slice)
ch = ready[n]
ready[n] = nil
wp.readyToWork = ready[:n]
}
wp.Lock.Unlock()
// When no workerChan was obtained
if ch == nil {
// In case (2) the pool is saturated, so return nil empty-handed
if !createWorker {
return nil
}
// In case (1) we can initialize a new worker channel.
// It is taken from the object pool instead of allocating a fresh one:
// frequent allocation and collection puts pressure on the GC, while
// sync.Pool caches temporarily unused objects so their memory can be
// reused next time, reducing GC load and improving performance.
vch := wp.workChanPool.Get()
// (the pool's New function was assigned in Start)
// assert vch back to a *workerChan pointer
ch = vch.(*workerChan)
now, _ := life.Load("new")
change := 1
if now != nil {
change = now.(int) + 1
}
life.Store("new", change)
// Start this worker's receive loop in a goroutine
go func() {
// Block receiving batches of work from the workerChan
wp.workerFunc(ch)
// When the worker exits, put vch (the interface{} holding the
// *workerChan, not ch itself) back into the sync.Pool
wp.workChanPool.Put(vch)
}()
}
// Return the workerChan
return ch
}
func (wp *ESWorkPool) workerFunc(ch *workerChan) {
var matedatas []*Matedata
// Block until the workerChan's ch receives a job
for matedatas = range ch.ch {
// A nil batch (sent on close/stop) means this worker should exit
if matedatas == nil {
now, _ := life.Load("die")
change := 1
if now != nil {
change = now.(int) + 1
}
life.Store("die", change)
break
}
// do the job!!!!
if wp.WorkerFunc(matedatas) {
// Done with this batch
matedatas = matedatas[:0]
}
// Release this workerChan back to the ready list.
// While working, the worker cannot see what happened outside;
// if the pool has already been stopped (mustStop), exit now
if !wp.release(ch) {
break
}
}
// On exit, deregister this worker from the count
wp.Lock.Lock()
count := wp.WorkerCount
wp.WorkerCount--
wp.Lock.Unlock()
log.Println("当前Worker还剩:", (count - 1))
}
func (wp *ESWorkPool) release(ch *workerChan) bool {
// Record the last-use time
ch.lastUseTime = time.Now()
wp.Lock.Lock()
// If the pool has already been stopped, return false
if wp.mustStop {
// the whole pool is shutting down, so this worker quits
wp.Lock.Unlock()
return false
}
// Otherwise put the workerChan back into the readyToWork slice
wp.readyToWork = append(wp.readyToWork, ch)
wp.Lock.Unlock()
return true
}
// Periodically clean up workerChans that have been idle for too long; active ones sit at the back of the slice, so the stale ones at the front are removed
func (wp *ESWorkPool) clean(scratch *[]*workerChan) {
maxIdleWorkerDuration := wp.MaxIdleWorkerDuration
current := time.Now()
wp.Lock.Lock()
ready := wp.readyToWork
n := len(ready)
i := 0
// Count how many workerChans at the front have been idle longer than maxIdleWorkerDuration
for i < n && current.Sub(ready[i].lastUseTime) > maxIdleWorkerDuration {
i++
}
*scratch = append((*scratch)[:0], ready[:i]...)
// Remove those stale entries from the front of the slice (remember: it behaves like a LIFO stack)
if i > 0 {
m := copy(ready, ready[i:])
for i = m; i < n; i++ {
ready[i] = nil
}
wp.readyToWork = ready[:m]
}
wp.Lock.Unlock()
// Finally, tell each removed worker channel to exit
tmp := *scratch
for i := range tmp {
tmp[i].ch <- nil
tmp[i] = nil
}
}
......@@ -7,7 +7,7 @@ import (
"github.com/y7ut/logtransfer/transfer"
)
const version = "2.0.0"
const version = "2.1.0"
var c = flag.String("c", "./logtransfer.conf", "start with the given config file")
var v = flag.Bool("v", false, "print the current program version")
......
......@@ -9,7 +9,6 @@ import (
"github.com/y7ut/logtransfer/entity"
)
type Handler interface {
HandleFunc(*entity.Matedata) error
SetParams(string) error
......@@ -24,6 +23,10 @@ type PipeLine struct {
pipe []*Handler
}
func (p *PipeLine) Length() int {
return len(p.pipe)
}
func (p *PipeLine) AppendPlugin(plugin Handler) {
p.pipe = append(p.pipe, &plugin)
}
......
......@@ -2,8 +2,13 @@ package source
import (
"encoding/json"
"fmt"
"log"
"strings"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/y7ut/logtransfer/conf"
"github.com/y7ut/logtransfer/entity"
"github.com/y7ut/logtransfer/plugin"
......@@ -35,10 +40,19 @@ type PipeLinePluginsConfig struct {
Params string `json:"params"`
}
// Load all collectors
func LoadCollectors() []Collector {
configs := conf.GetAllConfFromEtcd()
var watchTopicChannel = make(chan *Topic)
var startTopicChannel = make(chan *Topic)
var deleteTopicChannel = make(chan string)
// Load all available collectors
func LoadCollectors() ([]Collector, error) {
collectors := make([]Collector, 0)
configs, err := conf.GetAllConfFromEtcd()
if err != nil {
return collectors, err
}
for _, v := range configs {
var currentCollector []Collector
......@@ -46,70 +60,482 @@ func LoadCollectors() []Collector {
if err != nil {
log.Printf("json decode config(%s) err : err: %s", v, err)
}
if currentCollector != nil {
log.Printf("Init config:%s ", v)
collectors = append(collectors, currentCollector...)
}
}
return collectors
return collectors, nil
}
// Collect all topics that need to be consumed
func ChooseTopic() map[*Topic]bool {
collector := LoadCollectors()
topics := loadTopics()
// Load all collectors of a specific Agent
func LoadCollector(name string) ([]Collector, error) {
var collectors []Collector
config, err := conf.GetConfFromEtcd(name)
if err != nil {
return collectors, fmt.Errorf("get etcd config err : err: %s", err)
}
err = json.Unmarshal(config, &collectors)
if err != nil {
return collectors, fmt.Errorf("json decode config(%s) err : err: %s", collectors, err)
}
return collectors, nil
}
// Collect all topics that need to be consumed
func ChooseTopic() (map[*Topic]bool, error) {
// Gather the collector info of every agent
ableTopics := make(map[*Topic]bool)
for _, v := range collector {
// all currently enabled collectors
collectors, err := LoadCollectors()
if err != nil {
return ableTopics, fmt.Errorf("Load Collector error: %s", err)
}
topics, err := loadTopics()
if err != nil {
return ableTopics, fmt.Errorf("Load Topic error: %s", err)
}
for _, v := range collectors {
currentTopic := topics[v.Topic]
ableTopics[currentTopic] = true
}
return ableTopics
return ableTopics, nil
}
// Parse all Topics and load their formatters and plugin pipelines
func loadTopics() map[string]*Topic {
configs := conf.GetAllTopicFromEtcd()
func loadTopics() (map[string]*Topic, error) {
topics := make(map[string]*Topic)
configs, err := conf.GetAllTopicFromEtcd()
if err != nil {
return topics, err
}
for _, v := range configs {
var currentTopic TopicConfig
err := json.Unmarshal(v, &currentTopic)
var currentTopicConfig TopicConfig
err := json.Unmarshal(v, &currentTopicConfig)
if err != nil {
log.Printf("json decode config(%s) err : err: %s", v, err)
return topics, err
}
log.Printf("Init Topic:%s ", currentTopic.Label)
if currentTopic.PipelineConfig == nil {
log.Printf("get topic setting error:%s ", currentTopic.Label)
topics[currentTopicConfig.Name] = generateTopic(currentTopicConfig)
}
return topics, nil
}
func TopicChangeListener() <-chan *Topic {
return watchTopicChannel
}
func TopicDeleteListener() <-chan string {
return deleteTopicChannel
}
p := plugin.PipeLine{}
func TopicStartListener() <-chan *Topic {
return startTopicChannel
}
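// These package-level channels decouple the etcd watchers below from the transfer
// layer: WatchTopics/WatchConfigs/WatchStatus push changed topics (or topic names
// to delete) into them, and consumers drain them via the *Listener accessors above.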
func WatchTopics() {
for confResp := range conf.WatchLogTopicToEtcd() {
for _, event := range confResp.Events {
switch event.Type {
case mvccpb.PUT:
// Only PUT operations trigger a notification
var newTopicCondfig TopicConfig
if len(confResp.Events) == 0 {
continue
}
changedConf := confResp.Events[0].Kv.Value
err := json.Unmarshal(changedConf, &newTopicCondfig)
// log.Println("get config", currentTopic.PipelineConfig)
for _, v := range currentTopic.PipelineConfig {
currentPlugin := plugin.RegistedPlugins[v.Name]
err := currentPlugin.SetParams(v.Params)
if err != nil {
log.Panicln("plugin encode params error:", err)
log.Println("Unmarshal New Topic Config Error:", err)
}
p.AppendPlugin(currentPlugin)
log.Println("load New Topic success!")
watchTopicChannel <- generateTopic(newTopicCondfig)
case mvccpb.DELETE:
// Fetch the previous revision for comparison.
// All of this topic's customers should be cleared from the register, though there should be none left; the config watcher should have caught them already
oldTopic, err := getHistoryTopicWithEvent(confResp)
if err != nil {
log.Println("Get HIstory Collector Error:", err)
continue
}
var formatMethod entity.Formater
switch currentTopic.Format {
log.Println("some Topic remove", oldTopic.Name)
}
time.Sleep(2 * time.Second)
}
}
}
case 1:
formatMethod = entity.DefaultJsonLog
case 2:
formatMethod = entity.FormatServiceWfLog
func WatchStatus() {
for confResp := range conf.WatchLogStatusToEtcd() {
// Changes from a disabled Agent can be ignored
agentKey := string(confResp.Events[0].Kv.Key)
currentChangedkey := agentKey[strings.LastIndex(agentKey, "/")+1:]
// Only enabled agents should trigger config changes
for _, event := range confResp.Events {
switch event.Type {
case mvccpb.PUT:
if len(confResp.Events) == 0 {
continue
}
// If the value is 1, it switched from 0 (agent opened);
// if it is 0, it either switched from 1 (agent closed) or was just created (ignore)
collectors, status, err := getStatusChangeWithEvent(confResp)
if err != nil {
log.Println("Get Status Active Event Info Error:", err)
continue
}
log.Println("STATUS CHANGE", status)
switch status {
case "CREATED":
// Case three: nothing needs to be done.
// It should not actually occur, because an agent is always disabled when first initialized.
log.Println("Add Agent Empty Status :", currentChangedkey)
case "CLOSE":
for _, c := range collectors {
log.Println("有collector离开")
// 获取config prefix 全部collector,判断是否是最后一个用这个topic的collector
currentAbleCollector, err := LoadCollectors()
if err != nil {
log.Println("Get Current Able Collector Error:", err)
continue
}
var set = make(map[string]bool)
for _, v := range currentAbleCollector {
set[v.Topic] = true
}
if !set[c.Topic] {
// safe to delete the topic's customer
deleteTopicChannel <- c.Topic
log.Println(currentChangedkey, "Agent Close Delete Collector with Topic Customer left", c.Topic)
continue
}
log.Println(currentChangedkey, "Agent Close with Close Collector", c)
}
case "OPEN":
for _, c := range collectors {
_, ok := GetCustomer(c.Topic)
// If this topic appears for the first time, initialize it:
// fetch the TopicConfig by topic name
if !ok {
changedConf, err := conf.GetTopicFromEtcd(c.Topic)
if err != nil {
log.Println("Load Agent Open New Puted Topic Config Error:", err)
continue
}
// Only PUT operations trigger a notification
var newPutTopicCondfig TopicConfig
err = json.Unmarshal(changedConf, &newPutTopicCondfig)
if err != nil {
log.Println("Unmarshal Agent Open New Puted Topic Config Error:", err)
continue
}
log.Println("load Agent New Puted Topic success!")
startTopicChannel <- generateTopic(newPutTopicCondfig)
log.Println(currentChangedkey, "Agent open with open Collector And Init Topic Customer", c)
}
log.Println(currentChangedkey, "Agent open with open Collector, But has opend", c)
}
default:
formatMethod = entity.DefaultLog
log.Println("Get Status Active Event Info Unkonw Error")
}
}
topics[currentTopic.Name] = &Topic{Name: currentTopic.Name, Label: currentTopic.Label, PipeLine: &p, Format: formatMethod}
}
return topics
}
}
func WatchConfigs() {
for confResp := range conf.WatchLogConfigToEtcd() {
// Changes from a disabled Agent can be ignored
agentKey := string(confResp.Events[0].Kv.Key)
currentChangedkey := agentKey[strings.LastIndex(agentKey, "/")+1:]
if !conf.CheckAgentActive(currentChangedkey) {
continue
}
// Only enabled agents should trigger config changes
for _, event := range confResp.Events {
switch event.Type {
case mvccpb.PUT:
if len(confResp.Events) == 0 {
continue
}
// Only PUT operations trigger a notification.
// A PUT on an agent key covers several cases:
// 1: querying the previous revision shows the Agent added a Collector (len > 0);
//    then check the in-memory registered customers to see whether this Collector's topic appears for the first time (install the corresponding Topic).
// 2: querying the previous revision shows a Collector was removed (len >= 0);
//    then load all collectors under the config prefix to check whether this was the last collector using the topic (its Topic then needs to be uninstalled).
// 3: querying the previous revision shows the key did not exist before (len = 0).
diff, status, err := getCollectorChangeWithEvent(confResp)
if err != nil {
log.Println("Get History Collector Change Error:", err)
continue
}
switch status {
case "CREATED":
// Case three: nothing needs to be done.
// It should not actually occur, because an agent is always disabled when first initialized.
log.Println("Add Agent", currentChangedkey)
case "PUT":
log.Println("有collector加入")
_, ok := GetCustomer(diff.Topic)
// If this topic appears for the first time, initialize it:
// fetch the TopicConfig by topic name
if !ok {
changedConf, err := conf.GetTopicFromEtcd(diff.Topic)
if err != nil {
log.Println("Load Agent New Puted Topic Config Error:", err)
continue
}
// Only PUT operations trigger a notification
var newPutTopicCondfig TopicConfig
err = json.Unmarshal(changedConf, &newPutTopicCondfig)
if err != nil {
log.Println("Unmarshal Agent New Puted Topic Config Error:", err)
continue
}
log.Println("load Agent New Puted Topic success!")
startTopicChannel <- generateTopic(newPutTopicCondfig)
log.Println(currentChangedkey, "Agent Add Collector And Init Topic Customer", diff)
}
log.Println(currentChangedkey, "Agent Add Collector", diff)
case "DEL":
log.Println("有collector离开")
// 获取config prefix 全部collector,判断是否是最后一个用这个topic的collector
currentAbleCollector, err := LoadCollectors()
if err != nil {
log.Println("Get Current Able Collector Error:", err)
continue
}
var set = make(map[string]bool)
for _, v := range currentAbleCollector {
set[v.Topic] = true
}
if !set[diff.Topic] {
// safe to delete the topic's customer
deleteTopicChannel <- diff.Topic
log.Println(currentChangedkey, "Agent Delete Collector with Topic Customer left", diff)
continue
}
log.Println(currentChangedkey, "Agent Delete Collector", diff)
default:
log.Println("Get History Collector Change Unkonw Error!")
}
case mvccpb.DELETE:
// Fetch the previous revision for comparison.
// Normally this should already be an empty array,
// because an agent may not be deleted while it still has collectors,
// nor while it is enabled,
// so this branch is effectively unused.
if len(confResp.Events) == 0 {
continue
}
oldCollector, err := getHistoryCollectorWithEvent(confResp)
if err != nil {
log.Println("Get History Collector Error:", err)
continue
}
if len(oldCollector) != 0 {
log.Printf("Get History Collector Error: Agent(%s) die with collector.", confResp.Events[0].Kv.Key)
continue
}
log.Printf("Agent(%s) has uninstall complete.", confResp.Events[0].Kv.Key)
}
}
}
}
func getHistoryTopicWithEvent(confResp clientv3.WatchResponse) (TopicConfig, error) {
var oldTopic TopicConfig
oldKey := confResp.Events[0].Kv.Key
rev := confResp.Events[0].Kv.ModRevision - 1
oldValue, err := conf.GetDelRevValueFromEtcd(string(oldKey), rev)
if err != nil {
return oldTopic, err
}
err = json.Unmarshal(oldValue, &oldTopic)
if err != nil {
return oldTopic, err
}
return oldTopic, nil
}
func getHistoryCollectorWithEvent(confResp clientv3.WatchResponse) ([]Collector, error) {
var oldCollector []Collector
oldKey := confResp.Events[0].Kv.Key
rev := confResp.Events[0].Kv.ModRevision - 1
oldValue, err := conf.GetDelRevValueFromEtcd(string(oldKey), rev)
if err != nil {
return oldCollector, err
}
err = json.Unmarshal(oldValue, &oldCollector)
if err != nil {
return oldCollector, err
}
return oldCollector, nil
}
// Get the Collector change on an Agent; there are three types: CREATED (new Agent), PUT (Collector added), DEL (Collector removed)
func getCollectorChangeWithEvent(confResp clientv3.WatchResponse) (different Collector, changeType string, err error) {
var currentCollectors []Collector
changedConf := confResp.Events[0].Kv.Value
err = json.Unmarshal(changedConf, &currentCollectors)
if err != nil {
return different, changeType, err
}
var oldCollector []Collector
oldKey := confResp.Events[0].Kv.Key
rev := confResp.Events[0].Kv.ModRevision - 1
oldValue, err := conf.GetDelRevValueFromEtcd(string(oldKey), rev)
if err != nil {
if len(currentCollectors) == 0 {
changeType = "CREATED"
err = nil
}
return different, changeType, err
}
err = json.Unmarshal(oldValue, &oldCollector)
if err != nil {
return different, changeType, err
}
var set = make(map[Collector]bool)
if len(oldCollector)-len(currentCollectors) < 0 {
changeType = "PUT"
for _, item := range oldCollector {
set[item] = true
}
for _, item := range currentCollectors {
if !set[item] {
different = item
}
}
} else {
changeType = "DEL"
for _, item := range currentCollectors {
set[item] = true
}
for _, item := range oldCollector {
if !set[item] {
different = item
}
}
}
return different, changeType, err
}
func getStatusChangeWithEvent(confResp clientv3.WatchResponse) (collectors []Collector, changeType string, err error) {
changeStatus := string(confResp.Events[0].Kv.Value)
// Compare with the previous revision first
oldKey := confResp.Events[0].Kv.Key
rev := confResp.Events[0].Kv.ModRevision - 1
oldValue, err := conf.GetDelRevValueFromEtcd(string(oldKey), rev)
if err != nil {
if len(oldValue) == 0 {
// a brand-new agent can be ignored
changeType = "CREATED"
err = nil
return collectors, changeType, err
}
return collectors, changeType, err
}
// Load this agent's collector list
agentKey := string(confResp.Events[0].Kv.Key)
currentChangedkey := agentKey[strings.LastIndex(agentKey, "/")+1:]
AllCollectors, err := LoadCollector(currentChangedkey)
if err != nil {
return collectors, changeType, err
}
changeType = "CLOSE"
if changeStatus == "1" {
changeType = "OPEN"
}
var topicSet = make(map[string]bool)
for _, c := range AllCollectors {
if topicSet[c.Topic] {
continue
}
topicSet[c.Topic] = true
collectors = append(collectors, c)
}
return collectors, changeType, err
}
......@@ -18,7 +18,8 @@ var (
// Consumer handler
type Customer struct {
Reader *kafka.Reader // the consumer's dedicated Kafka Reader (where I come from)
Readers []*kafka.Reader // the consumer's dedicated slice of Kafka Readers (where I come from)
Topic *Topic
HandlePipeline *plugin.PipeLine // the pipeline built from the Topic (where I am going)
Format entity.Formater // the formatter that parses the metadata
done chan struct{} // shutdown signal
......@@ -26,28 +27,31 @@ type Customer struct {
// Stop a consumer handler
func (c Customer) Exit() {
go func() {
c.done <- struct{}{}
}()
}
// Listen for the shutdown signal
func (c Customer) Listen() chan struct{} {
func (c Customer) Listen() <-chan struct{} {
return c.done
}
// Initialize a consumer handler
func InitCustomer(topic *Topic) *Customer{
GroupID := topic.Name + "_group"
r := InitReader(topic.Name, GroupID)
log.Printf("Check Customer group of [%s] success!", GroupID)
return &Customer{Reader: r, done: make(chan struct{}), HandlePipeline: topic.PipeLine, Format: topic.Format}
func InitCustomer(topic *Topic) *Customer {
r := InitReader(topic.Name)
log.Printf("Check Customer group of [%s] success!", topic.Name)
return &Customer{Topic: topic, Readers: r, done: make(chan struct{}), HandlePipeline: topic.PipeLine, Format: topic.Format}
}
// Register the currently working consumer handler globally
func RegisterManger(c *Customer) {
mu.Lock()
CustomerManger[c.Reader.Config().Topic] = c
CustomerManger[c.Topic.Name] = c
mu.Unlock()
}
func UnstallManger(topic string) {
mu.Lock()
delete(CustomerManger, topic)
mu.Unlock()
}
......@@ -59,6 +63,7 @@ func GetCustomer(topic string) (customer *Customer, ok bool) {
return customer, ok
}
// Get the Topic names used by all registered consumer handlers
func GetRegisterTopics() (topics []string) {
mu.Lock()
......@@ -69,37 +74,42 @@ func GetRegisterTopics() (topics []string) {
return topics
}
// Consume messages from Kafka; note that offsets are committed here
func ReadingMessage(ctx context.Context, c *Customer) {
defer c.Reader.Close()
log.Printf("Start Customer Group[%s] success!", c.Reader.Config().GroupID)
// var trycount int
// var cstSh, _ = time.LoadLocation("Asia/Shanghai") // Shanghai time zone
var errMessage strings.Builder
// log.Println(c.HandlePipeline.pipe)
readyToRead := make(chan *kafka.Reader)
var matedata entity.Matedata
go func(ctx context.Context, c *Customer) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
for {
select {
case <-c.Listen():
// Listen for the shutdown signal
return
case <-ctx.Done():
c.Exit()
log.Println("Close customer of Topic :", c.Reader.Config().Topic)
return
default:
// // A timeout context would be non-blocking and fail once the deadline passes
// // timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
// // so a blocking context is used here instead
case reader := <-readyToRead:
m, err := c.Reader.ReadMessage(ctx)
go func(ctx context.Context, c *Customer) {
defer reader.Close()
var errMessage strings.Builder
var matedata entity.Matedata
for {
select {
case <- ctx.Done():
return
default:
m, err := reader.ReadMessage(ctx)
if err != nil {
// exit
// c.Exit()
switch err {
case context.Canceled:
// the main context has been cancelled
log.Println("Closing Kafka Conection!")
return
default:
errMessage.Reset()
errMessage.WriteString("Reader Error")
errMessage.WriteString(err.Error())
......@@ -108,7 +118,9 @@ func ReadingMessage(ctx context.Context, c *Customer) {
continue
}
matedata, err = c.Format(string(c.Reader.Config().Topic), string(m.Value))
}
matedata, err = c.Format(string(reader.Config().Topic), string(m.Value))
if err != nil {
errMessage.Reset()
errMessage.WriteString("Format Error")
......@@ -120,4 +132,17 @@ func ReadingMessage(ctx context.Context, c *Customer) {
c.HandlePipeline.Enter(matedata)
}
}
}(ctx, c)
}
}
}(ctx, c)
for _, p := range c.Readers {
log.Printf("Start Customer Group[%s][%d] success!", p.Config().GroupID, p.Config().Partition)
readyToRead <- p
}
}
......@@ -8,33 +8,44 @@ import (
"github.com/y7ut/logtransfer/conf"
)
func InitReader(topic string, groupId string) *kafka.Reader {
// // Create the consumer group first
// CreateCustomerGroup(topic, groupId)
const GroupSuffix = "_group"
func InitReader(topic string) []*kafka.Reader {
// // Create the consumer group first
// build readers that consume the topic as members of a single consumer group
r := kafka.NewReader(kafka.ReaderConfig{
var readers []*kafka.Reader
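// kafka-go balances a topic's partitions across Readers that share a GroupID, so
// these readers form one consumer group and consume partitions in parallel
// (assumption: the topic has enough partitions to keep 10 readers busy).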
for i := 0; i < 10; i++ {
readers = append(readers, kafka.NewReader(kafka.ReaderConfig{
Brokers: strings.Split(conf.APPConfig.Kafka.Address, ","),
Topic: topic,
GroupID: groupId,
Partition: 0,
GroupID: topic+GroupSuffix,
// Partition: 0,
MinBytes: 10e3, // 10KB
MaxBytes: 10e6, // 10MB
})
return r
}))
}
// readers = append(readers, kafka.NewReader(kafka.ReaderConfig{
// Brokers: strings.Split(conf.APPConfig.Kafka.Address, ","),
// Topic: topic,
// GroupID: groupId,
// Partition: 0,
// MinBytes: 10e3, // 10KB
// MaxBytes: 10e6, // 10MB
// }))
return readers
}
func CreateCustomerGroup(topic string, groupId string) {
func CreateCustomerGroup(topic string) error {
config := kafka.ConsumerGroupConfig{
ID: groupId,
ID: topic+GroupSuffix,
Brokers: strings.Split(conf.APPConfig.Kafka.Address, ","),
Topics: []string{topic},
StartOffset: kafka.LastOffset,
}
_, err := kafka.NewConsumerGroup(config)
if err != nil {
log.Println("create CustomerGroup error:", err)
return err
}
log.Printf("Customer group [%s] created successfully!", topic+GroupSuffix)
return nil
}
package source
import (
"log"
"github.com/y7ut/logtransfer/entity"
"github.com/y7ut/logtransfer/plugin"
)
func generateTopic(config TopicConfig) *Topic {
if config.PipelineConfig == nil {
log.Printf("get topic setting error:%s ", config.Label)
}
p := plugin.PipeLine{}
// log.Println("get config", currentTopic.PipelineConfig)
for _, v := range config.PipelineConfig {
currentPlugin := plugin.RegistedPlugins[v.Name]
err := currentPlugin.SetParams(v.Params)
if err != nil {
log.Panicln("plugin encode params error:", err)
}
p.AppendPlugin(currentPlugin)
}
var formatMethod entity.Formater
switch config.Format {
case 1:
formatMethod = entity.DefaultJsonLog
case 2:
formatMethod = entity.FormatServiceWfLog
default:
formatMethod = entity.DefaultLog
}
return &Topic{Name: config.Name, Label: config.Label, PipeLine: &p, Format: formatMethod}
}
......@@ -6,10 +6,8 @@ import (
"log"
"os"
"os/signal"
"sync"
"syscall"
elastic "github.com/olivere/elastic/v7"
"time"
"github.com/y7ut/logtransfer/conf"
"github.com/y7ut/logtransfer/entity"
......@@ -19,57 +17,40 @@ import (
var (
Start = make(chan *source.Customer)
Close = make(chan string)
closeWg sync.WaitGroup
)
// Core startup
func Run(confPath string) {
// Load configuration
conf.Init(confPath)
// Initialize the ES client
esClient, err := elastic.NewClient(elastic.SetSniff(false), elastic.SetURL(conf.APPConfig.Es.Address))
if err != nil {
fmt.Println("connect es error", err)
panic(err)
}
// Create a master context
ctx, cancel := context.WithCancel(context.Background())
// Start the ES message sender
for i := 0; i < 3; i++ {
go entity.MatedateSender(ctx, esClient)
}
go entity.MatedateSender(ctx)
// Handle the notifications that start and stop consumer handlers
go func() {
for {
select {
case customer := <-Start:
source.RegisterManger(customer)
go source.ReadingMessage(ctx, customer)
go CollectorRegister(ctx)
case closer := <-Close:
c, ok := source.GetCustomer(closer)
if !ok {
log.Printf(" Customer %s unstall Failed ", closer)
}
c.Exit()
closeWg.Done()
}
initTopic, err := source.ChooseTopic()
if err != nil {
panic(fmt.Sprintf("init topic fail: %s", err))
}
}()
// TODO: register customers dynamically; currently they are only loaded at initialization
for topic := range source.ChooseTopic() {
for topic := range initTopic {
currentCustomer := source.InitCustomer(topic)
Start <- currentCustomer
}
// TODO: also watch Topic config changes.
// Consumer handlers are currently registered by topic name,
// so restarting (killing) the customer of the corresponding topic is enough.
go TopicWatcherHandle()
// Watch for Agent Collector changes
go source.WatchConfigs()
// Also watch for Topic config changes
go source.WatchTopics()
// Also watch for Status changes
go source.WatchStatus()
for sign := range sign() {
switch sign {
......@@ -80,13 +61,13 @@ func Run(confPath string) {
for _, topic := range currentTopics {
closeWg.Add(1)
Close <- topic
log.Printf(" Customer %s unstalling...", topic)
closeWg.Wait()
}
cancel()
entity.CloseMessageChan()
time.Sleep(1 * time.Second)
log.Printf(" Success unstall %d Transfer", len(currentTopics))
os.Exit(0)
}
......@@ -95,6 +76,83 @@ func Run(confPath string) {
}
func CollectorRegister(ctx context.Context) {
for {
select {
case customer := <-Start:
source.RegisterManger(customer)
go source.ReadingMessage(ctx, customer)
case closer := <-Close:
c, ok := source.GetCustomer(closer)
if !ok {
log.Printf(" Customer %s unstall Failed ", closer)
break
}
source.UnstallManger(closer)
c.Exit()
// closeWg.Done()
}
}
}
// Watch for topic changes; only updates are handled. Deleting a topic does not trigger a config reload
func TopicWatcherHandle() {
// restart
go func() {
for topic := range source.TopicChangeListener() {
var checkUsed bool
collectors, err := source.LoadCollectors()
if err != nil {
log.Println("Load Collector error:", err)
continue
}
// Check whether the topic is still in use
for _, item := range collectors {
if item.Topic == topic.Name {
checkUsed = true
}
}
if !checkUsed {
log.Println("Put topic but not used")
err := source.CreateCustomerGroup(topic.Name)
if err != nil {
log.Printf(" Create Topic Kafka customer group Failed : %s", err)
continue
}
continue
}
Close <- topic.Name
log.Printf(" Customer %s restart...", topic.Name)
currentCustomer := source.InitCustomer(topic)
Start <- currentCustomer
}
}()
// close
go func() {
for deleteTopic := range source.TopicDeleteListener() {
// closeWg.Add(1)
Close <- deleteTopic
log.Printf(" Customer %s deleting...", deleteTopic)
// closeWg.Wait()
}
}()
// start
go func() {
for topic := range source.TopicStartListener() {
currentCustomer := source.InitCustomer(topic)
Start <- currentCustomer
}
}()
}
func sign() <-chan os.Signal {
c := make(chan os.Signal, 2)
// Listen for OS signals
......