Source code for nameko_keycloak.fakes

import logging
from typing import Any

from jwcrypto.common import JWException
from keycloak.exceptions import KeycloakError

from .types import Token, TokenPayload

# Module-level logger; FakeKeycloak logs its token lookups through it.
logger = logging.getLogger(__name__)


class FakeKeycloak:
    """
    Fake to be used wherever tests need to interact with Keycloak.

    This class emulates a few APIs of ``KeycloakOpenID`` that we use for SSO
    workflow.

    We're working under a very important assumption here: you need to pass
    the user's email as ``code`` when generating their token. This is
    obviously not true in real life, where Keycloak manages generating secure
    tokens from one-time codes, but here it simplifies a lot.

    The Keycloak user database is simulated by a key-value storage where you
    insert an item when calling :meth:`token`, and fetch from storage when
    calling :meth:`decode_token` or :meth:`refresh_token`.
    """

    def __init__(self):
        # Simulated user database: access token ("token_<email>") -> payload
        # that was issued for it by ``token()``.
        self.token_payloads: dict[Token, TokenPayload] = {}

    def auth_url(self, **kwargs) -> str:
        """Return a fixed placeholder authentication URL; ``kwargs`` are ignored."""
        return "http://keycloak.url"

    def token(self, code: str, **kwargs) -> TokenPayload:
        """
        Issue a token payload for a user and remember it in the fake storage.

        :param code: the user's email (see the class docstring for why).
        :param kwargs: arbitrary extra key-value data merged into the payload;
            keys given here override the default ones.
        :returns: the payload, which is the same dict object that is stored
            under the ``"token_<email>"`` key.
        """
        email = code
        token = f"token_{email}"
        token_payload = {
            "email": email,
            "access_token": token,
            "expires_in": "EXP",
            # this is not semantically correct, but satisfies other uses of
            # refresh_token, such as logout()
            "refresh_token": email,
            "refresh_expires_in": "REXP",
            "refresh_token_url": "http://keycloak.url/refresh",
        }
        # allow arbitrary key-value data in payload
        token_payload.update(kwargs)
        self.token_payloads[token] = token_payload
        return token_payload

    def decode_token(self, token: Token) -> TokenPayload:
        """
        Look up the payload previously stored for an access token.

        :param token: access token as returned in ``payload["access_token"]``.
        :raises JWException: if no payload is stored under ``token``.
        """
        logger.info("Looking up %s in %s", token, self.token_payloads)
        try:
            return self.token_payloads[token]
        except KeyError as err:
            # Chain explicitly so the traceback shows the failed lookup as
            # the cause of the library-level error.
            raise JWException("Missing user") from err

    def refresh_token(self, refresh_token: Token, **kwargs) -> TokenPayload:
        """
        Return the stored payload that belongs to a refresh token.

        The fake uses the user's email as refresh token, so the storage key
        is rebuilt as ``"token_<refresh_token>"``. ``kwargs`` are ignored.

        :raises KeycloakError: if no payload is stored under that key.
        """
        token = f"token_{refresh_token}"
        logger.info("Looking up %s in %s", token, self.token_payloads)
        try:
            return self.token_payloads[token]
        except KeyError as err:
            raise KeycloakError("Missing user") from err

    def logout(self, refresh_token: Token) -> None:
        """
        Forget the payload that belongs to a refresh token.

        :raises KeyError: if no payload is stored for ``refresh_token``.
        """
        token = f"token_{refresh_token}"
        del self.token_payloads[token]

    def certs(self) -> list[Any]:
        """Return an empty list of certificates."""
        return []