Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #194 - Add CAA test #1624

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added checks/caa/__init__.py
Empty file.
276 changes: 276 additions & 0 deletions checks/caa/parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,276 @@
import textwrap
from urllib.parse import urlparse

from abnf.grammars.misc import load_grammar_rulelist
from abnf.parser import Rule as _Rule, ParseError, NodeVisitor

from checks.tasks.shared import TranslatableTechTableItem


class CAAParseError(ValueError):
def __init__(self, msg_id: str, context: dict[str, str]):
self.msg_id = msg_id
self.context = context

def to_translatable_tech_table_item(self):
return TranslatableTechTableItem(self.msg_id, self.context)


def node_get_named_child_value(node, name):
"""Do a breadth-first search of the tree for addr-spec node. If found,
return its value."""
queue = [node]
while queue:
n, queue = queue[0], queue[1:]
if n.name == name:
return n.value
else:
queue.extend(n.children)
return None


# https://www.iana.org/assignments/acme/acme.xhtml#acme-validation-methods
ACME_VALIDATION_METHODS = {
"http-01",
"dns-01",
"http-01",
"tls-alpn-01",
"tls-alpn-01",
"email-reply-00",
"tkauth-01",
}

# RFC 8657 4
ACME_VALIDATION_CUSTOM_PREFIX = "ca-"


@load_grammar_rulelist()
class CAAValidationMethodsGrammar(_Rule):
"""
Grammar for validationmethods CAA parameter to the issue/issuewild property.
Per RFC8657 4
"""

grammar = textwrap.dedent(
"""
value = [*(label ",") label]
label = 1*(ALPHA / DIGIT / "-")
"""
)


def validate_issue_validation_methods(parameter_value: str) -> set[str]:
"""Validate the validationmethods parameter value for the issue/issuewild CAA property."""
parse_result = CAAValidationMethodsGrammar("value").parse_all(parameter_value)
# Careful: label/value are used as properties of the parse tree, but also as properties
# in the original ABNF grammer, in opposite roles. Not confusing at all.
validation_methods = {label.value for label in parse_result.children if label.name == "label"}
for validation_method in validation_methods:
if validation_method not in ACME_VALIDATION_METHODS and not validation_method.startswith(
ACME_VALIDATION_CUSTOM_PREFIX
):
raise CAAParseError(msg_id="invalid_property_issue_validation_method", context={"value": parameter_value})
return validation_methods


@load_grammar_rulelist()
class CAAPropertyIssueGrammar(_Rule):
"""
Grammar for issue/issuewild CAA property values.
Per RFC8659 4.2
# TODO: consider https://www.rfc-editor.org/errata/eid7139
"""

grammar = textwrap.dedent(
"""
issue-value = *WSP [issuer-domain-name *WSP]
[";" *WSP [parameters *WSP]]

issuer-domain-name = label *("." label)
label = (ALPHA / DIGIT) *( *("-") (ALPHA / DIGIT))

parameters = (parameter *WSP ";" *WSP parameters) / parameter
parameter = tag *WSP "=" *WSP value
tag = (ALPHA / DIGIT) *( *("-") (ALPHA / DIGIT))
value = *(%x21-3A / %x3C-7E)
"""
)


class CAAPropertyIssueVisitor(NodeVisitor):
def __init__(self):
super().__init__()
self.issuer_domain_name = None
self.parameters = {}

def visit_issue_value(self, node):
for child_node in node.children:
self.visit(child_node)

def visit_issuer_domain_name(self, node):
self.issuer_domain_name = node.value

def visit_parameters(self, node):
for child_node in node.children:
self.visit(child_node)

def visit_parameter(self, node):
tag = node_get_named_child_value(node, "tag")
value = node_get_named_child_value(node, "value")
self.parameters[tag] = value


def validate_property_issue(value: str):
parse_result = CAAPropertyIssueGrammar("issue-value").parse_all(value)
visitor = CAAPropertyIssueVisitor()
visitor.visit(parse_result)
if "validationmethods" in visitor.parameters:
validate_issue_validation_methods(visitor.parameters["validationmethods"])


