Collecting logs from multiple directories with Filebeat


The reason I have Filebeat collecting logs from multiple directories is that my environment is Kubernetes. In Kubernetes every service runs in a container, and the container logs end up on the node. For the server-side services I originally collected everything under a single path, and the nginx logs sit under the same kind of path, but when I collected it all with one input a problem showed up: the server log format and the nginx log format are different. The server logs were collected completely while the nginx logs were not, so I need to collect two separate paths. Logs from the two different paths also have to be stored separately; I ship everything through Kafka, so I need Logstash to tell the two kinds of logs apart and route them.

# cat filebeat-kafka.yaml 
apiVersion: v1
kind: ConfigMap
metadata:
  name: filebeat
  namespace: logging
  labels:
    k8s-app: filebeat
data:
  filebeat.yml: |-
    filebeat.inputs:
    - type: log
      paths:
        - /var/log/containers/english*.log
      tags: ["eng_java_logs"]  
      multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}\ [0-9]{2}:[0-9]{2}:[0-9]{2}'
      multiline.negate: true
      multiline.match: after
      multiline.timeout: 10s
      encoding: utf-8
      document_type: mysql-proxy
      scan_frequency: 20s
      harvester_buffer_size: 16384
      max_bytes: 10485760
      tail_files: true
    - type: log
      paths:
        - /var/log/pods/*.log
      tags: ["eng_nginx_logs"]
      multiline.pattern: '^{'
      multiline.negate: true
      multiline.match: after
      multiline.timeout: 10s
      encoding: utf-8
      document_type: mysql-proxy
      scan_frequency: 20s
      harvester_buffer_size: 16384
      max_bytes: 10485760
      tail_files: true
.................................

With this, Filebeat collects all the logs under both paths. Note that log data really has to exist under both paths, otherwise nothing will be collected. The collected data is sent to Kafka and then filtered in Logstash, which writes the logs from the different paths into different indices.
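The Kafka output part of the Filebeat config is omitted above. As a rough sketch only (not the original settings, just assuming the same broker address and the ienglish topic that the Logstash input below consumes), it would look something like this:

    output.kafka:
      # ship both inputs to the same topic; Logstash tells them apart by tag
      hosts: ["10.16.30.1:9092"]
      topic: "ienglish"
      required_acks: 1
      compression: gzip

With the data in Kafka, the Logstash pipeline below consumes that topic and routes the events.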

input {
  kafka {
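    # read the events that Filebeat publishes to Kafka; decorate_events keeps topic/partition info in [@metadata][kafka]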
    bootstrap_servers => "10.16.30.1:9092"
    client_id => "logstash01"
    topics => ["ienglish"]
    group_id => "logstash"
    decorate_events => true
    codec => "json"
    # routing is done with the tags that Filebeat already sets, so nothing extra is added here
  }
}

filter {
    mutate{
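        # drop fields we do not want stored in the index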
        remove_field => ["_id"]
        remove_field => ["_score"]
        remove_field => ["_type"]
        remove_field => ["_index"]
        remove_field => ["host"]
        remove_field => ["agent"]
        remove_field => ["ecs"]
        remove_field => ["tags"]
        remove_field => ["fields"]
        remove_field => ["@version"]
        #remove_field => ["@timestamp"]
        remove_field => ["stream"]
        remove_field => ["log"]
        remove_field => ["kubernetes"]
        remove_field => ["input"]
    }
   mutate{
        # keep the container name: copy it out of the kubernetes metadata, then drop the rest of the object
        add_field => { "kubernetes_container_name" => "%{[kubernetes][container][name]}" }
        remove_field => ["kubernetes"]
   }
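   # parse the java/server log lines: timestamp, log level, bracketed logger/exception info, then the body wrapped in <>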
   grok{
        match => {
            "message" => "%{TIMESTAMP_ISO8601:access_time} %{LOGLEVEL:loglevel} \[%{DATA:exception_info}\] - \<%{MESSAGE:message}\>"
        }
        pattern_definitions => {
            "MESSAGE" => "[\s\S]*"
        }        
   }
    date {
        match => [ "access_time","yyyy-MM-dd HH:mm:ss,SSS" ]
    }
    mutate {
        remove_field => ["access_time","[message][0]"]
    }

}


output {
  if "eng_java_logs" in [tags] {
    elasticsearch {
      hosts => ["127.0.0.1:9200"]
      index => "server-%{+YYYY.MM.dd}"
    }
  } else if "eng_nginx_logs" in [tags] {
    elasticsearch {
      hosts => ["127.0.0.1:9200"]
      index => "msg-%{+YYYY.MM.dd}"
    }
  }
}

This guarantees that the two different paths end up in two different indices. The logs from the second path are not filtered any further and are written to ES as they are. ES ends up with two groups of indices, one prefixed with msg and one prefixed with server, so you can search the right content by picking the right index.
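To check that both groups of indices are actually being created, you can list them against the Elasticsearch instance from the output section:

# curl -s 'http://127.0.0.1:9200/_cat/indices/server-*,msg-*?v'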
