Skip to content

Commit c29f751

Browse files
committedFeb 18, 2019
InternalProcessorContext Contract et al.
1 parent bec4303 commit c29f751

27 files changed

+183
-88
lines changed
 

‎SUMMARY.adoc

+9-9
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@
140140
. link:kafka-streams-internals-CacheFlushListener.adoc[CacheFlushListener]
141141

142142
. link:kafka-streams-StreamsMetrics.adoc[StreamsMetrics]
143-
.. link:kafka-streams-StreamsMetricsImpl.adoc[StreamsMetricsImpl]
143+
.. link:kafka-streams-internals-StreamsMetricsImpl.adoc[StreamsMetricsImpl]
144144
.. link:kafka-streams-StreamsMetricsThreadImpl.adoc[StreamsMetricsThreadImpl]
145145

146146
. link:kafka-streams-StreamTask-TaskMetrics.adoc[TaskMetrics]
@@ -333,12 +333,13 @@
333333
.. link:kafka-streams-internals-TaskAssignor.adoc[TaskAssignor Contract]
334334
... link:kafka-streams-internals-StickyTaskAssignor.adoc[StickyTaskAssignor]
335335

336-
. link:kafka-streams-internals-AbstractProcessorContext.adoc[AbstractProcessorContext -- Base Of Internal Processor Contexts]
336+
. link:kafka-streams-internals-InternalProcessorContext.adoc[InternalProcessorContext Contract]
337+
.. link:kafka-streams-internals-AbstractProcessorContext.adoc[AbstractProcessorContext -- Base Of Internal Processor Contexts]
337338
.. link:kafka-streams-internals-GlobalProcessorContextImpl.adoc[GlobalProcessorContextImpl]
338-
.. link:kafka-streams-ProcessorContextImpl.adoc[ProcessorContextImpl]
339-
.. link:kafka-streams-StandbyContextImpl.adoc[StandbyContextImpl]
339+
.. link:kafka-streams-internals-ProcessorContextImpl.adoc[ProcessorContextImpl]
340+
.. link:kafka-streams-internals-StandbyContextImpl.adoc[StandbyContextImpl]
340341

341-
. link:kafka-streams-ThreadCache.adoc[ThreadCache]
342+
. link:kafka-streams-internals-ThreadCache.adoc[ThreadCache]
342343

343344
. link:kafka-streams-internals-GlobalStreamThread.adoc[GlobalStreamThread]
344345
.. link:kafka-streams-StateConsumer.adoc[StateConsumer]
@@ -350,7 +351,7 @@
350351
. link:kafka-streams-TimestampTracker.adoc[TimestampTracker]
351352
.. link:kafka-streams-MinTimestampTracker.adoc[MinTimestampTracker]
352353

353-
. link:kafka-streams-RecordQueue.adoc[RecordQueue]
354+
. link:kafka-streams-internals-RecordQueue.adoc[RecordQueue]
354355
.. link:kafka-streams-StampedRecord.adoc[StampedRecord -- Orderable Kafka ConsumerRecords At Timestamp]
355356

356357
. link:kafka-streams-PunctuationQueue.adoc[PunctuationQueue]
@@ -364,19 +365,18 @@
364365

365366
. link:kafka-streams-RecordDeserializer.adoc[RecordDeserializer]
366367

367-
. link:kafka-streams-PartitionGroup.adoc[PartitionGroup]
368+
. link:kafka-streams-internals-PartitionGroup.adoc[PartitionGroup]
368369
.. link:kafka-streams-internals-RecordInfo.adoc[RecordInfo]
369370

370371
. link:kafka-streams-internals-StateDirectory.adoc[StateDirectory]
371372
. link:kafka-streams-internals-ProcessorRecordContext.adoc[ProcessorRecordContext]
372-
. link:kafka-streams-internals-InternalProcessorContext.adoc[InternalProcessorContext]
373373

