diff --git a/config/crd/bases/workload.codeflare.dev_appwrappers.yaml b/config/crd/bases/workload.codeflare.dev_appwrappers.yaml
index f5c640e8..f1da0572 100644
--- a/config/crd/bases/workload.codeflare.dev_appwrappers.yaml
+++ b/config/crd/bases/workload.codeflare.dev_appwrappers.yaml
@@ -161,6 +161,13 @@ spec:
                       check should happen, and if requeuing has reached its maximum
                       number of times.
                     properties:
+                      forceDeletionTimeInSeconds:
+                        default: 0
+                        description: Enable forceful deletion of generic items and
+                          pods with the AppWrapper label after the specified seconds.
+                          This may be necessary to prevent redeployment of generic
+                          items that create pods that were not correctly deleted.
+                        type: integer
                       growthType:
                         default: exponential
                         description: Growth strategy to increase the waiting time
@@ -192,6 +199,12 @@ spec:
                         description: Field to keep track of how many times a requeuing
                           event has been triggered.
                         type: integer
+                      pauseTimeInSeconds:
+                        default: 0
+                        description: When a job is ready to be redispatched after
+                          it has been requeued due to preemption, MCAD will wait 'pauseTimeInSeconds'
+                          before redispatching.
+                        type: integer
                       timeInSeconds:
                         default: 300
                         description: Initial waiting time before requeuing conditions
diff --git a/config/crd/bases/workload.codeflare.dev_schedulingspecs.yaml b/config/crd/bases/workload.codeflare.dev_schedulingspecs.yaml
index 6330ddb0..56e88209 100644
--- a/config/crd/bases/workload.codeflare.dev_schedulingspecs.yaml
+++ b/config/crd/bases/workload.codeflare.dev_schedulingspecs.yaml
@@ -58,6 +58,13 @@ spec:
              time. Values in this field control how often the pod check should
              happen, and if requeuing has reached its maximum number of times.
            properties:
+              forceDeletionTimeInSeconds:
+                default: 0
+                description: Enable forceful deletion of generic items and pods
+                  with the AppWrapper label after the specified seconds. This may
+                  be necessary to prevent redeployment of generic items that create
+                  pods that were not correctly deleted.
+                type: integer
              growthType:
                default: exponential
                description: Growth strategy to increase the waiting time between
@@ -88,6 +95,12 @@ spec:
                description: Field to keep track of how many times a requeuing
                  event has been triggered.
                type: integer
+              pauseTimeInSeconds:
+                default: 0
+                description: When a job is ready to be redispatched after it has
+                  been requeued due to preemption, MCAD will wait 'pauseTimeInSeconds'
+                  before redispatching.
+                type: integer
              timeInSeconds:
                default: 300
                description: Initial waiting time before requeuing conditions
diff --git a/deployment/mcad-controller/crds/workload.codeflare.dev_appwrappers.yaml b/deployment/mcad-controller/crds/workload.codeflare.dev_appwrappers.yaml
index f5c640e8..f1da0572 100644
--- a/deployment/mcad-controller/crds/workload.codeflare.dev_appwrappers.yaml
+++ b/deployment/mcad-controller/crds/workload.codeflare.dev_appwrappers.yaml
@@ -161,6 +161,13 @@ spec:
                       check should happen, and if requeuing has reached its maximum
                       number of times.
                     properties:
+                      forceDeletionTimeInSeconds:
+                        default: 0
+                        description: Enable forceful deletion of generic items and
+                          pods with the AppWrapper label after the specified seconds.
+                          This may be necessary to prevent redeployment of generic
+                          items that create pods that were not correctly deleted.
+                        type: integer
                       growthType:
                         default: exponential
                         description: Growth strategy to increase the waiting time
@@ -192,6 +199,12 @@ spec:
                         description: Field to keep track of how many times a requeuing
                           event has been triggered.
                         type: integer
+                      pauseTimeInSeconds:
+                        default: 0
+                        description: When a job is ready to be redispatched after
+                          it has been requeued due to preemption, MCAD will wait 'pauseTimeInSeconds'
+                          before redispatching.
+                        type: integer
                       timeInSeconds:
                         default: 300
                         description: Initial waiting time before requeuing conditions
diff --git a/deployment/mcad-controller/crds/workload.codeflare.dev_schedulingspecs.yaml b/deployment/mcad-controller/crds/workload.codeflare.dev_schedulingspecs.yaml
index 6330ddb0..56e88209 100644
--- a/deployment/mcad-controller/crds/workload.codeflare.dev_schedulingspecs.yaml
+++ b/deployment/mcad-controller/crds/workload.codeflare.dev_schedulingspecs.yaml
@@ -58,6 +58,13 @@ spec:
              time. Values in this field control how often the pod check should
              happen, and if requeuing has reached its maximum number of times.
            properties:
+              forceDeletionTimeInSeconds:
+                default: 0
+                description: Enable forceful deletion of generic items and pods
+                  with the AppWrapper label after the specified seconds. This may
+                  be necessary to prevent redeployment of generic items that create
+                  pods that were not correctly deleted.
+                type: integer
              growthType:
                default: exponential
                description: Growth strategy to increase the waiting time between
@@ -88,6 +95,12 @@ spec:
                description: Field to keep track of how many times a requeuing
                  event has been triggered.
                type: integer
+              pauseTimeInSeconds:
+                default: 0
+                description: When a job is ready to be redispatched after it has
+                  been requeued due to preemption, MCAD will wait 'pauseTimeInSeconds'
+                  before redispatching.
+                type: integer
              timeInSeconds:
                default: 300
                description: Initial waiting time before requeuing conditions
diff --git a/pkg/apis/controller/v1beta1/schedulingspec.go b/pkg/apis/controller/v1beta1/schedulingspec.go
index 90f5d65f..111459e0 100644
--- a/pkg/apis/controller/v1beta1/schedulingspec.go
+++ b/pkg/apis/controller/v1beta1/schedulingspec.go
@@ -72,6 +72,14 @@ type RequeuingTemplate struct {
 	// items are stopped and removed from the cluster (AppWrapper remains deployed).
 	// +kubebuilder:default=0
 	MaxNumRequeuings int `json:"maxNumRequeuings,omitempty" protobuf:"bytes,6,rep,name=maxNumRequeuings"`
+	// Enable forceful deletion of generic items and pods with the AppWrapper label after the specified seconds.
+	// This may be necessary to prevent redeployment of generic items that create pods that were not correctly deleted.
+	// +kubebuilder:default=0
+	ForceDeletionTimeInSeconds int `json:"forceDeletionTimeInSeconds,omitempty" protobuf:"bytes,7,rep,name=forceDeletionTimeInSeconds"`
+	// When a job is ready to be redispatched after it has been requeued due to preemption, MCAD will
+	// wait 'pauseTimeInSeconds' before redispatching.
+	// +kubebuilder:default=0
+	PauseTimeInSeconds int `json:"pauseTimeInSeconds,omitempty" protobuf:"bytes,8,rep,name=pauseTimeInSeconds"`
 }
 
 // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
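Note: the two fields above are the user-facing knobs behind the CRD changes in the earlier hunks. A minimal sketch of how they might be set in an AppWrapper's scheduling spec (structure abbreviated to the requeuing block; the values are illustrative, not defaults):

    spec:
      schedulingSpec:
        requeuing:
          timeInSeconds: 300
          growthType: exponential
          maxNumRequeuings: 3
          forceDeletionTimeInSeconds: 120  # force-delete leftover pods 120s after requeuing cleanup begins
          pauseTimeInSeconds: 30           # hold the requeued AppWrapper 30s before redispatching

With both left at 0 (the defaults), the new code paths below are skipped entirely and requeuing behaves as before.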
diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go
index da79ee34..09a12a33 100644
--- a/pkg/controller/queuejob/queuejob_controller_ex.go
+++ b/pkg/controller/queuejob/queuejob_controller_ex.go
@@ -323,6 +323,18 @@ func NewJobController(restConfig *rest.Config, mcadConfig *config.MCADConfiguration
 // we still need a thread for dispatch duration but minScheduling spec can definetly be moved to an informer
 func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper) {
 	ctx := context.Background()
+	// We need to update the AW before analyzing it as a candidate for preemption
+	updateErr := qjm.UpdateQueueJobStatus(inspectAw)
+	if updateErr != nil {
+		klog.Warningf("[PreemptQueueJobs] update of pod count to AW %s/%s failed, hence skipping preemption", inspectAw.Namespace, inspectAw.Name)
+		return
+	}
+	// Check if the AppWrapper is waiting for deletion to be completed
+	if inspectAw.Status.QueueJobState == arbv1.AppWrapperCondDeleted {
+		qjm.enqueue(inspectAw)
+		return
+	}
+	// Check if the AppWrapper should be preempted
 	aw := qjm.GetQueueJobEligibleForPreemption(inspectAw)
 	if aw != nil {
 		if aw.Status.State == arbv1.AppWrapperStateCompleted || aw.Status.State == arbv1.AppWrapperStateDeleted || aw.Status.State == arbv1.AppWrapperStateFailed {
@@ -336,12 +348,6 @@ func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper) {
 			klog.Warningf("[PreemptQueueJobs] failed in retrieving a fresh copy of the app wrapper '%s/%s', err=%v. Will try to preempt on the next run.", aw.Namespace, aw.Name, err)
 			return
 		}
-		// we need to update AW before analyzing it as a candidate for preemption
-		updateErr := qjm.UpdateQueueJobStatus(newjob)
-		if updateErr != nil {
-			klog.Warningf("[PreemptQueueJobs] update of pod count to AW %s/%s failed hence skipping preemption", newjob.Namespace, newjob.Name)
-			return
-		}
 		newjob.Status.CanRun = false
 		newjob.Status.FilterIgnore = true // update QueueJobState only
 		cleanAppWrapper := false
@@ -372,7 +378,6 @@ func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper) {
 				// cannot use cleanup AW, since it puts AW back in running state
 				qjm.qjqueue.AddUnschedulableIfNotPresent(updateNewJob)
 				generatedCondition = true
-
 			}
 		}
@@ -415,6 +420,7 @@ func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper) {
 			updateNewJob = newjob.DeepCopy()
 			generatedCondition = true
+
 		} else if newjob.Status.Running == 0 && newjob.Status.Succeeded == 0 && newjob.Status.State == arbv1.AppWrapperStateActive {
 			// If pods failed scheduling generate new preempt condition
 			message = fmt.Sprintf("Pods failed scheduling failed=%v, running=%v.", len(newjob.Status.PendingPodConditions), newjob.Status.Running)
@@ -1642,9 +1648,7 @@ func (cc *XController) addQueueJob(obj interface{}) {
 				cc.UpdateQueueJobs(latestAw)
 				klog.V(2).Infof("[Informer-addQJ] requeing AW %s/%s to determine completion status for AW", qj.Namespace, qj.Name)
 			}
-
 		}
-
 	}()
 }
@@ -1883,9 +1887,9 @@ func (cc *XController) worker() {
 		// if there are running resources for this job then delete them because the job was put in
 		// pending state...
-		// If this the first time seeing this AW, no need to delete.
+		// If this is the first time seeing this AW, no need to delete.
 		stateLen := len(queuejob.Status.State)
-		if stateLen > 0 {
+		if stateLen > 0 && queuejob.Status.QueueJobState != arbv1.AppWrapperCondDeleted {
 			klog.V(2).Infof("[worker] Deleting resources for AppWrapper Job '%s/%s' because it was preempted, status.CanRun=%t, status.State=%s", queuejob.Namespace, queuejob.Name, queuejob.Status.CanRun, queuejob.Status.State)
 			err00 := cc.Cleanup(ctx, queuejob)
 			if err00 != nil {
@@ -1893,13 +1897,74 @@ func (cc *XController) worker() {
 				return err00
 			}
 			klog.V(2).Infof("[worker] Delete resources for AppWrapper Job '%s/%s' due to preemption was sucessfull, status.CanRun=%t, status.State=%s", queuejob.Namespace, queuejob.Name, queuejob.Status.CanRun, queuejob.Status.State)
+
+			if queuejob.Spec.SchedSpec.Requeuing.ForceDeletionTimeInSeconds > 0 || queuejob.Spec.SchedSpec.Requeuing.PauseTimeInSeconds > 0 {
+				// 1) Wait for deletion of the AppWrapper's items to complete before forcing the deletion of pods
+				// 2) Delay redispatching by the user-specified wait time
+				var err error
+				newjob, err := cc.getAppWrapper(queuejob.Namespace, queuejob.Name, "[worker] get fresh AppWrapper")
+				if err != nil {
+					klog.Errorf("[worker] Failed getting a new AppWrapper: '%s/%s', Status=%+v, err=%+v.", queuejob.Namespace, queuejob.Name, queuejob.Status, err)
+					return err
+				}
+				newjob.Status.QueueJobState = arbv1.AppWrapperCondDeleted
+				cc.addOrUpdateCondition(newjob, arbv1.AppWrapperCondDeleted, v1.ConditionTrue, "AwaitingDeletion", "")
+				index := getIndexOfMatchedCondition(newjob, arbv1.AppWrapperCondDeleted, "AwaitingDeletion") // TODO: addOrUpdateCondition is NOT changing the transition time properly, so we need to do it here
+				newjob.Status.Conditions[index].LastTransitionMicroTime = metav1.NowMicro()
+				newjob.Status.Conditions[index].LastUpdateMicroTime = metav1.NowMicro()
+				newjob.Status.FilterIgnore = true
+				err = cc.updateStatusInEtcdWithRetry(ctx, newjob, "AwaitingDeletion")
+				if err != nil {
+					klog.Errorf("[worker] Error updating status 'Deleted' for AppWrapper: '%s/%s', status=%+v, err=%+v.", newjob.Namespace, newjob.Name, newjob.Status, err)
+					return err
+				}
+				return nil
+			}
+		} else if queuejob.Status.QueueJobState == arbv1.AppWrapperCondDeleted {
+			// Check if the 'AwaitingDeletion' condition exists
+			index := getIndexOfMatchedCondition(queuejob, arbv1.AppWrapperCondDeleted, "AwaitingDeletion")
+			if index < 0 {
+				klog.V(4).Infof("WARNING: [worker] Forced deletion condition was not added after 'Cleanup'. Silently ignoring forced cleanup.")
+			} else {
+				// Get the current time to compare against
+				currentTime := time.Now()
+
+				// The AppWrapper was preempted and its objects were deleted. In case the deletion was not successful for all the items,
+				// MCAD will force-delete any pods that remain in the system
+				if queuejob.Spec.SchedSpec.Requeuing.ForceDeletionTimeInSeconds > 0 {
+					forceDeletionTime := queuejob.Status.Conditions[index].LastTransitionMicroTime.Add(time.Duration(queuejob.Spec.SchedSpec.Requeuing.ForceDeletionTimeInSeconds) * time.Second)
+
+					if currentTime.After(forceDeletionTime) {
+						if err := cc.ForcefulCleanup(ctx, queuejob); err != nil {
+							klog.V(5).Infof("[worker] Forced deletion of remaining live pods didn't work (Ending %s/%s). Retrying in the next cycle.", queuejob.Namespace, queuejob.Name)
+							return nil
+						}
+					} else {
+						klog.V(8).Infof("[worker] Waiting for 'ForceDeletionTimeInSeconds' seconds before requeuing job '%s/%s'.", queuejob.Namespace, queuejob.Name)
+						return nil
+					}
+				}
+
+				// When a job is ready to be redispatched after it has been requeued due to preemption, MCAD will wait 'pauseTimeInSeconds' before redispatching
+				if queuejob.Spec.SchedSpec.Requeuing.PauseTimeInSeconds > 0 {
+					redispatchingTime := queuejob.Status.Conditions[index].LastTransitionMicroTime.Add(time.Duration(queuejob.Spec.SchedSpec.Requeuing.PauseTimeInSeconds) * time.Second)
+
+					if currentTime.After(redispatchingTime) {
+						klog.V(5).Infof("[worker] Ready to redispatch the AppWrapper (Ending %s/%s).", queuejob.Namespace, queuejob.Name)
+					} else {
+						klog.V(8).Infof("[worker] Waiting for 'PauseTimeInSeconds' seconds before redispatching job '%s/%s'.", queuejob.Namespace, queuejob.Name)
+						return nil
+					}
+				}
+			}
 		}
+		// Preparing the job for redispatching
 		queuejob.Status.State = arbv1.AppWrapperStateEnqueued
+		queuejob.Status.QueueJobState = arbv1.AppWrapperCondQueueing
 		klog.V(10).Infof("[worker] before add to activeQ %s/%s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", queuejob.Namespace, queuejob.Name, cc.qjqueue.IfExistActiveQ(queuejob), cc.qjqueue.IfExistUnschedulableQ(queuejob), queuejob, queuejob.ResourceVersion, queuejob.Status)
 		index := getIndexOfMatchedCondition(queuejob, arbv1.AppWrapperCondQueueing, "AwaitingHeadOfLine")
 		if index < 0 {
-			queuejob.Status.QueueJobState = arbv1.AppWrapperCondQueueing
 			cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondQueueing, v1.ConditionTrue, "AwaitingHeadOfLine", "")
 			queuejob.Status.Conditions = append(queuejob.Status.Conditions, cond)
 		} else {
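Note: both timers above key off the LastTransitionMicroTime of the 'AwaitingDeletion' condition recorded right after Cleanup. While an AppWrapper is parked in this holding state, its status would look roughly like the sketch below (field names sketched from the v1beta1 types, timestamp illustrative; exact serialization may differ):

    status:
      queuejobstate: Deleted
      conditions:
      - type: Deleted
        status: "True"
        reason: AwaitingDeletion
        lastTransitionMicroTime: "2024-05-01T12:00:00.000000Z"

Each worker pass adds forceDeletionTimeInSeconds and pauseTimeInSeconds to that transition time and returns nil until the resulting deadlines have passed, after which the AppWrapper falls through to the redispatch path and is re-marked Queueing.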
@@ -2201,6 +2266,29 @@ func (cc *XController) manageQueueJob(ctx context.Context, qj *arbv1.AppWrapper,
 }
 
 // Cleanup function
+// ForcefulCleanup force-deletes any pods that still carry the AppWrapper label after Cleanup has run.
+func (qjm *XController) ForcefulCleanup(ctx context.Context, appwrapper *arbv1.AppWrapper) error {
+	labelSelector := fmt.Sprintf("%s=%s", "appwrapper.mcad.ibm.com", appwrapper.Name)
+	pods, getPodsErr := qjm.clients.CoreV1().Pods("").List(ctx, metav1.ListOptions{LabelSelector: labelSelector})
+
+	if getPodsErr != nil {
+		klog.V(5).Infof("[ForcefulCleanup] Listing pods with label '%s' was not successful", labelSelector)
+		return getPodsErr
+	}
+
+	for _, pod := range pods.Items {
+		klog.V(3).Infof("[ForcefulCleanup] Forcibly deleting long-terminating pod='%s/%s'", pod.Namespace, pod.Name)
+		zero := int64(0)
+		delPodErr := qjm.clients.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{GracePeriodSeconds: &zero})
+		if delPodErr != nil {
+			klog.V(3).Infof("[ForcefulCleanup] Couldn't forcibly delete long-terminating pod='%s/%s'", pod.Namespace, pod.Name)
+			return delPodErr
+		}
+	}
+
+	return nil
+}
+
 func (cc *XController) Cleanup(ctx context.Context, appwrapper *arbv1.AppWrapper) error {
 	klog.V(3).Infof("[Cleanup] begin AppWrapper '%s/%s' Version=%s", appwrapper.Namespace, appwrapper.Name, appwrapper.ResourceVersion)
 	var err *multierror.Error
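Note: ForcefulCleanup amounts to a zero-grace-period delete by label, applied across all namespaces. Assuming pods created by the wrapped items carry the appwrapper.mcad.ibm.com label used in the selector above, the manual equivalent would be roughly:

    kubectl delete pods --all-namespaces -l appwrapper.mcad.ibm.com=<appwrapper-name> --grace-period=0 --force

As with any forced deletion, this only removes the Pod objects from the API server; if a node's kubelet is unreachable, the containers may keep running on that node. That is the trade-off the forceDeletionTimeInSeconds escape hatch deliberately accepts in exchange for unblocking redispatch.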