Database Log Analysis

Overall Architecture

filebeat -> logstash -> elasticsearch -> kibana

  • filebeat collects the logs
  • logstash relays the logs and applies pattern-matching and filtering rules
  • elasticsearch stores and indexes the logs for search
  • kibana provides the viewing interface
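
The sections below give a working configuration for each hop: PostgreSQL is switched to csvlog output, filebeat ships the CSV files to logstash on port 5044, logstash parses and classifies each record, and the elasticsearch output writes daily filebeat-* indices that kibana reads.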

postgresql

log_destination = 'csvlog'                        # emit CSV-format logs so the logstash csv filter can parse them
logging_collector = 'on'                          # required for csvlog; changing this needs a server restart
log_filename = 'postgresql-%Y-%m-%d_%H%M%S.log'   # csvlog writes the matching postgresql-*.csv files
log_rotation_age = '1d'                           # rotate at least once a day
log_rotation_size = '100MB'                       # ... or when the file reaches 100MB
log_min_messages = 'info'
log_min_duration_statement = '1000'               # log statements running 1000 ms or longer, with their duration
log_statement = 'ddl'                             # additionally log all DDL statements
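
With the settings above PostgreSQL writes one CSV record per log entry into postgresql-*.csv files in its log directory, which is what filebeat tails in the next section. A slow statement, for example, ends up in the message column roughly like this (illustrative line, not captured output; the table and query are made up):

duration: 1523.456 ms  statement: select count(*) from orders where created_at > now() - interval '7 days'

The logstash filter further down splits on exactly this "duration: ... ms  statement: ..." shape.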

filebeat

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/lib/pgsql/***/postgresql-*.csv
  fields:
    log_topics: postgresql

  # a new record starts with a timestamp such as "2019-09-02 10:15:30.123 CST";
  # lines that do not match are appended to the previous record (multi-line statements)
  multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{3} [A-Z]{3}'
  multiline.negate: true
  multiline.match: after
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 1
tags: ["postgesql"]
setup.kibana:
output.logstash:
  hosts: ["*.*.*.*:5044"]
processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~
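
Before starting the service it is worth sanity-checking the file with filebeat's built-in test subcommands (the config path below assumes a package install; adjust it to your layout):

filebeat test config -c /etc/filebeat/filebeat.yml
filebeat test output -c /etc/filebeat/filebeat.yml

test config validates the YAML, and test output checks that the logstash endpoint on port 5044 is reachable.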

logstash

#
# this config was written for logstash 7.3.1
#

input {
  beats {
    port => 5044
  }

  # stdin {
  #
  # }

}


# Parse the PostgreSQL csv log with the csv plugin.
# log_destination must be set to 'csvlog' on the PostgreSQL side first
# (see the postgresql section above); the column names below follow the
# PostgreSQL CSV log column order.
filter {
  csv {
    columns => [
      "log_time",
      "user_name",
      "database_name",
      "process_id",
      "connection_from",
      "session_id",
      "session_line_num",
      "command_tag",
      "session_start_time",
      "virtual_transaction_id",
      "transaction_id",
      "error_severity",
      "sql_state_code",
      "message",
      "detail",
      "hint",
      "internal_query",
      "internal_query_pos",
      "context",
      "query",
      "query_pos",
      "location",
      "application_name"
    ]
  }
}
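
The csv filter leaves log_time as a plain string, so @timestamp is the ingest time rather than the database's own timestamp. If you want Kibana to sort and filter on the log's time, a date filter can be added after the csv block. A minimal sketch, assuming log_time looks like "2019-09-02 10:15:30.123 CST" and the server's log_timezone is Asia/Shanghai (adjust both to your environment):

filter {
  # zone abbreviations such as CST are ambiguous, so strip the trailing
  # abbreviation and supply the zone explicitly instead
  mutate { gsub => [ "log_time", " [A-Z]{3,5}$", "" ] }
  date {
    match => [ "log_time", "yyyy-MM-dd HH:mm:ss.SSS" ]
    timezone => "Asia/Shanghai"
  }
}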

#
# different log types produce different message formats:
# 1. duration (slow query) log
# 2. normal statement log
# 3. checkpoint log
# 4. other
filter {
  if [message] =~ /duration: [0-9]{1,8}[.]{0,1}[0-9]{1,5} ms/ {
    mutate {
      split => { "message" => "statement:" }
      add_field => { "duration" => "%{[message][0]}" }
      add_field => { "statement" => "%{[message][1]}" }
    }
    mutate { gsub => [ "duration", "duration: ", "" ] }
    mutate { gsub => [ "duration", " ms", "" ] }
    mutate { convert => { "duration" => "float" } }
    mutate {
      add_field => { "sqltype" => "slowsql" }
      remove_field => "message"
    }
  } else if [message] =~ /statement: / {
    mutate { gsub => [ "message", "statement: ", "" ] }
    mutate { rename => { "message" => "statement" } }
    mutate { add_field => { "sqltype" => "statementsql" } }
  } else if [message] =~ /checkpoint / {
    mutate { add_field => { "sqltype" => "checkpoint" } }
    mutate { rename => { "message" => "statement" } }
  } else {
    mutate { add_field => { "sqltype" => "other" } }
  }

  mutate { remove_field => [
    "connection_from",
    "session_line_num",
    "command_tag",
    "session_start_time",
    "virtual_transaction_id",
    "transaction_id",
    "error_severity",
    "sql_state_code",
    "detail",
    "hint",
    "internal_query",
    "internal_query_pos",
    "context",
    "query",
    "query_pos",
    "location",
    "application_name",
    "source",
    "input_type"
  ] }
}
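
After these two filter blocks a slow query carries duration (float, in ms), statement and sqltype = "slowsql"; plain statement logs and checkpoint records are tagged "statementsql" and "checkpoint" respectively. Every event also keeps the remaining CSV columns (log_time, user_name, database_name, process_id, session_id), which is enough to build per-database and per-user slow-query views in kibana.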

output {
# file {
#   path => "/etc/logstash/pg.log"
#   codec => line { format => "custom format: %{message}"}
# }

#   for debugging

#    stdout {
#        codec => rubydebug
#    }


 elasticsearch {
   hosts => "*.*.*.*:9200"
#   manage_template => false
   template_name => "postgres_template"
   index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
   #user => "elastic"
   #password => "123456"
 }
}
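
Once everything is running, the output above creates one index per day named after the beat and its version, e.g. filebeat-7.3.1-2019.09.02. A quick way to confirm data is arriving (same placeholder host as above):

curl 'http://*.*.*.*:9200/_cat/indices/filebeat-*?v'

In kibana, an index pattern of filebeat-* is then enough to browse and chart the sqltype, duration and statement fields.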