diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java index 31b6654afc578..0368f6bffc9be 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java @@ -355,6 +355,9 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic { public static final String IPC_CLIENT_ASYNC_CALLS_MAX_KEY = "ipc.client.async.calls.max"; public static final int IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT = 100; + public static final String IPC_CLIENT_ASYNC_CALLS_PERMITS_ACQUIRE_TIMEOUT_MS_KEY = + "ipc.client.async.calls.permits.acquire.timeout.ms"; + public static final int IPC_CLIENT_ASYNC_CALLS_PERMITS_ACQUIRE_TIMEOUT_MS_DEFAULT = 1000; public static final String IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY = "ipc.client.fallback-to-simple-auth-allowed"; public static final boolean IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT = false; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 65fe89b30fc7b..1841cccfc3635 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -165,6 +165,7 @@ public static Object getExternalHandler() { private final byte[] clientId; private final int maxAsyncCalls; private final AtomicInteger asyncCallCounter = new AtomicInteger(0); + private final int asyncCallPermitsTimeoutMs; /** * set the ping interval value in configuration @@ -387,6 +388,7 @@ private class Connection extends Thread { // currently active calls private Hashtable calls = new Hashtable(); + private Semaphore asyncCallPermits = new Semaphore(maxAsyncCalls); private AtomicLong lastActivity = new AtomicLong();// last I/O activity time private AtomicBoolean shouldCloseConnection = new AtomicBoolean(); // indicate if the connection is closed private IOException closeException; // close reason @@ -473,9 +475,10 @@ private void touch() { * @param call to add * @return true if the call was added. */ - private synchronized boolean addCall(Call call) { + private synchronized boolean addCall(Call call) throws IOException { if (shouldCloseConnection.get()) return false; + checkAsyncCall(); calls.put(call.id, call); notify(); return true; @@ -1215,6 +1218,7 @@ private void receiveRpcResponse() { if (status == RpcStatusProto.SUCCESS) { Writable value = packet.newInstance(valueClass, conf); final Call call = calls.remove(callId); + releaseAsyncCallPermit(); if (call.alignmentContext != null) { call.alignmentContext.receiveResponseState(header); } @@ -1238,6 +1242,7 @@ private void receiveRpcResponse() { RemoteException re = new RemoteException(exceptionClassName, errorMsg, erCode); if (status == RpcStatusProto.ERROR) { final Call call = calls.remove(callId); + releaseAsyncCallPermit(); call.setException(re); } else if (status == RpcStatusProto.FATAL) { // Close the connection @@ -1313,6 +1318,38 @@ private void cleanupCalls() { c.setException(closeException); // local exception } } + + private void releaseAsyncCallPermit() { + if (asyncCallPermits != null) { + asyncCallPermits.release(1); + } + } + + private void checkAsyncCall() throws IOException { + if (isAsynchronousMode()) { + asyncCallCounter.incrementAndGet(); + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Acquiring async call permit for connectionId {}", this.remoteId); + } + boolean isAcquired = asyncCallPermits.tryAcquire(asyncCallPermitsTimeoutMs, + TimeUnit.MILLISECONDS); + if (!isAcquired) { + String errMsg = String.format( + "Exceeded limit of max asynchronous calls: %d, " + + "please configure %s to adjust it.", + maxAsyncCalls, + CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY); + throw new AsyncCallLimitExceededException(errMsg); + } + } catch (InterruptedException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Interrupted when acquiring async call permit for connectionId {}", remoteId); + } + throw new IOException(e); + } + } + } } /** @@ -1340,6 +1377,9 @@ public Client(Class valueClass, Configuration conf, this.maxAsyncCalls = conf.getInt( CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT); + this.asyncCallPermitsTimeoutMs = conf.getInt( + CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_PERMITS_ACQUIRE_TIMEOUT_MS_KEY, + CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_PERMITS_ACQUIRE_TIMEOUT_MS_DEFAULT); } /** @@ -1428,20 +1468,6 @@ public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest, fallbackToSimpleAuth, alignmentContext); } - private void checkAsyncCall() throws IOException { - if (isAsynchronousMode()) { - if (asyncCallCounter.incrementAndGet() > maxAsyncCalls) { - asyncCallCounter.decrementAndGet(); - String errMsg = String.format( - "Exceeded limit of max asynchronous calls: %d, " + - "please configure %s to adjust it.", - maxAsyncCalls, - CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY); - throw new AsyncCallLimitExceededException(errMsg); - } - } - } - Writable call(RPC.RpcKind rpcKind, Writable rpcRequest, ConnectionId remoteId, int serviceClass, AtomicBoolean fallbackToSimpleAuth) @@ -1471,11 +1497,10 @@ Writable call(RPC.RpcKind rpcKind, Writable rpcRequest, throws IOException { final Call call = createCall(rpcKind, rpcRequest); call.setAlignmentContext(alignmentContext); - final Connection connection = getConnection(remoteId, call, serviceClass, - fallbackToSimpleAuth); - + final Connection connection; try { - checkAsyncCall(); + connection = getConnection(remoteId, call, serviceClass, + fallbackToSimpleAuth); try { connection.sendRpcRequest(call); // send the rpc request } catch (RejectedExecutionException e) { @@ -1487,7 +1512,7 @@ Writable call(RPC.RpcKind rpcKind, Writable rpcRequest, ioe.initCause(ie); throw ioe; } - } catch(Exception e) { + } catch (Exception e) { if (isAsynchronousMode()) { releaseAsyncCall(); }