# SPDX-License-Identifier: BSD-2-Clause
#
# Copyright (c) 2020-2026 Simeon Simeonov
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHORS ``AS IS'' AND ANY EXPRESS OR
# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
# OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
# IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
# NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
# THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""A pure Python implementation of the RFC-2289 OTP server"""
from __future__ import annotations
import hashlib
import typing
if typing.TYPE_CHECKING:
from collections.abc import Iterator
from .generator import (
OTP_ALGO_MD5,
OTPGenerator,
OTPGeneratorError,
OTPResponse,
OTPResponseError,
)
[docs]
class OTPStateError(Exception):
    """Raised when an OTPState is constructed from values that do not validate."""
[docs]
class OTPStoreError(Exception):
    """Raised by OTPStore when a key or state argument has the wrong type."""
[docs]
class OTPInvalidResponseError(Exception):
    """Raised when a response is malformed (as opposed to merely wrong)."""
[docs]
class OTPState:
    """
    OTPState class

    The OTPState class represents a single state on the server side that can:

    - generate a challenge
    - validate the corresponding generated response from the generator

    A state holds the digest of the last accepted one-time password (or None
    for a fresh sequence), the step and seed sent in the challenge, and the
    hash algorithm. After a successful validation the accepted response is
    remembered so that :meth:`get_next_state` can build the follow-up state.
    """

    def __init__(
        self,
        ot_hex: str | None,
        current_step: int,
        seed: str,
        hash_algo: int | str = OTP_ALGO_MD5,
    ) -> None:
        """
        Constructs an OTPState object with the given arguments.

        Keyword Arguments:

        :param ot_hex: The one-time hex from the last successful authentication
                       or None for a newly initialized sequence.
        :type ot_hex: str or None
        :param current_step: The current step that is sent with the challenge
        :type current_step: int
        :param seed: The seed that is sent with the challenge
        :type seed: str
        :param hash_algo: The hash algo, defaults to OTP_ALGO_MD5
        :type hash_algo: int or str
        :raises otp2289.OTPStateError: If the input does not validate
        """
        # enforce the rfc2289 constraints; generator-level errors are
        # re-raised as OTPStateError so callers only deal with one type
        try:
            self._seed = OTPGenerator.validate_seed(seed)
            self._hash_algo = OTPGenerator.validate_hash_algo(hash_algo)
            self._step = OTPGenerator.validate_step(current_step)
        except OTPGeneratorError as err:
            raise OTPStateError(err.args[0]) from err
        self._current_digest: bytes | None = None
        if ot_hex is not None:
            try:
                self._current_digest = OTPResponse.hex_to_bytes(ot_hex)
            except OTPResponseError as err:
                raise OTPStateError(err.args[0]) from err
        # hex of the accepted response; set upon a successful validation
        self._new_digest_hex: str | None = None

    def __repr__(self) -> str:
        """repr implementation"""
        return (
            f'{self.__class__} at {id(self)} '
            f'(ot_hex={self._current_digest}, current_step={self._step}, '
            f'seed={self._seed}, '
            f'hash_algo={self._hash_algo})'
        )

    @property
    def challenge_string(self) -> str:
        """challenge_string-property"""
        # RFC-2289: "...the entire challenge string MUST be
        # terminated with either a space or a new line."
        return f'otp-{self._hash_algo} {self._step} {self._seed} '

    @property
    def current_digest(self) -> bytes | None:
        """current_digest-property (None for a newly initialized sequence)"""
        return self._current_digest

    @property
    def hash_algo(self) -> str:
        """hash_algo-property"""
        return self._hash_algo

    @property
    def ot_hex(self) -> str:
        """ot_hex-property ('' when there is no current digest)"""
        if self._current_digest is None:
            return ''
        return self._current_digest.hex()

    @property
    def seed(self) -> str:
        """seed-property"""
        return self._seed

    @property
    def step(self) -> int:
        """step-property"""
        return self._step

    @property
    def validated(self) -> bool:
        """validated-property (True once a valid response has been stored)"""
        return bool(self._new_digest_hex)

    @classmethod
    def from_dict(cls, dict_obj: dict) -> OTPState:
        """
        Returns an OTPState object from the dict-object

        The dict keys are passed as keyword arguments to the constructor, so
        they must match its parameter names (the format :meth:`to_dict`
        produces).

        :param dict_obj: The dict object
        :type dict_obj: dict
        :return: A new OTPState object
        :rtype: otp2289.OTPState
        """
        return cls(**dict_obj)

    @staticmethod
    def response_string_to_otp_response(response_str: str) -> OTPResponse:
        """
        A wrapper that handles/validates the response as specified by RFC-2289.

        The method first checks if the response string is a token and tries to
        convert it to a OTPResponse instance. If that fails, the method assumes
        that response string is a hex. If neither of those attempts succeeds
        OTPInvalidResponseError is raised. It is up to the caller to run
        another iteration and compare the result to an existing digest in
        this state.

        :param response_str: The response string to this state (its challenge)
        :type response_str: str
        :raises otp2289.OTPInvalidResponseError: If the response is
                                                 corrupt/illegal, but not if it
                                                 simply does not validate
        :return: OTPResponse instance
        :rtype: otp2289.OTPResponse
        """
        try:
            return OTPResponse.from_tokens(response_str)
        except OTPResponseError:
            # now assume hex...
            try:
                return OTPResponse.from_hex(response_str)
            except OTPResponseError:
                # "from None": the two parser errors add nothing for callers
                raise OTPInvalidResponseError(
                    'The response is neither a valid token or hex'
                ) from None

    def get_next_state(self) -> OTPState | None:
        """
        Returns the next state for a validated OTPState.

        This is a brand new OTPState object with the same hash_algo and seed
        where step -= 1 and ot_hex = self._new_digest_hex

        :return: The next OTPState if validated, None otherwise
        :rtype: otp2289.OTPState or None
        """
        if self._new_digest_hex is None:
            return None
        return OTPState(
            self._new_digest_hex, self._step - 1, self._seed, self._hash_algo
        )

    def _folded_digest(self, response_bytes: bytes) -> bytes:
        """
        Hashes response_bytes once more with this state's algorithm and folds
        the result, yielding the value that is compared to current_digest.

        :param response_bytes: The raw bytes of the candidate response
        :type response_bytes: bytes
        :raises otp2289.OTPInvalidResponseError: If hash_algo is not supported
        :return: The folded digest of response_bytes
        :rtype: bytes
        """
        if self._hash_algo == 'md5':
            digest = hashlib.md5(response_bytes).digest()
            # fold by XOR-ing the first 8 bytes with the remainder
            return OTPGenerator.strxor(digest[0:8], digest[8:])
        if self._hash_algo == 'sha1':
            return OTPGenerator.sha1_digest_folding(
                hashlib.sha1(response_bytes).digest()
            )
        # this should not happen since the hash_algo is validated by the caller
        raise OTPInvalidResponseError(f'Invalid hash_algo: {self._hash_algo}')

    def response_validates(
        self, response: str | OTPResponse, *, store_valid_response: bool = True
    ) -> bool:
        """
        Validates the incoming response as specified by RFC-2289.

        The response is hashed one more time and compared to the digest kept
        in this state. If this state has no current digest (ot_hex was None),
        every well-formed response validates.

        :param response: The response to this state (its challenge)
        :type response: str or OTPResponse
        :param store_valid_response: Should a valid response be stored
        :type store_valid_response: bool
        :raises otp2289.OTPInvalidResponseError: If the response is corrupt
                                                 or invalid
        :return: Returns True if response validates, False otherwise
        :rtype: bool
        """
        if not isinstance(response, (str, OTPResponse)):
            raise OTPInvalidResponseError(
                'response must be of type str or OTPResponse'
            )
        if isinstance(response, str):
            # self.response_string_to_otp_response raises
            # OTPInvalidResponseError in case response is corrupt or in a
            # wrong format
            response_bytes = self.response_string_to_otp_response(
                response
            ).response_bytes
        else:
            response_bytes = response.response_bytes
        # computed unconditionally so that an unsupported hash_algo is
        # reported even when there is no digest to compare against
        candidate = self._folded_digest(response_bytes)
        # NOTE(review): a constant-time comparison (hmac.compare_digest)
        # may be preferable here - confirm the helper return types first
        if self._current_digest is not None and (
            candidate != self._current_digest
        ):
            return False
        if store_valid_response:
            self._new_digest_hex = response_bytes.hex()
        return True

    def to_dict(self) -> dict:
        """
        Returns a dict representation of the object.

        This could be the base for a JSON serialization. The keys match the
        constructor's parameter names; 'ot_hex' is None when the state has no
        current digest.

        :return: The dict representation of the object
        :rtype: dict
        """
        ot_hex = (
            self._current_digest.hex()
            if self._current_digest is not None
            else None
        )
        return {
            'ot_hex': ot_hex,
            'current_step': self._step,
            'seed': self._seed,
            'hash_algo': self._hash_algo,
        }
