Skip to content

Commit

Permalink
Allow more control over Prometheus metrics collection
Browse files Browse the repository at this point in the history
  • Loading branch information
mateka committed Jan 14, 2025
1 parent b2b3b58 commit 05a4bc2
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 24 deletions.
13 changes: 13 additions & 0 deletions doc/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,18 @@ marker_table
created if it doesn't already exist. Defaults to "table_updates".


[prometheus]
------------

use_task_family_in_labels
Should task family be used as a prometheus bucket label.
Default value is true.

task_parameters_to_use_in_labels
List of task arguments' names used as additional prometheus bucket labels.
Passed in a form of a json list.


[redshift]
----------

Expand Down Expand Up @@ -1045,6 +1057,7 @@ metric_namespace
Optional prefix to add to the beginning of every metric sent to Datadog.
Default value is "luigi".


Per Task Retry-Policy
---------------------

Expand Down
49 changes: 35 additions & 14 deletions luigi/contrib/prometheus_metric.py
Original file line number Diff line number Diff line change
@@ -1,63 +1,84 @@
from prometheus_client import CollectorRegistry, Counter, Gauge, generate_latest, CONTENT_TYPE_LATEST
from luigi import parameter
from luigi.metrics import MetricsCollector
from luigi.task import Config


class prometheus(Config):
use_task_family_in_labels = parameter.BoolParameter(
default=True, parsing=parameter.BoolParameter.EXPLICIT_PARSING
)
task_parameters_to_use_in_labels = parameter.ListParameter(default=[])


class PrometheusMetricsCollector(MetricsCollector):

def __init__(self):
def _generate_task_labels(self, task):
return {
label: task.family if label == "family" else task.params.get(label)
for label in self.labels
}

def __init__(self, *args, **kwargs):
super(PrometheusMetricsCollector, self).__init__()
self.registry = CollectorRegistry()
config = prometheus(**kwargs)
self.labels = list(config.task_parameters_to_use_in_labels)
if config.use_task_family_in_labels:
self.labels += ["family"]
if not self.labels:
raise ValueError("Prometheus labels cannot be empty (see prometheus configuration)")
self.task_started_counter = Counter(
'luigi_task_started_total',
'number of started luigi tasks',
['family'],
self.labels,
registry=self.registry
)
self.task_failed_counter = Counter(
'luigi_task_failed_total',
'number of failed luigi tasks',
['family'],
self.labels,
registry=self.registry
)
self.task_disabled_counter = Counter(
'luigi_task_disabled_total',
'number of disabled luigi tasks',
['family'],
self.labels,
registry=self.registry
)
self.task_done_counter = Counter(
'luigi_task_done_total',
'number of done luigi tasks',
['family'],
self.labels,
registry=self.registry
)
self.task_execution_time = Gauge(
'luigi_task_execution_time_seconds',
'luigi task execution time in seconds',
['family'],
self.labels,
registry=self.registry
)

def generate_latest(self):
return generate_latest(self.registry)

def handle_task_started(self, task):
self.task_started_counter.labels(family=task.family).inc()
self.task_execution_time.labels(family=task.family)
self.task_started_counter.labels(**self._generate_task_labels(task)).inc()
self.task_execution_time.labels(**self._generate_task_labels(task))

def handle_task_failed(self, task):
self.task_failed_counter.labels(family=task.family).inc()
self.task_execution_time.labels(family=task.family).set(task.updated - task.time_running)
self.task_failed_counter.labels(**self._generate_task_labels(task)).inc()
self.task_execution_time.labels(**self._generate_task_labels(task)).set(task.updated - task.time_running)

def handle_task_disabled(self, task, config):
self.task_disabled_counter.labels(family=task.family).inc()
self.task_execution_time.labels(family=task.family).set(task.updated - task.time_running)
self.task_disabled_counter.labels(**self._generate_task_labels(task)).inc()
self.task_execution_time.labels(**self._generate_task_labels(task)).set(task.updated - task.time_running)

def handle_task_done(self, task):
self.task_done_counter.labels(family=task.family).inc()
self.task_done_counter.labels(**self._generate_task_labels(task)).inc()
# time_running can be `None` if task was already complete
if task.time_running is not None:
self.task_execution_time.labels(family=task.family).set(task.updated - task.time_running)
self.task_execution_time.labels(**self._generate_task_labels(task)).set(task.updated - task.time_running)

