Skip to content

Commit 59bab13

Browse files
authored
remove unused queues (#605)
* remove unused queues * optimize dispatch perf * remove cleanup logic * fix tests * fix failing tests
1 parent 8b80a7d commit 59bab13

File tree

1 file changed

+28
-39
lines changed

1 file changed

+28
-39
lines changed

pkg/controller/queuejob/queuejob_controller_ex.go

+28-39
Original file line numberDiff line numberDiff line change
@@ -93,10 +93,10 @@ type XController struct {
9393

9494
// QueueJobs that need to be initialized
9595
// Add labels and selectors to AppWrapper
96-
initQueue *cache.FIFO
96+
//initQueue *cache.FIFO
9797

9898
// QueueJobs that need to sync up after initialization
99-
updateQueue *cache.FIFO
99+
//updateQueue *cache.FIFO
100100

101101
// eventQueue that need to sync up
102102
eventQueue *cache.FIFO
@@ -241,9 +241,9 @@ func NewJobController(config *rest.Config, serverOption *options.ServerOption) *
241241
arbclients: clientset.NewForConfigOrDie(config),
242242
eventQueue: cache.NewFIFO(GetQueueJobKey),
243243
agentEventQueue: cache.NewFIFO(GetQueueJobKey),
244-
initQueue: cache.NewFIFO(GetQueueJobKey),
245-
updateQueue: cache.NewFIFO(GetQueueJobKey),
246-
qjqueue: NewSchedulingQueue(),
244+
//initQueue: cache.NewFIFO(GetQueueJobKey),
245+
//updateQueue: cache.NewFIFO(GetQueueJobKey),
246+
qjqueue: NewSchedulingQueue(),
247247
//cache is turned-off, issue: https://github.com/project-codeflare/multi-cluster-app-dispatcher/issues/588
248248
//cache: clusterstatecache.New(config),
249249
schedulingAW: nil,
@@ -1262,7 +1262,7 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) {
12621262
klog.Infof("[ScheduleNext] [Agent Mode] Blocking dispatch for app wrapper '%s/%s' due to quota limits, activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v msg=%s",
12631263
qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status, msg)
12641264
//call update etcd here to retrigger AW execution for failed quota
1265-
1265+
//TODO: quota management tests fail if this is converted into go-routine, need to inspect why?
12661266
qjm.backoff(context.Background(), qj, dispatchFailedReason, dispatchFailedMessage)
12671267

12681268
}
@@ -1702,31 +1702,12 @@ func (cc *XController) deleteQueueJob(obj interface{}) {
17021702
klog.Errorf("[Informer-deleteQJ] obj is not AppWrapper. obj=%+v", obj)
17031703
return
17041704
}
1705-
current_ts := metav1.NewTime(time.Now())
1706-
klog.V(10).Infof("[Informer-deleteQJ] %s *Delay=%.6f seconds before enqueue &qj=%p Version=%s Status=%+v Deletion Timestame=%+v", qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qj, qj.ResourceVersion, qj.Status, qj.GetDeletionTimestamp())
1707-
accessor, err := meta.Accessor(qj)
1708-
if err != nil {
1709-
klog.V(10).Infof("[Informer-deleteQJ] Error obtaining the accessor for AW job: %s", qj.Name)
1710-
qj.SetDeletionTimestamp(&current_ts)
1711-
} else {
1712-
accessor.SetDeletionTimestamp(&current_ts)
1713-
}
1714-
// validate that app wraper has not been marked for deletion by the infomer's delete handler
1715-
if qj.DeletionTimestamp != nil {
1716-
klog.V(3).Infof("[Informer-deleteQJ] AW job=%s/%s set for deletion.", qj.Namespace, qj.Name)
1717-
// cleanup resources for running job, ignoring errors
1718-
if err00 := cc.Cleanup(context.Background(), qj); err00 != nil {
1719-
klog.Warningf("Failed to cleanup resources for app wrapper '%s/%s', err = %v", qj.Namespace, qj.Name, err00)
1720-
}
1721-
// empty finalizers and delete the queuejob again
1722-
if accessor, err00 := meta.Accessor(qj); err00 == nil {
1723-
accessor.SetFinalizers(nil)
1724-
}
1725-
// we delete the job from the queue if it is there, ignoring errors
1726-
cc.qjqueue.Delete(qj)
1727-
cc.eventQueue.Delete(qj)
1728-
klog.V(3).Infof("[Informer-deleteQJ] AW job=%s/%s deleted.", qj.Namespace, qj.Name)
1705+
// we delete the job from the queue if it is there, ignoring errors
1706+
if cc.serverOption.QuotaEnabled && cc.quotaManager != nil {
1707+
cc.quotaManager.Release(qj)
17291708
}
1709+
cc.qjqueue.Delete(qj)
1710+
cc.eventQueue.Delete(qj)
17301711
}
17311712

17321713
func (cc *XController) enqueue(obj interface{}) error {
@@ -1888,19 +1869,27 @@ func (cc *XController) worker() {
18881869
//if everything passes then CanRun is set to true and AW is ready for dispatch
18891870
if !queuejob.Status.CanRun && (queuejob.Status.State != arbv1.AppWrapperStateActive) {
18901871
cc.ScheduleNext(queuejob)
1891-
return nil
1872+
//When an AW passes ScheduleNext gate then we want to progress AW to Running to begin with
1873+
//Sync queuejob will not unwrap an AW to spawn genericItems
1874+
if queuejob.Status.CanRun {
1875+
1876+
// errs := make(chan error, 1)
1877+
// go func() {
1878+
// errs <- cc.syncQueueJob(ctx, queuejob)
1879+
// }()
1880+
1881+
// // later:
1882+
// if err := <-errs; err != nil {
1883+
// return err
1884+
// }
1885+
if err := cc.syncQueueJob(ctx, queuejob); err != nil {
1886+
// If any error, requeue it.
1887+
return err
1888+
}
18921889

1893-
}
1894-
//When an AW passes ScheduleNext gate then we want to progress AW to Running to begin with
1895-
//Sync queuejob will not unwrap an AW to spawn genericItems
1896-
if queuejob.Status.CanRun {
1897-
if err := cc.syncQueueJob(ctx, queuejob); err != nil {
1898-
// If any error, requeue it.
1899-
return err
19001890
}
19011891

19021892
}
1903-
19041893
//asmalvan- ends
19051894

19061895
klog.V(10).Infof("[worker] Ending %s Delay=%.6f seconds &newQJ=%p Version=%s Status=%+v", queuejob.Name, time.Now().Sub(queuejob.Status.ControllerFirstTimestamp.Time).Seconds(), queuejob, queuejob.ResourceVersion, queuejob.Status)

0 commit comments

Comments
 (0)