Commit 64293201 authored by 谢宇轩's avatar 谢宇轩 😅

Initial commit

parents
/.idea
*.exe
/log
logagent
/data
\ No newline at end of file
### LogTransfer Customer
# 原理
从Etcd中获取所有在注册的配置,收集需要消费的Topic和对应信息。
# Done
1. 从kafka消费数据
2. 添加消费者的上下文
package conf
import (
"context"
"fmt"
"log"
"time"
"github.com/coreos/etcd/clientv3"
"gopkg.in/ini.v1"
)
var (
configPath = "/logagent/config/"
)
type LogagentConfig []byte
var cli *clientv3.Client
func init() {
// 加载配置文件
if err := ini.MapTo(APPConfig, "./logtransfer.conf"); err != nil {
log.Println("load ini file error: ", err)
return
}
cli = initConnect()
}
func initConnect() *clientv3.Client {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{APPConfig.Etcd.Address},
DialTimeout: 5 * time.Second,
})
if err != nil {
panic(fmt.Sprintf("connect failed, err:%s \n", err))
}
log.Println("connect etcd succ")
return cli
}
func GetAllConfFromEtcd() []LogagentConfig {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
resp, err := cli.Get(ctx, configPath, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortDescend))
cancel()
if err != nil {
panic(fmt.Sprintf("get failed, err:%s \n", err))
}
configs := make([]LogagentConfig, 0)
for _, etcdResult := range resp.Kvs {
log.Printf("load config from:%s ", etcdResult.Key)
configs = append(configs, etcdResult.Value)
}
return configs
}
func WatchLogConfToEtcd() clientv3.WatchChan {
wch := cli.Watch(context.Background(), configPath, clientv3.WithPrefix())
return wch
}
package conf
type LogTransferConf struct {
Kafka `ini:"kafka"`
Etcd `ini:"etcd"`
Es `ini:"es"`
}
// kafka 配置
type Kafka struct {
Address string `ini:"address"`
QueueSize string `ini:"queue_size"`
}
// ETCD 配置
type Etcd struct {
Address string `ini:"address"`
}
// Es 属性
type Es struct {
Address string `ini:"address"`
}
var (
APPConfig = new(LogTransferConf)
)
module github.com/y7ut/logtransfer
go 1.15
require (
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403 // indirect
github.com/coreos/etcd v3.3.25+incompatible
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/envoyproxy/go-control-plane v0.9.5 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/uuid v1.2.0 // indirect
github.com/segmentio/kafka-go v0.4.13
go.uber.org/zap v1.16.0 // indirect
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 // indirect
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 // indirect
golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57 // indirect
golang.org/x/text v0.3.6 // indirect
golang.org/x/tools v0.1.0 // indirect
google.golang.org/grpc v1.26.0 // indirect
gopkg.in/ini.v1 v1.62.0
)
This diff is collapsed.
package kafka
import (
"context"
"log"
"github.com/segmentio/kafka-go"
"github.com/y7ut/logtransfer/conf"
)
type Customer struct {
Reader *kafka.Reader
Ctx context.Context
Cancel context.CancelFunc
}
func InitReader(topic string, groupId string) *Customer {
// 先去创建一下这个分组
CreateCustomerGroup(topic, groupId)
// make a writer that produces to topic-A, using the least-bytes distribution
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{conf.APPConfig.Kafka.Address},
Topic: topic,
GroupID: groupId,
Partition: 0,
MinBytes: 10e3, // 10KB
MaxBytes: 10e6, // 10MB
})
context, cancel := context.WithCancel(context.Background())
return &Customer{Reader: r, Ctx:context, Cancel: cancel}
}
func CreateCustomerGroup(topic string, groupId string) {
config := kafka.ConsumerGroupConfig{
ID: groupId,
Brokers: []string{conf.APPConfig.Kafka.Address},
Topics: []string{topic},
StartOffset: kafka.LastOffset,
}
_, err := kafka.NewConsumerGroup(config)
if err != nil {
log.Println("create CustomerGroup error:", err)
}
}
File added
#### LogTransfer
# Kafka配置
[kafka]
address=localhost:9091
queue_size=1000
# Kafka配置
[etcd]
address=localhost:23790
# ES 配置
[es]
address="http://localhost:9200/"
\ No newline at end of file
package main
import (
"github.com/y7ut/logtransfer/transfer"
)
func main() {
transfer.Run()
}
package storage
\ No newline at end of file
package transfer
import (
"encoding/json"
"log"
"github.com/y7ut/logtransfer/conf"
)
type Collector struct {
Style string `json:"style"`
Path string `json:"path"`
Topic string `json:"topic"`
}
// 加载所有的collector
func loadCollectors() []Collector {
configs := conf.GetAllConfFromEtcd()
collectors := make([]Collector, 0)
for _, v := range configs {
log.Printf("json decode config:%s ", v)
var currentCollector []Collector
err := json.Unmarshal(v, &currentCollector)
if err != nil {
log.Printf("json decode config:%s err: %s", v, err)
}
collectors = append(collectors, currentCollector...)
}
return collectors
}
// 收集所有需要监听的topic
func ChooseTopic() map[string]bool {
collector := loadCollectors()
topics := make(map[string]bool, 0)
for _, v := range collector {
topics[v.Topic] = true
}
return topics
}
package transfer
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/y7ut/logtransfer/kafka"
)
var (
Start = make(chan *kafka.Customer)
Close = make(chan string)
CustomerManger = make(map[string]*kafka.Customer, 0)
MaxRetryTime = 3
mu sync.Mutex
closeWg sync.WaitGroup
)
// type Kernel struct{
// CustomerManger map[string]*kafka.Customer
// mu sync.Mutex
// }
func registerManger(c *kafka.Customer) {
mu.Lock()
CustomerManger[c.Reader.Config().Topic] = c
mu.Unlock()
}
func getCustomer(topic string) (customer *kafka.Customer, ok bool) {
mu.Lock()
customer, ok = CustomerManger[topic]
mu.Unlock()
return customer, ok
}
func uninstallCustomer(topic string) {
mu.Lock()
delete(CustomerManger, topic)
mu.Unlock()
}
func getRegisterTopics() (topics []string) {
mu.Lock()
for topic := range CustomerManger {
topics = append(topics, topic)
}
mu.Unlock()
return topics
}
// 核心启动
func Run() {
go func() {
for{
select {
case customer := <-Start:
log.Println("Starting Customer...")
registerManger(customer)
go ReadingMessage(customer)
case closer := <-Close:
_, ok := getCustomer(closer)
if !ok {
log.Printf(" Customer %s unstall Failed ", closer)
}
// log.Printf(" Customer %s exit with offset(%d)", closer, c.Reader.Offset())
closeWg.Done()
}
}
}()
for topic := range ChooseTopic() {
GroupID := topic + "_group"
currentCustomer := kafka.InitReader(topic, GroupID)
log.Printf("Init Customer of (%s) success!", topic)
Start <- currentCustomer
}
for sign := range sign() {
switch sign {
case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM:
log.Println("Safe Exit with:", sign)
currentTopics := getRegisterTopics()
closeWg.Add(len(currentTopics))
for _, topic := range currentTopics {
log.Printf(" Customer %s unstalling...", topic)
Close <- topic
}
closeWg.Wait()
log.Printf(" Success unstall %d Transfer", len(currentTopics))
os.Exit(0)
}
}
}
func sign() <-chan os.Signal {
c := make(chan os.Signal)
// 监听信号
signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGUSR1, syscall.SIGUSR2)
return c
}
// 從Kafka中消費消息,注意这里会提交commit offset
func ReadingMessage(c *kafka.Customer) {
defer c.Reader.Close()
log.Println("Start Customer success!")
var trycount int
for {
select {
case <-c.Ctx.Done():
closeWg.Add(1)
Close<- c.Reader.Config().Topic
closeWg.Wait()
log.Println("Close customer of Topic :", c.Reader.Config().Topic)
return
default:
// 使用超时上下文
timeoutCtx, cancel := context.WithTimeout(c.Ctx, 5*time.Second)
m, err := c.Reader.FetchMessage(timeoutCtx)
cancel()
if err != nil {
log.Println("failed to Reader messages:", err)
trycount++
if trycount >= MaxRetryTime {
log.Println("Too many retries, Customer has Suspended : ", err)
c.Cancel()
}
continue
}
// 临时打印
fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
// 使用超时上下文
nextTimeoutCtx, nextCancel := context.WithTimeout(c.Ctx, 5*time.Second)
err = c.Reader.CommitMessages(nextTimeoutCtx, m)
nextCancel()
if err != nil {
cancel()
log.Println("failed to commit messages:", err)
trycount++
if trycount >= MaxRetryTime {
log.Println("Too many retries, Customer has Suspended : ", err)
c.Cancel()
}
continue
}
// 手动限速
time.Sleep(1 * time.Second)
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment