Skip to content

Commit

Permalink
Merge branch 'probe'
Browse files Browse the repository at this point in the history
  • Loading branch information
sylr committed Feb 8, 2023
2 parents 658a204 + 97bbe29 commit a90088a
Show file tree
Hide file tree
Showing 4 changed files with 210 additions and 1 deletion.
2 changes: 2 additions & 0 deletions cmd/fix.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"sylr.dev/fix/cmd/list"
"sylr.dev/fix/cmd/marketdata"
"sylr.dev/fix/cmd/new"
"sylr.dev/fix/cmd/probe"
"sylr.dev/fix/cmd/status"
"sylr.dev/fix/config"
)
Expand All @@ -44,6 +45,7 @@ func init() {
FixCmd.AddCommand(list.ListCmd)
FixCmd.AddCommand(marketdata.MarketDataCmd)
FixCmd.AddCommand(new.NewCmd)
FixCmd.AddCommand(probe.ProbeCmd)
FixCmd.AddCommand(status.StatusCmd)

configPath := filepath.Join("$HOME", ".fix", "config")
Expand Down
199 changes: 199 additions & 0 deletions cmd/probe/probe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
package probe

import (
"os"
"os/signal"
"syscall"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/quickfixgo/quickfix"
"github.com/rs/zerolog"
"github.com/spf13/cobra"

"sylr.dev/fix/config"
"sylr.dev/fix/pkg/initiator"
"sylr.dev/fix/pkg/initiator/application"
"sylr.dev/fix/pkg/utils"
)

var (
optionProbeInterval time.Duration

status = make(chan result)
)

// ConfigCmd represents the buy command
var ProbeCmd = &cobra.Command{
Use: "probe",
Short: "Test fix session",
Long: "Test fix session.",
RunE: Execute,
PersistentPreRunE: utils.MakePersistentPreRunE(Validate),
}

func init() {
ProbeCmd.Flags().DurationVar(&optionProbeInterval, "probe-interval", 10*time.Second, "Interval between each probing")

prometheus.MustRegister(metricFixSessionFailuresTotal)
prometheus.MustRegister(metricFixSessionSuccessesTotal)
prometheus.MustRegister(metricFixSessionStatus)
}

func Validate(_ *cobra.Command, _ []string) error {
return nil
}

func Execute(_ *cobra.Command, _ []string) error {
logger := config.GetLogger()

interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM)

time.Sleep(time.Until(time.Now().Truncate(optionProbeInterval).Add(optionProbeInterval)))
ticker := time.NewTicker(optionProbeInterval)

LOOP:
for {
select {
case signal := <-interrupt:
logger.Debug().Msgf("Received signal: %s", signal)
break LOOP

case <-ticker.C:
go probe()

case result := <-status:
if result.connected {
metricFixSessionSuccessesTotal.WithLabelValues(result.context, result.session).Inc()
metricFixSessionStatus.WithLabelValues(result.context, result.session).Set(1.0)
} else {
metricFixSessionFailuresTotal.WithLabelValues(result.context, result.session).Inc()
metricFixSessionStatus.WithLabelValues(result.context, result.session).Set(0.0)
}
}
}

return nil
}

// Metrics
var (
metricFixSessionFailuresTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "fix",
Subsystem: "session",
Name: "failures_total",
Help: "Total number of session errors",
},
[]string{"context", "session"},
)
metricFixSessionSuccessesTotal = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "fix",
Subsystem: "session",
Name: "successes_total",
Help: "Total number of session successes",
},
[]string{"context", "session"},
)
metricFixSessionStatus = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "fix",
Subsystem: "session",
Name: "status",
Help: "Status of the fix session",
},
[]string{"context", "session"},
)
)

type result struct {
context string
session string
connected bool
}

