Skip to content

Commit bda7982

Browse files
StreamTask
1 parent d5e7004 commit bda7982

4 files changed

+113
-8
lines changed

SUMMARY.adoc

+2
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,8 @@
310310
.. link:kafka-streams-internals-StandbyTask.adoc[StandbyTask]
311311
.. link:kafka-streams-internals-StreamTask.adoc[StreamTask]
312312

313+
. link:kafka-streams-internals-ProducerSupplier.adoc[ProducerSupplier]
314+
313315
. link:kafka-streams-internals-AssignedTasks.adoc[AssignedTasks]
314316
.. link:kafka-streams-AssignedStandbyTasks.adoc[AssignedStandbyTasks -- AssignedTasks For StandbyTasks]
315317
.. link:kafka-streams-AssignedStreamsTasks.adoc[AssignedStreamsTasks -- AssignedTasks For StreamTasks]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
== [[ProducerSupplier]] ProducerSupplier
2+
3+
`ProducerSupplier` is...FIXME

kafka-streams-internals-StreamTask.adoc

+106-8
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,24 @@ log4j.logger.org.apache.kafka.streams.processor.internals.StreamTask=ALL
3131
Refer to <<kafka-logging.adoc#log4j.properties, Application Logging Using log4j>>.
3232
====
3333

34+
=== StreamTask and RecordCollector
35+
36+
`StreamTask` creates a new <<recordCollector, RecordCollector>> or is given one (when <<creating-instance, created>>).
37+
38+
The <<recordCollector, RecordCollector>> is requested to <<kafka-streams-internals-RecordCollector.adoc#init, initialize>> (with a <<producer, Kafka Producer>>) when `StreamTask` is <<creating-instance, created>> and <<resume, resumed>>.
39+
40+
`StreamTask` uses the <<recordCollector, RecordCollector>> for the following:
41+
42+
* Creating an <<kafka-streams-internals-AbstractTask.adoc#processorContext, InternalProcessorContext>> (when <<creating-instance, created>>)
43+
44+
* <<kafka-streams-internals-RecordCollector.adoc#offsets, Getting offsets>> when <<activeTaskCheckpointableOffsets, activeTaskCheckpointableOffsets>>
45+
46+
* <<kafka-streams-internals-RecordCollector.adoc#flush, Flushing>> when <<flushState, flushState>>
47+
48+
* <<suspend, suspend>> and <<maybeAbortTransactionAndCloseRecordCollector, maybeAbortTransactionAndCloseRecordCollector>>
49+
50+
The <<recordCollector, RecordCollector>> is requested to <<kafka-streams-internals-RecordCollector.adoc#close, close>> when `StreamTask` is requested to <<suspend, suspend>> and <<maybeAbortTransactionAndCloseRecordCollector, maybeAbortTransactionAndCloseRecordCollector>>.
51+
3452
=== [[closeTopology]] `closeTopology` Internal Method
3553

3654
[source, java]
@@ -74,19 +92,21 @@ NOTE: `close` is part of link:kafka-streams-internals-Task.adoc#close[Task Contr
7492

7593
=== [[creating-instance]] Creating StreamTask Instance
7694

77-
`StreamTask` takes the following when created:
95+
`StreamTask` takes the following to be created:
7896

