ElasticSearchNote

Summary: Author: 张亚飞 | Read Time: 3 minute read | Published: 2018-03-24
Filed under Categories: Linux | Tags: Note

List the installed Logstash plugins

./bin/logstash-plugin list --group input

Plugin list

logstash-input-azure_event_hubs
logstash-input-beats
logstash-input-couchdb_changes
logstash-input-elasticsearch
logstash-input-exec
logstash-input-file
logstash-input-ganglia
logstash-input-gelf
logstash-input-generator
logstash-input-graphite
logstash-input-heartbeat
logstash-input-http
logstash-input-http_poller
logstash-input-imap
logstash-input-jdbc
logstash-input-jms
logstash-input-pipe
logstash-input-redis
logstash-input-s3
logstash-input-snmp
logstash-input-snmptrap
logstash-input-sqs
logstash-input-stdin
logstash-input-syslog
logstash-input-tcp
logstash-input-twitter
logstash-input-udp
logstash-input-unix

How to use the http input plugin to debug log parsing

input {
  http {
    host => "127.0.0.1" # default: 0.0.0.0
    port => 7474 # default: 8080
  }
}
curl -XPUT 'http://127.0.0.1:7474/twitter/tweet/1' -d 'hello'
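
The same debug request can be sent from a script when curl is not at hand. The following is a minimal sketch using Ruby's standard Net::HTTP, assuming the http input above is listening on 127.0.0.1:7474. The helper names build_debug_request and send_debug_event are hypothetical, and the text/plain content type is an assumption chosen so that the body is treated as a plain log line.

```ruby
require 'net/http'
require 'uri'

# Address of the http input configured above (host and port taken from the config).
LOGSTASH_HTTP_INPUT = URI('http://127.0.0.1:7474/twitter/tweet/1')

# Build a PUT request that carries one raw log line as its body.
def build_debug_request(log_line, uri = LOGSTASH_HTTP_INPUT)
  request = Net::HTTP::Put.new(uri)
  request['Content-Type'] = 'text/plain' # assumption: send the body as plain text
  request.body = log_line
  request
end

# Send the request; only call this while Logstash is running.
def send_debug_event(log_line, uri = LOGSTASH_HTTP_INPUT)
  Net::HTTP.start(uri.host, uri.port) do |http|
    http.request(build_debug_request(log_line, uri))
  end
end
```

Calling send_debug_event('hello') should make the event appear on whatever output the pipeline defines, for example a stdout output with the rubydebug codec.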

How to parse JSON-formatted logs

filebeat.yaml

filebeat:
  prospectors:
    - paths:
        - my_json.log
      fields_under_root: true
      fields:
        tags: ['json']
output:
  logstash:
    hosts: ['localhost:5050']

logstash.conf

input {
  beats {
    port => 5050
  }   
}  

filter {
  # tags is an array, so test membership instead of a nested field
  if "json" in [tags] {
    json {
      source => "message"
    } 
  }
}

output {
  stdout { codec => rubydebug { metadata => true } } 
}
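
To make the effect of the json filter easier to picture, here is a simplified sketch in plain Ruby that works on a Hash instead of a real Logstash event. It assumes the filter's default behavior of merging the parsed keys into the event root and tagging the event with _jsonparsefailure when parsing fails. The method name apply_json_filter is hypothetical.

```ruby
require 'json'

# Mimic the json filter: parse the string stored under `source` and merge the
# resulting keys into the event; tag the event when the string is not valid JSON.
def apply_json_filter(event, source = 'message')
  parsed = JSON.parse(event[source])
  return event.merge(parsed) if parsed.is_a?(Hash)

  event
rescue JSON::ParserError, TypeError
  event.merge('tags' => Array(event['tags']) + ['_jsonparsefailure'])
end

ok  = apply_json_filter({ 'message' => '{"level":"INFO","msg":"started"}', 'tags' => ['json'] })
bad = apply_json_filter({ 'message' => 'not json', 'tags' => ['json'] })
puts ok['level']
puts bad['tags'].inspect
```

The original message field is kept after parsing, which matches what the rubydebug output shows unless the field is removed explicitly.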

Alternatively, add the two options json.message_key: event and json.keys_under_root: true to the Filebeat input configuration:

  - type: log
    enabled: true
    paths:
      - /home/worker/server/vrectifier/logs/run.log
    fields:
      logtype: "brtc-vrectifier"
    fields_under_root: true
    json.message_key: event
    json.keys_under_root: true
    close_inactive: 1m
    scan_frequency: 10s
    tail_files: true

mutate {
    remove_field => [
        "input", "prospector", "@version", "@timestamp"
    ]
}
if [logtype] == "brtc-vconsole"{
    mutate {
        remove_field => [
            "@version", "@timestamp"
        ]
    }
}

Parsing test

strict field reference parser rejects valid input data

echo '{"[foo":"bar"}' | /usr/share/logstash/bin/logstash "--path.settings" "/etc/logstash" -e 'input{stdin{codec=>json_lines}} output{stdout{codec=>rubydebug}}'

Regex replacement

ruby {
    # event['message'] style access was removed in Logstash 5.0; use event.get / event.set
    code => "
        msg = event.get('message')
        if msg
          event.set('message', msg.gsub('\\x','\\\x'))
        end
    "
}
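
Backslashes in gsub are easy to get wrong because the replacement string has its own escape rules on top of Ruby's string literal rules. The following sketch in plain Ruby assumes the goal is to turn every literal \x into \\x. It uses the block form of gsub, whose return value is inserted verbatim, so the replacement escapes do not apply. The method name escape_hex_sequences is hypothetical.

```ruby
# Double the backslash in front of every literal "\x" sequence.
# In single-quoted Ruby strings '\x' is a backslash followed by x,
# and '\\\\x' is two backslashes followed by x.
def escape_hex_sequences(message)
  return message if message.nil?

  message.gsub('\x') { '\\\\x' }
end
```

Inside a Logstash config file the code string is read by the config parser first, so the number of backslashes needed there may differ from this plain Ruby version.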

The following rule raises an error; the cause is not yet known.

mutate {
    gsub => [
        "message", "\\x", "\\\x"
    ]
}

Logstash Chinese documentation and an explanation of input, filter, and output


Parsing time in Logstash

Parse the timestamp: [kafka][timestamp]

date {
    timezone => "Asia/Shanghai"
    match => ["[kafka][timestamp]", "UNIX_MS"]
    target => "[kafka][ts]"
}

The UNIX_MS pattern matches: 13800000000666
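
UNIX_MS interprets the value as milliseconds since the Unix epoch. As a rough illustration outside Logstash, the conversion can be sketched in plain Ruby. The method name parse_unix_ms and the sample value 1652507325085 are hypothetical and are not taken from the configuration above.

```ruby
require 'time'

# Convert epoch milliseconds (integer or numeric string) to a UTC Time.
def parse_unix_ms(value)
  ms = Integer(value)
  Time.at(ms / 1000, ms % 1000, :millisecond).utc
end

puts parse_unix_ms('1652507325085').iso8601(3) # prints 2022-05-14T05:48:45.085Z
```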

Use [][] to reference nested fields

date {
    match => [ "[docs][time]", "ISO8601" ]
    target => "timing"
}

Parse a time that carries no timezone information

date {
    match => [ "[docs][time]", "ISO8601" ]
    target => "timing"
}

The ISO8601 pattern matches: xxxx

date {
    match => [ "times", "YYYY-MM-dd HH:mm:ss.SSSSSS" ]
    target => "timing"
}

The pattern YYYY-MM-dd HH:mm:ss.SSSSSS can match 2022-05-14T05:48:45.085321

The time 2022-05-14T05:48:45.085321 is parsed as timing: 2022-05-14T05:48:45.085Z (the fraction is truncated to milliseconds).
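
The loss of the microsecond digits can be reproduced outside Logstash. This is a small sketch in plain Ruby, assuming the zone-less input is meant as UTC. The method name to_millis_timestamp is hypothetical.

```ruby
require 'time'

# Parse an ISO8601 time that lacks a zone, treating it as UTC,
# and render it with millisecond precision (extra digits are cut off).
def to_millis_timestamp(text)
  Time.iso8601("#{text}Z").utc.iso8601(3)
end

puts to_millis_timestamp('2022-05-14T05:48:45.085321') # prints 2022-05-14T05:48:45.085Z
```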

Note: if the pattern does not match, the date filter tags the event with _dateparsefailure.

https://discuss.elastic.co/t/setting-a-timestamp-from-a-json-parsed-object/86910
http://niyanchun.com/modify-attimestamp-field-in-logstash.html
https://discuss.elastic.co/t/how-to-set-my-own-field-value-as-timestamp/77366


Dropping messages

filter {
  # equality check for a single level; use in [...] when matching several levels
  if [level] == "DEBUG" {
    drop{}
  }
}
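
The drop condition above can be mirrored in plain Ruby to check which events would survive. This is only an illustration on Hashes, not Logstash code; the names DROPPED_LEVELS and drop_by_level are hypothetical.

```ruby
# Levels whose events should be discarded.
DROPPED_LEVELS = ['DEBUG'].freeze

# Return only the events whose level is not in the dropped set.
def drop_by_level(events, dropped = DROPPED_LEVELS)
  events.reject { |event| dropped.include?(event['level']) }
end

events = [
  { 'level' => 'DEBUG', 'msg' => 'cache miss' },
  { 'level' => 'ERROR', 'msg' => 'upstream timeout' }
]
puts drop_by_level(events).inspect
```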

ruby code

Regex matching

filter {
  if [logtype] == "brtc-vcollections" and [msg] == "APIStats" and [docs][Status] >= 200 {
    ruby {
        code => "
            http_host = 'brtc-data-collection.baijiayun.com'
            # to_s keeps the string concatenation below from raising when a field is missing
            raw_uri = event.get('[docs][URI]').to_s
            status = event.get('[docs][Status]').to_s
            # group every path that starts with /offer under one alert key
            uri = raw_uri
            if uri =~ /^\/offer/
                uri = '/offers'
            end
            # build a separate alert event and emit it alongside the original one
            alert_events = LogStash::Event.new()
            alert_events.set('@timestamp', event.get('@timestamp'))
            alert_events.tag('alert-gate-event')
            alert_events.set('hostname', event.get('[host][name]'))
            # multiply before rounding to avoid float error in the millisecond value
            alert_events.set('event_ts', (event.get('@timestamp').to_f * 1000).round)
            alert_events.set('group', 'bdata')
            alert_events.set('project', event.get('logtype'))
            alert_events.set('level', 'ERROR')
            alert_events.set('key', 'APIStats:' + status + '.' + uri)
            alert_events.set('msg', 'status: [' + status + '] http_host:[' + http_host + '] url:[' + raw_uri + '] timestamp: [' + event.get('@timestamp').to_s + '] guid: [' + event.get('[docs][trace]').to_s + '] client_ip:[' + event.get('[docs][RemoteIP]').to_s + '] request_time: [' + event.get('[docs][time]').to_s + ']')
            new_event_block.call(alert_events)
        "
    }
  }
}
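
The key and event_ts logic of the filter above can be tried out in isolation. The sketch below is plain Ruby with no Logstash dependency; the method names normalize_uri, alert_key, and event_ts_ms are hypothetical and only restate what the filter computes.

```ruby
# Collapse every path that starts with /offer into one bucket.
def normalize_uri(uri)
  uri.to_s.start_with?('/offer') ? '/offers' : uri.to_s
end

# Build the alert key from the HTTP status and the normalized path.
def alert_key(status, uri)
  "APIStats:#{status}.#{normalize_uri(uri)}"
end

# Epoch milliseconds for a Time; multiply first, then round.
def event_ts_ms(time)
  (time.to_f * 1000).round
end

puts alert_key(502, '/offer/123')
```

Grouping all /offer paths under one key keeps the number of distinct alert keys small, at the cost of losing the exact path in the key; the full path is still available in the msg field.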

Regex replacement

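filter {
  ruby {
    # Ruby code must live inside a ruby filter; single quotes keep the code string intact
    code => "
      event.to_hash.each { |k, v|
        # match with a Regexp; String =~ String raises a TypeError
        if k =~ /^prometheus\.metrics\.ems/
          new_key = k.sub(/^prometheus\.metrics\.ems/, '')
          event.remove(k)
          event.set(new_key, v)
        end
      }
    "
  }
}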
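
The renaming step can be checked on an ordinary Hash before it goes into a pipeline. This is a minimal sketch, assuming flat keys that start with prometheus.metrics.ems. The constant EMS_PREFIX, the method strip_prefix, and the sample key are hypothetical.

```ruby
# Prefix to strip, with the dots escaped so they match literally.
EMS_PREFIX = /^prometheus\.metrics\.ems/

# Return a new Hash in which the prefix is removed from every matching key.
def strip_prefix(fields, prefix = EMS_PREFIX)
  fields.each_with_object({}) do |(key, value), renamed|
    renamed[key.sub(prefix, '')] = value
  end
end

puts strip_prefix({ 'prometheus.metrics.ems_up' => 1, 'host' => 'a' }).inspect
```

Building a new Hash avoids changing the collection while it is being iterated; the Logstash version gets the same safety because event.to_hash returns a copy.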

Logstash Event API

Logstash Event API event_spec.rb

