Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
language: scala
title: Configuration propertyObservers usage
linenumbers: true
/**
 * Builds the channel-level response for `request` and passes it to the single-argument
 * `sendResponse` overload.
 *
 * When `responseOpt` is defined, error metrics are updated from the response's error counts,
 * every configured observer is given the request/response pair, and a
 * `RequestChannel.SendResponse` is created. When it is empty, a `RequestChannel.NoOpResponse`
 * is created instead.
 *
 * @param request     the request being answered
 * @param responseOpt the response to send, or `None` when there is nothing to send back
 * @param onComplete  optional callback handed to the `SendResponse`
 */
private def sendResponse(request: RequestChannel.Request,
                         responseOpt: Option[AbstractResponse],
                         onComplete: Option[Send => Unit]): Unit = {
  import scala.util.control.NonFatal

  // Update error metrics for each error code in the response including Errors.NONE
  responseOpt.foreach(response => requestChannel.updateErrorMetrics(request.header.apiKey, response.errorCounts.asScala))

  val channelResponse = responseOpt match {
    case Some(response) =>
      val responseSend = request.context.buildResponse(response)

      // Wrap the request and the response in their adapter instances, which expose them through
      // the request/response info interfaces that observers consume.
      val requestAdapter = new RequestAdapter(request)
      val responseAdapter = new ResponseAdapter(response)

      // Invoke record on every observer. "observers" is a list of observers and each of them
      // represents one user-provided implementation of the observer interface.
      observers.foreach { observer =>
        try {
          observer.record(requestAdapter, responseAdapter)
        } catch {
          // A failing observer must not prevent the response from being sent, so non-fatal
          // failures are logged and skipped. Fatal errors (e.g. OutOfMemoryError,
          // InterruptedException) are deliberately not caught here and propagate to the caller.
          case NonFatal(e) => error(s"Observer failed to record $request and $response", e)
        }
      }

      // The textual form of the response is only materialised when request logging is enabled.
      val responseString =
        if (RequestChannel.isRequestLoggingEnabled) Some(response.toString(request.context.apiVersion))
        else None
      new RequestChannel.SendResponse(request, responseSend, responseString, onComplete)
    case None =>
      new RequestChannel.NoOpResponse(request)
  }
  sendResponse(channelResponse)
}

...