374374
. link:kafka-streams-internals-CopartitionedTopicsValidator.adoc[CopartitionedTopicsValidator]
375375

376376
=== State (Store) Management
377377

378378
. link:kafka-streams-internals-StateManager.adoc[StateManager Contract -- State Store Managers]
379-
.. link:kafka-streams-AbstractStateManager.adoc[AbstractStateManager]
379+
.. link:kafka-streams-internals-AbstractStateManager.adoc[AbstractStateManager]
380380

381381
. link:kafka-streams-ProcessorStateManager.adoc[ProcessorStateManager]
382382
. link:kafka-streams-GlobalStateManager.adoc[GlobalStateManager]

‎kafka-streams-ProcessorStateManager.adoc

+8-8
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
== [[ProcessorStateManager]] ProcessorStateManager
22

3-
`ProcessorStateManager` is a concrete <<kafka-streams-internals-StateManager.adoc#, StateManager>> (as a <<kafka-streams-AbstractStateManager.adoc#, AbstractStateManager>>) that...FIXME
3+
`ProcessorStateManager` is a concrete <<kafka-streams-internals-StateManager.adoc#, StateManager>> (as a <<kafka-streams-internals-AbstractStateManager.adoc#, AbstractStateManager>>) that...FIXME
44

55
`ProcessorStateManager` is <<creating-instance, created>> exclusively when `AbstractTask` is <<kafka-streams-internals-AbstractTask.adoc#stateMgr, created>> (for <<kafka-streams-internals-StandbyTask.adoc#, StandbyTask>> and <<kafka-streams-internals-StreamTask.adoc#, StreamTask>> tasks).
66

@@ -81,7 +81,7 @@ NOTE: `register` is part of link:kafka-streams-internals-StateManager.adoc#regis
8181
Registering state store [storeName] to its state manager
8282
```
8383

84-
`register` finds the <<kafka-streams-StateStore.adoc#, StateStore>> in the <<storeToChangelogTopic, storeToChangelogTopic>> internal registry. If not found, `register` registers the `StateStore` (in the <<kafka-streams-AbstractStateManager.adoc#stores, stores>> registry) and returns.
84+
`register` finds the <<kafka-streams-StateStore.adoc#, StateStore>> in the <<storeToChangelogTopic, storeToChangelogTopic>> internal registry. If not found, `register` registers the `StateStore` (in the <<kafka-streams-internals-AbstractStateManager.adoc#stores, stores>> registry) and returns.
8585

8686
If found however, `register` uses the topic to create a Kafka `TopicPartition` (with the <<getPartition, partition>>).
8787

@@ -97,15 +97,15 @@ If the `ProcessorStateManager` is not in <<isStandby, standby>> mode, `register`
9797
Restoring state store [storeName] from changelog topic [topic]
9898
```
9999

