# Source code for otp2289.server

# SPDX-License-Identifier: BSD-2-Clause
#
# Copyright (c) 2020-2026 Simeon Simeonov
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright notice,
#    this list of conditions and the following disclaimer in the documentation
#    and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHORS ``AS IS'' AND ANY EXPRESS OR
# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
# OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
# IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
# NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
# THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""A pure Python implementation of the RFC-2289 OTP server"""

from __future__ import annotations

import hashlib
import typing

if typing.TYPE_CHECKING:
    from collections.abc import Iterator

from .generator import (
    OTP_ALGO_MD5,
    OTPGenerator,
    OTPGeneratorError,
    OTPResponse,
    OTPResponseError,
)


class OTPStateError(Exception):
    """Raised when an OTPState cannot be built from the supplied values."""
class OTPStoreError(Exception):
    """Raised when an OTPStore operation is given unusable arguments."""
class OTPInvalidResponseError(Exception):
    """Raised when a response is malformed (as opposed to merely wrong)."""
class OTPState:
    """
    OTPState class

    The OTPState class represents a single state on the server side that can:
    - generate a challenge
    - validate the corresponding generated response from the generator
    """

    def __init__(
        self,
        ot_hex: str | None,
        current_step: int,
        seed: str,
        hash_algo: int | str = OTP_ALGO_MD5,
    ) -> None:
        """
        Constructs an OTPState object with the given arguments.

        Keyword Arguments:
        :param ot_hex: The one-time hex from the last successful
                       authentication or None for a newly initialized
                       sequence.
        :type ot_hex: str or None

        :param current_step: The current step that is sent with the challenge
        :type current_step: int

        :param seed: The seed that is sent with the challenge
        :type seed: str

        :param hash_algo: The hash algo, defaults to OTP_ALGO_MD5
        :type hash_algo: int or str

        :raises otp2289.OTPStateError: If the input does not validate
        """
        # enforce the rfc2289 constraints; generator-level failures are
        # re-raised as OTPStateError so callers only deal with one type
        try:
            self._seed = OTPGenerator.validate_seed(seed)
            self._hash_algo = OTPGenerator.validate_hash_algo(hash_algo)
            self._step = OTPGenerator.validate_step(current_step)
        except OTPGeneratorError as err:
            raise OTPStateError(err.args[0]) from err
        # None means "no previous one-time password known"
        self._current_digest = None
        if ot_hex is not None:
            try:
                self._current_digest = OTPResponse.hex_to_bytes(ot_hex)
            except OTPResponseError as err:
                raise OTPStateError(err.args[0]) from err
        self._new_digest_hex = None  # set upon a successful validation

    def __repr__(self) -> str:
        """repr implementation"""
        # show the digest in the same hex form that to_dict() / the
        # constructor use, so the 'ot_hex' label matches the value
        ot_hex = (
            None if self._current_digest is None
            else self._current_digest.hex()
        )
        return (
            f'{self.__class__} at {id(self)} '
            f'(ot_hex={ot_hex}, current_step={self._step}, '
            f'seed={self._seed}, '
            f'hash_algo={self._hash_algo})'
        )

    @property
    def challenge_string(self) -> str:
        """challenge_string-property"""
        # RFC-2289: "...the entire challenge string MUST be
        # terminated with either a space or a new line."
        return f'otp-{self._hash_algo} {self._step} {self._seed} '

    @property
    def current_digest(self) -> bytes | None:
        """current_digest-property"""
        return self._current_digest

    @property
    def hash_algo(self) -> str:
        """hash_algo-property"""
        return self._hash_algo

    @property
    def ot_hex(self) -> str:
        """ot_hex-property (empty string when no digest is stored)"""
        if self._current_digest is None:
            return ''
        return self._current_digest.hex()

    @property
    def seed(self) -> str:
        """seed-property"""
        return self._seed

    @property
    def step(self) -> int:
        """step-property"""
        return self._step

    @property
    def validated(self) -> bool:
        """validated-property (True once a valid response was stored)"""
        return bool(self._new_digest_hex)

    @classmethod
    def from_dict(cls, dict_obj: dict) -> OTPState:
        """
        Returns an OTPState object from the dict-object

        The dict is expanded into keyword arguments of the constructor, so
        its keys must match the constructor's parameter names.

        :param dict_obj: The dict object
        :type dict_obj: dict

        :return: A new OTPState object
        :rtype: otp2289.OTPState
        """
        return cls(**dict_obj)

    @staticmethod
    def response_string_to_otp_response(response_str: str) -> OTPResponse:
        """
        A wrapper that handles/validates the response as specified by
        RFC-2289.

        The method first checks if the response string is a token and
        tries to convert it to a OTPResponse instance. If that fails, the
        method assumes that response string is a hex.
        If neither of those attempts succeeds OTPInvalidResponseError is
        raised.

        It is up to the caller to run another iteration and compare the result
        to an existing digest in this state.

        :param response_str: The response string to this state (its challenge)
        :type response_str: str

        :raises otp2289.OTPInvalidResponseError: If the response is
                                                 corrupt/illegal, but not if
                                                 it simply does not validate

        :return: OTPResponse instance
        :rtype: otp2289.OTPResponse
        """
        try:
            return OTPResponse.from_tokens(response_str)
        except OTPResponseError:
            pass  # not a token sequence; now assume hex...
        try:
            return OTPResponse.from_hex(response_str)
        except OTPResponseError:
            # the parser errors are an implementation detail: hide them
            raise OTPInvalidResponseError(
                'The response is neither a valid token or hex'
            ) from None

    def get_next_state(self) -> OTPState | None:
        """
        Returns the next state for a validated OTPState.

        This is a brand new OTPState object with the same hash_algo and seed
        where step -= 1 and ot_hex = self._new_digest_hex

        :return: The next OTPState if validated, None otherwise
        :rtype: otp2289.OTPState or None
        """
        if self._new_digest_hex is None:
            return None
        return OTPState(
            self._new_digest_hex, self._step - 1, self._seed, self._hash_algo
        )

    def _fold_response(self, response_bytes: bytes) -> bytes:
        """
        Hashes response_bytes once with this state's hash algorithm and
        returns the folded digest that is compared to the stored digest.

        :param response_bytes: The raw bytes of the response
        :type response_bytes: bytes

        :raises otp2289.OTPInvalidResponseError: If hash_algo is neither
                                                 'md5' nor 'sha1'

        :return: The folded digest
        :rtype: bytes
        """
        if self._hash_algo == 'md5':
            digest = hashlib.md5(response_bytes).digest()
            # fold the 16 byte digest by combining its two 8 byte halves
            return OTPGenerator.strxor(digest[0:8], digest[8:])
        if self._hash_algo == 'sha1':
            return OTPGenerator.sha1_digest_folding(
                hashlib.sha1(response_bytes).digest()
            )
        # this should not happen since the hash_algo is validated by the caller
        raise OTPInvalidResponseError(f'Invalid hash_algo: {self._hash_algo}')

    def response_validates(
        self, response: str | OTPResponse, *, store_valid_response: bool = True
    ) -> bool:
        """
        Validates the incoming response as specified by RFC-2289.

        The response is hashed one more time and the folded result is
        compared to the digest held by this state. A state without a stored
        digest (ot_hex=None at construction) accepts any well-formed
        response.

        :param response: The response to this state (its challenge)
        :type response: str or OTPResponse

        :param store_valid_response: Should a valid response be stored
        :type store_valid_response: bool

        :raises otp2289.OTPInvalidResponseError: If the response is corrupt
                                                 or invalid

        :return: Returns True if response validates, False otherwise
        :rtype: bool
        """
        if not isinstance(response, (str, OTPResponse)):
            raise OTPInvalidResponseError(
                'response must be of type str or OTPResponse'
            )
        if isinstance(response, str):
            # raises OTPInvalidResponseError in case response is corrupt
            # or in a wrong format
            response = self.response_string_to_otp_response(response)
        response_bytes = response.response_bytes
        # computed unconditionally so that an unsupported hash_algo is
        # reported even when there is no stored digest to compare against
        folded = self._fold_response(response_bytes)
        if self._current_digest is not None and folded != self._current_digest:
            return False
        if store_valid_response:
            self._new_digest_hex = response_bytes.hex()
        return True

    def to_dict(self) -> dict:
        """
        Returns a dict representation of the object.

        This could be the base for a JSON serialization. The keys match the
        constructor's parameter names, so the result can be passed to
        from_dict.

        :return: The dict representation of the object
        :rtype: dict
        """
        ot_hex = (
            self._current_digest.hex()
            if self._current_digest is not None
            else None
        )
        return {
            'ot_hex': ot_hex,
            'current_step': self._step,
            'seed': self._seed,
            'hash_algo': self._hash_algo,
        }
