forked from GithubBackups/healthchecks
Refactor for testability, add more test cases
This commit is contained in:
parent
155a1f132b
commit
839c309cf7
@ -4,8 +4,6 @@ from datetime import timedelta as td
|
|||||||
from django import forms
|
from django import forms
|
||||||
from django.contrib.auth import authenticate
|
from django.contrib.auth import authenticate
|
||||||
from django.contrib.auth.models import User
|
from django.contrib.auth.models import User
|
||||||
from fido2.ctap2 import AttestationObject, AuthenticatorData
|
|
||||||
from fido2.client import ClientData
|
|
||||||
from hc.api.models import TokenBucket
|
from hc.api.models import TokenBucket
|
||||||
|
|
||||||
|
|
||||||
@ -125,17 +123,11 @@ class AddCredentialForm(forms.Form):
|
|||||||
|
|
||||||
def clean_client_data_json(self):
|
def clean_client_data_json(self):
|
||||||
v = self.cleaned_data["client_data_json"]
|
v = self.cleaned_data["client_data_json"]
|
||||||
binary = base64.b64decode(v.encode())
|
return base64.b64decode(v.encode())
|
||||||
obj = ClientData(binary)
|
|
||||||
|
|
||||||
return obj
|
|
||||||
|
|
||||||
def clean_attestation_object(self):
|
def clean_attestation_object(self):
|
||||||
v = self.cleaned_data["attestation_object"]
|
v = self.cleaned_data["attestation_object"]
|
||||||
binary = base64.b64decode(v.encode())
|
return base64.b64decode(v.encode())
|
||||||
obj = AttestationObject(binary)
|
|
||||||
|
|
||||||
return obj
|
|
||||||
|
|
||||||
|
|
||||||
class LoginTfaForm(forms.Form):
|
class LoginTfaForm(forms.Form):
|
||||||
@ -150,17 +142,11 @@ class LoginTfaForm(forms.Form):
|
|||||||
|
|
||||||
def clean_client_data_json(self):
|
def clean_client_data_json(self):
|
||||||
v = self.cleaned_data["client_data_json"]
|
v = self.cleaned_data["client_data_json"]
|
||||||
binary = base64.b64decode(v.encode())
|
return base64.b64decode(v.encode())
|
||||||
obj = ClientData(binary)
|
|
||||||
|
|
||||||
return obj
|
|
||||||
|
|
||||||
def clean_authenticator_data(self):
|
def clean_authenticator_data(self):
|
||||||
v = self.cleaned_data["authenticator_data"]
|
v = self.cleaned_data["authenticator_data"]
|
||||||
binary = base64.b64decode(v.encode())
|
return base64.b64decode(v.encode())
|
||||||
obj = AuthenticatorData(binary)
|
|
||||||
|
|
||||||
return obj
|
|
||||||
|
|
||||||
def clean_signature(self):
|
def clean_signature(self):
|
||||||
v = self.cleaned_data["signature"]
|
v = self.cleaned_data["signature"]
|
||||||
|
55
hc/accounts/tests/test_add_credential.py
Normal file
55
hc/accounts/tests/test_add_credential.py
Normal file
@ -0,0 +1,55 @@
|
|||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
from django.core.signing import TimestampSigner
|
||||||
|
from hc.test import BaseTestCase
|
||||||
|
from hc.accounts.models import Credential
|
||||||
|
|
||||||
|
|
||||||
|
class AddCredentialTestCase(BaseTestCase):
|
||||||
|
def setUp(self):
|
||||||
|
super().setUp()
|
||||||
|
|
||||||
|
self.url = "/accounts/two_factor/add/"
|
||||||
|
|
||||||
|
def _set_sudo_flag(self):
|
||||||
|
session = self.client.session
|
||||||
|
session["sudo"] = TimestampSigner().sign("active")
|
||||||
|
session.save()
|
||||||
|
|
||||||
|
def test_it_requires_sudo_mode(self):
|
||||||
|
self.client.login(username="alice@example.org", password="password")
|
||||||
|
|
||||||
|
r = self.client.get(self.url)
|
||||||
|
self.assertContains(r, "We have sent a confirmation code")
|
||||||
|
|
||||||
|
def test_it_shows_form(self):
|
||||||
|
self.client.login(username="alice@example.org", password="password")
|
||||||
|
self._set_sudo_flag()
|
||||||
|
|
||||||
|
r = self.client.get(self.url)
|
||||||
|
self.assertContains(r, "Add Security Key")
|
||||||
|
|
||||||
|
# It should put a "state" key in the session:
|
||||||
|
self.assertIn("state", self.client.session)
|
||||||
|
|
||||||
|
@patch("hc.accounts.views._get_credential_data")
|
||||||
|
def test_it_adds_credential(self, mock_get_credential_data):
|
||||||
|
mock_get_credential_data.return_value = b"dummy-credential-data"
|
||||||
|
|
||||||
|
self.client.login(username="alice@example.org", password="password")
|
||||||
|
self._set_sudo_flag()
|
||||||
|
|
||||||
|
payload = {
|
||||||
|
"name": "My New Key",
|
||||||
|
"client_data_json": "e30=",
|
||||||
|
"attestation_object": "e30=",
|
||||||
|
}
|
||||||
|
|
||||||
|
r = self.client.post(self.url, payload, follow=True)
|
||||||
|
self.assertRedirects(r, "/accounts/profile/")
|
||||||
|
self.assertContains(r, "Added security key <strong>My New Key</strong>")
|
||||||
|
|
||||||
|
c = Credential.objects.get()
|
||||||
|
self.assertEqual(c.name, "My New Key")
|
||||||
|
|
||||||
|
# FIXME: test authentication failure
|
42
hc/accounts/tests/test_remove_credential.py
Normal file
42
hc/accounts/tests/test_remove_credential.py
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
from django.core.signing import TimestampSigner
|
||||||
|
|
||||||
|
from hc.test import BaseTestCase
|
||||||
|
from hc.accounts.models import Credential
|
||||||
|
|
||||||
|
|
||||||
|
class RemoveCredentialTestCase(BaseTestCase):
|
||||||
|
def setUp(self):
|
||||||
|
super().setUp()
|
||||||
|
|
||||||
|
self.c = Credential.objects.create(user=self.alice, name="Alices Key")
|
||||||
|
self.url = f"/accounts/two_factor/{self.c.code}/remove/"
|
||||||
|
|
||||||
|
def _set_sudo_flag(self):
|
||||||
|
session = self.client.session
|
||||||
|
session["sudo"] = TimestampSigner().sign("active")
|
||||||
|
session.save()
|
||||||
|
|
||||||
|
def test_it_shows_form(self):
|
||||||
|
self.client.login(username="alice@example.org", password="password")
|
||||||
|
self._set_sudo_flag()
|
||||||
|
|
||||||
|
r = self.client.get(self.url)
|
||||||
|
self.assertContains(r, "Remove Security Key")
|
||||||
|
self.assertContains(r, "Alices Key")
|
||||||
|
|
||||||
|
def test_it_removes_credential(self):
|
||||||
|
self.client.login(username="alice@example.org", password="password")
|
||||||
|
self._set_sudo_flag()
|
||||||
|
|
||||||
|
r = self.client.post(self.url, {"remove_credential": ""}, follow=True)
|
||||||
|
self.assertRedirects(r, "/accounts/profile/")
|
||||||
|
self.assertContains(r, "Removed security key <strong>Alices Key</strong>")
|
||||||
|
|
||||||
|
self.assertFalse(self.alice.credentials.exists())
|
||||||
|
|
||||||
|
def test_it_checks_owner(self):
|
||||||
|
self.client.login(username="charlie@example.org", password="password")
|
||||||
|
self._set_sudo_flag()
|
||||||
|
|
||||||
|
r = self.client.post(self.url, {"remove_credential": ""})
|
||||||
|
self.assertEqual(r.status_code, 400)
|
@ -18,6 +18,8 @@ from django.utils.timezone import now
|
|||||||
from django.urls import resolve, reverse, Resolver404
|
from django.urls import resolve, reverse, Resolver404
|
||||||
from django.views.decorators.csrf import csrf_exempt
|
from django.views.decorators.csrf import csrf_exempt
|
||||||
from django.views.decorators.http import require_POST
|
from django.views.decorators.http import require_POST
|
||||||
|
from fido2.ctap2 import AttestationObject, AuthenticatorData
|
||||||
|
from fido2.client import ClientData
|
||||||
from fido2.server import Fido2Server
|
from fido2.server import Fido2Server
|
||||||
from fido2.webauthn import PublicKeyCredentialRpEntity
|
from fido2.webauthn import PublicKeyCredentialRpEntity
|
||||||
from fido2 import cbor
|
from fido2 import cbor
|
||||||
@ -39,6 +41,8 @@ POST_LOGIN_ROUTES = (
|
|||||||
"hc-project-settings",
|
"hc-project-settings",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
FIDO2_SERVER = Fido2Server(PublicKeyCredentialRpEntity(settings.RP_ID, "healthchecks"))
|
||||||
|
|
||||||
|
|
||||||
def _allow_redirect(redirect_url):
|
def _allow_redirect(redirect_url):
|
||||||
if not redirect_url:
|
if not redirect_url:
|
||||||
@ -574,33 +578,46 @@ def remove_project(request, code):
|
|||||||
return redirect("hc-index")
|
return redirect("hc-index")
|
||||||
|
|
||||||
|
|
||||||
|
def _get_credential_data(request, form):
|
||||||
|
""" Complete Webauthn registration, return binary credential data.
|
||||||
|
|
||||||
|
This function is an interface to the fido2 library. It is separated
|
||||||
|
out so that we don't need to mock ClientData, AttestationObject,
|
||||||
|
register_complete separately in tests.
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
auth_data = FIDO2_SERVER.register_complete(
|
||||||
|
request.session["state"],
|
||||||
|
ClientData(form.cleaned_data["client_data_json"]),
|
||||||
|
AttestationObject(form.cleaned_data["attestation_object"]),
|
||||||
|
)
|
||||||
|
|
||||||
|
return auth_data.credential_data
|
||||||
|
|
||||||
|
|
||||||
@login_required
|
@login_required
|
||||||
@require_sudo_mode
|
@require_sudo_mode
|
||||||
def add_credential(request):
|
def add_credential(request):
|
||||||
rp = PublicKeyCredentialRpEntity("localhost", "Healthchecks")
|
|
||||||
server = Fido2Server(rp)
|
|
||||||
|
|
||||||
if request.method == "POST":
|
if request.method == "POST":
|
||||||
form = forms.AddCredentialForm(request.POST)
|
form = forms.AddCredentialForm(request.POST)
|
||||||
if not form.is_valid():
|
if not form.is_valid():
|
||||||
return HttpResponseBadRequest()
|
return HttpResponseBadRequest()
|
||||||
|
|
||||||
auth_data = server.register_complete(
|
credential_data = _get_credential_data(request, form)
|
||||||
request.session["state"],
|
if not credential_data:
|
||||||
form.cleaned_data["client_data_json"],
|
return HttpResponseBadRequest()
|
||||||
form.cleaned_data["attestation_object"],
|
|
||||||
)
|
|
||||||
|
|
||||||
c = Credential(user=request.user)
|
c = Credential(user=request.user)
|
||||||
c.name = request.POST["name"]
|
c.name = form.cleaned_data["name"]
|
||||||
c.data = auth_data.credential_data
|
c.data = credential_data
|
||||||
c.save()
|
c.save()
|
||||||
|
|
||||||
request.session["added_credential_name"] = c.name
|
request.session["added_credential_name"] = c.name
|
||||||
return redirect("hc-profile")
|
return redirect("hc-profile")
|
||||||
|
|
||||||
credentials = [c.unpack() for c in request.user.credentials.all()]
|
credentials = [c.unpack() for c in request.user.credentials.all()]
|
||||||
options, state = server.register_begin(
|
options, state = FIDO2_SERVER.register_begin(
|
||||||
{
|
{
|
||||||
"id": request.user.username.encode(),
|
"id": request.user.username.encode(),
|
||||||
"name": request.user.email,
|
"name": request.user.email,
|
||||||
@ -632,10 +649,28 @@ def remove_credential(request, code):
|
|||||||
return render(request, "accounts/remove_credential.html", ctx)
|
return render(request, "accounts/remove_credential.html", ctx)
|
||||||
|
|
||||||
|
|
||||||
def login_tfa(request):
|
def _check_credential(request, form, credentials):
|
||||||
rp = PublicKeyCredentialRpEntity("localhost", "Healthchecks")
|
""" Complete Webauthn authentication, return True on success.
|
||||||
server = Fido2Server(rp)
|
|
||||||
|
|
||||||
|
This function is an interface to the fido2 library. It is separated
|
||||||
|
out so that we don't need to mock ClientData, AuthenticatorData,
|
||||||
|
authenticate_complete separately in tests.
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
FIDO2_SERVER.authenticate_complete(
|
||||||
|
request.session["state"],
|
||||||
|
credentials,
|
||||||
|
form.cleaned_data["credential_id"],
|
||||||
|
ClientData(form.cleaned_data["client_data_json"]),
|
||||||
|
AuthenticatorData(form.cleaned_data["authenticator_data"]),
|
||||||
|
form.cleaned_data["signature"],
|
||||||
|
)
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def login_tfa(request):
|
||||||
if "2fa_user_id" not in request.session:
|
if "2fa_user_id" not in request.session:
|
||||||
return HttpResponseBadRequest()
|
return HttpResponseBadRequest()
|
||||||
|
|
||||||
@ -647,20 +682,15 @@ def login_tfa(request):
|
|||||||
if not form.is_valid():
|
if not form.is_valid():
|
||||||
return HttpResponseBadRequest()
|
return HttpResponseBadRequest()
|
||||||
|
|
||||||
server.authenticate_complete(
|
if not _check_credential(request, form, credentials):
|
||||||
request.session.pop("state", ""),
|
return HttpResponseBadRequest()
|
||||||
credentials,
|
|
||||||
form.cleaned_data["credential_id"],
|
|
||||||
form.cleaned_data["client_data_json"],
|
|
||||||
form.cleaned_data["authenticator_data"],
|
|
||||||
form.cleaned_data["signature"],
|
|
||||||
)
|
|
||||||
|
|
||||||
|
request.session.pop("state")
|
||||||
request.session.pop("2fa_user_id")
|
request.session.pop("2fa_user_id")
|
||||||
auth_login(request, user, "hc.accounts.backends.EmailBackend")
|
auth_login(request, user, "hc.accounts.backends.EmailBackend")
|
||||||
return _redirect_after_login(request)
|
return _redirect_after_login(request)
|
||||||
|
|
||||||
options, state = server.authenticate_begin(credentials)
|
options, state = FIDO2_SERVER.authenticate_begin(credentials)
|
||||||
request.session["state"] = state
|
request.session["state"] = state
|
||||||
|
|
||||||
ctx = {"options": base64.b64encode(cbor.encode(options)).decode()}
|
ctx = {"options": base64.b64encode(cbor.encode(options)).decode()}
|
||||||
|
@ -164,6 +164,9 @@ STATICFILES_FINDERS = (
|
|||||||
COMPRESS_OFFLINE = True
|
COMPRESS_OFFLINE = True
|
||||||
COMPRESS_CSS_HASHING_METHOD = "content"
|
COMPRESS_CSS_HASHING_METHOD = "content"
|
||||||
|
|
||||||
|
# Webauthn
|
||||||
|
RP_ID = "localhost"
|
||||||
|
|
||||||
# Discord integration
|
# Discord integration
|
||||||
DISCORD_CLIENT_ID = os.getenv("DISCORD_CLIENT_ID")
|
DISCORD_CLIENT_ID = os.getenv("DISCORD_CLIENT_ID")
|
||||||
DISCORD_CLIENT_SECRET = os.getenv("DISCORD_CLIENT_SECRET")
|
DISCORD_CLIENT_SECRET = os.getenv("DISCORD_CLIENT_SECRET")
|
||||||
|
Loading…
x
Reference in New Issue
Block a user