[docs]
class OTPStore:
    """
    OTPStore class

    A helper / container class that stores OTPState objects in a dict keyed
    by a str key, together with a reverse dict (OTPState -> key) that is used
    for membership tests.

    The class could serve as a base class when implementing store backends.
    """

    def __init__(self, data: dict | None = None) -> None:
        """
        Constructs an OTPStore object from data

        :param data: The data dict (see _add_data for the format),
                     defaults to None
        :type data: dict or None
        """
        self._data = {}  # {key1: OTPState-1, key2: OTPState-2, ...}
        self._states = {}  # {OTPState-1: key1, OTPState-2: key2, ...}
        if data is not None:
            self._add_data(data)

    def __contains__(self, state: OTPState) -> bool:
        """membership test (by OTPState object, not by key)"""
        return state in self._states

    def __iter__(self) -> Iterator:
        """iterator for OTPStore (iterates over the keys)"""
        return iter(self._data)

    def __len__(self) -> int:
        """len() implementation (the number of keys)"""
        return len(self._data)

    @property
    def data(self) -> dict:
        """
        data-property

        Exposes the entire raw-data structure (dict).
        Use the high level methods when possible!
        """
        return self._data

    @property
    def states(self) -> dict:
        """
        states-property

        Exposes the entire states structure (dict).
        Use the high level methods when possible!
        """
        return self._states

    def _unlink_state(self, key: str, state: OTPState) -> None:
        """Drops the reverse entry for state if it still points at key."""
        # The reverse dict holds one key per state. Removing only a matching
        # entry keeps it intact when the same state is also stored under
        # another key, and never raises when the entry is already gone.
        if self._states.get(state) == key:
            del self._states[state]

    def add_state(self, key: str, state: OTPState) -> None:
        """
        Adds an OTPState object with a given key.

        If key is already present, its state is replaced and the replaced
        state is no longer reported as a member of the store.

        :param key: The key under which to add the state
        :type key: str
        :param state: The OTPState object
        :type state: otp2289.OTPState
        :raises otp2289.OTPStoreError: On failure
        """
        if not isinstance(key, str):
            raise OTPStoreError('key must be a str')
        if not isinstance(state, OTPState):
            raise OTPStoreError('state must be an OTPState-object')
        replaced = self._data.get(key)
        if replaced is not None and replaced is not state:
            # avoid leaving a stale reverse entry for the replaced state
            self._unlink_state(key, replaced)
        self._data[key] = state
        self._states[state] = key

    def get(
        self, key: str, default: OTPState | None = None
    ) -> OTPState | None:
        """A wrapper for dict.get"""
        return self._data.get(key, default)

    def items(self) -> typing.ItemsView:
        """A wrapper for dict.items"""
        return self._data.items()

    def pop_state(self, key: str) -> OTPState:
        """
        Removes specified key and returns the corresponding OTPState-object.

        :param key: The key
        :type key: str
        :raises KeyError: If key does not exist
        :raises otp2289.OTPStoreError: On failure
        :return: The state corresponding to the key
        :rtype: otp2289.OTPState
        """
        if not isinstance(key, str):
            raise OTPStoreError('key must be a str')
        state = self._data.pop(key)
        self._unlink_state(key, state)
        return state

    def response_validates(
        self,
        key: str,
        response: str | OTPResponse,
        *,
        store_valid_response: bool = True,
    ) -> bool:
        """
        A method that wraps around OTPState.response_validates and
        OTPState.get_next_state.

        The response is validated against the OTPState object that corresponds
        to key. If store_valid_response is True, the state is replaced
        by the next state on successful validation.

        :param key: The key
        :type key: str
        :param response: The response to this state (its challenge)
        :type response: str or OTPResponse
        :param store_valid_response: Should a valid response be stored
        :type store_valid_response: bool
        :raises KeyError: If the key is not present
        :raises otp2289.OTPInvalidResponseError: If the response is corrupt
                                                 or invalid
        :return: Returns True if response validates, False otherwise
        :rtype: bool
        """
        state = self._data[key]
        validated = state.response_validates(
            response, store_valid_response=store_valid_response
        )
        if validated and store_valid_response:
            # build the successor first so that the store is left untouched
            # should get_next_state() raise
            next_state = state.get_next_state()
            self._unlink_state(key, state)
            self._data[key] = next_state
            self._states[next_state] = key
        return validated

    def to_dict(self) -> dict:
        """
        Returns a dict representation of the object.

        This could be the base for a JSON serialization. Every key is mapped
        to the to_dict() result of its state.

        :return: The dict representation of the object
        :rtype: dict
        """
        return {key: state.to_dict() for key, state in self._data.items()}

    def _add_data(self, dict_obj: dict) -> None:
        """
        Adds data from a dict object (dict_obj).

        This method should probably be either overloaded or wrapped
        in a child class.
        dict_obj has the following format:
        {
            'key': {
                'ot_hex': val1,
                'current_step': val2,
                'seed': val3,
                'hash_algo': val4
            },
            ...,
            ...,
        }

        :param dict_obj: The dict-object
        :type dict_obj: dict
        """
        if not dict_obj:
            return
        for key, state_dict in dict_obj.items():
            self.add_state(key, OTPState.from_dict(state_dict))