Commit 38764ddb authored by 谢宇轩's avatar 谢宇轩 😅

去除多余的变量

parent 680ac7fa
...@@ -59,6 +59,16 @@ func GetCustomer(topic string) (customer *Customer, ok bool) { ...@@ -59,6 +59,16 @@ func GetCustomer(topic string) (customer *Customer, ok bool) {
return customer, ok return customer, ok
} }
// 获取全部的注册过的消费处理器 所使用的的Topic名字
func GetRegisterTopics() (topics []string) {
mu.Lock()
for topic := range CustomerManger {
topics = append(topics, topic)
}
mu.Unlock()
return topics
}
// 從Kafka中消費消息,注意这里会提交commit offset // 從Kafka中消費消息,注意这里会提交commit offset
func ReadingMessage(ctx context.Context, c *Customer) { func ReadingMessage(ctx context.Context, c *Customer) {
......
...@@ -19,21 +19,9 @@ import ( ...@@ -19,21 +19,9 @@ import (
var ( var (
Start = make(chan *source.Customer) Start = make(chan *source.Customer)
Close = make(chan string) Close = make(chan string)
CustomerManger = make(map[string]*source.Customer)
MaxRetryTime = 10
mu sync.Mutex
closeWg sync.WaitGroup closeWg sync.WaitGroup
) )
func getRegisterTopics() (topics []string) {
mu.Lock()
for topic := range CustomerManger {
topics = append(topics, topic)
}
mu.Unlock()
return topics
}
// 核心启动 // 核心启动
func Run(confPath string) { func Run(confPath string) {
// 加载配置 // 加载配置
...@@ -84,7 +72,7 @@ func Run(confPath string) { ...@@ -84,7 +72,7 @@ func Run(confPath string) {
case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM: case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM:
log.Println("Safe Exit with:", sign) log.Println("Safe Exit with:", sign)
currentTopics := getRegisterTopics() currentTopics := source.GetRegisterTopics()
for _, topic := range currentTopics { for _, topic := range currentTopics {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment