Skip to content
This repository was archived by the owner on May 21, 2020. It is now read-only.

Commit 7c6d796

Browse files
committed
Refactor Source and remove unused code
1 parent 485201b commit 7c6d796

File tree

12 files changed

+139
-194
lines changed

12 files changed

+139
-194
lines changed

README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ In order to use it, you need to provide a Worker builder and the Source settings
3636
```scala
3737
val workerSourceSettings = KinesisWorkerSourceSettings(
3838
bufferSize = 1000,
39-
checkWorkerPeriodicity = 1 minute)
39+
terminateStreamGracePeriod = 1 minute)
4040
val builder: IRecordProcessorFactory => Worker = { recordProcessorFactory =>
4141
new Worker.Builder()
4242
.recordProcessorFactory(recordProcessorFactory)
@@ -71,7 +71,7 @@ In order to use the Flow you can provide additional settings:
7171

7272
```scala
7373
val checkpointSettings = KinesisWorkerCheckpointSettings(100, 30 seconds)
74-
KinesisWorker(builder, workerSourceSettings)
74+
KinesisWorkerSource(builder, workerSourceSettings)
7575
.via(KinesisWorker.checkpointRecordsFlow(checkpointSettings))
7676
.to(Sink.ignore)
7777
KinesisWorker(builder, workerSourceSettings).to(

src/main/scala/aserralle/akka/stream/kcl/worker/CommittableRecord.scala src/main/scala/aserralle/akka/stream/kcl/CommittableRecord.scala

+8-3
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@
22
* Copyright (C) 2018 Albert Serrallé
33
*/
44

5-
package aserralle.akka.stream.kcl.worker
5+
package aserralle.akka.stream.kcl
66

7+
import akka.Done
78
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
89
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason
910
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber
@@ -26,13 +27,17 @@ class CommittableRecord(
2627
recordProcessor.shutdown
2728
def canBeCheckpointed(): Boolean =
2829
recordProcessorShutdownReason().isEmpty
29-
def checkpoint(): Future[Unit] =
30-
Future(checkpointer.checkpoint(record))
30+
def tryToCheckpoint(): Future[Done] =
31+
Future {
32+
checkpointer.checkpoint(record)
33+
Done
34+
}
3135

3236
}
3337

3438
object CommittableRecord {
3539

40+
// Only makes sense to compare Records belonging to the same shard
3641
implicit val orderBySequenceNumber: Ordering[CommittableRecord] =
3742
Ordering.by(_.sequenceNumber)
3843

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
/*
2+
* Copyright (C) 2018 Albert Serrallé
3+
*/
4+
5+
package aserralle.akka.stream.kcl
6+
7+
import scala.util.control.NoStackTrace
8+
9+
object Errors {
10+
11+
sealed trait KinesisWorkerSourceError extends NoStackTrace
12+
case class WorkerUnexpectedShutdown(cause: Throwable)
13+
extends KinesisWorkerSourceError
14+
15+
}

src/main/scala/aserralle/akka/stream/kcl/worker/IRecordProcessor.scala src/main/scala/aserralle/akka/stream/kcl/IRecordProcessor.scala

+20-7
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@
22
* Copyright (C) 2018 Albert Serrallé
33
*/
44

5-
package aserralle.akka.stream.kcl.worker
5+
package aserralle.akka.stream.kcl
66

7+
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
78
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason
89
import com.amazonaws.services.kinesis.clientlibrary.types.{
910
ExtendedSequenceNumber,
@@ -14,22 +15,25 @@ import com.amazonaws.services.kinesis.clientlibrary.types.{
1415

1516
import scala.collection.JavaConverters._
1617
import scala.concurrent.ExecutionContext
18+
import scala.concurrent.duration.FiniteDuration
1719

1820
private[kcl] class IRecordProcessor(
19-
callback: CommittableRecord => Unit
21+
callback: CommittableRecord => Unit,
22+
terminateStreamGracePeriod: FiniteDuration
2023
)(implicit executionContext: ExecutionContext)
2124
extends com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor {
2225
private var shardId: String = _
2326
private var extendedSequenceNumber: ExtendedSequenceNumber = _
24-
2527
var shutdown: Option[ShutdownReason] = None
28+
var latestCheckpointer: Option[IRecordProcessorCheckpointer] = None
2629

2730
override def initialize(initializationInput: InitializationInput): Unit = {
28-
this.shardId = initializationInput.getShardId
29-
this.extendedSequenceNumber = initializationInput.getExtendedSequenceNumber
31+
shardId = initializationInput.getShardId
32+
extendedSequenceNumber = initializationInput.getExtendedSequenceNumber
3033
}
3134

32-
override def processRecords(processRecordsInput: ProcessRecordsInput): Unit =
35+
override def processRecords(processRecordsInput: ProcessRecordsInput): Unit = {
36+
latestCheckpointer = Some(processRecordsInput.getCheckpointer)
3337
processRecordsInput.getRecords.asScala.foreach { record =>
3438
callback(
3539
new CommittableRecord(
@@ -42,8 +46,17 @@ private[kcl] class IRecordProcessor(
4246
)
4347
)
4448
}
49+
}
4550

46-
override def shutdown(shutdownInput: ShutdownInput): Unit =
51+
override def shutdown(shutdownInput: ShutdownInput): Unit = {
4752
shutdown = Some(shutdownInput.getShutdownReason)
53+
latestCheckpointer = Some(shutdownInput.getCheckpointer)
54+
shutdownInput.getShutdownReason match {
55+
case ShutdownReason.TERMINATE =>
56+
Thread.sleep(terminateStreamGracePeriod.toMillis)
57+
case ShutdownReason.ZOMBIE => ()
58+
case ShutdownReason.REQUESTED => ()
59+
}
60+
}
4861

4962
}

src/main/scala/aserralle/akka/stream/kcl/KinesisErrors.scala

-31
This file was deleted.

src/main/scala/aserralle/akka/stream/kcl/KinesisWorkerSettings.scala

+4-5
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import scala.concurrent.duration._
88

99
case class KinesisWorkerSourceSettings(
1010
bufferSize: Int,
11-
checkWorkerPeriodicity: FiniteDuration) {
11+
terminateStreamGracePeriod: FiniteDuration) {
1212
require(
1313
bufferSize >= 1,
1414
"Buffer size must be greater than 0; use size 1 to disable stage buffering"
@@ -29,10 +29,9 @@ object KinesisWorkerSourceSettings {
2929
/**
3030
* Java API
3131
*/
32-
def create(
33-
bufferSize: Int,
34-
checkWorkerPeriodicity: FiniteDuration): KinesisWorkerSourceSettings =
35-
KinesisWorkerSourceSettings(bufferSize, checkWorkerPeriodicity)
32+
def create(bufferSize: Int, terminateStreamGracePeriod: FiniteDuration)
33+
: KinesisWorkerSourceSettings =
34+
KinesisWorkerSourceSettings(bufferSize, terminateStreamGracePeriod)
3635

3736
}
3837

src/main/scala/aserralle/akka/stream/kcl/KinesisWorkerSourceStage.scala

-87
This file was deleted.

src/main/scala/aserralle/akka/stream/kcl/javadsl/KinesisWorker.scala src/main/scala/aserralle/akka/stream/kcl/javadsl/KinesisWorkerSource.scala

+7-8
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,15 @@ package aserralle.akka.stream.kcl.javadsl
77
import java.util.concurrent.Executor
88

99
import akka.NotUsed
10-
import aserralle.akka.stream.kcl.worker.CommittableRecord
11-
import aserralle.akka.stream.kcl.{scaladsl, _}
10+
import aserralle.akka.stream.kcl.{CommittableRecord, scaladsl, _}
1211
import akka.stream.javadsl.{Flow, Sink, Source}
1312
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory
1413
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker
1514
import com.amazonaws.services.kinesis.model.Record
1615

1716
import scala.concurrent.ExecutionContext
1817

19-
object KinesisWorker {
18+
object KinesisWorkerSource {
2019

2120
abstract class WorkerBuilder {
2221
def build(r: IRecordProcessorFactory): Worker
@@ -26,32 +25,32 @@ object KinesisWorker {
2625
workerBuilder: WorkerBuilder,
2726
settings: KinesisWorkerSourceSettings,
2827
workerExecutor: Executor
29-
): Source[CommittableRecord, NotUsed] =
30-
scaladsl.KinesisWorker
28+
): Source[CommittableRecord, Worker] =
29+
scaladsl.KinesisWorkerSource
3130
.apply(workerBuilder.build, settings)(
3231
ExecutionContext.fromExecutor(workerExecutor))
3332
.asJava
3433

3534
def create(
3635
workerBuilder: WorkerBuilder,
3736
workerExecutor: Executor
38-
): Source[CommittableRecord, NotUsed] =
37+
): Source[CommittableRecord, Worker] =
3938
create(workerBuilder,
4039
KinesisWorkerSourceSettings.defaultInstance,
4140
workerExecutor)
4241

4342
def checkpointRecordsFlow(
4443
settings: KinesisWorkerCheckpointSettings
4544
): Flow[CommittableRecord, Record, NotUsed] =
46-
scaladsl.KinesisWorker.checkpointRecordsFlow(settings).asJava
45+
scaladsl.KinesisWorkerSource.checkpointRecordsFlow(settings).asJava
4746

4847
def checkpointRecordsFlow(): Flow[CommittableRecord, Record, NotUsed] =
4948
checkpointRecordsFlow(KinesisWorkerCheckpointSettings.defaultInstance)
5049

5150
def checkpointRecordsSink(
5251
settings: KinesisWorkerCheckpointSettings
5352
): Sink[CommittableRecord, NotUsed] =
54-
scaladsl.KinesisWorker.checkpointRecordsSink(settings).asJava
53+
scaladsl.KinesisWorkerSource.checkpointRecordsSink(settings).asJava
5554

5655
def checkpointRecordsSink(): Sink[CommittableRecord, NotUsed] =
5756
checkpointRecordsSink(KinesisWorkerCheckpointSettings.defaultInstance)

0 commit comments

Comments
 (0)