Skip to content

Commit bbe7a74

Browse files
author
Pavel Okhlopkov
committed
second
Signed-off-by: Pavel Okhlopkov <[email protected]>
1 parent ee25c53 commit bbe7a74

File tree

2 files changed

+189
-147
lines changed

2 files changed

+189
-147
lines changed

pkg/addon-operator/operator.go

+105-117
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,7 @@ func shouldEnableSchedulesOnStartup(hk typedHook) bool {
443443
}
444444

445445
func (op *AddonOperator) RegisterManagerEventsHandlers() {
446+
// Register handler for schedule events
446447
op.engine.ManagerEventsHandler.WithScheduleEventHandler(func(crontab string) []sh_task.Task {
447448
logLabels := map[string]string{
448449
"event.id": uuid.Must(uuid.NewV4()).String(),
@@ -452,69 +453,15 @@ func (op *AddonOperator) RegisterManagerEventsHandlers() {
452453
logEntry.Debug("Create tasks for 'schedule' event",
453454
slog.String("event", crontab))
454455

455-
var tasks []sh_task.Task
456-
op.ModuleManager.HandleScheduleEvent(crontab,
457-
func(globalHook *hooks.GlobalHook, info controller.BindingExecutionInfo) {
458-
if !op.allowHandleScheduleEvent(globalHook) {
459-
return
460-
}
461-
462-
hookLabels := utils.MergeLabels(logLabels, map[string]string{
463-
"hook": globalHook.GetName(),
464-
"hook.type": "module",
465-
"queue": info.QueueName,
466-
})
467-
if len(info.BindingContext) > 0 {
468-
hookLabels["binding.name"] = info.BindingContext[0].Binding
469-
}
470-
delete(hookLabels, "task.id")
471-
newTask := sh_task.NewTask(task.GlobalHookRun).
472-
WithLogLabels(hookLabels).
473-
WithQueueName(info.QueueName).
474-
WithMetadata(task.HookMetadata{
475-
EventDescription: "Schedule",
476-
HookName: globalHook.GetName(),
477-
BindingType: htypes.Schedule,
478-
BindingContext: info.BindingContext,
479-
AllowFailure: info.AllowFailure,
480-
ReloadAllOnValuesChanges: true,
481-
})
482-
483-
tasks = append(tasks, newTask)
484-
},
485-
func(module *modules.BasicModule, moduleHook *hooks.ModuleHook, info controller.BindingExecutionInfo) {
486-
if !op.allowHandleScheduleEvent(moduleHook) {
487-
return
488-
}
489-
490-
hookLabels := utils.MergeLabels(logLabels, map[string]string{
491-
"module": module.GetName(),
492-
"hook": moduleHook.GetName(),
493-
"hook.type": "module",
494-
"queue": info.QueueName,
495-
})
496-
if len(info.BindingContext) > 0 {
497-
hookLabels["binding.name"] = info.BindingContext[0].Binding
498-
}
499-
delete(hookLabels, "task.id")
500-
newTask := sh_task.NewTask(task.ModuleHookRun).
501-
WithLogLabels(hookLabels).
502-
WithQueueName(info.QueueName).
503-
WithMetadata(task.HookMetadata{
504-
EventDescription: "Schedule",
505-
ModuleName: module.GetName(),
506-
HookName: moduleHook.GetName(),
507-
BindingType: htypes.Schedule,
508-
BindingContext: info.BindingContext,
509-
AllowFailure: info.AllowFailure,
510-
})
511-
512-
tasks = append(tasks, newTask)
513-
})
514-
515-
return tasks
456+
// Handle global hook schedule events
457+
return op.ModuleManager.HandleScheduleEvent(
458+
crontab,
459+
op.createGlobalHookTaskFactory(logLabels, htypes.Schedule, "Schedule", true),
460+
op.createModuleHookTaskFactory(logLabels, htypes.Schedule, "Schedule"),
461+
)
516462
})
517463

464+
// Register handler for kubernetes events
518465
op.engine.ManagerEventsHandler.WithKubeEventHandler(func(kubeEvent types.KubeEvent) []sh_task.Task {
519466
logLabels := map[string]string{
520467
"event.id": uuid.Must(uuid.NewV4()).String(),
@@ -524,66 +471,107 @@ func (op *AddonOperator) RegisterManagerEventsHandlers() {
524471
logEntry.Debug("Create tasks for 'kubernetes' event",
525472
slog.String("event", kubeEvent.String()))
526473

527-
var tasks []sh_task.Task
528-
op.ModuleManager.HandleKubeEvent(kubeEvent,
529-
func(globalHook *hooks.GlobalHook, info controller.BindingExecutionInfo) {
530-
hookLabels := utils.MergeLabels(logLabels, map[string]string{
531-
"hook": globalHook.GetName(),
532-
"hook.type": "global",
533-
"queue": info.QueueName,
534-
})
535-
if len(info.BindingContext) > 0 {
536-
hookLabels["binding.name"] = info.BindingContext[0].Binding
537-
hookLabels["watchEvent"] = string(info.BindingContext[0].WatchEvent)
538-
}
539-
delete(hookLabels, "task.id")
540-
newTask := sh_task.NewTask(task.GlobalHookRun).
541-
WithLogLabels(hookLabels).
542-
WithQueueName(info.QueueName).
543-
WithMetadata(task.HookMetadata{
544-
EventDescription: "Kubernetes",
545-
HookName: globalHook.GetName(),
546-
BindingType: htypes.OnKubernetesEvent,
547-
BindingContext: info.BindingContext,
548-
AllowFailure: info.AllowFailure,
549-
Binding: info.Binding,
550-
ReloadAllOnValuesChanges: true,
551-
})
552-
553-
tasks = append(tasks, newTask)
554-
},
555-
func(module *modules.BasicModule, moduleHook *hooks.ModuleHook, info controller.BindingExecutionInfo) {
556-
hookLabels := utils.MergeLabels(logLabels, map[string]string{
557-
"module": module.GetName(),
558-
"hook": moduleHook.GetName(),
559-
"hook.type": "module",
560-
"queue": info.QueueName,
561-
})
562-
if len(info.BindingContext) > 0 {
563-
hookLabels["binding.name"] = info.BindingContext[0].Binding
564-
hookLabels["watchEvent"] = string(info.BindingContext[0].WatchEvent)
565-
}
566-
delete(hookLabels, "task.id")
567-
newTask := sh_task.NewTask(task.ModuleHookRun).
568-
WithLogLabels(hookLabels).
569-
WithQueueName(info.QueueName).
570-
WithMetadata(task.HookMetadata{
571-
EventDescription: "Kubernetes",
572-
ModuleName: module.GetName(),
573-
HookName: moduleHook.GetName(),
574-
Binding: info.Binding,
575-
BindingType: htypes.OnKubernetesEvent,
576-
BindingContext: info.BindingContext,
577-
AllowFailure: info.AllowFailure,
578-
})
579-
580-
tasks = append(tasks, newTask)
581-
})
474+
// Handle kubernetes events for global and module hooks
475+
tailTasks := op.ModuleManager.HandleKubeEvent(
476+
kubeEvent,
477+
op.createGlobalHookTaskFactory(logLabels, htypes.OnKubernetesEvent, "Kubernetes", true),
478+
op.createModuleHookTaskFactory(logLabels, htypes.OnKubernetesEvent, "Kubernetes"),
479+
)
582480

583-
return tasks
481+
return tailTasks
584482
})
585483
}
586484

485+
// createGlobalHookTaskFactory returns a factory function for creating tasks for global hooks
486+
func (op *AddonOperator) createGlobalHookTaskFactory(
487+
logLabels map[string]string,
488+
bindingType htypes.BindingType,
489+
eventDescription string,
490+
reloadOnValuesChanges bool,
491+
) func(globalHook *hooks.GlobalHook, info controller.BindingExecutionInfo) sh_task.Task {
492+
return func(globalHook *hooks.GlobalHook, info controller.BindingExecutionInfo) sh_task.Task {
493+
// For schedule events, check if we should allow handling
494+
if bindingType == htypes.Schedule && !op.allowHandleScheduleEvent(globalHook) {
495+
return nil
496+
}
497+
498+
hookLabels := utils.MergeLabels(logLabels, map[string]string{
499+
"hook": globalHook.GetName(),
500+
"hook.type": "global",
501+
"queue": info.QueueName,
502+
})
503+
504+
if len(info.BindingContext) > 0 {
505+
hookLabels["binding.name"] = info.BindingContext[0].Binding
506+
if bindingType == htypes.OnKubernetesEvent {
507+
hookLabels["watchEvent"] = string(info.BindingContext[0].WatchEvent)
508+
}
509+
}
510+
511+
delete(hookLabels, "task.id")
512+
513+
newTask := sh_task.NewTask(task.GlobalHookRun).
514+
WithLogLabels(hookLabels).
515+
WithQueueName(info.QueueName).
516+
WithMetadata(task.HookMetadata{
517+
EventDescription: eventDescription,
518+
HookName: globalHook.GetName(),
519+
BindingType: bindingType,
520+
BindingContext: info.BindingContext,
521+
AllowFailure: info.AllowFailure,
522+
Binding: info.Binding,
523+
ReloadAllOnValuesChanges: reloadOnValuesChanges,
524+
})
525+
526+
return newTask
527+
}
528+
}
529+
530+
// createModuleHookTaskFactory returns a factory function for creating tasks for module hooks
531+
func (op *AddonOperator) createModuleHookTaskFactory(
532+
logLabels map[string]string,
533+
bindingType htypes.BindingType,
534+
eventDescription string,
535+
) func(module *modules.BasicModule, moduleHook *hooks.ModuleHook, info controller.BindingExecutionInfo) sh_task.Task {
536+
return func(module *modules.BasicModule, moduleHook *hooks.ModuleHook, info controller.BindingExecutionInfo) sh_task.Task {
537+
// For schedule events, check if we should allow handling
538+
if bindingType == htypes.Schedule && !op.allowHandleScheduleEvent(moduleHook) {
539+
return nil
540+
}
541+
542+
hookLabels := utils.MergeLabels(logLabels, map[string]string{
543+
"module": module.GetName(),
544+
"hook": moduleHook.GetName(),
545+
"hook.type": "module",
546+
"queue": info.QueueName,
547+
})
548+
549+
if len(info.BindingContext) > 0 {
550+
hookLabels["binding.name"] = info.BindingContext[0].Binding
551+
if bindingType == htypes.OnKubernetesEvent {
552+
hookLabels["watchEvent"] = string(info.BindingContext[0].WatchEvent)
553+
}
554+
}
555+
556+
delete(hookLabels, "task.id")
557+
558+
newTask := sh_task.NewTask(task.ModuleHookRun).
559+
WithLogLabels(hookLabels).
560+
WithQueueName(info.QueueName).
561+
WithMetadata(task.HookMetadata{
562+
EventDescription: eventDescription,
563+
ModuleName: module.GetName(),
564+
HookName: moduleHook.GetName(),
565+
Binding: info.Binding,
566+
BindingType: bindingType,
567+
BindingContext: info.BindingContext,
568+
AllowFailure: info.AllowFailure,
569+
})
570+
571+
return newTask
572+
}
573+
}
574+
587575
// BootstrapMainQueue adds tasks to initiate Startup sequence:
588576
//
589577
// - Run onStartup hooks.

pkg/module_manager/module_manager.go

+84-30
Original file line numberDiff line numberDiff line change
@@ -741,26 +741,53 @@ func (mm *ModuleManager) RunModuleHook(moduleName, hookName string, binding Bind
741741
return ml.RunHookByName(hookName, binding, bindingContext, logLabels)
742742
}
743743

744-
func (mm *ModuleManager) HandleKubeEvent(kubeEvent KubeEvent, createGlobalTaskFn func(*hooks.GlobalHook, controller.BindingExecutionInfo), createModuleTaskFn func(*modules.BasicModule, *hooks.ModuleHook, controller.BindingExecutionInfo)) {
744+
func (mm *ModuleManager) HandleKubeEvent(
745+
kubeEvent KubeEvent,
746+
createGlobalTaskFn func(*hooks.GlobalHook, controller.BindingExecutionInfo) sh_task.Task,
747+
createModuleTaskFn func(*modules.BasicModule, *hooks.ModuleHook, controller.BindingExecutionInfo) sh_task.Task,
748+
) []sh_task.Task {
749+
tasks := make([]sh_task.Task, 0)
750+
751+
// Process hooks by binding type OnKubernetesEvent
745752
mm.LoopByBinding(OnKubernetesEvent, func(gh *hooks.GlobalHook, m *modules.BasicModule, mh *hooks.ModuleHook) {
753+
// Handle global hooks
746754
if gh != nil {
747-
if gh.GetHookController().CanHandleKubeEvent(kubeEvent) {
748-
gh.GetHookController().HandleKubeEvent(kubeEvent, func(info controller.BindingExecutionInfo) {
749-
if createGlobalTaskFn != nil {
750-
createGlobalTaskFn(gh, info)
751-
}
752-
})
753-
}
754-
} else {
755-
if mh.GetHookController().CanHandleKubeEvent(kubeEvent) {
756-
mh.GetHookController().HandleKubeEvent(kubeEvent, func(info controller.BindingExecutionInfo) {
757-
if createModuleTaskFn != nil {
758-
createModuleTaskFn(m, mh, info)
759-
}
760-
})
755+
if !gh.GetHookController().CanHandleKubeEvent(kubeEvent) {
756+
return
761757
}
758+
759+
gh.GetHookController().HandleKubeEvent(kubeEvent, func(info controller.BindingExecutionInfo) {
760+
if createGlobalTaskFn == nil {
761+
return
762+
}
763+
764+
task := createGlobalTaskFn(gh, info)
765+
if task != nil {
766+
tasks = append(tasks, task)
767+
}
768+
})
769+
770+
return
762771
}
772+
773+
// Handle module hooks
774+
if !mh.GetHookController().CanHandleKubeEvent(kubeEvent) {
775+
return
776+
}
777+
778+
mh.GetHookController().HandleKubeEvent(kubeEvent, func(info controller.BindingExecutionInfo) {
779+
if createModuleTaskFn == nil {
780+
return
781+
}
782+
783+
task := createModuleTaskFn(m, mh, info)
784+
if task != nil {
785+
tasks = append(tasks, task)
786+
}
787+
})
763788
})
789+
790+
return tasks
764791
}
765792

766793
func (mm *ModuleManager) HandleGlobalEnableKubernetesBindings(hookName string, createTaskFn func(*hooks.GlobalHook, controller.BindingExecutionInfo)) error {
@@ -829,26 +856,53 @@ func (mm *ModuleManager) DisableModuleHooks(moduleName string) {
829856
mm.SetModulePhaseAndNotify(ml, modules.HooksDisabled)
830857
}
831858

832-
func (mm *ModuleManager) HandleScheduleEvent(crontab string, createGlobalTaskFn func(*hooks.GlobalHook, controller.BindingExecutionInfo), createModuleTaskFn func(*modules.BasicModule, *hooks.ModuleHook, controller.BindingExecutionInfo)) {
859+
func (mm *ModuleManager) HandleScheduleEvent(
860+
crontab string,
861+
createGlobalTaskFn func(*hooks.GlobalHook, controller.BindingExecutionInfo) sh_task.Task,
862+
createModuleTaskFn func(*modules.BasicModule, *hooks.ModuleHook, controller.BindingExecutionInfo) sh_task.Task,
863+
) []sh_task.Task {
864+
tasks := make([]sh_task.Task, 0)
865+
866+
// Process hooks by binding type Schedule
833867
mm.LoopByBinding(Schedule, func(gh *hooks.GlobalHook, m *modules.BasicModule, mh *hooks.ModuleHook) {
868+
// Handle global hooks
834869
if gh != nil {
835-
if gh.GetHookController().CanHandleScheduleEvent(crontab) {
836-
gh.GetHookController().HandleScheduleEvent(crontab, func(info controller.BindingExecutionInfo) {
837-
if createGlobalTaskFn != nil {
838-
createGlobalTaskFn(gh, info)
839-
}
840-
})
841-
}
842-
} else {
843-
if mh.GetHookController().CanHandleScheduleEvent(crontab) {
844-
mh.GetHookController().HandleScheduleEvent(crontab, func(info controller.BindingExecutionInfo) {
845-
if createModuleTaskFn != nil {
846-
createModuleTaskFn(m, mh, info)
847-
}
848-
})
870+
if !gh.GetHookController().CanHandleScheduleEvent(crontab) {
871+
return
849872
}
873+
874+
gh.GetHookController().HandleScheduleEvent(crontab, func(info controller.BindingExecutionInfo) {
875+
if createGlobalTaskFn == nil {
876+
return
877+
}
878+
879+
task := createGlobalTaskFn(gh, info)
880+
if task != nil {
881+
tasks = append(tasks, task)
882+
}
883+
})
884+
885+
return
850886
}
887+
888+
// Handle module hooks
889+
if !mh.GetHookController().CanHandleScheduleEvent(crontab) {
890+
return
891+
}
892+
893+
mh.GetHookController().HandleScheduleEvent(crontab, func(info controller.BindingExecutionInfo) {
894+
if createModuleTaskFn == nil {
895+
return
896+
}
897+
898+
task := createModuleTaskFn(m, mh, info)
899+
if task != nil {
900+
tasks = append(tasks, task)
901+
}
902+
})
851903
})
904+
905+
return tasks
852906
}
853907

854908
func (mm *ModuleManager) LoopByBinding(binding BindingType, fn func(gh *hooks.GlobalHook, m *modules.BasicModule, mh *hooks.ModuleHook)) {

0 commit comments

Comments
 (0)