diff --git a/doc/source/data/loading-data.rst b/doc/source/data/loading-data.rst index 4007c9e9b4c3..3d8c00540b09 100644 --- a/doc/source/data/loading-data.rst +++ b/doc/source/data/loading-data.rst @@ -395,11 +395,17 @@ Ray Data interoperates with libraries like pandas, NumPy, and Arrow. .. testoutput:: - MaterializedDataset( - num_blocks=3, - num_rows=3, - schema={food: string, price: double} - ) + shape: (3, 2) + ╭────────┬────────╮ + │ food ┆ price │ + │ --- ┆ --- │ + │ string ┆ double │ + ╞════════╪════════╡ + │ spam ┆ 9.34 │ + │ ham ┆ 5.37 │ + │ eggs ┆ 0.94 │ + ╰────────┴────────╯ + (Showing 3 of 3 rows) You can also create a :class:`~ray.data.dataset.Dataset` from a list of regular Python objects. In the schema, the column name defaults to "item". @@ -414,7 +420,19 @@ Ray Data interoperates with libraries like pandas, NumPy, and Arrow. .. testoutput:: - MaterializedDataset(num_blocks=5, num_rows=5, schema={item: int64}) + shape: (5, 1) + ╭───────╮ + │ item │ + │ --- │ + │ int64 │ + ╞═══════╡ + │ 1 │ + │ 2 │ + │ 3 │ + │ 4 │ + │ 5 │ + ╰───────╯ + (Showing 5 of 5 rows) .. tab-item:: NumPy @@ -434,11 +452,20 @@ Ray Data interoperates with libraries like pandas, NumPy, and Arrow. .. testoutput:: - MaterializedDataset( - num_blocks=1, - num_rows=3, - schema={data: ArrowTensorTypeV2(shape=(2, 2), dtype=double)} - ) + shape: (3, 1) + ╭──────────────────────────────────────────╮ + │ data │ + │ --- │ + │ ArrowTensorTypeV2(shape=(2, 2), dtype=d… │ + ╞══════════════════════════════════════════╡ + │ [[1. 1.] + [1. 1.]] │ + │ [[1. 1.] + [1. 1.]] │ + │ [[1. 1.] + [1. 1.]] │ + ╰──────────────────────────────────────────╯ + (Showing 3 of 3 rows) .. tab-item:: pandas @@ -460,11 +487,17 @@ Ray Data interoperates with libraries like pandas, NumPy, and Arrow. .. testoutput:: - MaterializedDataset( - num_blocks=1, - num_rows=3, - schema={food: object, price: float64} - ) + shape: (3, 2) + ╭────────┬────────╮ + │ food ┆ price │ + │ --- ┆ --- │ + │ object ┆ double │ + ╞════════╪════════╡ + │ spam ┆ 9.34 │ + │ ham ┆ 5.37 │ + │ eggs ┆ 0.94 │ + ╰────────┴────────╯ + (Showing 3 of 3 rows) .. tab-item:: PyArrow @@ -485,11 +518,17 @@ Ray Data interoperates with libraries like pandas, NumPy, and Arrow. .. testoutput:: - MaterializedDataset( - num_blocks=1, - num_rows=3, - schema={food: string, price: double} - ) + shape: (3, 2) + ╭────────┬────────╮ + │ food ┆ price │ + │ --- ┆ --- │ + │ string ┆ double │ + ╞════════╪════════╡ + │ spam ┆ 9.34 │ + │ ham ┆ 5.37 │ + │ eggs ┆ 0.94 │ + ╰────────┴────────╯ + (Showing 3 of 3 rows) .. _loading_datasets_from_distributed_df: diff --git a/doc/source/data/quickstart.rst b/doc/source/data/quickstart.rst index 86a7fff8a40d..7ca66aebc0a2 100644 --- a/doc/source/data/quickstart.rst +++ b/doc/source/data/quickstart.rst @@ -61,7 +61,7 @@ across your cluster for better performance. def transform_batch(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]: vec_a = batch["petal length (cm)"] vec_b = batch["petal width (cm)"] - batch["petal area (cm^2)"] = vec_a * vec_b + batch["petal area (cm^2)"] = np.round(vec_a * vec_b, 2) return batch # Apply the transformation to our dataset @@ -74,18 +74,25 @@ across your cluster for better performance. .. 
testoutput:: - MaterializedDataset( - num_blocks=..., - num_rows=150, - schema={ - sepal length (cm): double, - sepal width (cm): double, - petal length (cm): double, - petal width (cm): double, - target: int64, - petal area (cm^2): double - } - ) + shape: (150, 6) + ╭───────────────────┬──────────────────┬───────────────────┬──────────────────┬────────┬───────────────────╮ + │ sepal length (cm) ┆ sepal width (cm) ┆ petal length (cm) ┆ petal width (cm) ┆ target ┆ petal area (cm^2) │ + │ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │ + │ double ┆ double ┆ double ┆ double ┆ int64 ┆ double │ + ╞═══════════════════╪══════════════════╪═══════════════════╪══════════════════╪════════╪═══════════════════╡ + │ 5.1 ┆ 3.5 ┆ 1.4 ┆ 0.2 ┆ 0 ┆ 0.28 │ + │ 4.9 ┆ 3.0 ┆ 1.4 ┆ 0.2 ┆ 0 ┆ 0.28 │ + │ 4.7 ┆ 3.2 ┆ 1.3 ┆ 0.2 ┆ 0 ┆ 0.26 │ + │ 4.6 ┆ 3.1 ┆ 1.5 ┆ 0.2 ┆ 0 ┆ 0.3 │ + │ 5.0 ┆ 3.6 ┆ 1.4 ┆ 0.2 ┆ 0 ┆ 0.28 │ + │ … ┆ … ┆ … ┆ … ┆ … ┆ … │ + │ 6.7 ┆ 3.0 ┆ 5.2 ┆ 2.3 ┆ 2 ┆ 11.96 │ + │ 6.3 ┆ 2.5 ┆ 5.0 ┆ 1.9 ┆ 2 ┆ 9.5 │ + │ 6.5 ┆ 3.0 ┆ 5.2 ┆ 2.0 ┆ 2 ┆ 10.4 │ + │ 6.2 ┆ 3.4 ┆ 5.4 ┆ 2.3 ┆ 2 ┆ 12.42 │ + │ 5.9 ┆ 3.0 ┆ 5.1 ┆ 1.8 ┆ 2 ┆ 9.18 │ + ╰───────────────────┴──────────────────┴───────────────────┴──────────────────┴────────┴───────────────────╯ + (Showing 10 of 150 rows) To explore more transformation capabilities, read :ref:`Transforming data `. diff --git a/python/ray/data/_internal/dataset_repr.py b/python/ray/data/_internal/dataset_repr.py new file mode 100644 index 000000000000..1da36a6da4b9 --- /dev/null +++ b/python/ray/data/_internal/dataset_repr.py @@ -0,0 +1,310 @@ +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple + +import numpy as np + +import ray +from ray.data.block import Block, BlockAccessor, BlockMetadata +from ray.types import ObjectRef + +if TYPE_CHECKING: + from ray.data.dataset import Dataset, Schema + +_DATASET_REPR_ELLIPSIS = "…" # Ellipsis marker for truncated cells/rows. +_DATASET_REPR_MAX_ROWS = 10 # Total preview row budget when materialized. +_DATASET_REPR_HEAD_ROWS = 5 # Number of head rows to show before the gap. +_DATASET_REPR_MAX_COLUMN_WIDTH = 40 # Max width per column cell in the table. +_DATASET_REPR_GET_TIMEOUT_S = 30.0 # Timeout for fetching preview blocks. + +__all__ = [ + "_build_dataset_ascii_repr", +] + + +def _build_dataset_ascii_repr( + dataset: "Dataset", + schema: "Schema", + is_materialized: bool, +) -> str: + """Render the dataset as a multi-line tabular string.""" + columns = list(schema.names) + if not columns: + return dataset._plan.get_plan_as_string(dataset.__class__) + + num_rows = dataset._meta_count() + head_rows: List[List[str]] = [] + tail_rows: List[List[str]] = [] + if is_materialized: + head_data, tail_data, _ = _collect_materialized_rows_for_repr(dataset, num_rows) + head_rows = _format_rows_for_repr(head_data, columns) + tail_rows = _format_rows_for_repr(tail_data, columns) + + return _build_dataset_ascii_repr_from_rows( + schema=schema, + num_rows=num_rows, + dataset_name=dataset.name, + is_materialized=is_materialized, + head_rows=head_rows, + tail_rows=tail_rows, + ) + + +def _build_dataset_ascii_repr_from_rows( + *, + schema: "Schema", + num_rows: Optional[int], + dataset_name: Optional[str], + is_materialized: bool, + head_rows: List[List[str]], + tail_rows: List[List[str]], +) -> str: + """Render the dataset repr given schema metadata and preview rows.""" + columns = list(schema.names) + num_cols = len(columns) + shape_line = f"shape: ({num_rows if num_rows is not None else '?'}, {num_cols})" + + # Build header rows from schema. 
+ dtype_strings = [_repr_format_dtype(t) for t in schema.types] + column_headers = [ + _truncate_to_cell_width(str(col), _DATASET_REPR_MAX_COLUMN_WIDTH) + for col in columns + ] + dtype_headers = [ + _truncate_to_cell_width(dtype, _DATASET_REPR_MAX_COLUMN_WIDTH) + for dtype in dtype_strings + ] + separator_row = ["---"] * len(columns) + + # Assemble rows, including an ellipsis gap if needed. + show_gap = bool(head_rows) and bool(tail_rows) + display_rows: List[List[str]] = [] + display_rows.extend(head_rows) + if show_gap: + display_rows.append([_DATASET_REPR_ELLIPSIS] * len(columns)) + display_rows.extend(tail_rows) + + # Render the table with computed column widths. + column_widths = _compute_column_widths( + column_headers, dtype_headers, separator_row, display_rows + ) + + table_lines = _render_table_lines( + column_headers, + dtype_headers, + separator_row, + display_rows, + column_widths, + ) + + # Append a summary line describing row coverage. + num_rows_shown = len(head_rows) + len(tail_rows) + summary_line = ( + f"(Showing {num_rows_shown} of {num_rows} rows)" + if is_materialized + else "(Dataset isn't materialized)" + ) + if is_materialized and num_rows is None: + summary_line = f"(Showing {num_rows_shown} of ? rows)" + + components = [] + if dataset_name is not None: + components.append(f"name: {dataset_name}") + components.extend([shape_line, "\n".join(table_lines), summary_line]) + return "\n".join(components) + + +def _repr_format_dtype(dtype: object) -> str: + """Format a dtype into a compact string for the schema row. + + Dtypes may come from PyArrow, pandas/NumPy, or be plain Python types. + """ + if isinstance(dtype, type): + return dtype.__name__ + name = getattr(dtype, "name", None) + if isinstance(name, str): + return name + return str(dtype) + + +def _collect_materialized_rows_for_repr( + dataset: "Dataset", + num_rows: Optional[int], +) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], bool]: + """Collect head/tail rows for preview and whether to show a gap row.""" + block_entries: List[Tuple[ObjectRef, BlockMetadata]] = [] + for ref_bundle in dataset.iter_internal_ref_bundles(): + block_entries.extend(zip(ref_bundle.block_refs, ref_bundle.metadata)) + + if not block_entries: + return [], [], False + + # Compute how many head/tail rows to show within the preview budget. 
+ head_row_limit, tail_row_limit = _determine_preview_row_targets(num_rows) + block_cache: Dict[ObjectRef, Block] = {} + + def _resolve_block(block_ref: ObjectRef) -> Block: + if block_ref not in block_cache: + block_cache[block_ref] = ray.get( + block_ref, timeout=_DATASET_REPR_GET_TIMEOUT_S + ) + return block_cache[block_ref] + + head_rows: List[Dict[str, Any]] = [] + head_remaining = head_row_limit + for block_ref, _ in block_entries: + if head_remaining <= 0: + break + block = _resolve_block(block_ref) + accessor = BlockAccessor.for_block(block) + for row in accessor.iter_rows(public_row_format=True): + head_rows.append(row) + head_remaining -= 1 + if head_remaining <= 0: + break + + tail_rows: List[Dict[str, Any]] = [] + tail_remaining = tail_row_limit + tail_parts: List[List[Dict[str, Any]]] = [] + if tail_remaining > 0: + for block_ref, metadata in reversed(block_entries): + if tail_remaining <= 0: + break + block = _resolve_block(block_ref) + accessor = BlockAccessor.for_block(block) + total_rows = metadata.num_rows + if total_rows is None: + total_rows = accessor.num_rows() + if total_rows == 0: + continue + start = max(0, total_rows - tail_remaining) + sliced_block = accessor.slice(start, total_rows, copy=False) + slice_accessor = BlockAccessor.for_block(sliced_block) + block_rows = list(slice_accessor.iter_rows(public_row_format=True)) + tail_parts.append(block_rows) + tail_remaining -= len(block_rows) + if tail_remaining <= 0: + break + + for part in reversed(tail_parts): + tail_rows.extend(part) + + show_gap = bool(head_rows) and bool(tail_rows) + return head_rows, tail_rows, show_gap + + +def _determine_preview_row_targets(num_rows: Optional[int]) -> Tuple[int, int]: + """Compute how many head and tail rows to preview.""" + max_rows = _DATASET_REPR_MAX_ROWS + if max_rows <= 0: + return 0, 0 + + if num_rows is None or num_rows <= max_rows: + head = num_rows if num_rows is not None else max_rows + return head, 0 + + head = min(_DATASET_REPR_HEAD_ROWS, max_rows) + if head < 0: + head = 0 + tail = max_rows - head + if tail < 0: + tail = 0 + return head, tail + + +def _format_rows_for_repr( + rows: List[Dict[str, Any]], + column_names: List[str], +) -> List[List[str]]: + """Format row dicts into string cell rows for table rendering.""" + formatted_rows: List[List[str]] = [] + for row in rows: + formatted_row = [] + for column in column_names: + value = row.get(column) + formatted_value = _format_value(value) + formatted_row.append( + _truncate_to_cell_width(formatted_value, _DATASET_REPR_MAX_COLUMN_WIDTH) + ) + formatted_rows.append(formatted_row) + return formatted_rows + + +def _format_value(value: Any) -> str: + if isinstance(value, np.generic): + value = value.item() + return str(value) + + +def _truncate_to_cell_width(value: str, max_width: int) -> str: + """Truncate a single cell to the configured max width.""" + if max_width is None: + return value + if max_width <= 0: + return _DATASET_REPR_ELLIPSIS if value else "" + if len(value) <= max_width: + return value + if max_width == 1: + return _DATASET_REPR_ELLIPSIS + return value[: max_width - 1] + _DATASET_REPR_ELLIPSIS + + +def _compute_column_widths( + headers: List[str], + dtype_headers: List[str], + separator_row: List[str], + data_rows: List[List[str]], +) -> List[int]: + """Compute per-column widths for table rendering.""" + column_widths: List[int] = [] + for idx in range(len(headers)): + widths = [ + len(headers[idx]), + len(dtype_headers[idx]), + len(separator_row[idx]), + ] + for row in data_rows: + 
widths.append(len(row[idx])) + column_widths.append(max(widths)) + return column_widths + + +def _render_table_lines( + headers: List[str], + dtype_headers: List[str], + separator_row: List[str], + data_rows: List[List[str]], + column_widths: List[int], +) -> List[str]: + """Render the full table (borders, headers, data) as lines.""" + lines: List[str] = [] + top = _render_border("╭", "┬", "╮", "─", column_widths) + header_row = _render_row(headers, column_widths) + separator_line = _render_row(separator_row, column_widths) + dtype_row = _render_row(dtype_headers, column_widths) + lines.extend([top, header_row, separator_line, dtype_row]) + + if data_rows: + middle = _render_border("╞", "╪", "╡", "═", column_widths) + lines.append(middle) + for row in data_rows: + lines.append(_render_row(row, column_widths)) + + bottom = _render_border("╰", "┴", "╯", "─", column_widths) + lines.append(bottom) + return lines + + +def _render_border( + left: str, middle: str, right: str, fill: str, column_widths: List[int] +) -> str: + """Render a table border line given column widths.""" + segments = [fill * (width + 2) for width in column_widths] + return f"{left}{middle.join(segments)}{right}" + + +def _render_row(values: List[str], column_widths: List[int]) -> str: + """Render a single table row with padding.""" + cells = [] + for idx, value in enumerate(values): + padded = value.ljust(column_widths[idx]) + cells.append(f" {padded} ") + return f"│{'┆'.join(cells)}│" diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 8cc4f6698352..533152b8264c 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -29,6 +29,7 @@ from ray._common.usage import usage_lib from ray._private.thirdparty.tabulate.tabulate import tabulate from ray.data._internal.compute import ComputeStrategy, TaskPoolStrategy +from ray.data._internal.dataset_repr import _build_dataset_ascii_repr from ray.data._internal.datasource.bigquery_datasink import BigQueryDatasink from ray.data._internal.datasource.clickhouse_datasink import ( ClickHouseDatasink, @@ -243,12 +244,22 @@ class Dataset: 999 >>> # Shuffle this dataset randomly. >>> ds.random_shuffle() # doctest: +ELLIPSIS - RandomShuffle - +- Dataset(num_rows=1000, schema={id: int64}) + shape: (1000, 1) + ╭───────╮ + │ id │ + │ --- │ + │ int64 │ + ╰───────╯ + (Dataset isn't materialized) >>> # Sort it back in order. >>> ds.sort("id") # doctest: +ELLIPSIS - Sort - +- Dataset(num_rows=1000, schema={id: int64}) + shape: (1000, 1) + ╭───────╮ + │ id │ + │ --- │ + │ int64 │ + ╰───────╯ + (Dataset isn't materialized) Both unexecuted and materialized Datasets can be passed between Ray tasks and actors without incurring a copy. Dataset supports conversion to/from several @@ -6273,8 +6284,25 @@ def materialize(self) -> "MaterializedDataset": >>> import ray >>> ds = ray.data.range(10) >>> materialized_ds = ds.materialize() - >>> materialized_ds - MaterializedDataset(num_blocks=..., num_rows=10, schema={id: int64}) + >>> materialized_ds # doctest: +ELLIPSIS + shape: (10, 1) + ╭───────╮ + │ id │ + │ --- │ + │ int64 │ + ╞═══════╡ + │ 0 │ + │ 1 │ + │ 2 │ + │ 3 │ + │ 4 │ + │ 5 │ + │ 6 │ + │ 7 │ + │ 8 │ + │ 9 │ + ╰───────╯ + (Showing 10 of 10 rows) Returns: A MaterializedDataset holding the materialized data blocks. 
@@ -6711,7 +6739,15 @@ def _tab_repr_(self): return Tab(children, titles=["Metadata", "Schema"]) def __repr__(self) -> str: - return self._plan.get_plan_as_string(self.__class__) + return self._tabular_repr() + + def _tabular_repr(self) -> str: + schema = self.schema(fetch_if_missing=False) + if schema is None or not isinstance(schema, Schema): + return self._plan.get_plan_as_string(self.__class__) + + is_materialized = isinstance(self, MaterializedDataset) + return _build_dataset_ascii_repr(self, schema, is_materialized) def __str__(self) -> str: return repr(self) diff --git a/python/ray/data/iterator.py b/python/ray/data/iterator.py index 96cc5e0e454e..cf9325b006a0 100644 --- a/python/ray/data/iterator.py +++ b/python/ray/data/iterator.py @@ -84,10 +84,22 @@ class DataIterator(abc.ABC): Examples: >>> import ray >>> ds = ray.data.range(5) - >>> ds - Dataset(num_rows=5, schema={id: int64}) - >>> ds.iterator() - DataIterator(Dataset(num_rows=5, schema={id: int64})) + >>> ds # doctest: +ELLIPSIS + shape: (5, 1) + ╭───────╮ + │ id │ + │ --- │ + │ int64 │ + ╰───────╯ + (Dataset isn't materialized) + >>> ds.iterator() # doctest: +ELLIPSIS + DataIterator(shape: (5, 1) + ╭───────╮ + │ id │ + │ --- │ + │ int64 │ + ╰───────╯ + (Dataset isn't materialized)) """ @abc.abstractmethod @@ -826,7 +838,7 @@ def to_tf( ... "s3://anonymous@air-example-data/iris.csv" ... ) >>> it = ds.iterator(); it - DataIterator(Dataset(num_rows=?, schema=...)) + DataIterator(Dataset(num_rows=?, schema=Unknown schema)) If your model accepts a single tensor as input, specify a single feature column. @@ -846,9 +858,6 @@ def to_tf( >>> columns_to_concat = ["sepal length (cm)", "sepal width (cm)", "petal length (cm)", "petal width (cm)"] >>> preprocessor = Concatenator(columns=columns_to_concat, output_column_name="features") >>> it = preprocessor.transform(ds).iterator() - >>> it - DataIterator(Concatenator - +- Dataset(num_rows=?, schema=...)) >>> it.to_tf("features", "target") <_OptionsDataset element_spec=(TensorSpec(shape=(None, 4), dtype=tf.float64, name='features'), TensorSpec(shape=(None,), dtype=tf.int64, name='target'))> diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index 796d8508892b..0d05c01c3edc 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -174,8 +174,20 @@ def from_items( >>> import ray >>> ds = ray.data.from_items([1, 2, 3, 4, 5]) - >>> ds - MaterializedDataset(num_blocks=..., num_rows=5, schema={item: int64}) + >>> ds # doctest: +ELLIPSIS + shape: (5, 1) + ╭───────╮ + │ item │ + │ --- │ + │ int64 │ + ╞═══════╡ + │ 1 │ + │ 2 │ + │ 3 │ + │ 4 │ + │ 5 │ + ╰───────╯ + (Showing 5 of 5 rows) >>> ds.schema() Column Type ------ ---- @@ -260,8 +272,14 @@ def range( >>> import ray >>> ds = ray.data.range(10000) - >>> ds - Dataset(num_rows=10000, schema={id: int64}) + >>> ds # doctest: +ELLIPSIS + shape: (10000, 1) + ╭───────╮ + │ id │ + │ --- │ + │ int64 │ + ╰───────╯ + (Dataset isn't materialized) >>> ds.map(lambda row: {"id": row["id"] * 2}).take(4) [{'id': 0}, {'id': 2}, {'id': 4}, {'id': 6}] @@ -314,11 +332,14 @@ def range_tensor( >>> import ray >>> ds = ray.data.range_tensor(1000, shape=(2, 2)) - >>> ds - Dataset( - num_rows=1000, - schema={data: ArrowTensorTypeV2(shape=(2, 2), dtype=int64)} - ) + >>> ds # doctest: +ELLIPSIS + shape: (1000, 1) + ╭──────────────────────────────────────────╮ + │ data │ + │ --- │ + │ ArrowTensorTypeV2(shape=(2, 2), dtype=i… │ + ╰──────────────────────────────────────────╯ + (Dataset isn't materialized) >>> ds.map_batches(lambda 
row: {"data": row["data"] * 2}).take(2) [{'data': array([[0, 0], [0, 0]])}, {'data': array([[2, 2], @@ -1497,7 +1518,7 @@ def read_csv( >>> ray.data.read_csv("s3://anonymous@ray-example-data/different-extensions/", ... file_extensions=["csv"]) - Dataset(num_rows=?, schema=...) + Dataset(num_rows=?, schema=Unknown schema) Args: paths: A single file or directory, or a list of file or directory paths. @@ -1979,7 +2000,7 @@ def read_tfrecords( Examples: >>> import ray >>> ray.data.read_tfrecords("s3://anonymous@ray-example-data/iris.tfrecords") - Dataset(num_rows=?, schema=...) + Dataset(num_rows=?, schema=Unknown schema) We can also read compressed TFRecord files, which use one of the `compression types supported by Arrow >> import pandas as pd >>> import ray >>> df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}) - >>> ray.data.from_pandas(df) - MaterializedDataset(num_blocks=1, num_rows=3, schema={a: int64, b: int64}) + >>> ray.data.from_pandas(df) # doctest: +ELLIPSIS + shape: (3, 2) + ╭───────┬───────╮ + │ a ┆ b │ + │ --- ┆ --- │ + │ int64 ┆ int64 │ + ╞═══════╪═══════╡ + │ 1 ┆ 4 │ + │ 2 ┆ 5 │ + │ 3 ┆ 6 │ + ╰───────┴───────╯ + (Showing 3 of 3 rows) Create a Ray Dataset from a list of Pandas DataFrames. - >>> ray.data.from_pandas([df, df]) - MaterializedDataset(num_blocks=2, num_rows=6, schema={a: int64, b: int64}) + >>> ray.data.from_pandas([df, df]) # doctest: +ELLIPSIS + shape: (6, 2) + ╭───────┬───────╮ + │ a ┆ b │ + │ --- ┆ --- │ + │ int64 ┆ int64 │ + ╞═══════╪═══════╡ + │ 1 ┆ 4 │ + │ 2 ┆ 5 │ + │ 3 ┆ 6 │ + │ 1 ┆ 4 │ + │ 2 ┆ 5 │ + │ 3 ┆ 6 │ + ╰───────┴───────╯ + (Showing 6 of 6 rows) Args: dfs: A pandas dataframe or a list of pandas dataframes. @@ -3128,13 +3172,36 @@ def from_pandas_refs( >>> import pandas as pd >>> import ray >>> df_ref = ray.put(pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})) - >>> ray.data.from_pandas_refs(df_ref) - MaterializedDataset(num_blocks=1, num_rows=3, schema={a: int64, b: int64}) + >>> ray.data.from_pandas_refs(df_ref) # doctest: +ELLIPSIS + shape: (3, 2) + ╭───────┬───────╮ + │ a ┆ b │ + │ --- ┆ --- │ + │ int64 ┆ int64 │ + ╞═══════╪═══════╡ + │ 1 ┆ 4 │ + │ 2 ┆ 5 │ + │ 3 ┆ 6 │ + ╰───────┴───────╯ + (Showing 3 of 3 rows) Create a Ray Dataset from a list of Pandas Dataframes references. - >>> ray.data.from_pandas_refs([df_ref, df_ref]) - MaterializedDataset(num_blocks=2, num_rows=6, schema={a: int64, b: int64}) + >>> ray.data.from_pandas_refs([df_ref, df_ref]) # doctest: +ELLIPSIS + shape: (6, 2) + ╭───────┬───────╮ + │ a ┆ b │ + │ --- ┆ --- │ + │ int64 ┆ int64 │ + ╞═══════╪═══════╡ + │ 1 ┆ 4 │ + │ 2 ┆ 5 │ + │ 3 ┆ 6 │ + │ 1 ┆ 4 │ + │ 2 ┆ 5 │ + │ 3 ┆ 6 │ + ╰───────┴───────╯ + (Showing 6 of 6 rows) Args: dfs: A Ray object reference to a pandas dataframe, or a list of @@ -3200,13 +3267,30 @@ def from_numpy(ndarrays: Union[np.ndarray, List[np.ndarray]]) -> MaterializedDat >>> import numpy as np >>> import ray >>> arr = np.array([1]) - >>> ray.data.from_numpy(arr) - MaterializedDataset(num_blocks=1, num_rows=1, schema={data: int64}) + >>> ray.data.from_numpy(arr) # doctest: +ELLIPSIS + shape: (1, 1) + ╭───────╮ + │ data │ + │ --- │ + │ int64 │ + ╞═══════╡ + │ 1 │ + ╰───────╯ + (Showing 1 of 1 rows) Create a Ray Dataset from a list of NumPy arrays. 
- >>> ray.data.from_numpy([arr, arr]) - MaterializedDataset(num_blocks=2, num_rows=2, schema={data: int64}) + >>> ray.data.from_numpy([arr, arr]) # doctest: +ELLIPSIS + shape: (2, 1) + ╭───────╮ + │ data │ + │ --- │ + │ int64 │ + ╞═══════╡ + │ 1 │ + │ 1 │ + ╰───────╯ + (Showing 2 of 2 rows) Args: ndarrays: A NumPy ndarray or a list of NumPy ndarrays. @@ -3233,13 +3317,30 @@ def from_numpy_refs( >>> import numpy as np >>> import ray >>> arr_ref = ray.put(np.array([1])) - >>> ray.data.from_numpy_refs(arr_ref) - MaterializedDataset(num_blocks=1, num_rows=1, schema={data: int64}) + >>> ray.data.from_numpy_refs(arr_ref) # doctest: +ELLIPSIS + shape: (1, 1) + ╭───────╮ + │ data │ + │ --- │ + │ int64 │ + ╞═══════╡ + │ 1 │ + ╰───────╯ + (Showing 1 of 1 rows) Create a Ray Dataset from a list of NumPy array references. - >>> ray.data.from_numpy_refs([arr_ref, arr_ref]) - MaterializedDataset(num_blocks=2, num_rows=2, schema={data: int64}) + >>> ray.data.from_numpy_refs([arr_ref, arr_ref]) # doctest: +ELLIPSIS + shape: (2, 1) + ╭───────╮ + │ data │ + │ --- │ + │ int64 │ + ╞═══════╡ + │ 1 │ + │ 1 │ + ╰───────╯ + (Showing 2 of 2 rows) Args: ndarrays: A Ray object reference to a NumPy ndarray or a list of Ray object @@ -3296,13 +3397,30 @@ def from_arrow( >>> import pyarrow as pa >>> import ray >>> table = pa.table({"x": [1]}) - >>> ray.data.from_arrow(table) - MaterializedDataset(num_blocks=1, num_rows=1, schema={x: int64}) + >>> ray.data.from_arrow(table) # doctest: +ELLIPSIS + shape: (1, 1) + ╭───────╮ + │ x │ + │ --- │ + │ int64 │ + ╞═══════╡ + │ 1 │ + ╰───────╯ + (Showing 1 of 1 rows) Create a Ray Dataset from a list of PyArrow tables. - >>> ray.data.from_arrow([table, table]) - MaterializedDataset(num_blocks=2, num_rows=2, schema={x: int64}) + >>> ray.data.from_arrow([table, table]) # doctest: +ELLIPSIS + shape: (2, 1) + ╭───────╮ + │ x │ + │ --- │ + │ int64 │ + ╞═══════╡ + │ 1 │ + │ 1 │ + ╰───────╯ + (Showing 2 of 2 rows) Args: @@ -3369,13 +3487,30 @@ def from_arrow_refs( >>> import pyarrow as pa >>> import ray >>> table_ref = ray.put(pa.table({"x": [1]})) - >>> ray.data.from_arrow_refs(table_ref) - MaterializedDataset(num_blocks=1, num_rows=1, schema={x: int64}) + >>> ray.data.from_arrow_refs(table_ref) # doctest: +ELLIPSIS + shape: (1, 1) + ╭───────╮ + │ x │ + │ --- │ + │ int64 │ + ╞═══════╡ + │ 1 │ + ╰───────╯ + (Showing 1 of 1 rows) Create a Ray Dataset from a list of PyArrow table references - >>> ray.data.from_arrow_refs([table_ref, table_ref]) - MaterializedDataset(num_blocks=2, num_rows=2, schema={x: int64}) + >>> ray.data.from_arrow_refs([table_ref, table_ref]) # doctest: +ELLIPSIS + shape: (2, 1) + ╭───────╮ + │ x │ + │ --- │ + │ int64 │ + ╞═══════╡ + │ 1 │ + │ 1 │ + ╰───────╯ + (Showing 2 of 2 rows) Args: diff --git a/python/ray/data/tests/test_consumption.py b/python/ray/data/tests/test_consumption.py index 721daa92b7fe..2d8154c8e6e9 100644 --- a/python/ray/data/tests/test_consumption.py +++ b/python/ray/data/tests/test_consumption.py @@ -58,21 +58,25 @@ def test_schema(ray_start_regular): last_snapshot, ) - assert str(ds2) == "Dataset(num_rows=10, schema={id: int64})" + ds2_schema = ds2.schema(fetch_if_missing=False) + assert ds2_schema is not None + assert ds2_schema.names == ["id"] + assert not isinstance(ds2, MaterializedDataset) last_snapshot = assert_core_execution_metrics_equals( CoreExecutionMetrics(task_count={}), last_snapshot ) - assert ( - str(ds3) == "MaterializedDataset(num_blocks=5, num_rows=10, schema={id: int64})" - ) + ds3_schema = ds3.schema(fetch_if_missing=False) + 
assert ds3_schema is not None + assert ds3_schema.names == ["id"] + assert isinstance(ds3, MaterializedDataset) last_snapshot = assert_core_execution_metrics_equals( CoreExecutionMetrics(task_count={}), last_snapshot ) - assert ( - str(ds4) == "MaterializedDataset(num_blocks=1, num_rows=5, " - "schema={a: string, b: double})" - ) + ds4_schema = ds4.schema(fetch_if_missing=False) + assert ds4_schema is not None + assert ds4_schema.names == ["a", "b"] + assert isinstance(ds4, MaterializedDataset) last_snapshot = assert_core_execution_metrics_equals( CoreExecutionMetrics(task_count={}), last_snapshot ) @@ -380,65 +384,60 @@ def test_lazy_loading_exponential_rampup(ray_start_regular_shared): _check_none_computed(ds) -def test_dataset_repr(ray_start_regular_shared): - ds = ray.data.range(10, override_num_blocks=10) - assert repr(ds) == "Dataset(num_rows=10, schema={id: int64})" - ds = ds.map_batches(lambda x: x) - assert repr(ds) == ( - "MapBatches()\n+- Dataset(num_rows=10, schema={id: int64})" - ) - ds = ds.filter(lambda x: x["id"] > 0) - assert repr(ds) == ( - "Filter()\n" - "+- MapBatches()\n" - " +- Dataset(num_rows=10, schema={id: int64})" - ) - ds = ds.random_shuffle() +def test_dataset_repr_not_materialized(ray_start_regular_shared, restore_data_context): + ds = ray.data.range(5) assert repr(ds) == ( - "RandomShuffle\n" - "+- Filter()\n" - " +- MapBatches()\n" - " +- Dataset(num_rows=10, schema={id: int64})" - ) - ds = ds.materialize() - assert ( - repr(ds) == "MaterializedDataset(num_blocks=10, num_rows=9, schema={id: int64})" + "shape: (5, 1)\n" + "╭───────╮\n" + "│ id │\n" + "│ --- │\n" + "│ int64 │\n" + "╰───────╯\n" + "(Dataset isn't materialized)" ) - ds = ds.map_batches(lambda x: x) - assert repr(ds) == ( - "MapBatches()\n+- Dataset(num_rows=9, schema={id: int64})" - ) - ds1, ds2 = ds.split(2) - assert ( - repr(ds1) == f"MaterializedDataset(num_blocks=5, num_rows={ds1.count()}, " - "schema={id: int64})" - ) - assert ( - repr(ds2) == f"MaterializedDataset(num_blocks=5, num_rows={ds2.count()}, " - "schema={id: int64})" + +def test_dataset_repr_materialized(ray_start_regular_shared, restore_data_context): + materialized = ray.data.range(5).materialize() + assert repr(materialized) == ( + "shape: (5, 1)\n" + "╭───────╮\n" + "│ id │\n" + "│ --- │\n" + "│ int64 │\n" + "╞═══════╡\n" + "│ 0 │\n" + "│ 1 │\n" + "│ 2 │\n" + "│ 3 │\n" + "│ 4 │\n" + "╰───────╯\n" + "(Showing 5 of 5 rows)" ) - # TODO(scottjlee): include all of the input datasets to union() - # in the repr output, instead of only the resulting unioned dataset. - # TODO(@bveeramani): Handle schemas for n-ary operators like `Union`. 
- # ds3 = ds1.union(ds2) - # assert repr(ds3) == ("Union\n+- Dataset(num_rows=9, schema={id: int64})") - # ds = ds.zip(ds3) - # assert repr(ds) == ( - # "Zip\n" - # "+- MapBatches()\n" - # "+- Union\n" - # " +- Dataset(num_rows=9, schema={id: int64})" - # ) - - def my_dummy_fn(x): - return x - ds = ray.data.range(10, override_num_blocks=10) - ds = ds.map_batches(my_dummy_fn) - assert repr(ds) == ( - "MapBatches(my_dummy_fn)\n+- Dataset(num_rows=10, schema={id: int64})" +def test_dataset_repr_gap(ray_start_regular_shared, restore_data_context): + ds_with_gap = ray.data.range(20).materialize() + assert repr(ds_with_gap) == ( + "shape: (20, 1)\n" + "╭───────╮\n" + "│ id │\n" + "│ --- │\n" + "│ int64 │\n" + "╞═══════╡\n" + "│ 0 │\n" + "│ 1 │\n" + "│ 2 │\n" + "│ 3 │\n" + "│ 4 │\n" + "│ … │\n" + "│ 15 │\n" + "│ 16 │\n" + "│ 17 │\n" + "│ 18 │\n" + "│ 19 │\n" + "╰───────╯\n" + "(Showing 10 of 20 rows)" ) @@ -860,40 +859,5 @@ def test_dataset_schema_after_read_stats(ray_start_cluster): assert schema == ds.schema() -def test_dataset_plan_as_string(ray_start_cluster): - ds = ray.data.read_parquet("example://iris.parquet", override_num_blocks=8) - assert ds._plan.get_plan_as_string(type(ds)) == ( - "Dataset(\n" - " num_rows=?,\n" - " schema={\n" - " sepal.length: double,\n" - " sepal.width: double,\n" - " petal.length: double,\n" - " petal.width: double,\n" - " variety: string\n" - " }\n" - ")" - ) - for _ in range(5): - ds = ds.map_batches(lambda x: x) - assert ds._plan.get_plan_as_string(type(ds)) == ( - "MapBatches()\n" - "+- MapBatches()\n" - " +- MapBatches()\n" - " +- MapBatches()\n" - " +- MapBatches()\n" - " +- Dataset(\n" - " num_rows=?,\n" - " schema={\n" - " sepal.length: double,\n" - " sepal.width: double,\n" - " petal.length: double,\n" - " petal.width: double,\n" - " variety: string\n" - " }\n" - " )" - ) - - if __name__ == "__main__": sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/data/tests/test_groupby_e2e.py b/python/ray/data/tests/test_groupby_e2e.py index 421495e334bd..ed8c96ea0e62 100644 --- a/python/ray/data/tests/test_groupby_e2e.py +++ b/python/ray/data/tests/test_groupby_e2e.py @@ -1273,7 +1273,7 @@ def _map_group(df): ) -def test_groupby_map_groups_with_partial(disable_fallback_to_object_extension): +def test_groupby_map_groups_with_partial(disable_fallback_to_object_extension, capsys): """ The partial function name should show up as +- Sort @@ -1297,7 +1297,9 @@ def func(x, y): {"x_add_5": 25}, {"x_add_5": 25}, ] - assert "MapBatches(func)" in ds.__repr__() + ds.explain() + captured = capsys.readouterr() + assert "MapBatches(func)" in captured.out def test_map_groups_generator_udf(ray_start_regular_shared_2_cpus): diff --git a/python/ray/data/tests/test_map.py b/python/ray/data/tests/test_map.py index 83d19fd03912..c56db56279ea 100644 --- a/python/ray/data/tests/test_map.py +++ b/python/ray/data/tests/test_map.py @@ -1323,7 +1323,7 @@ def _map_raising(r): get_pyarrow_version() < MIN_PYARROW_VERSION_TYPE_PROMOTION, reason="Requires pyarrow>=14 for unify_schemas in OneHotEncoder", ) -def test_map_names(target_max_block_size_infinite_or_default): +def test_map_names(target_max_block_size_infinite_or_default, capsys): """To test different UDF format such that the operator has the correct representation. 
@@ -1333,35 +1333,40 @@ def test_map_names(target_max_block_size_infinite_or_default): ds = ray.data.range(5) - r = ds.map(lambda x: {"id": str(x["id"])}).__repr__() - assert r.startswith("Map()"), r + def _assert_explain_contains(dataset, expected): + dataset.explain() + captured = capsys.readouterr() + assert expected in captured.out, captured.out + + mapped = ds.map(lambda x: {"id": str(x["id"])}) + _assert_explain_contains(mapped, "Map()") class C: def __call__(self, x): return x - r = ds.map(C, concurrency=4).__repr__() - assert r.startswith("Map(C)"), r + mapped = ds.map(C, concurrency=4) + _assert_explain_contains(mapped, "Map(C)") # Simple and partial functions def func(x, y): return x - r = ds.map(func, fn_args=[0]).__repr__() - assert r.startswith("Map(func)") + mapped = ds.map(func, fn_args=[0]) + _assert_explain_contains(mapped, "Map(func)") from functools import partial - r = ds.map(partial(func, y=1)).__repr__() - assert r.startswith("Map(func)"), r + mapped = ds.map(partial(func, y=1)) + _assert_explain_contains(mapped, "Map(func)") # Preprocessor from ray.data.preprocessors import OneHotEncoder ds = ray.data.from_items(["a", "b", "c", "a", "b", "c"]) enc = OneHotEncoder(columns=["item"]) - r = enc.fit_transform(ds).__repr__() - assert "OneHotEncoder" in r, r + transformed = enc.fit_transform(ds) + _assert_explain_contains(transformed, "OneHotEncoder") def test_map_with_max_calls(): diff --git a/python/ray/data/tests/test_mongo.py b/python/ray/data/tests/test_mongo.py index e9ad7dcd43d1..5a93dbda19f5 100644 --- a/python/ray/data/tests/test_mongo.py +++ b/python/ray/data/tests/test_mongo.py @@ -79,9 +79,9 @@ def test_read_write_mongo(ray_start_regular_shared, start_mongo): override_num_blocks=2, ) assert ds._block_num_rows() == [3, 2] - assert str(ds) == ( - "Dataset(num_rows=5, schema={float_field: double, int_field: int32})" - ) + ds_schema = ds.schema() + assert ds_schema.names == ["float_field", "int_field"] + assert ds_schema.types == [pa.float64(), pa.int32()] assert df.equals(ds.to_pandas()) # Read with schema inference, which will read all columns (including the auto @@ -200,13 +200,11 @@ def test_mongo_datasource(ray_start_regular_shared, start_mongo): override_num_blocks=2, ).materialize() assert ds._block_num_rows() == [3, 2] - assert str(ds) == ( - "MaterializedDataset(\n" - " num_blocks=2,\n" - " num_rows=5,\n" - " schema={float_field: double, int_field: int32}\n" - ")" - ) + assert ds.num_blocks() == 2 + assert ds.count() == 5 + ds_schema = ds.schema() + assert ds_schema.names == ["float_field", "int_field"] + assert ds_schema.types == [pa.float64(), pa.int32()] assert df.equals(ds.to_pandas()) # Read with schema inference, which will read all columns (including the auto @@ -218,14 +216,11 @@ def test_mongo_datasource(ray_start_regular_shared, start_mongo): override_num_blocks=2, ).materialize() assert ds._block_num_rows() == [3, 2] - assert str(ds) == ( - "MaterializedDataset(\n" - " num_blocks=2,\n" - " num_rows=5,\n" - " schema={_id: fixed_size_binary[12], float_field: double, " - "int_field: int32}\n" - ")" - ) + assert ds.num_blocks() == 2 + assert ds.count() == 5 + ds_schema = ds.schema() + assert ds_schema.names == ["_id", "float_field", "int_field"] + assert ds_schema.types[1:] == [pa.float64(), pa.int32()] assert df.equals(ds.drop_columns(["_id"]).to_pandas()) # Read with auto-tuned parallelism. 
@@ -234,14 +229,11 @@ def test_mongo_datasource(ray_start_regular_shared, start_mongo): database=foo_db, collection=foo_collection, ).materialize() - assert str(ds) == ( - "MaterializedDataset(\n" - " num_blocks=2,\n" - " num_rows=5,\n" - " schema={_id: fixed_size_binary[12], float_field: double, " - "int_field: int32}\n" - ")" - ) + assert ds.num_blocks() == 2 + assert ds.count() == 5 + ds_schema = ds.schema() + assert ds_schema.names == ["_id", "float_field", "int_field"] + assert ds_schema.types[1:] == [pa.float64(), pa.int32()] assert df.equals(ds.drop_columns(["_id"]).to_pandas()) # Read with a parallelism larger than number of rows. @@ -251,7 +243,7 @@ def test_mongo_datasource(ray_start_regular_shared, start_mongo): collection=foo_collection, override_num_blocks=1000, ) - assert str(ds) == ("Dataset(num_rows=5, schema=Unknown schema)") + assert ds.schema(fetch_if_missing=False) is None assert df.equals(ds.drop_columns(["_id"]).to_pandas()) # Read a subset of the collection. @@ -263,13 +255,9 @@ def test_mongo_datasource(ray_start_regular_shared, start_mongo): override_num_blocks=2, ) assert ds._block_num_rows() == [2, 1] - assert str(ds) == ( - "Dataset(\n" - " num_rows=3,\n" - " schema={_id: fixed_size_binary[12], float_field: double, " - "int_field: int32}\n" - ")" - ) + ds_schema = ds.schema() + assert ds_schema.names == ["_id", "float_field", "int_field"] + assert ds_schema.types[1:] == [pa.float64(), pa.int32()] df[df["int_field"] < 3].equals(ds.drop_columns(["_id"]).to_pandas()) diff --git a/python/ray/data/tests/test_stats.py b/python/ray/data/tests/test_stats.py index 370a6832089f..a6cd83c6837e 100644 --- a/python/ray/data/tests/test_stats.py +++ b/python/ray/data/tests/test_stats.py @@ -1907,10 +1907,7 @@ def test_dataset_name_and_id(): ds = ray.data.range(100, override_num_blocks=20).map_batches(lambda x: x) ds.set_name("test_ds") assert ds.name == "test_ds" - assert str(ds) == ( - "MapBatches()\n" - "+- Dataset(name=test_ds, num_rows=100, schema={id: int64})" - ) + assert "test_ds" in repr(ds) def _run_dataset(ds, expected_name, expected_run_index): with patch_update_stats_actor() as update_fn: @@ -1944,10 +1941,7 @@ def _run_dataset(ds, expected_name, expected_run_index): ds = ray.data.range(100, override_num_blocks=20) ds.set_name("very_loooooooong_name") - assert ( - str(ds) - == "Dataset(name=very_loooooooong_name, num_rows=100, schema={id: int64})" - ) + assert "very_loooooooong_name" in repr(ds) def test_dataset_id_train_ingest(): diff --git a/python/ray/data/tests/test_tensor.py b/python/ray/data/tests/test_tensor.py index 37a2cda8e6a7..f80e56dbabe9 100644 --- a/python/ray/data/tests/test_tensor.py +++ b/python/ray/data/tests/test_tensor.py @@ -329,65 +329,50 @@ def test_tensors_inferred_from_map( lambda _: {"data": np.ones((4, 4))} ) ds = ds.materialize() - assert str(ds) == ( - "MaterializedDataset(\n" - " num_blocks=10,\n" - " num_rows=10,\n" - f" schema={{data: {class_name}(shape=(4, 4), dtype=double)}}\n" - ")" - ) + assert ds.count() == 10 + schema = ds.schema() + assert schema.names == ["data"] + assert str(schema.types[0]) == f"{class_name}(shape=(4, 4), dtype=double)" # Test map_batches. 
ds = ray.data.range(16, override_num_blocks=4).map_batches( lambda _: {"data": np.ones((3, 4, 4))}, batch_size=2 ) ds = ds.materialize() - assert str(ds) == ( - "MaterializedDataset(\n" - " num_blocks=4,\n" - " num_rows=24,\n" - f" schema={{data: {class_name}(shape=(4, 4), dtype=double)}}\n" - ")" - ) + assert ds.count() == 24 + schema = ds.schema() + assert schema.names == ["data"] + assert str(schema.types[0]) == f"{class_name}(shape=(4, 4), dtype=double)" # Test flat_map. ds = ray.data.range(10, override_num_blocks=10).flat_map( lambda _: [{"data": np.ones((4, 4))}, {"data": np.ones((4, 4))}] ) ds = ds.materialize() - assert str(ds) == ( - "MaterializedDataset(\n" - " num_blocks=10,\n" - " num_rows=20,\n" - f" schema={{data: {class_name}(shape=(4, 4), dtype=double)}}\n" - ")" - ) + assert ds.count() == 20 + schema = ds.schema() + assert schema.names == ["data"] + assert str(schema.types[0]) == f"{class_name}(shape=(4, 4), dtype=double)" # Test map_batches ndarray column. ds = ray.data.range(16, override_num_blocks=4).map_batches( lambda _: pd.DataFrame({"a": [np.ones((4, 4))] * 3}), batch_size=2 ) ds = ds.materialize() - assert str(ds) == ( - "MaterializedDataset(\n" - " num_blocks=4,\n" - " num_rows=24,\n" - " schema={a: TensorDtype(shape=(4, 4), dtype=float64)}\n" - ")" - ) + assert ds.count() == 24 + schema = ds.schema() + assert schema.names == ["a"] + assert str(schema.types[0]) == f"{class_name}(shape=(4, 4), dtype=double)" ds = ray.data.range(16, override_num_blocks=4).map_batches( lambda _: pd.DataFrame({"a": [np.ones((2, 2)), np.ones((3, 3))]}), batch_size=2, ) ds = ds.materialize() - assert str(ds) == ( - "MaterializedDataset(\n" - " num_blocks=4,\n" - " num_rows=16,\n" - " schema={a: TensorDtype(shape=(None, None), dtype=float64)}\n" - ")" - ) + assert ds.count() == 16 + schema = ds.schema() + assert schema.names == ["a"] + assert str(schema.types[0]) == f"{class_name}(shape=(None, None), dtype=double)" @pytest.mark.parametrize("tensor_format", ["v1", "v2"]) diff --git a/python/ray/data/tests/unit/test_dataset_repr.py b/python/ray/data/tests/unit/test_dataset_repr.py new file mode 100644 index 000000000000..bc412d2810fe --- /dev/null +++ b/python/ray/data/tests/unit/test_dataset_repr.py @@ -0,0 +1,61 @@ +import pyarrow as pa + +from ray.data._internal.dataset_repr import _build_dataset_ascii_repr_from_rows +from ray.data.dataset import Schema + + +def test_dataset_repr_from_rows_not_materialized(): + schema = Schema(pa.schema([("a", pa.int64()), ("b", pa.string())])) + text = _build_dataset_ascii_repr_from_rows( + schema=schema, + num_rows=5, + dataset_name="test_ds", + is_materialized=False, + head_rows=[], + tail_rows=[], + ) + assert text == ( + "name: test_ds\n" + "shape: (5, 2)\n" + "╭───────┬────────╮\n" + "│ a ┆ b │\n" + "│ --- ┆ --- │\n" + "│ int64 ┆ string │\n" + "╰───────┴────────╯\n" + "(Dataset isn't materialized)" + ) + + +def test_dataset_repr_from_rows_gap(): + schema = Schema(pa.schema([("id", pa.int64())])) + text = _build_dataset_ascii_repr_from_rows( + schema=schema, + num_rows=12, + dataset_name=None, + is_materialized=True, + head_rows=[["0"], ["1"]], + tail_rows=[["10"], ["11"]], + ) + assert text == ( + "shape: (12, 1)\n" + "╭───────╮\n" + "│ id │\n" + "│ --- │\n" + "│ int64 │\n" + "╞═══════╡\n" + "│ 0 │\n" + "│ 1 │\n" + "│ … │\n" + "│ 10 │\n" + "│ 11 │\n" + "╰───────╯\n" + "(Showing 4 of 12 rows)" + ) + + +if __name__ == "__main__": + import sys + + import pytest + + sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/train/base_trainer.py 
b/python/ray/train/base_trainer.py index 07e7f0e4073a..6ec55b1a50ad 100644 --- a/python/ray/train/base_trainer.py +++ b/python/ray/train/base_trainer.py @@ -19,6 +19,7 @@ from ray.air._internal.config import ensure_only_allowed_dataclass_keys_updated from ray.air._internal.usage import AirEntrypoint from ray.air.config import RunConfig, ScalingConfig +from ray.air.constants import MAX_REPR_LENGTH from ray.air.result import Result from ray.train import Checkpoint from ray.train._internal.session import get_session @@ -68,6 +69,43 @@ ) +def _truncate_repr_if_needed(representation: str) -> str: + """Ensure repr strings remain shorter than MAX_REPR_LENGTH.""" + if len(representation) < MAX_REPR_LENGTH: + return representation + + ellipsis = "..." + closing = "" + if representation: + closing = representation[-1] + representation = representation[:-1] + + max_body_len = MAX_REPR_LENGTH - len(ellipsis) - len(closing) - 1 + if max_body_len < 0: + max_body_len = 0 + + truncated_body = representation[:max_body_len] + return f"{truncated_body}{ellipsis}{closing}" + + +def _format_datasets_for_repr(value: Any) -> Any: + """Format datasets for BaseTrainer repr using plan strings.""" + if not isinstance(value, dict): + return value + try: + from ray.data import Dataset + except Exception: + return value + + formatted = {} + for key, dataset in value.items(): + if isinstance(dataset, Dataset): + formatted[key] = dataset._plan.get_plan_as_string(type(dataset)) + else: + formatted[key] = dataset + return formatted + + @PublicAPI(stability="beta") class TrainingFailedError(RuntimeError): """An error indicating that training has failed.""" @@ -459,12 +497,16 @@ def __repr__(self): for parameter, default_value in default_values.items(): value = getattr(self, parameter) if value != default_value: + if parameter == "datasets": + value = _format_datasets_for_repr(value) non_default_arguments.append(f"{parameter}={value!r}") if non_default_arguments: - return f"<{self.__class__.__name__} {' '.join(non_default_arguments)}>" + return _truncate_repr_if_needed( + f"<{self.__class__.__name__} {' '.join(non_default_arguments)}>" + ) - return f"<{self.__class__.__name__}>" + return _truncate_repr_if_needed(f"<{self.__class__.__name__}>") def __new__(cls, *args, **kwargs): # Store the init args as attributes so this can be merged with Tune hparams.
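
For reference, a minimal interactive sketch of the tabular repr these changes produce, assuming a Ray build that includes the new dataset_repr module; both expected outputs mirror the doctests added above (from_items in read_api.py and the non-materialized case in the new repr tests), so the exact box-drawing layout follows _build_dataset_ascii_repr_from_rows:

>>> import ray
>>> ds = ray.data.from_items([1, 2, 3, 4, 5])  # MaterializedDataset: preview rows are fetched and shown
>>> ds
shape: (5, 1)
╭───────╮
│ item  │
│ ---   │
│ int64 │
╞═══════╡
│ 1     │
│ 2     │
│ 3     │
│ 4     │
│ 5     │
╰───────╯
(Showing 5 of 5 rows)
>>> ray.data.range(5)  # unexecuted Dataset: schema header only, no preview rows
shape: (5, 1)
╭───────╮
│ id    │
│ ---   │
│ int64 │
╰───────╯
(Dataset isn't materialized)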