Commit 42d45662 authored by 谢宇轩's avatar 谢宇轩 😅

添加多个消费者协程

parent 5f738b70
......@@ -18,7 +18,8 @@ var (
// 消费处理器
type Customer struct {
Reader *kafka.Reader // 特定消费者的专属Kafka Reader (我从哪里来)
Readers []*kafka.Reader // 特定消费者的专属Kafka Reader Slice (我从哪里来)
Topic *Topic
HandlePipeline *plugin.PipeLine // 从Topic中构建的Piepline (要到那里去)
Format entity.Formater // 解析元数据的格式器 (变形记。。)
done chan struct{} // 结束标志
......@@ -26,9 +27,7 @@ type Customer struct {
// 结束一个消费处理器
func (c Customer) Exit() {
go func() {
c.done <- struct{}{}
}()
}
// 结束信号监听
......@@ -37,17 +36,17 @@ func (c Customer) Listen() chan struct{} {
}
// 初始化一个消费处理器
func InitCustomer(topic *Topic) *Customer{
func InitCustomer(topic *Topic) *Customer {
GroupID := topic.Name + "_group"
r := InitReader(topic.Name, GroupID)
log.Printf("Check Customer group of [%s] success!", GroupID)
return &Customer{Reader: r, done: make(chan struct{}), HandlePipeline: topic.PipeLine, Format: topic.Format}
return &Customer{Topic: topic, Readers: r, done: make(chan struct{}), HandlePipeline: topic.PipeLine, Format: topic.Format}
}
// 全局的注册当前工作的消费处理器
func RegisterManger(c *Customer) {
mu.Lock()
CustomerManger[c.Reader.Config().Topic] = c
CustomerManger[c.Topic.Name] = c
mu.Unlock()
}
......@@ -59,6 +58,7 @@ func GetCustomer(topic string) (customer *Customer, ok bool) {
return customer, ok
}
// 获取全部的注册过的消费处理器 所使用的的Topic名字
func GetRegisterTopics() (topics []string) {
mu.Lock()
......@@ -69,33 +69,30 @@ func GetRegisterTopics() (topics []string) {
return topics
}
// 從Kafka中消費消息,注意这里会提交commit offset
func ReadingMessage(ctx context.Context, c *Customer) {
defer c.Reader.Close()
log.Printf("Start Customer Group[%s] success!", c.Reader.Config().GroupID)
readyToRead := make(chan *kafka.Reader)
// var trycount int
// var cstSh, _ = time.LoadLocation("Asia/Shanghai") //上海时区
go func(ctx context.Context, c *Customer) {
for {
select {
case <-c.Listen():
return
case <-ctx.Done():
c.Exit()
case reader := <-readyToRead:
defer reader.Close()
go func(ctx context.Context, c *Customer) {
var errMessage strings.Builder
var matedata entity.Matedata
for {
select {
case <-c.Listen():
// 监听需要关闭的信号
case <-ctx.Done():
c.Exit()
log.Println("Close customer of Topic :", c.Reader.Config().Topic)
return
default:
// // 使用超时上下文, 但是这样是非阻塞,超过deadline就报错了
// // timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
// // 这里使用阻塞的上下文
m, err := c.Reader.ReadMessage(ctx)
m, err := reader.ReadMessage(ctx)
if err != nil {
// 退出
......@@ -108,7 +105,7 @@ func ReadingMessage(ctx context.Context, c *Customer) {
continue
}
matedata, err = c.Format(string(c.Reader.Config().Topic), string(m.Value))
matedata, err = c.Format(string(reader.Config().Topic), string(m.Value))
if err != nil {
errMessage.Reset()
errMessage.WriteString("Format Error")
......@@ -120,4 +117,18 @@ func ReadingMessage(ctx context.Context, c *Customer) {
c.HandlePipeline.Enter(matedata)
}
}
}(ctx, c)
}
}
}(ctx, c)
for _, p := range c.Readers {
log.Printf("Start Customer Group[%s][%d] success!", p.Config().GroupID, p.Config().Partition)
readyToRead <- p
}
}
......@@ -8,21 +8,29 @@ import (
"github.com/y7ut/logtransfer/conf"
)
func InitReader(topic string, groupId string) *kafka.Reader {
func InitReader(topic string, groupId string) []*kafka.Reader {
// // 先去创建一下这个分组
// CreateCustomerGroup(topic, groupId)
// make a writer that produces to topic-A, using the least-bytes distribution
r := kafka.NewReader(kafka.ReaderConfig{
var readers []*kafka.Reader
for i := 0; i < 10; i++ {
readers = append(readers, kafka.NewReader(kafka.ReaderConfig{
Brokers: strings.Split(conf.APPConfig.Kafka.Address, ","),
Topic: topic,
GroupID: groupId,
Partition: 0,
// Partition: 0,
MinBytes: 10e3, // 10KB
MaxBytes: 10e6, // 10MB
})
return r
}))
}
// readers = append(readers, kafka.NewReader(kafka.ReaderConfig{
// Brokers: strings.Split(conf.APPConfig.Kafka.Address, ","),
// Topic: topic,
// GroupID: groupId,
// Partition: 0,
// MinBytes: 10e3, // 10KB
// MaxBytes: 10e6, // 10MB
// }))
return readers
}
func CreateCustomerGroup(topic string, groupId string) {
......@@ -37,4 +45,3 @@ func CreateCustomerGroup(topic string, groupId string) {
log.Println("create CustomerGroup error:", err)
}
}
......@@ -88,7 +88,7 @@ func Run(confPath string) {
closeWg.Wait()
}
entity.CloseMessageChan()
cancel()
log.Printf(" Success unstall %d Transfer", len(currentTopics))
os.Exit(0)
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment