安装
从源码安装
- 下载依赖包
使用 govender 管理依赖.
govender sync
- 编译
make
下载编译后二进制文件
RUN
gohangout --config config.yml
日志
日志模块使用 github.com/golang/glog , 几个常用参数如下:
-
-logtostderr 日志打印出标准错误
-
-v 5 设置日志级别. 我这边一般设置到 5 , 数字越大, 日志级别越详细.
pprof debug
-
-pprof=true 默认不开启 pprof
-
-pprof-address 127.0.0.1:8899 pprof 的http地址
多线程处理
--worker 4 使用四个线程(goroutine)处理数据. 每个线程拥有自己的filter, output. 比如说translate filter, 每个线程有自己的字典, 他们占用多份内存. elasticsearch output也是一样的, 如果每个 elasticsearch 设置了2并发, 那一共就是8个并发. 默认是一个线程
一个简单的配置
字段格式约定
以 Add Filter 举例
格式1 [XX][YY]
city: '[geo][cityname]'
格式2 {{XXX}}
{{XXX}}name: 'my name is {{.firstname}}.{{.lastname}}'
格式3 %{XXX}
%{XXX}%{date} {%time}web-%{appid}-%{+2006-01-02}
2006 01 02 15 06 05 这几个数字是 golang 里面特定的数字, 代表年月日时分秒. 1月2号3点4分5秒06年. 其实就像hangout里面的YYYY MM dd HH mm SS
格式4 除了1,2,3 之外的其它
在不同Filter中, 可能意义不同. 像 Date 中的 src: logtime, 是说取 logtime 字段的值.
Elasticsearch 中的 index_type: logs , 这里的 logs 不是指字段名, 就是字面值.
INPUT
Stdin
从标准输入读取数据.
codec
目前有json/plain两种.
@timestamp@timestamp@timestamp@timestamp
TCP
network
默认为 tcp , 可以明确指定使用 tcp4 或者 tcp6
address
监听端口, 无默认值, 必须设置
codec
默认 plain
Kafka
topic
weblog: 1
assign
topic
consumer_settings
bootstrap.servers group.id 必须配置
auto.commit.interval.ms 是指多久commit一次offset, 太长的话有可能造成数据重复消费,太短的话可能会对kafka造成太大压力.
max.partition.fetch.bytes 是指kafka client一次从kafka server读取多少数据,默认是10MB
from.beginning 如果第一次消费此topic, 或者是offset已经失效, 是从头消费还是从最新消费. 默认是 false. 但是如果已经有过commit offset, 会接着之前的消费.
sasl.mechanism 认证方式, 目前还只支持 PLAIN 一种
sasl.user sasl认证的用户名
sasl.password sasl认证的密码
OUTPUT
Stdout
输出到标准输出
if的语法参考下面 IF语法
TCP
network
默认为 tcp , 可以明确指定使用 tcp4 或者 tcp6
address
TCP 远端地址, 无默认值, 必须设置
concurrent
开几个 tcp 连接一起写, 默认1
Elasticsearch
bulk_actions
多少次提交一次Bulk请求到ES集群. 默认 5000
bulk_size
单位是MB, 多少大写提交一次到ES. 默认 15MB
flush_interval
单位秒, 间隔多少时间提交一次到ES. 默认 30
concurrent
bulk 的goroutine 最大值, 默认1
举例来说, 如果Bulk 1W条数据到ES需要5秒, 1W条数据从Input处理完所有Filters然后到Output也需要5秒. 那么把concurrent设置为1就合适, Bulk是异步的, 这5秒钟gohangout会去Filter接下来的数据.
如果Bulk 1W条数据需要10秒, Filter只要5秒, 那么concurrent设置为2可以达到更大的吞吐量.
routing
默认为空, 不做routing
id
默认为空, 不设置id (文档id由ES生成)
compress
默认 true, http请求时做zip压缩
retry_response_code
默认 [401, 502] , 当Bulk请求的返回码是401或者502时, 会重试.
两个额外的配置
_source
bytes_source_field优先级高于source_field. bytes_source_field是指字段是[]byte类型, source_field是指字段是string类型
增加这个配置的来由是这样的. 上游数据源已经是 json.dump之后的[]byte数据, 做一次json.parse, 然后再json.dump, 耗费了大量CPU做无用功.
clickhouse
table
表名. 必须配置
hosts
clickhouse 节点列表. 必须配置
fields
字段名. 必须配置
bulk_actions
多少次提交一次Bulk请求到ES集群. 默认 1000
flush_interval
单位秒, 间隔多少时间提交一次到ES. 默认 30
concurrent
bulk 的goroutine 最大值, 默认1
FILTER
通用字段
if
if 的语法如下
if 数组中的条件是 AND 关系, 需要全部满足.
目前 if 支持两种语法, 一种是 golang 自带的 template 语法, 一种是我自己实现的一套简单的DSL, 实现的常用的一些功能, 性能远超 template , 我把上面的语法按自己的DSL翻译一下.
Exist(a) && (Exist(b) || Exist(c))
目前支持的函数: 只有 EQ 函数需要使用双引号代表字符串, 因为 EQ 也可能做数字的比较, 其他所有函数都不需要双引号, 因为他们肯定是字符串函数
Exist(user,name)EQ(user,age,20)EQ(user,age,"20")HasPrefix(user,name,liu)HasSuffix(user,name,jia)Contains(user,name,jia)Match(user,name,^liu.*a$)^liu.*a$Random(20)Before(24h)当前时间+24小时After(-24h)当前时间-24小时
add_fields
例:
当Filter执行成功时, 可以添加一些字段. 如果Filter失败, 则忽略. 下面具体的Filter说明中, 提到的"返回false", 就是指Filter失败
remove_fields
例子如上. 当Filter执行成功时, 可以删除一些字段. 如果Filter失败, 则忽略.
failTag
tags
overwrite
配置的新字段要不要覆盖之前已有的字段, 默认 true
Add
- 增加 name 字段, 内容是 childe
- 增加 hostname 字段, 内容是原 host 字段中的内容. (相当于改名)
- 增加 logtime 字段, 内容是 date 和 time 两个字段的拼接
- 增加 message 字段, 是 event.stored.message 中的内容
- 将 event.stored.message 中的内容写入 event.a.b 字段中(如果没有则创建)
overwrite: true 的情况下, 这些新字段会覆盖老字段(如果有的话).
Convert
remove_if_fail
如果转换失败刚删除这个字段, 默认 false
setto_if_fail: XX
如果转换失败, 刚将此字段的值设置为 XX . 优先级比 remove_if_fail 低. 如果 remove_if_fail 设置为 true, 则setto_if_fail 无效.
Date
如果源字段不存在, 返回 false. 如果所有 formats 都匹配失败, 返回 false
src
源字段, 必须配置.
target
@timestamp
overwrite
默认 true, 如果目标字段已经存在, 会覆盖
add_year
有些日志中的时间戳不带年份信息, 默认 false . add_year: true 可以先在源字段最前面加四位数的年份信息然后再解析.
formats
除此外, 还有 UNIX UNIX_MS 两个可以设置
Drop
丢弃此条消息, 配置 if 条件使用
Filters
目的是为了一个 if 条件后跟多个Filter
Grok
源字段不存在, 返回 false. 所有格式不匹配, 返回 false
src
源字段, 默认 message
match
依次匹配, 直到有一个成功.
pattern_paths
会加载定义的 patterns 文件. 如果是目录会加载目录下的所有文件.
这里推荐 https://github.com/vjeantet/grok 项目, 里面把 logstash 中使用的 pattern 都翻译成了 golang 的正则库可以使用的.
ignore_blank
默认 true. 如果匹配到的字段为空字符串, 则忽略这个字段. 如果 ignore_blank: false , 则添加此字段, 其值为空字符串.
IPIP
根据 IP 信息补充地址信息, 会生成如下字段.
country_name province_name city_name
下面四个字段视情况生成, 可能会缺失. latitude longitude location country_code
如果没有源字段, 或者寻找失败, 返回 false
database
数据库地址. 数据可以在 https://www.ipip.net/ 下载
src
源字段, 必须设置
target
目标字段, 如果不设置, 则将IPIP Filter生成的所有字段写入到根一层.
KV
将 a=1&b=2, 或者name=app id=123 type=nginx 这样的字符串拆分成{a:1,b:2} {name:app, id:123, type:nginx} 等多个字段, 放到日志里面去.
配置如下
如果targete有定义, 会把拆分出来字段放在这个字段中, 如果没有定义,放到在顶层.
trim 是把拆分出来的字段内容做前后修整. 将不需要的字符去掉. 下面的示例就是说把双引号和tag都去掉.
trimkey和trim类似, 处理的是字段名称.
src
源字段, 必须设置
target
目标字段, 如果不设置, 则将IPIP Filter生成的所有字段写入到根一层.
field_split
各字段&值之间以什么分割, 一般都是逗号或者空格之类. 必须设置
value_split
字段名和值之间以什么连接, 一般是等号. 必须设置
Json
如果源字段不存在, 或者Json.parse 失败, 返回 false
field
源字段
target
目标字段, 如果不设置, 则将Json Filter生成的所有字段写入到根一层.
LinkMetric
做简单的流式统计, 统计多个字段之间的聚合数据.
每600s输出一次, 输出结果形式如下:
fieldsLink
->
timestamp
使用哪个字段做时间戳. 这个字段必须是通过 Date Filter 生成的(保证是 time.Time 类型)
batchWindow
多长时间内的数据聚合在一起, 单独是秒. 每隔X秒输出一次. 如果设置为1800 (半小时), 那么延时半小时以上的数据会被丢弃.
reserveWindow
保留多久的数据, 单独是秒. 因为数据可能会有延时, 所以需要额外保存一定时间的数据在内存中.
accumulateMode
两种聚合模式.
-
cumulative 累加模式. 假设batchWindow 是300, reserveWindow 是 1800. 在每5分钟时, 会输出过去5分钟的一批聚合数据, 同时因为延时的存在, 可能还会有(过去10分钟-过去5分钟)之间的一批数据. cumulative 配置下, 会保留(过去10分钟-过去5分钟)之前count值的内存中, 新的数据进来时, 累加到一起, 下个5分钟时, 输出一个累加值.
-
separate 独立模式. 每个5分钟输出之后, 把各时间段的值清为0, 从头计数.
windowOffset
延时输出, 默认为0. 如果设置 windowOffset 为1 , 那么每个5分钟输出时, 最近的一个window数据保留不会输出.
drop_original_event
是否丢弃原始数据, 默认为 false. 如果设置为true, 则丢弃原始数据, 只输出聚合统计数据.
reduce
countsum
LinkStatsMetric
和 LinkMetric 类似, 但最后一个字段需要是数字类型, 对它进行统计.
举例:
Lowercase
Remove
Rename
Split
src
数据来源字段, 默认 message , 如果字段不存在, 返回false
sep
分隔符, 在 strings.SplitN(src, sep, maxSplit) 中用被调用. 必须配置.
如果分隔符包含不可见字符, yaml配置以及gohangout也是支持的, 像下面这样
maxSplit
在 strings.SplitN(src, sep, maxSplit) 中用被调用, 默认 -1, 代表无限制
fields
如果分割后的字符串数组长度与 fields 长度不一样, 返回false
ignore_blank
如果分割后的某字段为空, 刚不放后 event 中, 默认 true
trim
用来把分割后的字段, 去除两边的一些空格或者是标点等.
Translate
字段翻译. 字典使用 yaml 格式. 配置例子如下:
Uppercase
Replace
最后面的 1 代表, 只替换一次. 如果不给这个值, 代表替换所有的.
比如上面, 就是把 name 字段中的第一个 wang 换成 Wang, 把所有 en 换成 eng