Skip to content
This repository was archived by the owner on Oct 21, 2025. It is now read-only.
98 changes: 98 additions & 0 deletions fbpcp/gateway/kms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

# pyre-strict

from typing import Any, Dict, List, Optional

import boto3
from botocore.client import BaseClient
from fbpcp.decorator.error_handler import error_handler
from fbpcp.gateway.aws import AWSGateway


class KMSGateway(AWSGateway):
def __init__(
self,
region: str,
access_key_id: Optional[str] = None,
access_key_data: Optional[str] = None,
config: Optional[Dict[str, Any]] = None,
) -> None:
super().__init__(region, access_key_id, access_key_data, config)
self.client: BaseClient = boto3.client(
"kms", region_name=self.region, **self.config
)

@error_handler
def decrypt(
self,
key_id: str,
ciphertext_blob: bytes,
encryption_context: Dict[str, str],
grant_tokens: List[str],
encryption_algorithm: str,
) -> Dict[str, Any]:
return self.client.encrypt(
KeyId=key_id,
CiphertextBlob=ciphertext_blob,
EncryptionContext=encryption_context,
GrantTokens=grant_tokens,
EncryptionAlgorithm=encryption_algorithm,
)

@error_handler
def encrypt(
self,
key_id: str,
plaintext: bytes,
encryption_context: Dict[str, str],
grant_tokens: List[str],
encryption_algorithm: str,
) -> Dict[str, Any]:
return self.client.encrypt(
KeyId=key_id,
Plaintext=plaintext,
EncryptionContext=encryption_context,
GrantTokens=grant_tokens,
EncryptionAlgorithm=encryption_algorithm,
)

@error_handler
def sign(
self,
key_id: str,
message: bytes,
message_type: str,
grant_tokens: List[str],
signing_algorithm: str,
) -> Dict[str, Any]:
return self.client.sign(
KeyId=key_id,
Message=message,
MessageType=message_type,
GrantTokens=grant_tokens,
SigningAlgorithm=signing_algorithm,
)

@error_handler
def verify(
self,
key_id: str,
message: bytes,
message_type: str,
signature: bytes,
signing_algorithm: str,
grant_tokens: List[str],
) -> Dict[str, Any]:
return self.client.verify(
KeyId=key_id,
Message=message,
MessageType=message_type,
Signature=signature,
SigningAlgorithm=signing_algorithm,
GrantTokens=grant_tokens,
)
27 changes: 27 additions & 0 deletions fbpcp/service/key_management.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

# pyre-strict

import abc


class KeyManagementService(abc.ABC):
@abc.abstractmethod
def sign(self, message: str) -> str:
pass

@abc.abstractmethod
def decrypt(self, ciphertext_blob: str) -> str:
pass

@abc.abstractmethod
def encrypt(self, plaintext: str) -> str:
pass

@abc.abstractmethod
def verify(self, message: str, signature: str) -> str:
pass
100 changes: 100 additions & 0 deletions fbpcp/service/key_managment_aws.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

# pyre-strict

from base64 import b64decode, b64encode
from typing import Any, Dict, List, Optional

from fbpcp.gateway.kms import KMSGateway
from fbpcp.service.key_management import KeyManagementService


class AWSKeyManagementService(KeyManagementService):
key_id: str
encryption_algorithm: str
signing_algorithm: str
grant_tokens: Optional[List[str]]

def __init__(
self,
region: str,
key_id: str,
encryption_algorithm: Optional[str] = None,
signing_algorithm: Optional[str] = None,
grant_tokens: Optional[List[str]] = None,
access_key_id: Optional[str] = None,
access_key_data: Optional[str] = None,
encryption_context: Optional[Dict[str, str]] = None,
config: Optional[Dict[str, Any]] = None,
) -> None:
self.kms_gateway = KMSGateway(
region=region,
access_key_id=access_key_id,
access_key_data=access_key_data,
config=config,
)
self.key_id = key_id

self.encryption_algorithm = encryption_algorithm if encryption_algorithm else ""
self.signing_algorithm = signing_algorithm if signing_algorithm else ""
self.grant_tokens = grant_tokens if grant_tokens else []

