Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 149 additions & 9 deletions packages/google-auth/google/oauth2/challenges.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@
import abc
import base64
import getpass
import hashlib
import json
import sys
from typing import Any, Mapping, Optional
import warnings

from google.auth import _helpers
from google.auth import exceptions
Expand All @@ -29,12 +33,12 @@
PublicKeyCredentialDescriptor,
)


REAUTH_ORIGIN = "https://accounts.google.com"
SAML_CHALLENGE_MESSAGE = (
"Please run `gcloud auth login` to complete reauthentication with SAML."
)
WEBAUTHN_TIMEOUT_MS = 120000 # Two minute timeout
U2F_AUTHENTICATION_TYPE = "navigator.id.getAssertion"


def get_user_password(text):
Expand Down Expand Up @@ -117,26 +121,147 @@ def is_locally_eligible(self):

@_helpers.copy_docstring(ReauthChallenge)
def obtain_challenge_input(self, metadata):
# Check if there is an available Webauthn Handler, if not use pyu2f
# Check if there is an available Webauthn Handler, if not use fallbacks.
try:
factory = webauthn_handler_factory.WebauthnHandlerFactory()
webauthn_handler = factory.get_handler()
if webauthn_handler is not None:
sys.stderr.write("Please insert and touch your security key\n")
return self._obtain_challenge_input_webauthn(metadata, webauthn_handler)
except Exception:
# Attempt pyu2f if exception in webauthn flow
# Attempt local security key fallbacks if exception in webauthn flow.
pass

try:
return self._obtain_challenge_input_fido2(metadata)
except ImportError:
return self._obtain_challenge_input_pyu2f(metadata)

def _get_fido2_classes(self) -> tuple[Any, Any, Any, Any, Any]:
"""Return fido2 classes used by security key reauth."""
from fido2.ctap import CtapError # type: ignore
from fido2.ctap1 import (
APDU, # type: ignore
ApduError, # type: ignore
Ctap1, # type: ignore
)
from fido2.hid import CtapHidDevice # type: ignore

return CtapHidDevice, Ctap1, APDU, ApduError, CtapError

def _get_pyu2f_module(self) -> Any:
"""Return pyu2f module used by the legacy security key fallback."""
try:
import pyu2f.convenience.authenticator # type: ignore
import pyu2f.errors # type: ignore
import pyu2f.model # type: ignore
except ImportError:
except ImportError as caught_exc:
raise exceptions.ReauthFailError(
"pyu2f dependency is required to use Security key reauth feature. "
"It can be installed via `pip install pyu2f` or `pip install google-auth[reauth]`."
)
"fido2 dependency is required to use Security key reauth feature. "
"It can be installed via `pip install fido2` or `pip install google-auth[reauth]`."
) from caught_exc
return pyu2f

def _obtain_challenge_input_fido2(
self, metadata: Mapping[str, Any]
) -> Optional[dict]:
"""Obtain security key challenge input using fido2."""
CtapHidDevice, Ctap1, APDU, ApduError, CtapError = self._get_fido2_classes()

try:
devices = list(CtapHidDevice.list_devices())
except OSError as caught_exc:
sys.stderr.write("Failed to list security keys: {}.\n".format(caught_exc))
return None

if not devices:
sys.stderr.write("No security key found.\n")
return None

sk = metadata["securityKey"]
challenges = sk["challenges"]
# Read both 'applicationId' and 'relyingPartyId', if they are the same, use
# applicationId, if they are different, use relyingPartyId first and retry
# with applicationId
application_id = sk["applicationId"]
relying_party_id = sk["relyingPartyId"]

if application_id != relying_party_id:
application_parameters = [relying_party_id, application_id]
else:
application_parameters = [application_id]

sys.stderr.write("Please touch your security key.\n")
for app_id in application_parameters:
app_param = hashlib.sha256(app_id.encode("utf-8")).digest()
for challenge in challenges:
key_handle = self._urlsafe_b64decode(challenge["keyHandle"])
challenge_bytes = self._urlsafe_b64decode(challenge["challenge"])
client_data = self._create_u2f_client_data(challenge_bytes)
client_param = hashlib.sha256(client_data).digest()
for device in devices:
try:
signature = Ctap1(device).authenticate(
client_param, app_param, key_handle
)
except ApduError as caught_exc:
if caught_exc.code == APDU.WRONG_DATA:
continue
if caught_exc.code == APDU.USE_NOT_SATISFIED:
sys.stderr.write(
"Timed out while waiting for security key touch.\n"
)
return None
raise
except CtapError as caught_exc:
if caught_exc.code in (
CtapError.ERR.TIMEOUT,
CtapError.ERR.ACTION_TIMEOUT,
CtapError.ERR.KEEPALIVE_CANCEL,
CtapError.ERR.OPERATION_DENIED,
):
sys.stderr.write(
"Timed out while waiting for security key touch.\n"
)
return None
raise
Comment on lines +203 to +227

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

