diff --git a/python/ray/data/_internal/plan.py b/python/ray/data/_internal/plan.py index 1e496fe0d6f7..a91a4aefbabe 100644 --- a/python/ray/data/_internal/plan.py +++ b/python/ray/data/_internal/plan.py @@ -34,6 +34,112 @@ logger = logging.getLogger(__name__) +def _format_dataset_as_table( + schema, num_rows, num_blocks=None, sample_rows=None, max_rows=10 +) -> str: + """Format dataset information as a polars-style table. + + Args: + schema: Dataset schema with .names and .types attributes + num_rows: Total number of rows (or "?" if unknown) + num_blocks: Number of blocks (only for MaterializedDataset) + sample_rows: Optional list of sample row dicts to display + max_rows: Maximum number of rows to show + + Returns: + Formatted table string with box-drawing characters + """ + if not schema or not schema.names: + return f"Dataset(num_rows={num_rows})" + + col_names = schema.names + col_types = [str(t) if not hasattr(t, "__name__") else t.__name__ + for t in schema.types] + + # Calculate column widths + col_widths = [] + for i, name in enumerate(col_names): + width = max( + len(name), + len(col_types[i]) + 4, # "--- " prefix + 10 # minimum width + ) + if sample_rows: + for row in sample_rows[:max_rows]: + if name in row: + val_str = str(row[name]) + # Truncate long values + if len(val_str) > 50: + val_str = val_str[:47] + "..." + width = max(width, len(val_str)) + col_widths.append(min(width, 50)) # Cap at 50 chars + + # Build table + lines = [] + + # Shape line (like polars) + shape_str = f"shape: ({num_rows}, {len(col_names)})" + if num_blocks is not None: + shape_str += f", num_blocks: {num_blocks}" + lines.append(shape_str) + + # Top border + parts = ["┌"] + for i, width in enumerate(col_widths): + parts.append("─" * (width + 2)) + parts.append("┬" if i < len(col_widths) - 1 else "┐") + lines.append("".join(parts)) + + # Column names + parts = ["│"] + for name, width in zip(col_names, col_widths): + parts.append(f" {name:{width}} │") + lines.append("".join(parts)) + + # Type separator with "---" + parts = ["│"] + for col_type, width in zip(col_types, col_widths): + type_str = f"--- {col_type}" + parts.append(f" {type_str:{width}} │") + lines.append("".join(parts)) + + # Header separator + parts = ["╞"] + for i, width in enumerate(col_widths): + parts.append("═" * (width + 2)) + parts.append("╪" if i < len(col_widths) - 1 else "╡") + lines.append("".join(parts)) + + # Data rows (if materialized) + if sample_rows: + show_rows = min(len(sample_rows), max_rows) + for row_idx, row in enumerate(sample_rows[:show_rows]): + parts = ["│"] + for name, width in zip(col_names, col_widths): + val = row.get(name, "") + val_str = str(val) + if len(val_str) > width: + val_str = val_str[:width-3] + "..." + parts.append(f" {val_str:{width}} │") + lines.append("".join(parts)) + + # Show ellipsis if there are more rows + if num_rows != "?" and isinstance(num_rows, int) and num_rows > show_rows: + parts = ["│"] + for width in col_widths: + parts.append(f" {'…':{width}} │") + lines.append("".join(parts)) + + # Bottom border + parts = ["└"] + for i, width in enumerate(col_widths): + parts.append("─" * (width + 2)) + parts.append("┴" if i < len(col_widths) - 1 else "┘") + lines.append("".join(parts)) + + return "\n".join(lines) + + class ExecutionPlan: """A lazy execution plan for a Dataset. @@ -253,82 +359,23 @@ def get_plan_as_string(self, dataset_cls: Type["Dataset"]) -> str: num_blocks = self.initial_num_blocks() assert num_blocks is not None - name_str = ( - "name={}, ".format(self._dataset_name) - if self._dataset_name is not None - else "" - ) - num_blocks_str = f"num_blocks={num_blocks}, " if num_blocks else "" - - dataset_str = "{}({}{}num_rows={}, schema={})".format( - dataset_cls.__name__, - name_str, - num_blocks_str, - count, - schema_str, + # Use table formatter for better UX (polars-style) + dataset_str = _format_dataset_as_table( + schema=schema, + num_rows=count, + num_blocks=num_blocks, + sample_rows=None # TODO: Add sample data fetching in future PR ) - # If the resulting string representation fits in one line, use it directly. - SCHEMA_LINE_CHAR_LIMIT = 80 - MIN_FIELD_LENGTH = 10 INDENT_STR = " " * 3 - trailing_space = INDENT_STR * plan_max_depth - - if len(dataset_str) > SCHEMA_LINE_CHAR_LIMIT: - # If the resulting string representation exceeds the line char limit, - # first try breaking up each `Dataset` parameter into its own line - # and check if each line fits within the line limit. We check the - # `schema` param's length, since this is likely the longest string. - schema_str_on_new_line = f"{trailing_space}{INDENT_STR}schema={schema_str}" - if len(schema_str_on_new_line) > SCHEMA_LINE_CHAR_LIMIT: - # If the schema cannot fit on a single line, break up each field - # into its own line. - schema_str = [] - for n, t in zip(schema.names, schema.types): - if hasattr(t, "__name__"): - t = t.__name__ - col_str = f"{trailing_space}{INDENT_STR * 2}{n}: {t}" - # If the field line exceeds the char limit, abbreviate - # the field name to fit while maintaining the full type - if len(col_str) > SCHEMA_LINE_CHAR_LIMIT: - shortened_suffix = f"...: {str(t)}" - # Show at least 10 characters of the field name, even if - # we have already hit the line limit with the type. - chars_left_for_col_name = max( - SCHEMA_LINE_CHAR_LIMIT - len(shortened_suffix), - MIN_FIELD_LENGTH, - ) - col_str = ( - f"{col_str[:chars_left_for_col_name]}{shortened_suffix}" - ) - schema_str.append(col_str) - schema_str = ",\n".join(schema_str) - schema_str = ( - "{\n" + schema_str + f"\n{trailing_space}{INDENT_STR}" + "}" - ) - name_str = ( - f"\n{trailing_space}{INDENT_STR}name={self._dataset_name}," - if self._dataset_name is not None - else "" - ) - num_blocks_str = ( - f"\n{trailing_space}{INDENT_STR}num_blocks={num_blocks}," - if num_blocks - else "" - ) - dataset_str = ( - f"{dataset_cls.__name__}(" - f"{name_str}" - f"{num_blocks_str}" - f"\n{trailing_space}{INDENT_STR}num_rows={count}," - f"\n{trailing_space}{INDENT_STR}schema={schema_str}" - f"\n{trailing_space})" - ) - if plan_max_depth == 0: plan_str += dataset_str else: - plan_str += f"{INDENT_STR * (plan_max_depth - 1)}+- {dataset_str}" + # Indent each line of the table for nested plans + indented_lines = [] + for line in dataset_str.split("\n"): + indented_lines.append(f"{INDENT_STR * (plan_max_depth - 1)}+- {line}") + plan_str += "\n".join(indented_lines) return plan_str def link_logical_plan(self, logical_plan: "LogicalPlan"):