Skip to content
This repository was archived by the owner on Oct 21, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions fbpcp/gateway/kms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

# pyre-strict

from typing import Any, Dict, List, Optional

import boto3
from botocore.client import BaseClient
from fbpcp.decorator.error_handler import error_handler
from fbpcp.gateway.aws import AWSGateway


class KMSGateway(AWSGateway):
def __init__(
self,
region: str,
access_key_id: Optional[str] = None,
access_key_data: Optional[str] = None,
config: Optional[Dict[str, Any]] = None,
) -> None:
super().__init__(region, access_key_id, access_key_data, config)
self.client: BaseClient = boto3.client(
"kms", region_name=self.region, **self.config
)

@error_handler
def decrypt(
self,
key_id: str,
ciphertext_blob: bytes,
encryption_context: Dict[str, str],
grant_tokens: List[str],
encryption_algorithm: str,
) -> Dict[str, Any]:
return self.client.encrypt(
KeyId=key_id,
CiphertextBlob=ciphertext_blob,
EncryptionContext=encryption_context,
GrantTokens=grant_tokens,
EncryptionAlgorithm=encryption_algorithm,
)

@error_handler
def encrypt(
self,
key_id: str,
plaintext: bytes,
encryption_context: Dict[str, str],
grant_tokens: List[str],
encryption_algorithm: str,
) -> Dict[str, Any]:
return self.client.encrypt(
KeyId=key_id,
Plaintext=plaintext,
EncryptionContext=encryption_context,
GrantTokens=grant_tokens,
EncryptionAlgorithm=encryption_algorithm,
)

@error_handler
def sign(
self,
key_id: str,
message: bytes,
message_type: str,
grant_tokens: List[str],
signing_algorithm: str,
) -> Dict[str, Any]:
return self.client.sign(
KeyId=key_id,
Message=message,
MessageType=message_type,
GrantTokens=grant_tokens,
SigningAlgorithm=signing_algorithm,
)

@error_handler
def verify(
self,
key_id: str,
message: bytes,
message_type: str,
signature: bytes,
signing_algorithm: str,
grant_tokens: List[str],
) -> Dict[str, Any]:
return self.client.verify(
KeyId=key_id,
Message=message,
MessageType=message_type,
Signature=signature,
SigningAlgorithm=signing_algorithm,
GrantTokens=grant_tokens,
)
27 changes: 27 additions & 0 deletions fbpcp/service/key_management.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

# pyre-strict

import abc


class KeyManagementService(abc.ABC):
@abc.abstractmethod
def sign(self, message: str) -> str:
pass

@abc.abstractmethod
def decrypt(self, ciphertext_blob: str) -> str:
pass

@abc.abstractmethod
def encrypt(self, plaintext: str) -> str:
pass

@abc.abstractmethod
def verify(self, message: str, signature: str) -> str:
pass
100 changes: 100 additions & 0 deletions fbpcp/service/key_managment_aws.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

# pyre-strict

from base64 import b64decode, b64encode
from typing import Any, Dict, List, Optional

from fbpcp.gateway.kms import KMSGateway
from fbpcp.service.key_management import KeyManagementService


class AWSKeyManagementService(KeyManagementService):
key_id: str
encryption_algorithm: str
signing_algorithm: str
grant_tokens: Optional[List[str]]

def __init__(
self,
region: str,
key_id: str,
encryption_algorithm: Optional[str] = None,
signing_algorithm: Optional[str] = None,
grant_tokens: Optional[List[str]] = None,
access_key_id: Optional[str] = None,
access_key_data: Optional[str] = None,
encryption_context: Optional[Dict[str, str]] = None,
config: Optional[Dict[str, Any]] = None,
) -> None:
self.kms_gateway = KMSGateway(
region=region,
access_key_id=access_key_id,
access_key_data=access_key_data,
config=config,
)
self.key_id = key_id

self.encryption_algorithm = encryption_algorithm if encryption_algorithm else ""
self.signing_algorithm = signing_algorithm if signing_algorithm else ""
self.grant_tokens = grant_tokens if grant_tokens else []

def sign(self, message: str, message_type: str = "RAW") -> str:
if not self.signing_algorithm:
raise ValueError("No Signing Algorithm Set")
response = self.kms_gateway.sign(
key_id=self.key_id,
message=message,
message_type=message_type,
grant_tokens=self.grant_tokens,
signing_algorithm=self.signing_algorithm,
)
signature = b64encode(response["Signature"]).decode()
return signature

def decrypt(
self, ciphertext_blob: str, encryption_context: Optional[Dict[str, str]] = None
) -> str:
if not self.encryption_algorithm:
raise ValueError("No Encryption Algorithm Set")
b64_ciphertext_blob = b64decode(ciphertext_blob.encode())
response = self.kms_gateway.decrypt(
key_id=self.key_id,
ciphertext_blob=b64_ciphertext_blob,
encryption_context=encryption_context,
grant_tokens=self.grant_tokens,
encryption_algorithm=self.encryption_algorithm,
)
plaintext = b64encode(response["Plaintext"]).decode()
return plaintext

