Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use multiprocessing poll for deposit data generation and local storag… #32

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 112 additions & 66 deletions stakewise_cli/eth2.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import secrets
import string
from enum import Enum
from itertools import chain
from multiprocessing import Pool
from typing import Dict, List, Set, Tuple

import backoff
Expand Down Expand Up @@ -43,6 +45,7 @@
MerkleDepositData,
SigningKey,
)
from stakewise_cli.utils import chunkify

# TODO: find a way to import "from eth2deposit.utils.constants import WORD_LISTS_PATH"
WORD_LISTS_PATH = os.path.join(os.path.dirname(__file__), "word_lists")
Expand Down Expand Up @@ -117,53 +120,65 @@ def create_new_mnemonic(mnemonic_language: str) -> str:
return mnemonic


def generate_keypairs(
indexes: List[int], mnemonic: str, gql_client: Client
) -> List[KeyPair]:
pub_key_to_priv_key: Dict[HexStr, BLSPrivkey] = {}

# derive signing key
public_keys: List[HexStr] = []
for index in indexes:
signing_key = get_mnemonic_signing_key(mnemonic, index, IS_LEGACY)
# derive public key
public_key = w3.toHex(G2ProofOfPossession.SkToPk(signing_key.key))
# store keypairs
pub_key_to_priv_key[public_key] = signing_key.key
public_keys.append(public_key)

# remove keys that were already registered in beacon chain
result: Dict = gql_client.execute(
document=REGISTRATIONS_QUERY,
variable_values=dict(public_keys=public_keys),
)
registrations = result["validatorRegistrations"]
for registration in registrations:
pub_key_to_priv_key.pop(registration["publicKey"], None)

return [
KeyPair(private_key=priv_key, public_key=pub_key)
for pub_key, priv_key in pub_key_to_priv_key.items()
]


def generate_unused_validator_keys(
gql_client: Client, mnemonic: str, keys_count: int
gql_client: Client,
mnemonic: str,
keys_count: int,
) -> List[KeyPair]:
"""Generates specified number of unused validator key-pairs from the mnemonic."""
pub_key_to_priv_key: Dict[HexStr, BLSPrivkey] = {}
with click.progressbar(
length=keys_count,
label="Creating validator keys:\t\t",
show_percent=False,
show_pos=True,
) as bar:
from_index = 0
while len(pub_key_to_priv_key) < keys_count:
curr_progress = len(pub_key_to_priv_key)
chunk_size = min(100, keys_count - curr_progress)

# generate keys in chunks
public_keys_chunk: List[HexStr] = []
while len(public_keys_chunk) != chunk_size:
# derive signing key
signing_key = get_mnemonic_signing_key(mnemonic, from_index, IS_LEGACY)

# derive public key
public_key = w3.toHex(G2ProofOfPossession.SkToPk(signing_key.key))

# store keypairs
pub_key_to_priv_key[public_key] = signing_key.key
public_keys_chunk.append(public_key)

# increase index for generating other keys
from_index += 1
with Pool() as pool:

# remove keys that were already registered in beacon chain
result: Dict = gql_client.execute(
document=REGISTRATIONS_QUERY,
variable_values=dict(public_keys=public_keys_chunk),
)
registrations = result["validatorRegistrations"]
for registration in registrations:
pub_key_to_priv_key.pop(registration["publicKey"], None)
def bar_updated(result, *args, **kwargs):
bar.update(len(result))

bar.update(len(pub_key_to_priv_key) - curr_progress)

return [
KeyPair(private_key=priv_key, public_key=pub_key)
for pub_key, priv_key in pub_key_to_priv_key.items()
]
results = []
indexes = [i for i in range(keys_count)]
for chunk in chunkify(indexes, 50):
results.append(
pool.apply_async(
generate_keypairs,
[chunk, mnemonic, gql_client],
callback=bar_updated,
)
)
[result.wait() for result in results]
return list(chain.from_iterable([result.get() for result in results]))


