Skip to content

Commit d7fd6dc

Browse files
committed
Fix flush_metadata_cache failure when metastore impersonation is enabled
1 parent 6054835 commit d7fd6dc

File tree

2 files changed

+88
-2
lines changed

2 files changed

+88
-2
lines changed

plugin/trino-hive/src/main/java/io/trino/plugin/hive/procedure/FlushMetadataCacheProcedure.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.trino.metastore.HiveMetastoreFactory;
2222
import io.trino.metastore.Table;
2323
import io.trino.metastore.cache.CachingHiveMetastore;
24+
import io.trino.metastore.cache.SharedHiveMetastoreCache.ImpersonationCachingHiveMetastoreFactory;
2425
import io.trino.plugin.hive.HiveErrorCode;
2526
import io.trino.plugin.hive.fs.DirectoryLister;
2627
import io.trino.plugin.hive.metastore.glue.GlueCache;
@@ -133,6 +134,18 @@ public void flushMetadataCache(
133134
}
134135

135136
private void doFlushMetadataCache(ConnectorSession session, Optional<String> schemaName, Optional<String> tableName, List<String> partitionColumns, List<String> partitionValues)
137+
{
138+
if (hiveMetadataFactory instanceof ImpersonationCachingHiveMetastoreFactory impersonationCachingHiveMetastoreFactory) {
139+
checkState(cachingHiveMetastore.isEmpty(), "CachingHiveMetastore should not be set when using ImpersonationCachingHiveMetastoreFactory");
140+
Optional<CachingHiveMetastore> impersonationCachingHiveMetastore = Optional.of((CachingHiveMetastore) impersonationCachingHiveMetastoreFactory.createMetastore(Optional.of(session.getIdentity())));
141+
doFlushMetadataCache(session, impersonationCachingHiveMetastore, schemaName, tableName, partitionColumns, partitionValues);
142+
}
143+
else {
144+
doFlushMetadataCache(session, cachingHiveMetastore, schemaName, tableName, partitionColumns, partitionValues);
145+
}
146+
}
147+
148+
private void doFlushMetadataCache(ConnectorSession session, Optional<CachingHiveMetastore> cachingHiveMetastore, Optional<String> schemaName, Optional<String> tableName, List<String> partitionColumns, List<String> partitionValues)
136149
{
137150
if (cachingHiveMetastore.isEmpty() && glueCache.isEmpty()) {
138151
// TODO this currently does not work. CachingHiveMetastore is always bound for metastores other than Glue, even when caching is disabled,
@@ -156,13 +169,13 @@ else if (schemaName.isPresent() && tableName.isPresent()) {
156169
List<String> partitions;
157170

158171
if (!partitionColumns.isEmpty()) {
159-
cachingHiveMetastore.ifPresent(cachingHiveMetastore -> cachingHiveMetastore.flushPartitionCache(schemaName.get(), tableName.get(), partitionColumns, partitionValues));
172+
cachingHiveMetastore.ifPresent(hiveMetastore -> hiveMetastore.flushPartitionCache(schemaName.get(), tableName.get(), partitionColumns, partitionValues));
160173
glueCache.ifPresent(glueCache -> glueCache.invalidatePartition(schemaName.get(), tableName.get(), new PartitionName(partitionValues)));
161174

162175
partitions = ImmutableList.of(makePartName(partitionColumns, partitionValues));
163176
}
164177
else {
165-
cachingHiveMetastore.ifPresent(cachingHiveMetastore -> cachingHiveMetastore.invalidateTable(schemaName.get(), tableName.get()));
178+
cachingHiveMetastore.ifPresent(hiveMetastore -> hiveMetastore.invalidateTable(schemaName.get(), tableName.get()));
166179
glueCache.ifPresent(glueCache -> glueCache.invalidateTable(schemaName.get(), tableName.get(), true));
167180

168181
List<String> partitionColumnNames = table.getPartitionColumns().stream()
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.trino.plugin.hive;
15+
16+
import com.google.common.collect.ImmutableMap;
17+
import io.trino.Session;
18+
import io.trino.plugin.hive.containers.Hive3MinioDataLake;
19+
import io.trino.plugin.hive.containers.HiveMinioDataLake;
20+
import io.trino.plugin.hive.s3.S3HiveQueryRunner;
21+
import io.trino.spi.security.Identity;
22+
import io.trino.testing.AbstractTestQueryFramework;
23+
import io.trino.testing.QueryRunner;
24+
import io.trino.testing.sql.TestTable;
25+
import org.junit.jupiter.api.Test;
26+
import org.junit.jupiter.api.TestInstance;
27+
28+
import java.util.List;
29+
30+
import static io.trino.plugin.hive.containers.HiveHadoop.HIVE3_IMAGE;
31+
import static io.trino.testing.TestingNames.randomNameSuffix;
32+
import static org.assertj.core.api.Assertions.assertThat;
33+
import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;
34+
35+
@TestInstance(PER_CLASS)
36+
final class TestThriftMetastoreImpersonation
37+
extends AbstractTestQueryFramework
38+
{
39+
@Override
40+
protected QueryRunner createQueryRunner()
41+
throws Exception
42+
{
43+
HiveMinioDataLake hiveMinioDataLake = closeAfterClass(new Hive3MinioDataLake("test-thrift-impersonation-" + randomNameSuffix(), HIVE3_IMAGE));
44+
hiveMinioDataLake.start();
45+
return S3HiveQueryRunner.builder(hiveMinioDataLake)
46+
.setMetastoreImpersonationEnabled(true)
47+
.setHiveProperties(ImmutableMap.<String, String>builder()
48+
.put("hive.security", "allow-all")
49+
.put("hive.metastore-cache-ttl", "1d")
50+
.put("hive.user-metastore-cache-ttl", "1d")
51+
.buildOrThrow())
52+
.build();
53+
}
54+
55+
@Test
56+
void testFlushMetadataCache()
57+
{
58+
Session alice = Session.builder(getSession()).setIdentity(Identity.ofUser("alice")).build();
59+
60+
try (TestTable table = newTrinoTable("test_partition", "(id int, part int) WITH (partitioned_by = ARRAY['part'])", List.of("1, 10"))) {
61+
assertThat(computeScalar(alice, "SELECT count(1) FROM \"" + table.getName() + "$partitions\""))
62+
.isEqualTo(1L);
63+
64+
assertUpdate("INSERT INTO " + table.getName() + " VALUES (2, 20)", 1);
65+
assertThat(computeScalar(alice, "SELECT count(1) FROM \"" + table.getName() + "$partitions\""))
66+
.isEqualTo(1L);
67+
68+
assertUpdate(alice, "CALL system.flush_metadata_cache(schema_name => CURRENT_SCHEMA, table_name => '" + table.getName() + "')");
69+
assertThat(computeScalar(alice, "SELECT count(1) FROM \"" + table.getName() + "$partitions\""))
70+
.isEqualTo(2L);
71+
}
72+
}
73+
}

0 commit comments

Comments
 (0)