From fcf85ac87481f28af1f518e73b987380ddedf897 Mon Sep 17 00:00:00 2001 From: itechdima <61321708+itechdima@users.noreply.github.com> Date: Fri, 5 May 2023 19:20:35 +0200 Subject: [PATCH] Not omit error in logger (#1096) Add explicit errors as arguments so anyone can handle them in the logger implementation. --- reader.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/reader.go b/reader.go index 13a336e51..1acb676e9 100644 --- a/reader.go +++ b/reader.go @@ -238,7 +238,7 @@ func (r *Reader) commitLoopInterval(ctx context.Context, gen *Generation) { commit := func() { if err := r.commitOffsetsWithRetry(gen, offsets, defaultCommitRetries); err != nil { - r.withErrorLogger(func(l Logger) { l.Printf(err.Error()) }) + r.withErrorLogger(func(l Logger) { l.Printf("%v", err) }) } else { offsets.reset() } @@ -311,7 +311,7 @@ func (r *Reader) run(cg *ConsumerGroup) { } r.stats.errors.observe(1) r.withErrorLogger(func(l Logger) { - l.Printf(err.Error()) + l.Printf("%v", err) }) // Continue with next attempt... } @@ -1346,7 +1346,7 @@ func (r *reader) run(ctx context.Context, offset int64) { case errors.Is(err, UnknownTopicOrPartition): r.withErrorLogger(func(log Logger) { - log.Printf("failed to read from current broker for partition %d of %s at offset %d, topic or parition not found on this broker, %v", r.partition, r.topic, toHumanOffset(offset), r.brokers) + log.Printf("failed to read from current broker %v for partition %d of %s at offset %d: %v", r.brokers, r.partition, r.topic, toHumanOffset(offset), err) }) conn.Close() @@ -1358,7 +1358,7 @@ func (r *reader) run(ctx context.Context, offset int64) { case errors.Is(err, NotLeaderForPartition): r.withErrorLogger(func(log Logger) { - log.Printf("failed to read from current broker for partition %d of %s at offset %d, not the leader", r.partition, r.topic, toHumanOffset(offset)) + log.Printf("failed to read from current broker for partition %d of %s at offset %d: %v", r.partition, r.topic, toHumanOffset(offset), err) }) conn.Close() @@ -1372,7 +1372,7 @@ func (r *reader) run(ctx context.Context, offset int64) { // Timeout on the kafka side, this can be safely retried. errcount = 0 r.withLogger(func(log Logger) { - log.Printf("no messages received from kafka within the allocated time for partition %d of %s at offset %d", r.partition, r.topic, toHumanOffset(offset)) + log.Printf("no messages received from kafka within the allocated time for partition %d of %s at offset %d: %v", r.partition, r.topic, toHumanOffset(offset), err) }) r.stats.timeouts.observe(1) continue