DONT REVIEW, NOT RELEVANT #78

Draft
wants to merge 12 commits into base: feature/isaak/local_fs_dataset
updates wip
pickles-bread-and-butter committed Aug 21, 2024
commit bfe00cb4ded5a89de37a306a86f751b6b7e049d9
Empty file.
Empty file.
114 changes: 106 additions & 8 deletions tests/core/multi_cloud/gcloud/mocked_classes.py
@@ -7,21 +7,119 @@
meaning we cannot mock the underlying behavior 1-1, which reduces the usefulness of these
tests dramatically.
"""
import logging
import os
from pathlib import Path


class MockedGCSBlob:
"""
Mocked blob object abstracting operations on a single file, similar to the
gcloud storage Blob. Keep this API consistent with the real class.
"""
def __init__(
self,
bucket: "MockedGCSBucket",
name: str
) -> None:
self.__bucket = bucket
self.__name = name


def exists(
self,
client: "MockedGCSClient"
) -> bool:
return client.check_file_exists(
bucket = self.__bucket.bucket_name,
file_path = self.__name
)



class MockedGCSBucket:
"""
Mocked bucket object for communicating with gcs, only mocks connection calls right
now and gives correct returns.
Mocked bucket object that abstracts communication with the client, similar to the
gcloud storage Bucket. Keep this API consistent with the real class.
"""

def __init__():
pass
def __init__(
self,
bucket_name: str
):
self.__bucket_name = bucket_name

def write_file(
self,
client: "MockedGCSClient",
data: bytes,
file_path: str
):
client.write_data(
bucket=self.__bucket_name,
data=data,
full_path=file_path
)

@property
def bucket_name(self):
return self.__bucket_name


class MockedGCSClient:
"""
Mocked client object for communicating with gcs, only mocks connection calls right
now and gives correct returns.
Mocked client object, manages the mocked file tree.
"""

def __init__():
pass
def __init__(
self,
mock_file_root: str
):
self.__mock_file_root = mock_file_root
self.__bucket_path_registry = {}

def check_file_exists(
self,
bucket: str,
file_path: str
) -> bool:
if bucket not in self.__bucket_path_registry:
logging.warning("Bucket does not exist.")
return False

full_file_path = os.path.join(self.__bucket_path_registry[bucket], file_path)
return os.path.exists(full_file_path)

def create_bucket(
self,
mock_bucket_name: str
) -> None:
if mock_bucket_name in self.__bucket_path_registry:
logging.warning("Mocked bucket already exists, returning")
return

mocked_fs_path = os.path.join(self.__mock_file_root, mock_bucket_name)
os.makedirs(mocked_fs_path, exist_ok=True)
self.__bucket_path_registry[mock_bucket_name] = mocked_fs_path
logging.info(f"Created mocked bucket at {mocked_fs_path}")

def write_data(
self,
bucket: str,
data: bytes,
full_path: str
) -> None:
if bucket not in self.__bucket_path_registry:
logging.warning("Bucket not created yet; run create_bucket first. Cannot write file.")
return

full_path_on_mock = os.path.join(self.__bucket_path_registry[bucket], full_path)
if os.path.exists(full_path_on_mock):
logging.warning("File path already on system, overwriting")
os.remove(full_path_on_mock)

mocked_dir = Path(full_path_on_mock).parent
os.makedirs(mocked_dir, exist_ok=True)
with open(full_path_on_mock, "wb") as open_file:
open_file.write(data)
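
Taken together, the three mocks mirror the client / bucket / blob split of google-cloud-storage while backing everything with a local temporary directory. A minimal sketch of how they compose on their own, assuming the module is importable as in the test file below (the bucket name, key, and payload here are illustrative):

import tempfile

from tests.core.multi_cloud.gcloud import mocked_classes

with tempfile.TemporaryDirectory() as temp_dir:
    # the client owns the on-disk registry that stands in for GCS buckets
    client = mocked_classes.MockedGCSClient(mock_file_root=temp_dir)
    client.create_bucket(mock_bucket_name="example-bucket")

    bucket = mocked_classes.MockedGCSBucket(bucket_name="example-bucket")
    bucket.write_file(client=client, data=b"payload", file_path="some/key.bin")

    # blob existence checks route back through the client's path registry
    blob = mocked_classes.MockedGCSBlob(bucket=bucket, name="some/key.bin")
    assert blob.exists(client)
    assert not mocked_classes.MockedGCSBlob(bucket=bucket, name="not/there.bin").exists(client)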

37 changes: 36 additions & 1 deletion tests/core/multi_cloud/gcloud/test_gcs.py
@@ -1,4 +1,39 @@
import tempfile
from unittest.mock import patch

import wicker.core.multi_cloud.gcloud.gcs as gcs

from tests.core.multi_cloud.gcloud import mocked_classes


def test_gcloud_file_exists():
pass
with tempfile.TemporaryDirectory() as temp_dir:
with patch("wicker.core.multi_cloud.gcloud.gcs.storage.Blob", mocked_classes.MockedGCSBlob):
mocked_bucket_name = "mocked_wicker_bucket"
mocked_client = mocked_classes.MockedGCSClient(
mock_file_root = temp_dir
)

mocked_client.create_bucket(
mock_bucket_name = mocked_bucket_name
)
mocked_bucket = mocked_classes.MockedGCSBucket(
bucket_name = mocked_bucket_name
)

# mock a file path that looks like it would be on gcloud
file_path = "wicker_head_path/__COLUMN_CONCATENATED_FILES__/test-col-file"
# mock some data that will constitute the file
mock_data = b"test-data-string"

mocked_bucket.write_file(
client = mocked_client,
file_path = file_path,
data = mock_data
)

assert gcs.gcloud_file_exists(
gcloud_bucket=mocked_bucket,
gcloud_client=mocked_client,
gcloud_file_path=file_path
)
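
The test above only exercises the positive path. A complementary sketch for the negative path under the same patch and mocks (the test name and the unwritten path are hypothetical, not part of this diff):

def test_gcloud_file_missing():
    with tempfile.TemporaryDirectory() as temp_dir:
        with patch("wicker.core.multi_cloud.gcloud.gcs.storage.Blob", mocked_classes.MockedGCSBlob):
            mocked_client = mocked_classes.MockedGCSClient(mock_file_root=temp_dir)
            mocked_client.create_bucket(mock_bucket_name="mocked_wicker_bucket")
            mocked_bucket = mocked_classes.MockedGCSBucket(bucket_name="mocked_wicker_bucket")

            # nothing was written at this path, so the existence check should come back False
            assert not gcs.gcloud_file_exists(
                gcloud_bucket=mocked_bucket,
                gcloud_client=mocked_client,
                gcloud_file_path="wicker_head_path/never_written_file",
            )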
78 changes: 10 additions & 68 deletions wicker/core/datasets.py
@@ -2,8 +2,6 @@
import logging
import os
from functools import cached_property
from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ThreadPool
from typing import Any, Dict, List, Optional, Tuple, Union

import boto3
@@ -18,6 +16,7 @@
)
from wicker.core.config import get_config # type: ignore
from wicker.core.definitions import DatasetDefinition, DatasetID, DatasetPartition
from wicker.core.parsing import multiproc_file_parse, thread_file_parse
from wicker.core.storage import (
AbstractDataStorage,
FileSystemDataStorage,
@@ -33,71 +32,9 @@

logger = logging.getLogger(__name__)


def get_file_size_s3_multiproc(buckets_keys: List[Tuple[str, str]]) -> int:
"""Get file size of s3 files, most often column files.

This works on any list of buckets and keys but is generally only
used for column files as those are the majority of what is stored on
s3 for Wicker. Wicker also stores parquet files on s3 but those are limited
to one file per dataset and one schema file.

This splits your buckets_keys_list across multiple processes on your local host
where each process is then multi threaded further. This reduces the i/o wait by
parallelizing across all available procs and threads on a single machine.

Args:
buckets_keys: (List[Tuple[str, str]]): A list of buckets and keys for which
to fetch size in bytes on s3. Tuple index 0 is bucket and index 1 is key of the file.

Returns:
int size of file list in bytes.
"""
buckets_keys_chunks = chunk_data_for_split(chunkable_data=buckets_keys, chunk_number=200)

logging.info("Grabbing file information from s3 heads")
pool = Pool(cpu_count() - 1)
return sum(list(pool.map(get_file_size_s3_threaded, buckets_keys_chunks)))


def get_file_size_s3_threaded(buckets_keys_chunks_local: List[Tuple[str, str]]) -> int:
"""Get file size of a list of s3 paths.

Args:
buckets_keys_chunks_local - The list of tuples denoting bucket and key of files on s3 to
parse. Generally column files but will work with any data.

Returns:
int: size of the set of files in bytes
"""
local_chunks = chunk_data_for_split(chunkable_data=buckets_keys_chunks_local, chunk_number=200)
thread_pool = ThreadPool()

return sum(list(thread_pool.map(iterate_bucket_key_chunk_for_size, local_chunks))) # type: ignore


def chunk_data_for_split(chunkable_data: List[Any], chunk_number: int = 500) -> List[List[Any]]:
"""Chunk data into a user specified number of chunks.

Args:
chunkable_data (List[Any]): Data to be chunked into smaller pieces.
chunk_number (int): Number of chunks to form.

Returns:
List[List[Any]]: List of subsets of input data.
"""
local_chunks = []
local_chunk_size = len(chunkable_data) // chunk_number
for i in range(0, chunk_number - 1):
chunk = chunkable_data[i * local_chunk_size : (i + 1) * local_chunk_size]
local_chunks.append(chunk)

last_chunk_size = len(chunkable_data) - (chunk_number * local_chunk_size)
if last_chunk_size > 0:
last_chunk = chunkable_data[-last_chunk_size:]
local_chunks.append(last_chunk)

return local_chunks

def thread_func_head(buckets_keys_chunks_local: List[Tuple[str, str]]) -> int:
    """Per-process entry point: fan the chunk out across threads and sum the resulting sizes."""
    return thread_file_parse(buckets_keys_chunks_local, iterate_bucket_key_chunk_for_size, sum)


def iterate_bucket_key_chunk_for_size(bucket_key_locs: List[Tuple[str, str]]) -> int: # type: ignore
@@ -486,7 +423,12 @@ def _get_dataset_partition_size(self) -> int:

# pass the data to the multi proc management function
buckets_keys_list = list(buckets_keys)
column_files_byte_size = get_file_size_s3_multiproc(buckets_keys_list)

column_files_byte_size = multiproc_file_parse(
buckets_keys=buckets_keys_list,
function_for_process=thread_func_head,
result_collapse_func=sum
)
return column_files_byte_size + par_dir_bytes

@cached_property
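
For context, the new wiring above delegates the process-then-thread fan-out to wicker.core.parsing. A rough sketch of the same composition with the s3 head call replaced by a local stand-in (fake_chunk_size, fake_thread_head, and the toy bucket/key list are illustrative, not part of this diff):

from typing import List, Tuple

from wicker.core.parsing import multiproc_file_parse, thread_file_parse


def fake_chunk_size(bucket_key_locs: List[Tuple[str, str]]) -> int:
    # stand-in for iterate_bucket_key_chunk_for_size: pretend every object is 1 KiB
    return 1024 * len(bucket_key_locs)


def fake_thread_head(chunk: List[Tuple[str, str]]) -> int:
    # per-process entry point, mirroring thread_func_head: threads inside the process, then sum
    return thread_file_parse(chunk, fake_chunk_size, sum)


if __name__ == "__main__":
    buckets_keys = [("my-bucket", f"column_files/{i}") for i in range(10_000)]
    total_bytes = multiproc_file_parse(
        buckets_keys=buckets_keys,
        function_for_process=fake_thread_head,
        result_collapse_func=sum,
    )
    assert total_bytes == 1024 * len(buckets_keys)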
20 changes: 19 additions & 1 deletion wicker/core/multi_cloud/gcloud/gcs.py
@@ -1,5 +1,8 @@
from typing import Set

from google.cloud import storage


def gcloud_file_exists(
gcloud_bucket: storage.Bucket,
gcloud_client: storage.Client,
@@ -12,11 +15,26 @@ def gcloud_file_exists(

Args:
gcloud_bucket (storage.Bucket): GCloud bucket to use for data storage.
gcloud_client (storage.Client): GCloud client to usexists(gcloud_client)e for existance checking.
gcloud_client (storage.Client): GCloud client to use for existence checking.
gcloud_file_path (str): Path on GCS to file

Returns:
bool: Existence or non-existence of the file.
"""
gcloud_blob = storage.Blob(bucket=gcloud_bucket, name=gcloud_file_path)
return gcloud_blob.exists(gcloud_client)


def get_non_existant_file_set(
file_set: Set[str]
) -> Set[str]:
"""Get subset of files that do not exist on gcs.

Uses the definitions in the gcloud config to get the subset of files
that do not exist on gcloud. Used for generating the manifest file
for the gcloud transfer service.

Args:
file_set (Set[str]): set of file paths to check on GCS

Returns:
Set[str]: the subset of file_set that does not exist on GCS
"""

92 changes: 92 additions & 0 deletions wicker/core/parsing.py
@@ -0,0 +1,92 @@
import logging
from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ThreadPool

from typing import Any, List, Tuple


def chunk_data_for_split(chunkable_data: List[Any], chunk_number: int = 500) -> List[List[Any]]:
"""Chunk data into a user specified number of chunks.
Args:
chunkable_data (List[Any]): Data to be chunked into smaller pieces.
chunk_number (int): Number of chunks to form.
Returns:
List[List[Any]]: List of subsets of input data.
"""
local_chunks = []
local_chunk_size = len(chunkable_data) // chunk_number
for i in range(0, chunk_number - 1):
chunk = chunkable_data[i * local_chunk_size : (i + 1) * local_chunk_size]
local_chunks.append(chunk)

last_chunk_size = len(chunkable_data) - (chunk_number * local_chunk_size)
if last_chunk_size > 0:
last_chunk = chunkable_data[-last_chunk_size:]
local_chunks.append(last_chunk)

return local_chunks


def multiproc_file_parse(
    buckets_keys: List[Tuple[str, str]],
    function_for_process: Any,
    result_collapse_func: Any = None
) -> Any:
    """Run a function over chunks of s3 bucket/key pairs across multiple processes.

    This works on any list of buckets and keys but is generally only used for
    column files, as those are the majority of what Wicker stores on s3. Wicker
    also stores parquet files on s3, but those are limited to one data file and
    one schema file per dataset.

    This splits buckets_keys across multiple processes on the local host, where
    each process can then fan out further across threads. This reduces i/o wait
    by parallelizing across the available procs and threads on a single machine.

    Args:
        buckets_keys (List[Tuple[str, str]]): A list of buckets and keys to
            process. Tuple index 0 is the bucket and index 1 is the key of the file.
        function_for_process (Any): The function to run on each bucket/key chunk.
        result_collapse_func (Any): The function to run on the per-process results
            to collapse them. Defaults to None.

    Returns:
        Any: The collapsed result if result_collapse_func is given, otherwise the
        list of per-chunk results.
    """
    buckets_keys_chunks = chunk_data_for_split(chunkable_data=buckets_keys, chunk_number=200)

    logging.info("Grabbing file information from s3 heads")
    with Pool(max(cpu_count() - 1, 1)) as pool:
        results = list(pool.map(function_for_process, buckets_keys_chunks))
    if result_collapse_func is not None:
        return result_collapse_func(results)
    return results


def thread_file_parse(
    buckets_keys_chunks_local: List[Tuple[str, str]],
    function_for_thread: Any,
    result_collapse_func: Any = None
) -> Any:
    """Run a function over chunks of s3 bucket/key pairs across a thread pool.

    Args:
        buckets_keys_chunks_local (List[Tuple[str, str]]): The list of tuples
            denoting the bucket and key of files on s3 to parse. Generally column
            files, but this works with any data.
        function_for_thread (Any): The function to run on each thread.
        result_collapse_func (Any): The function to run on the per-thread results
            to collapse them. Defaults to None.

    Returns:
        Any: The collapsed result if result_collapse_func is given, otherwise the
        list of per-chunk results.
    """
    local_chunks = chunk_data_for_split(chunkable_data=buckets_keys_chunks_local, chunk_number=200)

    with ThreadPool() as thread_pool:
        results = list(thread_pool.map(function_for_thread, local_chunks))  # type: ignore
    if result_collapse_func is not None:
        return result_collapse_func(results)
    return results
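
As a quick illustration of the chunking semantics with toy values (not from the diff): ten items split with chunk_number=4 gives an integer chunk size of two, the first three chunks take two items each, and the final chunk absorbs everything left over.

from wicker.core.parsing import chunk_data_for_split

chunks = chunk_data_for_split(chunkable_data=list(range(10)), chunk_number=4)
assert chunks == [[0, 1], [2, 3], [4, 5], [6, 7, 8, 9]]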