...

  1. Configuration: add one configuration item "log.preallocate", parse it in KafkaConfig.scala, and pass it on to KafkaServer.scala and LogConfig.scala.
  2. In Log.scala
    1. Add one function initFileSize(). If log preallocation is enabled, it returns config.segmentSize - 2 * config.maxMessageSize; otherwise it returns 0.
    2. Pass initFileSize and config.preallocate to LogSegment whenever a new LogSegment has to be created.
    3. When rolling to a new file, trim the log file of the active segment.
  3. In LogSegment.scala
    1. Add three parameters (fileAlreadyExists, initFileSize and preallocate) to the constructor and pass them to FileMessageSet.
  4. In FileMessageSet.scala
    1. Add one function trim(). Call it when closing a LogSegment or rolling to a new one.
    2. Move openChannel here from CoreUtils.scala and add three more parameters: "fileAlreadyExists", "initFileSize" and "preallocate".
    3. Add one constructor with three more parameters: "fileAlreadyExists", "initFileSize" and "preallocate".
      1. If the file is new (fileAlreadyExists = false) and preallocate is true, end is set to 0 and position is set to 0.
      2. Otherwise, end is set to Int.MaxValue and position is set to the file size.
    4. Change how the position is calculated after a file is created or opened.
      1. The code changes to "channel.position(math.min(channel.size().toInt, end))".
      2. If the file is new (fileAlreadyExists = false) and preallocate is true, end is 0, so the position is set to 0.
      3. Otherwise, end is Int.MaxValue, so the position is set to the file size.
  5. In CoreUtils.scala
    1. Move openChannel to FileMessageSet.scala.
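
The changes listed above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the actual patch: the object name PreallocateSketch and the helper initialEnd are hypothetical, and the signatures are simplified (the real openChannel and FileMessageSet constructor take additional arguments). It assumes preallocation is done by setting the file length through RandomAccessFile.

```scala
import java.io.{File, RandomAccessFile}
import java.nio.channels.FileChannel

// Hypothetical container object, used only for this illustration.
object PreallocateSketch {

  // Item 2.1: initial size of a new segment file.
  // Returns segmentSize - 2 * maxMessageSize when preallocation is on, else 0.
  def initFileSize(preallocate: Boolean, segmentSize: Int, maxMessageSize: Int): Int =
    if (preallocate) segmentSize - 2 * maxMessageSize else 0

  // Item 4.2: open a read/write channel; a brand-new file is extended to
  // initFileSize only when preallocate is true (assumed mechanism: setLength).
  def openChannel(file: File,
                  fileAlreadyExists: Boolean,
                  initFileSize: Int,
                  preallocate: Boolean): FileChannel = {
    val raf = new RandomAccessFile(file, "rw")
    if (!fileAlreadyExists && preallocate)
      raf.setLength(initFileSize.toLong)
    raf.getChannel()
  }

  // Item 4.3: the initial "end" of the message set.
  // 0 for a new preallocated file, Int.MaxValue in every other case.
  def initialEnd(fileAlreadyExists: Boolean, preallocate: Boolean): Int =
    if (!fileAlreadyExists && preallocate) 0 else Int.MaxValue
}
```

With these helpers, a caller would compute the starting position as math.min(channel.size().toInt, end), which is the expression quoted in item 4.4.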

...

  1. On a clean shutdown, the last log file is truncated to its real size, because the close() function of FileMessageSet calls trim().
  2. After a crash, the restart goes through recover(), and the last log file is truncated to its real size (and the position is moved to the end of the file).
  3. When the service starts and opens an existing file
    1. The LogSegment constructor runs with the parameter "fileAlreadyExists" set to true.
    2. In FileMessageSet, "end" is then Int.MaxValue, so "channel.position(math.min(channel.size().toInt, end))" sets the position to the end of the file.
    3. If recovery is needed, the recover function truncates the file to the end of the valid data and also moves the position to the end of the file.
  4. When the service is running and needs to create a new log segment and a new FileMessageSet
    1. If preallocate = true
      1. The "end" in FileMessageSet is 0 and the file size is "initFileSize", so "channel.position(math.min(channel.size().toInt, end))" sets the position to 0.
    2. Else if preallocate = false
      1. For backward compatibility, the "end" in FileMessageSet is Int.MaxValue and the file size is 0, so "channel.position(math.min(channel.size().toInt, end))" sets the position to 0.
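
The scenarios above all reduce to the same position expression plus a truncation step. The sketch below illustrates both under stated assumptions: StartupPositionSketch, startPosition and the validBytes argument are hypothetical names for this example, and trim is shown as a plain FileChannel.truncate call, which may differ from the actual trim() in FileMessageSet.

```scala
import java.io.{File, RandomAccessFile}

// Hypothetical container object, used only for this illustration.
object StartupPositionSketch {

  // Mirrors "channel.position(math.min(channel.size().toInt, end))":
  // the write position is the smaller of the file size and "end".
  def startPosition(channelSize: Long, end: Int): Int =
    math.min(channelSize.toInt, end)

  // Assumed shape of trim: cut the file back to the bytes actually written,
  // discarding any unused preallocated space at the tail.
  def trim(file: File, validBytes: Long): Unit = {
    val raf = new RandomAccessFile(file, "rw")
    try {
      raf.getChannel().truncate(validBytes)
    } finally {
      raf.close()
    }
  }
}
```

For an existing file of 4096 bytes, end is Int.MaxValue and startPosition yields 4096 (append at the end of the file). For a new preallocated file, end is 0 and the position is 0 regardless of the preallocated size. For a new file without preallocation, the file size is 0 and the position is again 0.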

...