def configure_http_handler(self, http_handler):
http_handler.set_header('Content-Type', CONTENT_TYPE_LATEST)
50 changes: 40 additions & 10 deletions test/contrib/prometheus_metric_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,28 @@
WORKER = 'myworker'
TASK_ID = 'TaskID'
TASK_FAMILY = 'TaskFamily'
A_PARAM_VALUE = "1"
B_PARAM_VALUE = "2"
C_PARAM_VALUE = "3"


@pytest.mark.contrib
class PrometheusMetricTest(unittest.TestCase):
class PrometheusMetricBaseTest(unittest.TestCase):
COLLECTOR_KWARGS = {}
EXPECTED_LABELS = {"family": TASK_FAMILY}

def setUp(self):
self.collector = PrometheusMetricsCollector()
self.collector = PrometheusMetricsCollector(**self.COLLECTOR_KWARGS)
self.s = Scheduler(metrics_collector=MetricsCollectors.prometheus)
self.gauge_name = 'luigi_task_execution_time_seconds'
self.labels = {'family': TASK_FAMILY}
self.gauge_name = "luigi_task_execution_time_seconds"

def startTask(self):
self.s.add_task(worker=WORKER, task_id=TASK_ID, family=TASK_FAMILY)
self.s.add_task(
worker=WORKER,
task_id=TASK_ID,
family=TASK_FAMILY,
params={"a": A_PARAM_VALUE, "b": B_PARAM_VALUE, "c": C_PARAM_VALUE},
)
task = self.s._state.get_task(TASK_ID)
task.time_running = 0
task.updated = 5
Expand All @@ -38,9 +48,11 @@ def test_handle_task_started(self):

counter_name = 'luigi_task_started_total'
gauge_name = self.gauge_name
labels = self.labels
labels = self.EXPECTED_LABELS

assert self.collector.registry.get_sample_value(counter_name, labels=self.labels) == 1
assert (
self.collector.registry.get_sample_value(counter_name, labels=labels) == 1
)
assert self.collector.registry.get_sample_value(gauge_name, labels=labels) == 0

def test_handle_task_failed(self):
Expand All @@ -49,7 +61,7 @@ def test_handle_task_failed(self):

counter_name = 'luigi_task_failed_total'
gauge_name = self.gauge_name
labels = self.labels
labels = self.EXPECTED_LABELS

assert self.collector.registry.get_sample_value(counter_name, labels=labels) == 1
assert self.collector.registry.get_sample_value(gauge_name, labels=labels) == task.updated - task.time_running
Expand All @@ -60,7 +72,7 @@ def test_handle_task_disabled(self):

counter_name = 'luigi_task_disabled_total'
gauge_name = self.gauge_name
labels = self.labels
labels = self.EXPECTED_LABELS

assert self.collector.registry.get_sample_value(counter_name, labels=labels) == 1
assert self.collector.registry.get_sample_value(gauge_name, labels=labels) == task.updated - task.time_running
Expand All @@ -71,7 +83,7 @@ def test_handle_task_done(self):

counter_name = 'luigi_task_done_total'
gauge_name = self.gauge_name
labels = self.labels
labels = self.EXPECTED_LABELS

assert self.collector.registry.get_sample_value(counter_name, labels=labels) == 1
assert self.collector.registry.get_sample_value(gauge_name, labels=labels) == task.updated - task.time_running
Expand All @@ -80,3 +92,21 @@ def test_configure_http_handler(self):
mock_http_handler = mock.MagicMock()
self.collector.configure_http_handler(mock_http_handler)
mock_http_handler.set_header.assert_called_once_with('Content-Type', CONTENT_TYPE_LATEST)


@pytest.mark.contrib
class PrometheusMetricTaskParamsOnlyTest(PrometheusMetricBaseTest):
COLLECTOR_KWARGS = {
"use_task_family_in_labels": False,
"task_parameters_to_use_in_labels": ["a", "c"],
}
EXPECTED_LABELS = {"a": A_PARAM_VALUE, "c": C_PARAM_VALUE}


@pytest.mark.contrib
class PrometheusMetricTaskFamilyAndTaskParamsTest(PrometheusMetricBaseTest):
COLLECTOR_KWARGS = {
"use_task_family_in_labels": True,
"task_parameters_to_use_in_labels": ["b"],
}
EXPECTED_LABELS = {"family": TASK_FAMILY, "b": B_PARAM_VALUE}

0 comments on commit 05a4bc2

Please sign in to comment.