|
| 1 | +import json |
1 | 2 | import logging
|
2 | 3 | from abc import ABC, abstractmethod
|
| 4 | +from functools import lru_cache |
3 | 5 | from pathlib import Path
|
4 | 6 | from typing import Dict, Optional
|
5 | 7 |
|
6 | 8 | from coincurve.keys import PrivateKey
|
| 9 | +from rich.prompt import Console, Prompt, Text |
7 | 10 | from typing_extensions import deprecated
|
| 11 | +from web3 import Web3 |
8 | 12 |
|
9 | 13 | from aleph.sdk.conf import settings
|
10 | 14 | from aleph.sdk.utils import enum_as_str
|
@@ -143,22 +147,141 @@ async def decrypt(self, content: bytes) -> bytes:
|
143 | 147 | raise NotImplementedError
|
144 | 148 |
|
145 | 149 |
|
146 |
| -# Start of the ugly stuff |
147 | 150 | def generate_key() -> bytes:
|
| 151 | + """ |
| 152 | + Generate a new private key. |
| 153 | +
|
| 154 | + Returns: |
| 155 | + bytes: The generated private key as bytes. |
| 156 | + """ |
| 157 | + |
148 | 158 | privkey = PrivateKey()
|
149 | 159 | return privkey.secret
|
150 | 160 |
|
151 | 161 |
|
| 162 | +def create_key() -> bytes: |
| 163 | + """ |
| 164 | + Create or import a private key. |
| 165 | +
|
| 166 | + This function allows the user to either import an existing private key |
| 167 | + or generate a new one. If the user chooses to import a key, they can |
| 168 | + enter a private key in hexadecimal format or a passphrase. |
| 169 | +
|
| 170 | + Returns: |
| 171 | + bytes: The private key as bytes. 7e0c27fff7e434ec5aa47127e7bcdce81c4eba6f3ce980f425b60b1cd019a947 |
| 172 | + """ |
| 173 | + if Prompt.ask("Import an existing wallet", choices=["y", "n"], default="n") == "y": |
| 174 | + data = Prompt.ask("Enter your private key or passphrase") |
| 175 | + # private key |
| 176 | + if data.startswith("0x") and len(data) == 66: |
| 177 | + return bytes.fromhex(data[2:]) |
| 178 | + elif len(data) == 64: |
| 179 | + return bytes.fromhex(data) |
| 180 | + # passphrase |
| 181 | + elif len(data.split()) in [12, 24]: |
| 182 | + w3 = Web3() |
| 183 | + w3.eth.account.enable_unaudited_hdwallet_features() |
| 184 | + return w3.eth.account.from_mnemonic(data.strip()).key |
| 185 | + raise ValueError("Invalid private key or passphrase") |
| 186 | + return generate_key() |
| 187 | + |
| 188 | + |
| 189 | +def save_key(private_key: bytes, path: Path): |
| 190 | + """ |
| 191 | + Save a private key to a file. |
| 192 | +
|
| 193 | + Parameters: |
| 194 | + private_key (bytes): The private key as bytes. |
| 195 | + path (Path): The path to the private key file. |
| 196 | +
|
| 197 | + Returns: |
| 198 | + None |
| 199 | + """ |
| 200 | + address = None |
| 201 | + path.parent.mkdir(exist_ok=True, parents=True) |
| 202 | + if path.name.endswith(".key"): |
| 203 | + path.write_bytes(private_key) |
| 204 | + address = Web3().to_checksum_address( |
| 205 | + Web3().eth.account.from_key(private_key).address |
| 206 | + ) |
| 207 | + elif path.name.endswith(".json"): |
| 208 | + password = Prompt.ask( |
| 209 | + "Enter a password to encrypt your keystore", password=True |
| 210 | + ) |
| 211 | + keystore = Web3().eth.account.encrypt(private_key, password) |
| 212 | + path.write_text(json.dumps(keystore)) |
| 213 | + address = Web3().to_checksum_address( |
| 214 | + Web3().eth.account.from_key(private_key).address |
| 215 | + ) |
| 216 | + if not address: |
| 217 | + raise ValueError("Unsupported private key file format") |
| 218 | + confirmation = Text.assemble( |
| 219 | + "\nYour address: ", |
| 220 | + Text(address, style="cyan"), |
| 221 | + "\nSaved file: ", |
| 222 | + Text(str(path), style="orange1"), |
| 223 | + "\n", |
| 224 | + ) |
| 225 | + Console().print(confirmation) |
| 226 | + |
| 227 | + |
| 228 | +@lru_cache(maxsize=1) |
| 229 | +def load_key(private_key_path: Path) -> bytes: |
| 230 | + """ |
| 231 | + Load a private key from a file. |
| 232 | +
|
| 233 | + This function supports two types of private key files: |
| 234 | + 1. Unencrypted .key files. |
| 235 | + 2. Encrypted .json keystore files. |
| 236 | +
|
| 237 | + Parameters: |
| 238 | + private_key_path (Path): The path to the private key file. |
| 239 | +
|
| 240 | + Returns: |
| 241 | + bytes: The private key as bytes. |
| 242 | +
|
| 243 | + Raises: |
| 244 | + FileNotFoundError: If the private key file does not exist. |
| 245 | + ValueError: If the private key file is not a .key or .json file. |
| 246 | + """ |
| 247 | + if not private_key_path.exists(): |
| 248 | + raise FileNotFoundError("Private key file not found") |
| 249 | + elif private_key_path.name.endswith(".key"): |
| 250 | + return private_key_path.read_bytes() |
| 251 | + elif private_key_path.name.endswith(".json"): |
| 252 | + keystore = private_key_path.read_text() |
| 253 | + password = Prompt.ask("Keystore password", password=True) |
| 254 | + try: |
| 255 | + return Web3().eth.account.decrypt(keystore, password) |
| 256 | + except ValueError: |
| 257 | + raise ValueError("Invalid password") |
| 258 | + raise ValueError("Unsupported private key file format") |
| 259 | + |
| 260 | + |
152 | 261 | def get_fallback_private_key(path: Optional[Path] = None) -> bytes:
|
| 262 | + """ |
| 263 | + Retrieve or create a fallback private key. |
| 264 | +
|
| 265 | + This function attempts to load a private key from the specified path. |
| 266 | + If the path is not provided, it defaults to the path specified in the |
| 267 | + settings. If the file does not exist or is empty, a new private key |
| 268 | + is generated and saved to the specified path. A symlink is also created |
| 269 | + to use this key by default. |
| 270 | +
|
| 271 | + Parameters: |
| 272 | + path (Optional[Path]): The path to the private key file. If not provided, |
| 273 | + the default path from settings is used. |
| 274 | +
|
| 275 | + Returns: |
| 276 | + bytes: The private key as bytes. |
| 277 | + """ |
153 | 278 | path = path or settings.PRIVATE_KEY_FILE
|
154 | 279 | private_key: bytes
|
155 | 280 | if path.exists() and path.stat().st_size > 0:
|
156 |
| - private_key = path.read_bytes() |
| 281 | + private_key = load_key(path) |
157 | 282 | else:
|
158 |
| - private_key = generate_key() |
159 |
| - path.parent.mkdir(exist_ok=True, parents=True) |
160 |
| - path.write_bytes(private_key) |
161 |
| - |
| 283 | + private_key = create_key() |
| 284 | + save_key(private_key, path) |
162 | 285 | default_key_path = path.parent / "default.key"
|
163 | 286 |
|
164 | 287 | # If the symlink exists but does not point to a file, delete it.
|
|
0 commit comments