# Logstash: Extracting Nginx Logs with Regular Expressions

Posted on 2021-12-06 | Category: ELK

Why extract fields? A whole log line cannot be analyzed as a single string; individual fields have to be extracted first, for example:

- to find which IPs generate the most traffic
- to analyze Nginx response status codes

```shell
# A sample Nginx log line
192.168.238.90 - - [01/Aug/2020:14:53:35 +0800] "GET /sjg666 HTTP/1.1" 404 3650 "-" "Chrome xxx" "-"

# The Nginx log format configuration that produces it
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
                '$status $body_bytes_sent "$http_referer" '
                '"$http_user_agent" "$http_x_forwarded_for"';
```

Grok is the extraction workhorse, and it requires a grasp of regular expressions. Use the Grok tool in Kibana to verify each extraction.

- Hand-written regex extraction (recommended)
- Extraction with built-in patterns (simpler): /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.1.2/patterns/grok-patterns

```shell
# Grok hand-written regex syntax: (?<field_name>your_regex)
(?<remote_addr>\d+\.\d+\.\d+\.\d+)

# Built-in pattern syntax: %{BUILTIN_PATTERN:field_name}
%{IP:remote_addr} - (%{WORD:remote_user}|-) \[%{HTTPDATE:time_local}\] "%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER}" %{NUMBER:status} %{NUMBER:body_bytes_sent} %{QS} %{QS:http_user_agent}

# Mixed syntax
(?<remote_addr>\d+\.\d+\.\d+\.\d+) - (%{WORD:remote_user}|-) \[%{HTTPDATE:time_local}\]
```

#### Basic regular expression symbols

```shell
.      matches any single character; * means the preceding character occurs 0 or more times
[abc]  any one character listed in the brackets; [^abc] any character not listed
[0-9]  a digit; [a-z] a lowercase letter; [A-Z] an uppercase letter; [a-zA-Z] any letter; [a-zA-Z0-9] any letter or digit
[^0-9] any non-digit
^xx    the match must start with xx; xx$ the match must end with xx
\s     a whitespace character; \S a non-whitespace character; \d a digit
```

#### Extended regular expressions, built on top of the basic ones

```shell
?      the preceding character occurs 0 or 1 times; + the preceding character occurs 1 or more times
{a}    the preceding character matches exactly a times; {a,b} between a and b times
{,b}   between 0 and b times; {a,} a or more times
string1|string2  matches string1 or string2
```

#### Logstash: extracting Nginx fields with regex and writing to ES

```shell
# Logstash field-extraction configuration
input {
  file {
    path => "/var/log/nginx/access.log"
  }
}

filter {
  grok {
    match => { "message" => '%{IP:remote_addr} - (%{WORD:remote_user}|-) \[%{HTTPDATE:time_local}\] "%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER}" %{NUMBER:status} %{NUMBER:body_bytes_sent} %{QS} %{QS:http_user_agent}' }
    remove_field => ["message"]
  }
}

output {
  elasticsearch {
    hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"]
    user => "elastic"
    password => "sjgpwd"
    index => "sjgnginx-%{+YYYY.MM.dd}"
  }
}
```

#### Fixing the exclamation-mark warning in Kibana

Refresh the index pattern in Kibana. Operations on a Kibana index pattern never touch the underlying data, so deleting and recreating one is safe.

#### Special field handling in Logstash: substitution and type conversion

```shell
# http_user_agent contains double quotes that need to be removed
filter {
  grok {
    match => { "message" => '%{IP:remote_addr} - (%{WORD:remote_user}|-) \[%{HTTPDATE:time_local}\] "%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER}" %{NUMBER:status} %{NUMBER:body_bytes_sent} %{QS} %{QS:http_user_agent}' }
    remove_field => ["message"]
  }
  mutate {
    gsub => [ "http_user_agent", '"', "" ]
  }
}

# Converting strings to integers in Logstash
mutate {
  gsub => [ "http_user_agent", '"', "" ]
  convert => { "status" => "integer" }
  convert => { "body_bytes_sent" => "integer" }
}
```

#### Replacing the @timestamp in Logstash

Simulate user traffic against Nginx:

```shell
while true; do
  curl 192.168.238.90/sjg666
  curl 127.0.0.1
  sleep 2
done
```

Scenario: suppose we want to analyze yesterday's user access logs. Point Logstash at the whole Nginx log from the beginning, and the problem shows up:

```shell
input {
  file {
    path => "/var/log/nginx/access.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
```

There are two kinds of time:

- the time the log line was shipped, which is useless for analysis
- the user's access time, recorded inside the log line; analysis is only accurate when it is based on this one

Use the user's access time, which has the format 01/Aug/2020:10:34:20 +0800:

```shell
filter {
  grok {
    match => { "message" => '%{IP:remote_addr} - (%{WORD:remote_user}|-) \[%{HTTPDATE:time_local}\] "%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER}" %{NUMBER:status} %{NUMBER:body_bytes_sent} %{QS} %{QS:http_user_agent}' }
    remove_field => ["message"]
  }
  date {
    match => ["time_local", "dd/MMM/yyyy:HH:mm:ss Z"]
    target => "@timestamp"
  }
}
```

#### If the logs carry different time formats, the override format must match each one

```shell
20/Feb/2019:14:50:06     -> dd/MMM/yyyy:HH:mm:ss
2016-08-24 18:05:39,830  -> yyyy-MM-dd HH:mm:ss,SSS
```

Manually tally the Nginx requests and compare against what the web page shows:

```shell
cat /var/log/nginx/access.log | awk '{print $4}' | sed 's/:[0-9][0-9]$//g' | sort | uniq -c
```

#### Removing time_local after the timestamp override

```shell
mutate {
  gsub => [ "http_user_agent", '"', "" ]
  convert => { "status" => "integer" }
  convert => { "body_bytes_sent" => "integer" }
  remove_field => ["time_local"]
}
```
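Whenever one of these filter blocks changes, it can save a restart cycle to syntax-check the pipeline file first and to watch parsed events on the console. A minimal sketch, assuming the pipeline is saved at /etc/logstash/conf.d/sjg.conf (a hypothetical path) and Logstash was installed from the official packages:

```shell
# Syntax-check the pipeline and exit (the .conf path is hypothetical)
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/sjg.conf --config.test_and_exit

# While debugging, a temporary stdout output prints every parsed event,
# which makes it easy to confirm the grok fields and the overridden @timestamp:
#   output { stdout { codec => rubydebug } }
```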
"body_bytes_sent" => "integer" } remove_field => ["time_local"] } ``` #### Logstash正则提取异常处理 ```shell Logstash改成分析最新日志 input { file { path => "/var/log/nginx/access.log" } } 正则提取有异常的情况 echo "sjgmethods xxx xxx" >> /var/log/nginx/access.log tags: _grokparsefailure 设置正则出错提取到另外的索引里 output { if "_grokparsefailure" not in [tags] and "_dateparsefailure" not in [tags] { elasticsearch { hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"] user => "elastic" password => "sjgpwd" index => "sjgnginx-%{+YYYY.MM.dd}" } } else{ elasticsearch { hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"] user => "elastic" password => "sjgpwd" index => "sjgfail-%{+YYYY.MM.dd}" } } } ``` ### Kibana图形简单使用 模拟数据 while true;do curl 192.168.238.90/sjg666; curl 127.0.0.1; sleep 2; done #### 首页区域 - 可以根据时间查看访问量:每分钟访问量 - 可以根据某个字段查询 - 可以单独看某个字段的统计 #### Kibana图形有建立,选择terms去查看对应的数据 - 饼图的创建 pie_remote_addr - 表的创建 table_remote_addr #### Kibana面板的创建sjg_dash - 创建面板 - 在面板上添加图形 建议采用Grafana展示 ### Logstash分析Linux系统日志 ```shell 默认的日志格式 Aug 3 18:37:57 sjg1 sshd[1318]: Accepted password for root from xxx port 49205 ssh2 无年份的字段 系统日志配置/etc/rsyslog.conf,重启rsyslog $template sjgformat,"%$NOW% %TIMESTAMP:8:15% %hostname% %syslogtag% %msg%\n" $ActionFileDefaultTemplate sjgformat 日志格式 2020-08-03 18:47:34 sjg1 sshd[1522]: Accepted password for root from 58.101.14.103 port 49774 ssh2 %{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE} %{NOTSPACE:procinfo}: (?.*) 只读权限添加 chmod +r secure 提取secure日志,messages等其它日志提取原理类似 input { file { path => "/var/log/secure" } } filter { grok { match => { "message" => '%{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE} %{NOTSPACE:procinfo}: (?.*)' } remove_field => ["message"] } date { match => ["timestamp", "yyyy-MM-dd HH:mm:ss"] target => "@timestamp" } mutate { remove_field => ["timestamp"] } } output { elasticsearch { hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"] user => "elastic" password => "sjgpwd" index => "sjgsecure-%{+YYYY.MM.dd}" } } ``` 转载自 > https://www.cnblogs.com/k8s-pod/p/13906258.html