Skip to content

Commit

Permalink
feat: Add configurable retries with exponential backoff (#119)
Browse files Browse the repository at this point in the history
- blackbox now exits when it runs out of retries
- default behavior does not change, configure max_retries to enable

On windows blackbox was preventing log rotation when configured with
invalid syslog credentials as it was locking the file while trying to
endlessly reconnect. This change makes it so that blackbox exits which
unlocks the file and indicates that the config is invalid.

Co-authored-by: Carson Long <[email protected]>
Co-authored-by: Rebecca Roberts <[email protected]>
Co-authored-by: Andrew Crump <[email protected]>
Co-authored-by: Matthew Kocher <[email protected]>
  • Loading branch information
4 people authored Apr 24, 2024
1 parent 84fff16 commit 5f3e237
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 10 deletions.
2 changes: 1 addition & 1 deletion integration/blackbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (runner *BlackboxRunner) StartWithConfig(config blackbox.Config, tailerCoun
Name: "blackbox",
Command: blackboxCmd,
AnsiColorCode: "90m",
StartCheck: "Start tail...",
StartCheck: "Starting to tail file:",
Cleanup: func() {
os.Remove(configPath)
},
Expand Down
97 changes: 95 additions & 2 deletions integration/syslog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ var _ = Describe("Blackbox", func() {
Buffer: buffer,
})

Eventually(session.Err, "10s").Should(gbytes.Say("Start tail..."))
Eventually(session.Err, "10s").Should(gbytes.Say("Starting to tail file:"))

Write(logFile, "hello\n", false, false)
Write(logFile, "world\n", true, false)
Expand All @@ -656,7 +656,7 @@ var _ = Describe("Blackbox", func() {
Buffer: buffer2,
})

Eventually(buffer2, "5s").Should(gbytes.Say("more"))
Eventually(buffer2, "10s").Should(gbytes.Say("more"))

ginkgomon.Interrupt(serverProcess)

Expand All @@ -668,6 +668,7 @@ var _ = Describe("Blackbox", func() {
}
})
})

Context("When the server uses tls", func() {
var address string
var buffer *gbytes.Buffer
Expand Down Expand Up @@ -754,6 +755,98 @@ var _ = Describe("Blackbox", func() {
blackboxRunner.Stop()
})
})

Context("when max retries is configured", func() {
Context("when the syslog server never comes up", func() {
It("causes blackbox to fail", func() {
address := fmt.Sprintf("127.0.0.1:%d", 9090+GinkgoParallelProcess())

config := blackbox.Config{
Hostname: "",
Syslog: blackbox.SyslogConfig{
Destination: syslog.Drain{
Transport: "tcp",
Address: address,
MaxRetries: 3,
},
SourceDir: logDir,
},
}
configPath := CreateConfigFile(config)
defer os.Remove(configPath)

blackboxCmd := exec.Command(blackboxPath, "-config", configPath)
session, err := gexec.Start(blackboxCmd, GinkgoWriter, GinkgoWriter)
Expect(err).NotTo(HaveOccurred())
defer func() {
if runtime.GOOS == "windows" {
session.Kill()
} else {
session.Signal(os.Interrupt)
session.Wait()
}
}()
Eventually(session.Err, "10s").Should(gbytes.Say("Starting to tail file:"))

Write(logFile, "try to log this\n", false, false)
Write(logFile, "try to log more and notice can't write to socket\n", true, true)

Expect(session.Wait("20s")).To(gexec.Exit(1))
})
})

Context("when the syslog server goes down for a long enough time", func() {
It("causes blackbox to fail", func() {
address := fmt.Sprintf("127.0.0.1:%d", 9090+GinkgoParallelProcess())

buffer := gbytes.NewBuffer()
serverProcess := ginkgomon.Invoke(&TcpSyslogServer{
Addr: address,
Buffer: buffer,
})

config := blackbox.Config{
Hostname: "",
Syslog: blackbox.SyslogConfig{
Destination: syslog.Drain{
Transport: "tcp",
Address: address,
MaxRetries: 1,
},
SourceDir: logDir,
},
}
configPath := CreateConfigFile(config)
defer os.Remove(configPath)

blackboxCmd := exec.Command(blackboxPath, "-config", configPath)
session, err := gexec.Start(blackboxCmd, GinkgoWriter, GinkgoWriter)
Expect(err).NotTo(HaveOccurred())
defer func() {
if runtime.GOOS == "windows" {
session.Kill()
} else {
session.Signal(os.Interrupt)
session.Wait()
}
}()
Eventually(session.Err, "10s").Should(gbytes.Say("Starting to tail file:"))

Write(logFile, "hello\n", false, false)
Write(logFile, "world\n", true, false)

Eventually(buffer, "5s").Should(gbytes.Say("hello"))
Eventually(buffer, "5s").Should(gbytes.Say("world"))

ginkgomon.Interrupt(serverProcess)

Write(logFile, "try to log this\n", false, false)
Write(logFile, "try to log more and notice can't write to socket\n", true, true)

Expect(session.Wait("10s")).To(gexec.Exit(1))
})
})
})
})