class OTPStore:
    """
    OTPStore class

    A helper / container class that stores OTPState objects in a dict
    indexed by a str key, together with a reverse (state -> key) dict that
    backs the membership test.
    The class could serve as a base class when implementing store backends.
    """

    def __init__(self, data: dict | None = None) -> None:
        """
        Constructs an OTPStore object from data

        :param data: The data dict, defaults to None
        :type data: dict or None
        """
        self._data = {}  # {key1: OTPState1, key2: OTPState2, ...}
        self._states = {}  # {OTPState1: key1, ...} - reverse lookup
        if data is not None:
            self._add_data(data)

    def __contains__(self, state: OTPState) -> bool:
        """membership test (by state object, not by key)"""
        return state in self._states

    def __iter__(self) -> Iterator:
        """iterator for OTPStore (iterates over the keys)"""
        return iter(self._data)

    def __len__(self) -> int:
        """len() implementation"""
        return len(self._data)

    @property
    def data(self) -> dict:
        """
        data-property

        Exposes the entire raw-data structure (dict).
        Use the high level methods when possible!
        """
        return self._data

    @property
    def states(self) -> dict:
        """
        states-property

        Exposes the entire states structure (dict).
        Use the high level methods when possible!
        """
        return self._states

    def _forget_state(self, key: str, state: OTPState) -> None:
        """
        Drops the reverse-lookup entry of state, but only if that entry
        points at key (it may be missing or belong to another key).
        """
        if self._states.get(state) == key:
            del self._states[state]

    def add_state(self, key: str, state: OTPState) -> None:
        """
        Adds an OTPState object with a given key.

        A state that is already stored under key is replaced.

        :param key: The key under which to add the state
        :type key: str

        :param state: The OTPState object
        :type state: otp2289.OTPState

        :raises otp2289.OTPStoreError: On failure
        """
        if not isinstance(key, str):
            raise OTPStoreError('key must be a str')
        if not isinstance(state, OTPState):
            raise OTPStoreError('state must be an OTPState-object')
        replaced = self._data.get(key)
        if replaced is not None:
            # otherwise the replaced state would still pass 'state in store'
            self._forget_state(key, replaced)
        self._data[key] = state
        self._states[state] = key

    def get(
        self, key: str, default: OTPState | None = None
    ) -> OTPState | None:
        """A wrapper for dict.get"""
        return self._data.get(key, default)

    def items(self) -> typing.ItemsView:
        """A wrapper for dict.items"""
        return self._data.items()

    def pop_state(self, key: str) -> OTPState:
        """
        Removes specified key and returns the corresponding OTPState-object.

        :param key: The key
        :type key: str

        :raises KeyError: If key does not exist
        :raises otp2289.OTPStoreError: On failure

        :return: The state corresponding to the key
        :rtype: otp2289.OTPState
        """
        if not isinstance(key, str):
            raise OTPStoreError('key must be a str')
        state = self._data.pop(key)
        # tolerant removal: never fail after the entry is already gone
        self._forget_state(key, state)
        return state

    def response_validates(
        self,
        key: str,
        response: str | OTPResponse,
        *,
        store_valid_response: bool = True,
    ) -> bool:
        """
        A method that wraps around OTPState.response_validates and
        OTPState.get_next_state.

        The response is validated against the OTPState object that
        corresponds to key (if any). If store_valid_response is True, the
        state is replaced by the next state on successful validation.

        :param key: The key
        :type key: str

        :param response: The response to this state (its challenge)
        :type response: str or OTPResponse

        :param store_valid_response: Should a valid response be stored
        :type store_valid_response: bool

        :raises KeyError: If the key is not present
        :raises otp2289.OTPInvalidResponseError: If the response is corrupt
                                                 or invalid (a well-formed
                                                 response that does not
                                                 match returns False)

        :return: Returns True if response validates, False otherwise
        :rtype: bool
        """
        state = self._data[key]
        rvalue = state.response_validates(
            response, store_valid_response=store_valid_response
        )
        if rvalue and store_valid_response:
            next_state = state.get_next_state()
            # advance the sequence: the used state leaves the store
            self._forget_state(key, state)
            self._data[key] = next_state
            self._states[next_state] = key
        return rvalue

    def to_dict(self) -> dict:
        """
        Returns a dict representation of the object.

        This could be the base for a JSON serialization.

        :return: The dict representation of the object
        :rtype: dict
        """
        return {key: state.to_dict() for key, state in self._data.items()}

    def _add_data(self, dict_obj: dict) -> None:
        """
        Adds data from a dict object (dict_obj).

        This method should probably be either overloaded or wrapped in a child
        class.

        dict_obj has the following format:
        {
            'key': {
                'ot_hex': val1,
                'current_step': val2,
                'seed': val3,
                'hash_algo': val4
            },
            ...,
            ...,
        }

        :param dict_obj: The dict-object
        :type dict_obj: dict
        """
        if not dict_obj:
            return
        for key, state_dict in dict_obj.items():
            self.add_state(key, OTPState.from_dict(state_dict))