Commit fc26f103 authored by 谢宇轩's avatar 谢宇轩 😅

调整代码结构

parent 38764ddb
......@@ -9,3 +9,11 @@
2. 管道的形式处理数据
3. 提供数据同步到ES的插件
### 2.0.0
1. 支持动态的安装卸载插件
# 安装
```
docker build -t logtransfer:2.0 .
docker run -d --name=LTF logtransfer:2.0
```
......@@ -18,9 +18,9 @@ var (
// 消费处理器
type Customer struct {
Reader *kafka.Reader // 特定消费者的专属Kafka Reader
HandlePipeline *plugin.PipeLine // 从Topic中构建的Piepline
Format entity.Formater // 解析元数据的格式器
Reader *kafka.Reader // 特定消费者的专属Kafka Reader (我从哪里来)
HandlePipeline *plugin.PipeLine // 从Topic中构建的Piepline (要到那里去)
Format entity.Formater // 解析元数据的格式器 (变形记。。)
done chan struct{} // 结束标志
}
......@@ -86,6 +86,8 @@ func ReadingMessage(ctx context.Context, c *Customer) {
select {
case <-c.Listen():
// 监听需要关闭的信号
c.Exit()
log.Println("Close customer of Topic :", c.Reader.Config().Topic)
return
default:
......
......@@ -67,6 +67,10 @@ func Run(confPath string) {
Start <- currentCustomer
}
// TODO: 还要监听Topic的配置变更
// 目前是通过topic的name来注册所有的消费处理器
// 所以直接给对应的topic中的customer重启就可以杀了就可以了
for sign := range sign() {
switch sign {
case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment