Commit cda7023

[fix](hive) fix reading Hive insert-only transactional tables (#45753) (#46385) (#46454)

What problem does this PR solve?

Backport of #45753: fix reading Hive insert-only transactional tables.
Backport of #46385 and #45999: fix the unstable regression case introduced by #45753.
1 parent d5bfe01 commit cda7023

File tree: 11 files changed (+371, -29 lines)

docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_scripts/run25.hql (+53)

@@ -41,3 +41,56 @@ insert into orc_full_acid_par PARTITION(part_col=20230102) values
 (6, 'F');

 update orc_full_acid_par set value = 'BB' where id = 2;
+
+
+
+
+create table orc_to_acid_tb (id INT, value STRING)
+PARTITIONED BY (part_col INT)
+CLUSTERED BY (id) INTO 3 BUCKETS
+STORED AS ORC;
+INSERT INTO TABLE orc_to_acid_tb PARTITION (part_col=101) VALUES (1, 'A'), (3, 'C');
+INSERT INTO TABLE orc_to_acid_tb PARTITION (part_col=102) VALUES (2, 'B');
+ALTER TABLE orc_to_acid_tb SET TBLPROPERTIES ('transactional'='true');
+
+
+create table orc_to_acid_compacted_tb (id INT, value STRING)
+PARTITIONED BY (part_col INT)
+CLUSTERED BY (id) INTO 3 BUCKETS
+STORED AS ORC;
+INSERT INTO TABLE orc_to_acid_compacted_tb PARTITION (part_col=101) VALUES (1, 'A'), (3, 'C');
+INSERT INTO TABLE orc_to_acid_compacted_tb PARTITION (part_col=102) VALUES (2, 'B');
+ALTER TABLE orc_to_acid_compacted_tb SET TBLPROPERTIES ('transactional'='true');
+ALTER TABLE orc_to_acid_compacted_tb partition(part_col='101') COMPACT 'major' and wait;
+ALTER TABLE orc_to_acid_compacted_tb partition(part_col='102') COMPACT 'major' and wait;
+INSERT INTO TABLE orc_to_acid_compacted_tb PARTITION (part_col=102) VALUES (4, 'D');
+update orc_to_acid_compacted_tb set value = "CC" where id = 3;
+update orc_to_acid_compacted_tb set value = "BB" where id = 2;
+
+
+create table orc_acid_minor (id INT, value STRING)
+CLUSTERED BY (id) INTO 3 BUCKETS
+STORED AS ORC
+TBLPROPERTIES ('transactional' = 'true');
+insert into orc_acid_minor values (1, 'A');
+insert into orc_acid_minor values (2, 'B');
+insert into orc_acid_minor values (3, 'C');
+update orc_acid_minor set value = "BB" where id = 2;
+ALTER TABLE orc_acid_minor COMPACT 'minor' and wait;
+insert into orc_acid_minor values (4, 'D');
+update orc_acid_minor set value = "DD" where id = 4;
+DELETE FROM orc_acid_minor WHERE id = 3;
+
+
+create table orc_acid_major (id INT, value STRING)
+CLUSTERED BY (id) INTO 3 BUCKETS
+STORED AS ORC
+TBLPROPERTIES ('transactional' = 'true');
+insert into orc_acid_major values (1, 'A');
+insert into orc_acid_major values (2, 'B');
+insert into orc_acid_major values (3, 'C');
+update orc_acid_major set value = "BB" where id = 2;
+ALTER TABLE orc_acid_major COMPACT 'minor' and wait;
+insert into orc_acid_major values (4, 'D');
+update orc_acid_major set value = "DD" where id = 4;
+DELETE FROM orc_acid_major WHERE id = 3;

fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java (+11, -5)

@@ -30,6 +30,7 @@
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
 import org.apache.doris.datasource.ExternalTable;
 import org.apache.doris.datasource.SchemaCacheValue;
@@ -359,19 +360,24 @@ public Map<String, PartitionItem> getNameToPartitionItems() {
     }

     public boolean isHiveTransactionalTable() {
-        return dlaType == DLAType.HIVE && AcidUtils.isTransactionalTable(remoteTable)
-                && isSupportedTransactionalFileFormat();
+        return dlaType == DLAType.HIVE && AcidUtils.isTransactionalTable(remoteTable);
     }

-    private boolean isSupportedTransactionalFileFormat() {
+    private boolean isSupportedFullAcidTransactionalFileFormat() {
         // Sometimes we meet "transactional" = "true" but format is parquet, which is not supported.
         // So we need to check the input format for transactional table.
         String inputFormatName = remoteTable.getSd().getInputFormat();
         return inputFormatName != null && SUPPORTED_HIVE_TRANSACTIONAL_FILE_FORMATS.contains(inputFormatName);
     }

-    public boolean isFullAcidTable() {
-        return dlaType == DLAType.HIVE && AcidUtils.isFullAcidTable(remoteTable);
+    public boolean isFullAcidTable() throws UserException {
+        if (dlaType == DLAType.HIVE && AcidUtils.isFullAcidTable(remoteTable)) {
+            if (!isSupportedFullAcidTransactionalFileFormat()) {
+                throw new UserException("This table is full Acid Table, but no Orc Format.");
+            }
+            return true;
+        }
+        return false;
     }

     @Override
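For orientation, a minimal sketch (not part of this commit) of how a caller could combine the two predicates after this change; the helper class and the returned labels are illustrative only:

import org.apache.doris.common.UserException;
import org.apache.doris.datasource.hive.HMSExternalTable;

public class AcidTableKind {
    // isHiveTransactionalTable() now matches any table with 'transactional'='true',
    // while isFullAcidTable() additionally verifies the ORC input format and throws otherwise.
    public static String describe(HMSExternalTable table) throws UserException {
        if (!table.isHiveTransactionalTable()) {
            return "plain";            // ordinary Hive table, normal file listing
        }
        if (table.isFullAcidTable()) { // throws UserException when full ACID but not ORC
            return "full-acid";        // base/delta/delete_delta directories
        }
        return "insert-only";          // transactional but without UPDATE/DELETE, no delete_delta dirs
    }
}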

fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java (+26, -23)

@@ -32,12 +32,16 @@
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.security.authentication.AuthenticationConfig;
+import org.apache.doris.common.security.authentication.HadoopAuthenticator;
 import org.apache.doris.common.util.CacheBulkLoader;
 import org.apache.doris.common.util.LocationPath;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.CacheException;
 import org.apache.doris.datasource.ExternalMetaCacheMgr;
 import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;
+import org.apache.doris.datasource.hive.HiveUtil.ACIDFileFilter;
+import org.apache.doris.datasource.hive.HiveUtil.FullAcidFileFilter;
+import org.apache.doris.datasource.hive.HiveUtil.InsertOnlyACIDFileFilter;
 import org.apache.doris.datasource.property.PropertyConverter;
 import org.apache.doris.fs.FileSystemCache;
 import org.apache.doris.fs.remote.RemoteFile;
@@ -55,7 +59,6 @@
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
 import com.google.common.collect.Iterables;
@@ -77,12 +80,10 @@
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

 import java.net.URI;
-import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -107,8 +108,6 @@ public class HiveMetaStoreCache {
     // After hive 3, transactional table's will have file '_orc_acid_version' with value >= '2'.
     public static final String HIVE_ORC_ACID_VERSION_FILE = "_orc_acid_version";

-    private static final String HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX = "bucket_";
-
     private final HMSExternalCatalog catalog;
     private JobConf jobConf;
     private final ExecutorService refreshExecutor;
@@ -742,19 +741,16 @@ public LoadingCache<PartitionCacheKey, HivePartition> getPartitionCache() {
     public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions, ValidWriteIdList validWriteIds,
             boolean isFullAcid, boolean skipCheckingAcidVersionFile, long tableId, String bindBrokerName) {
         List<FileCacheValue> fileCacheValues = Lists.newArrayList();
-        String remoteUser = jobConf.get(AuthenticationConfig.HADOOP_USER_NAME);
         try {
             for (HivePartition partition : partitions) {
+
+                AuthenticationConfig authenticationConfig = AuthenticationConfig.getKerberosConfig(jobConf);
+                HadoopAuthenticator hadoopAuthenticator =
+                        HadoopAuthenticator.getHadoopAuthenticator(authenticationConfig);
+
                 FileCacheValue fileCacheValue = new FileCacheValue();
-                AcidUtils.Directory directory;
-                if (!Strings.isNullOrEmpty(remoteUser)) {
-                    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser);
-                    directory = ugi.doAs((PrivilegedExceptionAction<AcidUtils.Directory>) () -> AcidUtils.getAcidState(
-                            new Path(partition.getPath()), jobConf, validWriteIds, false, true));
-                } else {
-                    directory = AcidUtils.getAcidState(new Path(partition.getPath()), jobConf, validWriteIds, false,
-                            true);
-                }
+                AcidUtils.Directory directory = hadoopAuthenticator.doAs(() -> AcidUtils.getAcidState(
+                        new Path(partition.getPath()), jobConf, validWriteIds, false, true));
                 if (directory == null) {
                     return Collections.emptyList();
                 }
@@ -775,7 +771,8 @@ public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions
                        return Collections.emptyList();
                    }
                    if (!skipCheckingAcidVersionFile) {
-                        String acidVersionPath = new Path(baseOrDeltaPath, "_orc_acid_version").toUri().toString();
+                        String acidVersionPath = new Path(
+                                baseOrDeltaPath, HIVE_ORC_ACID_VERSION_FILE).toUri().toString();
                        RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
                                new FileSystemCache.FileSystemCacheKey(
                                        LocationPath.getFSIdentity(baseOrDeltaPath.toUri().toString(),
@@ -798,6 +795,8 @@ public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions
                    }
                }

+                ACIDFileFilter fileFilter = isFullAcid ? new FullAcidFileFilter() : new InsertOnlyACIDFileFilter();
+
                // delta directories
                List<DeleteDeltaInfo> deleteDeltas = new ArrayList<>();
                for (AcidUtils.ParsedDelta delta : directory.getCurrentDirectories()) {
@@ -810,14 +809,14 @@ public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions
                    Status status = fs.listFiles(location, false, remoteFiles);
                    if (status.ok()) {
                        if (delta.isDeleteDelta()) {
-                            List<String> deleteDeltaFileNames = remoteFiles.stream().map(f -> f.getName()).filter(
-                                    name -> name.startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
+                            List<String> deleteDeltaFileNames = remoteFiles.stream()
+                                    .map(f -> f.getName()).filter(fileFilter::accept)
                                    .collect(Collectors.toList());
                            deleteDeltas.add(new DeleteDeltaInfo(location, deleteDeltaFileNames));
                            continue;
                        }
-                        remoteFiles.stream().filter(
-                                f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)).forEach(file -> {
+                        remoteFiles.stream().filter(f -> fileFilter.accept(f.getName()))
+                                .forEach(file -> {
                            LocationPath path = new LocationPath(file.getPath().toString(),
                                    catalog.getProperties());
                            fileCacheValue.addFile(file, path);
@@ -837,8 +836,7 @@ public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions
                    List<RemoteFile> remoteFiles = new ArrayList<>();
                    Status status = fs.listFiles(location, false, remoteFiles);
                    if (status.ok()) {
-                        remoteFiles.stream().filter(
-                                f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
+                        remoteFiles.stream().filter(f -> fileFilter.accept(f.getName()))
                                .forEach(file -> {
                                    LocationPath path = new LocationPath(file.getPath().toString(),
                                            catalog.getProperties());
@@ -848,7 +846,12 @@ public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions
                        throw new RuntimeException(status.getErrMsg());
                    }
                }
-                fileCacheValue.setAcidInfo(new AcidInfo(partition.getPath(), deleteDeltas));
+
+                if (isFullAcid) {
+                    fileCacheValue.setAcidInfo(new AcidInfo(partition.getPath(), deleteDeltas));
+                } else if (!deleteDeltas.isEmpty()) {
+                    throw new RuntimeException("No Hive Full Acid Table have delete_delta_* Dir.");
+                }
                 fileCacheValues.add(fileCacheValue);
             }
         } catch (Exception e) {
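The change above swaps the manual UserGroupInformation handling for HadoopAuthenticator. A condensed sketch of the new pattern as it could be called in isolation (variable and method names here are illustrative; the committed code runs this per partition inside getFilesByTransaction):

import org.apache.doris.common.security.authentication.AuthenticationConfig;
import org.apache.doris.common.security.authentication.HadoopAuthenticator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.mapred.JobConf;

class AcidStateSketch {
    static AcidUtils.Directory listAcidState(JobConf jobConf, String partitionPath,
            ValidWriteIdList validWriteIds) throws Exception {
        // Build the authenticator from the job configuration (Kerberos or simple auth)
        // instead of constructing a remote UserGroupInformation by hand.
        AuthenticationConfig authConf = AuthenticationConfig.getKerberosConfig(jobConf);
        HadoopAuthenticator authenticator = HadoopAuthenticator.getHadoopAuthenticator(authConf);
        // doAs() runs the ACID state listing under that identity.
        return authenticator.doAs(() -> AcidUtils.getAcidState(
                new Path(partitionPath), jobConf, validWriteIds, false, true));
    }
}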

fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java (+25)

@@ -46,6 +46,7 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

@@ -179,6 +180,25 @@ public boolean createTable(CreateTableStmt stmt) throws UserException {
                 props.put("owner", ConnectContext.get().getUserIdentity().getUser());
             }
         }
+
+        if (props.containsKey("transactional") && props.get("transactional").equals("true")) {
+            throw new UserException("Not support create hive transactional table.");
+            /*
+                CREATE TABLE trans6(
+                  `col1` int,
+                  `col2` int
+                ) ENGINE=hive
+                PROPERTIES (
+                  'file_format'='orc',
+                  'compression'='zlib',
+                  'bucketing_version'='2',
+                  'transactional'='true',
+                  'transactional_properties'='default'
+                );
+                In hive, this table only can insert not update(not report error,but not actually updated).
+            */
+        }
+
         String fileFormat = props.getOrDefault(FILE_FORMAT_KEY, Config.hive_default_file_format);
         Map<String, String> ddlProps = new HashMap<>();
         for (Map.Entry<String, String> entry : props.entrySet()) {
@@ -273,6 +293,11 @@ public void dropTable(DropTableStmt stmt) throws DdlException {
                 ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_TABLE, tblName, dbName);
             }
         }
+
+        if (AcidUtils.isTransactionalTable(client.getTable(dbName, tblName))) {
+            throw new DdlException("Not support drop hive transactional table.");
+        }
+
         try {
             client.dropTable(dbName, tblName);
             db.setUnInitialized(true);

fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java (+22)

@@ -70,6 +70,9 @@ public final class HiveUtil {
     public static final Set<String> SUPPORTED_TEXT_COMPRESSIONS =
             ImmutableSet.of("plain", "gzip", "zstd", "bzip2", "lz4", "snappy");

+    public static final String HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX = "bucket_";
+    public static final String DELTA_SIDE_FILE_SUFFIX = "_flush_length";
+
     private HiveUtil() {
     }

@@ -386,4 +389,23 @@ public static StorageDescriptor makeStorageDescriptorFromHivePartition(HiveParti

         return sd;
     }
+
+    public interface ACIDFileFilter {
+        public boolean accept(String fileName);
+    }
+
+    public static final class FullAcidFileFilter implements ACIDFileFilter {
+        @Override
+        public boolean accept(String fileName) {
+            return fileName.startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)
+                    && !fileName.endsWith(DELTA_SIDE_FILE_SUFFIX);
+        }
+    }
+
+    public static final class InsertOnlyACIDFileFilter implements ACIDFileFilter {
+        @Override
+        public boolean accept(String fileName) {
+            return true;
+        }
+    }
 }
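A small illustration (not in the commit) of how the two filters treat file names typically found in ACID directories; the sample names are assumptions based on the prefix and suffix constants defined above:

import org.apache.doris.datasource.hive.HiveUtil.FullAcidFileFilter;
import org.apache.doris.datasource.hive.HiveUtil.InsertOnlyACIDFileFilter;

class AcidFileFilterDemo {
    public static void main(String[] args) {
        FullAcidFileFilter fullAcid = new FullAcidFileFilter();
        InsertOnlyACIDFileFilter insertOnly = new InsertOnlyACIDFileFilter();

        // Full ACID base/delta directories hold bucket_NNNNN data files plus
        // *_flush_length side files, which are not readable ORC data.
        System.out.println(fullAcid.accept("bucket_00000"));              // true
        System.out.println(fullAcid.accept("bucket_00000_flush_length")); // false
        System.out.println(fullAcid.accept("000000_0"));                  // false

        // Insert-only transactional tables keep plain file names (e.g. 000000_0),
        // so the insert-only filter accepts every file returned by the listing.
        System.out.println(insertOnly.accept("000000_0"));                // true
    }
}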

fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java (+10, -1)

@@ -267,7 +267,16 @@ private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartiti
             List<Split> allFiles, String bindBrokerName, int numBackends) throws IOException, UserException {
         List<FileCacheValue> fileCaches;
         if (hiveTransaction != null) {
-            fileCaches = getFileSplitByTransaction(cache, partitions, bindBrokerName);
+            try {
+                fileCaches = getFileSplitByTransaction(cache, partitions, bindBrokerName);
+            } catch (Exception e) {
+                // Release shared load (getValidWriteIds acquire Lock).
+                // If no exception is throw, the lock will be released when `finalizeQuery()`.
+                // TODO: merge HMSTransaction,HiveTransaction, HiveTransactionMgr,HiveTransactionManager
+                // and redesign the logic of this code.
+                Env.getCurrentHiveTransactionMgr().deregister(hiveTransaction.getQueryId());
+                throw e;
+            }
         } else {
             boolean withCache = Config.max_external_file_cache_num > 0;
             fileCaches = cache.getFilesByPartitions(partitions, withCache, partitions.size() > 1, bindBrokerName);

fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java (+3)

@@ -263,6 +263,9 @@ private BuildInsertExecutorResult initPlanOnce(ConnectContext ctx,
         } else if (physicalSink instanceof PhysicalHiveTableSink) {
             boolean emptyInsert = childIsEmptyRelation(physicalSink);
             HMSExternalTable hiveExternalTable = (HMSExternalTable) targetTableIf;
+            if (hiveExternalTable.isHiveTransactionalTable()) {
+                throw new AnalysisException("Not supported insert into hive transactional table.");
+            }
             insertExecutor = new HiveInsertExecutor(ctx, hiveExternalTable, label, planner,
                     Optional.of(insertCtx.orElse((new HiveInsertCommandContext()))), emptyInsert);
             // set hive query options

regression-test/data/external_table_p0/hive/test_transactional_hive.out (+46)

@@ -76,3 +76,49 @@ F

 -- !q05 --
 0
+
+-- !2 --
+1	A	101
+2	BB	102
+3	CC	101
+4	D	102
+
+-- !3 --
+1	A	101
+3	CC	101
+
+-- !4 --
+2	BB	102
+4	D	102
+
+-- !5 --
+1	A	101
+2	BB	102
+
+-- !6 --
+4	D	102
+
+-- !7 --
+1	A
+2	BB
+4	DD
+
+-- !10 --
+1	A
+2	BB
+
+-- !11 --
+4	DD
+
+-- !12 --
+1	A
+2	BB
+4	DD
+
+-- !15 --
+1	A
+2	BB
+
+-- !16 --
+4	DD
+
