Logstash配置详解
Logstash配置详解
配置文件简介
Logstash的配置文件目录在安装目录下的config目录下。
配置文件分类:
logstash.yml:Logstash的主配置文件。
pipelines.yml:包含在单个Logstash实例中运行多个管道的框架和说明。
jvm.options:配置Logstash的JVM,使用此文件设置总堆空间的初始值和最大值,对该服务的资源占用进行调优。
log4j2.properties:log4j 2库的默认设置。
管道配置
基础配置规则
配置格式
Logstash管道通常有三个阶段:收集—>过滤—>输出,所以我们配置中也可以分成三个部分进行配置。格式如下:
input {
...
}
filter {
...
}
output {
...
}
配置匹配规则
每个部分都可以配置一个或者多个插件配置,如果配置了多个过滤和输出,则会按照配置文件中编写的顺序进行应用和依次输出到每个目标。
多管道配置
如果需要在同一个进程中运行多个管道,通过配置pipelines.yml文件来处理,必须放在path.settings文件夹中。
# config/pipelines.yml
- pipeline.id: pipeline-1
path.config: "/usr/local/logstash/conf/nginx-logs.conf"
pipeline.workers: 3
- pipeline.id: pipeline-2
path.config: "/usr/local/logstash/conf/apache-logs.conf"
queue.type: persisted
Input配置
配置项参数
| 配置项 | 说明 |
|---|---|
| path | 数组类型,用于监听读取的文件路径,基于glob匹配语法。 |
| exclude | 数组类型,排除不想监听的文件规则,基于glob匹配语法 |
| sincedb_path | 字符串类型,记录sinceddb文件路径,默认路径是当前用户的home文件夹 |
| start_position | 字符串类型,可以配置为beginning/end,是否从头读取文件 |
| stat_interval | 数值类型,单位为秒,定时检查文件是否有更新,默认是1秒 |
| discover_interval | 数值类型,单位为秒,定时检查是否有新文件待读取,默认是15秒 |
| ignore_older | 数值类型,单位为秒,扫描文件列表时,如果该文件上次更改时间超过设定的时长,则不做处理,但依然会监控是否有新内容,默认关闭 |
| close_older | 数值类型,单位为秒,如果监听的文件在超过该设定时间内没有新内容,会被关闭文件句柄,释放资源,但依然会监控是否有新内容,默认3600秒,即1小时 |
1.从文件输入
input {
file {
path => "文件路径"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
2.从HTTP输入
input {
http {
port => 端口号
}
}
3.从TCP输入
input {
tcp {
mode => "server"
host => "0.0.0.0"
port => 端口号
codec => json_lines
}
}
Filter配置
配置项参数
| 插件名 | 说明 |
|---|---|
| date | 日期解析 |
| grok | 正则匹配解析 |
| dissect | 分隔符解析 |
| mutate | 对字段作处理,比如重命名、删除、替换 |
| json | 按照json解析字段内容到指定字段中 |
| geoip | 增加地理位置数据 |
| ruby | 利用ruby代码来动态修改Logstash Event |
1.date插件配置
可以将日期字符串解析为日期类型,然后替换@timestamp字段或者指定其他字段。
filter {
date {
match => ["logdate", "MM dd yyy HH:mm:ss"]
}
}
2.grok插件配置
解析日志内容,可以解析成json数组。
grok语法
- 匹配数值类型:%{NUMBER:duration}
- 通过在最后指定为int或者float来强转类型:%{NUMBER:duration:int}
配置实例
原日志内容:
64.88.9.216 [17/July/2024:08:05:03 +0000] "GET /presentations/logstash-monitorama-2024/images/kibana-search.png HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"
配置grok:
filter {
grok {
match => {
"message" => "%{IPORHOST:clientip} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) %{QS:referrer} %{QS:agent}"
}
}
}
3.dissect插件配置
通过分隔符原理解析数据。因为dissect语法简单,因此它能处理的场景比较有限,它只能处理格式相似,且有分隔符的字符串。
dissect语法
- %{}里面是字段
- 两个%{}之间是分隔符。
配置实例
原日志内容:
64.88.9.216 [17/July/2024:08:05:03 +0000] "GET /presentations/logstash-monitorama-2024/images/kibana-search.png HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"
dissect配置:
filter {
dissect {
mapping => {
"message" => "%{clientip} [%{timestamp}] %{request} %{response} %{bytes} %{referrer} %{agent}"
}
}
}
4.mutate插件配置
对字段进行各种操作,比如重命名、删除、替换、更新等。
- convert:类型转换
- gsub:字符串替换
- split、join、merge:字符串切割、数组合并为字符串、数组合并为数组
- rename:字段重命名
- update、replace:字段内容更新或替换。它们都可以更新字段的内容,区别在于update只在字段存在时生效,而replace在字段不存在时会执行新增字段的操作
- remove_field:删除字段
filter {
mutate {
convert => {"age" => "integer"}
gsub => [
"birthday", "-", "/",
"image", "\/", "_"
]
split => {"hobities" => ","}
rename => {"hobities" => "hobityList"}
replace => {"icon" => "%{image}"}
remove_field => ["message"]
}
}
5.json插件配置
将内容为json格式的数据解析出来。
filter {
json {
source => "message"
target => "msg_json"
}
}
6.geoip插件配置
根据ip地址提供对应的地域信息,比如经纬度、城市名等,方便进行地理数据分析。
filter {
geoip {
source => "clientIp"
}
}
Output配置
1.输出到控制台
output {
file {
path => "文件路径"
codec => line {format => %{message}}
}
}
2.输出到文件
output {
file {
path => "文件路径"
codec => line {format => %{message}}
}
}
3.输出到Elasticsearch
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "nginx-access-%{+YYYY.MM.dd}"
document_type => "_doc"
user => "用户名"
password => "密码"
}
}
根据不同类型写入不同索引或目标
可以在input部分定义类型,然后在output中根据类型判断决定输出的目标
input {
file{
path => "/var/log/nginx/access.log"
type => "access"
start_position => "beginning"
}
file{
path => "/var/log/nginx/error.log"
type => "error"
start_position => "beginning"
}
}
output {
if [type] == "access" {
elasticsearch {
hosts => ["192.168.31.101:9200"]
index => "nginx_access-%{+YYYY.MM.dd}"
}
}
if [type] == "error" {
elasticsearch {
hosts => ["192.168.31.102:9200"]
index => "nginx_error-%{+YYYY.MM.dd}"
}
}
}