20
20
import static org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .base .Preconditions .checkState ;
21
21
22
22
import java .util .concurrent .CompletableFuture ;
23
+ import java .util .concurrent .CountDownLatch ;
23
24
import java .util .concurrent .ExecutorService ;
24
25
import java .util .concurrent .Executors ;
26
+ import java .util .concurrent .TimeUnit ;
25
27
import java .util .concurrent .atomic .AtomicBoolean ;
26
28
import java .util .concurrent .atomic .AtomicReference ;
27
29
import java .util .function .Function ;
30
+ import java .util .function .Supplier ;
31
+ import javax .annotation .Nullable ;
32
+ import javax .annotation .concurrent .GuardedBy ;
28
33
import javax .annotation .concurrent .ThreadSafe ;
29
34
import org .apache .beam .runners .dataflow .worker .windmill .Windmill .GetWorkRequest ;
30
35
import org .apache .beam .runners .dataflow .worker .windmill .WindmillConnection ;
41
46
import org .apache .beam .runners .dataflow .worker .windmill .work .refresh .FixedStreamHeartbeatSender ;
42
47
import org .apache .beam .sdk .annotations .Internal ;
43
48
import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .util .concurrent .ThreadFactoryBuilder ;
49
+ import org .slf4j .Logger ;
50
+ import org .slf4j .LoggerFactory ;
44
51
45
52
/**
46
53
* Owns and maintains a set of streams used to communicate with a specific Windmill worker.
57
64
@ Internal
58
65
@ ThreadSafe
59
66
final class WindmillStreamSender implements GetWorkBudgetSpender , StreamSender {
60
- private static final String STREAM_STARTER_THREAD_NAME = "StartWindmillStreamThread-%d" ;
61
- private final AtomicBoolean started ;
62
- private final AtomicReference <GetWorkBudget > getWorkBudget ;
63
- private final GetWorkStream getWorkStream ;
67
+ private static final Logger LOG = LoggerFactory .getLogger (WindmillStreamSender .class );
68
+ private static final String STREAM_MANAGER_THREAD_NAME_FORMAT = "WindmillStreamManagerThread-%d" ;
69
+ private static final int GET_WORK_STREAM_TTL_MINUTES = 45 ;
70
+
71
+ private final AtomicBoolean isRunning = new AtomicBoolean (false );
64
72
private final GetDataStream getDataStream ;
65
73
private final CommitWorkStream commitWorkStream ;
66
74
private final WorkCommitter workCommitter ;
67
75
private final StreamingEngineThrottleTimers streamingEngineThrottleTimers ;
68
76
private final ExecutorService streamStarter ;
77
+ private final String backendWorkerToken ;
78
+
79
+ @ GuardedBy ("activeGetWorkStream" )
80
+ private final AtomicReference <GetWorkStream > activeGetWorkStream ;
81
+
82
+ @ GuardedBy ("activeGetWorkStream" )
83
+ private final AtomicReference <GetWorkBudget > getWorkBudget ;
84
+
85
+ @ GuardedBy ("activeGetWorkStream" )
86
+ private final Supplier <GetWorkStream > getWorkStreamFactory ;
69
87
70
88
private WindmillStreamSender (
71
89
WindmillConnection connection ,
@@ -75,10 +93,9 @@ private WindmillStreamSender(
75
93
WorkItemScheduler workItemScheduler ,
76
94
Function <GetDataStream , GetDataClient > getDataClientFactory ,
77
95
Function <CommitWorkStream , WorkCommitter > workCommitterFactory ) {
78
- this .started = new AtomicBoolean ( false );
96
+ this .backendWorkerToken = connection . backendWorkerToken ( );
79
97
this .getWorkBudget = getWorkBudget ;
80
98
this .streamingEngineThrottleTimers = StreamingEngineThrottleTimers .create ();
81
-
82
99
// Stream instances connect/reconnect internally, so we can reuse the same instance through the
83
100
// entire lifecycle of WindmillStreamSender.
84
101
this .getDataStream =
@@ -88,19 +105,21 @@ private WindmillStreamSender(
88
105
streamingEngineStreamFactory .createDirectCommitWorkStream (
89
106
connection , streamingEngineThrottleTimers .commitWorkThrottleTimer ());
90
107
this .workCommitter = workCommitterFactory .apply (commitWorkStream );
91
- this .getWorkStream =
92
- streamingEngineStreamFactory .createDirectGetWorkStream (
93
- connection ,
94
- withRequestBudget (getWorkRequest , getWorkBudget .get ()),
95
- streamingEngineThrottleTimers .getWorkThrottleTimer (),
96
- FixedStreamHeartbeatSender .create (getDataStream ),
97
- getDataClientFactory .apply (getDataStream ),
98
- workCommitter ,
99
- workItemScheduler );
108
+ this .getWorkStreamFactory =
109
+ () ->
110
+ streamingEngineStreamFactory .createDirectGetWorkStream (
111
+ connection ,
112
+ withRequestBudget (getWorkRequest , getWorkBudget .get ()),
113
+ streamingEngineThrottleTimers .getWorkThrottleTimer (),
114
+ FixedStreamHeartbeatSender .create (getDataStream ),
115
+ getDataClientFactory .apply (getDataStream ),
116
+ workCommitter ,
117
+ workItemScheduler );
118
+ this .activeGetWorkStream = new AtomicReference <>();
100
119
// 3 threads, 1 for each stream type (GetWork, GetData, CommitWork).
101
120
this .streamStarter =
102
121
Executors .newFixedThreadPool (
103
- 3 , new ThreadFactoryBuilder ().setNameFormat (STREAM_STARTER_THREAD_NAME ).build ());
122
+ 3 , new ThreadFactoryBuilder ().setNameFormat (STREAM_MANAGER_THREAD_NAME_FORMAT ).build ());
104
123
}
105
124
106
125
static WindmillStreamSender create (
@@ -126,35 +145,45 @@ private static GetWorkRequest withRequestBudget(GetWorkRequest request, GetWorkB
126
145
}
127
146
128
147
synchronized void start () {
129
- if (! started . get ( )) {
148
+ if (isRunning . compareAndSet ( false , true )) {
130
149
checkState (!streamStarter .isShutdown (), "WindmillStreamSender has already been shutdown." );
131
-
132
150
// Start these 3 streams in parallel since they each may perform blocking IO.
151
+ CountDownLatch waitForInitialStream = new CountDownLatch (1 );
152
+ streamStarter .execute (() -> getWorkStreamLoop (waitForInitialStream ));
133
153
CompletableFuture .allOf (
134
- CompletableFuture .runAsync (getWorkStream ::start , streamStarter ),
135
154
CompletableFuture .runAsync (getDataStream ::start , streamStarter ),
136
155
CompletableFuture .runAsync (commitWorkStream ::start , streamStarter ))
137
156
.join ();
157
+ try {
158
+ waitForInitialStream .await ();
159
+ } catch (InterruptedException e ) {
160
+ close ();
161
+ LOG .error ("GetWorkStream to {} was never able to start." , backendWorkerToken );
162
+ throw new IllegalStateException ("GetWorkStream unable to start aborting." , e );
163
+ }
138
164
workCommitter .start ();
139
- started .set (true );
140
165
}
141
166
}
142
167
143
168
@ Override
144
169
public synchronized void close () {
170
+ isRunning .set (false );
145
171
streamStarter .shutdownNow ();
146
- getWorkStream .shutdown ();
147
172
getDataStream .shutdown ();
148
173
workCommitter .stop ();
149
174
commitWorkStream .shutdown ();
150
175
}
151
176
152
177
@ Override
153
178
public void setBudget (long items , long bytes ) {
154
- GetWorkBudget budget = GetWorkBudget .builder ().setItems (items ).setBytes (bytes ).build ();
155
- getWorkBudget .set (budget );
156
- if (started .get ()) {
157
- getWorkStream .setBudget (budget );
179
+ synchronized (activeGetWorkStream ) {
180
+ GetWorkBudget budget = GetWorkBudget .builder ().setItems (items ).setBytes (bytes ).build ();
181
+ getWorkBudget .set (budget );
182
+ if (isRunning .get ()) {
183
+ GetWorkStream stream = activeGetWorkStream .get ();
184
+ assert stream != null ;
185
+ stream .setBudget (budget );
186
+ }
158
187
}
159
188
}
160
189
@@ -165,4 +194,39 @@ long getAndResetThrottleTime() {
165
194
long getCurrentActiveCommitBytes () {
166
195
return workCommitter .currentActiveCommitBytes ();
167
196
}
197
+
198
+ /**
199
+ * Creates, starts, and gracefully terminates {@link GetWorkStream} before the clientside deadline
200
+ * to prevent {@link org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Status#DEADLINE_EXCEEDED} errors.
201
+ * If at any point the server closes the stream, reconnects immediately.
202
+ */
203
+ private void getWorkStreamLoop (CountDownLatch waitForInitialStream ) {
204
+ @ Nullable GetWorkStream newStream = null ;
205
+ while (isRunning .get ()) {
206
+ synchronized (activeGetWorkStream ) {
207
+ newStream = getWorkStreamFactory .get ();
208
+ newStream .start ();
209
+ waitForInitialStream .countDown ();
210
+ activeGetWorkStream .set (newStream );
211
+ }
212
+ try {
213
+ // Try to gracefully terminate the stream.
214
+ if (!newStream .awaitTermination (GET_WORK_STREAM_TTL_MINUTES , TimeUnit .MINUTES )) {
215
+ newStream .halfClose ();
216
+ }
217
+
218
+ // If graceful termination is unsuccessful, forcefully shutdown.
219
+ if (!newStream .awaitTermination (30 , TimeUnit .SECONDS )) {
220
+ newStream .shutdown ();
221
+ }
222
+
223
+ } catch (InterruptedException e ) {
224
+ // continue until !isRunning.
225
+ }
226
+ }
227
+
228
+ if (newStream != null ) {
229
+ newStream .shutdown ();
230
+ }
231
+ }
168
232
}
0 commit comments