Apache Ranger by design provides configurable audits destinations. One of destinations is slf4j logging interface, which means audits can be streamed into any logging framework that is bound to slf4j. With this feature, Ranger can support streaming audits into popular messaging bus e.g. Kafka to provide realtime data feeds for various monitoring system.
Enabling audits logging is as simple as adding some configurations in logging properties files and adding a few kafka libraries.
Here is an example of configuring Hive logging properties file to enable sending audit events to Kafka messaging bus.
# # kafka Appender # ### for Ranger 0.4.0 log4j.logger.com.xasecure.audit.provider.Log4jAuditProvider=INFO,KAFKA_HIVE_AUDIT ### for Ranger 0.4 up ### log4j.logger.xaaudit.com.xasecure.audit.provider.Log4jAuditProvider=INFO,KAFKA_HIVE_AUDIT log4j.appender.KAFKA_HIVE_AUDIT=kafka.producer.KafkaLog4jAppender log4j.appender.KAFKA_HIVE_AUDIT.BrokerList=sandbox.hortonworks.com:6667 log4j.appender.KAFKA_HIVE_AUDIT.Topic=hive_audit_log log4j.appender.KAFKA_HIVE_AUDIT.layout=org.apache.log4j.PatternLayout log4j.appender.KAFKA_HIVE_AUDIT.layout.ConversionPattern=%d{ISO8601} %-5p [%t]: %c{2} (%F:%M(%L)) - %m%n log4j.appender.KAFKA_HIVE_AUDIT.ProducerType=async
For Kafka 0.8.1.x, the following jar files should be put into $HIVE_DIR/lib
kafka_2.10-0.8.1.2.2.4.2-2.jar scala-library-2.10.4.jar metrics-core-2.2.0.jar
In consumer side, some deserializer should be there to parse normalized audit records
public class AuditLogJsonDeserializer extends JsonDeserializer<HiveAuditLogDataModel> { private static Logger LOG = LoggerFactory.getLogger(AuditLogJsonDeserializer.class); @Override public HiveAuditLogDataModel deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException, JsonProcessingException { HiveAuditLogDataModel model = new HiveAuditLogDataModel(); JsonNode node = jp.getCodec().readTree(jp); String resource = node.get("resource").asText(); // split resource to database, table, and column String[] tmp = resource.split("/"); if(tmp.length >= 1){ model.db = tmp[0]; if(tmp.length >= 3){ model.table = tmp[1]; model.column = tmp[2]; } } model.action = node.get("action").asText(); model.clientIP = node.get("cliIP").asText(); SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z"); try{ model.timestamp = formatter.parse(node.get("evtTime").asText()).getTime(); }catch(Exception ex){ LOG.error("fail converting evtTime in hive audit log", ex); } model.user = node.get("reqUser").asText(); return model; } }