@@ -37,11 +37,19 @@ protected ExecutableOperator getExecOp() {
37
37
@ Override
38
38
protected void produceOutput ( final IntermediateResultElementSink sink )
39
39
throws ExecOpExecutionException , ExecPlanTaskInputException , ExecPlanTaskInterruptionException {
40
+ produceOutputByConsumingAllInputsInParallel (sink );
41
+ //produceOutputByConsumingInputsOneAfterAnother(sink);
42
+ }
40
43
41
- // Attention: the current implementation of this method simply consumes
42
- // and pushes the complete i-th child input first before moving on to
43
- // the (i+1)-th child. Hence, with this implementation we do not
44
- // actually benefit from the parallelization.
44
+ /**
45
+ * Consumes the complete i-th input first (and pushes that input to the
46
+ * operator {@link #op}), before moving on to the (i+1)-th input. Hence,
47
+ * this implementation does not consume the inputs in parallel. Instead,
48
+ * if one of the inputs requires a long time, no progress is made in
49
+ * parallel based on any of the other inputs.
50
+ */
51
+ protected void produceOutputByConsumingInputsOneAfterAnother ( final IntermediateResultElementSink sink )
52
+ throws ExecOpExecutionException , ExecPlanTaskInputException , ExecPlanTaskInterruptionException {
45
53
46
54
for ( int i = 0 ; i < inputs .length ; i ++ ) {
47
55
boolean inputConsumed = false ;
@@ -58,4 +66,82 @@ protected void produceOutput( final IntermediateResultElementSink sink )
58
66
}
59
67
}
60
68
69
+
70
+ /**
71
+ * Consumes the complete i-th input first (and pushes that input to the
72
+ * operator {@link #op}), before moving on to the (i+1)-th input.
73
+ */
74
+ protected void produceOutputByConsumingAllInputsInParallel ( final IntermediateResultElementSink sink )
75
+ throws ExecOpExecutionException , ExecPlanTaskInputException , ExecPlanTaskInterruptionException {
76
+
77
+ final boolean [] inputConsumed = new boolean [inputs .length ];
78
+ for ( int i = 0 ; i < inputs .length ; i ++ ) { inputConsumed [i ] = false ; }
79
+
80
+ int indexOfNextInputToWaitFor = 0 ;
81
+ int numberOfInputsConsumed = 0 ;
82
+ while ( numberOfInputsConsumed < inputs .length ) {
83
+ // Before blindly asking any of the inputs to give us its next
84
+ // IntermediateResultBlock (which may cause this thread to wait
85
+ // if no such block is available at the moment), let's first ask
86
+ // them if they currently have a block available. If so, request
87
+ // the next block from the input that says it has a block available.
88
+ boolean blockConsumed = false ;
89
+ for ( int i = 0 ; i < inputs .length ; i ++ ) {
90
+ if ( ! inputConsumed [i ] && inputs [i ].hasNextIntermediateResultBlockAvailable () ) {
91
+ // calling 'getNextIntermediateResultBlock()' should not cause this thread to wait
92
+ final IntermediateResultBlock nextInputBlock = inputs [i ].getNextIntermediateResultBlock ();
93
+ if ( nextInputBlock != null ) {
94
+ op .processBlockFromXthChild (i , nextInputBlock , sink , execCxt );
95
+ }
96
+
97
+ blockConsumed = true ;
98
+ }
99
+ }
100
+
101
+ if ( ! blockConsumed ) {
102
+ // If none of the inputs had a block available at the moment,
103
+ // we ask one of them to produce its next block, which may
104
+ // cause this thread to wait until that next block has been
105
+ // produced. To decide which of the inputs we ask (and, then,
106
+ // wait for) we use a round robin approach. To this end, we
107
+ // use the 'indexOfNextInputToWaitFor' pointer which we advance
108
+ // each time we leave this code block here.
109
+
110
+ // First, we have to make sure that 'indexOfNextInputToWaitFor'
111
+ // points to an input that has not been consumed completely yet.
112
+ while ( inputConsumed [indexOfNextInputToWaitFor ] == true ) {
113
+ indexOfNextInputToWaitFor = advanceIndexOfInput (indexOfNextInputToWaitFor );
114
+ }
115
+
116
+ // Now we ask that input to produce its next block, which may
117
+ // cause this thread to wait.
118
+ final IntermediateResultBlock nextInputBlock = inputs [indexOfNextInputToWaitFor ].getNextIntermediateResultBlock ();
119
+ if ( nextInputBlock != null ) {
120
+ op .processBlockFromXthChild (indexOfNextInputToWaitFor , nextInputBlock , sink , execCxt );
121
+ }
122
+ else {
123
+ op .wrapUpForXthChild (indexOfNextInputToWaitFor , sink , execCxt );
124
+ inputConsumed [indexOfNextInputToWaitFor ] = true ;
125
+ numberOfInputsConsumed ++;
126
+ }
127
+
128
+ // Finally, we advance the 'indexOfNextInputToWaitFor' pointer
129
+ // so that, next time we will have to wait, we will wait for
130
+ // the next input (rather than always waiting for the same
131
+ // input before moving on to the next input).
132
+ indexOfNextInputToWaitFor = advanceIndexOfInput (indexOfNextInputToWaitFor );
133
+ }
134
+ }
135
+ }
136
+
137
+ /**
138
+ * Returns the given integer increased by one, unless such an
139
+ * increase results in an integer that is outside of the bounds
140
+ * of the {@link #inputs} array, in which case the function returns
141
+ * zero (effectively jumping back to the first index in the array).
142
+ */
143
+ protected int advanceIndexOfInput ( final int currentIndex ) {
144
+ final int i = currentIndex + 1 ;
145
+ return ( i < inputs .length ) ? i : 0 ;
146
+ }
61
147
}
0 commit comments