def get_mnemonic_signing_key(
Expand Down Expand Up @@ -276,6 +291,42 @@ def verify_deposit_data(
return deposit_data.hash_tree_root == hash_tree_root


def generate_deposit_data(
withdrawal_credentials: HexStr,
deposit_amount: Wei,
withdrawal_credentials_bytes: Bytes32,
deposit_amount_gwei: Gwei,
genesis_fork_version: bytes,
keypair: KeyPair,
) -> (MerkleDepositData, bytes):
private_key = keypair["private_key"]
public_key = keypair["public_key"]
signature, deposit_data_root = get_deposit_data_signature(
private_key=private_key,
public_key=BLSPubkey(w3.toBytes(hexstr=public_key)),
withdrawal_credentials=withdrawal_credentials_bytes,
amount=deposit_amount_gwei,
fork_version=Bytes4(genesis_fork_version),
)
encoded_data: bytes = w3.codec.encode_abi(
["bytes", "bytes32", "bytes", "bytes32"],
[
public_key,
withdrawal_credentials_bytes,
signature,
deposit_data_root,
],
)
return MerkleDepositData(
public_key=public_key,
signature=w3.toHex(signature),
amount=str(deposit_amount),
withdrawal_credentials=withdrawal_credentials,
deposit_data_root=w3.toHex(deposit_data_root),
proof=[],
), w3.keccak(primitive=encoded_data)


def generate_merkle_deposit_datum(
genesis_fork_version: bytes,
withdrawal_credentials: HexStr,
Expand All @@ -293,36 +344,31 @@ def generate_merkle_deposit_datum(
merkle_elements: List[bytes] = []
with click.progressbar(
validator_keypairs, label=loading_label, show_percent=False, show_pos=True
) as keypairs:
for keypair in keypairs:
private_key = keypair["private_key"]
public_key = keypair["public_key"]
signature, deposit_data_root = get_deposit_data_signature(
private_key=private_key,
public_key=BLSPubkey(w3.toBytes(hexstr=public_key)),
withdrawal_credentials=withdrawal_credentials_bytes,
amount=deposit_amount_gwei,
fork_version=Bytes4(genesis_fork_version),
)
encoded_data: bytes = w3.codec.encode_abi(
["bytes", "bytes32", "bytes", "bytes32"],
[
public_key,
withdrawal_credentials_bytes,
signature,
deposit_data_root,
],
)
merkle_elements.append(w3.keccak(primitive=encoded_data))
deposit_data = MerkleDepositData(
public_key=public_key,
signature=w3.toHex(signature),
amount=str(deposit_amount),
withdrawal_credentials=withdrawal_credentials,
deposit_data_root=w3.toHex(deposit_data_root),
proof=[],
)
merkle_deposit_datum.append(deposit_data)
) as bar:
with Pool() as pool:

def bar_updated(*args, **kwargs):
bar.update(1)

results = [
pool.apply_async(
generate_deposit_data,
[
withdrawal_credentials,
deposit_amount,
withdrawal_credentials_bytes,
deposit_amount_gwei,
genesis_fork_version,
keypair,
],
callback=bar_updated,
)
for keypair in validator_keypairs
]
[result.wait() for result in results]
for deposit_data, merkle_element in [result.get() for result in results]:
merkle_elements.append(merkle_element)
merkle_deposit_datum.append(deposit_data)

merkle_tree = MerkleTree(merkle_elements)

Expand Down
72 changes: 41 additions & 31 deletions stakewise_cli/storages/local.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import errno
import time
from functools import cached_property, lru_cache
from multiprocessing import Pool
from os import listdir, makedirs
from os.path import exists
from typing import Dict, Set
Expand Down Expand Up @@ -56,6 +57,33 @@ def operator_deposit_data_public_keys(self) -> Set[HexStr]:

return result

def generate_keystores(self, index):
signing_key = get_mnemonic_signing_key(self.mnemonic, index, IS_LEGACY)
public_key = Web3.toHex(G2ProofOfPossession.SkToPk(signing_key.key))

is_registered = is_validator_registered(
gql_client=self.eth_gql_client, public_key=public_key
)
if is_registered:
click.secho(
f"Public key {public_key} is in deposit data and already in use, skipping...",
bold=True,
fg="red",
)
return
secret = signing_key.key.to_bytes(32, "big")

password = self.get_or_create_keystore_password()
keystore = ScryptKeystore.encrypt(
secret=secret, password=password, path=signing_key.path
).as_json()

keystore_name = "keystore-%s-%i.json" % (
signing_key.path.replace("/", "_"),
time.time(),
)
return keystore_name, keystore

@cached_property
def deposit_data_keystores(self) -> Dict[str, str]:
"""
Expand All @@ -67,45 +95,27 @@ def deposit_data_keystores(self) -> Dict[str, str]:
if not keys_count:
return keystores

from_index = 0
with click.progressbar(
length=keys_count,
label="Syncing deposit data keystores\t\t",
show_percent=False,
show_pos=True,
) as bar:
while True:
signing_key = get_mnemonic_signing_key(
self.mnemonic, from_index, IS_LEGACY
)
public_key = Web3.toHex(G2ProofOfPossession.SkToPk(signing_key.key))
if public_key not in self.operator_deposit_data_public_keys:
break
with Pool() as pool:

is_registered = is_validator_registered(
gql_client=self.eth_gql_client, public_key=public_key
)
if is_registered:
click.secho(
f"Public key {public_key} is in deposit data and already in use, skipping...",
bold=True,
fg="red",
)
def bar_updated(*args, **kwargs):
bar.update(1)
continue

secret = signing_key.key.to_bytes(32, "big")
password = self.get_or_create_keystore_password()
keystore = ScryptKeystore.encrypt(
secret=secret, password=password, path=signing_key.path
).as_json()
keystore_name = "keystore-%s-%i.json" % (
signing_key.path.replace("/", "_"),
time.time(),
)
keystores[keystore_name] = keystore
from_index += 1
bar.update(1)

results = [
pool.apply_async(
self.generate_keystores,
[index],
callback=bar_updated,
)
for index in [i for i in range(keys_count)]
]
[result.wait() for result in results]
keystores = {x[0]: x[1] for x in [result.get() for result in results]}

return keystores

Expand Down
3 changes: 3 additions & 0 deletions stakewise_cli/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
def chunkify(items, size):
for i in range(0, len(items), size):
yield items[i : i + size]