100-
`register` adds the (store) partition to the <<changelogPartitions, changelogPartitions>> and registers the `StateStore` (in the <<kafka-streams-AbstractStateManager.adoc#stores, stores>> registry).
100+
`register` adds the (store) partition to the <<changelogPartitions, changelogPartitions>> and registers the `StateStore` (in the <<kafka-streams-internals-AbstractStateManager.adoc#stores, stores>> registry).
101101

102102
`register` throws an `IllegalArgumentException` if the name of the <<kafka-streams-StateStore.adoc#, StateStore>> is *.checkpoint*.
103103

104104
```
105105
Illegal store name: .checkpoint
106106
```
107107

108-
`register` throws an `IllegalArgumentException` if the <<kafka-streams-StateStore.adoc#, StateStore>> is already registered (in the <<kafka-streams-AbstractStateManager.adoc#stores, stores>> registry).
108+
`register` throws an `IllegalArgumentException` if the <<kafka-streams-StateStore.adoc#, StateStore>> is already registered (in the <<kafka-streams-internals-AbstractStateManager.adoc#stores, stores>> registry).
109109

110110
```
111111
Store [storeName] has already been registered.
@@ -236,19 +236,19 @@ void checkpoint(final Map<TopicPartition, Long> checkpointableOffsets)
236236

237237
NOTE: `checkpoint` is part of the <<kafka-streams-Checkpointable.adoc#checkpoint, Checkpointable Contract>> to checkpoint offsets.
238238

239-
`checkpoint` requests the <<changelogReader, ChangelogReader>> for <<kafka-streams-ChangelogReader.adoc#restoredOffsets, restoredOffsets>> and adds them to the <<kafka-streams-AbstractStateManager.adoc#checkpointableOffsets, checkpointableOffsets>> registry.
239+
`checkpoint` requests the <<changelogReader, ChangelogReader>> for <<kafka-streams-ChangelogReader.adoc#restoredOffsets, restoredOffsets>> and adds them to the <<kafka-streams-internals-AbstractStateManager.adoc#checkpointableOffsets, checkpointableOffsets>> registry.
240240

241-
For every <<kafka-streams-StateStore.adoc#, state store>> (in the <<kafka-streams-AbstractStateManager.adoc#stores, stores>> internal registry), `checkpoint`...FIXME
241+
For every <<kafka-streams-StateStore.adoc#, state store>> (in the <<kafka-streams-internals-AbstractStateManager.adoc#stores, stores>> internal registry), `checkpoint`...FIXME
242242

243-
`checkpoint` creates a new <<kafka-streams-internals-OffsetCheckpoint.adoc#, OffsetCheckpoint>> (with the <<kafka-streams-AbstractStateManager.adoc#CHECKPOINT_FILE_NAME, .checkpoint>> file in the <<kafka-streams-AbstractStateManager.adoc#baseDir, base directory>>) unless <<kafka-streams-AbstractStateManager.adoc#checkpoint, it was done already>>.
243+
`checkpoint` creates a new <<kafka-streams-internals-OffsetCheckpoint.adoc#, OffsetCheckpoint>> (with the <<kafka-streams-internals-AbstractStateManager.adoc#CHECKPOINT_FILE_NAME, .checkpoint>> file in the <<kafka-streams-internals-AbstractStateManager.adoc#baseDir, base directory>>) unless <<kafka-streams-internals-AbstractStateManager.adoc#checkpoint, it was done already>>.
244244

245245
`checkpoint` prints out the following TRACE message to the logs:
246246

247247
```
248248
Writing checkpoint: [checkpointableOffsets]
249249
```
250250

251-
In the end, `checkpoint` requests the <<kafka-streams-AbstractStateManager.adoc#checkpoint, OffsetCheckpoint>> to <<kafka-streams-internals-OffsetCheckpoint.adoc#write, write>> the <<kafka-streams-AbstractStateManager.adoc#checkpointableOffsets, checkpointableOffsets>>.
251+
In the end, `checkpoint` requests the <<kafka-streams-internals-AbstractStateManager.adoc#checkpoint, OffsetCheckpoint>> to <<kafka-streams-internals-OffsetCheckpoint.adoc#write, write>> the <<kafka-streams-internals-AbstractStateManager.adoc#checkpointableOffsets, checkpointableOffsets>>.
252252

253253
=== [[registerGlobalStateStores]] `registerGlobalStateStores` Method
254254

‎kafka-streams-StampedRecord.adoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ In other words, `StampedRecord` represents a <<record, Kafka ConsumerRecord>> at
66

77
`StampedRecord` is <<creating-instance, created>> when:
88

9-
* `RecordQueue` is requested to link:kafka-streams-RecordQueue.adoc#addRawRecords[add Kafka ConsumerRecords (as StampedRecords)]
9+
* `RecordQueue` is requested to link:kafka-streams-internals-RecordQueue.adoc#addRawRecords[add Kafka ConsumerRecords (as StampedRecords)]
1010
1111
* `StreamTask` is requested to link:kafka-streams-internals-StreamTask.adoc#punctuate[punctuate]
1212

‎kafka-streams-StateStore.adoc

+2-2
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ Closing the state store
2121

2222
Used when:
2323

24-
* `AbstractStateManager` is requested to <<kafka-streams-AbstractStateManager.adoc#reinitializeStateStoresForPartitions, reinitializeStateStoresForPartitions>>
24+
* `AbstractStateManager` is requested to <<kafka-streams-internals-AbstractStateManager.adoc#reinitializeStateStoresForPartitions, reinitializeStateStoresForPartitions>>
2525
2626
* <<kafka-streams-internals-GlobalStateManagerImpl.adoc#close, GlobalStateManagerImpl>> and <<kafka-streams-ProcessorStateManager.adoc#close, ProcessorStateManager>> are requested to close
2727
@@ -61,7 +61,7 @@ Initializes the state store
6161

6262
Used when:
6363

64-
* `AbstractStateManager` is requested to <<kafka-streams-AbstractStateManager.adoc#reinitializeStateStoresForPartitions, reinitializeStateStoresForPartitions>>
64+
* `AbstractStateManager` is requested to <<kafka-streams-internals-AbstractStateManager.adoc#reinitializeStateStoresForPartitions, reinitializeStateStoresForPartitions>>
6565
6666
* `AbstractTask` is requested to <<kafka-streams-internals-AbstractTask.adoc#registerStateStores, registerStateStores>>
6767

‎kafka-streams-StreamThread.adoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -544,7 +544,7 @@ NOTE: The input <<kafka-streams-StateRestoreListener.adoc#, StateRestoreListener
544544
* `thread.[clientId]-StreamThread-[STREAM_THREAD_ID]` link:kafka-streams-StreamsMetricsThreadImpl.adoc#prefix[prefix]
545545
* link:kafka-streams-StreamsMetricsThreadImpl.adoc#tags[Tags] with one entry with `client-id` and the `[clientId]-StreamThread-[STREAM_THREAD_ID]` value.
546546

547-
`create` creates a <<kafka-streams-ThreadCache.adoc#, ThreadCache>> (with `cacheSizeBytes` for the `maxCacheSizeBytes` and the `StreamsMetricsThreadImpl`).
547+
`create` creates a <<kafka-streams-internals-ThreadCache.adoc#, ThreadCache>> (with `cacheSizeBytes` for the `maxCacheSizeBytes` and the `StreamsMetricsThreadImpl`).
548548

549549
`create` creates a link:kafka-streams-internals-TaskCreator.adoc#creating-instance[TaskCreator] and a link:kafka-streams-internals-StandbyTaskCreator.adoc#creating-instance[StandbyTaskCreator] that are used exclusively to create a link:kafka-streams-internals-TaskManager.adoc#creating-instance[TaskManager] (with a new link:kafka-streams-AssignedStreamsTasks.adoc#creating-instance[AssignedStreamsTasks] and link:kafka-streams-AssignedStandbyTasks.adoc#creating-instance[AssignedStandbyTasks] as well as the given <<kafka-streams-StreamsMetadataState.adoc#, StreamsMetadataState>>).
550550

‎kafka-streams-StreamsMetrics.adoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -57,4 +57,4 @@ Used when...FIXME
5757
|===
5858

5959
[[implementations]]
60-
NOTE: link:kafka-streams-StreamsMetricsImpl.adoc[StreamsMetricsImpl] is the one and only known direct implementation of <<contract, StreamsMetrics Contract>> in Kafka Streams {{ book.kafka_version }}.
60+
NOTE: link:kafka-streams-internals-StreamsMetricsImpl.adoc[StreamsMetricsImpl] is the one and only known direct implementation of <<contract, StreamsMetrics Contract>> in Kafka Streams {{ book.kafka_version }}.

‎kafka-streams-StreamsMetricsThreadImpl.adoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
== [[StreamsMetricsThreadImpl]] StreamsMetricsThreadImpl
22

3-
`StreamsMetricsThreadImpl` is a concrete <<kafka-streams-StreamsMetricsImpl.adoc#, StreamsMetricsImpl>>.
3+
`StreamsMetricsThreadImpl` is a concrete <<kafka-streams-internals-StreamsMetricsImpl.adoc#, StreamsMetricsImpl>>.
44

55
`StreamsMetricsThreadImpl` is <<creating-instance, created>> for a link:kafka-streams-StreamThread.adoc#create[StreamThread] (when `KafkaStreams` is link:kafka-streams-KafkaStreams.adoc#creating-instance[created]).
66

‎kafka-streams-TimestampExtractor.adoc

+3-3
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ interface TimestampExtractor {
1212
}
1313
----
1414

15-
The extracted timestamp is in milliseconds and can never be negative (or <<kafka-streams-RecordQueue.adoc#addRawRecords, will be dropped>>).
15+
The extracted timestamp is in milliseconds and can never be negative (or <<kafka-streams-internals-RecordQueue.adoc#addRawRecords, will be dropped>>).
1616

1717
You can define a custom timestamp extractor for reading a topic as a <<kafka-streams-StreamsBuilder.adoc#stream, KStream>> or a <<kafka-streams-StreamsBuilder.adoc#table, KTable>> in an <<kafka-streams-Consumed.adoc#, Consumed>> object (using <<kafka-streams-Consumed.adoc#with, with>> or <<kafka-streams-Consumed.adoc#withTimestampExtractor, withTimestampExtractor>>).
1818

@@ -46,7 +46,7 @@ props.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, ...)
4646
| Description
4747

4848
| `extract`
49-
| [[extract]] Used exclusively when `RecordQueue` is requested to link:kafka-streams-RecordQueue.adoc#addRawRecords[add Kafka ConsumerRecords (as StampedRecords)].
49+
| [[extract]] Used exclusively when `RecordQueue` is requested to link:kafka-streams-internals-RecordQueue.adoc#addRawRecords[add Kafka ConsumerRecords (as StampedRecords)].
5050
|===
5151

5252
[[implementations]]
@@ -65,4 +65,4 @@ props.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, ...)
6565

6666
NOTE: `TimestampExtractor` is an `Evolving` contract which means that compatibility may be broken at a minor release.
6767

68-
`TimestampExtractor` is used to create a <<kafka-streams-internals-InternalTopologyBuilder-SourceNodeFactory.adoc#, SourceNodeFactory>>, <<kafka-streams-RecordQueue.adoc#, RecordQueue>>, <<kafka-streams-internals-InternalTopologyBuilder-SourceNodeFactory.adoc#, SourceNodeFactory>>.
68+
`TimestampExtractor` is used to create a <<kafka-streams-internals-InternalTopologyBuilder-SourceNodeFactory.adoc#, SourceNodeFactory>>, <<kafka-streams-internals-RecordQueue.adoc#, RecordQueue>>, <<kafka-streams-internals-InternalTopologyBuilder-SourceNodeFactory.adoc#, SourceNodeFactory>>.

‎kafka-streams-internals-AbstractProcessorContext.adoc

+4-4
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@ NOTE: `AbstractProcessorContext` is a Java abstract class and cannot be <<creati
1414
| <<kafka-streams-internals-GlobalProcessorContextImpl.adoc#, GlobalProcessorContextImpl>>
1515
| [[GlobalProcessorContextImpl]]
1616

17-
| <<kafka-streams-ProcessorContextImpl.adoc#, ProcessorContextImpl>>
17+
| <<kafka-streams-internals-ProcessorContextImpl.adoc#, ProcessorContextImpl>>
1818
| [[ProcessorContextImpl]]
1919

20-
| <<kafka-streams-StandbyContextImpl.adoc#, StandbyContextImpl>>
20+
| <<kafka-streams-internals-StandbyContextImpl.adoc#, StandbyContextImpl>>
2121
| [[StandbyContextImpl]]
2222
|===
2323

@@ -92,9 +92,9 @@ NOTE: `uninitialize` is part of the <<kafka-streams-internals-InternalProcessorC
9292

9393
* [[taskId]] <<kafka-streams-TaskId.adoc#, TaskId>>
9494
* [[config]] <<kafka-streams-StreamsConfig.adoc#, StreamsConfig>>
95-
* [[metrics]] <<kafka-streams-StreamsMetricsImpl.adoc#, StreamsMetricsImpl>>
95+
* [[metrics]] <<kafka-streams-internals-StreamsMetricsImpl.adoc#, StreamsMetricsImpl>>
9696
* [[stateManager]] <<kafka-streams-internals-StateManager.adoc#, StateManager>>
97-
* [[cache]] <<kafka-streams-ThreadCache.adoc#, ThreadCache>>
97+
* [[cache]] <<kafka-streams-internals-ThreadCache.adoc#, ThreadCache>>
9898

9999
`AbstractProcessorContext` initializes the <<internal-registries, internal registries and counters>>.
100100

‎kafka-streams-internals-AbstractTask.adoc

+2-2
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,9 @@ The `taskInitialized` flag is used by the <<implementations, concrete AbstractTa
4747
| processorContext
4848
a| [[processorContext]] <<kafka-streams-internals-InternalProcessorContext.adoc#, InternalProcessorContext>>, i.e.
4949

50-
* <<kafka-streams-ProcessorContextImpl.adoc#, ProcessorContextImpl>> for <<kafka-streams-internals-StreamTask.adoc#, StreamTask>>
50+
* <<kafka-streams-internals-ProcessorContextImpl.adoc#, ProcessorContextImpl>> for <<kafka-streams-internals-StreamTask.adoc#, StreamTask>>
5151
52-
* <<kafka-streams-StandbyContextImpl.adoc#, StandbyContextImpl>> for <<kafka-streams-internals-StandbyTask.adoc#, StandbyTask>>
52+
* <<kafka-streams-internals-StandbyContextImpl.adoc#, StandbyContextImpl>> for <<kafka-streams-internals-StandbyTask.adoc#, StandbyTask>>
5353
5454
| stateMgr
5555
a| [[stateMgr]] <<kafka-streams-ProcessorStateManager.adoc#, ProcessorStateManager>>

‎kafka-streams-internals-GlobalStateManagerImpl.adoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
== [[GlobalStateManagerImpl]] GlobalStateManagerImpl
22

3-
`GlobalStateManagerImpl` is a concrete <<kafka-streams-GlobalStateManager.adoc#, GlobalStateManager>> (and a <<kafka-streams-AbstractStateManager.adoc#, AbstractStateManager>>) that...FIXME
3+
`GlobalStateManagerImpl` is a concrete <<kafka-streams-GlobalStateManager.adoc#, GlobalStateManager>> (and a <<kafka-streams-internals-AbstractStateManager.adoc#, AbstractStateManager>>) that...FIXME
44

55
`GlobalStateManagerImpl` is <<creating-instance, created>> exclusively when `GlobalStreamThread` is requested to <<kafka-streams-internals-GlobalStreamThread.adoc#initialize, initialize>> (when `GlobalStreamThread` is <<kafka-streams-internals-GlobalStreamThread.adoc#run, started>> with <<kafka-streams-KafkaStreams.adoc#start, KafkaStreams>>).
66

‎kafka-streams-internals-GlobalStreamThread.adoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
| Description
1313

1414
| cache
15-
| [[cache]] <<kafka-streams-ThreadCache.adoc#, ThreadCache>>
15+
| [[cache]] <<kafka-streams-internals-ThreadCache.adoc#, ThreadCache>>
1616
|===
1717

1818
=== [[initialize]] `initialize` Internal Method

0 commit comments

Comments
 (0)
Please sign in to comment.