KAFKA-14487: Move LogManager static methods/fields to storage module #19302

Status: Open — wants to merge 1 commit into base: trunk
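This patch moves the static members of the `LogManager` companion object out of `core` and into the Java class `org.apache.kafka.storage.internals.log.LogManager` in the `storage` module: the `.lock` file name, the recovery-point and log-start-offset checkpoint file names, `waitForAllToComplete`, and `isStrayKraftReplica`. The callers in `LogManager.scala`, `RaftManager.scala`, and `BrokerMetadataPublisher.scala` are updated to use the new locations.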
1 change: 1 addition & 0 deletions — build.gradle

@@ -2228,6 +2228,7 @@ project(':storage') {
   }

   dependencies {
+    implementation project(':metadata')
     implementation project(':storage:storage-api')
     implementation project(':server-common')
     implementation project(':clients')
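The new `:metadata` dependency is what lets the storage module compile the moved `isStrayKraftReplica`, which takes a `TopicsImage` from `org.apache.kafka.image`; the import-control change below whitelists the matching packages for checkstyle.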
2 changes: 2 additions & 0 deletions — checkstyle/import-control-storage.xml

@@ -83,6 +83,8 @@
     <allow pkg="com.fasterxml.jackson" />
     <allow pkg="com.yammer.metrics.core" />
     <allow pkg="org.apache.kafka.common" />
+    <allow pkg="org.apache.kafka.image" />
+    <allow pkg="org.apache.kafka.metadata" />
     <allow pkg="org.apache.kafka.server"/>
     <allow pkg="org.apache.kafka.storage.internals"/>
     <allow pkg="org.apache.kafka.storage.log.metrics"/>
73 changes: 5 additions & 68 deletions — core/src/main/scala/kafka/log/LogManager.scala
@@ -23,7 +23,6 @@ import java.nio.file.{Files, NoSuchFileException}
 import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicInteger
 import kafka.server.{KafkaConfig, KafkaRaftServer}
-import kafka.server.metadata.BrokerMetadataPublisher.info
 import kafka.utils.threadsafe
 import kafka.utils.{CoreUtils, Logging, Pool}
 import org.apache.kafka.common.{DirectoryId, KafkaException, TopicPartition, Uuid}
@@ -41,7 +40,7 @@ import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsem
 import java.util.{Collections, Optional, OptionalLong, Properties}
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.{FileLock, Scheduler}
-import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, LogOffsetsListener, ProducerStateManagerConfig, RemoteIndexCache, UnifiedLog}
+import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, LogManager => JLogManager, LogOffsetsListener, ProducerStateManagerConfig, RemoteIndexCache, UnifiedLog}
 import org.apache.kafka.storage.internals.checkpoint.{CleanShutdownFileHandler, OffsetCheckpointFile}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 
@@ -78,8 +77,6 @@ class LogManager(logDirs: Seq[File],
                  remoteStorageSystemEnable: Boolean,
                  val initialTaskDelayMs: Long) extends Logging {
 
-  import LogManager._
-
   private val metricsGroup = new KafkaMetricsGroup(this.getClass)
 
   private val logCreationOrDeletionLock = new Object
@@ -125,9 +122,9 @@ class LogManager(logDirs: Seq[File],
   def directoryIdsSet: Predef.Set[Uuid] = directoryIds.values.toSet
 
   @volatile private var recoveryPointCheckpoints = liveLogDirs.map(dir =>
-    (dir, new OffsetCheckpointFile(new File(dir, RecoveryPointCheckpointFile), logDirFailureChannel))).toMap
+    (dir, new OffsetCheckpointFile(new File(dir, JLogManager.RECOVERY_POINT_CHECKPOINT_FILE), logDirFailureChannel))).toMap
   @volatile private var logStartOffsetCheckpoints = liveLogDirs.map(dir =>
-    (dir, new OffsetCheckpointFile(new File(dir, LogStartOffsetCheckpointFile), logDirFailureChannel))).toMap
+    (dir, new OffsetCheckpointFile(new File(dir, JLogManager.LOG_START_OFFSET_CHECKPOINT_FILE), logDirFailureChannel))).toMap
 
   private val preferredLogDirs = new ConcurrentHashMap[TopicPartition, String]()
 
@@ -259,7 +256,7 @@ class LogManager(logDirs: Seq[File],
   private def lockLogDirs(dirs: Seq[File]): Seq[FileLock] = {
     dirs.flatMap { dir =>
       try {
-        val lock = new FileLock(new File(dir, LockFileName))
+        val lock = new FileLock(new File(dir, JLogManager.LOCK_FILE_NAME))
         if (!lock.tryLock())
           throw new KafkaException("Failed to acquire lock on file .lock in " + lock.file.getParent +
             ". A Kafka instance in another process or thread is using this directory.")
@@ -678,7 +675,7 @@ class LogManager(logDirs: Seq[File],
 
     try {
       jobs.foreachEntry { (dir, dirJobs) =>
-        if (waitForAllToComplete(dirJobs,
+        if (JLogManager.waitForAllToComplete(dirJobs.toList.asJava,
           e => warn(s"There was an error in one of the threads during LogManager shutdown: ${e.getCause}"))) {
           val logs = logsInDir(localLogsByDir, dir)
 
@@ -1515,25 +1512,6 @@ class LogManager(logDirs: Seq[File],
 }
 
 object LogManager {
-  val LockFileName = ".lock"
-
-  /**
-   * Wait all jobs to complete
-   * @param jobs jobs
-   * @param callback this will be called to handle the exception caused by each Future#get
-   * @return true if all pass. Otherwise, false
-   */
-  private[log] def waitForAllToComplete(jobs: Seq[Future[_]], callback: Throwable => Unit): Boolean = {
-    jobs.count(future => Try(future.get) match {
-      case Success(_) => false
-      case Failure(e) =>
-        callback(e)
-        true
-    }) == 0
-  }
-
-  val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
-  val LogStartOffsetCheckpointFile = "log-start-offset-checkpoint"
 
   def apply(config: KafkaConfig,
             initialOfflineDirs: Seq[String],
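For reference, the new call sites above (`JLogManager.waitForAllToComplete(dirJobs.toList.asJava, ...)` and the `JLogManager.*` constants) imply the storage-module class carries roughly the shape sketched below. This is an inference from the diff, not the actual Java file; the signature is assumed from the Scala call site:

```java
import java.util.List;
import java.util.concurrent.Future;
import java.util.function.Consumer;

// Sketch of the storage-module LogManager statics, inferred from the call
// sites in this diff; org.apache.kafka.storage.internals.log.LogManager
// as actually merged may differ in detail.
public class LogManager {
    public static final String LOCK_FILE_NAME = ".lock";
    public static final String RECOVERY_POINT_CHECKPOINT_FILE = "recovery-point-offset-checkpoint";
    public static final String LOG_START_OFFSET_CHECKPOINT_FILE = "log-start-offset-checkpoint";

    /**
     * Wait for all jobs to complete.
     *
     * @param jobs     the futures to wait on
     * @param callback invoked with the exception thrown by each failing Future#get
     * @return true if every job completed normally, false otherwise
     */
    public static boolean waitForAllToComplete(List<Future<?>> jobs, Consumer<Throwable> callback) {
        boolean allComplete = true;
        for (Future<?> job : jobs) {
            try {
                job.get();
            } catch (Exception e) {
                callback.accept(e);   // report the failure, as the deleted Scala helper did
                allComplete = false;  // but keep draining the remaining futures
            }
        }
        return allComplete;
    }
}
```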
@@ -1569,45 +1547,4 @@ object LogManager {
       remoteStorageSystemEnable = config.remoteLogManagerConfig.isRemoteStorageSystemEnabled,
       initialTaskDelayMs = config.logInitialTaskDelayMs)
   }
-
-  /**
-   * Returns true if the given log should not be on the current broker
-   * according to the metadata image.
-   *
-   * @param brokerId The ID of the current broker.
-   * @param newTopicsImage The new topics image after broker has been reloaded
-   * @param log The log object to check
-   * @return true if the log should not exist on the broker, false otherwise.
-   */
-  def isStrayKraftReplica(
-    brokerId: Int,
-    newTopicsImage: TopicsImage,
-    log: UnifiedLog
-  ): Boolean = {
-    if (log.topicId.isEmpty) {
-      // Missing topic ID could result from storage failure or unclean shutdown after topic creation but before flushing
-      // data to the `partition.metadata` file. And before appending data to the log, the `partition.metadata` is always
-      // flushed to disk. So if the topic ID is missing, it mostly means no data was appended, and we can treat this as
-      // a stray log.
-      info(s"The topicId does not exist in $log, treat it as a stray log")
-      return true
-    }
-
-    val topicId = log.topicId.get
-    val partitionId = log.topicPartition.partition()
-    Option(newTopicsImage.getPartition(topicId, partitionId)) match {
-      case Some(partition) =>
-        if (!partition.replicas.contains(brokerId)) {
-          info(s"Found stray log dir $log: the current replica assignment ${partition.replicas.mkString("[", ", ", "]")} " +
-            s"does not contain the local brokerId $brokerId.")
-          true
-        } else {
-          false
-        }
-
-      case None =>
-        info(s"Found stray log dir $log: the topicId $topicId does not exist in the metadata image")
-        true
-    }
-  }
 }
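Similarly, the deleted `isStrayKraftReplica` presumably carried over to the Java class in roughly the form below. This is a sketch: the SLF4J logger and the `Arrays.stream` replica check are assumptions standing in for whatever the moved code actually does, while `TopicsImage.getPartition` returning a nullable `PartitionRegistration` matches the `Option(...)` wrapping in the deleted Scala:

```java
import java.util.Arrays;
import java.util.Optional;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.image.TopicsImage;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.storage.internals.log.UnifiedLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Continuation of the same sketch of the storage-module LogManager statics.
public class LogManager {
    private static final Logger LOG = LoggerFactory.getLogger(LogManager.class);

    /**
     * Returns true if the given log should not be on the current broker,
     * according to the metadata image.
     */
    public static boolean isStrayKraftReplica(int brokerId, TopicsImage newTopicsImage, UnifiedLog log) {
        Optional<Uuid> topicId = log.topicId();
        if (topicId.isEmpty()) {
            // A missing topic ID means no data was ever appended: partition.metadata is
            // flushed before the first append, so this log can be treated as stray.
            LOG.info("The topicId does not exist in {}, treat it as a stray log", log);
            return true;
        }
        int partitionId = log.topicPartition().partition();
        PartitionRegistration partition = newTopicsImage.getPartition(topicId.get(), partitionId);
        if (partition == null) {
            LOG.info("Found stray log dir {}: the topicId {} does not exist in the metadata image",
                log, topicId.get());
            return true;
        }
        if (Arrays.stream(partition.replicas).noneMatch(replica -> replica == brokerId)) {
            LOG.info("Found stray log dir {}: the current replica assignment {} does not contain the local brokerId {}",
                log, Arrays.toString(partition.replicas), brokerId);
            return true;
        }
        return false;
    }
}
```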
5 changes: 2 additions & 3 deletions — core/src/main/scala/kafka/raft/RaftManager.scala
@@ -24,7 +24,6 @@ import java.util.OptionalInt
 import java.util.concurrent.CompletableFuture
 import java.util.{Map => JMap}
 import java.util.{Collection => JCollection}
-import kafka.log.LogManager
 import kafka.server.KafkaConfig
 import kafka.utils.CoreUtils
 import kafka.utils.Logging
@@ -49,7 +48,7 @@ import org.apache.kafka.server.config.ServerLogConfigs
 import org.apache.kafka.server.util.{FileLock, KafkaScheduler}
 import org.apache.kafka.server.fault.FaultHandler
 import org.apache.kafka.server.util.timer.SystemTimer
-import org.apache.kafka.storage.internals.log.UnifiedLog
+import org.apache.kafka.storage.internals.log.{LogManager, UnifiedLog}
 
 import scala.jdk.CollectionConverters._
 import scala.jdk.OptionConverters._
@@ -63,7 +62,7 @@ object KafkaRaftManager {
   }
 
   private def lockDataDir(dataDir: File): FileLock = {
-    val lock = new FileLock(new File(dataDir, LogManager.LockFileName))
+    val lock = new FileLock(new File(dataDir, LogManager.LOCK_FILE_NAME))
 
     if (!lock.tryLock()) {
       throw new KafkaException(
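With the lock-file name now exposed as `LogManager.LOCK_FILE_NAME` in the storage module, `kafka.raft` no longer has to import `kafka.log.LogManager` just to spell `.lock`; the raft data directory and the log directories keep sharing the same lock-file convention.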
core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
 import org.apache.kafka.metadata.publisher.AclPublisher
 import org.apache.kafka.server.common.RequestLocal
 import org.apache.kafka.server.fault.FaultHandler
+import org.apache.kafka.storage.internals.log.{LogManager => JLogManager}
 
 import java.util.concurrent.CompletableFuture
 import scala.collection.mutable
@@ -299,7 +300,7 @@ class BrokerMetadataPublisher(
       // recovery-from-unclean-shutdown if required.
       logManager.startup(
         metadataCache.getAllTopics().asScala,
-        isStray = log => LogManager.isStrayKraftReplica(brokerId, newImage.topics(), log)
+        isStray = log => JLogManager.isStrayKraftReplica(brokerId, newImage.topics(), log)
       )
 
       // Rename all future replicas which are in the same directory as the
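The rename-on-import (`LogManager => JLogManager`) is presumably there to keep the Java class from colliding with the core `kafka.log.LogManager`, which this file still uses through the `logManager` instance it starts up.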