79-
* [[id]] `TaskId`
80-
* [[partitions]] Topic partitions (as Kafka's `TopicPartition`)
97+
* [[id]] <<kafka-streams-TaskId.adoc#, TaskId>>
98+
* [[partitions]] Topic partitions (Kafka `TopicPartition`)
8199
* [[topology]] <<kafka-streams-internals-ProcessorTopology.adoc#, ProcessorTopology>>
82100
* [[consumer]] Kafka https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html[Consumer] (`Consumer<byte[], byte[]>`)
83101
* [[changelogReader]] <<kafka-streams-ChangelogReader.adoc#, ChangelogReader>>
84-
* [[config]] link:kafka-streams-StreamsConfig.adoc[StreamsConfig]
85-
* [[metrics]] link:kafka-streams-StreamsMetrics.adoc[StreamsMetrics]
86-
* [[stateDirectory]] link:kafka-streams-internals-StateDirectory.adoc[StateDirectory]
102+
* [[config]] <<kafka-streams-StreamsConfig.adoc#, StreamsConfig>>
103+
* [[metrics]] <<kafka-streams-internals-StreamsMetricsImpl.adoc#, StreamsMetricsImpl>>
104+
* [[stateDirectory]] <<kafka-streams-internals-StateDirectory.adoc#, StateDirectory>>
87105
* [[cache]] <<kafka-streams-internals-ThreadCache.adoc#, ThreadCache>>
88106
* [[time]] `Time`
89-
* [[producer]] Kafka `Producer` (`Producer<byte[], byte[]>`)
107+
* [[producerSupplier]] <<kafka-streams-internals-ProducerSupplier.adoc#, ProducerSupplier>>
108+
* [[recordCollector]] <<kafka-streams-internals-RecordCollector.adoc#, RecordCollector>>
109+
* [[closeSensor]] `closeSensor` Kafka `Sensor`
90110

91111
`StreamTask` initializes the <<internal-properties, internal properties>>.
92112

@@ -385,6 +405,61 @@ Flushing state and producer
385405

386406
`flushState` requests the <<recordCollector, RecordCollector>> to <<kafka-streams-internals-RecordCollector.adoc#flush, flush>>.
387407

408+
=== [[isProcessable]] `isProcessable` Method
409+
410+
[source, java]
411+
----
412+
boolean isProcessable(final long now)
413+
----
414+
415+
`isProcessable`...FIXME
416+
417+
NOTE: `isProcessable` is used when...FIXME
418+
419+
=== [[resume]] `resume` Method
420+
421+
[source, java]
422+
----
423+
void resume()
424+
----
425+
426+
NOTE: `resume` is part of the <<kafka-streams-internals-Task.adoc#resume, Task Contract>> to resume the task.
427+
428+
`resume`...FIXME
429+
430+
== [[maybeAbortTransactionAndCloseRecordCollector]] `maybeAbortTransactionAndCloseRecordCollector` Internal Method
431+
432+
[source, java]
433+
----
434+
void maybeAbortTransactionAndCloseRecordCollector(final boolean isZombie)
435+
----
436+
437+
`maybeAbortTransactionAndCloseRecordCollector`...FIXME
438+
439+
NOTE: `maybeAbortTransactionAndCloseRecordCollector` is used when...FIXME
440+
441+
== [[initializeTransactions]] `initializeTransactions` Internal Method
442+
443+
[source, java]
444+
----
445+
void initializeTransactions()
446+
----
447+
448+
`initializeTransactions`...FIXME
449+
450+
NOTE: `initializeTransactions` is used when...FIXME
451+
452+
== [[producerMetrics]] `producerMetrics` Method
453+
454+
[source, java]
455+
----
456+
Map<MetricName, Metric> producerMetrics()
457+
----
458+
459+
`producerMetrics`...FIXME
460+
461+
NOTE: `producerMetrics` is used when...FIXME
462+
388463
=== [[internal-properties]] Internal Properties
389464

390465
.StreamTask's Internal Properties (e.g. Registries, Counters and Flags)
@@ -416,8 +491,31 @@ Used when `StreamTask` is requested for the following:
416491
| idleStartTime
417492
a| [[idleStartTime]]
418493

494+
| producer
495+
a| [[producer]][[getProducer]] Kafka xref:https://kafka.apache.org/22/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#[Producer] (`Producer<byte[], byte[]>`)
496+
497+
Created when `StreamTask` is <<creating-instance, created>> and <<resume, resumed>> by requesting the <<producerSupplier, ProducerSupplier>> to <<kafka-streams-internals-ProducerSupplier.adoc#get, supply a Producer>>
498+
499+
Cleared (_nullified_) when `StreamTask` is requested to <<suspend, suspend>> and <<maybeAbortTransactionAndCloseRecordCollector, maybeAbortTransactionAndCloseRecordCollector>>
500+
501+
Used for the following:
502+
503+
* Requesting the <<recordCollector, RecordCollector>> to <<kafka-streams-internals-RecordCollector.adoc#init, initialize>> (when `StreamTask` is <<creating-instance, created>> and <<resume, resumed>>)
504+
505+
* <<initializeTopology, initializeTopology>>, <<initializeTransactions, initializeTransactions>>, <<maybeAbortTransactionAndCloseRecordCollector, maybeAbortTransactionAndCloseRecordCollector>>, and <<commit, commit>> for <<kafka-streams-exactly-once-support-eos.adoc#, exactly-once support>>
506+
507+
* <<producerMetrics, producerMetrics>>
508+
419509
| taskMetrics
420-
| [[taskMetrics]] <<kafka-streams-StreamTask-TaskMetrics.adoc#, TaskMetrics>>
510+
a| [[taskMetrics]] <<kafka-streams-StreamTask-TaskMetrics.adoc#, TaskMetrics>> for the <<id, TaskId>> and the <<metrics, StreamsMetricsImpl>>
511+
512+
Used when `StreamTask` is requested for the following:
513+
514+
* <<isProcessable, isProcessable>> (to record an occurence of <<kafka-streams-StreamTask-TaskMetrics.adoc#taskEnforcedProcessSensor, taskEnforcedProcessSensor>> sensor)
515+
516+
* <<commit, commit>> (to record a value of <<kafka-streams-StreamTask-TaskMetrics.adoc#taskCommitTimeSensor, taskCommitTimeSensor>> sensor)
517+
518+
* <<closeSuspended, closeSuspended>> (to <<kafka-streams-StreamTask-TaskMetrics.adoc#removeAllSensors, remove all task sensors>>)
421519

422520
| transactionInFlight
423521
a| [[transactionInFlight]] Controls whether...FIXME

kafka-streams-internals-Task.adoc

+2
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,8 @@ a| [[resume]]
213213
void resume()
214214
----
215215

216+
Resumes the task
217+
216218
| suspend
217219
a| [[suspend]]
218220

0 commit comments

Comments
 (0)