def validate_property_iodef(value: str):
"""Validate iodef value per RFC8659 4.4"""
# TODO: do we want different translations for the various sub-errors?
try:
url = urlparse(value)
except ValueError:
raise CAAParseError(msg_id="invalid_property_iodef_value", context={"value": value})
if url.scheme in ["http", "https"]:
# RFC8659 refers to RFC6546, which is unclear on requirements. Let's assume a netloc is needed.
if not url.netloc:
raise CAAParseError(msg_id="invalid_property_iodef_value", context={"value": value})
elif url.scheme == "mailto":
# RFC8659 does not prescribe what an email address is
if "@" not in url.path:
raise CAAParseError(msg_id="invalid_property_iodef_value", context={"value": value})
else:
raise CAAParseError(msg_id="invalid_property_iodef_value", context={"value": value})


def validate_property_contactemail(value: str):
"""Validate contactemail per CAB BR 1.6.3, requiring a single RFC 6532 3.2 address."""
# TODO: the grammar is nontrivial, consider if this needs refinement
if "@" not in value:
raise CAAParseError(msg_id="invalid_property_contactemail_value", context={"value": value})


@load_grammar_rulelist()
class PhoneNumberRule(_Rule):
"""
Grammar for phone numbers per RFC3966.
Includes https://www.rfc-editor.org/errata/eid203
local-number-digits and its dependencies were stripped out,
as the ABNF parser had issues with it, and they are not used by us now.
"""

grammar = textwrap.dedent(
"""
telephone-uri = "tel:" telephone-subscriber
telephone-subscriber = global-number
global-number = global-number-digits *par
par = parameter / extension / isdn-subaddress
isdn-subaddress = ";isub=" 1*uric
extension = ";ext=" 1*phonedigit
context = ";phone-context=" descriptor
descriptor = domainname / global-number-digits
global-number-digits = "+" *phonedigit DIGIT *phonedigit
domainname = *( domainlabel "." ) toplabel [ "." ]
domainlabel = alphanum
/ alphanum *( alphanum / "-" ) alphanum
toplabel = ALPHA / ALPHA *( alphanum / "-" ) alphanum
parameter = ";" pname ["=" pvalue ]
pname = 1*( alphanum / "-" )
pvalue = 1*paramchar
paramchar = param-unreserved / unreserved / pct-encoded
unreserved = alphanum / mark
mark = "-" / "_" / "." / "!" / "~" / "*" /
"'" / "(" / ")"
pct-encoded = "%" HEXDIG HEXDIG
param-unreserved = "[" / "]" / "/" / ":" / "&" / "+" / "$"
phonedigit = DIGIT / [ visual-separator ]
phonedigit-hex = HEXDIG / "*" / "#" / [ visual-separator ]
visual-separator = "-" / "." / "(" / ")"
alphanum = ALPHA / DIGIT
reserved = ";" / "/" / "?" / ":" / "@" / "&" /
"=" / "+" / "$" / ","
uric = reserved / unreserved / pct-encoded
"""
)


def validate_property_contactphone(value: str):
"""Validate contactphone per CAB SC014, requiring an RFC3966 5.1.4 global number."""
parse_result = PhoneNumberRule("global-number").parse_all(value)
if not parse_result:
raise CAAParseError(msg_id="invalid_property_contactphone_value", context={"value": value})


@load_grammar_rulelist()
class CAAPropertyIssueMailRule(_Rule):
"""
Grammar for CAA issuemail property per RFC9495.
"""

grammar = textwrap.dedent(
"""
issuemail-value = *WSP [issuer-domain-name *WSP]
[";" *WSP [parameters *WSP]]

issuer-domain-name = label *("." label)
label = (ALPHA / DIGIT) *( *("-") (ALPHA / DIGIT))

parameters = (parameter *WSP ";" *WSP parameters) / parameter
parameter = tag *WSP "=" *WSP value
tag = (ALPHA / DIGIT) *( *("-") (ALPHA / DIGIT))
value = *(%x21-3A / %x3C-7E)
"""
)


def validate_property_issuemail(value: str):
"""Validate issuemail property per RFC9495."""
parse_result = CAAPropertyIssueMailRule("issuemail-value").parse_all(value)
if not parse_result:
raise CAAParseError(msg_id="invalid_property_issuemail_value", context={"value": value})


def validate_tag(tag: int):
# RFC8659 4.1
if tag not in [0, 128]:
raise CAAParseError(msg_id="invalid_tag_reserved_bits", context={"value": str(tag)})


# https://www.iana.org/assignments/pkix-parameters/pkix-parameters.xhtml#caa-properties
CAA_PROPERTY_VALIDATORS = {
"issue": validate_property_issue,
"issuewild": validate_property_issue,
"iodef": validate_property_iodef,
"auth": None,
"path": None,
"policy": None,
"contactemail": validate_property_contactemail,
"contactphone": validate_property_contactphone,
"issuevmc": validate_property_issue,
"issuemail": validate_property_issuemail,
}


