-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathscheduler.C
91 lines (76 loc) · 2.51 KB
/
scheduler.C
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
#include "scheduler.h"
#include "convcore.h"
#include "queue.h"
#include <thread>
void CsdScheduler()
{
// get pthread level queue
ConverseQueue<void *> *queue = CmiGetQueue(CmiMyRank());
ConverseNodeQueue<void *> *nodeQueue = CmiGetNodeQueue();
while (CmiStopFlag() == 0)
{
CcdRaiseCondition(CcdSCHEDLOOP);
// poll node queue
if (!nodeQueue->empty())
{
QueueResult result = nodeQueue->pop();
if(result)
{
void *msg = result.msg;
// process event
CmiMessageHeader *header = (CmiMessageHeader *)msg;
void *data = (void *)((char *)msg + CmiMsgHeaderSizeBytes);
int handler = header->handlerId;
// call handler
CmiCallHandler(handler, data);
//release idle if necessary
if(CmiGetIdle())
{
CmiSetIdle(false);
CcdRaiseCondition(CcdPROCESSOR_END_IDLE);
}
}
}
// poll thread queue
else if (!queue->empty())
{
// get next event (guaranteed to be there because only single consumer)
void *msg = queue->pop();
// process event
CmiMessageHeader *header = (CmiMessageHeader *)msg;
void *data = (void *)((char *)msg + CmiMsgHeaderSizeBytes);
int handler = header->handlerId;
// call handler
CmiCallHandler(handler, msg);
//release idle if necessary
if(CmiGetIdle())
{
CmiSetIdle(false);
CcdRaiseCondition(CcdPROCESSOR_END_IDLE);
}
}
//the processor is idle
else
{
// if not already idle, set idle and raise condition
if(!CmiGetIdle())
{
CmiSetIdle(true);
CmiSetIdleTime(CmiWallTimer());
CcdRaiseCondition(CcdPROCESSOR_BEGIN_IDLE);
}
// if already idle, call still idle and (maybe) long idle
else
{
CcdRaiseCondition(CcdPROCESSOR_STILL_IDLE);
if(CmiWallTimer() - CmiGetIdleTime() > 10.0)
{
CcdRaiseCondition(CcdPROCESSOR_LONG_IDLE);
}
}
}
CcdCallBacks();
// TODO: suspend? or spin?
}
}
// TODO: implement CsdEnqueue/CsdDequeue (open question: why are these necessary?)