[Question] Using admin.fetchTopicMetadata to monitor under replicated partitions between brokers restarts #1741

Open
tomerguttman opened this issue Feb 23, 2025 · 0 comments

tomerguttman commented Feb 23, 2025

Hey there, recently we began discussing improvements to our rolling-restart process for brokers and quickly turned to KafkaJS to explore its potential for monitoring under-replicated partitions during broker restarts.

Our approach focuses on checking two key conditions for each topic partition after we restart one of the brokers:

  1. If the number of current in-sync replicas (isr.length) for a partition is less than the configured minimum (min.insync.replicas), we treat the partition as under-replicated.
  2. If a partition has no leader (partition.leader < 0), we also treat it as problematic.
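
For illustration, the two conditions can be sketched as a pure function over a single partition. This is a minimal sketch, not code from the snippet below: `PartitionInfo`, `PartitionHealth`, and `classifyPartition` are hypothetical names, and the interface only mirrors the `leader`, `replicas`, and `isr` fields that `admin.fetchTopicMetadata` returns. It also reports a third flag, because Kafka's own broker metrics distinguish "under-replicated" (ISR smaller than the assigned replica set, `UnderReplicatedPartitions`) from "under min ISR" (ISR smaller than `min.insync.replicas`, `UnderMinIsrPartitionCount`).

```typescript
// Minimal shape of the partition fields used here (hypothetical type that
// mirrors the leader / replicas / isr fields of KafkaJS partition metadata).
interface PartitionInfo {
  partitionId: number;
  leader: number;
  replicas: number[];
  isr: number[];
}

interface PartitionHealth {
  hasLeader: boolean;
  underReplicated: boolean; // ISR is smaller than the assigned replica set
  underMinIsr: boolean; // ISR is smaller than min.insync.replicas
}

// Hypothetical helper, not part of KafkaJS.
function classifyPartition(partition: PartitionInfo, minInSyncReplicas: number): PartitionHealth {
  return {
    hasLeader: partition.leader >= 0,
    underReplicated: partition.isr.length < partition.replicas.length,
    underMinIsr: partition.isr.length < minInSyncReplicas,
  };
}
```

With a replication factor of 3 and `min.insync.replicas` of 2, a partition whose ISR has shrunk to two brokers during a restart would be flagged as `underReplicated` but not `underMinIsr`, so which flag gates the next restart is a design choice.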

Here is a short snippet for context. It is not the final code, but it conveys the idea. The main function is areAllInSync; the helper functions it uses are included as well.

  async fetchTopicMetadata(): Promise<{ topics: KafkaJS.ITopicMetadata[] }> {
    return this.admin.fetchTopicMetadata();
  }

  configEntriesToMap(configEntries: KafkaJS.ConfigEntries[]): Map<string, string> {
    const configMap = new Map<string, string>();

    configEntries.forEach((config) => configMap.set(config.configName, config.configValue));

    return configMap;
  }

  async describeConfigs(topicMetadata: {
    topics: KafkaJS.ITopicMetadata[];
  }): Promise<Map<string, Map<string, string>>> {
    const topicConfigurationsByName = new Map<string, Map<string, string>>();
    const resources = topicMetadata.topics.map((topic: KafkaJS.ITopicMetadata) => ({
      type: Constants.Types.Topic,
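      // KafkaJS reads "configNames" (plural); a "configName" key is ignored and all configs are returned
      configNames: [Constants.MinInSyncReplicas],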
      name: topic.name,
    }));

    const rawConfigurations = await this.admin.describeConfigs({ resources, includeSynonyms: false });

    // Set the configurations by topic name for easier access
    rawConfigurations.resources.forEach((resource) =>
      topicConfigurationsByName.set(resource.resourceName, this.configEntriesToMap(resource.configEntries))
    );

    return topicConfigurationsByName;
  }

  async areAllInSync(): Promise<boolean> {
    const topicMetadata = await this.fetchTopicMetadata();
    const topicConfigurations = await this.describeConfigs(topicMetadata);

    // Flatten the replication metadata extracted from each partition of every topic into a single array
    const validationResults = topicMetadata.topics.flatMap((topic: KafkaJS.ITopicMetadata) =>
      topic.partitions.map((partition: PartitionMetadata) =>
        this.extractReplicationMetadata(topic.name, partition, topicConfigurations)
      )
    );

    const problematicPartitions = validationResults.filter((partition) => partition.isProblematic);
  ...
  }

  extractReplicationMetadata(
    topicName: string,
    partition: PartitionMetadata,
    topicConfigurations: Map<string, Map<string, string>>
  ): {
    topicName: string;
    partitionMetadata: PartitionMetadata;
    isProblematic: boolean;
  } {
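    // Guard the lookup: the topic may be missing from the map (e.g. created after describeConfigs ran)
    const minISR = topicConfigurations.get(topicName)?.get(Constants.MinInSyncReplicas) ?? '';

    return {
      topicName,
      partitionMetadata: partition,
      // If minISR cannot be parsed, parseInt yields NaN, the first comparison is false, and only the leader check applies
      isProblematic: partition.isr.length < parseInt(minISR, 10) || partition.leader < 0,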
    };
  }

I’d appreciate any feedback on whether our logic for identifying problematic partitions between broker restarts is correct. It currently relies on the condition partition.isr.length < parseInt(minISR) || partition.leader < 0.
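
To show where such a check could sit in a rolling restart, here is a minimal sketch of a polling loop that waits for a health check to pass before the next broker is restarted. `waitUntilHealthy`, `intervalMs`, and `timeoutMs` are hypothetical names and defaults chosen for illustration; the `check` argument stands in for something like areAllInSync.

```typescript
// Hypothetical helper: polls an async health check until it returns true
// or the timeout elapses. Resolves to true on success, false on timeout.
async function waitUntilHealthy(
  check: () => Promise<boolean>,
  { intervalMs = 5000, timeoutMs = 300000 }: { intervalMs?: number; timeoutMs?: number } = {}
): Promise<boolean> {
  const deadline = Date.now() + timeoutMs;

  while (true) {
    // Assumption: an error while fetching metadata during a restart is
    // treated as "not healthy yet" rather than aborting the wait.
    const healthy = await check().catch(() => false);
    if (healthy) return true;

    // Stop if another full interval would go past the deadline.
    if (Date.now() + intervalMs > deadline) return false;

    await new Promise<void>((resolve) => setTimeout(resolve, intervalMs));
  }
}
```

A caller would restart one broker, await `waitUntilHealthy(() => this.areAllInSync())`, and only move on to the next broker when it resolves to true.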

@tulios @Nevon

Thanks in advance! 😃

@tomerguttman tomerguttman changed the title [Question] Using admin.fetchTopicMetadata to monitor under replicated partitions [Question] Using admin.fetchTopicMetadata to monitor under replicated partitions between brokers restarts Feb 23, 2025