func probe() {
options := config.GetOptions()
logger := config.GetLogger()

logger.Info().Msgf("Start probing")

contexts := config.GetContexts()

for i, context := range contexts {
if len(context.Initiator) == 0 {
continue
}

for j, session := range context.Sessions {
go func(context *config.Context, i int, session string) {
logger.Debug().Str("context", context.GetName()).Str("session", session).Msgf("Start probing session")

// Make a copy of the context which has only one session.
contextSingleSession := *context
contextSingleSession.Sessions = nil
contextSingleSession.Sessions = context.Sessions[i : i+1]

logger.Debug().Str("context", context.GetName()).Str("session", session).Strs("sessions", contextSingleSession.Sessions).Msgf("Copied context")

settings, err := contextSingleSession.ToQuickFixInitiatorSettings()
if err != nil {
panic(err)
}

app := application.NewInitiator()
app.Settings = settings
app.Logger = logger

var quickfixLogger *zerolog.Logger
if options.QuickFixLogging {
quickfixLogger = logger
}

init, err := initiator.Initiate(app, settings, quickfixLogger)
if err != nil {
logger.Error().Err(err).Str("context", context.GetName()).Str("session", session).Msg("Unable to initiate initiator")
status <- result{context: context.GetName(), session: session, connected: false}
return
}

// Start session
if err = init.Start(); err != nil {
logger.Error().Err(err).Str("context", context.GetName()).Str("session", session).Msg("Unable to start initiator")
status <- result{context: context.GetName(), session: session, connected: false}
return
}

defer func() {
app.Stop()
init.Stop()
_ = quickfix.UnregisterSession(app.SessionID)
}()

// Choose right timeout cli option > config > default value (5s)
var timeout time.Duration
if options.Timeout != time.Duration(0) {
timeout = options.Timeout
} else {
timeout = 5 * time.Second
}

// Wait for session connection
select {
case <-time.After(timeout):
status <- result{context: context.GetName(), session: session, connected: false}

case _, ok := <-app.Connected:
if !ok {
status <- result{context: context.GetName(), session: session, connected: false}
} else {
logger.Debug().Msgf("connected")
status <- result{context: context.GetName(), session: session, connected: true}
}
}
}(contexts[i], j, session)
}
}
}
6 changes: 6 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,11 @@ type Session struct {
HeartBtInt int `yaml:"HeartBtInt"`
SenderCompID string `yaml:"SenderCompID"`
SenderSubID string `yaml:"SenderSubID"`
SenderLocationID string `yaml:"SenderLocationID"`
TargetCompID string `yaml:"TargetCompID"`
TargetSubID string `yaml:"TargetSubID"`
TargetLocationID string `yaml:"TargetLocationID"`
SessionQualifier string `yaml:"SessionQualifier"`
Username string `yaml:"Username"`
Password string `yaml:"Password"`
StartTime string `yaml:"StartTime"`
Expand Down Expand Up @@ -345,8 +348,11 @@ func (c Context) ToQuickFixInitiatorSettings() (*quickfix.Settings, error) {
setSessionSetting(sessionSettings, qconfig.DefaultApplVerID, session.DefaultApplVerID)
setSessionSetting(sessionSettings, qconfig.SenderCompID, session.SenderCompID)
setSessionSetting(sessionSettings, qconfig.SenderSubID, session.SenderSubID)
setSessionSetting(sessionSettings, qconfig.SenderLocationID, session.SenderLocationID)
setSessionSetting(sessionSettings, qconfig.TargetCompID, session.TargetCompID)
setSessionSetting(sessionSettings, qconfig.TargetSubID, session.TargetSubID)
setSessionSetting(sessionSettings, qconfig.TargetLocationID, session.TargetLocationID)
setSessionSetting(sessionSettings, qconfig.SessionQualifier, session.SessionQualifier)
setSessionSetting(sessionSettings, qconfig.BeginString, session.BeginString)
setSessionSetting(sessionSettings, "Username", session.Username)
setSessionSetting(sessionSettings, "Password", session.Password)
Expand Down
4 changes: 3 additions & 1 deletion pkg/initiator/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ func ValidateOptions(cmd *cobra.Command, args []string) error {
case 1:
// OK
default:
return errors.ConfigContextMultipleSessions
if cmd.Name() != "probe" {
return errors.ConfigContextMultipleSessions
}
}
}

Expand Down

0 comments on commit a90088a

Please sign in to comment.