...
Otherwise we pick the first role, so the topic name is <kafka_topic_prefix>_<first role>_logs, e.g. staging_gfac_logs (when kafka.topic.prefix = staging).
We have tried consuming the log messages, pushing them to an Elasticsearch cluster, and viewing them in Kibana. To learn how to set up Elasticsearch and Logstash, please refer to the Related articles section. We used the Logstash configuration below to read the Airavata log messages and push them
to Elasticsearch.
Code Block |
---|
# Kafka sources: one consumer per Airavata log topic. Every event is stamped
# with a `type` matching its topic so the output section can route it to a
# dedicated Elasticsearch index. Topic names follow
# <kafka_topic_prefix>_<role>_logs; the prefix used here is "local".
# NOTE(review): auto_offset_reset => "smallest" presumably starts from the
# oldest retained message when no offset is stored for the consumer -- confirm
# against the Kafka input plugin documentation for the Logstash version in use.
input {
  # Catch-all topic.
  kafka {
    type              => "all_logs"
    topic_id          => "local_all_logs"
    zk_connect        => "127.0.0.1:2181"
    auto_offset_reset => "smallest"
  }

  # API server role.
  kafka {
    type              => "apiserver_logs"
    topic_id          => "local_apiserver_logs"
    zk_connect        => "127.0.0.1:2181"
    auto_offset_reset => "smallest"
  }

  # GFac role.
  kafka {
    type              => "gfac_logs"
    topic_id          => "local_gfac_logs"
    zk_connect        => "127.0.0.1:2181"
    auto_offset_reset => "smallest"
  }

  # Orchestrator role.
  kafka {
    type              => "orchestrator_logs"
    topic_id          => "local_orchestrator_logs"
    zk_connect        => "127.0.0.1:2181"
    auto_offset_reset => "smallest"
  }

  # Credential store role.
  kafka {
    type              => "credentialstore_logs"
    topic_id          => "local_credentialstore_logs"
    zk_connect        => "127.0.0.1:2181"
    auto_offset_reset => "smallest"
  }
}
# Normalises each log event before it is indexed:
#   1. keeps a lowercased copy of the original level under [@metadata][level],
#   2. strips the "LOG_" prefix from the `level` field,
#   3. tags the event with the environment ("local") and host OS build,
#   4. validates the ISO-8601 `timestamp`, rewrites it as UTC with millisecond
#      precision, and stores the sub-millisecond remainder in `timestamp_usec`.
# Events whose `timestamp` cannot be parsed are dropped.
# NOTE(review): the event['field'] accessor is the older Logstash event API;
# confirm it matches the Logstash version this config is deployed on.
filter {
  # The three mutate steps are kept as separate blocks so they run in exactly
  # this order: the metadata copy is taken BEFORE the LOG_ prefix is removed.
  mutate { add_field => { "[@metadata][level]" => "%{[level]}" } }
  mutate { lowercase => ["[@metadata][level]"] }
  mutate { gsub => ["level", "LOG_", ""] }

  mutate {
    add_tag => ["local", "CoreOS-899.13.0"]
  }

  ruby {
    code => "
      begin
        t = Time.iso8601(event['timestamp'])
      rescue ArgumentError, TypeError => e
        # Drop the event if the timestamp is malformed (ArgumentError) or is
        # not a string at all, e.g. a numeric epoch value (TypeError).
        event.cancel
        return
      end
      # %L below keeps only milliseconds, so the microsecond remainder
      # (0-999) is preserved in its own field.
      event['timestamp_usec'] = t.usec % 1000
      event['timestamp'] = t.utc.strftime('%FT%T.%LZ')
    "
  }
}
# Delivery stage. Every event is echoed to stdout for debugging and then
# written to exactly one daily Elasticsearch index chosen by the `type` that
# the input section assigned; anything without a dedicated branch lands in the
# generic "airavata" index.
# NOTE(review): the host and credentials below are inline sample values --
# replace them for a real deployment and keep real secrets out of the config.
output {
  stdout { codec => rubydebug }

  if [type] == "apiserver_logs" {
    elasticsearch {
      index    => "local-apiserver-logs-logstash-%{+YYYY.MM.dd}"
      hosts    => ["elasticsearch.us-east-1.aws.found.io:9200"]
      user     => "admin"
      password => "adminpassword"
    }
  } else if [type] == "gfac_logs" {
    elasticsearch {
      index    => "local-gfac-logs-logstash-%{+YYYY.MM.dd}"
      hosts    => ["elasticsearch.us-east-1.aws.found.io:9200"]
      user     => "admin"
      password => "adminpassword"
    }
  } else if [type] == "orchestrator_logs" {
    elasticsearch {
      index    => "local-orchestrator-logs-logstash-%{+YYYY.MM.dd}"
      hosts    => ["elasticsearch.us-east-1.aws.found.io:9200"]
      user     => "admin"
      password => "adminpassword"
    }
  } else if [type] == "credentialstore_logs" {
    elasticsearch {
      index    => "local-credentialstore-logs-logstash-%{+YYYY.MM.dd}"
      hosts    => ["elasticsearch.us-east-1.aws.found.io:9200"]
      user     => "admin"
      password => "adminpassword"
    }
  } else {
    # Fallback: events of any other type (including "all_logs").
    elasticsearch {
      index    => "local-airavata-logs-logstash-%{+YYYY.MM.dd}"
      hosts    => ["elasticsearch.us-east-1.aws.found.io:9200"]
      user     => "admin"
      password => "adminpassword"
    }
  }
}
|
http://kafka.apache.org/documentation.html
...