Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Otherwise we pick the first role             => <kafka_topic_prefix>_<first role>_logs ex: staging_gfac_logs (kafka.topic.prefix = staging)

How to use the log messages from Kafka topics

We have tried using the log messages and push them to elastic search cluster and view them kibana. To learn about how to setup elastic search and logstash please refer Related articles section. We have used below logstash configuration to read airavata log messages and push them 

to elastic search.

Code Block
input {
  kafka {
    topic_id => "local_all_logs"
    zk_connect => "127.0.0.1:2181"
    auto_offset_reset => "smallest"
    type => "all_logs"
  }
  kafka {
    topic_id => "local_apiserver_logs"
    zk_connect => "127.0.0.1:2181"
    auto_offset_reset => "smallest"
    type => "apiserver_logs"
  }
  kafka {
    topic_id => "local_gfac_logs"
    zk_connect => "127.0.0.1:2181"
    auto_offset_reset => "smallest"
    type => "gfac_logs"
  }
  kafka {
    topic_id => "local_orchestrator_logs"
    zk_connect => "127.0.0.1:2181"
    auto_offset_reset => "smallest"
    type => "orchestrator_logs"
  }
  kafka {
    topic_id => "local_credentialstore_logs"
    zk_connect => "127.0.0.1:2181"
    auto_offset_reset => "smallest"
    type => "credentialstore_logs"
  }
}

filter {
  mutate { add_field => { "[@metadata][level]" => "%{[level]}" } }
  mutate { lowercase => ["[@metadata][level]"] }
  mutate { gsub => ["level", "LOG_", ""] }
  mutate {
    add_tag => ["local", "CoreOS-899.13.0"]
  }
  ruby {
    code => "
    begin
    t = Time.iso8601(event['timestamp'])
    rescue ArgumentError => e
    # drop the event if format is invalid
    event.cancel
    return
    end
    event['timestamp_usec'] = t.usec % 1000
    event['timestamp'] = t.utc.strftime('%FT%T.%LZ')
    "
  }
}

output {
  stdout { codec => rubydebug }
  if [type] == "apiserver_logs" {
      elasticsearch {
        hosts => ["elasticsearch.us-east-1.aws.found.io:9200"]
 	user => "admin"
	password => "adminpassword"
        index => "local-apiserver-logs-logstash-%{+YYYY.MM.dd}"
    }
  } else if [type] == "gfac_logs" {
      elasticsearch {
        hosts => ["elasticsearch.us-east-1.aws.found.io:9200"]
	user => "admin"
	password => "adminpassword"
        index => "local-gfac-logs-logstash-%{+YYYY.MM.dd}"
    }
  } else if [type] == "orchestrator_logs" {
      elasticsearch {
        hosts => ["elasticsearch.us-east-1.aws.found.io:9200"]
	user => "admin"
	password => "adminpassword"
        index => "local-orchestrator-logs-logstash-%{+YYYY.MM.dd}"
    }
  } else if [type] == "credentialstore_logs" {
      elasticsearch {
        hosts => ["elasticsearch.us-east-1.aws.found.io:9200"]
	user => "admin"
	password => "adminpassword"
        index => "local-credentialstore-logs-logstash-%{+YYYY.MM.dd}"
    }
  } else {
  elasticsearch {
    hosts => ["elasticsearch.us-east-1.aws.found.io:9200"]
    user => "admin"
    password => "adminpassword"
    index => "local-airavata-logs-logstash-%{+YYYY.MM.dd}"
  }
}
}

http://kafka.apache.org/documentation.html

...