def encrypt(
self, plaintext: str, encryption_context: Optional[Dict[str, str]] = None
) -> str:
if not self.encryption_algorithm:
raise ValueError("No Encryption Algorithm Set")
response = self.kms_gateway.encrypt(
key_id=self.key_id,
plaintext=plaintext,
encryption_context=encryption_context if encryption_context else {},
grant_tokens=self.grant_tokens,
encryption_algorithm=self.encryption_algorithm,
)
ciphertext_blob = b64encode(response["CiphertextBlob"]).decode()
return ciphertext_blob

def verify(self, message: str, signature: str, message_type: str = "RAW") -> str:
b64_signature = b64decode(signature.encode())
response = self.kms_gateway.verify(
key_id=self.key_id,
message=message,
message_type=message_type,
signature=b64_signature,
signing_algorithm=self.signing_algorithm,
grant_tokens=self.grant_tokens,
)
return response["SignatureValid"]
6 changes: 6 additions & 0 deletions onedocker/common/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,14 @@
# This is the repository path that OneDocker downloads binaries from
ONEDOCKER_REPOSITORY_PATH = "ONEDOCKER_REPOSITORY_PATH"

# This is the repository path that OneDocker downloads binary checksums from
ONEDOCKER_CHECKSUM_REPOSITORY_PATH = "ONEDOCKER_CHECKSUM_REPOSITORY_PATH"

# This is the local path that the binaries reside
ONEDOCKER_EXE_PATH = "ONEDOCKER_EXE_PATH"

# This is the path user specified to upload the core dump file to
CORE_DUMP_REPOSITORY_PATH = "CORE_DUMP_REPOSITORY_PATH"

# This is the type of checksum we want to compare when running program
ONEDOCKER_CHECKSUM_TYPE = "ONEDOCKER_CHECKSUM_TYPE"
33 changes: 26 additions & 7 deletions onedocker/entity/checksum_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@

from __future__ import annotations

import base64
import json

from dataclasses import dataclass
from typing import Any, Dict
from typing import Any, Dict, Optional, Set

from onedocker.entity.checksum_type import ChecksumType
from OpenSSL.crypto import FILETYPE_PEM, load_publickey, verify, X509


@dataclass
Expand All @@ -20,14 +24,16 @@ class ChecksumInfo:
This dataclass tracks a package's checksum info for attestation, to allow for easy comparision and tracking

Fields:
package_name: String containting the package's name
version: String containting the package's version
Checksums: Dict that holds a pairing between a ChecksumType (Key) and the corresponding hash (Value)
package_name: String containing the package name
version: String containing the package version
checksums: Dict that holds a pairing between a ChecksumType (Key) and the corresponding hash (Value)
Signature: Base64 encoded string containing
"""

package_name: str
version: str
checksums: Dict[str, str]
signature: str = ""

def __post_init__(self) -> None:
"""
Expand All @@ -41,7 +47,7 @@ def __post_init__(self) -> None:

def __eq__(self, other: ChecksumInfo) -> bool:
"""
Compares two ChecksumInfo isntannces in previously decided order
Compares two ChecksumInfo instances in previously decided order
1. Compares package_name
2. Compares version
3. Checks if overlaping checkum algorithms are present
Expand All @@ -63,5 +69,18 @@ def __eq__(self, other: ChecksumInfo) -> bool:
return False
return True

def asdict(self) -> Dict[str, Any]:
return self.__dict__
def asdict(self, exclude: Optional[Set[str]] = None) -> Dict[str, Any]:
"""
Returns a dict representation of all fields in ChecksumInfo

Args:
exclude: Set of Strings that contains all fields to exclude from return
"""
return (
self.__dict__
if not exclude
else {
key: self.__dict__[key] for key in self.__dict__ if key not in exclude
}
)

48 changes: 48 additions & 0 deletions onedocker/repository/onedocker_checksum.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

# pyre-strict

from fbpcp.service.storage import StorageService


class OneDockerChecksumRepository:
def __init__(
self, storage_svc: StorageService, checksum_repository_path: str
) -> None:
self.storage_svc = storage_svc
self.checksum_repository_path = checksum_repository_path

def _build_checksum_path(self, package_name: str, version: str) -> str:
if not self.checksum_repository_path:
raise ValueError(
"Checksum Repository Path not set. Unable to attest Package"
)
return f"{self.checksum_repository_path}{package_name}/{version}/{package_name.split('/')[-1]}.json"

def _file_exists(self, package_name: str, version: str) -> bool:
package_path = self._build_checksum_path(
package_name=package_name, version=version
)
return self.storage_svc.file_exists(package_path)

def write(self, package_name: str, version: str, checksum_data: str) -> None:
package_path = self._build_checksum_path(
package_name=package_name, version=version
)
self.storage_svc.write(filename=package_path, data=checksum_data)

def read(self, package_name: str, version: str) -> str:
package_path = self._build_checksum_path(
package_name=package_name, version=version
)

if not self._file_exists(package_name, version):
raise FileNotFoundError(
f"Cant find checksum file for package {package_name}, version {version}"
)

return self.storage_svc.read(filename=package_path)
Loading