Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added configurable options and fixed ack logic #114

Merged
merged 7 commits into from
Nov 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 48 additions & 3 deletions src/Config/RabbitMQOptions.cs
Original file line number Diff line number Diff line change
@@ -1,23 +1,68 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using Microsoft.Azure.WebJobs.Description;
using RabbitMQ.Client;
using Microsoft.Azure.WebJobs.Hosting;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

namespace Microsoft.Azure.WebJobs.Extensions.RabbitMQ
{
public class RabbitMQOptions
/// <summary>
/// Configuration options for the RabbitMQ extension.
/// </summary>
public class RabbitMQOptions : IOptionsFormatter
{
public RabbitMQOptions()
{
PrefetchCount = 30;
}

/// <summary>
/// Gets or sets the HostName used to authenticate with RabbitMQ.
/// </summary>
public string HostName { get; set; }

/// <summary>
/// Gets or sets the QueueName to receive messages from or enqueue messages to.
/// </summary>
public string QueueName { get; set; }

/// <summary>
/// Gets or sets the UserName used to authenticate with RabbitMQ.
/// </summary>
public string UserName { get; set; }

/// <summary>
/// Gets or sets the Password used to authenticate with RabbitMQ.
/// </summary>
public string Password { get; set; }

/// <summary>
/// Gets or sets the ConnectionString used to authenticate with RabbitMQ.
/// </summary>
public string ConnectionString { get; set; }

/// <summary>
/// Gets or sets the Port used. Defaults to 0.
/// </summary>
public int Port { get; set; }

/// <summary>
/// Gets or sets the prefetch count while creating the RabbitMQ QoS. This seting controls how many values are cached.
/// </summary>
public ushort PrefetchCount { get; set; }

public string Format()
{
JObject options = new JObject
{
{ nameof(HostName), HostName },
{ nameof(QueueName), QueueName },
{ nameof(Port), Port },
{ nameof(PrefetchCount), PrefetchCount },
};

return options.ToString(Formatting.Indented);
}
}
}
26 changes: 14 additions & 12 deletions src/Trigger/RabbitMQListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Executors;
Expand All @@ -22,6 +21,7 @@ internal sealed class RabbitMQListener : IListener, IScaleMonitor<RabbitMQTrigge
{
private readonly ITriggeredFunctionExecutor _executor;
private readonly string _queueName;
private readonly ushort _prefetchCount;
private readonly IRabbitMQService _service;
private readonly ILogger _logger;
private readonly FunctionDescriptor _functionDescriptor;
Expand All @@ -40,7 +40,8 @@ public RabbitMQListener(
IRabbitMQService service,
string queueName,
ILogger logger,
FunctionDescriptor functionDescriptor)
FunctionDescriptor functionDescriptor,
ushort prefetchCount)
{
_executor = executor;
_service = service;
Expand All @@ -50,6 +51,7 @@ public RabbitMQListener(
_functionDescriptor = functionDescriptor ?? throw new ArgumentNullException(nameof(functionDescriptor));
_functionId = functionDescriptor.Id;
_scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{_functionId}-RabbitMQTrigger-{_queueName}".ToLower());
_prefetchCount = prefetchCount;
}

public ScaleMonitorDescriptor Descriptor
Expand Down Expand Up @@ -95,8 +97,7 @@ public Task StartAsync(CancellationToken cancellationToken)
throw new InvalidOperationException("The listener has already been started.");
}

// TODO: Add prefetch params as part of RabbitMQOptions
_rabbitMQModel.BasicQos(0, 30, false);
_rabbitMQModel.BasicQos(0, _prefetchCount, false); // Non zero prefetchSize doesn't work (tested upto 5.2.0) and will throw NOT_IMPLEMENTED exception
_consumer = new EventingBasicConsumer(_rabbitMQModel.Model);

_consumer.Received += async (model, ea) =>
Expand Down Expand Up @@ -145,8 +146,6 @@ public Task StopAsync(CancellationToken cancellationToken)

