This repository was archived by the owner on May 21, 2020. It is now read-only.
File tree 3 files changed +8
-8
lines changed
main/scala/aserralle/akka/stream/kcl
test/scala/aserralle/akka/stream/kcl
3 files changed +8
-8
lines changed Original file line number Diff line number Diff line change 4
4
5
5
package aserralle .akka .stream .kcl
6
6
7
- import scala .util .control .NoStackTrace
8
-
9
7
object Errors {
8
+ sealed class KinesisWorkerSourceError (err : Throwable ) extends Throwable (err)
10
9
11
- sealed trait KinesisWorkerSourceError extends NoStackTrace
12
10
case class WorkerUnexpectedShutdown (cause : Throwable )
13
- extends KinesisWorkerSourceError
14
-
15
- case object BackpressureTimeout extends KinesisWorkerSourceError
11
+ extends KinesisWorkerSourceError (cause)
16
12
13
+ case class BackpressureTimeout (cause : Throwable )
14
+ extends KinesisWorkerSourceError (cause)
17
15
}
Original file line number Diff line number Diff line change @@ -53,7 +53,7 @@ object KinesisWorkerSource {
53
53
(Exception .nonFatalCatch either Await .result(
54
54
queue.offer(record),
55
55
settings.backpressureTimeout) left)
56
- .foreach(_ => queue.fail(BackpressureTimeout ))
56
+ .foreach(err => queue.fail(BackpressureTimeout (err) ))
57
57
semaphore.release()
58
58
},
59
59
settings.terminateStreamGracePeriod
Original file line number Diff line number Diff line change @@ -202,7 +202,9 @@ class KinesisWorkerSourceSourceSpec
202
202
203
203
Await .ready(watch, 5 .seconds)
204
204
val Failure (exception) = watch.value.get
205
- assert(exception == BackpressureTimeout )
205
+ assert(
206
+ exception.getCause.getMessage
207
+ .contains(" Futures timed out after [100 milliseconds]" ))
206
208
207
209
killSwitch.shutdown()
208
210
}
You can’t perform that action at this time.
0 commit comments