aboutsummaryrefslogtreecommitdiff
path: root/frozen_deps/Cryptodome/Hash/TurboSHAKE128.py
blob: 92ac59e44b665c9322e0adc4872043b9f538a717 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
from Cryptodome.Util._raw_api import (VoidPointer, SmartPointer,
                                  create_string_buffer,
                                  get_raw_buffer, c_size_t,
                                  c_uint8_ptr, c_ubyte)

from Cryptodome.Util.number import long_to_bytes
from Cryptodome.Util.py3compat import bchr

from .keccak import _raw_keccak_lib


class TurboSHAKE(object):
    """Extendable-output hash object implementing TurboSHAKE.

    Instances are not meant to be built directly: obtain them through
    the module-level :func:`new` function.
    """

    def __init__(self, capacity, domain_separation, data):
        """Set up the native Keccak state and absorb any initial data.

        Args:
            capacity (integer): capacity value handed to the native
                ``keccak_init`` call.
            domain_separation (integer): domain separation byte, passed
                to the native library when output is extracted.
            data (byte string/byte array/memoryview): optional first
                chunk of the message; ignored when empty or ``None``.

        Raises:
            ValueError: if the native library reports an error.
        """

        raw_state = VoidPointer()
        # TurboSHAKE runs Keccak with a reduced number of rounds (12).
        rounds = c_ubyte(12)
        err = _raw_keccak_lib.keccak_init(raw_state.address_of(),
                                          c_size_t(capacity),
                                          rounds)
        if err:
            raise ValueError("Error %d while instantiating TurboSHAKE"
                             % err)

        # Pair the raw state with its native destructor.
        self._state = SmartPointer(raw_state.get(),
                                   _raw_keccak_lib.keccak_destroy)

        self._domain = domain_separation
        self._capacity = capacity
        self._is_squeezing = False

        if not data:
            return
        self.update(data)

    def update(self, data):
        """Absorb the next chunk of the message being hashed.

        Args:
            data (byte string/byte array/memoryview): The next chunk of the message being hashed.

        :return: this same object, so that calls can be chained

        Raises:
            TypeError: if :meth:`read` has already been called.
            ValueError: if the native library reports an error.
        """

        if self._is_squeezing:
            raise TypeError("You cannot call 'update' after the first 'read'")

        chunk_len = c_size_t(len(data))
        err = _raw_keccak_lib.keccak_absorb(self._state.get(),
                                            c_uint8_ptr(data),
                                            chunk_len)
        if not err:
            return self

        raise ValueError("Error %d while updating TurboSHAKE state"
                         % err)

    def read(self, length):
        """
        Produce the next piece of XOF output.

        .. note::
            Once :meth:`read` has been called, :meth:`update` cannot be
            used anymore on this object.

        Args:
            length (integer): the amount of bytes this method must return

        :return: the next piece of XOF output (of the given length)
        :rtype: byte string

        Raises:
            ValueError: if the native library reports an error.
        """

        # From now on the object only produces output.
        self._is_squeezing = True

        out = create_string_buffer(length)
        # The domain separation byte is supplied at extraction time.
        err = _raw_keccak_lib.keccak_squeeze(self._state.get(),
                                             out,
                                             c_size_t(length),
                                             c_ubyte(self._domain))
        if err:
            raise ValueError("Error %d while extracting from TurboSHAKE"
                             % err)

        return get_raw_buffer(out)

    def new(self, data=None):
        """Create a fresh object of the same type, with the same capacity
        and domain separation byte as this one.

        Args:
            data (byte string/byte array/memoryview): optional first
                chunk of the message for the new object.

        :return: a new hash object
        """

        cls = type(self)
        return cls(self._capacity, self._domain, data)

    def _reset(self):
        """Reset the native state and allow :meth:`update` to be used again.

        Raises:
            ValueError: if the native library reports an error.
        """

        err = _raw_keccak_lib.keccak_reset(self._state.get())
        if err:
            raise ValueError("Error %d while resetting TurboSHAKE state"
                             % err)
        self._is_squeezing = False


def new(**kwargs):
    """Create a new TurboSHAKE128 object.

    Args:
       domain (integer):
         Optional - A domain separation byte, between 0x01 and 0x7F.
         The default value is 0x1F.
       data (bytes/bytearray/memoryview):
        Optional - The very first chunk of the message to hash.
        It is equivalent to an early call to :meth:`update`.

    :Return: A :class:`TurboSHAKE` object

    Raises:
        TypeError: if a keyword other than ``domain`` or ``data`` is given.
        ValueError: if ``domain`` is outside the range 0x01-0x7F.
    """

    # Reject unsupported keywords instead of silently dropping them:
    # a misspelled ``domain`` would otherwise fall back to the default
    # domain separation byte without any warning.
    unknown = sorted(k for k in kwargs if k not in ('domain', 'data'))
    if unknown:
        raise TypeError("Unknown parameters: " + ", ".join(unknown))

    domain_separation = kwargs.get('domain', 0x1F)
    if not (0x01 <= domain_separation <= 0x7F):
        raise ValueError("Incorrect domain separation value (%d)" %
                         domain_separation)
    data = kwargs.get('data')
    # 32 is the capacity value this module passes to the TurboSHAKE constructor.
    return TurboSHAKE(32, domain_separation, data=data)