feat(cleanup): Support cleanup on errors and transactions storages (#…
lynnagara authored Jan 14, 2021
1 parent b6e1db3 commit df3a0d5
Showing 8 changed files with 152 additions and 83 deletions.
42 changes: 25 additions & 17 deletions snuba/cleanup.py
@@ -1,25 +1,32 @@
import logging
from datetime import datetime, timedelta
from typing import Optional, Sequence, Tuple
from typing import Optional, Sequence

from snuba import util
from snuba.clickhouse.native import ClickhousePool

from snuba.datasets.schemas.tables import TableSchema
from snuba.datasets.storage import WritableTableStorage

logger = logging.getLogger("snuba.cleanup")


def run_cleanup(
clickhouse: ClickhousePool, database: str, table: str, dry_run: bool = True
clickhouse: ClickhousePool,
storage: WritableTableStorage,
database: str,
dry_run: bool = True,
) -> int:
active_parts = get_active_partitions(clickhouse, database, table)

table = storage.get_table_writer().get_schema().get_local_table_name()

active_parts = get_active_partitions(clickhouse, storage, database, table)
stale_parts = filter_stale_partitions(active_parts)
drop_partitions(clickhouse, database, table, stale_parts, dry_run=dry_run)
return len(stale_parts)


def get_active_partitions(
clickhouse: ClickhousePool, database: str, table: str
clickhouse: ClickhousePool, storage: WritableTableStorage, database: str, table: str
) -> Sequence[util.Part]:
response = clickhouse.execute(
"""
@@ -32,15 +39,18 @@ def get_active_partitions(
{"database": database, "table": table},
)

events_part_format = [util.PartSegment.DATE, util.PartSegment.RETENTION_DAYS]
return [util.decode_part_str(part, events_part_format) for part, in response]
schema = storage.get_schema()
assert isinstance(schema, TableSchema)
part_format = schema.get_part_format()
assert part_format is not None
return [util.decode_part_str(part, part_format) for part, in response]


def filter_stale_partitions(
parts: Sequence[util.Part], as_of: Optional[datetime] = None
) -> Sequence[Tuple[datetime, int]]:
"""Filter partitions of (datetime, retention_days) down to ones
that are out of the retention window based on `as_of` (default: now)."""
) -> Sequence[util.Part]:
"""Filter partitions down to ones that are out of the retention
window based on `as_of` (default: now)."""

if as_of is None:
as_of = datetime.utcnow()
@@ -52,28 +62,26 @@ def filter_stale_partitions(
part_last_day = part_date + timedelta(days=6 - part_date.weekday())

if part_last_day < (as_of - timedelta(days=retention_days)):
stale_parts.append((part_date, retention_days))

stale_parts.append(part)
return stale_parts


def drop_partitions(
clickhouse: ClickhousePool,
database: str,
table: str,
parts: Sequence[Tuple[datetime, int]],
parts: Sequence[util.Part],
dry_run: bool = True,
) -> None:
query_template = """\
ALTER TABLE %(database)s.%(table)s DROP PARTITION ('%(date_str)s', %(retention_days)s)
ALTER TABLE %(database)s.%(table)s DROP PARTITION %(partition)s
"""

for part_date, retention_days in parts:
for part in parts:
args = {
"database": database,
"table": table,
"date_str": part_date.strftime("%Y-%m-%d"),
"retention_days": retention_days,
"partition": part.name,
}

query = (query_template % args).strip()
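For reference, a minimal sketch of how the updated run_cleanup signature might be invoked, mirroring the CLI change below; the choice of storage key, the dry_run value, and the get_query_connection helper are assumptions drawn from the surrounding diff.

from snuba.cleanup import run_cleanup
from snuba.clusters.cluster import ClickhouseClientSettings
from snuba.datasets.storages import StorageKey
from snuba.datasets.storages.factory import get_writable_storage

# Illustrative storage choice; errors and transactions are now supported too.
storage = get_writable_storage(StorageKey.ERRORS)
database = storage.get_cluster().get_database()

# Assumed connection helper, matching the CLI code path below.
connection = storage.get_cluster().get_query_connection(
    ClickhouseClientSettings.CLEANUP
)

# The local table name is now derived inside run_cleanup from the storage's
# table writer, so only the storage and database are passed in.
num_dropped = run_cleanup(connection, storage, database, dry_run=True)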
8 changes: 3 additions & 5 deletions snuba/cli/cleanup.py
@@ -4,7 +4,7 @@

from snuba.clusters.cluster import ClickhouseClientSettings
from snuba.datasets.storages import StorageKey
from snuba.datasets.storages.factory import get_writable_storage, WRITABLE_STORAGES
from snuba.datasets.storages.factory import get_writable_storage
from snuba.environment import setup_logging


@@ -25,7 +25,7 @@
"--storage",
"storage_name",
default="events",
type=click.Choice([storage_key.value for storage_key in WRITABLE_STORAGES.keys()]),
type=click.Choice(["events", "errors", "transactions"]),
help="The storage to target",
)
@click.option("--log-level", help="Logging level to use.")
@@ -50,8 +50,6 @@ def cleanup(

(clickhouse_user, clickhouse_password,) = storage.get_cluster().get_credentials()

table = storage.get_table_writer().get_schema().get_local_table_name()

database = storage.get_cluster().get_database()

if clickhouse_host and clickhouse_port:
@@ -69,5 +67,5 @@
ClickhouseClientSettings.CLEANUP
)

num_dropped = run_cleanup(connection, database, table, dry_run=dry_run)
num_dropped = run_cleanup(connection, storage, database, dry_run=dry_run)
logger.info("Dropped %s partitions on %s" % (num_dropped, clickhouse_host))
9 changes: 9 additions & 0 deletions snuba/datasets/schemas/tables.py
@@ -2,6 +2,7 @@

from typing import Optional, Sequence

from snuba import util
from snuba.clickhouse.columns import ColumnSet
from snuba.clusters.cluster import get_cluster
from snuba.clusters.storage_sets import StorageSetKey
@@ -55,6 +56,7 @@ def __init__(
storage_set_key: StorageSetKey,
mandatory_conditions: Optional[Sequence[FunctionCall]] = None,
prewhere_candidates: Optional[Sequence[str]] = None,
part_format: Optional[Sequence[util.PartSegment]] = None,
):
self.__local_table_name = local_table_name
self.__table_name = (
@@ -65,6 +67,7 @@ def __init__(
self.__table_source = TableSource(
self.get_table_name(), columns, mandatory_conditions, prewhere_candidates,
)
self.__part_format = part_format

def get_data_source(self) -> TableSource:
"""
@@ -87,6 +90,12 @@ def get_table_name(self) -> str:
"""
return self.__table_name

def get_part_format(self) -> Optional[Sequence[util.PartSegment]]:
"""
Partition format required for cleanup and optimize.
"""
return self.__part_format


class WritableTableSchema(TableSchema):
"""
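A minimal sketch of how a caller is expected to consume the new accessor; the helper name is hypothetical, but the pattern matches the cleanup and optimize changes in this commit.

from typing import Sequence

from snuba import util
from snuba.datasets.schemas.tables import TableSchema
from snuba.datasets.storage import ReadableTableStorage


def get_storage_part_format(
    storage: ReadableTableStorage,
) -> Sequence[util.PartSegment]:
    # get_part_format() is defined on TableSchema and is Optional, hence the asserts.
    schema = storage.get_schema()
    assert isinstance(schema, TableSchema)
    part_format = schema.get_part_format()
    assert part_format is not None
    return part_format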
2 changes: 2 additions & 0 deletions snuba/datasets/storages/errors.py
@@ -1,3 +1,4 @@
from snuba import util
from snuba.clusters.storage_sets import StorageSetKey
from snuba.datasets.errors_processor import ErrorsProcessor
from snuba.datasets.errors_replacer import ErrorsReplacer, ReplacerState
@@ -22,6 +23,7 @@
storage_set_key=StorageSetKey.EVENTS,
mandatory_conditions=mandatory_conditions,
prewhere_candidates=prewhere_candidates,
part_format=[util.PartSegment.RETENTION_DAYS, util.PartSegment.DATE],
)

storage = WritableTableStorage(
2 changes: 2 additions & 0 deletions snuba/datasets/storages/events.py
@@ -1,3 +1,4 @@
from snuba import util
from snuba.clusters.storage_sets import StorageSetKey
from snuba.datasets.errors_replacer import ErrorsReplacer, ReplacerState
from snuba.datasets.events_processor import EventsProcessor
@@ -25,6 +26,7 @@
storage_set_key=StorageSetKey.EVENTS,
mandatory_conditions=mandatory_conditions,
prewhere_candidates=prewhere_candidates,
part_format=[util.PartSegment.DATE, util.PartSegment.RETENTION_DAYS],
)


2 changes: 2 additions & 0 deletions snuba/datasets/storages/transactions.py
@@ -1,3 +1,4 @@
from snuba import util
from snuba.clickhouse.columns import (
UUID,
Array,
@@ -80,6 +81,7 @@
storage_set_key=StorageSetKey.TRANSACTIONS,
mandatory_conditions=[],
prewhere_candidates=["event_id", "transaction_name", "transaction", "title"],
part_format=[util.PartSegment.RETENTION_DAYS, util.PartSegment.DATE],
)


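Note the segment ordering in the storage definitions above: the events table puts the date segment first, while errors and transactions put retention_days first. A small sketch of decoding with each ordering; the partition strings are assumed examples of what ClickHouse reports in system.parts.

from snuba import util

events_format = [util.PartSegment.DATE, util.PartSegment.RETENTION_DAYS]
errors_transactions_format = [util.PartSegment.RETENTION_DAYS, util.PartSegment.DATE]

# Assumed partition name shapes for each ordering.
events_part = util.decode_part_str("('2021-01-11', 90)", events_format)
errors_part = util.decode_part_str("(90, '2021-01-11')", errors_transactions_format)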
13 changes: 4 additions & 9 deletions snuba/optimize.py
@@ -6,18 +6,10 @@
from snuba.clickhouse.native import ClickhousePool
from snuba.datasets.schemas.tables import TableSchema
from snuba.datasets.storage import ReadableTableStorage
from snuba.datasets.storages import StorageKey

logger = logging.getLogger("snuba.optimize")


STORAGE_PARTITION_KEYS = {
StorageKey.EVENTS: [util.PartSegment.DATE, util.PartSegment.RETENTION_DAYS],
StorageKey.ERRORS: [util.PartSegment.RETENTION_DAYS, util.PartSegment.DATE],
StorageKey.TRANSACTIONS: [util.PartSegment.RETENTION_DAYS, util.PartSegment.DATE],
}


def run_optimize(
clickhouse: ClickhousePool,
storage: ReadableTableStorage,
@@ -87,7 +79,10 @@ def get_partitions_to_optimize(
{"database": database, "table": table},
)

part_format = STORAGE_PARTITION_KEYS[storage.get_storage_key()]
schema = storage.get_schema()
assert isinstance(schema, TableSchema)
part_format = schema.get_part_format()
assert part_format is not None

parts = [util.decode_part_str(part, part_format) for part, count in active_parts]
