add approx. transactional api #98

Draft
wants to merge 9 commits into main

Conversation

blindspotbounty
Collaborator

@blindspotbounty blindspotbounty commented Jul 24, 2023

This is an approximate implementation of a transactional API for Kafka gsoc.

It builds on some ideas that were expressed in #78 (comment).

The main ideas are:

  1. Call initTransactions and switch state in the state machine
  2. Run blocking transactional calls on a global dispatch queue
  3. Retry retryable errors
  4. Automatically abort the transaction when an abort error is received
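
Ideas 3 and 4 can be sketched roughly as follows. This is only an illustration, not the code in this PR: TransactionCallError, TransactionOutcome, withRetries and runTransaction are hypothetical names, and the real error classification would come from librdkafka rather than from a Swift enum.

```swift
// Hypothetical error classification; the real library would derive this
// from the error reported by librdkafka, which is not modeled here.
enum TransactionCallError: Error, Equatable {
    case retryable
    case abortRequired
    case fatal
}

enum TransactionOutcome: Equatable {
    case committed
    case aborted
}

/// Runs `operation`, retrying while it fails with `.retryable`,
/// for at most `maxAttempts` attempts in total.
func withRetries<T>(maxAttempts: Int, _ operation: () throws -> T) throws -> T {
    var attempt = 1
    while true {
        do {
            return try operation()
        } catch TransactionCallError.retryable where attempt < maxAttempts {
            // Retryable failure with attempts left: try again.
            attempt += 1
        }
        // Any other error (or running out of attempts) propagates to the caller.
    }
}

/// Tries to commit; if the commit reports that an abort is required,
/// aborts the transaction automatically instead of surfacing the error.
func runTransaction(
    maxAttempts: Int = 3,
    commit: () throws -> Void,
    abort: () throws -> Void
) throws -> TransactionOutcome {
    do {
        try withRetries(maxAttempts: maxAttempts, commit)
        return .committed
    } catch TransactionCallError.abortRequired {
        try withRetries(maxAttempts: maxAttempts, abort)
        return .aborted
    }
}
```

Fatal errors are deliberately not handled in this sketch and simply propagate to the caller.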

UPD: after review, the API changed to the following:

To use Kafka transactions, first create a KafkaTransactionalProducerConfiguration:

let config = KafkaTransactionalProducerConfiguration(transactionalID: "1234")

As with KafkaProducer, you can then create a KafkaTransactionalProducer:

let transactionalProducer = try await KafkaTransactionalProducer.makeTransactionalProducerWithEvents(config: config)

To commit a transaction, simply call withTransaction(...):

try await transactionalProducer.withTransaction { transaction in
    // Produce new messages:
    let newMessage = KafkaProducerMessage(
        topic: "<some topic>",
        value: "<some value>"
    )
    try transaction.send(newMessage)
...
    // commit offsets:
    let partitionlist = RDKafkaTopicPartitionList()
    partitionlist.setOffset(topic: self.uniqueTestTopic, partition: message.partition, offset: Int64(message.offset))
    try await transaction.send(offsets: partitionlist, forConsumer: consumer)
...
}

@blindspotbounty blindspotbounty marked this pull request as ready for review July 25, 2023 14:42
@@ -162,6 +192,10 @@ extension KafkaError {
case messageConsumption
case topicCreation
case topicDeletion
case transactionAborted
Collaborator Author

I was wondering whether we should add subcodes or expose the error code from Kafka as is.

Package.swift Outdated
@@ -44,6 +44,7 @@ let package = Package(
.package(url: "https://github.com/apple/swift-nio.git", from: "2.55.0"),
.package(url: "https://github.com/swift-server/swift-service-lifecycle.git", from: "2.0.0-alpha.1"),
.package(url: "https://github.com/apple/swift-log.git", from: "1.0.0"),
.package(url: "https://github.com/ordo-one/package-concurrency-helpers", .upToNextMajor(from: "1.0.0")),
Contributor

We can't add this dependency.

Collaborator Author

Sure, I can copy the helpers from there.

@@ -36,6 +36,8 @@ public struct KafkaProducerConfiguration {
/// When set to true, the producer will ensure that messages are successfully produced exactly once and in the original produce order. The following configuration properties are adjusted automatically (if not modified by the user) when idempotence is enabled: max.in.flight.requests.per.connection=5 (must be less than or equal to 5), retries=INT32_MAX (must be greater than 0), acks=all, queuing.strategy=fifo. Producer instantation will fail if user-supplied configuration is incompatible.
/// Default: `false`
public var enableIdempotence: Bool = false

public var transactionalId: String?
Contributor

We need some comments on this and what it does. If it only makes sense for the transactional producer, we should not expose it at all in this configuration and may have to introduce a new KafkaTransactionalProducerConfiguration.

Collaborator Author

Are you thinking of a wrapper around the producer configuration, like the following?

struct KafkaTransactionalProducerConfiguration {
    // Wraps the regular producer configuration (a struct cannot store a value of its own type)
    var producerConfiguration: KafkaProducerConfiguration
    public var transactionalId: String
}

Contributor

I am not sure that works. Presumably there are some configurations that don't apply to a transactional producer. We probably should introduce a new configuration and just duplicate stuff like we did for the consumer and producer.

Collaborator Author

Could you tell me which properties you have in mind, please?

@@ -261,6 +261,42 @@ public final class KafkaProducer: Service, Sendable {
return KafkaProducerMessageID(rawValue: newMessageID)
}
}

func initTransactions(timeout: Duration = .seconds(5)) async throws {
Contributor

I think we definitely want to create a new KafkaTransactionalProducer type so that the APIs stay clear and separated between the normal producer and the transactional one.

Additionally, I think the APIs should look a bit different. I can imagine something like this:

public func withTransaction<Result>(_ body: @Sendable (KafkaTransaction) async throws -> Result) async throws -> Result {
    // init transaction
    // begin transaction
    // call out to user code and pass a new `Transaction` struct to them
    // at the end call abort or finish transaction for them
}
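
To make the shape of that suggestion concrete, here is a minimal runnable sketch of the begin / run body / commit-or-abort flow. It is not the library's implementation: RecordingTransactionClient and SketchTransaction are hypothetical stand-ins that only record which calls were made, and the `on client:` parameter exists only so the sketch is self-contained.

```swift
// Hypothetical stand-in for the underlying Kafka client; it only records calls.
actor RecordingTransactionClient {
    private(set) var log: [String] = []

    func beginTransaction() { log.append("begin") }
    func commitTransaction() { log.append("commit") }
    func abortTransaction() { log.append("abort") }
}

// Hypothetical handle passed to user code inside the transaction.
struct SketchTransaction: Sendable {
    let client: RecordingTransactionClient
}

/// Begins a transaction, runs `body`, then commits on success
/// or aborts and rethrows if `body` throws.
func withTransaction<Result: Sendable>(
    on client: RecordingTransactionClient,
    _ body: @Sendable (SketchTransaction) async throws -> Result
) async throws -> Result {
    await client.beginTransaction()
    do {
        let result = try await body(SketchTransaction(client: client))
        await client.commitTransaction()
        return result
    } catch {
        // User code failed: roll back before surfacing the error.
        await client.abortTransaction()
        throw error
    }
}
```

The point of the closure-based design is that user code can never forget to commit or abort: both paths are handled when the closure returns or throws.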

Collaborator Author

It makes perfect sense to have a separate producer for transactions.
We have one at a higher level, but that could be done here.

@blindspotbounty blindspotbounty marked this pull request as draft July 31, 2023 14:08
@@ -158,6 +117,8 @@ extension KafkaProducerConfiguration: Hashable {}

extension KafkaProducerConfiguration: Sendable {}

extension KafkaProducerConfiguration: KafkaProducerSharedProperties {}
Collaborator Author

I've tried to use at least an internal shared protocol to unify most of the properties of the producer and the transactional producer instead of copying them.
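
Roughly, the idea looks like the sketch below. The type names and the two chosen properties are illustrative assumptions, not the actual contents of KafkaProducerSharedProperties; only the librdkafka keys (enable.idempotence, max.in.flight.requests.per.connection, transactional.id) are real.

```swift
// Hypothetical shared protocol: both configurations expose the same
// properties, so the translation to librdkafka keys is written once.
protocol SketchProducerSharedProperties {
    var enableIdempotence: Bool { get }
    var maxInFlightRequestsPerConnection: Int { get }
}

extension SketchProducerSharedProperties {
    /// Key/value pairs common to both producer kinds.
    var sharedDictionary: [String: String] {
        [
            "enable.idempotence": String(enableIdempotence),
            "max.in.flight.requests.per.connection": String(maxInFlightRequestsPerConnection),
        ]
    }
}

struct SketchProducerConfiguration: SketchProducerSharedProperties {
    var enableIdempotence = false
    var maxInFlightRequestsPerConnection = 5
}

struct SketchTransactionalProducerConfiguration: SketchProducerSharedProperties {
    // Fixed for transactions, so it is a constant rather than a setting.
    let enableIdempotence = true
    var maxInFlightRequestsPerConnection = 5
    var transactionalID: String

    /// Shared keys plus the transaction-specific one.
    var dictionary: [String: String] {
        var result = sharedDictionary
        result["transactional.id"] = transactionalID
        return result
    }
}
```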


// performs blocking calls outside of cooperative thread pool
internal func performBlockingCall<T>(queue: DispatchQueue, body: @escaping () -> T) async -> T {
await withCheckedContinuation { continuation in
Collaborator Author

I am not sure there is a good way to use Swift concurrency with blocking calls.
The only option seems to be running them on a separate thread, so that the cooperative thread pool's contract is not violated.
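
For reference, a complete version of such a helper could look like the sketch below. The Sendable annotations are an addition for stricter concurrency checking and are not in the diff above; the rest assumes nothing beyond Dispatch and withCheckedContinuation.

```swift
import Dispatch

/// Runs `body` on `queue` and resumes the awaiting task with its result,
/// so the blocking work does not occupy a cooperative-pool thread.
func performBlockingCall<T: Sendable>(
    queue: DispatchQueue,
    body: @escaping @Sendable () -> T
) async -> T {
    await withCheckedContinuation { continuation in
        queue.async {
            // The continuation must be resumed exactly once.
            continuation.resume(returning: body())
        }
    }
}
```

A caller would pass a dedicated queue, for example `await performBlockingCall(queue: DispatchQueue(label: "blocking-calls")) { someBlockingCall() }`, where someBlockingCall is a placeholder for the blocking transactional call.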

@@ -489,4 +494,115 @@ final class SwiftKafkaTests: XCTestCase {
XCTAssertTrue(acknowledgedMessages.contains(where: { $0.value == ByteBuffer(string: message.value) }))
}
}

func testProduceAndConsumeWithTransaction() async throws {
Collaborator Author

This is just an example test for now, to demonstrate what the current API would look like from the user's perspective.


/// When set to true, the producer will ensure that messages are successfully produced exactly once and in the original produce order. The following configuration properties are adjusted automatically (if not modified by the user) when idempotence is enabled: max.in.flight.requests.per.connection=5 (must be less than or equal to 5), retries=INT32_MAX (must be greater than 0), acks=all, queuing.strategy=fifo. Producer instantation will fail if user-supplied configuration is incompatible.
/// Default: `false`
internal let enableIdempotence: Bool = true
Collaborator Author

KafkaProducerConfiguration and KafkaTransactionalProducerConfiguration differ in a couple of properties:

  1. enableIdempotence is always true for transactions
  2. maxInFlightRequestsPerConnection is not greater than 5
  3. transactionsTimeout is set to the socket timeout or greater
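
As an illustration of those three constraints, a configuration type could enforce them at construction time. The sketch below is hypothetical: the type name, the millisecond-based properties and ConfigurationError are assumptions, and in the PR these rules may be enforced differently (for example by clamping values instead of throwing).

```swift
// Hypothetical error type for rejected settings.
struct ConfigurationError: Error, Equatable {
    let reason: String
}

// Hypothetical configuration that enforces the three rules listed above.
struct SketchTransactionalConfiguration {
    let transactionalID: String
    /// Rule 1: idempotence cannot be switched off for transactions.
    let enableIdempotence = true
    let maxInFlightRequestsPerConnection: Int
    let socketTimeoutMilliseconds: Int
    let transactionsTimeoutMilliseconds: Int

    init(
        transactionalID: String,
        maxInFlightRequestsPerConnection: Int = 5,
        socketTimeoutMilliseconds: Int = 60_000,
        transactionsTimeoutMilliseconds: Int = 60_000
    ) throws {
        // Rule 2: at most 5 in-flight requests per connection.
        guard (1...5).contains(maxInFlightRequestsPerConnection) else {
            throw ConfigurationError(reason: "maxInFlightRequestsPerConnection must be between 1 and 5")
        }
        // Rule 3: the transaction timeout may not be shorter than the socket timeout.
        guard transactionsTimeoutMilliseconds >= socketTimeoutMilliseconds else {
            throw ConfigurationError(reason: "transactionsTimeout must be at least the socket timeout")
        }
        self.transactionalID = transactionalID
        self.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection
        self.socketTimeoutMilliseconds = socketTimeoutMilliseconds
        self.transactionsTimeoutMilliseconds = transactionsTimeoutMilliseconds
    }
}
```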

/// The configuration object of the producer client.
private let config: KafkaProducerConfiguration
/// Configured poll interval
private let pollInterval: Duration
Collaborator Author

I was thinking of using private let config: any KafkaProducerSharedProperties, but only 2 properties are required, so storing just those might be more lightweight.

@blindspotbounty blindspotbounty force-pushed the feature/sc-2764/gsoc-support-for-transactions branch from 63ffb51 to ce702f3 Compare August 10, 2023 13:47
2 participants