Connection recovery not working #230

Open
claria opened this issue Aug 25, 2023 · 1 comment
Comments

claria commented Aug 25, 2023

Dear developers,

We are using the RabbitMQ trigger for an Azure Function (Python) that connects to a third-party RabbitMQ instance to receive messages from a queue. We do not have permission to publish to the queue (we have no access to the exchange).

We observe the following behaviour:

  • Consumption of messages works fine until one invocation fails for whatever reason.
  • The failure causes the RabbitMqListener to try to republish the same message with an increased requeueCount.
  • The republish triggers an error from RabbitMQ (access to the exchange not allowed), and the broker closes the connection.

RabbitMQ.Client.Exceptions.AlreadyClosedException: Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=403, text='ACCESS_REFUSED - access to exchange 'xxxx' in vhost 'xxxxx' refused for user 'xxxxx'', classId=60, methodId=40

So far so good. But now the rabbitmq trigger does not try to reconnect to the broker and retrieval of messages is stopped until we restart the function app.

We then wrapped everything in a try-except block so that no invocation can fail, which effectively disables retry.
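
A minimal sketch of that workaround, assuming a Python handler. `process_event` and `handle_message` are hypothetical names standing in for our real processing code, and the trigger binding is omitted so the snippet stays self-contained:

```python
import logging

logger = logging.getLogger(__name__)


def process_event(body: str) -> None:
    """Hypothetical stand-in for the real processing logic."""
    if not body:
        raise ValueError("empty message body")


def handle_message(body: str) -> bool:
    """Process one message and never let an exception escape.

    Returns True on success and False on failure. Because the exception
    is swallowed here, the invocation itself always completes, so a
    failed message is dropped instead of being retried.
    """
    try:
        process_event(body)
        return True
    except Exception:
        # Log with traceback so failed messages can still be investigated.
        logger.exception("Processing failed; message dropped: %r", body)
        return False
```

The obvious downside is that a failed message is only logged, never redelivered.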

  • Is there any setting to disable the retry behaviour in the settings of the trigger?

  • In the source code of the listener, the following stackoverflow is mentioned in which an updated answer mentions quorum queue behaviour. Maybe that is worth implementing so

  • Why did the Listener not reconnect after the connection was closed? What would happen if there are any network problems? Is there any setting to enable auto reconnections?

Thank you for your help & time!

@netdragonboberb

This is a glaring bug in a most critical piece of .NET Isolated now that Durable Queues are no longer used.

See a suggested solution in #227 (comment)

Barring that, here is one non-optimal workaround until this is fixed (and maybe even useful afterwards).
You can set up a health check that uses the RMQ management API to verify that the queue still has consumers, something like the following.
This may not run out of the box, since I made some changes while pasting it in here to generalize.

Then you can configure your Azure Function in the Azure Portal to use the health check path "/api/healthcheck", so it restarts when the queue runs out of consumers.

Pre-requisites:

  1. You need to have the RabbitMQ Management plugin enabled. You can check this on port 15672 of your RabbitMQ server or cluster. You should also see the API info page on http://{rmqhostname}:15672/api/
  2. You'll also need to replace RabbitMQConnection with the name of the configuration setting you use for the RabbitMQTrigger connection in your environments.

Limitations: The health check will only fail if you completely run out of consumers, so if you have multiple instances of your function running and in some unlikely event multiple consumers disconnect but one remains connected, the function will not restart, and you will see reduced throughput.
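
For orientation, the whole check boils down to one management API lookup. Here is a minimal sketch in Python (the language of the original poster's function) of how the queue URL is built and how the `consumers` field is read. `queue_url` and `has_consumers` are hypothetical helper names, and the HTTP call itself is left out:

```python
from urllib.parse import quote


def queue_url(api_url: str, vhost: str, queue: str) -> str:
    """Build {api_url}/queues/{vhost}/{queue} with both parts percent-encoded."""
    base = api_url if api_url.endswith("/") else api_url + "/"
    # safe="" forces "/" to be encoded, so the default vhost "/" becomes "%2F".
    return f"{base}queues/{quote(vhost or '/', safe='')}/{quote(queue, safe='')}"


def has_consumers(queue_info: dict) -> bool:
    """True when the parsed management API response reports at least one consumer."""
    return (queue_info.get("consumers") or 0) > 0
```

A missing or null `consumers` value is treated as zero here, which matches the "unhealthy unless proven otherwise" intent of the check.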

For more info on healthchecks in general: https://learn.microsoft.com/en-us/aspnet/core/host-and-deploy/health-checks?view=aspnetcore-9.0

Program.cs

 services.AddAzureFunctionHealthChecks().AddRabbitMqHealthCheck(configuration, "RabbitMQConnection", queueName: "xyz");

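 // Named client used by RabbitMqHealthCheck; sends the broker credentials to the management API as HTTP Basic auth.
 // Requires: using System.Net.Http.Headers;
 services.AddHttpClient("RabbitMqHealthCheck", c =>
 {
     var esbOptions = configuration.GetEsbOptions("RabbitMQConnection");

     c.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", Convert.ToBase64String(
         System.Text.Encoding.UTF8.GetBytes($"{esbOptions.Username}:{esbOptions.Password}")));
 });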

RabbitMqExtensions.cs

        public static IHealthChecksBuilder AddRabbitMqHealthCheck(
            this IHealthChecksBuilder builder,
            IConfiguration configuration,
            string connectionStringSetting,
            string queueName,
            long managementPort = 15672)
        {
            ArgumentNullException.ThrowIfNull(builder);

            var esbOptions = configuration.GetEsbOptions(connectionStringSetting);

            var apiUrl = $"http://{esbOptions.Host}:{managementPort}/api/"; // Note: vhost left out intentionally (api places vhost AFTER the resource identifier)

            return builder.AddTypeActivatedCheck<RabbitMqHealthCheck>("rabbitMQ" + queueName, apiUrl, esbOptions.VirtualHost, queueName);
        }

        public static EsbOptions GetEsbOptions(this IConfiguration configuration, string connectionStringSetting)
        {
            var connectionStringValue = configuration.GetValue<string>(connectionStringSetting);

            var connectionString = new Uri(connectionStringValue);

            var virtualHost = connectionString.AbsolutePath.Replace("/", string.Empty);

            if (virtualHost is { Length: 0 })
            {
                virtualHost = "/";
            }

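            // Split on the first ':' only and undo percent-encoding, so credentials containing reserved characters survive
            var userInfo = connectionString.UserInfo.Split(':', 2);

            return new EsbOptions
            {
                Host = connectionString.Host,
                VirtualHost = virtualHost,
                Username = Uri.UnescapeDataString(userInfo[0]),
                Password = userInfo.Length > 1 ? Uri.UnescapeDataString(userInfo[1]) : string.Empty
            };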
        }

RabbitMqHealthCheck.cs

public class RabbitMqHealthCheck(
        string apiUrl,
        string virtualHost,
        string queueName,
        IHttpClientFactory httpClientFactory) : IHealthCheck
{

    // Reports Unhealthy (so the platform can restart this instance) when the queue has no consumers at all
    public async Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(context);

        var client = httpClientFactory.CreateClient("RabbitMqHealthCheck");

        if (!apiUrl.EndsWith("/"))
        {
            apiUrl += "/";
        }

        var vhost = virtualHost;
        if (vhost == "/" || string.IsNullOrWhiteSpace(vhost))
        {
            vhost = "%2F";
        }


        try
        {
            // Management API shape: {apiUrl}queues/{vhost}/{queue}; escape the queue name in case it contains reserved characters
            var url = apiUrl + "queues/" + vhost + "/" + Uri.EscapeDataString(queueName);

            var result = await client.GetAsync(url, cancellationToken);
            if (!result.IsSuccessStatusCode)
            {
                return HealthCheckResult.Unhealthy("Failed status check: HTTP response unsuccessful");
            }

            var response = await result.Content.ReadFromJsonAsync<RabbitMqManagementQueueResponse>(cancellationToken);

            // A null body or a null/zero consumer count both count as "no consumers"
            if (response?.Consumers > 0)
            {
                return HealthCheckResult.Healthy();
            }

            return HealthCheckResult.Unhealthy($"No consumers for queue: {queueName}");
        }
        catch (Exception exception)
        {
            return HealthCheckResult.Unhealthy(exception.Message, exception);
        }
    }
}

public class RabbitMqManagementQueueResponse
{
    // Left out irrelevant response values from management api
    public long? Consumers { get; set; }
}

EsbOptions:

    public class EsbOptions
    {
        public string Host { get; set; }
        public string VirtualHost { get; set; }
        public string Username { get; set; }
        public string Password { get; set; }
    }
