Commit 77ec5f8f authored by 谢宇轩's avatar 谢宇轩 😅

歷史版本遷移

parents
/.idea
*.exe
/log
logtransfer
*.conf
\ No newline at end of file
### LogTransfer Customer
# 原理
从Etcd中获取所有在注册的配置,收集需要消费的Topic和对应信息。
# 版本
### 1.0.0
1. 从kafka消费数据
2. 管道的形式处理数据
3. 提供数据同步到ES的插件
package conf
import (
"context"
"fmt"
"log"
"strings"
"time"
"github.com/coreos/etcd/clientv3"
"gopkg.in/ini.v1"
)
var (
configPath = "/logagent/config/"
statusPath = "/logagent/active/"
)
type LogagentConfig []byte
var cli *clientv3.Client
func Init(confPath string) {
// 加载配置文件
if err := ini.MapTo(APPConfig, confPath); err != nil {
log.Println("load ini file error: ", err)
return
}
cli = initConnect()
}
func initConnect() *clientv3.Client {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{APPConfig.Etcd.Address},
DialTimeout: 5 * time.Second,
})
if err != nil {
panic(fmt.Sprintf("connect failed, err:%s \n", err))
}
log.Println("connect etcd succ")
return cli
}
func GetAllConfFromEtcd() []LogagentConfig {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, configPath, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend))
cancel()
if err != nil {
panic(fmt.Sprintf("get failed, err:%s \n", err))
}
configs := make([]LogagentConfig, 0)
for _, etcdResult := range resp.Kvs {
etcdKey := statusPath + string(etcdResult.Key[strings.LastIndex(string(etcdResult.Key), "/")+1:])
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, etcdKey)
cancel()
if err != nil {
panic(fmt.Sprintf("Get Etcd config failed, err:%s \n", err))
}
if len(resp.Kvs) != 0 {
status := string(resp.Kvs[0].Value)
if status == "1" {
log.Printf("load config from:%s ", etcdResult.Key)
configs = append(configs, etcdResult.Value)
}
}
}
return configs
}
func WatchLogConfToEtcd() clientv3.WatchChan {
wch := cli.Watch(context.Background(), configPath, clientv3.WithPrefix())
return wch
}
package conf
type LogTransferConf struct {
Kafka `ini:"kafka"`
Etcd `ini:"etcd"`
Es `ini:"es"`
}
// kafka 配置
type Kafka struct {
Address string `ini:"address"`
}
// ETCD 配置
type Etcd struct {
Address string `ini:"address"`
}
// Es 属性
type Es struct {
Address string `ini:"address"`
BulkSize int `ini:"bulk_size"`
}
var (
APPConfig = new(LogTransferConf)
)
module github.com/y7ut/logtransfer
go 1.15
require (
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403 // indirect
github.com/coreos/etcd v3.3.25+incompatible
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/envoyproxy/go-control-plane v0.9.5 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/uuid v1.2.0 // indirect
github.com/olivere/elastic/v7 v7.0.24
github.com/segmentio/kafka-go v0.4.13
go.uber.org/zap v1.16.0 // indirect
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 // indirect
golang.org/x/mod v0.3.0
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 // indirect
golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57 // indirect
golang.org/x/text v0.3.6 // indirect
golang.org/x/tools v0.1.0 // indirect
google.golang.org/grpc v1.33.2
gopkg.in/ini.v1 v1.62.0
)
replace github.com/coreos/go-systemd => github.com/coreos/go-systemd/v22 v22.0.0
replace google.golang.org/grpc => google.golang.org/grpc v1.26.0
This diff is collapsed.
#### LogTransfer
# Kafka配置
[kafka]
address=
# Kafka配置
[etcd]
address=
# ES 配置
[es]
address=
bulk_size=50
package main
import (
"flag"
"fmt"
"github.com/y7ut/logtransfer/transfer"
)
const version = "1.0.0"
var c = flag.String("c", "./logtransfer.conf", "使用配置文件启动")
var v = flag.Bool("v", false, "查看当前程序版本")
func main() {
// 加载配置文件
flag.Parse()
if *v {
fmt.Println("Jiwei Logtransfer \nversion: ", version)
return
}
transfer.Run(*c)
}
package queue
import (
"log"
"strings"
"github.com/segmentio/kafka-go"
"github.com/y7ut/logtransfer/conf"
)
func InitReader(topic string, groupId string) *kafka.Reader {
// 先去创建一下这个分组
CreateCustomerGroup(topic, groupId)
// make a writer that produces to topic-A, using the least-bytes distribution
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: strings.Split(conf.APPConfig.Kafka.Address, ","),
Topic: topic,
GroupID: groupId,
Partition: 0,
MinBytes: 10e3, // 10KB
MaxBytes: 10e6, // 10MB
})
return r
}
func CreateCustomerGroup(topic string, groupId string) {
config := kafka.ConsumerGroupConfig{
ID: groupId,
Brokers: strings.Split(conf.APPConfig.Kafka.Address, ","),
Topics: []string{topic},
StartOffset: kafka.LastOffset,
}
_, err := kafka.NewConsumerGroup(config)
if err != nil {
log.Println("create CustomerGroup error:", err)
}
}
package storage
import (
"fmt"
"github.com/olivere/elastic/v7"
"github.com/y7ut/logtransfer/conf"
)
func GetClient() *elastic.Client {
var err error
client, err := elastic.NewClient(elastic.SetSniff(false), elastic.SetURL(conf.APPConfig.Es.Address))
if err != nil {
fmt.Println("connect es error", err)
return nil
}
return client
}
package transfer
import (
"encoding/json"
"log"
"github.com/y7ut/logtransfer/conf"
)
type Collector struct {
Style string `json:"style"`
Path string `json:"path"`
Topic string `json:"topic"`
}
// 加载所有的collector
func loadCollectors() []Collector {
configs := conf.GetAllConfFromEtcd()
collectors := make([]Collector, 0)
for _, v := range configs {
var currentCollector []Collector
err := json.Unmarshal(v, &currentCollector)
if err != nil {
log.Printf("json decode config(%s) err : err: %s", v, err)
}
if currentCollector !=nil {
log.Printf("Init config:%s ", v)
collectors = append(collectors, currentCollector...)
}
}
return collectors
}
// 收集所有需要监听的topic
func ChooseTopic() map[string]bool {
collector := loadCollectors()
topics := make(map[string]bool, 0)
for _, v := range collector {
topics[v.Topic] = true
}
return topics
}
package transfer
import "github.com/segmentio/kafka-go"
type Customer struct {
Reader *kafka.Reader
HandlePipeline *PipeLine
Format Formater
done chan struct{}
}
func (c Customer) Exit() {
go func() {
c.done <- struct{}{}
}()
}
func (c Customer) Listen() chan struct{} {
return c.done
}
func registerManger(c *Customer) {
mu.Lock()
CustomerManger[c.Reader.Config().Topic] = c
mu.Unlock()
}
func getCustomer(topic string) (customer *Customer, ok bool) {
mu.Lock()
customer, ok = CustomerManger[topic]
mu.Unlock()
return customer, ok
}
func uninstallCustomer(topic string) {
mu.Lock()
delete(CustomerManger, topic)
mu.Unlock()
}
package transfer
import (
"fmt"
"regexp"
"strings"
"time"
)
var (
contentRegexp = regexp.MustCompile(`\[(?s:(.*?))\]`)
serviceWfLogKeyWord = []string{"errno", "logId", "uri", "refer", "cookie", "ua", "host", "clientIp", "optime", "request_params", "errmsg"}
)
type Formater func(string, string) (Matedate, error)
// service错误日志的处理
func FormatServiceWfLog(sourceKey string, message string) (Matedate, error) {
result := make(Matedate, 0)
levelIndex := strings.Index(message, ":")
if levelIndex == -1 {
return result, fmt.Errorf("message format error.")
}
// log.Println(message)
result["topic"] = sourceKey
result["level"] = message[:levelIndex]
result["project"] = "jiwei-service"
message = message[levelIndex:]
loc, _ := time.LoadLocation("Local")
logTime, _ := time.ParseInLocation(": 06-01-02 15:04:05 ", message[:strings.Index(message, "[")], loc)
result["created_at"] = logTime
keyword := serviceWfLogKeyWord
for _, word := range keyword {
flysnowRegexp := regexp.MustCompile(fmt.Sprintf(`%s\[(?s:(.*?))\]`, word))
logContent := flysnowRegexp.FindString(message)
curentSub := contentRegexp.FindStringSubmatch(logContent)
if len(curentSub) < 1 {
continue
}
result[word] = contentRegexp.FindStringSubmatch(logContent)[1]
}
return result, nil
}
// service错误日志的处理
func DefaultLog(sourceKey string, message string) (Matedate, error) {
result := make(Matedate, 0)
result["topic"] = sourceKey
result["message"] = message
return result, nil
}
package transfer
import (
"fmt"
)
type Matedate map[string]interface{}
type Handler func(Matedate) Matedate
type PipeLine struct {
Next *PipeLine
Current Handler
}
func (p PipeLine) Enter(m Matedate) Matedate {
if p.Current == nil {
return m
}
if p.Next == nil {
return p.Current(m)
}
return p.Next.Enter(p.Current(m))
}
// 持久化到ES
func Edit(m Matedate) Matedate {
m["source"] = "logtranfers"
return m
}
// 持久化到ES
func SaveES(m Matedate) Matedate {
messages <- m
return m
}
// 打印
func Dump(m Matedate) Matedate {
for k, v := range m {
fmt.Printf("%s : %s\n", k, v)
}
fmt.Println("------------")
return m
}
package transfer
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"
elastic "github.com/olivere/elastic/v7"
"github.com/y7ut/logtransfer/conf"
"github.com/y7ut/logtransfer/queue"
)
var (
Start = make(chan *Customer)
Close = make(chan string)
CustomerManger = make(map[string]*Customer, 0)
MaxRetryTime = 10
mu sync.Mutex
closeWg sync.WaitGroup
messages = make(chan Matedate, conf.APPConfig.Es.BulkSize)
)
func getRegisterTopics() (topics []string) {
mu.Lock()
for topic := range CustomerManger {
topics = append(topics, topic)
}
mu.Unlock()
return topics
}
// 核心启动
func Run(confPath string) {
conf.Init(confPath)
esClient, err := elastic.NewClient(elastic.SetSniff(false), elastic.SetURL(conf.APPConfig.Es.Address))
if err != nil {
fmt.Println("connect es error", err)
panic(err)
}
ctx, cancel := context.WithCancel(context.Background())
for i := 0; i < 3; i++ {
go MatedateSender(ctx, esClient)
}
go func() {
for {
select {
case customer := <-Start:
registerManger(customer)
go ReadingMessage(ctx, customer)
case closer := <-Close:
c, ok := getCustomer(closer)
if !ok {
log.Printf(" Customer %s unstall Failed ", closer)
}
c.Exit()
closeWg.Done()
}
}
}()
for topic := range ChooseTopic() {
GroupID := topic + "_group"
r := queue.InitReader(topic, GroupID)
currentFormater := DefaultLog
// TODO:使用ETCD下发需要的pipe handler
if strings.Contains(topic, "service") {
currentFormater = FormatServiceWfLog
}
// Edit->Save->Dump 这个顺序
// pipe := &PipeLine{Current: Edit, Next: &PipeLine{Current: Dump, Next: &PipeLine{Current: SaveES, Next: nil}}}
pipe := &PipeLine{Current: Edit, Next: &PipeLine{Current: SaveES, Next: nil}}
currentCustomer := &Customer{Reader: r, done: make(chan struct{}), HandlePipeline: pipe, Format: currentFormater}
log.Printf("Check Customer group of [%s] success!", GroupID)
Start <- currentCustomer
}
for sign := range sign() {
switch sign {
case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM:
log.Println("Safe Exit with:", sign)
currentTopics := getRegisterTopics()
for _, topic := range currentTopics {
closeWg.Add(1)
Close <- topic
log.Printf(" Customer %s unstalling...", topic)
closeWg.Wait()
}
close(messages)
log.Printf(" Success unstall %d Transfer", len(currentTopics))
os.Exit(0)
}
}
defer cancel()
}
func sign() <-chan os.Signal {
c := make(chan os.Signal,2)
// 监听信号
signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGUSR1, syscall.SIGUSR2)
return c
}
// 從Kafka中消費消息,注意这里会提交commit offset
func ReadingMessage(ctx context.Context, c *Customer) {
defer c.Reader.Close()
log.Printf("Start Customer Group[%s] success!", c.Reader.Config().GroupID)
// var trycount int
// var cstSh, _ = time.LoadLocation("Asia/Shanghai") //上海时区
for {
select {
case <-c.Listen():
// 监听需要关闭的信号
log.Println("Close customer of Topic :", c.Reader.Config().Topic)
return
default:
// // 使用超时上下文, 但是这样是非阻塞,超过deadline就报错了
// // timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
// // 这里使用阻塞的上下文
m, err := c.Reader.ReadMessage(ctx)
if err != nil {
// 退出
// c.Exit()
log.Println(err)
continue
}
matedata, err := c.Format(string(c.Reader.Config().Topic), string(m.Value))
if err != nil {
log.Println(err)
continue
}
// 流入pipe
c.HandlePipeline.Enter(matedata)
}
}
}
func MatedateSender(ctx context.Context, esClient *elastic.Client) {
tick := time.NewTicker(3 * time.Second)
var (
SenderMu sync.Mutex
)
bulkRequest := esClient.Bulk()
for {
select {
case m := <-messages:
indexRequest := elastic.NewBulkIndexRequest().Index("logs").Doc(m)
bulkRequest.Add(indexRequest)
case <-tick.C:
// Do sends the bulk requests to Elasticsearch
SenderMu.Lock()
count := bulkRequest.NumberOfActions()
if count > 0 {
log.Printf("Send message to Es: %d : \n", bulkRequest.NumberOfActions())
_, err := bulkRequest.Do(ctx)
if err != nil {
log.Println("Save Es Error:", err)
}
bulkRequest.Reset()
}
SenderMu.Unlock()
case <-ctx.Done():
// Do sends the bulk requests to Elasticsearch
SenderMu.Lock()
_, err := bulkRequest.Do(ctx)
if err != nil {
log.Println("Save Es Error:", err)
}
bulkRequest.Reset()
SenderMu.Unlock()
log.Println("Exiting...")
return
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment