Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HADOOP-19386. Avoid adding asynchronous calls indefinitely which can cause Out-of-Memory. #7287

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,9 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
public static final String IPC_CLIENT_ASYNC_CALLS_MAX_KEY =
"ipc.client.async.calls.max";
public static final int IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT = 100;
public static final String IPC_CLIENT_ASYNC_CALLS_PERMITS_ACQUIRE_TIMEOUT_MS_KEY =
"ipc.client.async.calls.permits.acquire.timeout.ms";
public static final int IPC_CLIENT_ASYNC_CALLS_PERMITS_ACQUIRE_TIMEOUT_MS_DEFAULT = 1000;
public static final String IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY = "ipc.client.fallback-to-simple-auth-allowed";
public static final boolean IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ public static Object getExternalHandler() {
private final byte[] clientId;
private final int maxAsyncCalls;
private final AtomicInteger asyncCallCounter = new AtomicInteger(0);
private final int asyncCalllPermitsTimeoutMs;

/**
* set the ping interval value in configuration
Expand Down Expand Up @@ -387,6 +388,7 @@ private class Connection extends Thread {

// currently active calls
private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
private Semaphore asyncCallPermits = new Semaphore(maxAsyncCalls);
private AtomicLong lastActivity = new AtomicLong();// last I/O activity time
private AtomicBoolean shouldCloseConnection = new AtomicBoolean(); // indicate if the connection is closed
private IOException closeException; // close reason
Expand Down Expand Up @@ -473,9 +475,10 @@ private void touch() {
* @param call to add
* @return true if the call was added.
*/
private synchronized boolean addCall(Call call) {
private synchronized boolean addCall(Call call) throws IOException {
if (shouldCloseConnection.get())
return false;
checkAsyncCall();
calls.put(call.id, call);
notify();
return true;
Expand Down Expand Up @@ -1215,6 +1218,7 @@ private void receiveRpcResponse() {
if (status == RpcStatusProto.SUCCESS) {
Writable value = packet.newInstance(valueClass, conf);
final Call call = calls.remove(callId);
releaseAsyncCallPermit();
if (call.alignmentContext != null) {
call.alignmentContext.receiveResponseState(header);
}
Expand All @@ -1238,6 +1242,7 @@ private void receiveRpcResponse() {
RemoteException re = new RemoteException(exceptionClassName, errorMsg, erCode);
if (status == RpcStatusProto.ERROR) {
final Call call = calls.remove(callId);
releaseAsyncCallPermit();
call.setException(re);
} else if (status == RpcStatusProto.FATAL) {
// Close the connection
Expand Down Expand Up @@ -1313,6 +1318,38 @@ private void cleanupCalls() {
c.setException(closeException); // local exception
}
}

private void releaseAsyncCallPermit() {
if (asyncCallPermits != null) {
asyncCallPermits.release(1);
}
}

private void checkAsyncCall() throws IOException {
if (isAsynchronousMode()) {
asyncCallCounter.incrementAndGet();
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Acquiring async call permit for connectionId {}", this.remoteId);
}
boolean isAcquired = asyncCallPermits.tryAcquire(asyncCalllPermitsTimeoutMs,
TimeUnit.MILLISECONDS);
if (!isAcquired) {
String errMsg = String.format(
"Exceeded limit of max asynchronous calls: %d, " +
"please configure %s to adjust it.",
maxAsyncCalls,
CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY);
throw new AsyncCallLimitExceededException(errMsg);
}
} catch (InterruptedException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Interrupted when acquiring async call permit for connectionId {}", remoteId);
}
throw new IOException(e);
}
}
}
}

/**
Expand Down Expand Up @@ -1340,6 +1377,9 @@ public Client(Class<? extends Writable> valueClass, Configuration conf,
this.maxAsyncCalls = conf.getInt(
CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT);
this.asyncCalllPermitsTimeoutMs = conf.getInt(
CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_PERMITS_ACQUIRE_TIMEOUT_MS_KEY,
CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_PERMITS_ACQUIRE_TIMEOUT_MS_DEFAULT);
}

/**
Expand Down Expand Up @@ -1428,20 +1468,6 @@ public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
fallbackToSimpleAuth, alignmentContext);
}

private void checkAsyncCall() throws IOException {
if (isAsynchronousMode()) {
if (asyncCallCounter.incrementAndGet() > maxAsyncCalls) {
asyncCallCounter.decrementAndGet();
String errMsg = String.format(
"Exceeded limit of max asynchronous calls: %d, " +
"please configure %s to adjust it.",
maxAsyncCalls,
CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY);
throw new AsyncCallLimitExceededException(errMsg);
}
}
}

Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
ConnectionId remoteId, int serviceClass,
AtomicBoolean fallbackToSimpleAuth)
Expand Down Expand Up @@ -1471,11 +1497,10 @@ Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
throws IOException {
final Call call = createCall(rpcKind, rpcRequest);
call.setAlignmentContext(alignmentContext);
final Connection connection = getConnection(remoteId, call, serviceClass,
fallbackToSimpleAuth);

final Connection connection;
try {
checkAsyncCall();
connection = getConnection(remoteId, call, serviceClass,
fallbackToSimpleAuth);
try {
connection.sendRpcRequest(call); // send the rpc request
} catch (RejectedExecutionException e) {
Expand All @@ -1487,7 +1512,7 @@ Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
ioe.initCause(ie);
throw ioe;
}
} catch(Exception e) {
} catch (Exception e) {
if (isAsynchronousMode()) {
releaseAsyncCall();
}
Expand Down
Loading