Commit 646c44af authored by 谢宇轩's avatar 谢宇轩 😅

优化ES Sender逻辑

parent fc26f103
...@@ -43,7 +43,7 @@ func FormatServiceWfLog(sourceKey string, message string) (Matedata, error) { ...@@ -43,7 +43,7 @@ func FormatServiceWfLog(sourceKey string, message string) (Matedata, error) {
} }
} }
mateItem.Data["timestamp"] = mateItem.create mateItem.Data["timestamp"] = mateItem.create.Format("2006-01-02 15:04:05")
result := *mateItem result := *mateItem
mateItem.reset() mateItem.reset()
MatePool.Put(mateItem) MatePool.Put(mateItem)
......
...@@ -44,6 +44,7 @@ func CloseMessageChan() { ...@@ -44,6 +44,7 @@ func CloseMessageChan() {
func MatedateSender(ctx context.Context, esClient *elastic.Client) { func MatedateSender(ctx context.Context, esClient *elastic.Client) {
tick := time.NewTicker(3 * time.Second) tick := time.NewTicker(3 * time.Second)
var ( var (
SenderMu sync.Mutex SenderMu sync.Mutex
) )
...@@ -52,21 +53,32 @@ func MatedateSender(ctx context.Context, esClient *elastic.Client) { ...@@ -52,21 +53,32 @@ func MatedateSender(ctx context.Context, esClient *elastic.Client) {
for { for {
select { select {
case m := <-messages: case m := <-messages:
indexRequest := elastic.NewBulkIndexRequest().Index(m.Index).Doc(m.Data) indexRequest := elastic.NewBulkIndexRequest().Index(m.Index).Doc(m.Data)
SenderMu.Lock()
bulkRequest.Add(indexRequest) bulkRequest.Add(indexRequest)
SenderMu.Unlock()
case <-tick.C: case <-tick.C:
// Do sends the bulk requests to Elasticsearch // Do sends the bulk requests to Elasticsearch
SenderMu.Lock() SenderMu.Lock()
count := bulkRequest.NumberOfActions() count := bulkRequest.NumberOfActions()
if count > 0 { if count > 0 {
log.Printf("Send message to Es: %d : \n", bulkRequest.NumberOfActions()) log.Printf("Send messages to Index: %d : \n", bulkRequest.NumberOfActions())
_, err := bulkRequest.Do(ctx) response, err := bulkRequest.Do(ctx)
if err != nil { if err != nil {
log.Println("Save Es Error:", err) log.Println("Save Es Error:", err)
} }
for _ , v := range response.Items {
for _, item := range v{
if item.Error != nil {
log.Printf("Find Error in ES Result in (%s): %s", item.Index, item.Error.Reason)
}
}
}
bulkRequest.Reset() bulkRequest.Reset()
} }
SenderMu.Unlock() SenderMu.Unlock()
......
...@@ -79,7 +79,7 @@ func ReadingMessage(ctx context.Context, c *Customer) { ...@@ -79,7 +79,7 @@ func ReadingMessage(ctx context.Context, c *Customer) {
// var trycount int // var trycount int
// var cstSh, _ = time.LoadLocation("Asia/Shanghai") //上海时区 // var cstSh, _ = time.LoadLocation("Asia/Shanghai") //上海时区
var errMessage strings.Builder var errMessage strings.Builder
// log.Println(c.HandlePipeline.pipe)
var matedata entity.Matedata var matedata entity.Matedata
for { for {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment