-
-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
policies/password: password uniqueness history #13453
base: main
Are you sure you want to change the base?
Changes from all commits
eb56da4
798af64
f57d7fb
7ee7115
771ef97
8de2221
f067fd9
ff79ad6
909aaaa
94f7d69
d678c0c
be3b144
e942c3e
ee06b8a
8c93822
4d812c0
0a6bcef
8da0e12
836efed
9f6a7d8
32cbdd9
a3ac49e
7676939
61edb52
4ff7b9b
741c1bb
58a64b5
50da439
d4342d5
c4b31fc
d62fa76
5681f4b
eeba846
97c316f
42ed2ee
d7c2c3d
e071160
35c74a7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
from rest_framework.viewsets import ModelViewSet | ||
|
||
from authentik.core.api.used_by import UsedByMixin | ||
from authentik.policies.api.policies import PolicySerializer | ||
from authentik.policies.unique_password.models import UniquePasswordPolicy | ||
|
||
|
||
class UniquePasswordPolicySerializer(PolicySerializer): | ||
"""Password Uniqueness Policy Serializer""" | ||
|
||
class Meta: | ||
model = UniquePasswordPolicy | ||
fields = PolicySerializer.Meta.fields + [ | ||
"password_field", | ||
"num_historical_passwords", | ||
] | ||
|
||
|
||
class UniquePasswordPolicyViewSet(UsedByMixin, ModelViewSet): | ||
"""Password Uniqueness Policy Viewset""" | ||
|
||
queryset = UniquePasswordPolicy.objects.all() | ||
serializer_class = UniquePasswordPolicySerializer | ||
filterset_fields = "__all__" | ||
ordering = ["name"] | ||
search_fields = ["name"] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
"""authentik Unique Password policy app config""" | ||
|
||
from authentik.blueprints.apps import ManagedAppConfig | ||
|
||
|
||
class AuthentikPoliciesUniquePasswordConfig(ManagedAppConfig): | ||
name = "authentik.policies.unique_password" | ||
label = "authentik_policies_unique_password" | ||
verbose_name = "authentik Policies.Unique Password" | ||
default = True |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
# Generated by Django 5.0.7 on 2024-08-09 02:49 | ||
|
||
import django.db.models.deletion | ||
from django.conf import settings | ||
from django.db import migrations, models | ||
|
||
|
||
class Migration(migrations.Migration): | ||
|
||
initial = True | ||
|
||
dependencies = [ | ||
("authentik_policies", "0011_policybinding_failure_result_and_more"), | ||
migrations.swappable_dependency(settings.AUTH_USER_MODEL), | ||
] | ||
|
||
operations = [ | ||
migrations.CreateModel( | ||
name="UniquePasswordPolicy", | ||
fields=[ | ||
( | ||
"policy_ptr", | ||
models.OneToOneField( | ||
auto_created=True, | ||
on_delete=django.db.models.deletion.CASCADE, | ||
parent_link=True, | ||
primary_key=True, | ||
serialize=False, | ||
to="authentik_policies.policy", | ||
), | ||
), | ||
( | ||
"password_field", | ||
models.TextField( | ||
default="password", | ||
help_text="Field key to check, field keys defined in Prompt stages are available.", | ||
), | ||
), | ||
( | ||
"num_historical_passwords", | ||
models.PositiveIntegerField( | ||
default=0, help_text="Number of passwords to check against." | ||
), | ||
), | ||
], | ||
options={ | ||
"verbose_name": "Password Uniqueness Policy", | ||
"verbose_name_plural": "Password Uniqueness Policies", | ||
"indexes": [ | ||
models.Index(fields=["policy_ptr_id"], name="authentik_p_policy__f559aa_idx") | ||
], | ||
}, | ||
bases=("authentik_policies.policy",), | ||
), | ||
migrations.CreateModel( | ||
name="UserPasswordHistory", | ||
fields=[ | ||
( | ||
"id", | ||
models.AutoField( | ||
auto_created=True, primary_key=True, serialize=False, verbose_name="ID" | ||
), | ||
), | ||
("old_password", models.CharField(max_length=128)), | ||
("created_at", models.DateTimeField(auto_now_add=True)), | ||
( | ||
"user", | ||
models.ForeignKey( | ||
on_delete=django.db.models.deletion.CASCADE, | ||
related_name="old_passwords", | ||
to=settings.AUTH_USER_MODEL, | ||
), | ||
), | ||
], | ||
options={ | ||
"verbose_name": "User Password History", | ||
}, | ||
), | ||
] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,136 @@ | ||
from django.contrib.auth.hashers import identify_hasher | ||
from django.db import models | ||
from django.utils.translation import gettext as _ | ||
from rest_framework.serializers import BaseSerializer | ||
from structlog.stdlib import get_logger | ||
|
||
from authentik.core.models import User | ||
from authentik.policies.models import Policy | ||
from authentik.policies.types import PolicyRequest, PolicyResult | ||
from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT | ||
|
||
LOGGER = get_logger() | ||
|
||
|
||
class UniquePasswordPolicy(Policy): | ||
"""Policy ensuring a user's password is not identical to a previously used password. | ||
|
||
After enabling the policy, Authentik stores every user's previous password whenever a user | ||
changes their own password. Old passwords remain stored in hashed form. | ||
""" | ||
|
||
password_field = models.TextField( | ||
default="password", | ||
help_text=_("Field key to check, field keys defined in Prompt stages are available."), | ||
) | ||
|
||
# Limit on the number of previous passwords the policy evaluates | ||
# Also controls number of old passwords the system stores. | ||
num_historical_passwords = models.PositiveIntegerField( | ||
default=0, | ||
help_text=_("Number of passwords to check against."), | ||
) | ||
|
||
@property | ||
def serializer(self) -> type[BaseSerializer]: | ||
from authentik.policies.unique_password.api import UniquePasswordPolicySerializer | ||
|
||
return UniquePasswordPolicySerializer | ||
|
||
@property | ||
def component(self) -> str: | ||
return "ak-policy-password-uniqueness-form" | ||
|
||
def passes(self, request: PolicyRequest) -> PolicyResult: | ||
from authentik.policies.unique_password.models import UserPasswordHistory | ||
|
||
password = request.context.get(PLAN_CONTEXT_PROMPT, {}).get( | ||
self.password_field, request.context.get(self.password_field) | ||
) | ||
if not password: | ||
LOGGER.warning( | ||
"Password field not found in request when checking UniquePasswordPolicy", | ||
field=self.password_field, | ||
fields=request.context.keys(), | ||
) | ||
return PolicyResult(False, _("Password not set in context")) | ||
password = str(password) | ||
|
||
if not self.num_historical_passwords: | ||
# Policy not configured to check against any passwords | ||
return PolicyResult(True) | ||
|
||
num_to_check = self.num_historical_passwords | ||
password_history = UserPasswordHistory.objects.filter(user=request.user).order_by( | ||
"-created_at" | ||
)[:num_to_check] | ||
|
||
if not password_history: | ||
return PolicyResult(True) | ||
|
||
for record in password_history: | ||
if not record.old_password: | ||
continue | ||
|
||
if self._passwords_match(new_password=password, old_password=record.old_password): | ||
# Return on first match. Authentik does not consider timing attacks | ||
# on old passwords to be an attack surface. | ||
return PolicyResult( | ||
False, | ||
_("This password has been used previously. Please choose a different one."), | ||
) | ||
|
||
return PolicyResult(True) | ||
|
||
def _passwords_match(self, *, new_password: str, old_password: str) -> bool: | ||
try: | ||
hasher = identify_hasher(old_password) | ||
except ValueError: | ||
LOGGER.warning( | ||
"Skipping password; could not load hash algorithm", | ||
) | ||
return False | ||
|
||
return hasher.verify(new_password, old_password) | ||
|
||
@classmethod | ||
def is_in_use(cls): | ||
"""Check if any UniquePasswordPolicy is in use, either through policy bindings | ||
or direct attachment to a PromptStage. | ||
|
||
Returns: | ||
bool: True if any policy is in use, False otherwise | ||
""" | ||
from authentik.policies.models import PolicyBinding | ||
|
||
# Check if any policy is in use through bindings | ||
if PolicyBinding.in_use.for_policy(cls).exists(): | ||
return True | ||
|
||
# Check if any policy is attached to a PromptStage | ||
if cls.objects.filter(promptstage__isnull=False).exists(): | ||
return True | ||
|
||
return False | ||
|
||
class Meta(Policy.PolicyMeta): | ||
verbose_name = _("Password Uniqueness Policy") | ||
verbose_name_plural = _("Password Uniqueness Policies") | ||
|
||
|
||
class UserPasswordHistory(models.Model): | ||
user = models.ForeignKey(User, on_delete=models.CASCADE, related_name="old_passwords") | ||
# Mimic's column type of AbstractBaseUser.password | ||
old_password = models.CharField(max_length=128) | ||
created_at = models.DateTimeField(auto_now_add=True) | ||
|
||
class Meta: | ||
verbose_name = _("User Password History") | ||
|
||
def __str__(self) -> str: | ||
if self.created_at: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should never be null. |
||
return ( | ||
f"Previous Password (user: {self.user_id}, " | ||
f"recorded: {self.created_at:%Y/%m/%d %X})" | ||
) | ||
return f"Previous Password (user: {self.user_id}, recorded: None)" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
"""authentik policy signals""" | ||
|
||
from typing import Any | ||
|
||
from django.db.models.signals import post_delete | ||
from django.dispatch import receiver | ||
from django.http import HttpRequest | ||
|
||
from authentik.core.middleware import SESSION_KEY_IMPERSONATE_USER | ||
from authentik.core.models import User | ||
from authentik.policies.models import PolicyBinding | ||
from authentik.policies.unique_password.tasks import ( | ||
purge_password_history_table, | ||
trim_user_password_history, | ||
) | ||
from authentik.stages.user_write.signals import user_write | ||
|
||
|
||
@receiver(post_delete, sender=PolicyBinding) | ||
def purge_password_history(sender, instance, **_): | ||
from authentik.policies.unique_password.models import UniquePasswordPolicy | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we have to have this as a JIT import? |
||
|
||
if not isinstance(instance.policy, UniquePasswordPolicy): | ||
return | ||
purge_password_history_table.delay() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We may (?) want this to run on a schedule rather than triggering it here. |
||
|
||
|
||
@receiver(user_write) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It will also allow you to ignore password changes in case |
||
def copy_password_to_password_history( | ||
sender, request: HttpRequest, user: User, data: dict[str, Any], **kwargs | ||
): | ||
"""Preserve the user's old password if UniquePasswordPolicy is enabled anywhere""" | ||
from authentik.policies.unique_password.models import UniquePasswordPolicy, UserPasswordHistory | ||
|
||
user_changed_own_password = ( | ||
any("password" in x for x in data.keys()) | ||
and request.user.pk == user.pk | ||
and SESSION_KEY_IMPERSONATE_USER not in request.session | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think we should have a special case for impersonation here. In my mind, this is happening in a password change flow, and should run the same way on impersonation than on normal runs. If the admin wants to change a password for a user without going through all the checks, they can still do so from the admin interface directly. |
||
) | ||
if user_changed_own_password: | ||
# Check if any UniquePasswordPolicy is in use | ||
unique_pwd_policy_in_use = UniquePasswordPolicy.is_in_use() | ||
|
||
if unique_pwd_policy_in_use: | ||
"""NOTE: Because we run this in a signal after saving the user, | ||
we are not atomically guaranteed to save password history. | ||
""" | ||
UserPasswordHistory.objects.create(user=user, old_password=user.password) | ||
trim_user_password_history.delay(user.pk) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We probably want this to run on a schedule rather than triggering it here. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does 0 mean in that case? Also it defaults to 1 in the frontend. If 0 is not valid, we should error on this.