# Data input: if Logstash is used alone, read data directly from the log file. Here we receive data from filebeat instead; the port below must match the port configured in filebeat's Logstash output.
input {
  # Listen for events shipped by filebeat (beats protocol).
  beats {
    port => 9601
  }
  # Alternative: tail log files directly when running Logstash without filebeat.
  #file {
  #path => "E:/test/log/*/*.log"
  #start_position => "beginning"
  #stat_interval => 3
  #}
}
# Data filtering
filter {
if [fields][logtype] == "testlog" { # [fields][logtype] equals the logtype field in filebeat
if ([message]=~ "regular expression") { # message refers to the original message: if the original message satisfies this regular expression, the record will be discarded.
drop{}
}
grok {
match => {
# Try to match the message data and store the matching data into the es table. The field name is as follows: timestamp,client_ip,...
# message configuration: string or array
"message" => [
"\[%{TIMESTAMP_ISO8601:timestamp}\] %{IP:client_ip} %{USERNAME:method} %{URL:url}%{CUSTOMURIPARAM:param} %{NUMBER:duration}"
]
}
}
# Convert our time format YYYY-mm-dd HH:ii:ss into a timestamp form and save it to es: replace es' @timestamp field
date {
match => ["timestamp", "ISO8601"]
target => "@timestamp"
}
# Since we matched a timestamp field and replaced the default @timestamp field of es; so the es field timestamp and @timestamp values are to wait, so we delete the fields that we matched
mutate{
remove_field => ["timestamp"]
}
}
}
# Data output
output {
if [fields][logtype] == "testlog" { #The source of the file is testlog, corresponding to the logtype field of filebeat
if "_grokparsefailure" not in [tags]{ #Match successfully, save es success-log table
elasticsearch {
# hosts configuration: string or array
hosts => ["localhost:9200"]
index => "success-log-%{+}"
}
} else { #Match failed, the error-log table of es is stored
elasticsearch {
hosts => ["localhost:9200"]
index => "error-log-%{+}"
}
}
} #else { #When filebeat has multiple log file sources, determine which es table to be stored according to the logtype field of filebeat
#elasticsearch {
#hosts => ["localhost:9200"]
#index => "other-log-%{+}"
#}
#}
}