Skip to content

Commit

Permalink
Fix #194 - Add CAA test
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsasha committed Feb 11, 2025
1 parent 583c017 commit 494594e
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 0 deletions.
60 changes: 60 additions & 0 deletions checks/categories.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ def __init__(self, name="web-tls"):
WebTlsCertPubkey,
WebTlsCertSignature,
WebTlsCertHostmatch,
WebCaa,
WebTlsDaneExists,
WebTlsDaneValid,
WebTlsZeroRTT,
Expand Down Expand Up @@ -248,6 +249,7 @@ def __init__(self, name="mail-tls"):
MailTlsCertPubkey,
MailTlsCertSignature,
MailTlsCertHostmatch,
MailCaa,
MailTlsDaneExists,
MailTlsDaneValid,
MailTlsDaneRollover,
Expand Down Expand Up @@ -1307,6 +1309,35 @@ def result_bad(self, tech_data):
self.tech_data = tech_data


class WebCaa(Subtest):
def __init__(self):
super(WebCaa, self).__init__(
name="cert_caa",
label="detail web cert-caa label",
explanation="detail web cert-caa exp",
tech_string="",
init_tech_type="",
worst_status=scoring.CAA_WORST_STATUS,
full_score=scoring.CAA_GOOD,
model_score_field="cert_caa_score",
)

def result_good(self, tech_data):
self._status(STATUS_SUCCESS)
self.verdict = "detail web cert-caa verdict good"
self.tech_data = ""

def result_info(self, tech_data):
self._status(STATUS_INFO)
self.verdict = "detail web cert-caa verdict warning"
self.tech_data = ""

def result_bad(self, tech_data):
self._status(STATUS_FAIL)
self.verdict = "detail web cert-caa verdict bad"
self.tech_data = ""


class WebTlsDaneExists(Subtest):
def __init__(self):
super().__init__(
Expand Down Expand Up @@ -1909,6 +1940,35 @@ def result_has_daneTA(self, tech_data):
self.tech_data = tech_data


class MailCaa(Subtest):
def __init__(self):
super(MailCaa, self).__init__(
name="cert_caa",
label="detail mail cert-caa label",
explanation="detail mail cert-caa exp",
tech_string="detail mail cert-caa tech table",
init_tech_type="",
worst_status=scoring.CAA_WORST_STATUS,
full_score=scoring.CAA_GOOD,
model_score_field="cert_caa_score",
)

def result_good(self, tech_data):
self._status(STATUS_SUCCESS)
self.verdict = "detail mail cert-caa verdict good"
self.tech_data = ""

def result_info(self, tech_data):
self._status(STATUS_INFO)
self.verdict = "detail mail cert-caa verdict warning"
self.tech_data = ""

def result_bad(self, tech_data):
self._status(STATUS_FAIL)
self.verdict = "detail mail cert-caa verdict bad"
self.tech_data = ""


class MailTlsZeroRTT(Subtest):
def __init__(self):
super().__init__(
Expand Down
23 changes: 23 additions & 0 deletions checks/migrations/0017_domaintesttls_cert_caa_record_and_more.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Generated by Django 4.2.16 on 2025-01-20 16:13

import checks.models
from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("checks", "0016_webtestappsecpriv_timestamp_webtesttls_timestamp"),
]

operations = [
migrations.AddField(
model_name="domaintesttls",
name="cert_caa_record",
field=checks.models.ListField(default=[]),
),
migrations.AddField(
model_name="domaintesttls",
name="cert_caa_score",
field=models.IntegerField(null=True),
),
]
7 changes: 7 additions & 0 deletions checks/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,9 @@ class DomainTestTls(BaseTestModel):
cert_hostmatch_bad = ListField(null=True)
cert_hostmatch_score = models.IntegerField(null=True)

cert_caa_score = models.IntegerField(null=True)
cert_caa_record = ListField(default=[])

score = models.IntegerField(null=True)

def __dir__(self):
Expand Down Expand Up @@ -615,6 +618,8 @@ def __dir__(self):
"cert_signature_score",
"cert_hostmatch_bad",
"cert_hostmatch_score",
"cert_caa_score",
"cert_caa_record",
"score",
"protocols_good",
]
Expand Down Expand Up @@ -647,6 +652,7 @@ def get_web_api_details(self):
"cert_pubkey_phase_out": self.cert_pubkey_phase_out,
"cert_signature_bad": self.cert_signature_bad,
"cert_hostmatch_bad": self.cert_hostmatch_bad,
"cert_caa_bad": self.cert_caa_bad,
}

def get_mail_api_details(self):
Expand All @@ -673,6 +679,7 @@ def get_mail_api_details(self):
"cert_pubkey_phase_out": self.cert_pubkey_phase_out,
"cert_signature_bad": self.cert_signature_bad,
"cert_hostmatch_bad": self.cert_hostmatch_bad,
"cert_caa_bad": self.cert_caa_bad,
}

class Meta:
Expand Down
4 changes: 4 additions & 0 deletions checks/scoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@
WEB_TLS_HOSTMATCH_BAD = NO_POINTS
WEB_TLS_HOSTMATCH_WORST_STATUS = STATUS_FAIL