def sign(self, message: str, message_type: str = "RAW") -> str:
if not self.signing_algorithm:
raise ValueError("No Signing Algorithm Set")
response = self.kms_gateway.sign(
key_id=self.key_id,
message=message,
message_type=message_type,
grant_tokens=self.grant_tokens,
signing_algorithm=self.signing_algorithm,
)
signature = b64encode(response["Signature"]).decode()
return signature

def decrypt(
self, ciphertext_blob: str, encryption_context: Optional[Dict[str, str]] = None
) -> str:
if not self.encryption_algorithm:
raise ValueError("No Encryption Algorithm Set")
b64_ciphertext_blob = b64decode(ciphertext_blob.encode())
response = self.kms_gateway.decrypt(
key_id=self.key_id,
ciphertext_blob=b64_ciphertext_blob,
encryption_context=encryption_context,
grant_tokens=self.grant_tokens,
encryption_algorithm=self.encryption_algorithm,
)
plaintext = b64encode(response["Plaintext"]).decode()
return plaintext

def encrypt(
self, plaintext: str, encryption_context: Optional[Dict[str, str]] = None
) -> str:
if not self.encryption_algorithm:
raise ValueError("No Encryption Algorithm Set")
response = self.kms_gateway.encrypt(
key_id=self.key_id,
plaintext=plaintext,
encryption_context=encryption_context if encryption_context else {},
grant_tokens=self.grant_tokens,
encryption_algorithm=self.encryption_algorithm,
)
ciphertext_blob = b64encode(response["CiphertextBlob"]).decode()
return ciphertext_blob

def verify(self, message: str, signature: str, message_type: str = "RAW") -> str:
b64_signature = b64decode(signature.encode())
response = self.kms_gateway.verify(
key_id=self.key_id,
message=message,
message_type=message_type,
signature=b64_signature,
signing_algorithm=self.signing_algorithm,
grant_tokens=self.grant_tokens,
)
return response["SignatureValid"]
6 changes: 6 additions & 0 deletions onedocker/common/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,14 @@
# This is the repository path that OneDocker downloads binaries from
ONEDOCKER_REPOSITORY_PATH = "ONEDOCKER_REPOSITORY_PATH"

# This is the repository path that OneDocker downloads binary checksums from
ONEDOCKER_CHECKSUM_REPOSITORY_PATH = "ONEDOCKER_CHECKSUM_REPOSITORY_PATH"

# This is the local path that the binaries reside
ONEDOCKER_EXE_PATH = "ONEDOCKER_EXE_PATH"

# This is the path user specified to upload the core dump file to
CORE_DUMP_REPOSITORY_PATH = "CORE_DUMP_REPOSITORY_PATH"

# This is the type of checksum we want to compare when running program
ONEDOCKER_CHECKSUM_TYPE = "ONEDOCKER_CHECKSUM_TYPE"
6 changes: 3 additions & 3 deletions onedocker/entity/checksum_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ class ChecksumInfo:
This dataclass tracks a package's checksum info for attestation, to allow for easy comparision and tracking

Fields:
package_name: String containting the package's name
version: String containting the package's version
package_name: String containing the package name
version: String containing the package version
Checksums: Dict that holds a pairing between a ChecksumType (Key) and the corresponding hash (Value)
"""

Expand All @@ -41,7 +41,7 @@ def __post_init__(self) -> None:

def __eq__(self, other: ChecksumInfo) -> bool:
"""
Compares two ChecksumInfo isntannces in previously decided order
Compares two ChecksumInfo instances in previously decided order
1. Compares package_name
2. Compares version
3. Checks if overlaping checkum algorithms are present
Expand Down
48 changes: 48 additions & 0 deletions onedocker/repository/onedocker_checksum.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

# pyre-strict

from fbpcp.service.storage import StorageService


class OneDockerChecksumRepository:
def __init__(
self, storage_svc: StorageService, checksum_repository_path: str
) -> None:
self.storage_svc = storage_svc
self.checksum_repository_path = checksum_repository_path

def _build_checksum_path(self, package_name: str, version: str) -> str:
if not self.checksum_repository_path:
raise ValueError(
"Checksum Repository Path not set. Unable to attest Package"
)
return f"{self.checksum_repository_path}{package_name}/{version}/{package_name.split('/')[-1]}.json"

