import enum
import json
import logging
from typing import Optional
from keycloak import KeycloakOpenID
from keycloak.exceptions import KeycloakError
from werkzeug.utils import redirect
from werkzeug.wrappers import Request, Response
from .auth import AuthenticationService
from .types import FetchUserCallable, TokenPayload, User
logger = logging.getLogger(__name__)
[docs]
class HookMethod(enum.Enum):
SUCCESS = "keycloak_success"
FAILURE = "keycloak_failure"
[docs]
class KeycloakSsoServiceMixin:
"""
Add this to your nameko service to provide SSO authentication with Keycloak.
Expected service dependencies or class attributes:
- ``keycloak`` which must be an instance of :class:`~nameko_keycloak.dependencies.KeycloakProvider`
- ``sso_cookie_prefix`` - a string that will be used to namespace cookies (useful when there are multiple SSO-enabled apps hosted on same domain)
- ``sso_cookie_path`` - path part of the URL of your application, set as Path cookie attribute
- ``sso_login_url`` - absolute URL to handler which delegates to :meth:`keycloak_login_sso`
- ``sso_token_url`` - absolute URL to handler which delegates to :meth:`keycloak_token_sso`
- ``sso_refresh_token_url`` - absolute URL to handler which delegates to :meth:`keycloak_refresh_token_sso`
- ``frontend_url`` - absolute URL to a user-facing web app that communicates with this backend service
"""
keycloak: KeycloakOpenID
sso_cookie_prefix: str = "nameko-keycloak"
sso_cookie_path: str = "/"
sso_login_url: str = "/login-sso"
sso_token_url: str = "/token-sso"
sso_refresh_token_url: str = "/refresh-token-sso"
frontend_url: str = "/"
[docs]
def keycloak_login_sso(self, request: Request) -> Response:
"""
Redirects to SSO login form configured to return back to HTTP service.
"""
return redirect(self.keycloak.auth_url(redirect_uri=self.sso_token_url))
[docs]
def keycloak_token_sso(self, request: Request) -> Response:
"""
Handles redirect from successful login in SSO.
The SSO passes a `code` query string parameter which we then use to
generate a OAuth access token. We make sure that a local User exists
before they are allowed to reach frontend URL. If all goes well, the
access token and several other metadata are stored in cookies.
"""
# annotate bound method here, otherwise mypy can't resolve self
self.fetch_user: FetchUserCallable
auth = AuthenticationService(self.keycloak, self.fetch_user)
if code := request.args.get("code", ""):
token = self.keycloak.token(
code=code,
grant_type="authorization_code",
redirect_uri=self.sso_token_url,
)
user = auth.get_user_from_access_token(access_token=token["access_token"])
if not user:
return Response("Unauthorized", status=401)
self.run_hook(HookMethod.SUCCESS, user)
response = redirect(self.frontend_url)
return self._setup_response_cookie(response, token)
return Response("Empty request")
[docs]
def keycloak_refresh_token_sso(self, request: Request) -> Response:
"""
Generates a new access token, given a cookie with a valid refresh token.
"""
refresh_token = request.cookies.get(f"{self.sso_cookie_prefix}_refresh-token")
if not refresh_token:
logger.warning("No refresh token found in cookies")
self.run_hook(HookMethod.FAILURE)
return Response("Invalid", status=401)
try:
token_payload = self.keycloak.refresh_token(refresh_token=refresh_token)
except KeycloakError as e:
# Decode Keycloak error details and decide if it's serious enough
# to call failure hook
error_code: str = ""
try:
assert e.response_body is not None
payload = json.loads(e.response_body.decode("utf-8"))
error_code = payload["error"]
except Exception:
logger.exception("Failed to decode Keycloak error details")
if error_code == "invalid_grant":
# This is a normal situation, refresh token exists but expired.
# In this case frontend should redirect to login page.
logger.debug("Refresh token expired")
else:
self.run_hook(HookMethod.FAILURE)
return Response("Invalid", status=401)
response = Response(
json.dumps({"access_token": token_payload["access_token"]}),
status=200,
content_type="application/json",
)
return self._setup_response_cookie(response, token_payload)
[docs]
def keycloak_validate_token_sso(self, request: Request) -> Response:
"""
Checks that access token is valid and a corresponding local User exists.
"""
token = request.cookies.get(f"{self.sso_cookie_prefix}_access-token")
if not token:
logger.warning("No access token found in cookies")
return Response("Invalid", status=401)
auth = AuthenticationService(self.keycloak, self.fetch_user)
user = auth.get_user_from_access_token(token)
if not user:
return Response("Invalid", status=401)
return Response("Valid", status=200)
[docs]
def keycloak_logout(self, request: Request) -> Response:
"""
Invalidates session in Keycloak, deletes cookies and redirects to login.
.. note::
Keycloak logout API invalidates only refresh token, not access
token. This is by design, as access tokens should be short lived
anyway.
"""
refresh_token = request.cookies.get(
f"{self.sso_cookie_prefix}_refresh-token", ""
)
if not refresh_token:
logger.warning("No refresh token found in cookies")
try:
self.keycloak.logout(refresh_token)
logger.info("Logged out and invalidated Keycloak refresh token")
except KeycloakError:
self.run_hook(HookMethod.FAILURE)
response = redirect(self.sso_login_url)
response.delete_cookie(
key=f"{self.sso_cookie_prefix}_access-token",
path=self.sso_cookie_path,
)
response.delete_cookie(
key=f"{self.sso_cookie_prefix}_refresh-token",
path=self.sso_cookie_path,
)
return response
def _setup_response_cookie(
self, response: Response, token_payload: TokenPayload
) -> Response:
response.set_cookie(
key=f"{self.sso_cookie_prefix}_access-token",
value=token_payload["access_token"],
secure=True,
httponly=True,
path=self.sso_cookie_path,
)
response.set_cookie(
key=f"{self.sso_cookie_prefix}_refresh-token",
value=token_payload["refresh_token"],
secure=True,
httponly=True,
path=self.sso_cookie_path,
)
return response
def run_hook(self, hook_method: HookMethod, user: Optional[User] = None) -> None:
instance_method = getattr(self, hook_method.value, None)
if instance_method is None:
logger.warning(
f"Failed to call hook, {self.__class__} doesn't implement {hook_method.value} method"
)
return
instance_method(user) if user else instance_method()