Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Sign in
Toggle navigation
L
logtransfer
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
谢宇轩
logtransfer
Commits
f3b5156b
Commit
f3b5156b
authored
Jul 10, 2023
by
谢宇轩
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
perf: 优化
parent
b1573403
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
44 additions
and
36 deletions
+44
-36
collector.go
source/collector.go
+21
-35
topic.go
source/topic.go
+23
-1
No files found.
source/collector.go
View file @
f3b5156b
...
@@ -14,19 +14,22 @@ import (
...
@@ -14,19 +14,22 @@ import (
"github.com/y7ut/logtransfer/plugin"
"github.com/y7ut/logtransfer/plugin"
)
)
// 收藏家, 收藏家的目的就是采集收集目标目录的数据, 并投递到对应 Topic 的 Kafka topic 中
type
Collector
struct
{
type
Collector
struct
{
Style
string
`json:"style"`
Style
string
`json:"style"`
Path
string
`json:"path"`
Path
string
`json:"path"`
Topic
string
`json:"topic"`
Topic
string
`json:"topic"`
}
}
// 日志主题
type
Topic
struct
{
type
Topic
struct
{
Name
string
Name
string
// 主题的名字 对应Kafka的topic名称
Label
string
Label
string
// 主题的简介
PipeLine
*
plugin
.
PipeLine
PipeLine
*
plugin
.
PipeLine
// 主题所拥有的插件列表,以管道的形式存在
Format
entity
.
Formater
Format
entity
.
Formater
// 流入这个主题的数据,所要进行元数据的处理格式
}
}
// 日志主题在Etcd中所存储的数据结构
type
TopicConfig
struct
{
type
TopicConfig
struct
{
Format
int
`json:"format"`
Format
int
`json:"format"`
Label
string
`json:"label"`
Label
string
`json:"label"`
...
@@ -34,19 +37,14 @@ type TopicConfig struct {
...
@@ -34,19 +37,14 @@ type TopicConfig struct {
PipelineConfig
[]
PipeLinePluginsConfig
`json:"piepline"`
PipelineConfig
[]
PipeLinePluginsConfig
`json:"piepline"`
}
}
// 日志主题中的插件配置, 在Etcd中所存储的数据结构
type
PipeLinePluginsConfig
struct
{
type
PipeLinePluginsConfig
struct
{
Label
string
`json:"label"`
Label
string
`json:"label"`
Name
string
`json:"name"`
Name
string
`json:"name"`
Params
string
`json:"params"`
Params
string
`json:"params"`
}
}
var
watchTopicChannel
=
make
(
chan
*
Topic
)
// 获取所有的收藏家
var
startTopicChannel
=
make
(
chan
*
Topic
)
var
deleteTopicChannel
=
make
(
chan
string
)
// 加载所有的可用的collector
func
LoadCollectors
()
([]
Collector
,
error
)
{
func
LoadCollectors
()
([]
Collector
,
error
)
{
collectors
:=
make
([]
Collector
,
0
)
collectors
:=
make
([]
Collector
,
0
)
configs
,
err
:=
conf
.
GetAllConfFromEtcd
()
configs
,
err
:=
conf
.
GetAllConfFromEtcd
()
...
@@ -69,7 +67,7 @@ func LoadCollectors() ([]Collector, error) {
...
@@ -69,7 +67,7 @@ func LoadCollectors() ([]Collector, error) {
return
collectors
,
nil
return
collectors
,
nil
}
}
//
加载Agent所有的collector
//
根据收藏家的名称获取对应的收藏家
func
LoadCollector
(
name
string
)
([]
Collector
,
error
)
{
func
LoadCollector
(
name
string
)
([]
Collector
,
error
)
{
var
collectors
[]
Collector
var
collectors
[]
Collector
config
,
err
:=
conf
.
GetConfFromEtcd
(
name
)
config
,
err
:=
conf
.
GetConfFromEtcd
(
name
)
...
@@ -85,7 +83,7 @@ func LoadCollector(name string) ([]Collector, error) {
...
@@ -85,7 +83,7 @@ func LoadCollector(name string) ([]Collector, error) {
return
collectors
,
nil
return
collectors
,
nil
}
}
//
收集所有需要监听的topic
//
确定所有需要监听的日志主题
func
ChooseTopic
()
(
map
[
*
Topic
]
bool
,
error
)
{
func
ChooseTopic
()
(
map
[
*
Topic
]
bool
,
error
)
{
// 收集全部的agent的collector信息
// 收集全部的agent的collector信息
ableTopics
:=
make
(
map
[
*
Topic
]
bool
)
ableTopics
:=
make
(
map
[
*
Topic
]
bool
)
...
@@ -132,42 +130,30 @@ func loadTopics() (map[string]*Topic, error) {
...
@@ -132,42 +130,30 @@ func loadTopics() (map[string]*Topic, error) {
return
topics
,
nil
return
topics
,
nil
}
}
func
TopicChangeListener
()
<-
chan
*
Topic
{
// 监听 Etcd 中 Topic 的配置变更
return
watchTopicChannel
}
func
TopicDeleteListener
()
<-
chan
string
{
return
deleteTopicChannel
}
func
TopicStartListener
()
<-
chan
*
Topic
{
return
startTopicChannel
}
func
WatchTopics
()
{
func
WatchTopics
()
{
for
confResp
:=
range
conf
.
WatchLogTopicToEtcd
()
{
for
confResp
:=
range
conf
.
WatchLogTopicToEtcd
()
{
for
_
,
event
:=
range
confResp
.
Events
{
for
_
,
event
:=
range
confResp
.
Events
{
switch
event
.
Type
{
switch
event
.
Type
{
case
mvccpb
.
PUT
:
case
mvccpb
.
PUT
:
// 有PUT操作才进行通知
// 有PUT操作才进行通知
var
newVersionTopicConfig
TopicConfig
var
newTopicCondfig
TopicConfig
if
len
(
confResp
.
Events
)
==
0
{
if
len
(
confResp
.
Events
)
==
0
{
continue
continue
}
}
changedConf
:=
confResp
.
Events
[
0
]
.
Kv
.
Value
changedConf
:=
confResp
.
Events
[
0
]
.
Kv
.
Value
err
:=
json
.
Unmarshal
(
changedConf
,
&
newTopicCondfig
)
err
:=
json
.
Unmarshal
(
changedConf
,
&
newVersionTopicConfig
)
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Println
(
"Unmarshal New Topic Config Error:"
,
err
)
log
.
Println
(
"Unmarshal New Topic Config Error:"
,
err
)
}
}
log
.
Println
(
"load New Topic success!"
)
watchTopicChannel
<-
generateTopic
(
newTopicCondfig
)
log
.
Printf
(
"reload New Topic[%s] success!"
,
confResp
.
Events
[
0
]
.
Kv
.
Key
)
watchTopicChannel
<-
generateTopic
(
newVersionTopicConfig
)
case
mvccpb
.
DELETE
:
// 获取旧版本数据 来进行对比
case
mvccpb
.
DELETE
:
// 获取旧版本数据 来进行对比
// 要清空这个register 中全部这个topic的customer, 不过也应该没有了,应该都会被config watcher 给捕获到
// 要清空这个register 中全部这个topic的customer, 不过也应该没有了,应该都会被config watcher 给捕获到
oldTopic
,
err
:=
getHistoryTopicWithEvent
(
confResp
)
oldTopic
,
err
:=
getHistoryTopicWithEvent
(
confResp
)
if
err
!=
nil
{
if
err
!=
nil
{
...
@@ -175,7 +161,7 @@ func WatchTopics() {
...
@@ -175,7 +161,7 @@ func WatchTopics() {
continue
continue
}
}
log
.
Println
(
"some Topic remove"
,
oldTopic
.
Name
)
log
.
Println
(
"some Topic remove"
,
oldTopic
.
Name
)
}
}
time
.
Sleep
(
2
*
time
.
Second
)
time
.
Sleep
(
2
*
time
.
Second
)
...
...
source/topic.go
View file @
f3b5156b
...
@@ -7,6 +7,27 @@ import (
...
@@ -7,6 +7,27 @@ import (
"github.com/y7ut/logtransfer/plugin"
"github.com/y7ut/logtransfer/plugin"
)
)
// 用于监听Topic改变的 channel
var
watchTopicChannel
=
make
(
chan
*
Topic
)
// 用于监听Topic启动的 channel
var
startTopicChannel
=
make
(
chan
*
Topic
)
// 用于监听Topic关闭的 channel
var
deleteTopicChannel
=
make
(
chan
string
)
func
TopicChangeListener
()
<-
chan
*
Topic
{
return
watchTopicChannel
}
func
TopicDeleteListener
()
<-
chan
string
{
return
deleteTopicChannel
}
func
TopicStartListener
()
<-
chan
*
Topic
{
return
startTopicChannel
}
func
generateTopic
(
config
TopicConfig
)
*
Topic
{
func
generateTopic
(
config
TopicConfig
)
*
Topic
{
if
config
.
PipelineConfig
==
nil
{
if
config
.
PipelineConfig
==
nil
{
...
@@ -19,7 +40,8 @@ func generateTopic(config TopicConfig) *Topic {
...
@@ -19,7 +40,8 @@ func generateTopic(config TopicConfig) *Topic {
currentPlugin
:=
plugin
.
LoadRegistedPlugins
(
v
.
Name
)
currentPlugin
:=
plugin
.
LoadRegistedPlugins
(
v
.
Name
)
err
:=
currentPlugin
.
SetParams
(
v
.
Params
)
err
:=
currentPlugin
.
SetParams
(
v
.
Params
)
if
err
!=
nil
{
if
err
!=
nil
{
log
.
Panicln
(
"plugin encode params error:"
,
err
)
log
.
Printf
(
"plugin encode params error: %s"
,
err
)
continue
}
}
p
.
AppendPlugin
(
currentPlugin
)
p
.
AppendPlugin
(
currentPlugin
)
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment