Commit e19aa95a authored by 谢宇轩's avatar 谢宇轩 😅

支持消息组合发送,优化代码

parent 64293201
...@@ -6,3 +6,141 @@ ...@@ -6,3 +6,141 @@
# Done # Done
1. 从kafka消费数据 1. 从kafka消费数据
2. 添加消费者的上下文 2. 添加消费者的上下文
---------
project : jiwei-service
errno : 71
logId : 2263624661
request_params :
refer :
cookie :
host : 127.0.0.1:8185
clientIp : 127.0.0.1
topic : jiwei-service-topic
ua : node-fetch/1.0 (+https://github.com/bitinn/node-fetch)
errmsg : PARAM 'source' REQUIRED
source : logtranfers
level : WARNING
created_at : 2021-05-13 00:11:59 +0800 CST
uri : /api/news/feedstream
optime : 1620835919
------------
level : WARNING
clientIp : 127.0.0.1
project : jiwei-service
cookie :
optime : 1620835919
errmsg : PARAM 'source' REQUIRED
source : logtranfers
created_at : 2021-05-13 00:11:59 +0800 CST
errno : 71
refer :
ua : node-fetch/1.0 (+https://github.com/bitinn/node-fetch)
request_params :
topic : jiwei-service-topic
logId : 2263653937
uri : /api/news/newsdetail
host : 127.0.0.1:8185
------------
topic : jiwei-service-topic
level : WARNING
created_at : 2021-05-13 00:12:00 +0800 CST
refer :
clientIp : 127.0.0.1
optime : 1620835919
source : logtranfers
request_params :
project : jiwei-service
logId : 2263669088
errmsg : PARAM 'source' REQUIRED
errno : 71
uri : /api/advert/advertitem
cookie :
ua : node-fetch/1.0 (+https://github.com/bitinn/node-fetch)
host : 127.0.0.1:8185
------------
topic : jiwei-service-topic
level : WARNING
project : jiwei-service
optime : 1620835919
created_at : 2021-05-13 00:12:00 +0800 CST
cookie :
host : 127.0.0.1:8185
errmsg : PARAM 'source' REQUIRED
source : logtranfers
logId : 2263669418
uri : /api/news/newsrelation
refer :
ua : node-fetch/1.0 (+https://github.com/bitinn/node-fetch)
errno : 71
clientIp : 127.0.0.1
request_params :
------------
clientIp : 127.0.0.1
request_params :
level : WARNING
uri : /api/news/feedstream
refer :
host : 127.0.0.1:8185
created_at : 2021-05-13 00:12:00 +0800 CST
errno : 71
optime : 1620835919
source : logtranfers
topic : jiwei-service-topic
project : jiwei-service
logId : 2263669593
cookie :
ua : node-fetch/1.0 (+https://github.com/bitinn/node-fetch)
errmsg : PARAM 'source' REQUIRED
------------
errno : 71
logId : 2263749932
cookie :
ua : node-fetch/1.0 (+https://github.com/bitinn/node-fetch)
clientIp : 127.0.0.1
request_params :
source : logtranfers
topic : jiwei-service-topic
created_at : 2021-05-13 00:12:00 +0800 CST
host : 127.0.0.1:8185
level : WARNING
uri : /api/news/newsdetail
optime : 1620835920
errmsg : PARAM 'source' REQUIRED
project : jiwei-service
refer :
------------
topic : jiwei-service-topic
level : WARNING
optime : 1620835920
project : jiwei-service
uri : /api/news/newsrelation
request_params :
source : logtranfers
refer :
cookie :
host : 127.0.0.1:8185
clientIp : 127.0.0.1
errmsg : PARAM 'source' REQUIRED
created_at : 2021-05-13 00:12:00 +0800 CST
errno : 71
logId : 2263762670
ua : node-fetch/1.0 (+https://github.com/bitinn/node-fetch)
------------
request_params :
topic : jiwei-service-topic
cookie :
ua : node-fetch/1.0 (+https://github.com/bitinn/node-fetch)
host : 127.0.0.1:8185
project : jiwei-service
created_at : 2021-05-13 00:12:00 +0800 CST
errno : 71
clientIp : 127.0.0.1
source : logtranfers
level : WARNING
uri : /api/news/feedstream
refer :
optime : 1620835920
logId : 2263762960
errmsg : PARAM 'source' REQUIRED
------------
host : 127.0.0.1:8185
\ No newline at end of file
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"log" "log"
"strings"
"time" "time"
"github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3"
...@@ -12,15 +13,16 @@ import ( ...@@ -12,15 +13,16 @@ import (
var ( var (
configPath = "/logagent/config/" configPath = "/logagent/config/"
statusPath = "/logagent/active/"
) )
type LogagentConfig []byte type LogagentConfig []byte
var cli *clientv3.Client var cli *clientv3.Client
func init() { func Init(confPath string) {
// 加载配置文件 // 加载配置文件
if err := ini.MapTo(APPConfig, "./logtransfer.conf"); err != nil { if err := ini.MapTo(APPConfig, confPath); err != nil {
log.Println("load ini file error: ", err) log.Println("load ini file error: ", err)
return return
} }
...@@ -53,8 +55,28 @@ func GetAllConfFromEtcd() []LogagentConfig { ...@@ -53,8 +55,28 @@ func GetAllConfFromEtcd() []LogagentConfig {
configs := make([]LogagentConfig, 0) configs := make([]LogagentConfig, 0)
for _, etcdResult := range resp.Kvs { for _, etcdResult := range resp.Kvs {
log.Printf("load config from:%s ", etcdResult.Key)
configs = append(configs, etcdResult.Value)
etcdKey := statusPath + string(etcdResult.Key[strings.LastIndex(string(etcdResult.Key), "/")+1:])
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, etcdKey)
cancel()
if err != nil {
panic(fmt.Sprintf("Get Etcd config failed, err:%s \n", err))
}
if len(resp.Kvs) != 0 {
status := string(resp.Kvs[0].Value)
if status == "1" {
log.Printf("load config from:%s ", etcdResult.Key)
configs = append(configs, etcdResult.Value)
}
}
} }
return configs return configs
......
...@@ -9,7 +9,6 @@ type LogTransferConf struct { ...@@ -9,7 +9,6 @@ type LogTransferConf struct {
// kafka 配置 // kafka 配置
type Kafka struct { type Kafka struct {
Address string `ini:"address"` Address string `ini:"address"`
QueueSize string `ini:"queue_size"`
} }
// ETCD 配置 // ETCD 配置
...@@ -20,6 +19,7 @@ type Etcd struct { ...@@ -20,6 +19,7 @@ type Etcd struct {
// Es 属性 // Es 属性
type Es struct { type Es struct {
Address string `ini:"address"` Address string `ini:"address"`
BulkSize int `ini:"bulk_size"`
} }
var ( var (
......
package ding
import (
	"bytes"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"time"
)
// GetDingClient posts the given JSON payload to the DingTalk robot
// webhook and logs the HTTP response.
//
// NOTE(review): the access token is hard-coded in the URL; it should be
// moved into configuration so it can be rotated without a rebuild.
func GetDingClient(messageJson string) {
	body := bytes.NewBuffer([]byte(messageJson))
	request, err := http.NewRequest("POST", "https://oapi.dingtalk.com/robot/send?access_token=3bd7a0b3e0558b4a2c3650d42cdca704fd95b4108852c46a86b6339b083b92e1", body)
	// Check the error BEFORE touching request: on failure request is nil
	// and the original's Header.Set would dereference it.
	if err != nil {
		log.Println("build dingtalk request error:", err)
		return
	}
	request.Header.Set("Content-Type", "application/json")

	// Bounded timeout so a dead webhook cannot hang the caller forever.
	client := &http.Client{Timeout: 10 * time.Second}
	resp, err := client.Do(request)
	if err != nil {
		// A failed alert must not crash the whole service (was panic).
		log.Println("send dingtalk message error:", err)
		return
	}
	defer resp.Body.Close()

	fmt.Println("response Status:", resp.Status)
	fmt.Println("response Headers:", resp.Header)
	resultBody, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Println("read dingtalk response error:", err)
		return
	}
	fmt.Println("response Body:", string(resultBody))
}
package ding
import (
"encoding/json"
"fmt"
"log"
)
const (
	// MARKTYPE and TEXTTYPE are the DingTalk message type identifiers.
	MARKTYPE = "markdown"
	TEXTTYPE = "text"
	// CONTENT_TEMPLE is the markdown alert body: app name, error type, time.
	CONTENT_TEMPLE = "# %s服务状态异常\r\n > 异常类型:%s \r\n > 触发时间:%s \r\n ###### Warning"
)

// DDAt describes who gets @-mentioned by the robot.
type DDAt struct {
	IsAtAll bool `json:"isAtAll"`
	AtMobiles []string `json:"atMobiles"`
}

// TextContent is the body of a plain-text message.
type TextContent struct {
	Content string `json:"content,omitempty"`
}

// MarkdownContent is the body of a markdown message.
type MarkdownContent struct {
	Title string `json:"title,omitempty"`
	Text string `json:"text,omitempty"`
}

// DDMessage is the full DingTalk robot payload.
type DDMessage struct {
	MsgType string `json:"msgtype"`
	Markdown MarkdownContent `json:"markdown,omitempty"`
	Text TextContent `json:"text,omitempty"`
	At DDAt `json:"at"`
}

/**
 * 获取可以使用的消息对象Json
 *
 * @return string
 */
// process serializes the message to JSON and appends a trailing newline.
// Marshal failure is a programmer error, hence log.Panic.
func (m *DDMessage) process() string {
	payload, err := json.Marshal(m)
	if err != nil {
		log.Panic(err)
	}
	return string(payload) + "\n"
}

/**
 * 初始化消息
 *
 * @return string
 */
// initMessage builds a markdown alert (no @-mentions) and returns its
// JSON form, logging it as a side effect.
func initMessage(status string, appName string, time string) string {
	message := DDMessage{
		MsgType: MARKTYPE,
		Markdown: MarkdownContent{
			Title: "日志报警",
			Text: fmt.Sprintf(CONTENT_TEMPLE, appName, status, time),
		},
		At: DDAt{IsAtAll: false, AtMobiles: []string{}},
	}
	messageJson := message.process()
	log.Println(messageJson)
	return messageJson
}
...@@ -12,13 +12,19 @@ require ( ...@@ -12,13 +12,19 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect github.com/golang/protobuf v1.5.2 // indirect
github.com/google/uuid v1.2.0 // indirect github.com/google/uuid v1.2.0 // indirect
github.com/olivere/elastic/v7 v7.0.24
github.com/segmentio/kafka-go v0.4.13 github.com/segmentio/kafka-go v0.4.13
go.uber.org/zap v1.16.0 // indirect go.uber.org/zap v1.16.0 // indirect
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 // indirect golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 // indirect
golang.org/x/mod v0.3.0
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 // indirect golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 // indirect
golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57 // indirect golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57 // indirect
golang.org/x/text v0.3.6 // indirect golang.org/x/text v0.3.6 // indirect
golang.org/x/tools v0.1.0 // indirect golang.org/x/tools v0.1.0 // indirect
google.golang.org/grpc v1.26.0 // indirect google.golang.org/grpc v1.33.2
gopkg.in/ini.v1 v1.62.0 gopkg.in/ini.v1 v1.62.0
) )
replace github.com/coreos/go-systemd => github.com/coreos/go-systemd/v22 v22.0.0
replace google.golang.org/grpc => google.golang.org/grpc v1.26.0
This diff is collapsed.
No preview for this file type
...@@ -2,13 +2,13 @@ ...@@ -2,13 +2,13 @@
# Kafka配置 # Kafka配置
[kafka] [kafka]
address=localhost:9091 address=47.107.239.240:9091,47.107.239.240:9092
queue_size=1000
# Kafka配置 # Kafka配置
[etcd] [etcd]
address=localhost:23790 address=47.107.239.240:23379
# ES 配置 # ES 配置
[es] [es]
address="http://localhost:9200/" address="http://47.107.239.240:9320/"
\ No newline at end of file bulk_size=50
\ No newline at end of file
package main package main
import ( import (
"flag"
"fmt"
"github.com/y7ut/logtransfer/transfer" "github.com/y7ut/logtransfer/transfer"
) )
const version = "1.0.0"
var c = flag.String("c", "./logtransfer.conf", "使用配置文件启动")
var v = flag.Bool("v", false, "查看当前程序版本")
func main() { func main() {
transfer.Run() // 加载配置文件
flag.Parse()
if *v {
fmt.Println("Jiwei Logtransfer \nversion: ", version)
return
}
transfer.Run(*c)
} }
package kafka package queue
import ( import (
"context"
"log" "log"
"strings"
"github.com/segmentio/kafka-go" "github.com/segmentio/kafka-go"
"github.com/y7ut/logtransfer/conf" "github.com/y7ut/logtransfer/conf"
) )
type Customer struct { func InitReader(topic string, groupId string) *kafka.Reader {
Reader *kafka.Reader
Ctx context.Context
Cancel context.CancelFunc
}
func InitReader(topic string, groupId string) *Customer {
// 先去创建一下这个分组 // 先去创建一下这个分组
CreateCustomerGroup(topic, groupId) CreateCustomerGroup(topic, groupId)
// make a writer that produces to topic-A, using the least-bytes distribution // make a writer that produces to topic-A, using the least-bytes distribution
r := kafka.NewReader(kafka.ReaderConfig{ r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{conf.APPConfig.Kafka.Address}, Brokers: strings.Split(conf.APPConfig.Kafka.Address, ","),
Topic: topic, Topic: topic,
GroupID: groupId, GroupID: groupId,
Partition: 0, Partition: 0,
MinBytes: 10e3, // 10KB MinBytes: 10e3, // 10KB
MaxBytes: 10e6, // 10MB MaxBytes: 10e6, // 10MB
}) })
context, cancel := context.WithCancel(context.Background())
return &Customer{Reader: r, Ctx:context, Cancel: cancel} return r
} }
func CreateCustomerGroup(topic string, groupId string) { func CreateCustomerGroup(topic string, groupId string) {
config := kafka.ConsumerGroupConfig{ config := kafka.ConsumerGroupConfig{
ID: groupId, ID: groupId,
Brokers: []string{conf.APPConfig.Kafka.Address}, Brokers: strings.Split(conf.APPConfig.Kafka.Address, ","),
Topics: []string{topic}, Topics: []string{topic},
StartOffset: kafka.LastOffset, StartOffset: kafka.LastOffset,
} }
......
package storage package storage
\ No newline at end of file
import (
"fmt"
"github.com/olivere/elastic/v7"
"github.com/y7ut/logtransfer/conf"
)
func GetClient() *elastic.Client {
var err error
client, err := elastic.NewClient(elastic.SetSniff(false), elastic.SetURL(conf.APPConfig.Es.Address))
if err != nil {
fmt.Println("connect es error", err)
return nil
}
return client
}
...@@ -18,13 +18,16 @@ func loadCollectors() []Collector { ...@@ -18,13 +18,16 @@ func loadCollectors() []Collector {
configs := conf.GetAllConfFromEtcd() configs := conf.GetAllConfFromEtcd()
collectors := make([]Collector, 0) collectors := make([]Collector, 0)
for _, v := range configs { for _, v := range configs {
log.Printf("json decode config:%s ", v)
var currentCollector []Collector var currentCollector []Collector
err := json.Unmarshal(v, &currentCollector) err := json.Unmarshal(v, &currentCollector)
if err != nil { if err != nil {
log.Printf("json decode config:%s err: %s", v, err) log.Printf("json decode config(%s) err : err: %s", v, err)
}
if currentCollector !=nil {
log.Printf("Init config:%s ", v)
collectors = append(collectors, currentCollector...)
} }
collectors = append(collectors, currentCollector...)
} }
return collectors return collectors
} }
......
package transfer
import "github.com/segmentio/kafka-go"
// Customer binds one Kafka topic reader to a message formatter and a
// handler pipeline, plus a channel used to signal shutdown.
type Customer struct {
// Reader is the kafka-go consumer-group reader for one topic.
Reader *kafka.Reader
// HandlePipeline is the chain of handlers each message flows through.
HandlePipeline *PipeLine
// Format parses a raw log line into a Matedate map.
Format Formater
// done carries the stop signal; see Exit and Listen.
done chan struct{}
}
// Exit signals this customer to stop. The send happens in a fresh
// goroutine so Exit itself never blocks, even when nothing is currently
// receiving on done.
func (c Customer) Exit() {
go func() {
c.done <- struct{}{}
}()
}
// Listen exposes the stop channel so the consume loop can select on it
// and shut down when Exit fires.
func (c Customer) Listen() chan struct{} {
return c.done
}
// registerManger stores the customer in the global manager map, keyed by
// its topic name. The package mutex makes it safe for concurrent use.
func registerManger(c *Customer) {
	mu.Lock()
	defer mu.Unlock()
	CustomerManger[c.Reader.Config().Topic] = c
}
// getCustomer looks up the customer registered for topic, reporting via
// ok whether one exists. The package mutex makes it safe for concurrent use.
func getCustomer(topic string) (customer *Customer, ok bool) {
	mu.Lock()
	defer mu.Unlock()
	customer, ok = CustomerManger[topic]
	return customer, ok
}
// uninstallCustomer removes the topic's customer from the global manager
// map (a no-op if it was never registered).
func uninstallCustomer(topic string) {
	mu.Lock()
	defer mu.Unlock()
	delete(CustomerManger, topic)
}
package transfer
import (
"fmt"
"regexp"
"strings"
"time"
)
var (
// contentRegexp captures everything between a '[' and the next ']';
// (?s: ...) lets the capture span newlines.
contentRegexp = regexp.MustCompile(`\[(?s:(.*?))\]`)
// serviceWfLogKeyWord lists the word[value] fields that
// FormatServiceWfLog extracts from a service wf log line.
serviceWfLogKeyWord = []string{"errno", "logId", "uri", "refer", "cookie", "ua", "host", "clientIp", "optime", "request_params", "errmsg"}
)
// Formater parses one raw log line (source key, message) into a Matedate map.
type Formater func(string, string) Matedate
// service错误日志的处理
// FormatServiceWfLog parses one service wf (warning/fatal) log line into
// a Matedate. Expected shape: a level before the first ':', a timestamp
// before the first '[', then word[value] pairs for each entry in
// serviceWfLogKeyWord.
//
// Unlike the original, malformed lines no longer panic: a missing ':'
// returns the raw message, a missing '[' skips the timestamp, and absent
// keywords are stored as empty strings (FindStringSubmatch can return
// nil, which the original indexed unconditionally).
func FormatServiceWfLog(sourceKey string, message string) Matedate {
	result := make(Matedate, 0)
	result["topic"] = sourceKey
	result["project"] = "jiwei-service"

	// The level ("WARNING", ...) sits before the first colon.
	levelIndex := strings.Index(message, ":")
	if levelIndex < 0 {
		// Not in the expected shape; keep the raw line for debugging.
		result["level"] = ""
		result["message"] = message
		return result
	}
	result["level"] = message[:levelIndex]
	message = message[levelIndex:]

	// The timestamp sits between the colon and the first bracket.
	loc, _ := time.LoadLocation("Local")
	if bracket := strings.Index(message, "["); bracket >= 0 {
		logTime, _ := time.ParseInLocation(": 06-01-02 15:04:05 ", message[:bracket], loc)
		result["created_at"] = logTime
	}

	// Pull out each word[value] pair; missing keywords become "".
	for _, word := range serviceWfLogKeyWord {
		wordRegexp := regexp.MustCompile(fmt.Sprintf(`%s\[(?s:(.*?))\]`, word))
		logContent := wordRegexp.FindString(message)
		if match := contentRegexp.FindStringSubmatch(logContent); len(match) > 1 {
			result[word] = match[1]
		} else {
			result[word] = ""
		}
	}
	return result
}
// service错误日志的处理
// DefaultLog is the fallback formatter: it wraps the raw message and its
// source topic into a Matedate without any parsing.
func DefaultLog(sourceKey string, message string) Matedate {
	return Matedate{
		"topic":   sourceKey,
		"message": message,
	}
}
package transfer
import (
"fmt"
)
// Matedate (sic: presumably "Metadata") is the parsed form of one log
// line, field name -> value.
type Matedate map[string]interface{}
// Handler transforms a Matedate and returns it (possibly mutated in place).
type Handler func(Matedate) Matedate
// PipeLine is a singly linked chain of Handlers applied in order.
type PipeLine struct {
// Next is the following stage, or nil at the end of the chain.
Next *PipeLine
// Current is this stage's handler; nil stops the chain.
Current Handler
}
// Enter runs m through the chain starting at p: every node applies its
// Current handler and hands the result to Next. A node with a nil Current
// terminates the chain, returning m as it stands at that point.
func (p PipeLine) Enter(m Matedate) Matedate {
	node := &p
	for {
		if node.Current == nil {
			return m
		}
		m = node.Current(m)
		if node.Next == nil {
			return m
		}
		node = node.Next
	}
}
// 持久化到ES
// Edit stamps the record with its fixed source identifier before it is
// stored. (The original's "persist to ES" comment was a copy-paste slip.)
func Edit(m Matedate) Matedate {
	m["source"] = "logtranfers"
	return m
}
// 持久化到ES
// SaveES hands the record to the buffered messages channel, from which the
// bulk sender writes to Elasticsearch; this blocks while the buffer is full.
func SaveES(m Matedate) Matedate {
messages <- m
return m
}
// 打印
// Dump prints every field of the record to stdout followed by a divider,
// then passes the record through unchanged. Useful as a debug stage.
func Dump(m Matedate) Matedate {
	for key, value := range m {
		fmt.Printf("%s : %s\n", key, value)
	}
	fmt.Println("------------")
	return m
}
...@@ -6,47 +6,26 @@ import ( ...@@ -6,47 +6,26 @@ import (
"log" "log"
"os" "os"
"os/signal" "os/signal"
"strings"
"sync" "sync"
"syscall" "syscall"
"time" "time"
"github.com/y7ut/logtransfer/kafka" elastic "github.com/olivere/elastic/v7"
"github.com/y7ut/logtransfer/conf"
"github.com/y7ut/logtransfer/queue"
) )
var ( var (
Start = make(chan *kafka.Customer) Start = make(chan *Customer)
Close = make(chan string) Close = make(chan string)
CustomerManger = make(map[string]*kafka.Customer, 0) CustomerManger = make(map[string]*Customer, 0)
MaxRetryTime = 3 MaxRetryTime = 10
mu sync.Mutex mu sync.Mutex
closeWg sync.WaitGroup closeWg sync.WaitGroup
messages = make(chan Matedate, conf.APPConfig.Es.BulkSize)
) )
// type Kernel struct{
// CustomerManger map[string]*kafka.Customer
// mu sync.Mutex
// }
func registerManger(c *kafka.Customer) {
mu.Lock()
CustomerManger[c.Reader.Config().Topic] = c
mu.Unlock()
}
func getCustomer(topic string) (customer *kafka.Customer, ok bool) {
mu.Lock()
customer, ok = CustomerManger[topic]
mu.Unlock()
return customer, ok
}
func uninstallCustomer(topic string) {
mu.Lock()
delete(CustomerManger, topic)
mu.Unlock()
}
func getRegisterTopics() (topics []string) { func getRegisterTopics() (topics []string) {
mu.Lock() mu.Lock()
for topic := range CustomerManger { for topic := range CustomerManger {
...@@ -57,22 +36,33 @@ func getRegisterTopics() (topics []string) { ...@@ -57,22 +36,33 @@ func getRegisterTopics() (topics []string) {
} }
// 核心启动 // 核心启动
func Run() { func Run(confPath string) {
conf.Init(confPath)
esClient, err := elastic.NewClient(elastic.SetSniff(false), elastic.SetURL(conf.APPConfig.Es.Address))
if err != nil {
fmt.Println("connect es error", err)
panic(err)
}
ctx, cancel := context.WithCancel(context.Background())
go MatedateSender(ctx, esClient)
go func() { go func() {
for{ for {
select { select {
case customer := <-Start: case customer := <-Start:
log.Println("Starting Customer...")
registerManger(customer) registerManger(customer)
go ReadingMessage(customer) go ReadingMessage(ctx, customer)
case closer := <-Close: case closer := <-Close:
_, ok := getCustomer(closer) c, ok := getCustomer(closer)
if !ok { if !ok {
log.Printf(" Customer %s unstall Failed ", closer) log.Printf(" Customer %s unstall Failed ", closer)
} }
// log.Printf(" Customer %s exit with offset(%d)", closer, c.Reader.Offset()) c.Exit()
closeWg.Done() closeWg.Done()
} }
...@@ -81,8 +71,19 @@ func Run() { ...@@ -81,8 +71,19 @@ func Run() {
for topic := range ChooseTopic() { for topic := range ChooseTopic() {
GroupID := topic + "_group" GroupID := topic + "_group"
currentCustomer := kafka.InitReader(topic, GroupID) r := queue.InitReader(topic, GroupID)
log.Printf("Init Customer of (%s) success!", topic)
currentFormater := DefaultLog
// TODO:使用ETCD下发需要的pipe handler
if strings.Contains(topic, "service") {
currentFormater = FormatServiceWfLog
}
// Edit->Save->Dump 这个顺序
pipe := &PipeLine{Current: Edit, Next: &PipeLine{Current: Dump, Next: &PipeLine{Current: SaveES, Next: nil}}}
currentCustomer := &Customer{Reader: r, done: make(chan struct{}), HandlePipeline: pipe, Format: currentFormater}
log.Printf("Check Customer group of [%s] success!", GroupID)
Start <- currentCustomer Start <- currentCustomer
} }
...@@ -92,16 +93,22 @@ func Run() { ...@@ -92,16 +93,22 @@ func Run() {
log.Println("Safe Exit with:", sign) log.Println("Safe Exit with:", sign)
currentTopics := getRegisterTopics() currentTopics := getRegisterTopics()
closeWg.Add(len(currentTopics))
for _, topic := range currentTopics { for _, topic := range currentTopics {
log.Printf(" Customer %s unstalling...", topic)
closeWg.Add(1)
Close <- topic Close <- topic
log.Printf(" Customer %s unstalling...", topic)
closeWg.Wait()
} }
closeWg.Wait() close(messages)
log.Printf(" Success unstall %d Transfer", len(currentTopics)) log.Printf(" Success unstall %d Transfer", len(currentTopics))
os.Exit(0) os.Exit(0)
} }
} }
defer cancel()
} }
func sign() <-chan os.Signal { func sign() <-chan os.Signal {
...@@ -112,54 +119,77 @@ func sign() <-chan os.Signal { ...@@ -112,54 +119,77 @@ func sign() <-chan os.Signal {
} }
// 從Kafka中消費消息,注意这里会提交commit offset // 從Kafka中消費消息,注意这里会提交commit offset
func ReadingMessage(c *kafka.Customer) { func ReadingMessage(ctx context.Context, c *Customer) {
defer c.Reader.Close() defer c.Reader.Close()
log.Println("Start Customer success!") log.Printf("Start Customer Group[%s] success!", c.Reader.Config().GroupID)
// var trycount int
// var cstSh, _ = time.LoadLocation("Asia/Shanghai") //上海时区
var trycount int
for { for {
select { select {
case <-c.Ctx.Done(): case <-c.Listen():
closeWg.Add(1) // 监听需要关闭的信号
Close<- c.Reader.Config().Topic
closeWg.Wait()
log.Println("Close customer of Topic :", c.Reader.Config().Topic) log.Println("Close customer of Topic :", c.Reader.Config().Topic)
return return
default: default:
// 使用超时上下文 // // 使用超时上下文, 但是这样是非阻塞,超过deadline就报错了
timeoutCtx, cancel := context.WithTimeout(c.Ctx, 5*time.Second) // // timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
m, err := c.Reader.FetchMessage(timeoutCtx) // // 这里使用阻塞的上下文
cancel()
m, err := c.Reader.ReadMessage(ctx)
if err != nil { if err != nil {
log.Println("failed to Reader messages:", err) // 退出
trycount++ c.Exit()
if trycount >= MaxRetryTime {
log.Println("Too many retries, Customer has Suspended : ", err)
c.Cancel()
}
continue continue
} }
// 临时打印 // 流入pipe
fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value)) c.HandlePipeline.Enter(c.Format(string(c.Reader.Config().Topic), string(m.Value)))
}
}
}
// 使用超时上下文 func MatedateSender(ctx context.Context, esClient *elastic.Client) {
nextTimeoutCtx, nextCancel := context.WithTimeout(c.Ctx, 5*time.Second)
err = c.Reader.CommitMessages(nextTimeoutCtx, m) tick := time.NewTicker(3 * time.Second)
nextCancel() var (
if err != nil { SenderMu sync.Mutex
cancel() )
log.Println("failed to commit messages:", err)
trycount++ bulkRequest := esClient.Bulk()
if trycount >= MaxRetryTime { for {
log.Println("Too many retries, Customer has Suspended : ", err) select {
c.Cancel() case m := <-messages:
indexRequest := elastic.NewBulkIndexRequest().Index("logs").Doc(m)
bulkRequest.Add(indexRequest)
case <-tick.C:
// Do sends the bulk requests to Elasticsearch
SenderMu.Lock()
count := bulkRequest.NumberOfActions()
if count > 0{
log.Printf("Send message to Es: %d : \n", bulkRequest.NumberOfActions())
_, err := bulkRequest.Do(ctx)
if err != nil {
log.Println("Save Es Error:", err)
} }
continue bulkRequest.Reset()
} }
SenderMu.Unlock()
// 手动限速 case <-ctx.Done():
time.Sleep(1 * time.Second) // Do sends the bulk requests to Elasticsearch
SenderMu.Lock()
_, err := bulkRequest.Do(ctx)
if err != nil {
log.Println("Save Es Error:", err)
}
bulkRequest.Reset()
SenderMu.Unlock()
log.Println("Exiting...")
return
} }
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment