Skip to content

Commit

Permalink
feat(primitives): ind & aind
Browse files Browse the repository at this point in the history
  • Loading branch information
vladyoslav committed Oct 25, 2024
1 parent 60dacbe commit cd8095f
Show file tree
Hide file tree
Showing 20 changed files with 324 additions and 5 deletions.
6 changes: 4 additions & 2 deletions dev-docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,13 @@ services:
context: .
command: /celery
volumes:
- ./volumes/uploads:${UPLOADED_FILES_DIR_PATH}
- ./volumes/uploads:/volumes/uploads
env_file:
- .env
environment:
- RABBITMQ_HOST=rabbitmq
- POSTGRES_HOST=postgres
- UPLOADED_FILES_DIR_PATH=/volumes/uploads
depends_on:
- rabbitmq
restart: always
Expand All @@ -58,12 +59,13 @@ services:
context: .
command: /flower
volumes:
- ./volumes/uploads:${UPLOADED_FILES_DIR_PATH}
- ./volumes/uploads:/volumes/uploads
env_file:
- .env
environment:
- RABBITMQ_HOST=rabbitmq
- POSTGRES_HOST=postgres
- UPLOADED_FILES_DIR_PATH=/volumes/uploads
depends_on:
- rabbitmq
- celery
Expand Down
1 change: 1 addition & 0 deletions internal/domain/task/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from internal.domain.task.entities import FdTask # noqa: F401
from internal.domain.task.entities import AfdTask # noqa: F401
from internal.domain.task.entities import AcTask # noqa: F401
from internal.domain.task.entities import IndTask # noqa: F401
6 changes: 6 additions & 0 deletions internal/domain/task/entities/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from internal.domain.task.entities.fd import FdTask
from internal.domain.task.entities.afd import AfdTask
from internal.domain.task.entities.ac import AcTask
from internal.domain.task.entities.ind import IndTask
from internal.domain.task.entities.aind import AindTask
from internal.domain.task.value_objects import PrimitiveName


Expand All @@ -26,4 +28,8 @@ def match_task_by_primitive_name(primitive_name: PrimitiveName):
return AfdTask()
case PrimitiveName.ac:
return AcTask()
case PrimitiveName.ind:
return IndTask()
case PrimitiveName.aind:
return AindTask()
assert_never(primitive_name)
1 change: 1 addition & 0 deletions internal/domain/task/entities/aind/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from internal.domain.task.entities.aind.aind_task import AindTask # noqa: F401
55 changes: 55 additions & 0 deletions internal/domain/task/entities/aind/aind_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from desbordante.ind import IndAlgorithm
from desbordante.aind.algorithms import Mind, Spider
from internal.domain.task.entities.task import Task
from internal.domain.task.value_objects import PrimitiveName, IncorrectAlgorithmName
from internal.domain.task.value_objects.aind import (
AindAlgoName,
AindTaskConfig,
AindTaskResult,
)
from internal.domain.task.value_objects.aind import AindAlgoResult, AindModel


class AindTask(Task[IndAlgorithm, AindTaskConfig, AindTaskResult]):
"""
Task class for Inclusion Dependency (AIND) profiling.
This class executes various AIND algorithms and processes the results
into the appropriate format. It implements abstract methods from the Task base class.
Methods:
- _match_algo_by_name(algo_name: AindAlgoName) -> AindAlgorithm:
Match AIND algorithm by its name.
- _collect_result(algo: AindAlgorithm) -> AindTaskResult:
Process the output of the AIND algorithm and return the result.
"""

def _collect_result(self, algo: IndAlgorithm) -> AindTaskResult:
"""
Collect and process the AIND result.
Args:
algo (AindAlgorithm): AIND algorithm to process.
Returns:
AindTaskResult: Processed result containing AINDs.
"""
ainds = algo.get_inds()
algo_result = AindAlgoResult(inds=[AindModel.from_ind(aind) for aind in ainds])
return AindTaskResult(primitive_name=PrimitiveName.aind, result=algo_result)

def _match_algo_by_name(self, algo_name: str) -> IndAlgorithm:
"""
Match the inclusion dependency algorithm by name.
Args:
algo_name (AindAlgoName): Name of the AIND algorithm.
Returns:
AindAlgorithm: The corresponding algorithm instance.
"""
match algo_name:
case AindAlgoName.Mind:
return Mind()
case AindAlgoName.Spider:
return Spider()
case _:
raise IncorrectAlgorithmName(algo_name, "AIND")
1 change: 1 addition & 0 deletions internal/domain/task/entities/ind/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from internal.domain.task.entities.ind.ind_task import IndTask # noqa: F401
57 changes: 57 additions & 0 deletions internal/domain/task/entities/ind/ind_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from desbordante.ind import IndAlgorithm
from desbordante.ind.algorithms import Faida, Mind, Spider
from internal.domain.task.entities.task import Task
from internal.domain.task.value_objects import PrimitiveName, IncorrectAlgorithmName
from internal.domain.task.value_objects.ind import (
IndAlgoName,
IndTaskConfig,
IndTaskResult,
)
from internal.domain.task.value_objects.ind import IndAlgoResult, IndModel


class IndTask(Task[IndAlgorithm, IndTaskConfig, IndTaskResult]):
"""
Task class for Inclusion Dependency (IND) profiling.
This class executes various IND algorithms and processes the results
into the appropriate format. It implements abstract methods from the Task base class.
Methods:
- _match_algo_by_name(algo_name: IndAlgoName) -> IndAlgorithm:
Match IND algorithm by its name.
- _collect_result(algo: IndAlgorithm) -> IndTaskResult:
Process the output of the IND algorithm and return the result.
"""

def _collect_result(self, algo: IndAlgorithm) -> IndTaskResult:
"""
Collect and process the IND result.
Args:
algo (IndAlgorithm): IND algorithm to process.
Returns:
IndTaskResult: Processed result containing INDs.
"""
inds = algo.get_inds()
algo_result = IndAlgoResult(inds=[IndModel.from_ind(ind) for ind in inds])
return IndTaskResult(primitive_name=PrimitiveName.ind, result=algo_result)

def _match_algo_by_name(self, algo_name: str) -> IndAlgorithm:
"""
Match the inclusion dependency algorithm by name.
Args:
algo_name (IndAlgoName): Name of the IND algorithm.
Returns:
IndAlgorithm: The corresponding algorithm instance.
"""
match algo_name:
case IndAlgoName.Faida:
return Faida()
case IndAlgoName.Mind:
return Mind()
case IndAlgoName.Spider:
return Spider()
case _:
raise IncorrectAlgorithmName(algo_name, "IND")
6 changes: 5 additions & 1 deletion internal/domain/task/entities/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ def execute(self, table: pandas.DataFrame, task_config: C) -> R:
algo_config = task_config.config
options = algo_config.model_dump(exclude_unset=True, exclude={"algo_name"})
algo = self._match_algo_by_name(algo_config.algo_name)
algo.load_data(table=table)
# TODO: IND, AIND requires multiple tables
try:
algo.load_data(table=table)
except desbordante.ConfigurationError:
algo.load_data(tables=[table])
algo.execute(**options)
return self._collect_result(algo)
6 changes: 4 additions & 2 deletions internal/domain/task/value_objects/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from internal.domain.task.value_objects.afd import AfdTaskConfig, AfdTaskResult
from internal.domain.task.value_objects.fd import FdTaskConfig, FdTaskResult
from internal.domain.task.value_objects.ac import AcTaskConfig, AcTaskResult
from internal.domain.task.value_objects.ind import IndTaskConfig, IndTaskResult
from internal.domain.task.value_objects.aind import AindTaskConfig, AindTaskResult

from internal.domain.task.value_objects.config import TaskConfig # noqa: F401
from internal.domain.task.value_objects.result import TaskResult # noqa: F401
Expand All @@ -22,11 +24,11 @@
)

OneOfTaskConfig = Annotated[
Union[FdTaskConfig, AfdTaskConfig, AcTaskConfig],
Union[FdTaskConfig, AfdTaskConfig, AcTaskConfig, IndTaskConfig, AindTaskConfig],
Field(discriminator="primitive_name"),
]

OneOfTaskResult = Annotated[
Union[FdTaskResult, AfdTaskResult, AcTaskResult],
Union[FdTaskResult, AfdTaskResult, AcTaskResult, IndTaskResult, AindTaskResult],
Field(discriminator="primitive_name"),
]
23 changes: 23 additions & 0 deletions internal/domain/task/value_objects/aind/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from typing import Literal

from pydantic import BaseModel

from internal.domain.task.value_objects.primitive_name import PrimitiveName
from internal.domain.task.value_objects.aind.algo_config import OneOfAindAlgoConfig
from internal.domain.task.value_objects.aind.algo_result import ( # noqa: F401
AindAlgoResult,
AindModel,
)
from internal.domain.task.value_objects.aind.algo_name import AindAlgoName # noqa: F401


class BaseAindTaskModel(BaseModel):
primitive_name: Literal[PrimitiveName.aind]


class AindTaskConfig(BaseAindTaskModel):
config: OneOfAindAlgoConfig


class AindTaskResult(BaseAindTaskModel):
result: AindAlgoResult
35 changes: 35 additions & 0 deletions internal/domain/task/value_objects/aind/algo_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from typing import Literal, Annotated
from pydantic import Field
from internal.domain.common import OptionalModel
from internal.domain.task.value_objects.aind.algo_name import AindAlgoName
from internal.domain.task.value_objects.aind.algo_descriptions import descriptions


class BaseAindConfig(OptionalModel):
__non_optional_fields__ = {
"algo_name",
}


class MindConfig(BaseAindConfig):
algo_name: Literal[AindAlgoName.Mind]

max_arity: Annotated[int, Field(gt=0, description=descriptions["max_arity"])]
error: Annotated[float, Field(ge=0, le=1.0, description=descriptions["error"])]


class SpiderConfig(BaseAindConfig):
algo_name: Literal[AindAlgoName.Spider]

error: Annotated[float, Field(ge=0, le=1.0, description=descriptions["error"])]
is_null_equal_null: Annotated[
bool, Field(description=descriptions["is_null_equal_null"])
]
threads: Annotated[int, Field(ge=0, description=descriptions["threads"])]
mem_limit: Annotated[int, Field(gt=0, description=descriptions["mem_limit"])]


OneOfAindAlgoConfig = Annotated[
MindConfig | SpiderConfig,
Field(discriminator="algo_name"),
]
11 changes: 11 additions & 0 deletions internal/domain/task/value_objects/aind/algo_descriptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
descriptions = {
"max_arity": "Maximum arity of the inclusion dependency (IND).",
"sample_size": "Size of table sample for IND profiling.",
"ignore_constant_cols": "Ignore INDs containing columns with only one value for improved performance.",
"hll_accuracy": "HyperLogLog approximation accuracy. Closer to 0 means higher accuracy and memory usage.",
"ignore_null_cols": "Ignore INDs containing columns filled only with NULLs.",
"threads": "Number of threads to use. If 0, use all available threads.",
"error": "Error threshold for approximate IND algorithms.",
"is_null_equal_null": "Specify whether two NULL values should be treated as equal.",
"mem_limit": "Memory limit in MBs for the algorithm.",
}
6 changes: 6 additions & 0 deletions internal/domain/task/value_objects/aind/algo_name.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from enum import StrEnum, auto


class AindAlgoName(StrEnum):
Mind = auto()
Spider = auto()
5 changes: 5 additions & 0 deletions internal/domain/task/value_objects/aind/algo_result.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from internal.domain.task.value_objects.ind.algo_result import IndAlgoResult, IndModel


AindAlgoResult = IndAlgoResult
AindModel = IndModel
23 changes: 23 additions & 0 deletions internal/domain/task/value_objects/ind/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from typing import Literal

from pydantic import BaseModel

from internal.domain.task.value_objects.primitive_name import PrimitiveName
from internal.domain.task.value_objects.ind.algo_config import OneOfIndAlgoConfig
from internal.domain.task.value_objects.ind.algo_result import ( # noqa: F401
IndAlgoResult,
IndModel,
)
from internal.domain.task.value_objects.ind.algo_name import IndAlgoName # noqa: F401


class BaseIndTaskModel(BaseModel):
primitive_name: Literal[PrimitiveName.ind]


class IndTaskConfig(BaseIndTaskModel):
config: OneOfIndAlgoConfig


class IndTaskResult(BaseIndTaskModel):
result: IndAlgoResult
52 changes: 52 additions & 0 deletions internal/domain/task/value_objects/ind/algo_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from typing import Literal, Annotated
from pydantic import Field
from internal.domain.common import OptionalModel
from internal.domain.task.value_objects.ind.algo_name import IndAlgoName
from internal.domain.task.value_objects.ind.algo_descriptions import descriptions


class BaseIndConfig(OptionalModel):
__non_optional_fields__ = {
"algo_name",
}


class FaidaConfig(BaseIndConfig):
algo_name: Literal[IndAlgoName.Faida]

max_arity: Annotated[int, Field(gt=0, description=descriptions["max_arity"])]
sample_size: Annotated[int, Field(gt=0, description=descriptions["sample_size"])]
ignore_constant_cols: Annotated[
bool, Field(description=descriptions["ignore_constant_cols"])
]
hll_accuracy: Annotated[
float, Field(gt=0, description=descriptions["hll_accuracy"])
]
ignore_null_cols: Annotated[
bool, Field(description=descriptions["ignore_null_cols"])
]
threads: Annotated[int, Field(ge=0, description=descriptions["threads"])]


class MindConfig(BaseIndConfig):
algo_name: Literal[IndAlgoName.Mind]

max_arity: Annotated[int, Field(gt=0, description=descriptions["max_arity"])]
error: Annotated[float, Field(ge=0, le=1.0, description=descriptions["error"])]


class SpiderConfig(BaseIndConfig):
algo_name: Literal[IndAlgoName.Spider]

error: Annotated[float, Field(ge=0, le=1.0, description=descriptions["error"])]
is_null_equal_null: Annotated[
bool, Field(description=descriptions["is_null_equal_null"])
]
threads: Annotated[int, Field(ge=0, description=descriptions["threads"])]
mem_limit: Annotated[int, Field(gt=0, description=descriptions["mem_limit"])]


OneOfIndAlgoConfig = Annotated[
FaidaConfig | MindConfig | SpiderConfig,
Field(discriminator="algo_name"),
]
11 changes: 11 additions & 0 deletions internal/domain/task/value_objects/ind/algo_descriptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
descriptions = {
"max_arity": "Maximum arity of the inclusion dependency (IND).",
"sample_size": "Size of table sample for IND profiling.",
"ignore_constant_cols": "Ignore INDs containing columns with only one value for improved performance.",
"hll_accuracy": "HyperLogLog approximation accuracy. Closer to 0 means higher accuracy and memory usage.",
"ignore_null_cols": "Ignore INDs containing columns filled only with NULLs.",
"threads": "Number of threads to use. If 0, use all available threads.",
"error": "Error threshold for approximate IND algorithms.",
"is_null_equal_null": "Specify whether two NULL values should be treated as equal.",
"mem_limit": "Memory limit in MBs for the algorithm.",
}
7 changes: 7 additions & 0 deletions internal/domain/task/value_objects/ind/algo_name.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from enum import StrEnum, auto


class IndAlgoName(StrEnum):
Faida = auto()
Mind = auto()
Spider = auto()
Loading

0 comments on commit cd8095f

Please sign in to comment.