From c688dd58a4e1d2f56f97bf66715bead056dd410f Mon Sep 17 00:00:00 2001 From: goutamadwant Date: Sun, 7 Jun 2026 23:04:57 -0700 Subject: [PATCH 1/3] fix: replace pyu2f reauth fallback with fido2 --- .../google-auth/google/oauth2/challenges.py | 147 ++++++--- .../google-auth/google/oauth2/credentials.py | 4 +- packages/google-auth/setup.py | 2 +- .../tests/oauth2/test_challenges.py | 312 +++++++++++------- 4 files changed, 296 insertions(+), 169 deletions(-) diff --git a/packages/google-auth/google/oauth2/challenges.py b/packages/google-auth/google/oauth2/challenges.py index 59a2f9be4f43..c47ff46cfa03 100644 --- a/packages/google-auth/google/oauth2/challenges.py +++ b/packages/google-auth/google/oauth2/challenges.py @@ -18,6 +18,8 @@ import abc import base64 import getpass +import hashlib +import json import sys from google.auth import _helpers @@ -29,12 +31,12 @@ PublicKeyCredentialDescriptor, ) - REAUTH_ORIGIN = "https://accounts.google.com" SAML_CHALLENGE_MESSAGE = ( "Please run `gcloud auth login` to complete reauthentication with SAML." ) WEBAUTHN_TIMEOUT_MS = 120000 # Two minute timeout +U2F_AUTHENTICATION_TYPE = "navigator.id.getAssertion" def get_user_password(text): @@ -117,7 +119,7 @@ def is_locally_eligible(self): @_helpers.copy_docstring(ReauthChallenge) def obtain_challenge_input(self, metadata): - # Check if there is an available Webauthn Handler, if not use pyu2f + # Check if there is an available Webauthn Handler, if not use fido2. try: factory = webauthn_handler_factory.WebauthnHandlerFactory() webauthn_handler = factory.get_handler() @@ -125,18 +127,35 @@ def obtain_challenge_input(self, metadata): sys.stderr.write("Please insert and touch your security key\n") return self._obtain_challenge_input_webauthn(metadata, webauthn_handler) except Exception: - # Attempt pyu2f if exception in webauthn flow + # Attempt fido2 if exception in webauthn flow. pass + return self._obtain_challenge_input_fido2(metadata) + + def _get_fido2_classes(self): try: - import pyu2f.convenience.authenticator # type: ignore - import pyu2f.errors # type: ignore - import pyu2f.model # type: ignore - except ImportError: - raise exceptions.ReauthFailError( - "pyu2f dependency is required to use Security key reauth feature. " - "It can be installed via `pip install pyu2f` or `pip install google-auth[reauth]`." + from fido2.ctap import CtapError # type: ignore + from fido2.ctap1 import ( + APDU, # type: ignore + ApduError, # type: ignore + Ctap1, # type: ignore ) + from fido2.hid import CtapHidDevice # type: ignore + except ImportError as caught_exc: + raise exceptions.ReauthFailError( + "fido2 dependency is required to use Security key reauth feature. " + "It can be installed via `pip install fido2` or `pip install google-auth[reauth]`." + ) from caught_exc + return CtapHidDevice, Ctap1, APDU, ApduError, CtapError + + def _obtain_challenge_input_fido2(self, metadata): + CtapHidDevice, Ctap1, APDU, ApduError, CtapError = self._get_fido2_classes() + + devices = list(CtapHidDevice.list_devices()) + if not devices: + sys.stderr.write("No security key found.\n") + return None + sk = metadata["securityKey"] challenges = sk["challenges"] # Read both 'applicationId' and 'relyingPartyId', if they are the same, use @@ -150,46 +169,57 @@ def obtain_challenge_input(self, metadata): else: application_parameters = [application_id] - challenge_data = [] - for c in challenges: - kh = c["keyHandle"].encode("ascii") - key = pyu2f.model.RegisteredKey(bytearray(base64.urlsafe_b64decode(kh))) - challenge = c["challenge"].encode("ascii") - challenge = base64.urlsafe_b64decode(challenge) - challenge_data.append({"key": key, "challenge": challenge}) - - # Track number of tries to suppress error message until all application_parameters - # are tried. - tries = 0 + sys.stderr.write("Please touch your security key.\n") for app_id in application_parameters: - try: - tries += 1 - api = pyu2f.convenience.authenticator.CreateCompositeAuthenticator( - REAUTH_ORIGIN - ) - response = api.Authenticate( - app_id, challenge_data, print_callback=sys.stderr.write - ) - return {"securityKey": response} - except pyu2f.errors.U2FError as e: - if e.code == pyu2f.errors.U2FError.DEVICE_INELIGIBLE: - # Only show error if all app_ids have been tried - if tries == len(application_parameters): - sys.stderr.write("Ineligible security key.\n") - return None - continue - if e.code == pyu2f.errors.U2FError.TIMEOUT: - sys.stderr.write( - "Timed out while waiting for security key touch.\n" - ) - else: - raise e - except pyu2f.errors.PluginError as e: - sys.stderr.write("Plugin error: {}.\n".format(e)) - continue - except pyu2f.errors.NoDeviceFoundError: - sys.stderr.write("No security key found.\n") - return None + app_param = hashlib.sha256(app_id.encode("utf-8")).digest() + for challenge in challenges: + key_handle = self._urlsafe_b64decode(challenge["keyHandle"]) + challenge_bytes = self._urlsafe_b64decode(challenge["challenge"]) + client_data = self._create_u2f_client_data(challenge_bytes) + client_param = hashlib.sha256(client_data).digest() + for device in devices: + try: + signature = Ctap1(device).authenticate( + client_param, app_param, key_handle + ) + except ApduError as caught_exc: + if caught_exc.code == APDU.WRONG_DATA: + continue + if caught_exc.code == APDU.USE_NOT_SATISFIED: + sys.stderr.write( + "Timed out while waiting for security key touch.\n" + ) + return None + raise + except CtapError as caught_exc: + if caught_exc.code in ( + CtapError.ERR.TIMEOUT, + CtapError.ERR.ACTION_TIMEOUT, + CtapError.ERR.KEEPALIVE_CANCEL, + CtapError.ERR.OPERATION_DENIED, + ): + sys.stderr.write( + "Timed out while waiting for security key touch.\n" + ) + return None + raise + else: + return { + "securityKey": { + "clientData": self._unpadded_urlsafe_b64encode( + client_data + ), + "signatureData": self._unpadded_urlsafe_b64encode( + bytes(signature) + ), + "applicationId": app_id, + "keyHandle": self._unpadded_urlsafe_b64encode( + key_handle + ), + } + } + sys.stderr.write("Ineligible security key.\n") + return None def _obtain_challenge_input_webauthn(self, metadata, webauthn_handler): sk = metadata.get("securityKey") @@ -248,8 +278,23 @@ def _obtain_challenge_input_webauthn(self, metadata, webauthn_handler): def _unpadded_urlsafe_b64recode(self, s): """Converts standard b64 encoded string to url safe b64 encoded string with no padding.""" - b = base64.urlsafe_b64decode(s) - return base64.urlsafe_b64encode(b).decode().rstrip("=") + return self._unpadded_urlsafe_b64encode(self._urlsafe_b64decode(s)) + + def _create_u2f_client_data(self, challenge): + return json.dumps( + { + "challenge": self._unpadded_urlsafe_b64encode(challenge), + "origin": REAUTH_ORIGIN, + "typ": U2F_AUTHENTICATION_TYPE, + }, + sort_keys=True, + ).encode() + + def _unpadded_urlsafe_b64encode(self, data): + return base64.urlsafe_b64encode(data).decode().rstrip("=") + + def _urlsafe_b64decode(self, data): + return base64.urlsafe_b64decode(data + "=" * (-len(data) % 4)) class SamlChallenge(ReauthChallenge): diff --git a/packages/google-auth/google/oauth2/credentials.py b/packages/google-auth/google/oauth2/credentials.py index 724cf98bcad2..4b30d0ac2f20 100644 --- a/packages/google-auth/google/oauth2/credentials.py +++ b/packages/google-auth/google/oauth2/credentials.py @@ -71,8 +71,8 @@ class Credentials( Reauth is disabled by default. To enable reauth, set the `enable_reauth_refresh` parameter to True in the constructor. Note that reauth feature is intended for gcloud to use only. - If reauth is enabled, `pyu2f` dependency has to be installed in order to use security - key reauth feature. Dependency can be installed via `pip install pyu2f` or `pip install + If reauth is enabled, `fido2` dependency has to be installed in order to use security + key reauth feature. Dependency can be installed via `pip install fido2` or `pip install google-auth[reauth]`. """ diff --git a/packages/google-auth/setup.py b/packages/google-auth/setup.py index cf3148130d6e..02caf384fab7 100644 --- a/packages/google-auth/setup.py +++ b/packages/google-auth/setup.py @@ -33,7 +33,7 @@ pyjwt_extra_require = ["pyjwt>=2.0"] -reauth_extra_require = ["pyu2f>=0.1.5"] +reauth_extra_require = ["fido2>=2.0.0,<3.0.0"] # TODO(https://github.com/googleapis/google-auth-library-python/issues/1738): Add bounds for pyopenssl dependency. enterprise_cert_extra_require = ["pyopenssl"] diff --git a/packages/google-auth/tests/oauth2/test_challenges.py b/packages/google-auth/tests/oauth2/test_challenges.py index fb5e164d50bd..cffa163fcb77 100644 --- a/packages/google-auth/tests/oauth2/test_challenges.py +++ b/packages/google-auth/tests/oauth2/test_challenges.py @@ -15,12 +15,11 @@ """Tests for the reauth module.""" import base64 -import os -import sys +import hashlib +import json from unittest import mock import pytest # type: ignore -import pyu2f # type: ignore from google.auth import exceptions from google.oauth2 import challenges @@ -33,6 +32,81 @@ ) +class FakeAPDU: + WRONG_DATA = "wrong data" + USE_NOT_SATISFIED = "use not satisfied" + + +class FakeApduError(Exception): + def __init__(self, code): + self.code = code + + +class FakeCtapError(Exception): + class ERR: + TIMEOUT = "timeout" + ACTION_TIMEOUT = "action timeout" + KEEPALIVE_CANCEL = "keepalive cancel" + OPERATION_DENIED = "operation denied" + + def __init__(self, code): + self.code = code + + +class FakeCtapHidDevice: + devices = [] + + @classmethod + def list_devices(cls): + return iter(cls.devices) + + +class FakeCtap1: + calls = [] + side_effects = [] + + def __init__(self, device): + self.device = device + + def authenticate(self, client_param, app_param, key_handle): + self.calls.append((self.device, client_param, app_param, key_handle)) + effect = self.side_effects.pop(0) + if isinstance(effect, Exception): + raise effect + return effect + + +def use_fake_fido2(challenge, devices=None, side_effects=None): + FakeCtapHidDevice.devices = devices if devices is not None else ["security-key"] + FakeCtap1.calls = [] + FakeCtap1.side_effects = ( + side_effects if side_effects is not None else [b"signature data"] + ) + return mock.patch.object( + challenge, + "_get_fido2_classes", + return_value=( + FakeCtapHidDevice, + FakeCtap1, + FakeAPDU, + FakeApduError, + FakeCtapError, + ), + ) + + +def expected_client_data(challenge): + challenge_b64 = base64.urlsafe_b64encode(challenge).decode().rstrip("=") + return json.dumps( + { + "challenge": challenge_b64, + "origin": challenges.REAUTH_ORIGIN, + "typ": challenges.U2F_AUTHENTICATION_TYPE, + }, + sort_keys=True, + ).encode() + + def test_get_user_password(): with mock.patch("getpass.getpass", return_value="foo"): assert challenges.get_user_password("") == "foo" @@ -56,143 +130,151 @@ def test_security_key(): "relyingPartyId": "security_key_application_id", }, } - mock_key = mock.Mock() - challenge = challenges.SecurityKeyChallenge() + assert challenge._get_fido2_classes() # Test the case that security key challenge is passed with applicationId and # relyingPartyId the same. - os.environ.pop('"GOOGLE_AUTH_WEBAUTHN_PLUGIN"', None) - - with mock.patch("pyu2f.model.RegisteredKey", return_value=mock_key): - with mock.patch( - "pyu2f.convenience.authenticator.CompositeAuthenticator.Authenticate" - ) as mock_authenticate: - mock_authenticate.return_value = "security key response" - assert challenge.name == "SECURITY_KEY" - assert challenge.is_locally_eligible - assert challenge.obtain_challenge_input(metadata) == { - "securityKey": "security key response" + with use_fake_fido2(challenge): + assert challenge.name == "SECURITY_KEY" + assert challenge.is_locally_eligible + assert challenge.obtain_challenge_input(metadata) == { + "securityKey": { + "clientData": base64.urlsafe_b64encode( + expected_client_data(b"some_challenge") + ) + .decode() + .rstrip("="), + "signatureData": "c2lnbmF0dXJlIGRhdGE", + "applicationId": "security_key_application_id", + "keyHandle": "some_key", } - mock_authenticate.assert_called_with( - "security_key_application_id", - [{"key": mock_key, "challenge": b"some_challenge"}], - print_callback=sys.stderr.write, + } + assert FakeCtap1.calls == [ + ( + "security-key", + hashlib.sha256(expected_client_data(b"some_challenge")).digest(), + hashlib.sha256(b"security_key_application_id").digest(), + base64.urlsafe_b64decode("some_key"), ) + ] # Test the case that webauthn plugin is available - os.environ["GOOGLE_AUTH_WEBAUTHN_PLUGIN"] = "plugin" - - with mock.patch( - "google.oauth2.challenges.SecurityKeyChallenge._obtain_challenge_input_webauthn", - return_value={"securityKey": "security key response"}, + with ( + mock.patch( + "google.oauth2.challenges.SecurityKeyChallenge._obtain_challenge_input_webauthn", + return_value={"securityKey": "security key response"}, + ), + mock.patch( + "google.oauth2.webauthn_handler.PluginHandler.is_available", + return_value=True, + ), ): assert challenge.obtain_challenge_input(metadata) == { "securityKey": "security key response" } - os.environ.pop('"GOOGLE_AUTH_WEBAUTHN_PLUGIN"', None) + + with ( + mock.patch( + "google.oauth2.challenges.webauthn_handler_factory.WebauthnHandlerFactory", + side_effect=Exception(), + ), + use_fake_fido2(challenge), + ): + assert ( + challenge.obtain_challenge_input(metadata)["securityKey"]["applicationId"] + == "security_key_application_id" + ) # Test the case that security key challenge is passed with applicationId and # relyingPartyId different, first call works. metadata["securityKey"]["relyingPartyId"] = "security_key_relying_party_id" - sys.stderr.write("metadata=" + str(metadata) + "\n") - with mock.patch("pyu2f.model.RegisteredKey", return_value=mock_key): - with mock.patch( - "pyu2f.convenience.authenticator.CompositeAuthenticator.Authenticate" - ) as mock_authenticate: - mock_authenticate.return_value = "security key response" - assert challenge.name == "SECURITY_KEY" - assert challenge.is_locally_eligible - assert challenge.obtain_challenge_input(metadata) == { - "securityKey": "security key response" + with use_fake_fido2(challenge): + assert challenge.name == "SECURITY_KEY" + assert challenge.is_locally_eligible + assert challenge.obtain_challenge_input(metadata) == { + "securityKey": { + "clientData": base64.urlsafe_b64encode( + expected_client_data(b"some_challenge") + ) + .decode() + .rstrip("="), + "signatureData": "c2lnbmF0dXJlIGRhdGE", + "applicationId": "security_key_relying_party_id", + "keyHandle": "some_key", } - mock_authenticate.assert_called_with( - "security_key_relying_party_id", - [{"key": mock_key, "challenge": b"some_challenge"}], - print_callback=sys.stderr.write, + } + assert FakeCtap1.calls == [ + ( + "security-key", + hashlib.sha256(expected_client_data(b"some_challenge")).digest(), + hashlib.sha256(b"security_key_relying_party_id").digest(), + base64.urlsafe_b64decode("some_key"), ) + ] # Test the case that security key challenge is passed with applicationId and # relyingPartyId different, first call fails, requires retry. metadata["securityKey"]["relyingPartyId"] = "security_key_relying_party_id" - with mock.patch("pyu2f.model.RegisteredKey", return_value=mock_key): - with mock.patch( - "pyu2f.convenience.authenticator.CompositeAuthenticator.Authenticate" - ) as mock_authenticate: - assert challenge.name == "SECURITY_KEY" - assert challenge.is_locally_eligible - mock_authenticate.side_effect = [ - pyu2f.errors.U2FError(pyu2f.errors.U2FError.DEVICE_INELIGIBLE), - "security key response", - ] - assert challenge.obtain_challenge_input(metadata) == { - "securityKey": "security key response" - } - calls = [ - mock.call( - "security_key_relying_party_id", - [{"key": mock_key, "challenge": b"some_challenge"}], - print_callback=sys.stderr.write, - ), - mock.call( - "security_key_application_id", - [{"key": mock_key, "challenge": b"some_challenge"}], - print_callback=sys.stderr.write, - ), - ] - mock_authenticate.assert_has_calls(calls) + with use_fake_fido2( + challenge, + side_effects=[ + FakeApduError(FakeAPDU.WRONG_DATA), + b"security key response", + ], + ): + assert challenge.name == "SECURITY_KEY" + assert challenge.is_locally_eligible + assert ( + challenge.obtain_challenge_input(metadata)["securityKey"]["applicationId"] + == "security_key_application_id" + ) + assert [call[2] for call in FakeCtap1.calls] == [ + hashlib.sha256(b"security_key_relying_party_id").digest(), + hashlib.sha256(b"security_key_application_id").digest(), + ] # Test various types of exceptions. - with mock.patch("pyu2f.model.RegisteredKey", return_value=mock_key): - with mock.patch( - "pyu2f.convenience.authenticator.CompositeAuthenticator.Authenticate" - ) as mock_authenticate: - mock_authenticate.side_effect = pyu2f.errors.U2FError( - pyu2f.errors.U2FError.DEVICE_INELIGIBLE - ) - assert challenge.obtain_challenge_input(metadata) is None + metadata["securityKey"]["relyingPartyId"] = "security_key_application_id" + with use_fake_fido2(challenge, side_effects=[FakeApduError(FakeAPDU.WRONG_DATA)]): + assert challenge.obtain_challenge_input(metadata) is None - with mock.patch( - "pyu2f.convenience.authenticator.CompositeAuthenticator.Authenticate" - ) as mock_authenticate: - mock_authenticate.side_effect = pyu2f.errors.U2FError( - pyu2f.errors.U2FError.TIMEOUT - ) - assert challenge.obtain_challenge_input(metadata) is None - - with mock.patch( - "pyu2f.convenience.authenticator.CompositeAuthenticator.Authenticate" - ) as mock_authenticate: - mock_authenticate.side_effect = pyu2f.errors.PluginError() - assert challenge.obtain_challenge_input(metadata) is None - - with mock.patch( - "pyu2f.convenience.authenticator.CompositeAuthenticator.Authenticate" - ) as mock_authenticate: - mock_authenticate.side_effect = pyu2f.errors.U2FError( - pyu2f.errors.U2FError.BAD_REQUEST - ) - with pytest.raises(pyu2f.errors.U2FError): - challenge.obtain_challenge_input(metadata) - - with mock.patch( - "pyu2f.convenience.authenticator.CompositeAuthenticator.Authenticate" - ) as mock_authenticate: - mock_authenticate.side_effect = pyu2f.errors.NoDeviceFoundError() - assert challenge.obtain_challenge_input(metadata) is None - - with mock.patch( - "pyu2f.convenience.authenticator.CompositeAuthenticator.Authenticate" - ) as mock_authenticate: - mock_authenticate.side_effect = pyu2f.errors.UnsupportedVersionException() - with pytest.raises(pyu2f.errors.UnsupportedVersionException): - challenge.obtain_challenge_input(metadata) - - with mock.patch.dict("sys.modules"): - sys.modules["pyu2f"] = None - with pytest.raises(exceptions.ReauthFailError) as excinfo: - challenge.obtain_challenge_input(metadata) - assert excinfo.match(r"pyu2f dependency is required") + with use_fake_fido2( + challenge, + side_effects=[FakeApduError(FakeAPDU.USE_NOT_SATISFIED)], + ): + assert challenge.obtain_challenge_input(metadata) is None + + with use_fake_fido2( + challenge, + side_effects=[FakeCtapError(FakeCtapError.ERR.TIMEOUT)], + ): + assert challenge.obtain_challenge_input(metadata) is None + + with use_fake_fido2(challenge, side_effects=[FakeApduError("bad request")]): + with pytest.raises(FakeApduError): + challenge.obtain_challenge_input(metadata) + + with use_fake_fido2(challenge, side_effects=[FakeCtapError("bad request")]): + with pytest.raises(FakeCtapError): + challenge.obtain_challenge_input(metadata) + + with use_fake_fido2(challenge, devices=[]): + assert challenge.obtain_challenge_input(metadata) is None + + real_import = __import__ + + def block_fido2(name, *args, **kwargs): + if name == "fido2" or name.startswith("fido2."): + raise ImportError(name) + return real_import(name, *args, **kwargs) + + assert block_fido2("json") is json + + with mock.patch("builtins.__import__", side_effect=block_fido2): + with pytest.raises(exceptions.ReauthFailError) as excinfo: + challenge._get_fido2_classes() + assert excinfo.match(r"fido2 dependency is required") def test_security_key_webauthn(): From 49a4163c4307568dceb7a1bed13206b23ba40a87 Mon Sep 17 00:00:00 2001 From: goutamadwant Date: Sun, 7 Jun 2026 23:53:37 -0700 Subject: [PATCH 2/3] fix: handle fido2 device access errors --- .../google-auth/google/oauth2/challenges.py | 14 ++++++++++- .../tests/oauth2/test_challenges.py | 24 +++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/packages/google-auth/google/oauth2/challenges.py b/packages/google-auth/google/oauth2/challenges.py index c47ff46cfa03..a617c9e6f737 100644 --- a/packages/google-auth/google/oauth2/challenges.py +++ b/packages/google-auth/google/oauth2/challenges.py @@ -151,7 +151,12 @@ def _get_fido2_classes(self): def _obtain_challenge_input_fido2(self, metadata): CtapHidDevice, Ctap1, APDU, ApduError, CtapError = self._get_fido2_classes() - devices = list(CtapHidDevice.list_devices()) + try: + devices = list(CtapHidDevice.list_devices()) + except OSError as caught_exc: + sys.stderr.write("Failed to list security keys: {}.\n".format(caught_exc)) + return None + if not devices: sys.stderr.write("No security key found.\n") return None @@ -203,6 +208,13 @@ def _obtain_challenge_input_fido2(self, metadata): ) return None raise + except OSError as caught_exc: + sys.stderr.write( + "Failed to communicate with security key: {}.\n".format( + caught_exc + ) + ) + continue else: return { "securityKey": { diff --git a/packages/google-auth/tests/oauth2/test_challenges.py b/packages/google-auth/tests/oauth2/test_challenges.py index cffa163fcb77..ac2b09c846b4 100644 --- a/packages/google-auth/tests/oauth2/test_challenges.py +++ b/packages/google-auth/tests/oauth2/test_challenges.py @@ -262,6 +262,30 @@ def test_security_key(): with use_fake_fido2(challenge, devices=[]): assert challenge.obtain_challenge_input(metadata) is None + with ( + use_fake_fido2(challenge), + mock.patch.object( + FakeCtapHidDevice, + "list_devices", + side_effect=OSError("permission denied"), + ), + ): + assert challenge.obtain_challenge_input(metadata) is None + + with use_fake_fido2( + challenge, + devices=["first-key", "second-key"], + side_effects=[ + OSError("permission denied"), + b"security key response", + ], + ): + assert ( + challenge.obtain_challenge_input(metadata)["securityKey"]["applicationId"] + == "security_key_application_id" + ) + assert [call[0] for call in FakeCtap1.calls] == ["first-key", "second-key"] + real_import = __import__ def block_fido2(name, *args, **kwargs): From 9ce8d9565f7cd23c916ae418b956f49066379010 Mon Sep 17 00:00:00 2001 From: goutamadwant Date: Mon, 8 Jun 2026 19:14:33 -0700 Subject: [PATCH 3/3] fix: keep pyu2f as legacy reauth fallback --- .../google-auth/google/oauth2/challenges.py | 109 ++++++++-- .../tests/oauth2/test_challenges.py | 189 +++++++++++++++++- 2 files changed, 284 insertions(+), 14 deletions(-) diff --git a/packages/google-auth/google/oauth2/challenges.py b/packages/google-auth/google/oauth2/challenges.py index a617c9e6f737..2fdce81e3b3a 100644 --- a/packages/google-auth/google/oauth2/challenges.py +++ b/packages/google-auth/google/oauth2/challenges.py @@ -21,6 +21,8 @@ import hashlib import json import sys +from typing import Any, Mapping, Optional +import warnings from google.auth import _helpers from google.auth import exceptions @@ -119,7 +121,7 @@ def is_locally_eligible(self): @_helpers.copy_docstring(ReauthChallenge) def obtain_challenge_input(self, metadata): - # Check if there is an available Webauthn Handler, if not use fido2. + # Check if there is an available Webauthn Handler, if not use fallbacks. try: factory = webauthn_handler_factory.WebauthnHandlerFactory() webauthn_handler = factory.get_handler() @@ -127,28 +129,43 @@ def obtain_challenge_input(self, metadata): sys.stderr.write("Please insert and touch your security key\n") return self._obtain_challenge_input_webauthn(metadata, webauthn_handler) except Exception: - # Attempt fido2 if exception in webauthn flow. + # Attempt local security key fallbacks if exception in webauthn flow. pass - return self._obtain_challenge_input_fido2(metadata) + try: + return self._obtain_challenge_input_fido2(metadata) + except ImportError: + return self._obtain_challenge_input_pyu2f(metadata) + + def _get_fido2_classes(self) -> tuple[Any, Any, Any, Any, Any]: + """Return fido2 classes used by security key reauth.""" + from fido2.ctap import CtapError # type: ignore + from fido2.ctap1 import ( + APDU, # type: ignore + ApduError, # type: ignore + Ctap1, # type: ignore + ) + from fido2.hid import CtapHidDevice # type: ignore + + return CtapHidDevice, Ctap1, APDU, ApduError, CtapError - def _get_fido2_classes(self): + def _get_pyu2f_module(self) -> Any: + """Return pyu2f module used by the legacy security key fallback.""" try: - from fido2.ctap import CtapError # type: ignore - from fido2.ctap1 import ( - APDU, # type: ignore - ApduError, # type: ignore - Ctap1, # type: ignore - ) - from fido2.hid import CtapHidDevice # type: ignore + import pyu2f.convenience.authenticator # type: ignore + import pyu2f.errors # type: ignore + import pyu2f.model # type: ignore except ImportError as caught_exc: raise exceptions.ReauthFailError( "fido2 dependency is required to use Security key reauth feature. " "It can be installed via `pip install fido2` or `pip install google-auth[reauth]`." ) from caught_exc - return CtapHidDevice, Ctap1, APDU, ApduError, CtapError + return pyu2f - def _obtain_challenge_input_fido2(self, metadata): + def _obtain_challenge_input_fido2( + self, metadata: Mapping[str, Any] + ) -> Optional[dict]: + """Obtain security key challenge input using fido2.""" CtapHidDevice, Ctap1, APDU, ApduError, CtapError = self._get_fido2_classes() try: @@ -233,6 +250,72 @@ def _obtain_challenge_input_fido2(self, metadata): sys.stderr.write("Ineligible security key.\n") return None + def _obtain_challenge_input_pyu2f( + self, metadata: Mapping[str, Any] + ) -> Optional[dict]: + """Obtain security key challenge input using the legacy pyu2f fallback.""" + pyu2f = self._get_pyu2f_module() + warnings.warn( + "Support for pyu2f is deprecated and will be removed in a future release. " + "Please switch to fido2 by installing `fido2` or `google-auth[reauth]`.", + DeprecationWarning, + stacklevel=2, + ) + + sk = metadata["securityKey"] + challenges = sk["challenges"] + # Read both 'applicationId' and 'relyingPartyId', if they are the same, use + # applicationId, if they are different, use relyingPartyId first and retry + # with applicationId + application_id = sk["applicationId"] + relying_party_id = sk["relyingPartyId"] + + if application_id != relying_party_id: + application_parameters = [relying_party_id, application_id] + else: + application_parameters = [application_id] + + challenge_data = [] + for c in challenges: + kh = c["keyHandle"].encode("ascii") + key = pyu2f.model.RegisteredKey(bytearray(base64.urlsafe_b64decode(kh))) + challenge = c["challenge"].encode("ascii") + challenge = base64.urlsafe_b64decode(challenge) + challenge_data.append({"key": key, "challenge": challenge}) + + # Track number of tries to suppress error message until all application_parameters + # are tried. + tries = 0 + for app_id in application_parameters: + try: + tries += 1 + api = pyu2f.convenience.authenticator.CreateCompositeAuthenticator( + REAUTH_ORIGIN + ) + response = api.Authenticate( + app_id, challenge_data, print_callback=sys.stderr.write + ) + return {"securityKey": response} + except pyu2f.errors.U2FError as e: + if e.code == pyu2f.errors.U2FError.DEVICE_INELIGIBLE: + # Only show error if all app_ids have been tried + if tries == len(application_parameters): + sys.stderr.write("Ineligible security key.\n") + return None + continue + if e.code == pyu2f.errors.U2FError.TIMEOUT: + sys.stderr.write( + "Timed out while waiting for security key touch.\n" + ) + else: + raise e + except pyu2f.errors.PluginError as e: + sys.stderr.write("Plugin error: {}.\n".format(e)) + continue + except pyu2f.errors.NoDeviceFoundError: + sys.stderr.write("No security key found.\n") + return None + def _obtain_challenge_input_webauthn(self, metadata, webauthn_handler): sk = metadata.get("securityKey") if sk is None: diff --git a/packages/google-auth/tests/oauth2/test_challenges.py b/packages/google-auth/tests/oauth2/test_challenges.py index ac2b09c846b4..c83f93ec2efd 100644 --- a/packages/google-auth/tests/oauth2/test_challenges.py +++ b/packages/google-auth/tests/oauth2/test_challenges.py @@ -17,6 +17,7 @@ import base64 import hashlib import json +import types from unittest import mock import pytest # type: ignore @@ -76,6 +77,41 @@ def authenticate(self, client_param, app_param, key_handle): return effect +class FakeRegisteredKey: + def __init__(self, key): + self.key = key + + +class FakeU2FError(Exception): + DEVICE_INELIGIBLE = "device ineligible" + TIMEOUT = "timeout" + BAD_REQUEST = "bad request" + + def __init__(self, code): + self.code = code + + +class FakePluginError(Exception): + pass + + +class FakeNoDeviceFoundError(Exception): + pass + + +class FakeCompositeAuthenticator: + origins = [] + calls = [] + side_effects = [] + + def Authenticate(self, app_id, challenge_data, print_callback=None): + self.calls.append((app_id, challenge_data, print_callback)) + effect = self.side_effects.pop(0) + if isinstance(effect, Exception): + raise effect + return effect + + def use_fake_fido2(challenge, devices=None, side_effects=None): FakeCtapHidDevice.devices = devices if devices is not None else ["security-key"] FakeCtap1.calls = [] @@ -95,6 +131,46 @@ def use_fake_fido2(challenge, devices=None, side_effects=None): ) +def create_composite_authenticator(origin): + FakeCompositeAuthenticator.origins.append(origin) + return FakeCompositeAuthenticator() + + +def use_fake_pyu2f(side_effects=None): + FakeCompositeAuthenticator.origins = [] + FakeCompositeAuthenticator.calls = [] + FakeCompositeAuthenticator.side_effects = ( + side_effects if side_effects is not None else ["security key response"] + ) + + pyu2f = types.ModuleType("pyu2f") + convenience = types.ModuleType("pyu2f.convenience") + authenticator = types.ModuleType("pyu2f.convenience.authenticator") + errors = types.ModuleType("pyu2f.errors") + model = types.ModuleType("pyu2f.model") + + authenticator.CreateCompositeAuthenticator = create_composite_authenticator + convenience.authenticator = authenticator + errors.U2FError = FakeU2FError + errors.PluginError = FakePluginError + errors.NoDeviceFoundError = FakeNoDeviceFoundError + model.RegisteredKey = FakeRegisteredKey + pyu2f.convenience = convenience + pyu2f.errors = errors + pyu2f.model = model + + return mock.patch.dict( + "sys.modules", + { + "pyu2f": pyu2f, + "pyu2f.convenience": convenience, + "pyu2f.convenience.authenticator": authenticator, + "pyu2f.errors": errors, + "pyu2f.model": model, + }, + ) + + def expected_client_data(challenge): challenge_b64 = base64.urlsafe_b64encode(challenge).decode().rstrip("=") return json.dumps( @@ -286,6 +362,101 @@ def test_security_key(): ) assert [call[0] for call in FakeCtap1.calls] == ["first-key", "second-key"] + with ( + mock.patch.object( + challenge, + "_get_fido2_classes", + side_effect=ImportError("fido2"), + ), + use_fake_pyu2f(), + pytest.warns(DeprecationWarning, match="pyu2f is deprecated"), + ): + assert challenge.obtain_challenge_input(metadata) == { + "securityKey": "security key response" + } + assert FakeCompositeAuthenticator.origins == [challenges.REAUTH_ORIGIN] + assert FakeCompositeAuthenticator.calls[0][0] == "security_key_application_id" + + metadata["securityKey"]["relyingPartyId"] = "security_key_relying_party_id" + with ( + mock.patch.object( + challenge, + "_get_fido2_classes", + side_effect=ImportError("fido2"), + ), + use_fake_pyu2f( + side_effects=[ + FakeU2FError(FakeU2FError.DEVICE_INELIGIBLE), + "security key response", + ], + ), + pytest.warns(DeprecationWarning, match="pyu2f is deprecated"), + ): + assert challenge.obtain_challenge_input(metadata) == { + "securityKey": "security key response" + } + assert [call[0] for call in FakeCompositeAuthenticator.calls] == [ + "security_key_relying_party_id", + "security_key_application_id", + ] + + metadata["securityKey"]["relyingPartyId"] = "security_key_application_id" + with ( + mock.patch.object( + challenge, + "_get_fido2_classes", + side_effect=ImportError("fido2"), + ), + use_fake_pyu2f(side_effects=[FakeU2FError(FakeU2FError.DEVICE_INELIGIBLE)]), + pytest.warns(DeprecationWarning, match="pyu2f is deprecated"), + ): + assert challenge.obtain_challenge_input(metadata) is None + + with ( + mock.patch.object( + challenge, + "_get_fido2_classes", + side_effect=ImportError("fido2"), + ), + use_fake_pyu2f(side_effects=[FakeU2FError(FakeU2FError.TIMEOUT)]), + pytest.warns(DeprecationWarning, match="pyu2f is deprecated"), + ): + assert challenge.obtain_challenge_input(metadata) is None + + with ( + mock.patch.object( + challenge, + "_get_fido2_classes", + side_effect=ImportError("fido2"), + ), + use_fake_pyu2f(side_effects=[FakePluginError("plugin error")]), + pytest.warns(DeprecationWarning, match="pyu2f is deprecated"), + ): + assert challenge.obtain_challenge_input(metadata) is None + + with ( + mock.patch.object( + challenge, + "_get_fido2_classes", + side_effect=ImportError("fido2"), + ), + use_fake_pyu2f(side_effects=[FakeNoDeviceFoundError()]), + pytest.warns(DeprecationWarning, match="pyu2f is deprecated"), + ): + assert challenge.obtain_challenge_input(metadata) is None + + with ( + mock.patch.object( + challenge, + "_get_fido2_classes", + side_effect=ImportError("fido2"), + ), + use_fake_pyu2f(side_effects=[FakeU2FError(FakeU2FError.BAD_REQUEST)]), + pytest.warns(DeprecationWarning, match="pyu2f is deprecated"), + ): + with pytest.raises(FakeU2FError): + challenge.obtain_challenge_input(metadata) + real_import = __import__ def block_fido2(name, *args, **kwargs): @@ -296,8 +467,24 @@ def block_fido2(name, *args, **kwargs): assert block_fido2("json") is json with mock.patch("builtins.__import__", side_effect=block_fido2): - with pytest.raises(exceptions.ReauthFailError) as excinfo: + with pytest.raises(ImportError): challenge._get_fido2_classes() + + def block_security_key_imports(name, *args, **kwargs): + if ( + name == "fido2" + or name.startswith("fido2.") + or name == "pyu2f" + or name.startswith("pyu2f.") + ): + raise ImportError(name) + return real_import(name, *args, **kwargs) + + assert block_security_key_imports("json") is json + + with mock.patch("builtins.__import__", side_effect=block_security_key_imports): + with pytest.raises(exceptions.ReauthFailError) as excinfo: + challenge.obtain_challenge_input(metadata) assert excinfo.match(r"fido2 dependency is required")