Explicit polling consumer #78

Open
macasado86 opened this issue Feb 3, 2025 · 4 comments

Comments

@macasado86
Contributor

Hi @silviucpp,

We are creating a connector for Broadway based on Erlkaf. This connector needs to accumulate messages from Kafka in an internal buffer until other processes are available to process them. When this buffer starts running out of messages, it requests more from Kafka.

The current version of Erlkaf does not let us control this flow. All our handle_message callback can do is append each message to the connector's internal buffer, so too many messages accumulate there. We tried sleeping inside the callback to stop the accumulation, but while it sleeps our process cannot handle any other kind of request.

What we propose is to create a new type of consumer where users poll Kafka when they need to.

Instead of having a handle_message callback for users to implement, we would have these 3 functions (the names are tentative):

erlkaf_consumer_group.erl:

  • handle_call(get_consumer_pids, _From, State) - To retrieve the ActiveTopicsMap

erlkaf_poll_consumer.erl:

  • handle_call(poll_events, _From, State)
  • handle_call({store_offset, LastOffset}, _From, State)
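The pull-based flow proposed here can be sketched as a small gen_server. This is only an illustration under stated assumptions, not erlkaf code: the module name `poll_consumer_sketch`, the `BatchSize` argument, and the in-memory queue standing in for the Kafka partition queue are all hypothetical. Only the `poll_events` and `{store_offset, LastOffset}` calls come from the proposal.

```erlang
-module(poll_consumer_sketch).
-behaviour(gen_server).

-export([start_link/2, poll_events/1, store_offset/2, stop/1]).
-export([init/1, handle_call/3, handle_cast/2]).

%% Hypothetical state: `pending` stands in for the Kafka partition queue,
%% `committed` for the last offset the caller asked us to store.
-record(state, {pending, batch_size, committed = -1}).

%% Messages is a list of {Offset, Payload} tuples (an assumption for this sketch).
start_link(Messages, BatchSize) ->
    gen_server:start_link(?MODULE, {Messages, BatchSize}, []).

%% The caller decides when to fetch; nothing is pushed to it.
poll_events(Pid) ->
    gen_server:call(Pid, poll_events).

store_offset(Pid, LastOffset) ->
    gen_server:call(Pid, {store_offset, LastOffset}).

stop(Pid) ->
    gen_server:stop(Pid).

init({Messages, BatchSize}) ->
    {ok, #state{pending = queue:from_list(Messages), batch_size = BatchSize}}.

%% Hand out at most batch_size messages per poll.
handle_call(poll_events, _From, #state{pending = Q, batch_size = Max} = State) ->
    N = min(Max, queue:len(Q)),
    {Batch, Rest} = queue:split(N, Q),
    {reply, {ok, queue:to_list(Batch)}, State#state{pending = Rest}};
%% Record the offset; a real consumer would forward it to Kafka here.
handle_call({store_offset, LastOffset}, _From, State) ->
    {reply, ok, State#state{committed = LastOffset}}.

handle_cast(_Msg, State) ->
    {noreply, State}.
```

Because the consumer only answers calls, a Broadway producer could call `poll_events/1` when its own buffer runs low and `store_offset/2` once a batch has been processed, which is the backpressure the push-style callback lacks.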

I can create the PR without any problems, but I wanted to know first if you are interested in including this functionality in Erlkaf.

@silviucpp
Owner

Hello,

I'm not sure this is the best approach, at least not the part about using ActiveTopicsMap outside erlkaf_consumer_group.
That map can be altered by a rebalance event at any time, so outside that process you could end up working with a stale map.

I think we have 2 options:

Option1:

Inside erlkaf_consumer_group, instead of always creating an erlkaf_consumer instance for every {topic, partition}, you
specify the module to use via a config option (defaulting, of course, to erlkaf_consumer for backward compatibility).

Then you can add another module, erlkaf_poll_consumer, in which to implement your logic.

We can even create a behaviour, erlkaf_consumer_behaviour, to specify that each consumer must implement:

- start_link(ClientRef, TopicName, Partition, Offset, QueueRef, TopicSettings)
- stop(Pid)

Then erlkaf_poll_consumer (or whatever you want to call it) can even live outside erlkaf, because at that point anyone
can create their own custom consumer logic. Of course we can add it to erlkaf as well.
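A minimal sketch of what Option 1 could look like, assuming the behaviour is declared with `-callback` attributes. The argument and return types, the `consumer_module` settings key, and the `consumer_module/1` helper are assumptions made for illustration; the thread only names the two callbacks and the default module.

```erlang
-module(erlkaf_consumer_behaviour).

-export([consumer_module/1]).

%% The two callbacks named in the thread. The types are guesses:
%% the thread lists only the argument names.
-callback start_link(ClientRef :: term(),
                     TopicName :: binary(),
                     Partition :: integer(),
                     Offset :: integer(),
                     QueueRef :: term(),
                     TopicSettings :: list()) ->
    {ok, pid()} | {error, term()}.

-callback stop(Pid :: pid()) -> ok.

%% Hypothetical helper: pick the consumer module from the topic settings,
%% falling back to erlkaf_consumer for backward compatibility.
consumer_module(TopicSettings) ->
    proplists:get_value(consumer_module, TopicSettings, erlkaf_consumer).
```

With something like this, erlkaf_consumer_group could call `Module:start_link(...)` for each {topic, partition} without knowing which consumer implementation it is starting.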

Option2:

Keep everything as it is, and modify the logic inside erlkaf_consumer to let the user poll events manually and commit offsets.
But the logic inside erlkaf_consumer would become complicated, and I don't want to give myself a headache with this, since right now it's
very stable. I'm processing tens of millions of events every day with it.

So I would go for option 1, even if erlkaf_consumer and the new module end up sharing some code.

Silviu

@ramonpin
Contributor

Hi @silviucpp. I'm @macasado86's colleague working on this logic. I also believe that option 1 is far better than option 2 from a future user's perspective.

I only want to add to Miguel's comment that we have battle-tested this solution in production for about a year and a half, consuming several hundred million messages per day with no issues. That's why we think it is time to share it back with you and the community.

@macasado86
Contributor Author

Hi,

We will adapt our existing implementation to your suggestions. We will create a new module, erlkaf_poll_consumer, with the new logic, and erlkaf_consumer_group will be told which module to use (the existing erlkaf_consumer by default).

I think it would be worth including it in erlkaf, since it can be useful for other users. Before opening the PR we will keep the new code running for several days in a row to check that there are no problems.

@silviucpp
Owner

Sure, send me a PR once you have it.
