Skip to content

Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories. View the default comparison for this range or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: segmentio/topicctl
Failed to load repositories. Confirm that selected base ref is valid, then try again.
base: b53d3c7b6e22c35a74f23198a7a5a4345ef33c26
Choose a base ref
head repository: segmentio/topicctl
Failed to load repositories. Confirm that selected head ref is valid, then try again.
compare: ac6a37f57e3722b77f4b3b8a49ed475784cac2b2
Choose a head ref
Showing with 73 additions and 232 deletions.
  1. +4 −40
  2. +69 −136 cmd/topicctl/subcmd/reset.go
  3. +0 −22 pkg/cli/cli.go
  4. +0 −34 pkg/groups/groups.go
44 changes: 4 additions & 40 deletions
Original file line number Diff line number Diff line change
@@ -77,7 +77,7 @@ docker-compose up -d
3. Apply the topic configs in [`examples/local-cluster/topics`](/examples/local-cluster/topics):

topicctl apply --skip-confirm examples/local-cluster/topics/*.yaml
topicctl apply --skip-confirm examples/local-cluster/topics/*yaml

4. Send some test messages to the `topic-default` topic:
@@ -225,49 +225,13 @@ subcommands interactively.
topicctl reset-offsets [topic] [group] [flags]

The `reset-offsets` subcommand allows resetting the offsets
for a consumer group in a topic.
There are a few typical approaches for setting the offsets:
The `reset-offsets` subcommand allows resetting the offsets for a consumer group in a topic. There are 2 main approaches for setting the offsets:

1. Use `--partitions` and combine it with one of the offset operators:
`--delete`, `--offset`, `--to-earliest` or `--to-latest`.
2. Use `--partition-offset-map` to pass specific offsets per partition.
For example, `1=5,2=10` means that the consumer group offset
for partition 1 must be set to 5, and partition 2 to offset 10.
This is mainly used for replays of specific traffic,
such as when a deploy has mishandled or corrupted state,
and the prior release must be rerun
starting at a specific offset per partition.
This is the most flexible approach for offset setting.
1. Use a combination of `--partitions`, `--offset`, `--to-earliest` and `--to-latest` flags. `--partitions` flag specifies a list of partitions to be reset e.g. `1,2,3 ...`. If not used, the command defaults to resetting consumer group offsets for ALL of the partitions. `--offset` flag indicates the specific value that all desired consumer group partitions will be set to. If not set, it will default to -2. Finally, `--to-earliest` flag resets offsets of consumer group members to earliest offsets of partitions while `--to-latest` resets offsets of consumer group members to latest offsets of partitions. However, only one of the `--to-earliest`, `--to-latest` and `--offset` flags can be used at a time. This approach is easy to use but lacks the ability for detailed offset configuration.

Note that `--partition-offset-map` flag is standalone
and cannot be coupled with other flags.
2. Use `--partition-offset-map` flag to specify a detailed offset configuration for individual partitions. For example, `1=5,2=10,7=12,...` means that the consumer group offset for partition 1 must be set to 5, partition 2 to offset 10, partition 7 to offset 12 and so on. This approach provides greater flexibility and fine-grained control for this operation. Note that `--partition-offset-map` flag is standalone and cannot be coupled with any of the previous flags.

##### Partition selection flags

At most one of the following may be selected:

* `--partitions` specifies a comma-separated list of partitions IDs.

If none of these are specified,
the command defaults to selecting ALL of the partitions.

##### Offset selection flags

At most one of the following may be selected:

* `--delete` removes stored group offsets.
This will generally have the same effect as `--to-earliest` or `--to-latest`,
depending on the consumer group configuration.
However, `--delete` is more reliable and convenient,
since `--to-earliest` in particular involves a race with message retention
that may require numerous attempts.
* `--offset` indicates the specific value that all selected
consumer group partitions will be set to.
* `--to-earliest` resets group offsets to oldest still-retained per partition.
* `--to-latest` resets group offsets to newest per partitions.

If none of these are specified, `--to-earliest` will be the default.

#### tail

205 changes: 69 additions & 136 deletions cmd/topicctl/subcmd/reset.go
Original file line number Diff line number Diff line change
@@ -6,19 +6,17 @@ import (

log ""

log ""

var resetOffsetsCmd = &cobra.Command{
Use: "reset-offsets <topic-name> <group-name>",
Use: "reset-offsets [topic name] [group name]",
Short: "reset consumer group offsets",
Args: cobra.ExactArgs(2),
Args: cobra.MinimumNArgs(2),
PreRunE: resetOffsetsPreRun,
RunE: resetOffsetsRun,
@@ -29,7 +27,6 @@ type resetOffsetsCmdConfig struct {
partitionOffsetMap map[string]int64
toEarliest bool
toLatest bool
delete bool

shared sharedOptions
@@ -65,185 +62,121 @@ func init() {
"Resets offsets of consumer group members to latest offsets of partitions")
"Deletes offsets for the given consumer group")

addSharedFlags(resetOffsetsCmd, &resetOffsetsConfig.shared)

func resetOffsetsPreRun(cmd *cobra.Command, args []string) error {
resetOffsetSpec := "You must choose only one of the following " +
"reset-offset specifications: --delete, --to-earliest, --to-latest, " +
"--offset, or --partition-offset-map."
offsetMapSpec := "--partition-offset-map option cannot be used with --partitions."

cfg := resetOffsetsConfig

numOffsetSpecs := numTrue(
len(cfg.partitionOffsetMap) > 0,
resetOffsetSpecification := "You must choose only one of the following reset-offset specifications: --to-earliest, --to-latest, --offset."
offsetMapSpecification := "--partition-offset-map option cannot be coupled with any of the following options: --partitions, --to-earliest, --to-latest, --offset."

if numOffsetSpecs > 1 {
return errors.New(resetOffsetSpec)
if len(resetOffsetsConfig.partitionOffsetMap) > 0 && (cmd.Flags().Changed("offset") ||
len(resetOffsetsConfig.partitions) > 0 ||
resetOffsetsConfig.toEarliest ||
resetOffsetsConfig.toLatest) {
return errors.New(offsetMapSpecification)

if len(cfg.partitionOffsetMap) > 0 && len(cfg.partitions) > 0 {
return errors.New(offsetMapSpec)
} else if resetOffsetsConfig.toEarliest && resetOffsetsConfig.toLatest {
return errors.New(resetOffsetSpecification)

return cfg.shared.validate()
} else if cmd.Flags().Changed("offset") && (resetOffsetsConfig.toEarliest || resetOffsetsConfig.toLatest) {
return errors.New(resetOffsetSpecification)
return resetOffsetsConfig.shared.validate()

func resetOffsetsRun(cmd *cobra.Command, args []string) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

cfg := resetOffsetsConfig

adminClient, err := cfg.shared.getAdminClient(ctx, nil, true)
adminClient, err := resetOffsetsConfig.shared.getAdminClient(ctx, nil, true)
if err != nil {
return err

defer adminClient.Close()

connector := adminClient.GetConnector()

topic := args[0]
group := args[1]

topicInfo, err := adminClient.GetTopic(ctx, topic, false)
if err != nil {
return err

partitionIDsMap := make(map[int]struct{}, len(topicInfo.Partitions))
partitionIDsMap := map[int]struct{}{}
for _, partitionInfo := range topicInfo.Partitions {
partitionIDsMap[partitionInfo.ID] = struct{}{}

var strategy string

switch {
case resetOffsetsConfig.toLatest:
strategy = groups.LatestResetOffsetsStrategy
case resetOffsetsConfig.toEarliest:
strategy = groups.EarliestResetOffsetsStrategy
var resetOffsetsStrategy string
if resetOffsetsConfig.toLatest {
resetOffsetsStrategy = groups.LatestResetOffsetsStrategy
} else if resetOffsetsConfig.toEarliest {
resetOffsetsStrategy = groups.EarliestResetOffsetsStrategy
partitionOffsets := map[int]int64{}

// If explicit per-partition offsets were specified, set them now.
partitionOffsets, err := parsePartitionOffsetMap(partitionIDsMap, cfg.partitionOffsetMap)
if err != nil {
return err
if len(resetOffsetsConfig.partitionOffsetMap) > 0 {
for partition, offset := range resetOffsetsConfig.partitionOffsetMap {
var partitionID int
if partitionID, err = strconv.Atoi(partition); err != nil {
return fmt.Errorf("Partition value %s must be a number", partition)
if _, ok := partitionIDsMap[partitionID]; !ok {
return fmt.Errorf("Partition %d not found in topic %s", partitionID, topic)

// Set explicit partitions (without offsets) if specified,
// otherwise operate on fetched partition info;
// these will only take effect of per-partition offsets were not specified.
partitions := cfg.partitions
if len(partitions) == 0 && len(partitionOffsets) == 0 {
convert := func(info admin.PartitionInfo) int { return info.ID }
partitions = convertSlice(topicInfo.Partitions, convert)
partitionOffsets[partitionID] = offset

for _, partition := range partitions {
_, ok := partitionIDsMap[partition]
if !ok {
format := "Partition %d not found in topic %s"
return fmt.Errorf(format, partition, topic)

if strategy == "" {
partitionOffsets[partition] = cfg.offset
return nil
} else if len(resetOffsetsConfig.partitions) > 0 {
for _, partition := range resetOffsetsConfig.partitions {
if _, ok := partitionIDsMap[partition]; !ok {
return fmt.Errorf("Partition %d not found in topic %s", partition, topic)
if resetOffsetsConfig.toEarliest || resetOffsetsConfig.toLatest {
partitionOffsets[partition], err = groups.GetEarliestOrLatestOffset(ctx, adminClient.GetConnector(), topic, resetOffsetsStrategy, partition)
if err != nil {
return err
} else {
partitionOffsets[partition] = resetOffsetsConfig.offset

offset, err := groups.GetEarliestOrLatestOffset(ctx, connector, topic, strategy, partition)
if err != nil {
return err

partitionOffsets[partition] = offset
} else {
for _, partitionInfo := range topicInfo.Partitions {
if resetOffsetsConfig.toEarliest || resetOffsetsConfig.toLatest {
partitionOffsets[partitionInfo.ID], err = groups.GetEarliestOrLatestOffset(ctx, adminClient.GetConnector(), topic, resetOffsetsStrategy, partitionInfo.ID)
if err != nil {
return err
} else {
partitionOffsets[partitionInfo.ID] = resetOffsetsConfig.offset

"This will reset the offsets for the following partitions "+
"in topic %s for group %s:\n%s",
"This will reset the offsets for the following partitions in topic %s for group %s:\n%s",

log.Info("Please ensure that all other consumers are stopped, " +
"otherwise the reset might be overridden.")
"Please ensure that all other consumers are stopped, otherwise the reset might be overridden.",

ok, _ := util.Confirm("OK to continue?", false)
if !ok {
return errors.New("Stopping because of user response")

cliRunner := cli.NewCLIRunner(adminClient, log.Infof, !noSpinner)

if resetOffsetsConfig.delete {
input := groups.DeleteOffsetsInput{
GroupID: group,
Topic: topic,
Partitions: partitions,

return cliRunner.DeleteOffsets(ctx, &input)

return cliRunner.ResetOffsets(ctx, topic, group, partitionOffsets)

func numTrue(bools ...bool) int {
var n int
for _, b := range bools {
if b {

return n

func convertSlice[T1, T2 any](input []T1, fn func(T1) T2) []T2 {
out := make([]T2, len(input))

for i, v := range input {
out[i] = fn(v)

return out

func parsePartitionOffsetMap(partitionIDsMap map[int]struct{}, input map[string]int64) (map[int]int64, error) {
out := make(map[int]int64, len(input))

for partition, offset := range input {
partitionID, err := strconv.Atoi(partition)
if err != nil {
format := "Partition value %s must be an integer"
return nil, fmt.Errorf(format, partition)

_, ok := partitionIDsMap[partitionID]
if !ok {
format := "Partition %d not found"
return nil, fmt.Errorf(format, partitionID)

out[partitionID] = offset

return out, nil
return cliRunner.ResetOffsets(
22 changes: 0 additions & 22 deletions pkg/cli/cli.go
Original file line number Diff line number Diff line change
@@ -594,11 +594,6 @@ func (c *CLIRunner) GetUsers(ctx context.Context, names []string) error {
return nil

// DeleteOffsets removes offsets for a single consumer group / topic combination.
func (c *CLIRunner) DeleteOffsets(ctx context.Context, input *groups.DeleteOffsetsInput) error {
return invoke(ctx, c, input, groups.DeleteOffsets)

// ResetOffsets resets the offsets for a single consumer group / topic combination.
func (c *CLIRunner) ResetOffsets(
ctx context.Context,
@@ -654,7 +649,6 @@ func (c *CLIRunner) Tail(

stats, err := tailer.LogMessages(ctx, maxMessages, filterRegexp, raw, headers)
filtered := filterRegexp != ""

@@ -695,22 +689,6 @@ func (c *CLIRunner) stopSpinner() {

type invokeFunc[T any] func(context.Context, *admin.Connector, T) error

func invoke[T any](ctx context.Context, c *CLIRunner, v T, fn invokeFunc[T]) error {

err := fn(ctx, c.adminClient.GetConnector(), v)
if err != nil {
return err


return nil

func stringsToInts(strs []string) ([]int, error) {
ints := []int{}
