diff --git a/pom.xml b/pom.xml
index 46641deef..a76d0ddd5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -62,7 +62,7 @@
         1.18.36
         1.18.20.0
         3.4.1
-        <hudi.version>0.14.0</hudi.version>
+        <hudi.version>0.15.0</hudi.version>
         2.29.40
         3.3.1
         3.8.0
diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java b/xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java
index bef000135..705746679 100644
--- a/xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/BaseFileUpdatesExtractor.java
@@ -36,8 +36,6 @@
 import lombok.NonNull;
 import lombok.Value;
 
-import org.apache.hadoop.fs.Path;
-
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -48,8 +46,8 @@
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.ExternalFilePathUtil;
-import org.apache.hudi.hadoop.CachingPath;
 import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
+import org.apache.hudi.storage.StoragePath;
 
 import org.apache.xtable.collectors.CustomCollectors;
 import org.apache.xtable.model.schema.InternalType;
@@ -64,7 +62,7 @@ public class BaseFileUpdatesExtractor {
       Pattern.compile(
           "^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}-[0-9]_[0-9a-fA-F-]+_[0-9]+\\.");
   private final HoodieEngineContext engineContext;
-  private final Path tableBasePath;
+  private final StoragePath tableBasePath;
 
   /**
    * Extracts the changes between the snapshot files and the base files in the Hudi table currently.
@@ -91,7 +89,10 @@ ReplaceMetadata extractSnapshotChanges(
     Set<String> partitionPathsToDrop =
         new HashSet<>(
             FSUtils.getAllPartitionPaths(
-                engineContext, metadataConfig, metaClient.getBasePathV2().toString()));
+                engineContext,
+                metaClient.getStorage(),
+                metadataConfig,
+                metaClient.getBasePathV2().toString()));
     ReplaceMetadata replaceMetadata =
         partitionedDataFiles.stream()
             .map(
@@ -173,7 +174,7 @@ ReplaceMetadata convertDiff(@NonNull DataFilesDiff dataFilesDiff, @NonNull Strin
     // For all removed files, group by partition and extract the file id
     Map<String, List<String>> partitionToReplacedFileIds =
         dataFilesDiff.getFilesRemoved().stream()
-            .map(file -> new CachingPath(file.getPhysicalPath()))
+            .map(file -> new StoragePath(file.getPhysicalPath()))
             .collect(
                 Collectors.groupingBy(
                     path -> HudiPathUtils.getPartitionPath(tableBasePath, path),
@@ -186,7 +187,7 @@ ReplaceMetadata convertDiff(@NonNull DataFilesDiff dataFilesDiff, @NonNull Strin
     return ReplaceMetadata.of(partitionToReplacedFileIds, writeStatuses);
   }
 
-  private String getFileId(Path filePath) {
+  private String getFileId(StoragePath filePath) {
     String fileName = filePath.getName();
     // if file was created by Hudi use original fileId, otherwise use the file name as IDs
     if (isFileCreatedByHudiWriter(fileName)) {
@@ -207,12 +208,12 @@ private boolean isFileCreatedByHudiWriter(String fileName) {
   }
 
   private WriteStatus toWriteStatus(
-      Path tableBasePath,
+      StoragePath tableBasePath,
       String commitTime,
       InternalDataFile file,
       Optional<String> partitionPathOptional) {
     WriteStatus writeStatus = new WriteStatus();
-    Path path = new CachingPath(file.getPhysicalPath());
+    StoragePath path = new StoragePath(file.getPhysicalPath());
     String partitionPath =
         partitionPathOptional.orElseGet(() -> HudiPathUtils.getPartitionPath(tableBasePath, path));
     String fileId = getFileId(path);
@@ -273,8 +274,8 @@ private ReplaceMetadata combine(ReplaceMetadata other) {
     }
   }
 
-  private String getPartitionPath(Path tableBasePath, List<InternalDataFile> files) {
+  private String getPartitionPath(StoragePath tableBasePath, List<InternalDataFile> files) {
     return HudiPathUtils.getPartitionPath(
-        tableBasePath, new CachingPath(files.get(0).getPhysicalPath()));
+        tableBasePath, new StoragePath(files.get(0).getPhysicalPath()));
   }
 }
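
For context on the swap above: Hudi 0.15.0 exposes `org.apache.hudi.storage.StoragePath` as the engine-agnostic replacement for the Hadoop `Path`/`CachingPath` pair, and it supports the operations this class relies on (construction from a string, `getName()`, `toUri()`). A minimal sketch of the pattern, with hypothetical paths:

```java
import org.apache.hudi.storage.StoragePath;

public class StoragePathSketch {
  public static void main(String[] args) {
    // Hypothetical paths; mirrors how convertDiff/toWriteStatus build paths above.
    StoragePath tableBasePath = new StoragePath("s3://bucket/warehouse/test_table");
    StoragePath filePath =
        new StoragePath("s3://bucket/warehouse/test_table/partition1/file1.parquet");
    System.out.println(filePath.getName());         // file1.parquet
    System.out.println(filePath.toUri().getPath()); // /warehouse/test_table/partition1/file1.parquet
  }
}
```
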
diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java
index 0ddbbcb76..54b1abf9c 100644
--- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java
@@ -23,6 +23,7 @@
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 
 import org.apache.xtable.conversion.ConversionSourceProvider;
 import org.apache.xtable.conversion.SourceTable;
@@ -35,7 +36,7 @@ public class HudiConversionSourceProvider extends ConversionSourceProvider<HoodieInstant> {
diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java
--- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java
       configuration,
       int maxNumDeltaCommitsBeforeCompaction) {
     this(
         targetTable.getBasePath(),
         (int) targetTable.getMetadataRetention().toHours(),
         maxNumDeltaCommitsBeforeCompaction,
         BaseFileUpdatesExtractor.of(
-            new HoodieJavaEngineContext(configuration), new CachingPath(targetTable.getBasePath())),
+            new HoodieJavaEngineContext(configuration), new StoragePath(targetTable.getBasePath())),
         AvroSchemaConverter.getInstance(),
         HudiTableManager.of(configuration),
         CommitState::new);
@@ -168,9 +170,10 @@ public void init(TargetTable targetTable, Configuration configuration) {
         (int) targetTable.getMetadataRetention().toHours(),
         HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.defaultValue(),
         BaseFileUpdatesExtractor.of(
-            new HoodieJavaEngineContext(configuration), new CachingPath(targetTable.getBasePath())),
+            new HoodieJavaEngineContext(new HadoopStorageConfiguration(configuration)),
+            new StoragePath(targetTable.getBasePath())),
         AvroSchemaConverter.getInstance(),
-        HudiTableManager.of(configuration),
+        HudiTableManager.of(new HadoopStorageConfiguration(configuration)),
         CommitState::new);
   }
 
@@ -369,7 +372,7 @@ public void commit() {
               getNumInstantsToRetain(),
               maxNumDeltaCommitsBeforeCompaction,
               timelineRetentionInHours);
-      HoodieEngineContext engineContext = new HoodieJavaEngineContext(metaClient.getHadoopConf());
+      HoodieEngineContext engineContext = new HoodieJavaEngineContext(metaClient.getStorageConf());
       try (HoodieJavaWriteClient<?> writeClient =
           new HoodieJavaWriteClient<>(engineContext, writeConfig)) {
         writeClient.startCommitWithTime(instantTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
@@ -494,7 +497,8 @@ private void markInstantsAsCleaned(
                 Collections.emptyMap(),
                 CleanPlanner.LATEST_CLEAN_PLAN_VERSION,
                 cleanInfoPerPartition,
-                Collections.emptyList());
+                Collections.emptyList(),
+                Collections.emptyMap());
         // create a clean instant and mark it as requested with the clean plan
         HoodieInstant requestedCleanInstant =
             new HoodieInstant(
@@ -524,7 +528,8 @@ private void markInstantsAsCleaned(
                     })
                 .collect(Collectors.toList());
         HoodieCleanMetadata cleanMetadata =
-            CleanerUtils.convertCleanMetadata(cleanTime, Option.empty(), cleanStats);
+            CleanerUtils.convertCleanMetadata(
+                cleanTime, Option.empty(), cleanStats, Collections.emptyMap());
         // update the metadata table with the clean metadata so the files' metadata are marked for
         // deletion
         hoodieTableMetadataWriter.performTableServices(Option.empty());
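
The recurring change in this file is that Hudi 0.15.0 engine contexts take a `StorageConfiguration` instead of a raw Hadoop `Configuration`, hence the `HadoopStorageConfiguration` wrapping above. A minimal, self-contained sketch of that wrapping (shown with `HoodieLocalEngineContext`, which other call sites in this PR construct the same way; exact constructor overloads are assumed from those call sites):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;

public class StorageConfSketch {
  public static void main(String[] args) {
    Configuration hadoopConf = new Configuration();
    // 0.14.x: new HoodieJavaEngineContext(hadoopConf)
    // 0.15.0: wrap the Hadoop Configuration first
    HadoopStorageConfiguration storageConf = new HadoopStorageConfiguration(hadoopConf);
    HoodieEngineContext engineContext = new HoodieLocalEngineContext(storageConf);
    System.out.println(engineContext.getStorageConf().getClass().getSimpleName());
  }
}
```
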
diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java
index 5739328c1..35807f5f7 100644
--- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java
@@ -57,6 +57,7 @@
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.storage.StoragePath;
 
 import org.apache.xtable.collectors.CustomCollectors;
 import org.apache.xtable.exception.NotSupportedException;
@@ -79,26 +80,26 @@ public class HudiDataFileExtractor implements AutoCloseable {
   private final HudiFileStatsExtractor fileStatsExtractor;
   private final HoodieMetadataConfig metadataConfig;
   private final FileSystemViewManager fileSystemViewManager;
-  private final Path basePath;
+  private final StoragePath basePath;
 
   public HudiDataFileExtractor(
       HoodieTableMetaClient metaClient,
       HudiPartitionValuesExtractor hudiPartitionValuesExtractor,
       HudiFileStatsExtractor hudiFileStatsExtractor) {
-    this.engineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf());
+    this.engineContext = new HoodieLocalEngineContext(metaClient.getStorageConf());
     metadataConfig =
         HoodieMetadataConfig.newBuilder()
             .enable(metaClient.getTableConfig().isMetadataTableAvailable())
             .build();
     this.basePath = metaClient.getBasePathV2();
     this.tableMetadata =
-        metadataConfig.enabled()
-            ? HoodieTableMetadata.create(engineContext, metadataConfig, basePath.toString(), true)
+        metadataConfig.isEnabled()
+            ? HoodieTableMetadata.create(
+                engineContext, metaClient.getStorage(), metadataConfig, basePath.toString(), true)
             : null;
     this.fileSystemViewManager =
         FileSystemViewManager.createViewManager(
             engineContext,
-            metadataConfig,
             FileSystemViewStorageConfig.newBuilder()
                 .withStorageType(FileSystemViewStorageType.MEMORY)
                 .build(),
@@ -114,7 +115,8 @@ public List<PartitionFileGroup> getFilesCurrentState(InternalTable table) {
       List<String> allPartitionPaths =
           tableMetadata != null
               ? tableMetadata.getAllPartitionPaths()
-              : FSUtils.getAllPartitionPaths(engineContext, metadataConfig, basePath.toString());
+              : FSUtils.getAllPartitionPaths(
+                  engineContext, metaClient.getStorage(), metadataConfig, basePath.toString());
       return getInternalDataFilesForPartitions(allPartitionPaths, table);
     } catch (IOException ex) {
       throw new ReadException(
@@ -402,9 +404,9 @@ private InternalDataFile buildFileWithoutStats(
         .recordCount(rowCount)
         .columnStats(Collections.emptyList())
         .lastModified(
-            hoodieBaseFile.getFileStatus() == null
+            hoodieBaseFile.getPathInfo() == null
                 ? 0L
-                : hoodieBaseFile.getFileStatus().getModificationTime())
+                : hoodieBaseFile.getPathInfo().getModificationTime())
         .build();
   }
 
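Partition listing and metadata-table creation now take the table's `HoodieStorage` as an explicit argument. A before/after sketch of the call shapes used above (identifiers mirror the fields of this class; not a standalone program):

```java
// 0.14.x
// FSUtils.getAllPartitionPaths(engineContext, metadataConfig, basePath.toString());
// HoodieTableMetadata.create(engineContext, metadataConfig, basePath.toString(), true);

// 0.15.0 — the HoodieStorage handle comes from the meta client
List<String> allPartitionPaths =
    FSUtils.getAllPartitionPaths(
        engineContext, metaClient.getStorage(), metadataConfig, basePath.toString());
HoodieTableMetadata tableMetadata =
    HoodieTableMetadata.create(
        engineContext, metaClient.getStorage(), metadataConfig, basePath.toString(), true);
```
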
diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java
index e47ef72e0..3e0072446 100644
--- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java
@@ -35,7 +35,6 @@
 import lombok.AllArgsConstructor;
 import lombok.NonNull;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.parquet.io.api.Binary;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
@@ -44,9 +43,9 @@
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.ParquetUtils;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.hadoop.CachingPath;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.storage.StoragePath;
 
 import org.apache.xtable.collectors.CustomCollectors;
 import org.apache.xtable.model.schema.InternalField;
@@ -107,7 +106,7 @@ private Stream<InternalDataFile> computeColumnStatsFromParquetFooters(
     return files.map(
         file -> {
           HudiFileStats fileStats =
-              computeColumnStatsForFile(new Path(file.getPhysicalPath()), nameFieldMap);
+              computeColumnStatsForFile(new StoragePath(file.getPhysicalPath()), nameFieldMap);
           return file.toBuilder()
               .columnStats(fileStats.getColumnStats())
               .recordCount(fileStats.getRowCount())
@@ -116,7 +115,7 @@ private Stream<InternalDataFile> computeColumnStatsFromParquetFooters(
   }
 
   private Pair<String, String> getPartitionAndFileName(String path) {
-    Path filePath = new CachingPath(path);
+    StoragePath filePath = new StoragePath(path);
     String partitionPath = HudiPathUtils.getPartitionPath(metaClient.getBasePathV2(), filePath);
     return Pair.of(partitionPath, filePath.getName());
   }
@@ -176,10 +175,10 @@ private Optional<Long> getMaxFromColumnStats(List<ColumnStat> columnStats) {
   }
 
   private HudiFileStats computeColumnStatsForFile(
-      Path filePath, Map<String, InternalField> nameFieldMap) {
+      StoragePath filePath, Map<String, InternalField> nameFieldMap) {
     List<HoodieColumnRangeMetadata<Comparable>> columnRanges =
-        UTILS.readRangeFromParquetMetadata(
-            metaClient.getHadoopConf(), filePath, new ArrayList<>(nameFieldMap.keySet()));
+        UTILS.readColumnStatsFromMetadata(
+            metaClient.getStorage(), filePath, new ArrayList<>(nameFieldMap.keySet()));
     List<ColumnStat> columnStats =
         columnRanges.stream()
             .map(
@@ -188,7 +187,7 @@ private HudiFileStats computeColumnStatsForFile(
             .collect(CustomCollectors.toList(columnRanges.size()));
     Long rowCount = getMaxFromColumnStats(columnStats).orElse(null);
     if (rowCount == null) {
-      rowCount = UTILS.getRowCount(metaClient.getHadoopConf(), filePath);
+      rowCount = UTILS.getRowCount(metaClient.getStorage(), filePath);
     }
     return new HudiFileStats(columnStats, rowCount);
   }
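
`ParquetUtils` follows the same pattern: the Hadoop-`Configuration`-based `readRangeFromParquetMetadata` is replaced by the storage-based `readColumnStatsFromMetadata`, and `getRowCount` now takes the `HoodieStorage` as well. As a fragment, with `UTILS` being the `ParquetUtils` instance this class already holds and `columns` standing for the list of column names:

```java
// 0.14.x: UTILS.readRangeFromParquetMetadata(metaClient.getHadoopConf(), filePath, columns);
List<HoodieColumnRangeMetadata<Comparable>> columnRanges =
    UTILS.readColumnStatsFromMetadata(metaClient.getStorage(), filePath, columns);
Long rowCount = UTILS.getRowCount(metaClient.getStorage(), filePath);
```
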
diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiPathUtils.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiPathUtils.java
index 545f32150..bc1ae8035 100644
--- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiPathUtils.java
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiPathUtils.java
@@ -18,10 +18,10 @@
  
 package org.apache.xtable.hudi;
 
-import org.apache.hadoop.fs.Path;
+import org.apache.hudi.storage.StoragePath;
 
 public class HudiPathUtils {
-  public static String getPartitionPath(Path tableBasePath, Path filePath) {
+  public static String getPartitionPath(StoragePath tableBasePath, StoragePath filePath) {
     String fileName = filePath.getName();
     String pathStr = filePath.toUri().getPath();
     int startIndex = tableBasePath.toUri().getPath().length() + 1;
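Since the partition path is derived purely from the two URIs, the behavior is easy to sanity-check in isolation. A small, self-contained example (paths and expected output are illustrative):

```java
import org.apache.hudi.storage.StoragePath;
import org.apache.xtable.hudi.HudiPathUtils;

public class PartitionPathExample {
  public static void main(String[] args) {
    StoragePath base = new StoragePath("file:///tmp/test_table");
    StoragePath file = new StoragePath("file:///tmp/test_table/partition1/file1.parquet");
    // Strips the base path prefix and the trailing file name, leaving the relative partition directory.
    System.out.println(HudiPathUtils.getPartitionPath(base, file)); // expected: partition1
  }
}
```
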
diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiTableManager.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiTableManager.java
index 1ac1b5aab..c1389ca7a 100644
--- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiTableManager.java
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiTableManager.java
@@ -26,14 +26,13 @@
 import lombok.RequiredArgsConstructor;
 import lombok.extern.log4j.Log4j2;
 
-import org.apache.hadoop.conf.Configuration;
-
 import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieTimelineTimeZone;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.hudi.storage.StorageConfiguration;
 
 import org.apache.xtable.exception.UpdateException;
 import org.apache.xtable.model.InternalTable;
@@ -53,7 +52,7 @@ class HudiTableManager {
   private static final String COMPLEX_KEY_GENERATOR = "org.apache.hudi.keygen.ComplexKeyGenerator";
   private static final String SIMPLE_KEY_GENERATOR = "org.apache.hudi.keygen.SimpleKeyGenerator";
 
-  private final Configuration configuration;
+  private final StorageConfiguration<?> configuration;
 
   /**
    * Loads the meta client for the table at the base path if it exists
diff --git a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
index 3d539766a..a24ac3b70 100644
--- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
+++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
@@ -527,7 +527,7 @@ private static Stream<Arguments> provideArgsForPartitionTesting() {
                 HUDI,
                 Arrays.asList(ICEBERG, DELTA),
                 "timestamp_micros_nullable_field:TIMESTAMP,level:SIMPLE",
-                "timestamp_micros_nullable_field:DAY:yyyy/MM/dd,level:VALUE",
+                "timestamp_micros_nullable_field:DAY:yyyy-MM-dd,level:VALUE",
                 timestampAndLevelFilter)));
   }
 
diff --git a/xtable-core/src/test/java/org/apache/xtable/TestAbstractHudiTable.java b/xtable-core/src/test/java/org/apache/xtable/TestAbstractHudiTable.java
index 3e9a133a2..7f61ef2d0 100644
--- a/xtable-core/src/test/java/org/apache/xtable/TestAbstractHudiTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/TestAbstractHudiTable.java
@@ -57,6 +57,7 @@
 import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
@@ -101,6 +102,7 @@
 import org.apache.hudi.keygen.TimestampBasedKeyGenerator;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 
 import com.google.common.base.Preconditions;
 
@@ -146,6 +148,7 @@ public abstract class TestAbstractHudiTable
       // Add key generator
       this.typedProperties = new TypedProperties();
       typedProperties.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), RECORD_KEY_FIELD_NAME);
+      typedProperties.put("hoodie.datasource.write.hive_style_partitioning", "true");
       if (partitionConfig == null) {
         this.keyGenerator = new NonpartitionedKeyGenerator(typedProperties);
         this.partitionFieldNames = Collections.emptyList();
@@ -153,7 +156,7 @@ public abstract class TestAbstractHudiTable
         if (partitionConfig.contains("timestamp")) {
           typedProperties.put("hoodie.keygen.timebased.timestamp.type", "SCALAR");
           typedProperties.put("hoodie.keygen.timebased.timestamp.scalar.time.unit", "MICROSECONDS");
-          typedProperties.put("hoodie.keygen.timebased.output.dateformat", "yyyy/MM/dd");
+          typedProperties.put("hoodie.keygen.timebased.output.dateformat", "yyyy-MM-dd");
           typedProperties.put("hoodie.keygen.timebased.input.timezone", "UTC");
           typedProperties.put("hoodie.keygen.timebased.output.timezone", "UTC");
         }
@@ -586,14 +589,14 @@ protected static Schema addTopLevelField(Schema schema) {
   @SneakyThrows
   protected HoodieTableMetaClient getMetaClient(
       TypedProperties keyGenProperties, HoodieTableType hoodieTableType, Configuration conf) {
-    LocalFileSystem fs = (LocalFileSystem) FSUtils.getFs(basePath, conf);
+    LocalFileSystem fs = FileSystem.getLocal(conf);
     // Enforce checksum such that fs.open() is consistent to DFS
     fs.setVerifyChecksum(true);
     fs.mkdirs(new org.apache.hadoop.fs.Path(basePath));
 
     if (fs.exists(new org.apache.hadoop.fs.Path(basePath + "/.hoodie"))) {
       return HoodieTableMetaClient.builder()
-          .setConf(conf)
+          .setConf(new HadoopStorageConfiguration(conf))
           .setBasePath(basePath)
           .setLoadActiveTimelineOnLoad(true)
           .build();
@@ -610,7 +613,8 @@ protected HoodieTableMetaClient getMetaClient(
             .setCommitTimezone(HoodieTimelineTimeZone.UTC)
             .setBaseFileFormat(HoodieFileFormat.PARQUET.toString())
             .build();
-    return HoodieTableMetaClient.initTableAndGetMetaClient(conf, this.basePath, properties);
+    return HoodieTableMetaClient.initTableAndGetMetaClient(
+        new HadoopStorageConfiguration(conf), this.basePath, properties);
   }
 
   private static Schema.Field copyField(Schema.Field input) {
diff --git a/xtable-core/src/test/java/org/apache/xtable/TestJavaHudiTable.java b/xtable-core/src/test/java/org/apache/xtable/TestJavaHudiTable.java
index ce3b25bda..097315fd7 100644
--- a/xtable-core/src/test/java/org/apache/xtable/TestJavaHudiTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/TestJavaHudiTable.java
@@ -57,6 +57,7 @@
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.keygen.CustomKeyGenerator;
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 
 import com.google.common.base.Preconditions;
 
@@ -320,7 +321,7 @@ private HoodieJavaWriteClient<HoodieAvroPayload> initJavaWriteClient(
               .withArchivalConfig(archivalConfig)
               .build();
     }
-    HoodieEngineContext context = new HoodieJavaEngineContext(conf);
+    HoodieEngineContext context = new HoodieJavaEngineContext(new HadoopStorageConfiguration(conf));
     return new HoodieJavaWriteClient<>(context, writeConfig);
   }
 }
diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/HudiTestUtil.java b/xtable-core/src/test/java/org/apache/xtable/hudi/HudiTestUtil.java
index c701a1d54..3374fbec4 100644
--- a/xtable-core/src/test/java/org/apache/xtable/hudi/HudiTestUtil.java
+++ b/xtable-core/src/test/java/org/apache/xtable/hudi/HudiTestUtil.java
@@ -31,7 +31,6 @@
 import lombok.Value;
 
 import org.apache.avro.Schema;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.SparkConf;
 import org.apache.spark.serializer.KryoSerializer;
 
@@ -47,6 +46,7 @@
 import org.apache.hudi.config.HoodieArchivalConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 public class HudiTestUtil {
@@ -60,7 +60,7 @@ static HoodieTableMetaClient initTableAndGetMetaClient(
         .setTableName("test_table")
         .setPayloadClass(HoodieAvroPayload.class)
         .setPartitionFields(partitionFields)
-        .initTable(new Configuration(), tableBasePath);
+        .initTable(new HadoopStorageConfiguration(false), tableBasePath);
   }
 
   public static HoodieWriteConfig getHoodieWriteConfig(HoodieTableMetaClient metaClient) {
diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSource.java
index 376ccedae..962f763dd 100644
--- a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSource.java
+++ b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSource.java
@@ -60,6 +60,7 @@
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 
 import org.apache.xtable.GenericTable;
 import org.apache.xtable.TestJavaHudiTable;
@@ -688,7 +689,7 @@ private HudiConversionSource getHudiSourceClient(
       Configuration conf, String basePath, String xTablePartitionConfig) {
     HoodieTableMetaClient hoodieTableMetaClient =
         HoodieTableMetaClient.builder()
-            .setConf(conf)
+            .setConf(new HadoopStorageConfiguration(conf))
             .setBasePath(basePath)
             .setLoadActiveTimelineOnLoad(true)
             .build();
diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionTarget.java b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionTarget.java
index 03bb6e2ca..223c64be3 100644
--- a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionTarget.java
+++ b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionTarget.java
@@ -69,6 +69,10 @@
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.metadata.HoodieBackedTableMetadata;
 import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
+import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
 
 import org.apache.xtable.conversion.TargetTable;
 import org.apache.xtable.model.InternalTable;
@@ -95,8 +99,11 @@
  */
 public class ITHudiConversionTarget {
   @TempDir public static Path tempDir;
-  private static final Configuration CONFIGURATION = new Configuration();
-  private static final HoodieEngineContext CONTEXT = new HoodieJavaEngineContext(CONFIGURATION);
+  private static final Configuration CONFIGURATION = new Configuration(false);
+  private static final StorageConfiguration<?> STORAGE_CONFIGURATION =
+      new HadoopStorageConfiguration(CONFIGURATION);
+  private static final HoodieEngineContext CONTEXT =
+      new HoodieJavaEngineContext(STORAGE_CONFIGURATION);
 
   private static final String TABLE_NAME = "test_table";
   private static final String KEY_FIELD_NAME = "id";
@@ -131,6 +138,7 @@ public class ITHudiConversionTarget {
                   PARTITION_FIELD_SOURCE,
                   InternalField.builder().name(OTHER_FIELD_NAME).schema(STRING_SCHEMA).build()))
           .build();
+  private static HoodieStorage hoodieStorage;
   private final String tableBasePath = tempDir.resolve(UUID.randomUUID().toString()).toString();
 
   @BeforeAll
@@ -139,6 +147,7 @@ public static void setupOnce() {
     HoodieInstantTimeGenerator.setCommitTimeZone(HoodieTimelineTimeZone.UTC);
     TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
     System.setProperty("user.timezone", "GMT");
+    hoodieStorage = new HoodieHadoopStorage(tempDir.toString(), CONFIGURATION);
   }
 
   @Test
@@ -209,12 +218,15 @@ void syncForExistingTable() {
     targetClient.completeSync();
 
     HoodieTableMetaClient metaClient =
-        HoodieTableMetaClient.builder().setConf(CONFIGURATION).setBasePath(tableBasePath).build();
+        HoodieTableMetaClient.builder()
+            .setConf(STORAGE_CONFIGURATION)
+            .setBasePath(tableBasePath)
+            .build();
     assertFileGroupCorrectness(
         metaClient, partitionPath, Collections.singletonList(Pair.of(fileName, filePath)));
     try (HoodieBackedTableMetadata hoodieBackedTableMetadata =
         new HoodieBackedTableMetadata(
-            CONTEXT, writeConfig.getMetadataConfig(), tableBasePath, true)) {
+            CONTEXT, hoodieStorage, writeConfig.getMetadataConfig(), tableBasePath, true)) {
       assertColStats(hoodieBackedTableMetadata, partitionPath, fileName);
     }
     // include meta fields since the table was created with meta fields enabled
@@ -249,12 +261,19 @@ void syncForNewTable() {
     targetClient.completeSync();
 
     HoodieTableMetaClient metaClient =
-        HoodieTableMetaClient.builder().setConf(CONFIGURATION).setBasePath(tableBasePath).build();
+        HoodieTableMetaClient.builder()
+            .setConf(STORAGE_CONFIGURATION)
+            .setBasePath(tableBasePath)
+            .build();
     assertFileGroupCorrectness(
         metaClient, partitionPath, Collections.singletonList(Pair.of(fileName, filePath)));
     try (HoodieBackedTableMetadata hoodieBackedTableMetadata =
         new HoodieBackedTableMetadata(
-            CONTEXT, getHoodieWriteConfig(metaClient).getMetadataConfig(), tableBasePath, true)) {
+            CONTEXT,
+            hoodieStorage,
+            getHoodieWriteConfig(metaClient).getMetadataConfig(),
+            tableBasePath,
+            true)) {
       assertColStats(hoodieBackedTableMetadata, partitionPath, fileName);
     }
     assertSchema(metaClient, false);
@@ -294,13 +313,20 @@ void archiveTimelineAndCleanMetadataTableAfterMultipleCommits(String partitionPa
     targetClient.completeSync();
 
     HoodieTableMetaClient metaClient =
-        HoodieTableMetaClient.builder().setConf(CONFIGURATION).setBasePath(tableBasePath).build();
+        HoodieTableMetaClient.builder()
+            .setConf(STORAGE_CONFIGURATION)
+            .setBasePath(tableBasePath)
+            .build();
     Pair<String, String> file0Pair = Pair.of(fileName0, filePath0);
     assertFileGroupCorrectness(
         metaClient, partitionPath, Arrays.asList(file0Pair, Pair.of(fileName1, filePath1)));
     try (HoodieBackedTableMetadata hoodieBackedTableMetadata =
         new HoodieBackedTableMetadata(
-            CONTEXT, getHoodieWriteConfig(metaClient).getMetadataConfig(), tableBasePath, true)) {
+            CONTEXT,
+            hoodieStorage,
+            getHoodieWriteConfig(metaClient).getMetadataConfig(),
+            tableBasePath,
+            true)) {
       assertColStats(hoodieBackedTableMetadata, partitionPath, fileName1);
     }
 
@@ -317,7 +343,11 @@ CONTEXT, getHoodieWriteConfig(metaClient).getMetadataConfig(), tableBasePath, tr
         metaClient, partitionPath, Arrays.asList(file0Pair, Pair.of(fileName2, filePath2)));
     try (HoodieBackedTableMetadata hoodieBackedTableMetadata =
         new HoodieBackedTableMetadata(
-            CONTEXT, getHoodieWriteConfig(metaClient).getMetadataConfig(), tableBasePath, true)) {
+            CONTEXT,
+            hoodieStorage,
+            getHoodieWriteConfig(metaClient).getMetadataConfig(),
+            tableBasePath,
+            true)) {
       // the metadata for fileName1 should still be present until the cleaner kicks in
       assertColStats(hoodieBackedTableMetadata, partitionPath, fileName1);
       // new file stats should be present
@@ -362,7 +392,11 @@ CONTEXT, getHoodieWriteConfig(metaClient).getMetadataConfig(), tableBasePath, tr
     // col stats should be cleaned up for fileName1 but present for fileName2 and fileName3
     try (HoodieBackedTableMetadata hoodieBackedTableMetadata =
         new HoodieBackedTableMetadata(
-            CONTEXT, getHoodieWriteConfig(metaClient).getMetadataConfig(), tableBasePath, true)) {
+            CONTEXT,
+            hoodieStorage,
+            getHoodieWriteConfig(metaClient).getMetadataConfig(),
+            tableBasePath,
+            true)) {
       // assertEmptyColStats(hoodieBackedTableMetadata, partitionPath, fileName1);
       assertColStats(hoodieBackedTableMetadata, partitionPath, fileName3);
       assertColStats(hoodieBackedTableMetadata, partitionPath, fileName4);
@@ -589,7 +623,7 @@ private HudiConversionTarget getTargetClient() {
             .name("test_table")
             .metadataRetention(Duration.of(4, ChronoUnit.HOURS))
             .build(),
-        CONFIGURATION,
+        STORAGE_CONFIGURATION,
         3);
   }
 }
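
`HoodieBackedTableMetadata` also gained an explicit `HoodieStorage` argument, which is why these tests now build a `HoodieHadoopStorage` once and thread it through each call site. The shape of the change, as a fragment (constructor overloads taken from the call sites in this diff):

```java
// One storage handle for the table directory, as in setupOnce above.
HoodieStorage storage =
    new HoodieHadoopStorage(tableBasePath, new HadoopStorageConfiguration(false));

// 0.14.x: new HoodieBackedTableMetadata(CONTEXT, metadataConfig, tableBasePath, true)
try (HoodieBackedTableMetadata tableMetadata =
    new HoodieBackedTableMetadata(CONTEXT, storage, metadataConfig, tableBasePath, true)) {
  // query column stats / file listings as before
}
```
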
diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/TestBaseFileUpdatesExtractor.java b/xtable-core/src/test/java/org/apache/xtable/hudi/TestBaseFileUpdatesExtractor.java
index 8f3b3f7e1..ae3935125 100644
--- a/xtable-core/src/test/java/org/apache/xtable/hudi/TestBaseFileUpdatesExtractor.java
+++ b/xtable-core/src/test/java/org/apache/xtable/hudi/TestBaseFileUpdatesExtractor.java
@@ -49,7 +49,8 @@
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.hadoop.CachingPath;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 
 import org.apache.xtable.model.schema.InternalField;
 import org.apache.xtable.model.schema.InternalPartitionField;
@@ -72,7 +73,7 @@ public class TestBaseFileUpdatesExtractor {
   private static final long RECORD_COUNT = 200L;
   private static final long LAST_MODIFIED = System.currentTimeMillis();
   private static final HoodieEngineContext CONTEXT =
-      new HoodieJavaEngineContext(new Configuration());
+      new HoodieJavaEngineContext(new HadoopStorageConfiguration(new Configuration(false)));
   private static final InternalPartitionField PARTITION_FIELD =
       InternalPartitionField.builder()
           .sourceField(
@@ -126,7 +127,7 @@ void convertDiff() {
             .build();
 
     BaseFileUpdatesExtractor extractor =
-        BaseFileUpdatesExtractor.of(CONTEXT, new CachingPath(tableBasePath));
+        BaseFileUpdatesExtractor.of(CONTEXT, new StoragePath(tableBasePath));
     BaseFileUpdatesExtractor.ReplaceMetadata replaceMetadata =
         extractor.convertDiff(diff, COMMIT_TIME);
 
@@ -155,7 +156,7 @@ void extractSnapshotChanges_emptyTargetTable() throws IOException {
             .setTableName("test_table")
             .setPayloadClass(HoodieAvroPayload.class)
             .setPartitionFields("partition_field")
-            .initTable(new Configuration(), tableBasePath);
+            .initTable(CONTEXT.getStorageConf(), tableBasePath);
 
     String partitionPath1 = "partition1";
     String fileName1 = "file1.parquet";
@@ -177,7 +178,7 @@ void extractSnapshotChanges_emptyTargetTable() throws IOException {
             String.format("%s/%s/%s", tableBasePath, partitionPath2, fileName3), getColumnStats());
 
     BaseFileUpdatesExtractor extractor =
-        BaseFileUpdatesExtractor.of(CONTEXT, new CachingPath(tableBasePath));
+        BaseFileUpdatesExtractor.of(CONTEXT, new StoragePath(tableBasePath));
 
     List<PartitionFileGroup> partitionedDataFiles =
         Arrays.asList(
@@ -291,7 +292,7 @@ void extractSnapshotChanges_existingPartitionedTargetTable() {
                             .build()))
                 .build());
     BaseFileUpdatesExtractor extractor =
-        BaseFileUpdatesExtractor.of(CONTEXT, new CachingPath(tableBasePath));
+        BaseFileUpdatesExtractor.of(CONTEXT, new StoragePath(tableBasePath));
     BaseFileUpdatesExtractor.ReplaceMetadata replaceMetadata =
         extractor.extractSnapshotChanges(partitionedDataFiles, metaClient, COMMIT_TIME);
 
@@ -362,7 +363,7 @@ void extractSnapshotChanges_existingNonPartitionedTargetTable() {
                 .partitionValues(Collections.emptyList())
                 .build());
     BaseFileUpdatesExtractor extractor =
-        BaseFileUpdatesExtractor.of(CONTEXT, new CachingPath(tableBasePath));
+        BaseFileUpdatesExtractor.of(CONTEXT, new StoragePath(tableBasePath));
     BaseFileUpdatesExtractor.ReplaceMetadata replaceMetadata =
         extractor.extractSnapshotChanges(partitionedDataFiles, metaClient, COMMIT_TIME);
 
diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java
index a18bb743d..e770b3253 100644
--- a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java
+++ b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java
@@ -63,6 +63,10 @@
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
+import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
 
 import org.apache.xtable.GenericTable;
 import org.apache.xtable.TestJavaHudiTable;
@@ -81,7 +85,7 @@ public class TestHudiFileStatsExtractor {
   private static final Schema NESTED_SCHEMA =
       AVRO_SCHEMA.getField("nested_record").schema().getTypes().get(1);
 
-  private final Configuration configuration = new Configuration();
+  private final StorageConfiguration<?> configuration = new HadoopStorageConfiguration(false);
   private final InternalField nestedIntBase = getNestedIntBase();
   private final InternalSchema nestedSchema = getNestedSchema(nestedIntBase, "nested_record");
   private final InternalField longField = getLongField();
@@ -132,9 +136,12 @@ void columnStatsWithMetadataTable(@TempDir Path tempDir) throws Exception {
       table.insertRecords(true, records);
       basePath = table.getBasePath();
     }
+    HoodieStorage hoodieStorage =
+        new HoodieHadoopStorage(basePath, new HadoopStorageConfiguration(false));
     HoodieTableMetadata tableMetadata =
         HoodieTableMetadata.create(
             new HoodieJavaEngineContext(configuration),
+            hoodieStorage,
             HoodieMetadataConfig.newBuilder().enable(true).build(),
             basePath,
             true);
@@ -170,7 +177,7 @@ void columnStatsWithoutMetadataTable(@TempDir Path tempDir) throws IOException {
     try (ParquetWriter<GenericRecord> writer =
         AvroParquetWriter.builder(
                 HadoopOutputFile.fromPath(
-                    new org.apache.hadoop.fs.Path(file.toUri()), configuration))
+                    new org.apache.hadoop.fs.Path(file.toUri()), new Configuration(false)))
             .withSchema(AVRO_SCHEMA)
             .withDataModel(genericData)
             .build()) {
@@ -190,7 +197,9 @@ void columnStatsWithoutMetadataTable(@TempDir Path tempDir) throws IOException {
             .build();
 
     HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class);
-    when(mockMetaClient.getHadoopConf()).thenReturn(configuration);
+    HoodieStorage hoodieStorage =
+        new HoodieHadoopStorage(file.toString(), new HadoopStorageConfiguration(false));
+    when(mockMetaClient.getStorage()).thenReturn(hoodieStorage);
     HudiFileStatsExtractor fileStatsExtractor = new HudiFileStatsExtractor(mockMetaClient);
     List<InternalDataFile> output =
         fileStatsExtractor
diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiTableManager.java b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiTableManager.java
index c0d5e6d4e..03b5172f7 100644
--- a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiTableManager.java
+++ b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiTableManager.java
@@ -37,6 +37,8 @@
 import org.junit.jupiter.params.provider.MethodSource;
 
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 
 import org.apache.xtable.model.InternalTable;
 import org.apache.xtable.model.schema.InternalField;
@@ -48,7 +50,8 @@
 
 public class TestHudiTableManager {
 
-  private static final Configuration CONFIGURATION = new Configuration();
+  private static final StorageConfiguration<?> CONFIGURATION =
+      new HadoopStorageConfiguration(new Configuration(false));
   @TempDir public static Path tempDir;
   private final String tableBasePath = tempDir.resolve(UUID.randomUUID().toString()).toString();
 
diff --git a/xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/extensions/AddFieldIdsClientInitCallback.java b/xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/extensions/AddFieldIdsClientInitCallback.java
index ac82063eb..62ba98c09 100644
--- a/xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/extensions/AddFieldIdsClientInitCallback.java
+++ b/xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/extensions/AddFieldIdsClientInitCallback.java
@@ -21,17 +21,18 @@
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.avro.Schema;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 
 import org.apache.hudi.callback.HoodieClientInitCallback;
 import org.apache.hudi.client.BaseHoodieClient;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -62,15 +63,15 @@ public void call(BaseHoodieClient hoodieClient) {
     if (config.getSchema() != null || config.getWriteSchema() != null) {
       try {
         Option<Schema> currentSchema = Option.empty();
+        StoragePath basePath = new StoragePath(config.getBasePath());
+        StorageConfiguration<?> storageConf = hoodieClient.getEngineContext().getStorageConf();
         try {
-          Configuration hadoopConfiguration = hoodieClient.getEngineContext().getHadoopConf().get();
-          String tableBasePath = config.getBasePath();
-          FileSystem fs = FSUtils.getFs(tableBasePath, hadoopConfiguration);
-          if (FSUtils.isTableExists(config.getBasePath(), fs)) {
+          HoodieStorage storage = new HoodieHadoopStorage(basePath, storageConf);
+          if (storage.exists(basePath)) {
             HoodieTableMetaClient metaClient =
                 HoodieTableMetaClient.builder()
-                    .setConf(hadoopConfiguration)
-                    .setBasePath(tableBasePath)
+                    .setConf(storageConf)
+                    .setBasePath(config.getBasePath())
                     .build();
             currentSchema =
                 new TableSchemaResolver(metaClient).getTableAvroSchemaFromLatestCommit(true);
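
The callback's table-existence probe moves off `FSUtils.getFs`/`FSUtils.isTableExists` onto the storage API. In isolation, the new check from the hunk above is simply (a fragment, inside the existing try/catch):

```java
StoragePath basePath = new StoragePath(config.getBasePath());
StorageConfiguration<?> storageConf = hoodieClient.getEngineContext().getStorageConf();
HoodieStorage storage = new HoodieHadoopStorage(basePath, storageConf);
if (storage.exists(basePath)) {
  // table directory already present: resolve the current Avro schema from the latest commit
}
```
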
diff --git a/xtable-hudi-support/xtable-hudi-support-extensions/src/test/java/org/apache/xtable/hudi/extensions/TestAddFieldIdsClientInitCallback.java b/xtable-hudi-support/xtable-hudi-support-extensions/src/test/java/org/apache/xtable/hudi/extensions/TestAddFieldIdsClientInitCallback.java
index e856d07a3..94c8d0b3d 100644
--- a/xtable-hudi-support/xtable-hudi-support-extensions/src/test/java/org/apache/xtable/hudi/extensions/TestAddFieldIdsClientInitCallback.java
+++ b/xtable-hudi-support/xtable-hudi-support-extensions/src/test/java/org/apache/xtable/hudi/extensions/TestAddFieldIdsClientInitCallback.java
@@ -57,6 +57,8 @@
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 
 import org.apache.xtable.hudi.idtracking.IdTracker;
 
@@ -66,6 +68,8 @@ public class TestAddFieldIdsClientInitCallback {
   private final IdTracker mockIdTracker = mock(IdTracker.class);
   private final AddFieldIdsClientInitCallback callback =
       new AddFieldIdsClientInitCallback(mockIdTracker);
+  private final StorageConfiguration<?> storageConfiguration =
+      new HadoopStorageConfiguration(new Configuration(false));
 
   @Test
   void nullSchemasIsNoOp() {
@@ -81,7 +85,7 @@ void noExistingTable() {
     Schema inputSchema = getSchemaStub(1);
     Schema updatedSchema = getSchemaStub(3);
 
-    HoodieEngineContext localEngineContext = new HoodieLocalEngineContext(new Configuration());
+    HoodieEngineContext localEngineContext = new HoodieLocalEngineContext(storageConfiguration);
     HoodieWriteConfig config =
         HoodieWriteConfig.newBuilder()
             .withSchema(inputSchema.toString())
@@ -105,7 +109,7 @@ void existingTable() throws IOException {
     Schema inputSchema = getSchemaStub(2);
     Schema updatedSchema = getSchemaStub(3);
 
-    HoodieEngineContext localEngineContext = new HoodieJavaEngineContext(new Configuration());
+    HoodieEngineContext localEngineContext = new HoodieJavaEngineContext(storageConfiguration);
     String basePath = getTableBasePath();
     HoodieWriteConfig tableConfig =
         HoodieWriteConfig.newBuilder()
@@ -126,7 +130,7 @@ void existingTable() throws IOException {
       properties.setProperty(
           VERSION.key(), Integer.toString(HoodieTableVersion.current().versionCode()));
       HoodieTableMetaClient.initTableAndGetMetaClient(
-          localEngineContext.getHadoopConf().get(), basePath, properties);
+          localEngineContext.getStorageConf(), basePath, properties);
       String commit = hoodieJavaWriteClient.startCommit();
       GenericRecord genericRecord =
           new GenericRecordBuilder(existingSchema).set("id", "1").set("field", "value").build();
@@ -166,7 +170,7 @@ void writeSchemaOverrideProvided() {
     properties.setProperty(
         HoodieWriteConfig.WRITE_SCHEMA_OVERRIDE.key(), inputWriteSchema.toString());
 
-    HoodieEngineContext localEngineContext = new HoodieLocalEngineContext(new Configuration());
+    HoodieEngineContext localEngineContext = new HoodieLocalEngineContext(storageConfiguration);
     HoodieWriteConfig config =
         HoodieWriteConfig.newBuilder()
             .withSchema(inputSchema.toString())