CAA_GOOD = FULL_WEIGHT_POINTS
CAA_BAD = NO_POINTS
CAA_WORST_STATUS = STATUS_INFO

# DANE_EXISTS has no score. It is combined with
# DANE_VALID below.
WEB_TLS_DANE_EXISTS_WORST_STATUS = STATUS_INFO
Expand Down
58 changes: 58 additions & 0 deletions checks/tasks/tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import socket
import ssl
import time
import unbound
from urllib.parse import urlparse

import requests
Expand Down Expand Up @@ -652,6 +653,8 @@ def save_results(model, results, addr, domain, category):
model.cert_signature_score = result.get("sigalg_score")
model.cert_hostmatch_score = result.get("hostmatch_score")
model.cert_hostmatch_bad = result.get("hostmatch_bad")
model.cert_caa_score = result.get("caa_score")
model.cert_caa_record = result.get("caa_record")
model.dane_log = result.get("dane_log")
model.dane_score = result.get("dane_score")
model.dane_status = result.get("dane_status")
Expand Down Expand Up @@ -720,6 +723,7 @@ def save_results(model, results, addr, domain, category):
model.cert_signature_score = result.get("sigalg_score")
model.cert_hostmatch_score = result.get("hostmatch_score")
model.cert_hostmatch_bad = result.get("hostmatch_bad")
model.cert_caa_score = result.get("caa_score")
model.dane_log = result.get("dane_log")
model.dane_score = result.get("dane_score")
model.dane_status = result.get("dane_status")
Expand Down Expand Up @@ -877,6 +881,13 @@ def annotate_and_combine_all(good_items, sufficient_items, bad_items, phaseout_i
else:
category.subtests["cert_hostmatch"].result_good()

if dttls.cert_caa_score is None:
category.subtests["cert_caa"].result_info(dttls.cert_caa_record)
elif dttls.cert_caa_score is scoring.CAA_WORST_STATUS:
category.subtests["cert_caa"].result_info(dttls.cert_caa_record)
else:
category.subtests["cert_caa"].result_good(dttls.cert_caa_record)

if dttls.dane_status == DaneStatus.none:
category.subtests["dane_exists"].result_bad()
elif dttls.dane_status == DaneStatus.none_bogus:
Expand Down Expand Up @@ -1619,6 +1630,50 @@ def do_web_cert(af_ip_pairs, url, task, *args, **kwargs):
return ("cert", results)


def as_txt(data):
try:
txt = "".join(unbound.ub_data.dname2str(data))
except UnicodeError:
txt = "<Non ASCII characters found>"
return txt


def caa_callback(data, status, r):
data["score"] = scoring.CAA_WORST_STATUS
data["available"] = False
data["record"] = []
if status == 0:
available = False
if r.rcode == unbound.RCODE_NOERROR and r.havedata == 1:
available = True
score = scoring.CAA_GOOD
for d in r.data.data:
txt = as_txt(d)
data["record"].append(txt)
elif r.rcode == unbound.RCODE_NXDOMAIN:
# we know for sure there is no DKIM pubkey
score = scoring.CAA_WORST_STATUS
else:
# resolving problems, servfail probably
score = scoring.CAA_WORST_STATUS
data["score"] = score
data["available"] = available
data["done"] = True


def check_caa(task, url):
caa_score = scoring.CAA_GOOD
try:
cb_data = task.async_resolv(url, unbound.RR_TYPE_CAA, caa_callback)
result = dict(available="available" in cb_data and cb_data["available"], score=cb_data["score"])
caa_score = cb_data["score"]
# KeyError is due to score missing, happens in case of timeout on non resolving domain
except (SoftTimeLimitExceeded, KeyError):
result = dict(available=False, score=scoring.scoring.CAA_WORST_STATUS)
caa_score = scoring.CAA_WORST_STATUS
return (caa_score, cb_data["record"])


def cert_checks(url, mode, task, af_ip_pair=None, starttls_details=None, *args, **kwargs):
"""
Perform certificate checks.
Expand Down Expand Up @@ -1689,6 +1744,7 @@ def cert_checks(url, mode, task, af_ip_pair=None, starttls_details=None, *args,
pubkey_score, pubkey_bad, pubkey_phase_out = debug_chain.check_pubkey()
sigalg_score, sigalg_bad = debug_chain.check_sigalg()
chain_str = debug_chain.chain_str()
caa_score, caa_record = check_caa(task, url)

if starttls_details:
dane_results = debug_chain.check_dane(url, conn_port, task, dane_cb_data=starttls_details.dane_cb_data)
Expand All @@ -1707,6 +1763,8 @@ def cert_checks(url, mode, task, af_ip_pair=None, starttls_details=None, *args,
sigalg_score=sigalg_score,
hostmatch_bad=hostmatch_bad,
hostmatch_score=hostmatch_score,
caa_score=caa_score,
caa_record=caa_record,
)
results.update(dane_results)

Expand Down

0 comments on commit 494594e

Please sign in to comment.