Source code for otp2289.server

# -*- coding: utf-8 -*-
# SPDX-License-Identifier: BSD-2-Clause-FreeBSD
#
# Copyright (c) 2020-2023 Simeon Simeonov
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright notice,
#    this list of conditions and the following disclaimer in the documentation
#    and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHORS ``AS IS'' AND ANY EXPRESS OR
# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
# OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
# IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
# NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
# THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""A pure Python implementation of the RFC-2289 OTP server"""
import binascii
import hashlib

from .generator import OTP_ALGO_MD5, OTPGenerator, OTPGeneratorException


[docs]class OTPStateException(Exception): """OTPStateException class"""
[docs]class OTPStoreException(Exception): """OTPStoreException class"""
[docs]class OTPInvalidResponse(Exception): """OTPInvalidResponse class"""
[docs]class OTPState: """ OTPState class The OTPState class represents a single state on the server side that can: - generate a challenge - validate the corresponding generated response from the generator """ def __init__( self, ot_hex: str, current_step: int, seed: str, hash_algo=OTP_ALGO_MD5, ): """ Constructs an OTPState object with the given arguments. Keyword Arguments: :param ot_hex: The one-time hex from the last successful authentication or None for a newly initialized sequence. :type ot_hex: str or None :param current_step: The current step that is sent with the challenge :type current_step: int :param seed: The seed that is sent with the challenge :type seed: str :param hash_algo: The hash algo, defaults to OTP_ALGO_MD5 :type hash_algo: int or str :raises otp2289.OTPStateException: If the input does not validate """ # enforce the rfc2289 constraints try: self._seed = OTPGenerator.validate_seed(seed) self._hash_algo = OTPGenerator.validate_hash_algo(hash_algo) self._step = OTPGenerator.validate_step(current_step) except OTPGeneratorException as exp: raise OTPStateException(exp.args[0]) from None self._current_digest = None if ot_hex is not None: self._current_digest = self.validate_hex(ot_hex) self._new_digest_hex = None # set upon a successful validation def __repr__(self): """repr implementation""" return ( f'{self.__class__} at {id(self)} ' f'(ot_hex={self._current_digest}, current_step={self._step}, ' f'seed={self._seed}, ' f'hash_algo={self._hash_algo})' ) @property def challenge_string(self) -> str: """challenge_string-property""" # RFC-2289: "...the entire challenge string MUST be # terminated with either a space or a new line." return f'otp-{self._hash_algo} {self._step} {self._seed} ' @property def current_digest(self) -> bytes: """current_digest-property""" return self._current_digest @property def hash_algo(self) -> str: """hash_algo-property""" return self._hash_algo @property def ot_hex(self) -> str: """ot_hex-property""" if self._current_digest is None: return '' return binascii.hexlify(self._current_digest).decode() @property def seed(self) -> str: """seed-property""" return self._seed @property def step(self) -> int: """step-property""" return self._step @property def validated(self) -> bool: """validated-property""" return bool(self._new_digest_hex)
[docs] @classmethod def from_dict(cls, dict_obj: dict): """ Returns an OTPState object from the dict-object :param dict_obj: The dict object :type dict_obj: dict :return: A new OTPState object :rtype: otp2289.OTPStore """ return cls(**dict_obj)
[docs] @staticmethod def response_to_bytes(response: str) -> bytes: """ A wrapper that handles/validates the response as specified by RFC-2289. The method first checks if response is a token and tries to convert it to bytes. If that fails, the method assumes that response is a hex. If neither of those attempts succeeds OTPInvalidResponse is raised. It is up to the caller to run another iteration and compare the result to an existing digest in this state. :param response: The response to this state (its challenge) :type response: str :raises otp2289.OTPInvalidResponse: If the response is corrupt/illegal, but not if it simply does not validate :return: The bytes representation of response (if any) :rtype: bytes """ try: return OTPGenerator.tokens_to_bytes(response) except OTPGeneratorException: # now assume hex... try: return OTPState.validate_hex(response) except OTPStateException: raise OTPInvalidResponse( 'The response is neither a valid token or hex' ) from None
[docs] @staticmethod def validate_hex(ot_hex: str) -> bytes: """ Validates the provided hexidigest. :param ot_hex: The one-time hex to validate :type ot_hex: str :raises otp2289.OTPStateException: If hex does not validate :return: The validated hex (without leading 0x) converted to bytes :rtype: bytes """ if not isinstance(ot_hex, str): raise OTPStateException('OT-hex must be a str') if ot_hex.startswith('0x'): ot_hex = ot_hex[2:] ot_hex = ot_hex.strip().lower() if len(ot_hex) != 16: raise OTPStateException( 'The length of the hex should be 16 ' '(representing 64 bits digest)' ) try: return binascii.unhexlify(ot_hex) except binascii.Error: raise OTPStateException('Invalid OT-hex') from None
[docs] def get_next_state(self): """ Returns the next state for a validated OTPState. This is a brand new OTPState object with the same hash_algo and seed where step -= 1 and ot_hex = self._new_digest_hex :return: The next OTPState if validated, None otherwise :rtype: otp2289.OTPState or None """ if self._new_digest_hex is None: return None return OTPState( self._new_digest_hex, self._step - 1, self._seed, self._hash_algo, )
[docs] def response_validates( self, response: str, store_valid_response: str = True, ) -> bool: """ Validates the incoming response as specified by RFC-2289. :param response: The response to this state (its challenge) :type response: str :param store_valid_response: Should a valid response be stored :type store_valid_response: bool :raises otp2289.OTPInvalidResponse: If the response does not match this state :return: Returns True if response validates, False otherwise :rtype: bool """ # self.response_to_bytes raises OTPInvalidResponse in case response # is corrupt or in a wrong format response_bytes = self.response_to_bytes(response) if self._hash_algo == 'md5': digest = hashlib.md5(response_bytes).digest() if ( self._current_digest is None or OTPGenerator.strxor(digest[0:8], digest[8:]) == self._current_digest ): if store_valid_response: self._new_digest_hex = binascii.hexlify( response_bytes ).decode() return True return False if self._hash_algo == 'sha1': digest = hashlib.sha1(response_bytes).digest() if ( self._current_digest is None or OTPGenerator.sha1_digest_folding( hashlib.sha1(response_bytes).digest() ) == self._current_digest ): if store_valid_response: self._new_digest_hex = binascii.hexlify( response_bytes ).decode() return True return False # this should not happen since the hash_algo is validated by the caller raise OTPInvalidResponse(f'Ivalid hash_algo: {self._hash_algo}')
[docs] def to_dict(self) -> dict: """ Returns a dict representation of the object. This could be the base for a JSON serialization. :return: The dict representation of the object :rtype: dict """ ot_hex = self._current_digest if ot_hex is not None: ot_hex = binascii.hexlify(self._current_digest).decode() return { 'ot_hex': ot_hex, 'current_step': self._step, 'seed': self._seed, 'hash_algo': self._hash_algo, }
[docs]class OTPStore: """ OTPStore class A helper / container class that stores OTPState objects in a 2 layered dict structure represented by [domain][key]. The class could serve as a base class when implementing store backends. """ def __init__(self, data=None): """ Constructs an OTPStore object from data :param data: The data object, defaults to None :type data: object or None """ self._data = {} # {key1: {state1-data...}, key2: {state2-data...}} self._states = {} # OTPState: (domain, key) - dict if data is not None: self._add_data(data) def __contains__(self, state): """membership test""" return state in self._states def __iter__(self): """iterator for OTPStore""" return iter(self._data) def __len__(self): """len() implementation""" return len(self._data) @property def data(self) -> dict: """ data-property Exposes the entire raw-data structure (dict). Use the high level methods when possible! """ return self._data @property def states(self) -> dict: """ states-property Exposes the entire states structure (dict). Use the high level methods when possible! """ return self._states
[docs] def add_state(self, key: str, state: OTPState): """ Adds an OTPState object with a given key. :param key: The key under which to add the state :type key: str :param state: The OTPState object :type state: otp2289.OTPState :raises otp2289.OTPStoreException: On failure """ if not isinstance(key, str): raise OTPStoreException('key must be a str') if not isinstance(state, OTPState): raise OTPStoreException('state must be an OTPState-object') self._data[key] = state self._states[state] = key
[docs] def get(self, key, default=None): """A wrapper for dict.get""" return self._data.get(key, default)
[docs] def items(self): """A wrapper for dict.items""" return self._data.items()
[docs] def pop_state(self, key: str) -> OTPState: """ Removes specified key and returns the corresponding OTPState-object. :param key: The key :type key: str :raises KeyError: If key does not exist :raises otp2289.OTPStoreException: On failure :return: The state corresponding to the key :rtype: otp2289.OTPState """ if not isinstance(key, str): raise OTPStoreException('key must be a str') state = self._data.pop(key) self._states.pop(state) return state
[docs] def response_validates( self, key: str, response: str, store_valid_response: bool = True, ) -> bool: """ A method that wraps around OTPState.response_validates and OTPState.get_next_state. The response is validated against the OTPState object that corresponds to key (if any). If store_valid_response is True, the state is replaced by the next state on successful validation. :param key: The key :type key: str :param response: The response to this state (its challenge) :type response: str :param store_valid_response: Should a valid response be stored :type store_valid_response: bool :raises KeyError: If the key is not present :raises otp2289.OTPInvalidResponse: If the response does not match this state :return: Returns True if response validates, False otherwise :rtype: bool """ state = self._data[key] rvalue = state.response_validates(response, store_valid_response) if rvalue and store_valid_response: next_state = state.get_next_state() self._data[key] = next_state self._states[next_state] = key self._states.pop(state) return rvalue
[docs] def to_dict(self) -> dict: """ Returns a dict representation of the object. This could be the base for a JSON serialization. :return: The dict representation of the object :rtype: dict """ return {key: state.to_dict() for key, state in self._data.items()}
[docs] def _add_data(self, dict_obj: dict) -> dict: """ Adds data from a dict object (dict_obj). This method should probably be either overloaded or wrapped in a child class. dict_obj has the following format: { 'key': { 'ot_hex': val1, 'current_step': val2, 'seed': val3, 'hash_algo': val4 }, ..., ..., } :param dict_obj: The dict-object :type dict_obj: dict """ if not dict_obj: return for key, state_dict in dict_obj.items(): self.add_state(key, OTPState(**state_dict))