Skip to content

Commit

Permalink
Introduce batched services.
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 688122044
  • Loading branch information
panhania authored and copybara-github committed Oct 24, 2024
1 parent baa893c commit 65b63bc
Show file tree
Hide file tree
Showing 10 changed files with 457 additions and 53 deletions.
28 changes: 28 additions & 0 deletions fleetspeak/src/server/comms.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,34 @@ func (c commsContext) handleMessagesFromClient(ctx context.Context, info *comms.
return nil
}

msgsByService := make(map[string][]*fspb.Message, len(msgs))
for _, msg := range msgs {
msgsByService[msg.Destination.ServiceName] = append(msgsByService[msg.Destination.ServiceName], msg)
}

// TODO(hanuszczak): Is it better to potentially over-allocate with capacity
// of `len(msgs)` or start with 0?
unbatchedMsgs := make([]*fspb.Message, 0)

for service, msgs := range msgsByService {
if len(msgs) == 0 {
continue
}
if service == "" {
log.ErrorContextf(ctx, "dropping %v messages with no service set", len(msgs))
continue
}

if c.s.serviceConfig.IsBatchedService(service) {
c.s.serviceConfig.ProcessBatchedMessages(service, msgs)
} else {
unbatchedMsgs = append(unbatchedMsgs, msgs...)
}
}

// TODO(hanuszczak): Is it better to assign `msgs` to `unbatchedMsgs` here or
// to change the occurrences below (that makes the diff worse?).
msgs = unbatchedMsgs
sort.Slice(msgs, func(a, b int) bool {
return bytes.Compare(msgs[a].MessageId, msgs[b].MessageId) == -1
})
Expand Down
69 changes: 58 additions & 11 deletions fleetspeak/src/server/internal/services/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,25 @@ const MaxServiceFailureReasonLength = 900

// A Manager starts, remembers, and shuts down services.
type Manager struct {
services map[string]*liveService
dataStore db.Store
serviceRegistry map[string]service.Factory // Used to look up the correct factory when configuring services.
stats stats.Collector
cc *cache.Clients
services map[string]*liveService
batchedServices map[string]service.BatchedService
dataStore db.Store
serviceRegistry map[string]service.Factory // Used to look up the correct factory when configuring services.
batchedServiceRegistry map[string](func(*spb.BatchedServiceConfig) (service.BatchedService, error))
stats stats.Collector
cc *cache.Clients
}

// NewManager creates a new manager using the provided components. Initially it only contains the 'system' service.
func NewManager(dataStore db.Store, serviceRegistry map[string]service.Factory, stats stats.Collector, clientCache *cache.Clients) *Manager {
func NewManager(dataStore db.Store, serviceRegistry map[string]service.Factory, batchedServiceRegistry map[string](func(*spb.BatchedServiceConfig) (service.BatchedService, error)), stats stats.Collector, clientCache *cache.Clients) *Manager {
m := Manager{
services: make(map[string]*liveService),
dataStore: dataStore,
serviceRegistry: serviceRegistry,
stats: stats,
cc: clientCache,
services: make(map[string]*liveService),
batchedServices: make(map[string]service.BatchedService),
dataStore: dataStore,
serviceRegistry: serviceRegistry,
batchedServiceRegistry: batchedServiceRegistry,
stats: stats,
cc: clientCache,
}

ssd := liveService{
Expand Down Expand Up @@ -137,6 +141,32 @@ func (c *Manager) Install(cfg *spb.ServiceConfig) error {
return nil
}

// InstallBatched adds a new batched service based on the given configuration,
// replacing any existing service registered under the same name.
func (c *Manager) InstallBatched(cfg *spb.BatchedServiceConfig) error {
factory := c.batchedServiceRegistry[cfg.Factory]
if factory == nil {
return fmt.Errorf("no such batched service factory: %v", cfg.Factory)
}

service, err := factory(cfg)
if err != nil {
return err
}

c.batchedServices[cfg.Name] = service
log.Infof("installed batched service: %v", cfg.Name)

return nil
}

// IsBatchedService returns true if the service under given name is registered
// as a batched service.
func (c *Manager) IsBatchedService(name string) bool {
_, ok := c.batchedServices[name]
return ok
}

// Stop closes and removes all services in the configuration.
func (c *Manager) Stop() {
for _, d := range c.services {
Expand Down Expand Up @@ -196,6 +226,23 @@ func (c *Manager) ProcessMessages(msgs []*fspb.Message) {
}
}

// ProcessBatchedMessages processes a batch of messages using the specified
// service.
func (c *Manager) ProcessBatchedMessages(serviceName string, msgs []*fspb.Message) {
ctx, fin := context.WithTimeout(context.Background(), 30*time.Second)
defer fin()

service := c.batchedServices[serviceName]
if service == nil {
log.ErrorContextf(ctx, "no such batched service: %v", serviceName)
return
}

if err := service.ProcessMessages(ctx, msgs); err != nil {
log.ErrorContextf(ctx, "process batched messages: %v", err)
}
}

// processMessage attempts to processes m, returning a fspb.MessageResult. It
// also updates stats, calling exactly one of MessageDropped, MessageFailed,
// MessageProcessed.
Expand Down
59 changes: 38 additions & 21 deletions fleetspeak/src/server/proto/fleetspeak_server/server.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions fleetspeak/src/server/proto/fleetspeak_server/server.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ message ServerConfig {
// The collection of services that this server should include.
repeated ServiceConfig services = 1;

// The collection of batched services that this server should include.
repeated BatchedServiceConfig batched_services = 3;

// The approximate time to wait between checking for new broadcasts. If unset,
// a default of 1 minute is used.
google.protobuf.Duration broadcast_poll_time = 2;
Expand Down
Loading

0 comments on commit 65b63bc

Please sign in to comment.