def validate_caa_record(tag: int, name: str, value: str):
validate_tag(tag)
try:
validator = CAA_PROPERTY_VALIDATORS[name.lower()]
if validator is None:
raise CAAParseError(msg_id="invalid_reserved_property", context={"value": name})
validator(value)
except ParseError as e:
raise CAAParseError(
msg_id="invalid_property_syntax",
context={
"property_name": name,
"property_value": value,
"invalid_character_position": e.start,
"invalid_character": value[e.start],
},
)
except KeyError:
raise CAAParseError(msg_id="invalid_unknown_property", context={"value": name})
40 changes: 40 additions & 0 deletions checks/caa/retrieval.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from dataclasses import dataclass, field
from typing import Optional
from dns.rdtypes.ANY import CAA

from dns.resolver import NoAnswer, NXDOMAIN, LifetimeTimeout, NoNameservers

from checks import scoring
from checks.caa.parser import validate_caa_record, CAAParseError
from checks.resolver import dns_resolve_caa
from checks.tasks.rpki import logger
from checks.tasks.shared import TranslatableTechTableItem


@dataclass
class CAAResult:
enabled: bool
canonical_name: Optional[str] = None
errors: list[TranslatableTechTableItem] = field(default_factory=list)
recommendations: list[TranslatableTechTableItem] = field(default_factory=list)
caa_records: list[CAA] = field(default_factory=list)

@property
def score(self) -> int:
return scoring.CAA_GOOD if self.enabled and not self.errors else scoring.CAA_BAD


def retrieve_parse_caa(target_domain: str) -> CAAResult:
try:
canonical_name, rrset = dns_resolve_caa(target_domain)
except (NoAnswer, NXDOMAIN, LifetimeTimeout, NoNameservers):
return CAAResult(enabled=False)

result = CAAResult(enabled=True, canonical_name=canonical_name, caa_records=[caa.to_text() for caa in rrset])
for caa in rrset:
try:
validate_caa_record(caa.flags, caa.tag.decode("ascii"), caa.value.decode("ascii"))
except CAAParseError as cpe:
result.errors.append(TranslatableTechTableItem(cpe.msg_id, cpe.context))
logger.critical("⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️ " + str(result))
return result
Empty file added checks/caa/tests/__init__.py
Empty file.
44 changes: 44 additions & 0 deletions checks/caa/tests/test_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import pytest

from checks.caa.parser import validate_caa_record, CAAParseError


def test_validate_caa_record():
valid_pairs = [
(0, "issue", ";"),
(128, "issue", ";"),
(128, "ISSue", ";"),
(0, "issue", "ca.example.com"),
(0, "issuewild", "ca.example.com"),
(0, "iodef", "https://report.example.com"),
(0, "iodef", "mailto:[email protected]"),
(0, "contactemail", "[email protected]"),
(0, "contactphone", "+3185123456"),
(0, "issuevmc", ";"),
(0, "issuevmc", "ca.example.com"),
(0, "contactphone", "+3185123456"),
(0, "issuemail", ";"),
(0, "issuemail", "authority.example; account=123456"),
(0, "issue", "example.net; accounturi=https://example.net/account/1234"),
(0, "issue", "example.net; validationmethods=dns-01,ca-custom"),
(0, "issuewild", "example.net; accounturi=https://example.net/account/2345; validationmethods=http-01"),
]
for tag, name, value in valid_pairs:
validate_caa_record(tag, name, value)

invalid_pairs = [
(255, "issue", ";"), # Reserved bit set in flag
(0, "issue", "%"), # Invalid issuer domain name
(0, "issue", "💩"), # Invalid issuer domain name
(0, "issuewild", "💩"), # Invalid issuer domain name
(0, "issuevmc", "💩"), # Invalid issuer domain name
(0, "iodef", "https://"), # Invalid URL
(0, "iodef", "ftp://report.example.com"), # Invalid URL scheme
(0, "contactemail", "not-an-email"), # Invalid email address
(0, "contactphone", "not-a-phone-number"), # Invalid phone number
(0, "issuemail", "authority.example; account=💩"), # Invalid account ID grammar
(0, "issue", "example.net; validationmethods=dns-01,custom"), # Invalid validation method
]
for tag, name, value in invalid_pairs:
with pytest.raises(CAAParseError):
validate_caa_record(tag, name, value)
Loading
Loading