When communicating with a security key, Ctap1(device).authenticate(...) can raise an OSError (or platform-specific subclasses like PermissionError or FileNotFoundError) if the user lacks sufficient permissions (e.g., missing udev rules on Linux) or if the device is disconnected. Adding an except OSError block ensures these common communication issues are handled gracefully by writing to stderr and continuing to the next device, rather than crashing the application.

                    try:
                        signature = Ctap1(device).authenticate(
                            client_param, app_param, key_handle
                        )
                    except ApduError as caught_exc:
                        if caught_exc.code == APDU.WRONG_DATA:
                            continue
                        if caught_exc.code == APDU.USE_NOT_SATISFIED:
                            sys.stderr.write(
                                "Timed out while waiting for security key touch.\n"
                            )
                            return None
                        raise
                    except CtapError as caught_exc:
                        if caught_exc.code in (
                            CtapError.ERR.TIMEOUT,
                            CtapError.ERR.ACTION_TIMEOUT,
                            CtapError.ERR.KEEPALIVE_CANCEL,
                            CtapError.ERR.OPERATION_DENIED,
                        ):
                            sys.stderr.write(
                                "Timed out while waiting for security key touch.\n"
                            )
                            return None
                        raise
                    except OSError as caught_exc:
                        sys.stderr.write(
                            "Failed to communicate with security key: {}.\n".format(
                                caught_exc
                            )
                        )
                        continue

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handled this too. If one security key raises OSError while authenticating, the fallback now writes the error and tries the next detected device.

Added coverage for the bad-device then good-device case.

except OSError as caught_exc:
sys.stderr.write(
"Failed to communicate with security key: {}.\n".format(
caught_exc
)
)
continue
else:
return {
"securityKey": {
"clientData": self._unpadded_urlsafe_b64encode(
client_data
),
"signatureData": self._unpadded_urlsafe_b64encode(
bytes(signature)
),
"applicationId": app_id,
"keyHandle": self._unpadded_urlsafe_b64encode(
key_handle
),
}
}
sys.stderr.write("Ineligible security key.\n")
return None

def _obtain_challenge_input_pyu2f(
self, metadata: Mapping[str, Any]
) -> Optional[dict]:
"""Obtain security key challenge input using the legacy pyu2f fallback."""
pyu2f = self._get_pyu2f_module()
warnings.warn(
"Support for pyu2f is deprecated and will be removed in a future release. "
"Please switch to fido2 by installing `fido2` or `google-auth[reauth]`.",
DeprecationWarning,
stacklevel=2,
)

sk = metadata["securityKey"]
challenges = sk["challenges"]
# Read both 'applicationId' and 'relyingPartyId', if they are the same, use
Expand Down Expand Up @@ -248,8 +373,23 @@ def _obtain_challenge_input_webauthn(self, metadata, webauthn_handler):
def _unpadded_urlsafe_b64recode(self, s):
"""Converts standard b64 encoded string to url safe b64 encoded string
with no padding."""
b = base64.urlsafe_b64decode(s)
return base64.urlsafe_b64encode(b).decode().rstrip("=")
return self._unpadded_urlsafe_b64encode(self._urlsafe_b64decode(s))

def _create_u2f_client_data(self, challenge):
return json.dumps(
{
"challenge": self._unpadded_urlsafe_b64encode(challenge),
"origin": REAUTH_ORIGIN,
"typ": U2F_AUTHENTICATION_TYPE,
},
sort_keys=True,
).encode()

def _unpadded_urlsafe_b64encode(self, data):
return base64.urlsafe_b64encode(data).decode().rstrip("=")

def _urlsafe_b64decode(self, data):
return base64.urlsafe_b64decode(data + "=" * (-len(data) % 4))


class SamlChallenge(ReauthChallenge):
Expand Down
4 changes: 2 additions & 2 deletions packages/google-auth/google/oauth2/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ class Credentials(
Reauth is disabled by default. To enable reauth, set the
`enable_reauth_refresh` parameter to True in the constructor. Note that
reauth feature is intended for gcloud to use only.
If reauth is enabled, `pyu2f` dependency has to be installed in order to use security
key reauth feature. Dependency can be installed via `pip install pyu2f` or `pip install
If reauth is enabled, `fido2` dependency has to be installed in order to use security
key reauth feature. Dependency can be installed via `pip install fido2` or `pip install
google-auth[reauth]`.
"""

Expand Down
2 changes: 1 addition & 1 deletion packages/google-auth/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

pyjwt_extra_require = ["pyjwt>=2.0"]

reauth_extra_require = ["pyu2f>=0.1.5"]
reauth_extra_require = ["fido2>=2.0.0,<3.0.0"]
Comment thread
parthea marked this conversation as resolved.

# TODO(https://github.com/googleapis/google-auth-library-python/issues/1738): Add bounds for pyopenssl dependency.
enterprise_cert_extra_require = ["pyopenssl"]
Expand Down
Loading
Loading