InputFormatConfig.java
@@ -78,6 +78,9 @@ private InputFormatConfig() {

public static final String CATALOG_CONFIG_PREFIX = "iceberg.catalog.";

public static final String SORT_ORDER = "sort.order";
public static final String SORT_COLUMNS = "sort.columns";

public enum InMemoryDataModel {
HIVE,
GENERIC // Default data model is of Iceberg Generics
BaseHiveIcebergMetaHook.java
@@ -19,6 +19,7 @@

package org.apache.iceberg.mr.hive;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Arrays;
import java.util.Collection;
@@ -41,6 +42,8 @@
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFieldDesc;
import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFields;
import org.apache.hadoop.hive.ql.ddl.misc.sortoder.ZOrderFieldDesc;
import org.apache.hadoop.hive.ql.ddl.misc.sortoder.ZOrderFields;
import org.apache.hadoop.hive.ql.util.NullOrdering;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.BaseTable;
@@ -74,6 +77,8 @@
import org.slf4j.LoggerFactory;

import static org.apache.iceberg.RowLevelOperationMode.MERGE_ON_READ;
import static org.apache.iceberg.mr.InputFormatConfig.SORT_COLUMNS;
import static org.apache.iceberg.mr.InputFormatConfig.SORT_ORDER;

public class BaseHiveIcebergMetaHook implements HiveMetaHook {
private static final Logger LOG = LoggerFactory.getLogger(BaseHiveIcebergMetaHook.class);
@@ -217,28 +222,83 @@ private void validateCatalogConfigsDefined() {
}
}

/**
 * Persists the table's write sort order based on the HMS property 'default-sort-order'
 * that is populated by the DDL layer.
 * <p>
 * Behaviour:
 * - If the JSON represents a Z-order, DEFAULT_SORT_ORDER is removed because the Iceberg spec
 * has no notion of Z-ordering; the Z-order metadata is persisted instead in
 * {@link org.apache.iceberg.mr.InputFormatConfig#SORT_ORDER} and
 * {@link org.apache.iceberg.mr.InputFormatConfig#SORT_COLUMNS} for the Hive writer to use.
 * <p>
 * - Otherwise the JSON is a list of SortFields; it is converted to Iceberg SortOrder JSON
 * and kept in DEFAULT_SORT_ORDER for Iceberg to use.
 */
private void setSortOrder(org.apache.hadoop.hive.metastore.api.Table hmsTable, Schema schema,
    Properties properties) {
  String sortOrderJSONString = hmsTable.getParameters().get(TableProperties.DEFAULT_SORT_ORDER);
  if (Strings.isNullOrEmpty(sortOrderJSONString)) {
    return;
  }

  if (isZOrderJSON(sortOrderJSONString)) {
    properties.remove(TableProperties.DEFAULT_SORT_ORDER);
    setZOrderSortOrder(sortOrderJSONString, properties);
    return;
  }

  try {
    SortFields sortFields = JSON_OBJECT_MAPPER.reader().readValue(sortOrderJSONString, SortFields.class);
    if (sortFields != null && !sortFields.getSortFields().isEmpty()) {
      SortOrder.Builder sortOrderBuilder = SortOrder.builderFor(schema);
      sortFields.getSortFields().forEach(fieldDesc -> {
        NullOrder nullOrder = fieldDesc.getNullOrdering() == NullOrdering.NULLS_FIRST ?
            NullOrder.NULLS_FIRST : NullOrder.NULLS_LAST;
        SortDirection sortDirection = fieldDesc.getDirection() == SortFieldDesc.SortDirection.ASC ?
            SortDirection.ASC : SortDirection.DESC;
        sortOrderBuilder.sortBy(fieldDesc.getColumnName(), sortDirection, nullOrder);
      });
      properties.put(TableProperties.DEFAULT_SORT_ORDER, SortOrderParser.toJson(sortOrderBuilder.build()));
    }
  } catch (Exception e) {
    LOG.warn("Can not read write order json: {}", sortOrderJSONString, e);
  }
}
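As an illustration of the non-Z-order branch above, a minimal standalone sketch (not part of the changed files) of how one sort field becomes the Iceberg SortOrder JSON stored in 'default-sort-order'; the schema and column names are hypothetical, the Iceberg calls are the same ones the hook uses:

import org.apache.iceberg.NullOrder;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortDirection;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.SortOrderParser;
import org.apache.iceberg.types.Types;

public class SortOrderJsonSketch {
  public static void main(String[] args) {
    // Hypothetical table schema with two columns.
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "ts", Types.TimestampType.withZone()));

    // Equivalent of one SortFieldDesc: ts DESC NULLS LAST.
    SortOrder sortOrder = SortOrder.builderFor(schema)
        .sortBy("ts", SortDirection.DESC, NullOrder.NULLS_LAST)
        .build();

    // This JSON is what setSortOrder() stores under TableProperties.DEFAULT_SORT_ORDER.
    System.out.println(SortOrderParser.toJson(sortOrder));
  }
}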

/**
 * Configures the Z-order sort order metadata in the given properties
 * based on the specified Z-order fields.
 *
 * @param jsonString the JSON string describing the Z-order fields
 * @param properties the Properties object in which the sort order metadata is stored
 */
private void setZOrderSortOrder(String jsonString, Properties properties) {
try {
ZOrderFields zorderFields = JSON_OBJECT_MAPPER.reader().readValue(jsonString, ZOrderFields.class);
if (zorderFields != null && !zorderFields.getZOrderFields().isEmpty()) {
List<String> columnNames = zorderFields.getZOrderFields().stream()
.map(ZOrderFieldDesc::getColumnName)
.collect(Collectors.toList());

LOG.info("Setting Z-order sort order for columns: {}", columnNames);

properties.put(SORT_ORDER, "ZORDER");
properties.put(SORT_COLUMNS, String.join(",", columnNames));

LOG.info("Z-order sort order configured for Iceberg table with columns: {}", columnNames);
}
} catch (Exception e) {
LOG.warn("Failed to parse Z-order sort order", e);
}
}

private boolean isZOrderJSON(String jsonString) {
try {
JsonNode node = JSON_OBJECT_MAPPER.readTree(jsonString);
return node.has("zorderFields");
} catch (Exception e) {
return false;
}
}
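A minimal sketch (again not part of the changed files) of the Z-order branch: the "zorderFields" key is exactly what isZOrderJSON() checks for, the inner payload field names are assumptions about the ZOrderFields POJO, and the property keys are the InputFormatConfig constants added above:

import java.util.Properties;

public class ZOrderPropertiesSketch {
  public static void main(String[] args) {
    // Assumed DDL-produced payload for a table z-ordered by (id, ts); only the
    // "zorderFields" key is verified by the hook's isZOrderJSON() check.
    String zorderJson = "{\"zorderFields\":[{\"columnName\":\"id\"},{\"columnName\":\"ts\"}]}";

    // What setZOrderSortOrder() persists; 'default-sort-order' is removed because the
    // Iceberg spec cannot represent a Z-order.
    Properties properties = new Properties();
    properties.setProperty("sort.order", "ZORDER");   // InputFormatConfig.SORT_ORDER
    properties.setProperty("sort.columns", "id,ts");  // InputFormatConfig.SORT_COLUMNS

    System.out.println(zorderJson + " -> " + properties);
  }
}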

HiveIcebergStorageHandler.java
@@ -82,6 +82,7 @@
import org.apache.hadoop.hive.ql.ddl.table.create.like.CreateTableLikeDesc;
import org.apache.hadoop.hive.ql.ddl.table.misc.properties.AlterTableSetPropertiesDesc;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.io.IOConstants;
@@ -119,6 +120,7 @@
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionStateUtil;
import org.apache.hadoop.hive.ql.stats.Partish;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.util.NullOrdering;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.Deserializer;
@@ -184,6 +186,7 @@
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.mr.hive.actions.HiveIcebergDeleteOrphanFiles;
import org.apache.iceberg.mr.hive.plan.IcebergBucketFunction;
import org.apache.iceberg.mr.hive.udf.GenericUDFIcebergZorder;
import org.apache.iceberg.puffin.Blob;
import org.apache.iceberg.puffin.BlobMetadata;
import org.apache.iceberg.puffin.Puffin;
@@ -218,6 +221,8 @@
import static org.apache.iceberg.SnapshotSummary.TOTAL_FILE_SIZE_PROP;
import static org.apache.iceberg.SnapshotSummary.TOTAL_POS_DELETES_PROP;
import static org.apache.iceberg.SnapshotSummary.TOTAL_RECORDS_PROP;
import static org.apache.iceberg.mr.InputFormatConfig.SORT_COLUMNS;
import static org.apache.iceberg.mr.InputFormatConfig.SORT_ORDER;

public class HiveIcebergStorageHandler extends DefaultStorageHandler implements HiveStoragePredicateHandler {
private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergStorageHandler.class);
@@ -929,9 +934,64 @@ public DynamicPartitionCtx createDPContext(
addCustomSortExpr(table, hmsTable, writeOperation, dpCtx, getSortTransformSpec(table));
}

// Even if table has no explicit sort order, honor z-order if configured
Map<String, String> props = table.properties();
if ("ZORDER".equalsIgnoreCase(props.getOrDefault(SORT_ORDER, ""))) {
createZOrderCustomSort(props, dpCtx, table, hmsTable, writeOperation);
}

return dpCtx;
}

/**
 * Adds a custom sort expression to the DynamicPartitionCtx that performs local Z-ordering on write.
 *
 * Behavior:
 * - Reads the Z-order properties from 'sort.order' and 'sort.columns' (comma-separated).
 * - Resolves the referenced columns to their positions in the physical row, accounting for the
 * ACID virtual column offset on overwrite/update operations.
 * - Configures a single ASC sort key with NULLS FIRST and injects a custom key expression that
 * computes the Z-order value.
 */
private void createZOrderCustomSort(Map<String, String> props, DynamicPartitionCtx dpCtx, Table table,
org.apache.hadoop.hive.ql.metadata.Table hmsTable, Operation writeOperation) {
String colsProp = props.get(SORT_COLUMNS);
if (StringUtils.isNotBlank(colsProp)) {
List<String> zCols = Arrays.stream(colsProp.split(",")).map(String::trim)
.filter(s -> !s.isEmpty()).collect(Collectors.toList());

Map<String, Integer> fieldOrderMap = Maps.newHashMap();
List<Types.NestedField> fields = table.schema().columns();
for (int i = 0; i < fields.size(); ++i) {
fieldOrderMap.put(fields.get(i).name(), i);
}
int offset = (shouldOverwrite(hmsTable, writeOperation) ?
ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA : acidSelectColumns(hmsTable, writeOperation)).size();

List<Integer> zIndices = zCols.stream().map(col -> {
Integer base = fieldOrderMap.get(col);
Preconditions.checkArgument(base != null, "Z-order column not found in schema: %s", col);
return base + offset;
}).collect(Collectors.toList());

dpCtx.setCustomSortOrder(Lists.newArrayList(Collections.singletonList(1)));
dpCtx.setCustomSortNullOrder(Lists.newArrayList(Collections.singletonList(NullOrdering.NULLS_FIRST.getCode())));

dpCtx.addCustomSortExpressions(Collections.singletonList(allCols -> {
List<ExprNodeDesc> args = Lists.newArrayListWithExpectedSize(zIndices.size());
for (Integer idx : zIndices) {
args.add(allCols.get(idx));
}
try {
GenericUDF udf = new GenericUDFIcebergZorder();
return ExprNodeGenericFuncDesc.newInstance(udf, "iceberg_zorder", args);
} catch (UDFArgumentException e) {
throw new RuntimeException(e);
}
}));
}
}
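To make the index arithmetic above concrete, a minimal sketch (not part of the changed files) of how the Z-order columns map to positions in the row the writer sorts, assuming a hypothetical three-column schema and two leading ACID virtual columns:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ZOrderIndexSketch {
  public static void main(String[] args) {
    List<String> schemaColumns = Arrays.asList("id", "ts", "data"); // table schema positions 0..2
    List<String> zOrderColumns = Arrays.asList("id", "ts");         // parsed from 'sort.columns'
    int acidOffset = 2;                                             // e.g. acidSelectColumns(...).size()

    // Same resolution as createZOrderCustomSort(): schema position + ACID virtual column offset.
    List<Integer> zIndices = zOrderColumns.stream()
        .map(col -> schemaColumns.indexOf(col) + acidOffset)
        .collect(Collectors.toList());

    // Prints [2, 3]: these row positions become the arguments of the iceberg_zorder key expression.
    System.out.println(zIndices);
  }
}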

private void addCustomSortExpr(Table table, org.apache.hadoop.hive.ql.metadata.Table hmsTable,
Operation writeOperation, DynamicPartitionCtx dpCtx,
List<TransformSpec> transformSpecs) {