Commit 2703e667 authored by 谢宇轩's avatar 谢宇轩 😅

添加状态监控!

parent a9e90bb1
......@@ -104,22 +104,7 @@ func GetConfFromEtcd(name string) (EtcdValue, error) {
var value EtcdValue
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, statusPath+name)
cancel()
if err != nil {
log.Println(fmt.Sprintf("Get Etcd config failed, err:%s \n", err))
}
if len(resp.Kvs) == 0 {
return value, fmt.Errorf("status error")
}
status := string(resp.Kvs[0].Value)
if status != "1" {
return value, fmt.Errorf("status error")
}
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
resp, err = cli.Get(ctx, configPath+name)
resp, err := cli.Get(ctx, configPath+name)
cancel()
if err != nil {
......@@ -185,6 +170,14 @@ func WatchLogConfigToEtcd() clientv3.WatchChan {
return wch
}
func WatchLogStatusToEtcd() clientv3.WatchChan {
wch := cli.Watch(context.Background(), statusPath, clientv3.WithPrefix())
return wch
}
func CheckAgentActive(name string) bool {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
......
......@@ -69,19 +69,20 @@ func LoadCollectors() ([]Collector, error) {
return collectors, nil
}
// 加载所有的collector
func LoadCollector(name string) Collector {
// 加载Agent所有的collector
func LoadCollector(name string) ([]Collector, error) {
var collectors []Collector
config, err := conf.GetConfFromEtcd(name)
if err != nil {
log.Printf("get etcd config err : err: %s", err)
return collectors, fmt.Errorf("get etcd config err : err: %s", err)
}
var collector Collector
err = json.Unmarshal(config, &collector)
err = json.Unmarshal(config, &collectors)
if err != nil {
log.Printf("json decode config(%s) err : err: %s", collector, err)
return collectors, fmt.Errorf("json decode config(%s) err : err: %s", collectors, err)
}
return collector
return collectors, nil
}
// 收集所有需要监听的topic
......@@ -136,7 +137,7 @@ func TopicChangeListener() <-chan *Topic {
return watchTopicChannel
}
func TopicDeleteListener() <-chan string{
func TopicDeleteListener() <-chan string {
return deleteTopicChannel
}
......@@ -144,7 +145,6 @@ func TopicStartListener() <-chan *Topic {
return startTopicChannel
}
func WatchTopics() {
for confResp := range conf.WatchLogTopicToEtcd() {
......@@ -177,21 +177,107 @@ func WatchTopics() {
}
log.Println("some Topic remove", oldTopic.Name)
// TODO: 以防万一查一下
}
time.Sleep(2*time.Second)
time.Sleep(2 * time.Second)
}
}
}
func WatchStatus() {
for confResp := range conf.WatchLogStatusToEtcd() {
// 如果是关闭的Agent 这些可以忽略的!!!
agentKey := string(confResp.Events[0].Kv.Key)
currentChangedkey := agentKey[strings.LastIndex(agentKey, "/")+1:]
// 只有开启的才去更改配置
for _, event := range confResp.Events {
switch event.Type {
case mvccpb.PUT:
if len(confResp.Events) == 0 {
continue
}
// 如果是1 name就是从0开启的(开启)
// 如果是0有可能是从1关闭的(关闭),也有可能是突然新增的(无视)
collectors, status, err := getStatusChangeWithEvent(confResp)
if err != nil {
log.Println("Get Status Active Event Info Error:", err)
continue
}
log.Println("STATUS CHANGE", status)
switch status {
case "CREATED":
// 情况三 可以不进行操作
// 不会出现,因为初始化的时候一定是关闭的。
log.Println("Add Agent Empty Status :", currentChangedkey)
case "CLOSE":
for _, c := range collectors {
log.Println("有collector离开")
// 获取config prefix 全部collector,判断是否是最后一个用这个topic的collector
currentAbleCollector, err := LoadCollectors()
if err != nil {
log.Println("Get Current Able Collector Error:", err)
continue
}
var set = make(map[string]bool)
for _, v := range currentAbleCollector {
set[v.Topic] = true
}
if !set[c.Topic] {
// 可以删了
deleteTopicChannel <- c.Topic
log.Println(currentChangedkey, "Agent Close Delete Collector with Topic Customer left", c.Topic)
continue
}
log.Println(currentChangedkey, "Agent Close with Close Collector", c)
}
case "OPEN":
for _, c := range collectors {
_, ok := GetCustomer(c.Topic)
// 如果首次出现 那就初始化这个Topic了
// 根据topicname去获取TopicConfig
if !ok {
changedConf, err := conf.GetTopicFromEtcd(c.Topic)
if err != nil {
log.Println("Load Agent Open New Puted Topic Config Error:", err)
continue
}
// 有PUT操作才进行通知
var newPutTopicCondfig TopicConfig
err = json.Unmarshal(changedConf, &newPutTopicCondfig)
if err != nil {
log.Println("Unmarshal Agent Open New Puted Topic Config Error:", err)
continue
}
log.Println("load Agent New Puted Topic success!")
startTopicChannel <- generateTopic(newPutTopicCondfig)
log.Println(currentChangedkey, "Agent open with open Collector And Init Topic Customer", c)
}
log.Println(currentChangedkey, "Agent open with open Collector, But has opend", c)
}
default:
log.Println("Get Status Active Event Info Unkonw Error")
}
}
}
}
}
func WatchConfigs() {
for confResp := range conf.WatchLogConfigToEtcd() {
// 如果是关闭的Agent 这些可以忽略的!!!
agentKey := string(confResp.Events[0].Kv.Key)
currentChangedkey := agentKey[strings.LastIndex(agentKey, "/")+1:]
if !conf.CheckAgentActive(currentChangedkey){
if !conf.CheckAgentActive(currentChangedkey) {
continue
}
// 只有开启的才去更改配置
......@@ -216,7 +302,7 @@ func WatchConfigs() {
}
switch status {
case "CREATED":
// 情况三 可以不进行操作
// 情况三 可以不进行操作
// 不会出现,因为初始化的时候一定是关闭的。
log.Println("Add Agent", currentChangedkey)
case "PUT":
......@@ -242,7 +328,7 @@ func WatchConfigs() {
log.Println("load Agent New Puted Topic success!")
startTopicChannel <- generateTopic(newPutTopicCondfig)
log.Println(currentChangedkey, "Agent Add Collector And Init Topic", diff)
log.Println(currentChangedkey, "Agent Add Collector And Init Topic Customer", diff)
}
log.Println(currentChangedkey, "Agent Add Collector", diff)
case "DEL":
......@@ -261,7 +347,9 @@ func WatchConfigs() {
if !set[diff.Topic] {
// 可以删了
deleteTopicChannel <-diff.Topic
deleteTopicChannel <- diff.Topic
log.Println(currentChangedkey, "Agent Delete Collector with Topic Customer left", diff)
continue
}
log.Println(currentChangedkey, "Agent Delete Collector", diff)
......@@ -402,3 +490,52 @@ func getCollectorChangeWithEvent(confResp clientv3.WatchResponse) (different Col
return different, changeType, err
}
func getStatusChangeWithEvent(confResp clientv3.WatchResponse) (collectors []Collector, changeType string, err error) {
changeStatus := fmt.Sprintf("%s", confResp.Events[0].Kv.Value)
// 先对比一下
oldKey := confResp.Events[0].Kv.Key
rev := confResp.Events[0].Kv.ModRevision - 1
oldValue, err := conf.GetDelRevValueFromEtcd(string(oldKey), rev)
if err != nil {
if len(oldValue) == 0 {
// 新来的可以无视
changeType = "CREATE"
err = nil
return collectors, changeType, err
}
return collectors, changeType, err
}
// 获取这个agent的内容
agentKey := string(confResp.Events[0].Kv.Key)
currentChangedkey := agentKey[strings.LastIndex(agentKey, "/")+1:]
AllCollectors, err := LoadCollector(currentChangedkey)
if err != nil {
return collectors, changeType, err
}
changeType = "CLOSE"
if changeStatus == "1" {
changeType = "OPEN"
}
var topicSet = make(map[string]bool)
for _, c := range AllCollectors {
if topicSet[c.Topic] {
continue
}
topicSet[c.Topic] = true
collectors = append(collectors, c)
}
return collectors, changeType, err
}
......@@ -44,13 +44,13 @@ func Run(confPath string) {
go TopicWatcherHandle()
// 监听Agent Collector变更
// 监听 Agent Collector 变更
go source.WatchConfigs()
// 还要监听Topic的配置变更
// 还要监听 Topic 的配置变更
go source.WatchTopics()
// TODO: 监听Agent 启动状态变更
// 还要监听 Status 的配置变更
go source.WatchStatus()
for sign := range sign() {
switch sign {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment