Skip to content

Commit

Permalink
Merge pull request #68 from heroku/remove_front_buff
Browse files Browse the repository at this point in the history
Remove front buff
  • Loading branch information
Edward Muller committed Apr 27, 2016
2 parents 48c983b + a6c6c2b commit 6a79a10
Show file tree
Hide file tree
Showing 9 changed files with 253 additions and 166 deletions.
1 change: 0 additions & 1 deletion batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ type Batcher struct {
// NewBatcher created an empty Batcher for the provided shuttle
func NewBatcher(s *Shuttle) Batcher {
return Batcher{
inLogs: s.LogLines,
drops: s.Drops,
outBatches: s.Batches,
timeout: s.config.WaitDuration,
Expand Down
37 changes: 0 additions & 37 deletions batcher_test.go

This file was deleted.

21 changes: 16 additions & 5 deletions cmd/log-shuttle/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,12 @@ func parseFlags(c shuttle.Config) (shuttle.Config, error) {
flag.DurationVar(&c.Timeout, "timeout", c.Timeout, "Duration to wait for a response from logs-url.")

flag.IntVar(&c.MaxAttempts, "max-attempts", c.MaxAttempts, "Max number of retries.")
flag.IntVar(&c.NumBatchers, "num-batchers", c.NumBatchers, "The number of batchers to run.")
var b int
flag.IntVar(&b, "num-batchers", b, "[NO EFFECT/REMOVED] The number of batchers to run.")
flag.IntVar(&c.NumOutlets, "num-outlets", c.NumOutlets, "The number of outlets to run.")
flag.IntVar(&c.BatchSize, "batch-size", c.BatchSize, "Number of messages to pack into an application/logplex-1 http request.")
flag.IntVar(&c.FrontBuff, "front-buff", c.FrontBuff, "Number of messages to buffer in log-shuttle's input channel.")
var f int
flag.IntVar(&f, "front-buff", f, "[NO EFFECT/REMOVED] Number of messages to buffer in log-shuttle's input channel.")
flag.IntVar(&c.BackBuff, "back-buff", c.BackBuff, "Number of batches to buffer before dropping.")
flag.IntVar(&c.MaxLineLength, "max-line-length", c.MaxLineLength, "Number of bytes that the backend allows per line.")
flag.IntVar(&c.KinesisShards, "kinesis-shards", c.KinesisShards, "Number of unique partition keys to use per app.")
Expand All @@ -120,6 +122,14 @@ func parseFlags(c shuttle.Config) (shuttle.Config, error) {
os.Exit(0)
}

if f != 0 {
log.Println("Warning: Use of -front-buff is no longer supported. The flag has no effect and will be removed in the future.")
}

if b != 0 {
log.Println("Warning: Use of -num-batchers is no longer supported. The flag has no effect and will be removed in the future.")
}

if statsAddr != "" {
log.Println("Warning: Use of -stats-addr is deprecated and will be dropped in the future.")
}
Expand Down Expand Up @@ -245,13 +255,14 @@ func main() {
s.ErrLogger = errLogger
}

s.LoadReader(os.Stdin)

s.Launch()

go LogFmtMetricsEmitter(s.MetricsRegistry, config.StatsSource, config.StatsInterval, s.Logger)

// Blocks until os.Stdin errors
s.ReadLogLines(os.Stdin)
os.Stdin.Close()
// blocks until the readers all exit
s.WaitForReadersToFinish()

// Shutdown the shuttle.
s.Land()
Expand Down
6 changes: 0 additions & 6 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ const (
const (
DefaultMaxLineLength = 10000 // Logplex max is 10000 bytes, so default to that
DefaultInputFormat = InputFormatRaw
DefaultFrontBuff = 1000
DefaultBackBuff = 50
DefaultTimeout = 5 * time.Second
DefaultWaitDuration = 250 * time.Millisecond
Expand All @@ -34,7 +33,6 @@ const (
DefaultHostname = "shuttle"
DefaultMsgID = "- -"
DefaultLogsURL = ""
DefaultNumBatchers = 2
DefaultNumOutlets = 4
DefaultBatchSize = 500
DefaultID = ""
Expand Down Expand Up @@ -65,9 +63,7 @@ type errData struct {
type Config struct {
MaxLineLength int
BackBuff int
FrontBuff int
BatchSize int
NumBatchers int
NumOutlets int
InputFormat int
MaxAttempts int
Expand Down Expand Up @@ -114,11 +110,9 @@ func NewConfig() Config {
StatsInterval: time.Duration(DefaultStatsInterval),
MaxAttempts: DefaultMaxAttempts,
InputFormat: DefaultInputFormat,
NumBatchers: DefaultNumBatchers,
NumOutlets: DefaultNumOutlets,
WaitDuration: time.Duration(DefaultWaitDuration),
BatchSize: DefaultBatchSize,
FrontBuff: DefaultFrontBuff,
BackBuff: DefaultBackBuff,
Timeout: time.Duration(DefaultTimeout),
ID: DefaultID,
Expand Down
2 changes: 1 addition & 1 deletion gzip_formatter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestGzipFormatter(t *testing.T) {
gr := NewGzipFormatter(f)

if gr.MsgCount() != 1 {
t.Fatal(gr.MsgCount)
t.Fatal(gr.MsgCount())
}

// read the compressed bytes
Expand Down
46 changes: 23 additions & 23 deletions log_line_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"io"
"sync"
"testing"

"github.com/rcrowley/go-metrics"
)

const (
Expand All @@ -24,9 +22,7 @@ type InputProducer struct {
}

func NewInputProducer(c int) *InputProducer {
curr := 0
tb := 0
return &InputProducer{Total: c, Curr: curr, TotalBytes: tb, Data: TestData}
return &InputProducer{Total: c, Data: TestData}
}

func (llp *InputProducer) Read(p []byte) (n int, err error) {
Expand All @@ -43,10 +39,10 @@ func (llp InputProducer) Close() error {
}

type TestConsumer struct {
*sync.WaitGroup
sync.WaitGroup
}

func (tc TestConsumer) Consume(in <-chan LogLine) {
func (tc *TestConsumer) Consume(in <-chan Batch) {
tc.Add(1)
go func() {
defer tc.Done()
Expand All @@ -55,43 +51,47 @@ func (tc TestConsumer) Consume(in <-chan LogLine) {
}()
}

func doBasicLogLineReaderBenchmark(b *testing.B, frontBuffSize int) {
func doBasicLogLineReaderBenchmark(b *testing.B, backBuffSize int) {
b.ResetTimer()
var tb int
var tc TestConsumer
for i := 0; i < b.N; i++ {
b.StopTimer()
logs := make(chan LogLine, frontBuffSize)
rdr := NewLogLineReader(logs, metrics.NewRegistry())
testConsumer := TestConsumer{new(sync.WaitGroup)}
testConsumer.Consume(logs)
batches := make(chan Batch, backBuffSize)
tc.Consume(batches)
s := NewShuttle(NewConfig())
llp := NewInputProducer(TestProducerLines)
rdr := NewLogLineReader(llp, s)
b.StartTimer()
rdr.ReadLogLines(llp)
b.SetBytes(int64(llp.TotalBytes))
close(logs)
testConsumer.Wait()

rdr.ReadLines()
tb += llp.TotalBytes
close(batches)
tc.Wait()
}
b.SetBytes(int64(tb / b.N))
}

func BenchmarkLogLineReaderWithFrontBuffEqual0(b *testing.B) {
func BenchmarkLogLineReaderWithBackBuffEqual0(b *testing.B) {
doBasicLogLineReaderBenchmark(b, 0)
}

func BenchmarkLogLineReaderWithFrontBuffEqual1(b *testing.B) {
func BenchmarkLogLineReaderWithBackBuffEqual1(b *testing.B) {
doBasicLogLineReaderBenchmark(b, 1)
}

func BenchmarkLogLineReaderWithFrontBuffEqual100(b *testing.B) {
func BenchmarkLogLineReaderWithBackBuffEqual100(b *testing.B) {
doBasicLogLineReaderBenchmark(b, 100)
}

func BenchmarkLogLineReaderWithFrontBuffEqual1000(b *testing.B) {
func BenchmarkLogLineReaderWithBackBuffEqual1000(b *testing.B) {
doBasicLogLineReaderBenchmark(b, 1000)
}

func BenchmarkLogLineReaderWithFrontBuffEqual10000(b *testing.B) {
func BenchmarkLogLineReaderWithBackBuffEqual10000(b *testing.B) {
doBasicLogLineReaderBenchmark(b, 10000)
}

func BenchmarkLogLineReaderWithDefaultFrontBuff(b *testing.B) {
doBasicLogLineReaderBenchmark(b, DefaultFrontBuff)
func BenchmarkLogLineReaderWithDefaultBackBuff(b *testing.B) {
doBasicLogLineReaderBenchmark(b, DefaultBackBuff)
}
112 changes: 97 additions & 15 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package shuttle
import (
"bufio"
"io"
"sync"
"time"

"github.com/rcrowley/go-metrics"
Expand All @@ -11,39 +12,120 @@ import (
// LogLineReader performs the reading of lines from an io.ReadCloser, encapsulating
// lines into a LogLine and emitting them on outbox
type LogLineReader struct {
outbox chan<- LogLine
linesRead metrics.Counter
input io.ReadCloser // The input to read from
out chan<- Batch // Where to send batches
close chan struct{}
batchSize int // size of new batches
timeOut time.Duration // batch timeout
timer *time.Timer // timer to actually enforce timeout
drops *Counter
drop bool // Should we drop or block

linesRead metrics.Counter
linesBatchedCount metrics.Counter
linesDroppedCount metrics.Counter
batchFillTime metrics.Timer

mu sync.Mutex // protects access to below
b Batch
}

// NewLogLineReader constructs a new reader with it's own Outbox.
func NewLogLineReader(o chan<- LogLine, m metrics.Registry) LogLineReader {
return LogLineReader{
outbox: o,
linesRead: metrics.GetOrRegisterCounter("lines.read", m),
func NewLogLineReader(input io.ReadCloser, s *Shuttle) *LogLineReader {
t := time.NewTimer(time.Second)
t.Stop() // we only need a timer running when we actually have log lines in the batch

ll := LogLineReader{
input: input,
out: s.Batches,
close: make(chan struct{}),
batchSize: s.config.BatchSize,
timeOut: s.config.WaitDuration,
timer: t,
drops: s.Drops,
drop: s.config.Drop,

linesRead: metrics.GetOrRegisterCounter("lines.read", s.MetricsRegistry),
linesBatchedCount: metrics.GetOrRegisterCounter("lines.batched", s.MetricsRegistry),
linesDroppedCount: metrics.GetOrRegisterCounter("lines.dropped", s.MetricsRegistry),
batchFillTime: metrics.GetOrRegisterTimer("batch.fill", s.MetricsRegistry),

b: NewBatch(s.config.BatchSize),
}

go ll.expireBatches()

return &ll
}

// ReadLogLines reads lines from the Reader and returns with an error if there
// is an error
func (rdr LogLineReader) ReadLogLines(input io.Reader) error {
rdrIo := bufio.NewReader(input)
func (rdr *LogLineReader) expireBatches() {
for {
select {
case <-rdr.close:
return

case <-rdr.timer.C:
rdr.mu.Lock()
rdr.deliverOrDropCurrent()
rdr.mu.Unlock()
}
}
}

//Close the reader for input
func (rdr *LogLineReader) Close() error {
return rdr.input.Close()
}

// ReadLines from the input created for. Return any errors
// blocks until the underlying reader is closed
func (rdr *LogLineReader) ReadLines() error {
rdrIo := bufio.NewReader(rdr.input)

for {
line, err := rdrIo.ReadBytes('\n')

if len(line) > 0 {
currentLogTime := time.Now()
rdr.Enqueue(LogLine{line, currentLogTime})
rdr.linesRead.Inc(1)
rdr.mu.Lock()
if full := rdr.b.Add(LogLine{line, currentLogTime}); full {
rdr.deliverOrDropCurrent()
}
if rdr.b.MsgCount() == 1 { // First line so restart the timer
rdr.timer.Reset(rdr.timeOut)
}
rdr.mu.Unlock()
}

if err != nil {
rdr.mu.Lock()
rdr.deliverOrDropCurrent()
rdr.mu.Unlock()
close(rdr.close)
return err
}
}
}

// Enqueue a single log line and increment the line counters
func (rdr LogLineReader) Enqueue(ll LogLine) {
rdr.outbox <- ll
rdr.linesRead.Inc(1)
// Should only be called when rdr.mu is held
func (rdr *LogLineReader) deliverOrDropCurrent() {
rdr.timer.Stop()
// There is the possibility of a new batch being expired while this is happening.
// so guard against queueing up an empty batch
if c := rdr.b.MsgCount(); c > 0 {
if rdr.drop {
select {
case rdr.out <- rdr.b:
rdr.linesBatchedCount.Inc(int64(c))
default:
rdr.linesDroppedCount.Inc(int64(c))
rdr.drops.Add(c)
}
} else {
rdr.out <- rdr.b
rdr.linesBatchedCount.Inc(int64(c))
}
rdr.b = NewBatch(rdr.batchSize)
}
}
Loading

0 comments on commit 6a79a10

Please sign in to comment.