diff options
Diffstat (limited to 'freezed_deps/ecdsa')
23 files changed, 8850 insertions, 0 deletions
diff --git a/freezed_deps/ecdsa/__init__.py b/freezed_deps/ecdsa/__init__.py new file mode 100644 index 0000000..eef5fe3 --- /dev/null +++ b/freezed_deps/ecdsa/__init__.py @@ -0,0 +1,25 @@ +from .keys import SigningKey, VerifyingKey, BadSignatureError, BadDigestError,\ + MalformedPointError +from .curves import NIST192p, NIST224p, NIST256p, NIST384p, NIST521p,\ + SECP256k1, BRAINPOOLP160r1, BRAINPOOLP192r1, BRAINPOOLP224r1,\ + BRAINPOOLP256r1, BRAINPOOLP320r1, BRAINPOOLP384r1, BRAINPOOLP512r1 +from .ecdh import ECDH, NoKeyError, NoCurveError, InvalidCurveError, \ + InvalidSharedSecretError +from .der import UnexpectedDER + +# This code comes from http://github.com/warner/python-ecdsa +from ._version import get_versions +__version__ = get_versions()['version'] +del get_versions + +__all__ = ["curves", "der", "ecdsa", "ellipticcurve", "keys", "numbertheory", + "test_pyecdsa", "util", "six"] + +_hush_pyflakes = [SigningKey, VerifyingKey, BadSignatureError, BadDigestError, + MalformedPointError, UnexpectedDER, InvalidCurveError, + NoKeyError, InvalidSharedSecretError, ECDH, NoCurveError, + NIST192p, NIST224p, NIST256p, NIST384p, NIST521p, SECP256k1, + BRAINPOOLP160r1, BRAINPOOLP192r1, BRAINPOOLP224r1, + BRAINPOOLP256r1, BRAINPOOLP320r1, BRAINPOOLP384r1, + BRAINPOOLP512r1] +del _hush_pyflakes diff --git a/freezed_deps/ecdsa/_compat.py b/freezed_deps/ecdsa/_compat.py new file mode 100644 index 0000000..965d8c4 --- /dev/null +++ b/freezed_deps/ecdsa/_compat.py @@ -0,0 +1,39 @@ +""" +Common functions for providing cross-python version compatibility. +""" +import sys +from six import integer_types + + +def str_idx_as_int(string, index): + """Take index'th byte from string, return as integer""" + val = string[index] + if isinstance(val, integer_types): + return val + return ord(val) + + +if sys.version_info < (3, 0): + def normalise_bytes(buffer_object): + """Cast the input into array of bytes.""" + # flake8 runs on py3 where `buffer` indeed doesn't exist... + return buffer(buffer_object) # noqa: F821 + + def hmac_compat(ret): + return ret + +else: + if sys.version_info < (3, 4): + # on python 3.3 hmac.hmac.update() accepts only bytes, on newer + # versions it does accept memoryview() also + def hmac_compat(data): + if not isinstance(data, bytes): + return bytes(data) + return data + else: + def hmac_compat(data): + return data + + def normalise_bytes(buffer_object): + """Cast the input into array of bytes.""" + return memoryview(buffer_object).cast('B') diff --git a/freezed_deps/ecdsa/_rwlock.py b/freezed_deps/ecdsa/_rwlock.py new file mode 100644 index 0000000..e4ef78d --- /dev/null +++ b/freezed_deps/ecdsa/_rwlock.py @@ -0,0 +1,85 @@ +# Copyright Mateusz Kobos, (c) 2011 +# https://code.activestate.com/recipes/577803-reader-writer-lock-with-priority-for-writers/ +# released under the MIT licence + +import threading + + +__author__ = "Mateusz Kobos" + + +class RWLock: + """ + Read-Write locking primitive + + Synchronization object used in a solution of so-called second + readers-writers problem. In this problem, many readers can simultaneously + access a share, and a writer has an exclusive access to this share. + Additionally, the following constraints should be met: + 1) no reader should be kept waiting if the share is currently opened for + reading unless a writer is also waiting for the share, + 2) no writer should be kept waiting for the share longer than absolutely + necessary. + + The implementation is based on [1, secs. 4.2.2, 4.2.6, 4.2.7] + with a modification -- adding an additional lock (C{self.__readers_queue}) + -- in accordance with [2]. + + Sources: + [1] A.B. Downey: "The little book of semaphores", Version 2.1.5, 2008 + [2] P.J. Courtois, F. Heymans, D.L. Parnas: + "Concurrent Control with 'Readers' and 'Writers'", + Communications of the ACM, 1971 (via [3]) + [3] http://en.wikipedia.org/wiki/Readers-writers_problem + """ + + def __init__(self): + """ + A lock giving an even higher priority to the writer in certain + cases (see [2] for a discussion). + """ + self.__read_switch = _LightSwitch() + self.__write_switch = _LightSwitch() + self.__no_readers = threading.Lock() + self.__no_writers = threading.Lock() + self.__readers_queue = threading.Lock() + + def reader_acquire(self): + self.__readers_queue.acquire() + self.__no_readers.acquire() + self.__read_switch.acquire(self.__no_writers) + self.__no_readers.release() + self.__readers_queue.release() + + def reader_release(self): + self.__read_switch.release(self.__no_writers) + + def writer_acquire(self): + self.__write_switch.acquire(self.__no_readers) + self.__no_writers.acquire() + + def writer_release(self): + self.__no_writers.release() + self.__write_switch.release(self.__no_readers) + + +class _LightSwitch: + """An auxiliary "light switch"-like object. The first thread turns on the + "switch", the last one turns it off (see [1, sec. 4.2.2] for details).""" + def __init__(self): + self.__counter = 0 + self.__mutex = threading.Lock() + + def acquire(self, lock): + self.__mutex.acquire() + self.__counter += 1 + if self.__counter == 1: + lock.acquire() + self.__mutex.release() + + def release(self, lock): + self.__mutex.acquire() + self.__counter -= 1 + if self.__counter == 0: + lock.release() + self.__mutex.release() diff --git a/freezed_deps/ecdsa/_version.py b/freezed_deps/ecdsa/_version.py new file mode 100644 index 0000000..038d62a --- /dev/null +++ b/freezed_deps/ecdsa/_version.py @@ -0,0 +1,21 @@ + +# This file was generated by 'versioneer.py' (0.17) from +# revision-control system data, or from the parent directory name of an +# unpacked source archive. Distribution tarballs contain a pre-generated copy +# of this file. + +import json + +version_json = ''' +{ + "date": "2020-01-02T17:05:04+0100", + "dirty": false, + "error": null, + "full-revisionid": "93b04ba3ddb7c2716e07761393a179c061718c34", + "version": "0.15" +} +''' # END VERSION_JSON + + +def get_versions(): + return json.loads(version_json) diff --git a/freezed_deps/ecdsa/curves.py b/freezed_deps/ecdsa/curves.py new file mode 100644 index 0000000..173a2cd --- /dev/null +++ b/freezed_deps/ecdsa/curves.py @@ -0,0 +1,128 @@ +from __future__ import division + +from . import der, ecdsa +from .util import orderlen + + +# orderlen was defined in this module previously, so keep it in __all__, +# will need to mark it as deprecated later +__all__ = ["UnknownCurveError", "orderlen", "Curve", "NIST192p", + "NIST224p", "NIST256p", "NIST384p", "NIST521p", "curves", + "find_curve", "SECP256k1", "BRAINPOOLP160r1", "BRAINPOOLP192r1", + "BRAINPOOLP224r1", "BRAINPOOLP256r1", "BRAINPOOLP320r1", + "BRAINPOOLP384r1", "BRAINPOOLP512r1"] + + +class UnknownCurveError(Exception): + pass + + +class Curve: + def __init__(self, name, curve, generator, oid, openssl_name=None): + self.name = name + self.openssl_name = openssl_name # maybe None + self.curve = curve + self.generator = generator + self.order = generator.order() + self.baselen = orderlen(self.order) + self.verifying_key_length = 2*self.baselen + self.signature_length = 2*self.baselen + self.oid = oid + self.encoded_oid = der.encode_oid(*oid) + + def __repr__(self): + return self.name + + +# the NIST curves +NIST192p = Curve("NIST192p", ecdsa.curve_192, + ecdsa.generator_192, + (1, 2, 840, 10045, 3, 1, 1), "prime192v1") + + +NIST224p = Curve("NIST224p", ecdsa.curve_224, + ecdsa.generator_224, + (1, 3, 132, 0, 33), "secp224r1") + + +NIST256p = Curve("NIST256p", ecdsa.curve_256, + ecdsa.generator_256, + (1, 2, 840, 10045, 3, 1, 7), "prime256v1") + + +NIST384p = Curve("NIST384p", ecdsa.curve_384, + ecdsa.generator_384, + (1, 3, 132, 0, 34), "secp384r1") + + +NIST521p = Curve("NIST521p", ecdsa.curve_521, + ecdsa.generator_521, + (1, 3, 132, 0, 35), "secp521r1") + + +SECP256k1 = Curve("SECP256k1", ecdsa.curve_secp256k1, + ecdsa.generator_secp256k1, + (1, 3, 132, 0, 10), "secp256k1") + + +BRAINPOOLP160r1 = Curve("BRAINPOOLP160r1", + ecdsa.curve_brainpoolp160r1, + ecdsa.generator_brainpoolp160r1, + (1, 3, 36, 3, 3, 2, 8, 1, 1, 1), + "brainpoolP160r1") + + +BRAINPOOLP192r1 = Curve("BRAINPOOLP192r1", + ecdsa.curve_brainpoolp192r1, + ecdsa.generator_brainpoolp192r1, + (1, 3, 36, 3, 3, 2, 8, 1, 1, 3), + "brainpoolP192r1") + + +BRAINPOOLP224r1 = Curve("BRAINPOOLP224r1", + ecdsa.curve_brainpoolp224r1, + ecdsa.generator_brainpoolp224r1, + (1, 3, 36, 3, 3, 2, 8, 1, 1, 5), + "brainpoolP224r1") + + +BRAINPOOLP256r1 = Curve("BRAINPOOLP256r1", + ecdsa.curve_brainpoolp256r1, + ecdsa.generator_brainpoolp256r1, + (1, 3, 36, 3, 3, 2, 8, 1, 1, 7), + "brainpoolP256r1") + + +BRAINPOOLP320r1 = Curve("BRAINPOOLP320r1", + ecdsa.curve_brainpoolp320r1, + ecdsa.generator_brainpoolp320r1, + (1, 3, 36, 3, 3, 2, 8, 1, 1, 9), + "brainpoolP320r1") + + +BRAINPOOLP384r1 = Curve("BRAINPOOLP384r1", + ecdsa.curve_brainpoolp384r1, + ecdsa.generator_brainpoolp384r1, + (1, 3, 36, 3, 3, 2, 8, 1, 1, 11), + "brainpoolP384r1") + + +BRAINPOOLP512r1 = Curve("BRAINPOOLP512r1", + ecdsa.curve_brainpoolp512r1, + ecdsa.generator_brainpoolp512r1, + (1, 3, 36, 3, 3, 2, 8, 1, 1, 13), + "brainpoolP512r1") + + +curves = [NIST192p, NIST224p, NIST256p, NIST384p, NIST521p, SECP256k1, + BRAINPOOLP160r1, BRAINPOOLP192r1, BRAINPOOLP224r1, BRAINPOOLP256r1, + BRAINPOOLP320r1, BRAINPOOLP384r1, BRAINPOOLP512r1] + + +def find_curve(oid_curve): + for c in curves: + if c.oid == oid_curve: + return c + raise UnknownCurveError("I don't know about the curve with oid %s." + "I only know about these: %s" % + (oid_curve, [c.name for c in curves])) diff --git a/freezed_deps/ecdsa/der.py b/freezed_deps/ecdsa/der.py new file mode 100644 index 0000000..ad75b37 --- /dev/null +++ b/freezed_deps/ecdsa/der.py @@ -0,0 +1,384 @@ +from __future__ import division + +import binascii +import base64 +import warnings +from itertools import chain +from six import int2byte, b, text_type +from ._compat import str_idx_as_int + + +class UnexpectedDER(Exception): + pass + + +def encode_constructed(tag, value): + return int2byte(0xa0+tag) + encode_length(len(value)) + value + + +def encode_integer(r): + assert r >= 0 # can't support negative numbers yet + h = ("%x" % r).encode() + if len(h) % 2: + h = b("0") + h + s = binascii.unhexlify(h) + num = str_idx_as_int(s, 0) + if num <= 0x7f: + return b("\x02") + encode_length(len(s)) + s + else: + # DER integers are two's complement, so if the first byte is + # 0x80-0xff then we need an extra 0x00 byte to prevent it from + # looking negative. + return b("\x02") + encode_length(len(s)+1) + b("\x00") + s + + +# sentry object to check if an argument was specified (used to detect +# deprecated calling convention) +_sentry = object() + + +def encode_bitstring(s, unused=_sentry): + """ + Encode a binary string as a BIT STRING using :term:`DER` encoding. + + Note, because there is no native Python object that can encode an actual + bit string, this function only accepts byte strings as the `s` argument. + The byte string is the actual bit string that will be encoded, padded + on the right (least significant bits, looking from big endian perspective) + to the first full byte. If the bit string has a bit length that is multiple + of 8, then the padding should not be included. For correct DER encoding + the padding bits MUST be set to 0. + + Number of bits of padding need to be provided as the `unused` parameter. + In case they are specified as None, it means the number of unused bits + is already encoded in the string as the first byte. + + The deprecated call convention specifies just the `s` parameters and + encodes the number of unused bits as first parameter (same convention + as with None). + + Empty string must be encoded with `unused` specified as 0. + + Future version of python-ecdsa will make specifying the `unused` argument + mandatory. + + :param s: bytes to encode + :type s: bytes like object + :param unused: number of bits at the end of `s` that are unused, must be + between 0 and 7 (inclusive) + :type unused: int or None + + :raises ValueError: when `unused` is too large or too small + + :return: `s` encoded using DER + :rtype: bytes + """ + encoded_unused = b'' + len_extra = 0 + if unused is _sentry: + warnings.warn("Legacy call convention used, unused= needs to be " + "specified", + DeprecationWarning) + elif unused is not None: + if not 0 <= unused <= 7: + raise ValueError("unused must be integer between 0 and 7") + if unused: + if not s: + raise ValueError("unused is non-zero but s is empty") + last = str_idx_as_int(s, -1) + if last & (2 ** unused - 1): + raise ValueError("unused bits must be zeros in DER") + encoded_unused = int2byte(unused) + len_extra = 1 + return b("\x03") + encode_length(len(s) + len_extra) + encoded_unused + s + + +def encode_octet_string(s): + return b("\x04") + encode_length(len(s)) + s + + +def encode_oid(first, second, *pieces): + assert 0 <= first < 2 and 0 <= second <= 39 or first == 2 and 0 <= second + body = b''.join(chain([encode_number(40*first+second)], + (encode_number(p) for p in pieces))) + return b'\x06' + encode_length(len(body)) + body + + +def encode_sequence(*encoded_pieces): + total_len = sum([len(p) for p in encoded_pieces]) + return b('\x30') + encode_length(total_len) + b('').join(encoded_pieces) + + +def encode_number(n): + b128_digits = [] + while n: + b128_digits.insert(0, (n & 0x7f) | 0x80) + n = n >> 7 + if not b128_digits: + b128_digits.append(0) + b128_digits[-1] &= 0x7f + return b('').join([int2byte(d) for d in b128_digits]) + + +def remove_constructed(string): + s0 = str_idx_as_int(string, 0) + if (s0 & 0xe0) != 0xa0: + raise UnexpectedDER("wanted type 'constructed tag' (0xa0-0xbf), " + "got 0x%02x" % s0) + tag = s0 & 0x1f + length, llen = read_length(string[1:]) + body = string[1+llen:1+llen+length] + rest = string[1+llen+length:] + return tag, body, rest + + +def remove_sequence(string): + if not string: + raise UnexpectedDER("Empty string does not encode a sequence") + if string[:1] != b"\x30": + n = str_idx_as_int(string, 0) + raise UnexpectedDER("wanted type 'sequence' (0x30), got 0x%02x" % n) + length, lengthlength = read_length(string[1:]) + if length > len(string) - 1 - lengthlength: + raise UnexpectedDER("Length longer than the provided buffer") + endseq = 1+lengthlength+length + return string[1+lengthlength:endseq], string[endseq:] + + +def remove_octet_string(string): + if string[:1] != b"\x04": + n = str_idx_as_int(string, 0) + raise UnexpectedDER("wanted type 'octetstring' (0x04), got 0x%02x" % n) + length, llen = read_length(string[1:]) + body = string[1+llen:1+llen+length] + rest = string[1+llen+length:] + return body, rest + + +def remove_object(string): + if not string: + raise UnexpectedDER( + "Empty string does not encode an object identifier") + if string[:1] != b"\x06": + n = str_idx_as_int(string, 0) + raise UnexpectedDER("wanted type 'object' (0x06), got 0x%02x" % n) + length, lengthlength = read_length(string[1:]) + body = string[1+lengthlength:1+lengthlength+length] + rest = string[1+lengthlength+length:] + if not body: + raise UnexpectedDER("Empty object identifier") + if len(body) != length: + raise UnexpectedDER( + "Length of object identifier longer than the provided buffer") + numbers = [] + while body: + n, ll = read_number(body) + numbers.append(n) + body = body[ll:] + n0 = numbers.pop(0) + if n0 < 80: + first = n0 // 40 + else: + first = 2 + second = n0 - (40 * first) + numbers.insert(0, first) + numbers.insert(1, second) + return tuple(numbers), rest + + +def remove_integer(string): + if not string: + raise UnexpectedDER("Empty string is an invalid encoding of an " + "integer") + if string[:1] != b"\x02": + n = str_idx_as_int(string, 0) + raise UnexpectedDER("wanted type 'integer' (0x02), got 0x%02x" % n) + length, llen = read_length(string[1:]) + if length > len(string) - 1 - llen: + raise UnexpectedDER("Length longer than provided buffer") + if length == 0: + raise UnexpectedDER("0-byte long encoding of integer") + numberbytes = string[1+llen:1+llen+length] + rest = string[1+llen+length:] + msb = str_idx_as_int(numberbytes, 0) + if not msb < 0x80: + raise UnexpectedDER("Negative integers are not supported") + # check if the encoding is the minimal one (DER requirement) + if length > 1 and not msb: + # leading zero byte is allowed if the integer would have been + # considered a negative number otherwise + smsb = str_idx_as_int(numberbytes, 1) + if smsb < 0x80: + raise UnexpectedDER("Invalid encoding of integer, unnecessary " + "zero padding bytes") + return int(binascii.hexlify(numberbytes), 16), rest + + +def read_number(string): + number = 0 + llen = 0 + if str_idx_as_int(string, 0) == 0x80: + raise UnexpectedDER("Non minimal encoding of OID subidentifier") + # base-128 big endian, with most significant bit set in all but the last + # byte + while True: + if llen >= len(string): + raise UnexpectedDER("ran out of length bytes") + number = number << 7 + d = str_idx_as_int(string, llen) + number += (d & 0x7f) + llen += 1 + if not d & 0x80: + break + return number, llen + + +def encode_length(l): + assert l >= 0 + if l < 0x80: + return int2byte(l) + s = ("%x" % l).encode() + if len(s) % 2: + s = b("0") + s + s = binascii.unhexlify(s) + llen = len(s) + return int2byte(0x80 | llen) + s + + +def read_length(string): + if not string: + raise UnexpectedDER("Empty string can't encode valid length value") + num = str_idx_as_int(string, 0) + if not (num & 0x80): + # short form + return (num & 0x7f), 1 + # else long-form: b0&0x7f is number of additional base256 length bytes, + # big-endian + llen = num & 0x7f + if not llen: + raise UnexpectedDER("Invalid length encoding, length of length is 0") + if llen > len(string)-1: + raise UnexpectedDER("Length of length longer than provided buffer") + # verify that the encoding is minimal possible (DER requirement) + msb = str_idx_as_int(string, 1) + if not msb or llen == 1 and msb < 0x80: + raise UnexpectedDER("Not minimal encoding of length") + return int(binascii.hexlify(string[1:1+llen]), 16), 1+llen + + +def remove_bitstring(string, expect_unused=_sentry): + """ + Remove a BIT STRING object from `string` following :term:`DER`. + + The `expect_unused` can be used to specify if the bit string should + have the amount of unused bits decoded or not. If it's an integer, any + read BIT STRING that has number of unused bits different from specified + value will cause UnexpectedDER exception to be raised (this is especially + useful when decoding BIT STRINGS that have DER encoded object in them; + DER encoding is byte oriented, so the unused bits will always equal 0). + + If the `expect_unused` is specified as None, the first element returned + will be a tuple, with the first value being the extracted bit string + while the second value will be the decoded number of unused bits. + + If the `expect_unused` is unspecified, the decoding of byte with + number of unused bits will not be attempted and the bit string will be + returned as-is, the callee will be required to decode it and verify its + correctness. + + Future version of python will require the `expected_unused` parameter + to be specified. + + :param string: string of bytes to extract the BIT STRING from + :type string: bytes like object + :param expect_unused: number of bits that should be unused in the BIT + STRING, or None, to return it to caller + :type expect_unused: int or None + + :raises UnexpectedDER: when the encoding does not follow DER. + + :return: a tuple with first element being the extracted bit string and + the second being the remaining bytes in the string (if any); if the + `expect_unused` is specified as None, the first element of the returned + tuple will be a tuple itself, with first element being the bit string + as bytes and the second element being the number of unused bits at the + end of the byte array as an integer + :rtype: tuple + """ + if not string: + raise UnexpectedDER("Empty string does not encode a bitstring") + if expect_unused is _sentry: + warnings.warn("Legacy call convention used, expect_unused= needs to be" + " specified", + DeprecationWarning) + num = str_idx_as_int(string, 0) + if string[:1] != b"\x03": + raise UnexpectedDER("wanted bitstring (0x03), got 0x%02x" % num) + length, llen = read_length(string[1:]) + if not length: + raise UnexpectedDER("Invalid length of bit string, can't be 0") + body = string[1+llen:1+llen+length] + rest = string[1+llen+length:] + if expect_unused is not _sentry: + unused = str_idx_as_int(body, 0) + if not 0 <= unused <= 7: + raise UnexpectedDER("Invalid encoding of unused bits") + if expect_unused is not None and expect_unused != unused: + raise UnexpectedDER("Unexpected number of unused bits") + body = body[1:] + if unused: + if not body: + raise UnexpectedDER("Invalid encoding of empty bit string") + last = str_idx_as_int(body, -1) + # verify that all the unused bits are set to zero (DER requirement) + if last & (2 ** unused - 1): + raise UnexpectedDER("Non zero padding bits in bit string") + if expect_unused is None: + body = (body, unused) + return body, rest + +# SEQUENCE([1, STRING(secexp), cont[0], OBJECT(curvename), cont[1], BINTSTRING) + + +# signatures: (from RFC3279) +# ansi-X9-62 OBJECT IDENTIFIER ::= { +# iso(1) member-body(2) us(840) 10045 } +# +# id-ecSigType OBJECT IDENTIFIER ::= { +# ansi-X9-62 signatures(4) } +# ecdsa-with-SHA1 OBJECT IDENTIFIER ::= { +# id-ecSigType 1 } +## so 1,2,840,10045,4,1 +## so 0x42, .. .. + +# Ecdsa-Sig-Value ::= SEQUENCE { +# r INTEGER, +# s INTEGER } + +# id-public-key-type OBJECT IDENTIFIER ::= { ansi-X9.62 2 } +# +# id-ecPublicKey OBJECT IDENTIFIER ::= { id-publicKeyType 1 } + +# I think the secp224r1 identifier is (t=06,l=05,v=2b81040021) +# secp224r1 OBJECT IDENTIFIER ::= { +# iso(1) identified-organization(3) certicom(132) curve(0) 33 } +# and the secp384r1 is (t=06,l=05,v=2b81040022) +# secp384r1 OBJECT IDENTIFIER ::= { +# iso(1) identified-organization(3) certicom(132) curve(0) 34 } + +def unpem(pem): + if isinstance(pem, text_type): + pem = pem.encode() + + d = b("").join([l.strip() for l in pem.split(b("\n")) + if l and not l.startswith(b("-----"))]) + return base64.b64decode(d) + + +def topem(der, name): + b64 = base64.b64encode(der) + lines = [("-----BEGIN %s-----\n" % name).encode()] + lines.extend([b64[start:start+64]+b("\n") + for start in range(0, len(b64), 64)]) + lines.append(("-----END %s-----\n" % name).encode()) + return b("").join(lines) diff --git a/freezed_deps/ecdsa/ecdh.py b/freezed_deps/ecdsa/ecdh.py new file mode 100644 index 0000000..88848f5 --- /dev/null +++ b/freezed_deps/ecdsa/ecdh.py @@ -0,0 +1,306 @@ +""" +Class for performing Elliptic-curve Diffie-Hellman (ECDH) operations. +""" + +from .util import number_to_string +from .ellipticcurve import INFINITY +from .keys import SigningKey, VerifyingKey + + +__all__ = ["ECDH", "NoKeyError", "NoCurveError", "InvalidCurveError", + "InvalidSharedSecretError"] + + +class NoKeyError(Exception): + """ECDH. Key not found but it is needed for operation.""" + + pass + + +class NoCurveError(Exception): + """ECDH. Curve not set but it is needed for operation.""" + + pass + + +class InvalidCurveError(Exception): + """ECDH. Raised in case the public and private keys use different curves.""" + + pass + + +class InvalidSharedSecretError(Exception): + """ECDH. Raised in case the shared secret we obtained is an INFINITY.""" + + pass + + +class ECDH(object): + """ + Elliptic-curve Diffie-Hellman (ECDH). A key agreement protocol. + + Allows two parties, each having an elliptic-curve public-private key + pair, to establish a shared secret over an insecure channel + """"" + + def __init__(self, curve=None, private_key=None, public_key=None): + """ + ECDH init. + + Call can be initialised without parameters, then the first operation + (loading either key) will set the used curve. + All parameters must be ultimately set before shared secret + calculation will be allowed. + + :param curve: curve for operations + :type curve: Curve + :param private_key: `my` private key for ECDH + :type private_key: SigningKey + :param public_key: `their` public key for ECDH + :type public_key: VerifyingKey + """ + self.curve = curve + self.private_key = None + self.public_key = None + if private_key: + self.load_private_key(private_key) + if public_key: + self.load_received_public_key(public_key) + + def _get_shared_secret(self, remote_public_key): + if not self.private_key: + raise NoKeyError( + "Private key needs to be set to create shared secret") + if not self.public_key: + raise NoKeyError( + "Public key needs to be set to create shared secret") + if not (self.private_key.curve == self.curve == remote_public_key.curve): + raise InvalidCurveError( + "Curves for public key and private key is not equal.") + + # shared secret = PUBKEYtheirs * PRIVATEKEYours + result = remote_public_key.pubkey.point * self.private_key.privkey.secret_multiplier + if result == INFINITY: + raise InvalidSharedSecretError( + "Invalid shared secret (INFINITY).") + + return result.x() + + def set_curve(self, key_curve): + """ + Set the working curve for ecdh operations. + + :param key_curve: curve from `curves` module + :type key_curve: Curve + """ + self.curve = key_curve + + def generate_private_key(self): + """ + Generate local private key for ecdh operation with curve that was set. + + :raises NoCurveError: Curve must be set before key generation. + + :return: public (verifying) key from this private key. + :rtype: VerifyingKey object + """ + if not self.curve: + raise NoCurveError("Curve must be set prior to key generation.") + return self.load_private_key(SigningKey.generate(curve=self.curve)) + + def load_private_key(self, private_key): + """ + Load private key from SigningKey (keys.py) object. + + Needs to have the same curve as was set with set_curve method. + If curve is not set - it sets from this SigningKey + + :param private_key: Initialised SigningKey class + :type private_key: SigningKey + + :raises InvalidCurveError: private_key curve not the same as self.curve + + :return: public (verifying) key from this private key. + :rtype: VerifyingKey object + """ + if not self.curve: + self.curve = private_key.curve + if self.curve != private_key.curve: + raise InvalidCurveError("Curve mismatch.") + self.private_key = privat |