internal void CreateHeadersAndRepublish(BasicDeliverEventArgs ea)
{
_rabbitMQModel.BasicAck(ea.DeliveryTag, false);

if (ea.BasicProperties.Headers == null)
{
ea.BasicProperties.Headers = new Dictionary<string, object>();
Expand All @@ -155,6 +154,7 @@ internal void CreateHeadersAndRepublish(BasicDeliverEventArgs ea)
ea.BasicProperties.Headers[Constants.RequeueCount] = 0;
_logger.LogDebug("Republishing message");
_rabbitMQModel.BasicPublish(exchange: string.Empty, routingKey: ea.RoutingKey, basicProperties: ea.BasicProperties, body: ea.Body);
_rabbitMQModel.BasicAck(ea.DeliveryTag, false);
}

internal void RepublishMessages(BasicDeliverEventArgs ea)
Expand All @@ -167,9 +167,9 @@ internal void RepublishMessages(BasicDeliverEventArgs ea)

if (Convert.ToInt32(ea.BasicProperties.Headers[Constants.RequeueCount]) < 5)
{
_rabbitMQModel.BasicAck(ea.DeliveryTag, false); // Manually ACK'ing, but resend
_logger.LogDebug("Republishing message");
_rabbitMQModel.BasicPublish(exchange: string.Empty, routingKey: ea.RoutingKey, basicProperties: ea.BasicProperties, body: ea.Body);
_rabbitMQModel.BasicAck(ea.DeliveryTag, false); // Manually ACK'ing, but ack after resend
}
else
{
Expand Down Expand Up @@ -221,16 +221,18 @@ private ScaleStatus GetScaleStatusCore(int workerCount, RabbitMQTriggerMetrics[]
Vote = ScaleVote.None,
};

const int NumberOfSamplesToConsider = 5;
// TODO: Make the below two ints configurable.
int numberOfSamplesToConsider = 5;
int targetQueueLength = 1000;

if (metrics == null || metrics.Length < NumberOfSamplesToConsider)
if (metrics == null || metrics.Length < numberOfSamplesToConsider)
{
return status;
}

long latestQueueLength = metrics.Last().QueueLength;

if (latestQueueLength > workerCount * 1000)
if (latestQueueLength > workerCount * targetQueueLength)
{
status.Vote = ScaleVote.ScaleOut;
_logger.LogInformation($"QueueLength ({latestQueueLength}) > workerCount ({workerCount}) * 1000");
Expand All @@ -250,7 +252,7 @@ private ScaleStatus GetScaleStatusCore(int workerCount, RabbitMQTriggerMetrics[]
bool queueLengthIncreasing =
IsTrueForLast(
metrics,
NumberOfSamplesToConsider,
numberOfSamplesToConsider,
(prev, next) => prev.QueueLength < next.QueueLength) && metrics[0].QueueLength > 0;

if (queueLengthIncreasing)
Expand All @@ -263,7 +265,7 @@ private ScaleStatus GetScaleStatusCore(int workerCount, RabbitMQTriggerMetrics[]
bool queueLengthDecreasing =
IsTrueForLast(
metrics,
NumberOfSamplesToConsider,
numberOfSamplesToConsider,
(prev, next) => prev.QueueLength > next.QueueLength);

if (queueLengthDecreasing)
Expand Down
2 changes: 1 addition & 1 deletion src/Trigger/RabbitMQTriggerAttributeBindingProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public Task<ITriggerBinding> TryCreateAsync(TriggerBindingProviderContext contex

IRabbitMQService service = _provider.GetService(connectionString, hostName, queueName, userName, password, port);

return Task.FromResult<ITriggerBinding>(new RabbitMQTriggerBinding(service, hostName, queueName, _logger, parameter.ParameterType));
return Task.FromResult<ITriggerBinding>(new RabbitMQTriggerBinding(service, hostName, queueName, _logger, parameter.ParameterType, _options.Value.PrefetchCount));
}

private string Resolve(string name)
Expand Down
6 changes: 4 additions & 2 deletions src/Trigger/RabbitMQTriggerBinding.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,16 @@ internal class RabbitMQTriggerBinding : ITriggerBinding
private readonly Type _parameterType;
private readonly string _queueName;
private readonly string _hostName;
private readonly ushort _prefetchCount;

public RabbitMQTriggerBinding(IRabbitMQService service, string hostname, string queueName, ILogger logger, Type parameterType)
public RabbitMQTriggerBinding(IRabbitMQService service, string hostname, string queueName, ILogger logger, Type parameterType, ushort prefetchCount)
{
_service = service;
_queueName = queueName;
_hostName = hostname;
_logger = logger;
_parameterType = parameterType;
_prefetchCount = prefetchCount;
BindingDataContract = CreateBindingDataContract();
}

Expand Down Expand Up @@ -57,7 +59,7 @@ public Task<IListener> CreateListenerAsync(ListenerFactoryContext context)
throw new ArgumentNullException("context");
}

return Task.FromResult<IListener>(new RabbitMQListener(context.Executor, _service, _queueName, _logger, context.Descriptor));
return Task.FromResult<IListener>(new RabbitMQListener(context.Executor, _service, _queueName, _logger, context.Descriptor, _prefetchCount));
}

public ParameterDescriptor ToParameterDescriptor()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using Microsoft.Azure.WebJobs.Host.Protocols;
using Microsoft.Azure.WebJobs.Host.Scale;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Moq;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
Expand Down Expand Up @@ -36,19 +37,18 @@ public RabbitMQListenerTests()
_mockDescriptor = new Mock<FunctionDescriptor>();

_mockService.Setup(m => m.RabbitMQModel).Returns(_mockModel.Object);

QueueDeclareOk queueInfo = new QueueDeclareOk("blah", 5, 1);
_mockModel.Setup(m => m.QueueDeclarePassive(It.IsAny<string>())).Returns(queueInfo);

_testListener = new RabbitMQListener(_mockExecutor.Object, _mockService.Object, "blah", _mockLogger.Object, new FunctionDescriptor { Id = "TestFunction" });
_testListener = new RabbitMQListener(_mockExecutor.Object, _mockService.Object, "blah", _mockLogger.Object, new FunctionDescriptor { Id = "TestFunction" }, 30);
}

[Fact]
public void CreatesHeadersAndRepublishes()
{
_mockService.Setup(m => m.RabbitMQModel).Returns(_mockModel.Object);

RabbitMQListener listener = new RabbitMQListener(_mockExecutor.Object, _mockService.Object, "blah", _mockLogger.Object, _mockDescriptor.Object);
RabbitMQListener listener = new RabbitMQListener(_mockExecutor.Object, _mockService.Object, "blah", _mockLogger.Object, _mockDescriptor.Object, 30);

var properties = new BasicProperties();
BasicDeliverEventArgs args = new BasicDeliverEventArgs("tag", 1, false, "", "queue", properties, Encoding.UTF8.GetBytes("hello world"));
Expand All @@ -62,7 +62,7 @@ public void CreatesHeadersAndRepublishes()
public void RepublishesMessages()
{
_mockService.Setup(m => m.RabbitMQModel).Returns(_mockModel.Object);
RabbitMQListener listener = new RabbitMQListener(_mockExecutor.Object, _mockService.Object, "blah", _mockLogger.Object, _mockDescriptor.Object);
RabbitMQListener listener = new RabbitMQListener(_mockExecutor.Object, _mockService.Object, "blah", _mockLogger.Object, _mockDescriptor.Object, 30);

var properties = new BasicProperties()
{
Expand All @@ -79,7 +79,7 @@ public void RepublishesMessages()
public void RejectsStaleMessages()
{
_mockService.Setup(m => m.RabbitMQModel).Returns(_mockModel.Object);
RabbitMQListener listener = new RabbitMQListener(_mockExecutor.Object, _mockService.Object, "blah", _mockLogger.Object, _mockDescriptor.Object);
RabbitMQListener listener = new RabbitMQListener(_mockExecutor.Object, _mockService.Object, "blah", _mockLogger.Object, _mockDescriptor.Object, 30);

var properties = new BasicProperties()
{
Expand All @@ -100,7 +100,7 @@ public void ScaleMonitor_Id_ReturnsExpectedValue()
[Fact]
public async Task GetMetrics_ReturnsExpectedResult()
{
RabbitMQListener listener = new RabbitMQListener(_mockExecutor.Object, _mockService.Object, "listener_test_queue", _mockLogger.Object, new FunctionDescriptor { Id = "TestFunction" });
RabbitMQListener listener = new RabbitMQListener(_mockExecutor.Object, _mockService.Object, "listener_test_queue", _mockLogger.Object, new FunctionDescriptor { Id = "TestFunction" }, 30);
var metrics = await listener.GetMetricsAsync();

Assert.Equal((uint)5, metrics.QueueLength);
Expand Down
101 changes: 101 additions & 0 deletions test/WebJobs.Extensions.RabbitMQ.Tests/RabbitMQOptionsTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using Microsoft.Azure.WebJobs.Extensions.RabbitMQ;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Xunit;

namespace WebJobs.Extensions.RabbitMQ.Tests
{
public class RabbitMQOptionsTests
{
private string GetFormattedOption(RabbitMQOptions option)
{
JObject options = new JObject
{
{ nameof(option.HostName), option.HostName },
{ nameof(option.QueueName), option.QueueName },
{ nameof(option.Port), option.Port },
{ nameof(option.PrefetchCount), option.PrefetchCount },
};

return options.ToString(Formatting.Indented);
}

[Fact]
public void TestDefaultOptions()
{
RabbitMQOptions options = new RabbitMQOptions();

Assert.Equal<ushort>(30, options.PrefetchCount);
Assert.Equal(0, options.Port);
Assert.Null(options.HostName);
Assert.Null(options.QueueName);
Assert.Null(options.UserName);
Assert.Null(options.Password);
Assert.Null(options.ConnectionString);

// Test formatted
Assert.Equal(GetFormattedOption(options), options.Format());
}

[Fact]
public void TestConfiguredRabbitMQOptions()
{
ushort expectedPrefetchCount = 100;
int expectedPort = 8080;
string expectedHostName = "someHostName";
string expectedQueueName = "someQueueName";
string expectedUserName = "someUserName";
string expectedPassword = "somePassword";
string expectedConnectionString = "someConnectionString";
RabbitMQOptions options = new RabbitMQOptions()
{
Port = expectedPort,
HostName = expectedHostName,
QueueName = expectedQueueName,
UserName = expectedUserName,
Password = expectedPassword,
ConnectionString = expectedConnectionString,
PrefetchCount = expectedPrefetchCount,
};

Assert.Equal(expectedPrefetchCount, options.PrefetchCount);
Assert.Equal(expectedPort, options.Port);
Assert.Equal(expectedHostName, options.HostName);
Assert.Equal(expectedQueueName, options.QueueName);
Assert.Equal(expectedUserName, options.UserName);
Assert.Equal(expectedPassword, options.Password);
Assert.Equal(expectedConnectionString, options.ConnectionString);

// Test formatted
Assert.Equal(GetFormattedOption(options), options.Format());
}

[Fact]
public void TestJobHostHasTheRightConfiguration()
{
ushort expectedPrefetchCount = 10;

var builder = new HostBuilder()
.UseEnvironment("Development")
.ConfigureWebJobs(webJobsBuilder =>
{
webJobsBuilder
.AddRabbitMQ(a => a.PrefetchCount = expectedPrefetchCount); // set to non-default prefetch count
})
.UseConsoleLifetime();

var host = builder.Build();
using (host)
{
var config = host.Services.GetService<IOptions<RabbitMQOptions>>();
Assert.Equal(config.Value.PrefetchCount, expectedPrefetchCount);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions" Version="3.0.5" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Storage" Version="3.0.10" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.2.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.0.1" />
<PackageReference Include="Moq" Version="4.12.0" />
Expand Down