diff --git a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java index f0c2acf7efa..aaa83484926 100644 --- a/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java +++ b/rt/rs/sse/src/main/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImpl.java @@ -18,22 +18,30 @@ */ package org.apache.cxf.jaxrs.sse.client; +import java.lang.reflect.Constructor; import java.security.AccessController; import java.security.PrivilegedAction; +import java.security.PrivilegedExceptionAction; +import java.text.ParseException; +import java.text.SimpleDateFormat; import java.util.Collection; +import java.util.Date; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.logging.Level; import java.util.logging.Logger; +import javax.ws.rs.WebApplicationException; import javax.ws.rs.client.Invocation; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.Configuration; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.Response; import javax.ws.rs.sse.InboundSseEvent; import javax.ws.rs.sse.SseEventSource; @@ -44,33 +52,35 @@ import org.apache.cxf.jaxrs.utils.ExceptionUtils; /** - * SSE Event Source implementation + * SSE Event Source implementation */ public class SseEventSourceImpl implements SseEventSource { private static final Logger LOG = LogUtils.getL7dLogger(SseEventSourceImpl.class); - + private final WebTarget target; private final Collection listeners = new CopyOnWriteArrayList<>(); private final AtomicReference state = new AtomicReference<>(SseSourceState.CLOSED); - + // It may happen that open() and close() could be called on separate threads private volatile ScheduledExecutorService executor; private volatile boolean managedExecutor = true; - private volatile InboundSseEventProcessor processor; - private volatile TimeUnit unit; + private volatile InboundSseEventProcessor processor; + // delay here is always in Milliseconds - conversion takes place in the ctor private volatile long delay; + // Indicates the this SseEventSource has been opened. It will remain true even if this is moved back to the + // connecting state due to a scheduled reconnect. + private volatile boolean opened; private class InboundSseEventListenerDelegate implements InboundSseEventListener { private String lastEventId; - + @Override public void onNext(InboundSseEvent event) { lastEventId = event.getId(); listeners.forEach(listener -> listener.onNext(event)); - + // Reconnect delay is set in milliseconds if (event.isReconnectDelaySet()) { - unit = TimeUnit.MILLISECONDS; delay = event.getReconnectDelay(); } } @@ -78,31 +88,33 @@ public void onNext(InboundSseEvent event) { @Override public void onError(Throwable ex) { listeners.forEach(listener -> listener.onError(ex)); - if (delay >= 0 && unit != null) { - scheduleReconnect(delay, unit, lastEventId); + if (delay >= 0) { + scheduleReconnect(delay, lastEventId); } } @Override public void onComplete() { listeners.forEach(InboundSseEventListener::onComplete); - if (delay >= 0 && unit != null) { - scheduleReconnect(delay, unit, lastEventId); + if (delay >= 0) { + scheduleReconnect(delay, lastEventId); } + // reset the delay and units + delay = -1; } } - + private class InboundSseEventListenerImpl implements InboundSseEventListener { private final Consumer onEvent; private final Consumer onError; private final Runnable onComplete; - + InboundSseEventListenerImpl(Consumer e) { this(e, ex -> { }, () -> { }); } - + InboundSseEventListenerImpl(Consumer e, Consumer t) { - this(e, t, () -> { }); + this(e, t, () -> { }); } InboundSseEventListenerImpl(Consumer e, Consumer t, Runnable c) { @@ -126,20 +138,19 @@ public void onComplete() { onComplete.run(); } } - + /** * https://www.w3.org/TR/2012/WD-eventsource-20120426/#dom-eventsource-connecting */ - private enum SseSourceState { + enum SseSourceState { CONNECTING, OPEN, CLOSED } - + SseEventSourceImpl(WebTarget target, long delay, TimeUnit unit) { this.target = target; - this.delay = delay; - this.unit = unit; + this.delay = TimeUnit.MILLISECONDS.convert(delay, unit); } @Override @@ -163,26 +174,30 @@ public void open() { throw new IllegalStateException("The SseEventSource is already in " + state.get() + " state"); } - // Create the executor for scheduling the reconnect tasks + // Create the executor for scheduling the reconnect tasks final Configuration configuration = target.getConfiguration(); if (executor == null) { executor = (ScheduledExecutorService)configuration .getProperty("scheduledExecutorService"); - + if (executor == null) { executor = Executors.newSingleThreadScheduledExecutor(); managedExecutor = false; /* we manage lifecycle */ } } - + final Object lastEventId = configuration.getProperty(HttpHeaders.LAST_EVENT_ID_HEADER); connect(lastEventId != null ? lastEventId.toString() : null); + // If a 503 was receieved during connect we might be in the "Connecting" state, however + // the isOpened flag will need to be set indicating that the eventsource has been opened + // and not yet closed. + opened = true; } private void connect(String lastEventId) { final InboundSseEventListenerDelegate delegate = new InboundSseEventListenerDelegate(); Response response = null; - + try { Invocation.Builder builder = target.request(MediaType.SERVER_SENT_EVENTS); if (lastEventId != null) { @@ -190,11 +205,12 @@ private void connect(String lastEventId) { } response = builder.get(); - // A client can be told to stop reconnecting using the HTTP 204 No Content + // A client can be told to stop reconnecting using the HTTP 204 No Content // response code. In this case, we should give up. final int status = response.getStatus(); if (status == 204) { LOG.fine("SSE endpoint " + target.getUri() + " returns no data, disconnecting"); + delegate.onComplete(); state.set(SseSourceState.CLOSED); response.close(); return; @@ -206,6 +222,52 @@ private void connect(String lastEventId) { throw ExceptionUtils.toWebApplicationException(response); } + // A client can be told to trigger a reconnect delay via a HTTP 503 Service Unavailable response code. + if (status == 503) { + LOG.fine("SSE endpoint " + target.getUri() + " returns 503"); + MultivaluedMap headerMap = response.getHeaders(); + // There should only be one header entry + Object retryAfter = headerMap.getFirst(HttpHeaders.RETRY_AFTER); + if (retryAfter != null) { + long retryAfterDelay = handleRetry((String) retryAfter); + delay = retryAfterDelay; + if (retryAfterDelay > -1) { + scheduleReconnect(retryAfterDelay, lastEventId); + response.close(); + return; + } + + } + } + + String contentType = response.getHeaderString(HttpHeaders.CONTENT_TYPE); + if (status != 200 || !MediaType.SERVER_SENT_EVENTS.equals(contentType)) { + if (LOG.isLoggable(Level.FINEST)) { + LOG.log(Level.FINEST, "Received " + status + " Content-Type=" + contentType); + } + final Response fResponse = response; + Throwable t; + if (!MediaType.SERVER_SENT_EVENTS.equals(contentType)) { + t = new WebApplicationException("Unexpected Content-Type in response", response); + } else { + t = AccessController.doPrivileged((PrivilegedExceptionAction) () -> { + @SuppressWarnings("unchecked") + Class throwableClass = (Class) ExceptionUtils + .getWebApplicationExceptionClass(fResponse, WebApplicationException.class); + Constructor ctor; + try { + ctor = throwableClass.getConstructor(Response.class); + } catch (NoSuchMethodException ex) { + ctor = null; + } + return ctor == null ? throwableClass.newInstance() : ctor.newInstance(fResponse); + }); + } + + delegate.onError(t); + response.close(); + return; + } // Should not happen but if close() was called from another thread, we could // end up there. if (state.get() == SseSourceState.CLOSED) { @@ -213,29 +275,32 @@ private void connect(String lastEventId) { response.close(); return; } - - final Endpoint endpoint = WebClient.getConfig(target).getEndpoint(); - // Create new processor if this is the first time or the old one has been closed + + // Create new processor if this is the first time or the old one has been closed if (processor == null || processor.isClosed()) { + final Endpoint endpoint = WebClient.getConfig(target).getEndpoint(); LOG.fine("Creating new instance of SSE event processor ..."); processor = new InboundSseEventProcessor(endpoint, delegate); } - + // Start consuming events processor.run(response); LOG.fine("SSE event processor has been started ..."); - + if (!state.compareAndSet(SseSourceState.CONNECTING, SseSourceState.OPEN)) { throw new IllegalStateException("The SseEventSource is already in " + state.get() + " state"); } - + LOG.fine("Successfuly opened SSE connection to " + target.getUri()); } catch (final Exception ex) { + if (LOG.isLoggable(Level.FINEST)) { + LOG.log(Level.FINEST, "caught exception in connect(...)", ex); + } if (processor != null) { processor.close(1, TimeUnit.SECONDS); processor = null; } - + if (response != null) { response.close(); } @@ -246,29 +311,85 @@ private void connect(String lastEventId) { } } + // return the milliseconds to delay before reconnecting; -1 means don't reconnect + private long handleRetry(String retryValue) { + + // RETRY_AFTER is a String that can either correspond to seconds (long) + // or a HTTP-Date (which can be one of 7 variations)" + if (!(retryValue.contains(":"))) { + // Must be a long since all dates include ":" + try { + Long retryLong = Long.valueOf(retryValue); + // The RETRY_AFTER value is in seconds so change units + return TimeUnit.MILLISECONDS.convert(retryLong.longValue(), TimeUnit.SECONDS); + } catch (NumberFormatException e) { + LOG.fine("SSE RETRY_AFTER Incorrect time value: " + e); + } + } else { + char[] retryValueArray = retryValue.toCharArray(); + // handle date + try { + SimpleDateFormat sdf = null; + // Determine the appropriate HTTP-Date pattern + if (retryValueArray[3] == ',') { + sdf = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss z"); // RTC 822, updated by RFC 1123 + } else if (retryValueArray[6] == ',') { + sdf = new SimpleDateFormat("EEEEEE, dd-MMM-yy HH:mm:ss z"); // RFC 850, obsoleted by RFC 1036 + } else if (retryValueArray[7] == ',') { + sdf = new SimpleDateFormat("EEEEEEE, dd-MMM-yy HH:mm:ss z"); // RFC 850, obsoleted by RFC 1036 + } else if (retryValueArray[8] == ',') { + sdf = new SimpleDateFormat("EEEEEEEE, dd-MMM-yy HH:mm:ss z"); // RFC 850, obsoleted by RFC 1036 + } else if (retryValueArray[9] == ',') { + sdf = new SimpleDateFormat("EEEEEEEEE, dd-MMM-yy HH:mm:ss z"); // RFC 850, obsoleted by RFC 1036 + } else if (retryValueArray[8] == ',') { + sdf = new SimpleDateFormat("EEEEEEEE, dd-MMM-yy HH:mm:ss z"); // RFC 850, obsoleted by RFC 1036 + } else if (retryValueArray[8] == ' ') { + sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy"); // ANSI C's asctime() format + } else { + sdf = new SimpleDateFormat("EEE MMM dd HH:mm:ss yyyy"); // ANSI C's asctime() format + } + Date retryDate = sdf.parse(retryValue); + + long retryTime = retryDate.getTime(); + long now = System.currentTimeMillis(); + long delayTime = retryTime - now; + if (delayTime > 0) { + return delayTime; // HTTP Date is in milliseconds + } + LOG.fine("SSE RETRY_AFTER Date value represents a time already past"); + } catch (IllegalArgumentException ex) { + LOG.fine("SSE RETRY_AFTER Date value format incorrect: " + ex); + } catch (ParseException e2) { + LOG.fine("SSE RETRY_AFTER Date value cannot be parsed: " + e2); + } + } + return -1L; + } + @Override public boolean isOpen() { - return state.get() == SseSourceState.OPEN; + return opened; } @Override public boolean close(long timeout, TimeUnit tunit) { + opened = false; if (state.get() == SseSourceState.CLOSED) { return true; } - + if (state.compareAndSet(SseSourceState.CONNECTING, SseSourceState.CLOSED)) { LOG.fine("The SseEventSource was not connected, closing anyway"); } else if (!state.compareAndSet(SseSourceState.OPEN, SseSourceState.CLOSED)) { throw new IllegalStateException("The SseEventSource is not opened, but in " + state.get() + " state"); } - + if (executor != null && !managedExecutor) { AccessController.doPrivileged((PrivilegedAction) () -> { - executor.shutdown(); + executor.shutdownNow(); return null; }); - + executor = null; managedExecutor = true; } @@ -277,39 +398,43 @@ public boolean close(long timeout, TimeUnit tunit) { if (processor == null) { return true; } - - return processor.close(timeout, tunit); + + return processor.close(timeout, tunit); } - - private void scheduleReconnect(long tdelay, TimeUnit tunit, String lastEventId) { + + private void scheduleReconnect(long tdelay, String lastEventId) { // If delay == RECONNECT_NOT_SET, no reconnection attempt should be performed if (tdelay < 0 || executor == null) { return; } - + // If the event source is already closed, do nothing if (state.get() == SseSourceState.CLOSED) { return; } - + // If the connection was still on connecting state, just try to reconnect - if (state.get() != SseSourceState.CONNECTING) { + if (state.get() != SseSourceState.CONNECTING) { LOG.fine("The SseEventSource is still opened, moving it to connecting state"); if (!state.compareAndSet(SseSourceState.OPEN, SseSourceState.CONNECTING)) { throw new IllegalStateException("The SseEventSource is not opened, but in " + state.get() + " state, unable to reconnect"); } } - + executor.schedule(() -> { // If we are still in connecting state (not closed/open), let's try to reconnect if (state.get() == SseSourceState.CONNECTING) { LOG.fine("Reestablishing SSE connection to " + target.getUri()); connect(lastEventId); } - }, tdelay, tunit); - - LOG.fine("The reconnection attempt to " + target.getUri() + " is scheduled in " - + tunit.toMillis(tdelay) + "ms"); + }, tdelay, TimeUnit.MILLISECONDS); + + LOG.fine("The reconnection attempt to " + target.getUri() + " is scheduled in " + tdelay + "ms"); + } + + // used for testing + SseSourceState getState() { + return state.get(); } } diff --git a/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImplTest.java b/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImplTest.java index 3926febb87f..911ec5265b9 100644 --- a/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImplTest.java +++ b/rt/rs/sse/src/test/java/org/apache/cxf/jaxrs/sse/client/SseEventSourceImplTest.java @@ -41,6 +41,7 @@ import org.apache.cxf.endpoint.Server; import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; +import org.apache.cxf.jaxrs.sse.client.SseEventSourceImpl.SseSourceState; import org.junit.After; import org.junit.AfterClass; @@ -118,50 +119,70 @@ public void tearDown() throws InterruptedException { @Test public void testNoReconnectWhenNoContentIsReturned() { - try (SseEventSource eventSource = withNoReconnect(Type.NO_CONTENT)) { + SseEventSource eventSource = withNoReconnect(Type.NO_CONTENT); + try (SseEventSource ses = eventSource) { eventSource.open(); - assertThat(eventSource.isOpen(), equalTo(false)); + if (eventSource instanceof SseEventSourceImpl) { + assertThat(((SseEventSourceImpl)eventSource).getState(), equalTo(SseSourceState.CLOSED)); + } assertThat(events.size(), equalTo(0)); } + assertThat(eventSource.isOpen(), equalTo(false)); } - @Test - public void testReuseSameEventSourceSeveralTimes() { - try (SseEventSource eventSource = withNoReconnect(Type.NO_CONTENT)) { + @Test(expected = IllegalStateException.class) + public void testReuseSameEventSourceThrowsIllegalStateException() { + SseEventSource eventSource = withReconnect(Type.NO_SERVER); + try (SseEventSource ses = eventSource) { eventSource.open(); - assertThat(eventSource.isOpen(), equalTo(false)); - - eventSource.open(); - assertThat(eventSource.isOpen(), equalTo(false)); + if (eventSource instanceof SseEventSourceImpl) { + assertThat(((SseEventSourceImpl) eventSource).getState(), equalTo(SseSourceState.CONNECTING)); + } + assertThat(eventSource.isOpen(), equalTo(true)); + eventSource.open(); // should throw IllegalStateException + } finally { assertThat(events.size(), equalTo(0)); + assertThat(eventSource.isOpen(), equalTo(false)); } } @Test public void testReconnectWillBeScheduledOnError() throws InterruptedException { - try (SseEventSource eventSource = withReconnect(Type.NO_SERVER)) { + SseEventSource eventSource = withReconnect(Type.NO_SERVER); + try (SseEventSource ses = eventSource) { eventSource.open(); - assertThat(eventSource.isOpen(), equalTo(false)); + if (eventSource instanceof SseEventSourceImpl) { + assertThat(((SseEventSourceImpl) eventSource).getState(), equalTo(SseSourceState.CONNECTING)); + } // Sleep a little bit for reconnect to reschedule Thread.sleep(150L); assertThat(errors.size(), equalTo(2)); } + assertThat(eventSource.isOpen(), equalTo(false)); } @Test public void testNoReconnectWillBeScheduledWhenClosed() throws InterruptedException { - try (SseEventSource eventSource = withReconnect(Type.NO_SERVER)) { + SseEventSource eventSource = withReconnect(Type.NO_SERVER); + try (SseEventSource ses = eventSource) { eventSource.open(); - assertThat(eventSource.isOpen(), equalTo(false)); + if (eventSource instanceof SseEventSourceImpl) { + assertThat(((SseEventSourceImpl)eventSource).getState(), equalTo(SseSourceState.CONNECTING)); + } + assertThat(eventSource.isOpen(), equalTo(true)); eventSource.close(1L, TimeUnit.SECONDS); + assertThat(eventSource.isOpen(), equalTo(false)); // Sleep a little bit to make sure for reconnect to reschedule (after 100ms) Thread.sleep(150L); assertThat(errors.size(), equalTo(1)); } + if (eventSource instanceof SseEventSourceImpl) { + assertThat(((SseEventSourceImpl)eventSource).getState(), equalTo(SseSourceState.CLOSED)); + } } @Test @@ -301,30 +322,38 @@ public void testReconnectAndTwoEventsReceived() throws InterruptedException, IOE @Test public void testReconnectAndNotAuthorized() throws InterruptedException, IOException { - try (SseEventSource eventSource = withReconnect(Type.EVENT_NOT_AUTHORIZED)) { + SseEventSource eventSource = withReconnect(Type.EVENT_NOT_AUTHORIZED); + try (SseEventSource ses = eventSource) { eventSource.open(); - assertThat(eventSource.isOpen(), equalTo(false)); + if (eventSource instanceof SseEventSourceImpl) { + assertThat(((SseEventSourceImpl) eventSource).getState(), equalTo(SseSourceState.CONNECTING)); + } + assertThat(eventSource.isOpen(), equalTo(true)); assertThat(errors.size(), equalTo(1)); // Allow the event processor to pull for events (150ms) Thread.sleep(150L); } - + assertThat(eventSource.isOpen(), equalTo(false)); assertThat(errors.size(), equalTo(2)); assertThat(events.size(), equalTo(0)); } @Test public void testNoReconnectAndNotAuthorized() throws InterruptedException, IOException { - try (SseEventSource eventSource = withNoReconnect(Type.EVENT_NOT_AUTHORIZED)) { + SseEventSource eventSource = withNoReconnect(Type.EVENT_NOT_AUTHORIZED); + try (SseEventSource ses = eventSource) { eventSource.open(); - assertThat(eventSource.isOpen(), equalTo(false)); + if (eventSource instanceof SseEventSourceImpl) { + assertThat(((SseEventSourceImpl) eventSource).getState(), equalTo(SseSourceState.CONNECTING)); + } assertThat(errors.size(), equalTo(1)); // Allow the event processor to pull for events (150ms) Thread.sleep(150L); } - + + assertThat(eventSource.isOpen(), equalTo(false)); assertThat(errors.size(), equalTo(1)); assertThat(events.size(), equalTo(0)); } @@ -363,16 +392,19 @@ public void testInvalidReconnectDelayInTheEvent() throws InterruptedException, I @Test public void testTryToCloseWhileConnecting() throws ExecutionException, InterruptedException { - try (SseEventSource eventSource = withNoReconnect(Type.BUSY)) { + SseEventSource eventSource = withNoReconnect(Type.BUSY); + try (SseEventSource ses = eventSource) { final Future future = executor.submit(() -> eventSource.open()); // Wait a bit for open() to advance Thread.sleep(50L); eventSource.close(); + assertThat(eventSource.isOpen(), equalTo(false)); assertThat(future.get(), equalTo(null)); - assertThat(eventSource.isOpen(), equalTo(false)); + assertThat(eventSource.isOpen(), equalTo(true)); // now re-opened } + assertThat(eventSource.isOpen(), equalTo(false)); } private SseEventSource withNoReconnect(Type type) {