The reason I need Filebeat to collect logs from multiple directories is that my environment is Kubernetes: every service runs in a container, and the container logs all end up on the node. For the server-side services I originally collected a single path and picked up the logs of every server service under it. The nginx logs live alongside the server logs, but when collecting everything from one path a problem appeared: the server log format and the nginx log format are different. The server logs therefore came through fairly complete, while the nginx logs did not. So I have to collect two separate paths, and the logs from the two paths have to be stored separately. Since I am shipping everything through Kafka here, Logstash has to be the place where the two are told apart.
# cat filebeat-kafka.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: filebeat
  namespace: logging
  labels:
    k8s-app: filebeat
data:
  filebeat.yml: |-
    filebeat.inputs:
    # server (Java) logs: a new event starts with a timestamp
    - type: log
      paths:
        - /var/log/containers/english*.log
      tags: ["eng_java_logs"]
      multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}\ [0-9]{2}:[0-9]{2}:[0-9]{2}'
      multiline.negate: true
      multiline.match: after
      multiline.timeout: 10s
      encoding: utf-8
      scan_frequency: 20s
      harvester_buffer_size: 16384
      max_bytes: 10485760
      tail_files: true
    # nginx logs: JSON lines, a new event starts with "{"
    - type: log
      paths:
        - /var/log/pods/*.log
      tags: ["eng_nginx_logs"]
      multiline.pattern: '^{'
      multiline.negate: true
      multiline.match: after
      multiline.timeout: 10s
      encoding: utf-8
      scan_frequency: 20s
      harvester_buffer_size: 16384
      max_bytes: 10485760
      tail_files: true
    .................................
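The trailing dots stand for the rest of the file (the output section and the remainder of the manifest), which is not shown here. As a rough sketch only: assuming the broker address and topic that the Logstash input further down consumes from, the elided Kafka output part of filebeat.yml would look something like this.

    # hypothetical sketch, not from the original file; broker and topic are
    # taken from the Logstash kafka input shown below
    output.kafka:
      hosts: ["10.16.30.1:9092"]
      topic: "ienglish"
      required_acks: 1
      compression: gzip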
With this config, Filebeat collects all logs under both paths. One thing to note: log data must actually exist under both paths, otherwise nothing gets collected. The collected data is sent to Kafka and then filtered in Logstash, where logs from the different paths are written to different indices.
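Before wiring up Logstash it is worth checking that events from both inputs actually land in the topic. One way to do that, assuming the standard Kafka CLI scripts are available on the broker host (broker address and topic are the ones used throughout this post):

# read a few raw events from the topic to confirm both tag values show up
bin/kafka-console-consumer.sh --bootstrap-server 10.16.30.1:9092 \
  --topic ienglish --from-beginning --max-messages 5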
input {
  kafka {
    bootstrap_servers => "10.16.30.1:9092"
    client_id         => "logstash01"
    topics            => ["ienglish"]
    group_id          => "logstash"
    decorate_events   => true
    # Filebeat ships events as JSON, so the "tags" set in filebeat.yml
    # ("eng_java_logs" / "eng_nginx_logs") arrive as a field on each event
    # and are used in the output section to route to different indices.
    codec             => "json"
  }
}

filter {
  mutate {
    remove_field => ["_id"]
    remove_field => ["_score"]
    remove_field => ["_type"]
    remove_field => ["_index"]
    remove_field => ["host"]
    remove_field => ["agent"]
    remove_field => ["ecs"]
    # "tags" must be kept: the output section routes on it
    #remove_field => ["tags"]
    remove_field => ["fields"]
    remove_field => ["@version"]
    #remove_field => ["@timestamp"]
    remove_field => ["stream"]
    remove_field => ["log"]
    remove_field => ["kubernetes"]
    remove_field => ["input"]
  }
  mutate {
    add_field => { "kubernetes.container.name" => "kubernetes.container.name" }
  }
  grok {
    match => { "message" => "%{TIMESTAMP_ISO8601:access_time} %{LOGLEVEL:loglevel} \[%{DATA:exception_info}\] - \<%{MESSAGE:message}\>" }
    pattern_definitions => { "MESSAGE" => "[\s\S]*" }
  }
  date {
    match => [ "access_time","yyyy-MM-dd HH:mm:ss,SSS" ]
  }
  mutate {
    remove_field => ["access_time","[message][0]"]
  }
}

output {
  # "tags" is an array, so test membership rather than equality
  if "eng_java_logs" in [tags] {
    elasticsearch {
      hosts => ["127.0.0.1:9200"]
      index => "server-%{+YYYY.MM.dd}"
    }
  } else if "eng_nginx_logs" in [tags] {
    elasticsearch {
      hosts => ["127.0.0.1:9200"]
      index => "msg-%{+YYYY.MM.dd}"
    }
  }
}
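For reference, the grok and date filters above expect server log lines shaped roughly like the following (a made-up example, not taken from the real services): a yyyy-MM-dd HH:mm:ss,SSS timestamp, a log level, a bracketed thread or exception field, and the message wrapped in angle brackets.

2024-03-01 10:15:42,358 INFO [http-nio-8080-exec-3] - <request processed in 35 ms>

Lines that do not fit this shape (the JSON lines coming from the second path) simply fail the grok and pass through to the output untouched, which is why the msg index receives them unparsed.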
This guarantees that the two different paths end up in two different indices. The logs from the second path are not parsed any further and go straight into ES. In ES this produces two index series, one starting with msg and one starting with server, and you can then search each index for the content you need.
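A quick way to confirm that both index series are being created, assuming curl is run against the same ES address used in the Logstash output above:

# list daily indices for both series
curl -s '127.0.0.1:9200/_cat/indices/server-*,msg-*?v'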