func Write(file *os.File, line string, sync bool, close bool) {
Expand Down
34 changes: 28 additions & 6 deletions syslog/drainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ import (
)

type Drain struct {
Transport string `yaml:"transport"`
Address string `yaml:"address"`
CA string `yaml:"ca"`
Transport string `yaml:"transport"`
Address string `yaml:"address"`
CA string `yaml:"ca"`
MaxRetries int `yaml:"max_retries"`
}

type Drainer interface {
Expand All @@ -35,6 +36,9 @@ type drainer struct {
structuredData rfc5424.StructuredData
maxMessageSize int
transport string
maxRetries int
connAttempts int
sleepSeconds int
}

func NewDrainer(errorLogger *log.Logger, drain Drain, hostname string, structuredData rfc5424.StructuredData, maxMessageSize int) (*drainer, error) {
Expand All @@ -53,13 +57,15 @@ func NewDrainer(errorLogger *log.Logger, drain Drain, hostname string, structure
maxMessageSize: maxMessageSize,
dialFunction: dialFunction,
transport: drain.Transport,
maxRetries: drain.MaxRetries,
sleepSeconds: 1,
}, nil
}

func generateDialer(drain Drain, tlsConf *tls.Config) func() (net.Conn, error) {
var dialFunction func() (net.Conn, error)
dialer := &net.Dialer{
Timeout: 30 * time.Second,
Timeout: time.Second * 30,
KeepAlive: time.Second * 60 * 3,
}
switch drain.Transport {
Expand Down Expand Up @@ -106,6 +112,8 @@ func generateTLSConfig(caString string) (*tls.Config, error) {
}

func (d *drainer) Drain(line string, tag string) error {
defer d.resetAttempts()

binary, err := d.formatMessage(line, tag)
if err != nil {
return err
Expand Down Expand Up @@ -158,12 +166,26 @@ func (d *drainer) formatMessage(line string, tag string) ([]byte, error) {
return binary, nil
}

func (d *drainer) resetAttempts() {
d.connAttempts = 0
d.sleepSeconds = 1
}

func (d *drainer) incrementAttempts() {
d.connAttempts++
d.sleepSeconds = d.sleepSeconds << 1
}

func (d *drainer) ensureConnection() {
for d.conn == nil {
d.incrementAttempts()
conn, err := d.dialFunction()
if err != nil {
d.errorLogger.Printf("Error connecting: %s \n", err.Error())
time.Sleep(time.Second)
if d.maxRetries > 0 && d.connAttempts > d.maxRetries {
d.errorLogger.Fatalln("Failed to connect to syslog server. Exiting now.")
}
d.errorLogger.Printf("Error connecting on attempt %d: %s. Will retry in %d seconds.\n", d.connAttempts, err.Error(), d.sleepSeconds)
time.Sleep(time.Second * time.Duration(d.sleepSeconds))
} else if conn != nil {
d.conn = conn
}
Expand Down
2 changes: 1 addition & 1 deletion tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type Tailer struct {
func (tailer *Tailer) Run(signals <-chan os.Signal, ready chan<- struct{}) error {
watch.POLL_DURATION = 1 * time.Second

tailer.Logger.Printf("Start tail...")
tailer.Logger.Printf("Starting to tail file: %s", tailer.Path)
t, err := tail.TailFile(tailer.Path, tail.Config{
Follow: true,
ReOpen: true,
Expand Down

0 comments on commit 5f3e237

Please sign in to comment.