diff --git a/checks/categories.py b/checks/categories.py index c8f79455f..74a72297a 100644 --- a/checks/categories.py +++ b/checks/categories.py @@ -177,6 +177,7 @@ def __init__(self, name="web-tls"): WebTlsCertPubkey, WebTlsCertSignature, WebTlsCertHostmatch, + WebCaa, WebTlsDaneExists, WebTlsDaneValid, WebTlsZeroRTT, @@ -248,6 +249,7 @@ def __init__(self, name="mail-tls"): MailTlsCertPubkey, MailTlsCertSignature, MailTlsCertHostmatch, + MailCaa, MailTlsDaneExists, MailTlsDaneValid, MailTlsDaneRollover, @@ -1307,6 +1309,35 @@ def result_bad(self, tech_data): self.tech_data = tech_data +class WebCaa(Subtest): + def __init__(self): + super(WebCaa, self).__init__( + name="cert_caa", + label="detail web cert-caa label", + explanation="detail web cert-caa exp", + tech_string="", + init_tech_type="", + worst_status=scoring.CAA_WORST_STATUS, + full_score=scoring.CAA_GOOD, + model_score_field="cert_caa_score", + ) + + def result_good(self, tech_data): + self._status(STATUS_SUCCESS) + self.verdict = "detail web cert-caa verdict good" + self.tech_data = "" + + def result_info(self, tech_data): + self._status(STATUS_INFO) + self.verdict = "detail web cert-caa verdict warning" + self.tech_data = "" + + def result_bad(self, tech_data): + self._status(STATUS_FAIL) + self.verdict = "detail web cert-caa verdict bad" + self.tech_data = "" + + class WebTlsDaneExists(Subtest): def __init__(self): super().__init__( @@ -1909,6 +1940,35 @@ def result_has_daneTA(self, tech_data): self.tech_data = tech_data +class MailCaa(Subtest): + def __init__(self): + super(MailCaa, self).__init__( + name="cert_caa", + label="detail mail cert-caa label", + explanation="detail mail cert-caa exp", + tech_string="detail mail cert-caa tech table", + init_tech_type="", + worst_status=scoring.CAA_WORST_STATUS, + full_score=scoring.CAA_GOOD, + model_score_field="cert_caa_score", + ) + + def result_good(self, tech_data): + self._status(STATUS_SUCCESS) + self.verdict = "detail mail cert-caa verdict good" + self.tech_data = "" + + def result_info(self, tech_data): + self._status(STATUS_INFO) + self.verdict = "detail mail cert-caa verdict warning" + self.tech_data = "" + + def result_bad(self, tech_data): + self._status(STATUS_FAIL) + self.verdict = "detail mail cert-caa verdict bad" + self.tech_data = "" + + class MailTlsZeroRTT(Subtest): def __init__(self): super().__init__( diff --git a/checks/migrations/0017_domaintesttls_cert_caa_record_and_more.py b/checks/migrations/0017_domaintesttls_cert_caa_record_and_more.py new file mode 100644 index 000000000..a9fc9fab7 --- /dev/null +++ b/checks/migrations/0017_domaintesttls_cert_caa_record_and_more.py @@ -0,0 +1,23 @@ +# Generated by Django 4.2.16 on 2025-01-20 16:13 + +import checks.models +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("checks", "0016_webtestappsecpriv_timestamp_webtesttls_timestamp"), + ] + + operations = [ + migrations.AddField( + model_name="domaintesttls", + name="cert_caa_record", + field=checks.models.ListField(default=[]), + ), + migrations.AddField( + model_name="domaintesttls", + name="cert_caa_score", + field=models.IntegerField(null=True), + ), + ] diff --git a/checks/models.py b/checks/models.py index fd34f043d..6d136aa34 100644 --- a/checks/models.py +++ b/checks/models.py @@ -555,6 +555,9 @@ class DomainTestTls(BaseTestModel): cert_hostmatch_bad = ListField(null=True) cert_hostmatch_score = models.IntegerField(null=True) + cert_caa_score = models.IntegerField(null=True) + cert_caa_record = ListField(default=[]) + score = models.IntegerField(null=True) def __dir__(self): @@ -615,6 +618,8 @@ def __dir__(self): "cert_signature_score", "cert_hostmatch_bad", "cert_hostmatch_score", + "cert_caa_score", + "cert_caa_record", "score", "protocols_good", ] @@ -647,6 +652,7 @@ def get_web_api_details(self): "cert_pubkey_phase_out": self.cert_pubkey_phase_out, "cert_signature_bad": self.cert_signature_bad, "cert_hostmatch_bad": self.cert_hostmatch_bad, + "cert_caa_bad": self.cert_caa_bad, } def get_mail_api_details(self): @@ -673,6 +679,7 @@ def get_mail_api_details(self): "cert_pubkey_phase_out": self.cert_pubkey_phase_out, "cert_signature_bad": self.cert_signature_bad, "cert_hostmatch_bad": self.cert_hostmatch_bad, + "cert_caa_bad": self.cert_caa_bad, } class Meta: diff --git a/checks/scoring.py b/checks/scoring.py index 4b5606819..1e05b8c18 100644 --- a/checks/scoring.py +++ b/checks/scoring.py @@ -159,6 +159,10 @@ WEB_TLS_HOSTMATCH_BAD = NO_POINTS WEB_TLS_HOSTMATCH_WORST_STATUS = STATUS_FAIL +CAA_GOOD = FULL_WEIGHT_POINTS +CAA_BAD = NO_POINTS +CAA_WORST_STATUS = STATUS_INFO + # DANE_EXISTS has no score. It is combined with # DANE_VALID below. WEB_TLS_DANE_EXISTS_WORST_STATUS = STATUS_INFO diff --git a/checks/tasks/tls.py b/checks/tasks/tls.py index 61df33410..3d8c1b358 100644 --- a/checks/tasks/tls.py +++ b/checks/tasks/tls.py @@ -5,6 +5,7 @@ import socket import ssl import time +import unbound from urllib.parse import urlparse import requests @@ -652,6 +653,8 @@ def save_results(model, results, addr, domain, category): model.cert_signature_score = result.get("sigalg_score") model.cert_hostmatch_score = result.get("hostmatch_score") model.cert_hostmatch_bad = result.get("hostmatch_bad") + model.cert_caa_score = result.get("caa_score") + model.cert_caa_record = result.get("caa_record") model.dane_log = result.get("dane_log") model.dane_score = result.get("dane_score") model.dane_status = result.get("dane_status") @@ -720,6 +723,7 @@ def save_results(model, results, addr, domain, category): model.cert_signature_score = result.get("sigalg_score") model.cert_hostmatch_score = result.get("hostmatch_score") model.cert_hostmatch_bad = result.get("hostmatch_bad") + model.cert_caa_score = result.get("caa_score") model.dane_log = result.get("dane_log") model.dane_score = result.get("dane_score") model.dane_status = result.get("dane_status") @@ -877,6 +881,13 @@ def annotate_and_combine_all(good_items, sufficient_items, bad_items, phaseout_i else: category.subtests["cert_hostmatch"].result_good() + if dttls.cert_caa_score is None: + category.subtests["cert_caa"].result_info(dttls.cert_caa_record) + elif dttls.cert_caa_score is scoring.CAA_WORST_STATUS: + category.subtests["cert_caa"].result_info(dttls.cert_caa_record) + else: + category.subtests["cert_caa"].result_good(dttls.cert_caa_record) + if dttls.dane_status == DaneStatus.none: category.subtests["dane_exists"].result_bad() elif dttls.dane_status == DaneStatus.none_bogus: @@ -1619,6 +1630,50 @@ def do_web_cert(af_ip_pairs, url, task, *args, **kwargs): return ("cert", results) +def as_txt(data): + try: + txt = "".join(unbound.ub_data.dname2str(data)) + except UnicodeError: + txt = "" + return txt + + +def caa_callback(data, status, r): + data["score"] = scoring.CAA_WORST_STATUS + data["available"] = False + data["record"] = [] + if status == 0: + available = False + if r.rcode == unbound.RCODE_NOERROR and r.havedata == 1: + available = True + score = scoring.CAA_GOOD + for d in r.data.data: + txt = as_txt(d) + data["record"].append(txt) + elif r.rcode == unbound.RCODE_NXDOMAIN: + # we know for sure there is no DKIM pubkey + score = scoring.CAA_WORST_STATUS + else: + # resolving problems, servfail probably + score = scoring.CAA_WORST_STATUS + data["score"] = score + data["available"] = available + data["done"] = True + + +def check_caa(task, url): + caa_score = scoring.CAA_GOOD + try: + cb_data = task.async_resolv(url, unbound.RR_TYPE_CAA, caa_callback) + result = dict(available="available" in cb_data and cb_data["available"], score=cb_data["score"]) + caa_score = cb_data["score"] + # KeyError is due to score missing, happens in case of timeout on non resolving domain + except (SoftTimeLimitExceeded, KeyError): + result = dict(available=False, score=scoring.scoring.CAA_WORST_STATUS) + caa_score = scoring.CAA_WORST_STATUS + return (caa_score, cb_data["record"]) + + def cert_checks(url, mode, task, af_ip_pair=None, starttls_details=None, *args, **kwargs): """ Perform certificate checks. @@ -1689,6 +1744,7 @@ def cert_checks(url, mode, task, af_ip_pair=None, starttls_details=None, *args, pubkey_score, pubkey_bad, pubkey_phase_out = debug_chain.check_pubkey() sigalg_score, sigalg_bad = debug_chain.check_sigalg() chain_str = debug_chain.chain_str() + caa_score, caa_record = check_caa(task, url) if starttls_details: dane_results = debug_chain.check_dane(url, conn_port, task, dane_cb_data=starttls_details.dane_cb_data) @@ -1707,6 +1763,8 @@ def cert_checks(url, mode, task, af_ip_pair=None, starttls_details=None, *args, sigalg_score=sigalg_score, hostmatch_bad=hostmatch_bad, hostmatch_score=hostmatch_score, + caa_score=caa_score, + caa_record=caa_record, ) results.update(dane_results)