hyb
2026-01-09 4cb426cb3ae31e772a09d4ade5b2f0242aaeefa0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# Copyright (c) 2023, 2024, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0, as
# published by the Free Software Foundation.
#
# This program is designed to work with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms,
# as designated in a particular file or component or in included license
# documentation. The authors of MySQL hereby grant you an
# additional permission to link the program and your derivative works
# with the separately licensed software that they have either included with
# the program or referenced in the documentation.
#
# Without limiting anything contained in the foregoing, this file,
# which is part of MySQL Connector/Python, is also subject to the
# Universal FOSS Exception, version 1.0, a copy of which can be found at
# http://oss.oracle.com/licenses/universal-foss-exception.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
 
"""Caching SHA2 Password Authentication Plugin."""
 
import struct
 
from hashlib import sha256
from typing import TYPE_CHECKING, Any, Optional
 
from ..errors import InterfaceError
from ..logger import logger
from . import MySQLAuthPlugin
 
if TYPE_CHECKING:
    from ..network import MySQLSocket
 
AUTHENTICATION_PLUGIN_CLASS = "MySQLCachingSHA2PasswordAuthPlugin"
 
 
class MySQLCachingSHA2PasswordAuthPlugin(MySQLAuthPlugin):
    """Class implementing the MySQL caching_sha2_password authentication plugin
 
    Note that encrypting using RSA is not supported since the Python
    Standard Library does not provide this OpenSSL functionality.
    """
 
    perform_full_authentication: int = 4
 
    def _scramble(self, auth_data: bytes) -> bytes:
        """Return a scramble of the password using a Nonce sent by the
        server.
 
        The scramble is of the form:
        XOR(SHA2(password), SHA2(SHA2(SHA2(password)), Nonce))
        """
        if not auth_data:
            raise InterfaceError("Missing authentication data (seed)")
 
        if not self._password:
            return b""
 
        hash1 = sha256(self._password.encode()).digest()
        hash2 = sha256()
        hash2.update(sha256(hash1).digest())
        hash2.update(auth_data)
        hash2_digest = hash2.digest()
        xored = [h1 ^ h2 for (h1, h2) in zip(hash1, hash2_digest)]
        hash3 = struct.pack("32B", *xored)
        return hash3
 
    @property
    def name(self) -> str:
        """Plugin official name."""
        return "caching_sha2_password"
 
    @property
    def requires_ssl(self) -> bool:
        """Signals whether or not SSL is required."""
        return False
 
    def auth_response(self, auth_data: bytes, **kwargs: Any) -> Optional[bytes]:
        """Make the client's authorization response.
 
        Args:
            auth_data: Authorization data.
            kwargs: Custom configuration to be passed to the auth plugin
                    when invoked. The parameters defined here will override the ones
                    defined in the auth plugin itself.
 
        Returns:
            packet: Client's authorization response.
        """
        if not auth_data:
            return None
        if len(auth_data) > 1:
            return self._scramble(auth_data)
        if auth_data[0] == self.perform_full_authentication:
            # return password as clear text.
            return self._password.encode() + b"\x00"
 
        return None
 
    def auth_more_response(
        self, sock: "MySQLSocket", auth_data: bytes, **kwargs: Any
    ) -> bytes:
        """Handles server's `auth more data` response.
 
        Args:
            sock: Pointer to the socket connection.
            auth_data: Authentication method data (from a packet representing
                       an `auth more data` response).
            kwargs: Custom configuration to be passed to the auth plugin
                    when invoked. The parameters defined here will override the ones
                    defined in the auth plugin itself.
 
        Returns:
            packet: Last server's response after back-and-forth
                    communication.
        """
        response = self.auth_response(auth_data, **kwargs)
        if response:
            sock.send(response)
 
        return bytes(sock.recv())
 
    def auth_switch_response(
        self, sock: "MySQLSocket", auth_data: bytes, **kwargs: Any
    ) -> bytes:
        """Handles server's `auth switch request` response.
 
        Args:
            sock: Pointer to the socket connection.
            auth_data: Plugin provided data (extracted from a packet
                       representing an `auth switch request` response).
            kwargs: Custom configuration to be passed to the auth plugin
                    when invoked. The parameters defined here will override the ones
                    defined in the auth plugin itself.
 
        Returns:
            packet: Last server's response after back-and-forth
                    communication.
        """
        response = self.auth_response(auth_data, **kwargs)
        if response is None:
            raise InterfaceError("Got a NULL auth response")
 
        logger.debug("# request: %s size: %s", response, len(response))
        sock.send(response)
 
        pkt = bytes(sock.recv())
        logger.debug("# server response packet: %s", pkt)
 
        return pkt