def _file_exists(self, package_name: str, version: str) -> bool:
package_path = self._build_checksum_path(
package_name=package_name, version=version
)
return self.storage_svc.file_exists(package_path)

def write(self, package_name: str, version: str, checksum_data: str) -> None:
package_path = self._build_checksum_path(
package_name=package_name, version=version
)
self.storage_svc.write(filename=package_path, data=checksum_data)

def read(self, package_name: str, version: str) -> str:
package_path = self._build_checksum_path(
package_name=package_name, version=version
)

if not self._file_exists(package_name, version):
raise FileNotFoundError(
f"Cant find checksum file for package {package_name}, version {version}"
)

return self.storage_svc.read(filename=package_path)
36 changes: 24 additions & 12 deletions onedocker/script/cli/onedocker_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,16 @@
from fbpcp.service.onedocker import OneDockerService
from fbpcp.service.storage import StorageService
from fbpcp.util import reflect, yaml
from onedocker.repository.onedocker_checksum import OneDockerChecksumRepository
from onedocker.repository.onedocker_package import OneDockerPackageRepository
from onedocker.service.attestation import AttestationService

logger = None
onedocker_svc = None
container_svc = None
onedocker_package_repo = None
attestation_service = None
onedocker_checksum_repo = None
attestation_svc = None
log_svc = None
task_definition = None
repository_path = None
Expand All @@ -62,10 +64,19 @@ def _upload(
f" Starting uploading package {package_name} at '{package_dir}', version {version}..."
)
if enable_attestation:
logger.info(
f"Generating and uploading checksums for package {package_name}: {version}"
logger.info(f"Generating checksums for package {package_name}: {version}")
formated_checksum_info = attestation_svc.track_binary(
binary_path=package_dir,
package_name=package_name,
version=version,
)
logger.info(f"Uploading checksums for package {package_name}: {version}")
onedocker_checksum_repo.write(
package_name=package_name,
version=version,
checksum_data=formated_checksum_info,
)
attestation_service.track_binary(package_dir, package_name, version)
logger.info(f"Uploading binary for package {package_name}: {version}")
onedocker_package_repo.upload(package_name, version, package_dir)
logger.info(f" Finished uploading '{package_name}, version {version}'.\n")

Expand Down Expand Up @@ -165,7 +176,7 @@ def _build_exe_s3_path(repository_path: str, package_name: str, version: str) ->


def main() -> None:
global container_svc, onedocker_svc, onedocker_package_repo, log_svc, logger, task_definition, repository_path, attestation_service
global container_svc, onedocker_svc, onedocker_package_repo, onedocker_checksum_repo, log_svc, logger, task_definition, repository_path, attestation_svc
s = schema.Schema(
{
"upload": bool,
Expand Down Expand Up @@ -197,24 +208,25 @@ def main() -> None:
version = (
arguments["--version"] if arguments["--version"] else DEFAULT_BINARY_VERSION
)
enable_attestation = arguments["--enable_attestation"]

config = yaml.load(Path(arguments["--config"])).get("onedocker-cli")
task_definition = config["setting"]["task_definition"]
repository_path = config["setting"]["repository_path"]
checksum_repository_path = config["setting"].get("checksum_repository_path", "")

attestation_svc = AttestationService()
storage_svc = _build_storage_service(config["dependency"]["StorageService"])
container_svc = _build_container_service(config["dependency"]["ContainerService"])
onedocker_svc = OneDockerService(container_svc, task_definition)
onedocker_package_repo = OneDockerPackageRepository(storage_svc, repository_path)
onedocker_checksum_repo = OneDockerChecksumRepository(
storage_svc, checksum_repository_path
)
log_svc = _build_log_service(config["dependency"]["LogService"])

enable_attestation = arguments["--enable_attestation"]
if enable_attestation:
logger.info(
f"Package tracking for package {package_name}: {version} has been enabled"
)
checksum_repository_path = config["setting"]["checksum_repository_path"]
attestation_service = AttestationService(storage_svc, checksum_repository_path)
status = "enabled" if enable_attestation else "disabled"
logger.info(f"Package tracking for package {package_name}: {version} is {status}")

if arguments["upload"]:
_upload(package_dir, package_name, version, enable_attestation)
Expand Down
Loading