ElasticSearchNote
Summary: Author: 张亚飞 | Read Time: 3 minute read | Published: 2018-03-24
Categories: Linux
Tags: Note
List the installed Logstash input plugins
./bin/logstash-plugin list --group input
Plugin list
logstash-input-azure_event_hubs
logstash-input-beats
logstash-input-couchdb_changes
logstash-input-elasticsearch
logstash-input-exec
logstash-input-file
logstash-input-ganglia
logstash-input-gelf
logstash-input-generator
logstash-input-graphite
logstash-input-heartbeat
logstash-input-http
logstash-input-http_poller
logstash-input-imap
logstash-input-jdbc
logstash-input-jms
logstash-input-pipe
logstash-input-redis
logstash-input-s3
logstash-input-snmp
logstash-input-snmptrap
logstash-input-sqs
logstash-input-stdin
logstash-input-syslog
logstash-input-tcp
logstash-input-twitter
logstash-input-udp
logstash-input-unix
How to debug log parsing with the http input plugin
input {
  http {
    host => "127.0.0.1" # default: 0.0.0.0
    port => 7474 # default: 8080
  }
}
Send a test event with curl:
curl -XPUT 'http://127.0.0.1:7474/twitter/tweet/1' -d 'hello'
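The same test request can be scripted. This is a minimal Ruby sketch, assuming the http input is listening on 127.0.0.1:7474 as configured here. The helper names `build_debug_request` and `send_debug_event` are illustrative and not part of Logstash, and the `text/plain` content type is my own choice so that the body is treated as a plain log line.

```ruby
require 'net/http'
require 'uri'

# Assumed endpoint: host and port taken from the http input block.
DEBUG_ENDPOINT = URI('http://127.0.0.1:7474/twitter/tweet/1')

# Build (but do not send) a PUT request carrying one raw log line.
def build_debug_request(body, uri = DEBUG_ENDPOINT)
  request = Net::HTTP::Put.new(uri)
  request['Content-Type'] = 'text/plain'
  request.body = body
  request
end

# Send the request; only call this while Logstash is running.
def send_debug_event(body, uri = DEBUG_ENDPOINT)
  Net::HTTP.start(uri.host, uri.port) do |http|
    http.request(build_debug_request(body, uri))
  end
end

# Example call (requires a running Logstash): send_debug_event('hello')
```

Keeping request construction separate from sending makes it possible to inspect the payload without a running pipeline.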
How to parse JSON-formatted logs
filebeat.yaml
filebeat:
  prospectors:
    - paths:
        - my_json.log
      fields_under_root: true
      fields:
        tags: ['json']
output:
  logstash:
    hosts: ['localhost:5050']
logstash.conf
input {
  beats {
    port => 5050
  }
}
filter {
  # tags is an array, so test membership rather than a sub-field
  if "json" in [tags] {
    json {
      source => "message"
    }
  }
}
output {
  stdout { codec => rubydebug { metadata => true } }
}
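To make the effect of the json filter concrete, here is a rough plain-Ruby approximation that works on a Hash instead of a real Logstash event. The function name `apply_json_filter` is hypothetical. The sketch assumes the filter's default behavior of merging decoded keys into the event root and tagging failures with `_jsonparsefailure`.

```ruby
require 'json'

# Approximate the json filter: decode event[source] and merge the decoded
# keys into the top level of the event. On failure, add a failure tag.
def apply_json_filter(event, source = 'message')
  event.merge(JSON.parse(event[source]))
rescue JSON::ParserError, TypeError
  # Raised for invalid JSON, a missing source field, or a non-object value.
  event.merge('tags' => Array(event['tags']) + ['_jsonparsefailure'])
end

ok = apply_json_filter('message' => '{"level":"INFO","msg":"ok"}', 'tags' => ['json'])
puts ok['level']
```

This only illustrates the data flow. The real filter has further options (such as `target`) that are not modeled here.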
Alternatively, let Filebeat decode the JSON itself by adding the two options json.message_key: event and json.keys_under_root: true to the input configuration:
- type: log
  enabled: true
  paths:
    - /home/worker/server/vrectifier/logs/run.log
  fields:
    logtype: "brtc-vrectifier"
  fields_under_root: true
  json.message_key: event
  json.keys_under_root: true
  close_inactive: 1m
  scan_frequency: 10s
  tail_files: true
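The effect of json.keys_under_root can be pictured with a small plain-Ruby sketch. The function `decode_log_line` and the sample log line are made up for illustration, and conflict handling between decoded keys and Filebeat's own fields is deliberately not modeled.

```ruby
require 'json'

# Approximate where Filebeat places decoded JSON keys.
# keys_under_root: true  -> decoded keys sit at the top level of the event
# keys_under_root: false -> decoded keys sit under a "json" key
def decode_log_line(line, keys_under_root: true, fields: {})
  decoded = JSON.parse(line)
  keys_under_root ? fields.merge(decoded) : fields.merge('json' => decoded)
end

line = '{"event":"started","level":"info"}'
p decode_log_line(line, fields: { 'logtype' => 'brtc-vrectifier' })
p decode_log_line(line, keys_under_root: false, fields: { 'logtype' => 'brtc-vrectifier' })
```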
- Structured logging with Filebeat
- Sending json format log to kibana using filebeat, logstash and elasticsearch?
mutate {
  remove_field => [
    "input", "prospector", "@version", "@timestamp"
  ]
}
if [logtype] == "brtc-vconsole" {
  mutate {
    remove_field => [
      "@version", "@timestamp"
    ]
  }
}
Parsing test
strict field reference parser rejects valid input data
echo '{"[foo":"bar"}' | /usr/share/logstash/bin/logstash "--path.settings" "/etc/logstash" -e 'input{stdin{codec=>json_lines}} output{stdout{codec=>rubydebug}}'
Regex replacement
ruby {
  code => "
    msg = event.get('message')
    if msg
      event.set('message', msg.gsub('\\x', '\\\x'))
    end
  "
}
The following rule raises an error; the cause is not yet known:
mutate {
  gsub => [
    "message", "\\x", "\\\x"
  ]
}
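Backslashes in gsub replacements are easy to get wrong, so it can help to check the behavior in plain Ruby first. The sketch below assumes the goal is to double the backslash in front of `x`; that intent is my reading of the rule, and the sample string is made up. It runs outside Logstash, where config-string escaping may add another layer.

```ruby
# Sample input: "foo", a literal backslash, then "x41".
raw = 'foo\x41'

# Replacement-string form: inside a gsub replacement, two backslashes
# collapse to one, so the text comes back unchanged.
unchanged = raw.gsub('\\x', '\\\x')

# Block form: the block's return value is inserted verbatim, so the
# backslash really is doubled.
doubled = raw.gsub('\\x') { '\\\\x' }

puts unchanged  # foo\x41
puts doubled    # foo\\x41
```

The block form avoids the second round of backslash interpretation that Ruby applies to replacement strings.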
Logstash Chinese documentation and an explanation of input, filter, and output
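Parsing time in Logstash
Parse the timestamp in [kafka][timestamp]:
date {
  timezone => "Asia/Shanghai"
  match => ["[kafka][timestamp]", "UNIX_MS"]
  target => "[kafka][ts]"
}
The UNIX_MS pattern matches an integer count of milliseconds since the Unix epoch, for example: 13800000000666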
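For reference, the conversion that UNIX_MS implies can be reproduced in plain Ruby. The helper `parse_unix_ms` and the 13-digit sample value are my own and do not come from the original notes.

```ruby
require 'time'

# Convert milliseconds since the Unix epoch to a UTC Time.
def parse_unix_ms(value)
  ms = Integer(value)
  Time.at(ms / 1000, ms % 1000, :millisecond).utc
end

puts parse_unix_ms(1380000000666).iso8601(3)  # 2013-09-24T05:20:00.666Z
```

Splitting the value into whole seconds and a millisecond remainder keeps the arithmetic in integers and avoids floating-point rounding.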
Use the [][] syntax to reference nested fields:
date {
  match => [ "[docs][time]", "ISO8601" ]
  target => "timing"
}
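The bracket notation maps directly onto nested hash lookups. A small sketch of that idea in plain Ruby follows; `field_path` and `resolve_field` are hypothetical helpers, not the Logstash implementation, and the event Hash is sample data.

```ruby
# Split a Logstash-style field reference such as "[docs][time]" into keys.
# A bare name such as "times" is treated as a single top-level key.
def field_path(reference)
  keys = reference.scan(/\[([^\[\]]+)\]/).flatten
  keys.empty? ? [reference] : keys
end

# Look up the referenced value in a nested Hash; nil when a key is missing.
def resolve_field(event, reference)
  event.dig(*field_path(reference))
end

event = { 'docs' => { 'time' => '2022-05-14T05:48:45Z' }, 'times' => 'n/a' }
puts resolve_field(event, '[docs][time]')
```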
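Parsing times that carry no time zone information
When the input has no zone, the date filter uses the platform default time zone unless the timezone option is set.
date {
  match => [ "[docs][time]", "ISO8601" ]
  target => "timing"
}
The ISO8601 pattern matches: xxxx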
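date {
  match => [ "times", "YYYY-MM-dd'T'HH:mm:ss.SSSSSS" ]
  target => "timing"
}
The pattern YYYY-MM-dd'T'HH:mm:ss.SSSSSS matches 2022-05-14T05:48:45.085321. The literal T must be quoted; with a space in its place, the pattern matches 2022-05-14 05:48:45.085321 instead.
Input time: 2022-05-14T05:48:45.085321
Parsed result: timing: 2022-05-14T05:48:45.085Z (precision is reduced to milliseconds)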
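The drop from microseconds to milliseconds seen in the parsed result can be reproduced in plain Ruby. This sketch assumes the zone-less input is meant as UTC, which matches the example output; the helper `to_millisecond_utc` is illustrative only.

```ruby
require 'time'

# Parse a zone-less ISO 8601 timestamp as UTC (an assumption) and render
# it with millisecond precision; extra digits are cut off, not rounded.
def to_millisecond_utc(raw)
  Time.iso8601("#{raw}Z").utc.iso8601(3)
end

puts to_millisecond_utc('2022-05-14T05:48:45.085321')  # 2022-05-14T05:48:45.085Z
```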
Note: if the pattern does not match, the date filter tags the event with _dateparsefailure by default. See:
- https://discuss.elastic.co/t/setting-a-timestamp-from-a-json-parsed-object/86910
- http://niyanchun.com/modify-attimestamp-field-in-logstash.html
- https://discuss.elastic.co/t/how-to-set-my-own-field-value-as-timestamp/77366
Dropping messages
filter {
  if [level] in ["DEBUG"] {
    drop {}
  }
}
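The drop rule behaves like a reject over the event stream. A plain-Ruby sketch of the same decision, with made-up sample events and a hypothetical `drop_debug` helper:

```ruby
# Levels whose events should be discarded, mirroring the conditional.
DROPPED_LEVELS = ['DEBUG'].freeze

# Keep only events whose level is not in the drop list.
def drop_debug(events)
  events.reject { |event| DROPPED_LEVELS.include?(event['level']) }
end

events = [
  { 'level' => 'DEBUG', 'msg' => 'verbose detail' },
  { 'level' => 'ERROR', 'msg' => 'request failed' }
]
p drop_debug(events)
```

Adding more levels to the list is the equivalent of extending the array in the Logstash conditional.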
ruby code
Regex matching
filter {
  if [logtype] == "brtc-vcollections" and [msg] == "APIStats" and [docs][Status] >= 200 {
    ruby {
      code => "
        http_host='brtc-data-collection.baijiayun.com'
        uri=event.get('[docs][URI]')
        # collapse every /offer... path into a single alert bucket
        if uri =~ /^\/offer/
          uri='/offers'
        end
        # build a separate alert event alongside the original one
        alert_events = LogStash::Event.new()
        alert_events.set('@timestamp', event.get('@timestamp'))
        alert_events.tag('alert-gate-event')
        alert_events.set('hostname', event.get('[host][name]'))
        alert_events.set('event_ts', (event.get('@timestamp').to_f.round(3)*1000).to_i)
        alert_events.set('group', 'bdata')
        alert_events.set('project', event.get('logtype'))
        alert_events.set('level', 'ERROR')
        alert_events.set('key', 'APIStats:'+event.get('[docs][Status]').to_s+'.'+uri.to_s)
        # .to_s guards against nil fields, which would otherwise raise on concatenation
        alert_events.set('msg', 'status: [' + event.get('[docs][Status]').to_s + '] http_host:['+http_host+'] url:['+event.get('[docs][URI]').to_s+'] timestamp: ['+event.get('@timestamp').to_s + '] guid: [' + event.get('[docs][trace]').to_s + '] client_ip:[' + event.get('[docs][RemoteIP]').to_s + '] request_time: [' + event.get('[docs][time]').to_s + ']')
        new_event_block.call(alert_events)
      "
    }
  }
}
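The URI bucketing and alert-key logic in this filter can be exercised outside Logstash. The following plain-Ruby sketch pulls those two steps into hypothetical helpers (`normalize_uri`, `alert_key`); the sample status codes and paths are invented for illustration.

```ruby
# Collapse every path starting with /offer into one bucket, as the filter does.
def normalize_uri(uri)
  uri =~ /^\/offer/ ? '/offers' : uri
end

# Build the alert key from the HTTP status and the normalized URI.
def alert_key(status, uri)
  "APIStats:#{status}.#{normalize_uri(uri)}"
end

puts alert_key(500, '/offer/abc123')  # APIStats:500./offers
puts alert_key(200, '/health')        # APIStats:200./health
```

Testing the string logic this way is quicker than restarting a pipeline for every regex tweak.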
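Regex replacement (renaming keys)
filter {
  ruby {
    code => "
      event.to_hash.each { |k, v|
        # match with a Regexp; String =~ String raises a TypeError in Ruby
        if k =~ /^prometheus\.metrics\.ems/
          new_key = k.sub(/^prometheus\.metrics\.ems/, '')
          event.remove(k)
          event.set(new_key, v)
        end
      }
    "
  }
}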
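The key-renaming step can be tried on a plain Hash before it goes into a pipeline. In this sketch, `strip_prefix` and the sample metric name are assumptions made for illustration only.

```ruby
# Prefix to remove from matching keys.
PREFIX = /^prometheus\.metrics\.ems/

# Return a copy of the event with the prefix removed from matching keys;
# keys without the prefix are copied unchanged.
def strip_prefix(event)
  event.each_with_object({}) do |(key, value), renamed|
    renamed[key.sub(PREFIX, '')] = value
  end
end

p strip_prefix('prometheus.metrics.ems_queue_depth' => 3, 'host' => 'node-1')
```

Building a new Hash sidesteps any question about mutating a collection while iterating over it.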
Logstash Event API
Logstash Event API event_spec.rb