diff --git a/src/main/java/net/snowflake/client/jdbc/ErrorCode.java b/src/main/java/net/snowflake/client/jdbc/ErrorCode.java index 2f7ac2796..09fefbd95 100644 --- a/src/main/java/net/snowflake/client/jdbc/ErrorCode.java +++ b/src/main/java/net/snowflake/client/jdbc/ErrorCode.java @@ -85,7 +85,9 @@ public enum ErrorCode { AUTHENTICATOR_REQUEST_TIMEOUT(200062, SqlState.CONNECTION_EXCEPTION), INVALID_STRUCT_DATA(200063, SqlState.DATA_EXCEPTION), DISABLEOCSP_INSECUREMODE_VALUE_MISMATCH(200064, SqlState.INVALID_PARAMETER_VALUE), - TOO_MANY_FILES_TO_DOWNLOAD_AS_STREAM(200065, SqlState.DATA_EXCEPTION); + TOO_MANY_FILES_TO_DOWNLOAD_AS_STREAM(200065, SqlState.DATA_EXCEPTION), + FILE_OPERATION_UPLOAD_ERROR(200066, SqlState.INTERNAL_ERROR), + FILE_OPERATION_DOWNLOAD_ERROR(200067, SqlState.INTERNAL_ERROR); public static final String errorMessageResource = "net.snowflake.client.jdbc.jdbc_error_messages"; diff --git a/src/main/java/net/snowflake/client/jdbc/SnowflakeFileTransferAgent.java b/src/main/java/net/snowflake/client/jdbc/SnowflakeFileTransferAgent.java index f515ef691..1c5b009b6 100644 --- a/src/main/java/net/snowflake/client/jdbc/SnowflakeFileTransferAgent.java +++ b/src/main/java/net/snowflake/client/jdbc/SnowflakeFileTransferAgent.java @@ -34,6 +34,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Locale; import java.util.Map; @@ -1667,7 +1668,7 @@ private void uploadStream() throws SnowflakeSQLException { queryID, ex.getCause(), SqlState.INTERNAL_ERROR, - ErrorCode.INTERNAL_ERROR.getMessageCode()); + ErrorCode.FILE_OPERATION_UPLOAD_ERROR.getMessageCode()); } logger.debug("Done with uploading from a stream"); } finally { @@ -1752,6 +1753,7 @@ private void downloadFiles() throws SnowflakeSQLException { try { threadExecutor = SnowflakeUtil.createDefaultExecutorService("sf-file-download-worker-", 1); + List> downloadFileFutures = new LinkedList<>(); for (String srcFile : sourceFiles) { FileMetadata fileMetadata = fileMetadataMap.get(srcFile); @@ -1768,21 +1770,22 @@ private void downloadFiles() throws SnowflakeSQLException { RemoteStoreFileEncryptionMaterial encMat = srcFileToEncMat.get(srcFile); String presignedUrl = srcFileToPresignedUrl.get(srcFile); - threadExecutor.submit( - getDownloadFileCallable( - stageInfo, - srcFile, - localLocation, - fileMetadataMap, - (stageInfo.getStageType() == StageInfo.StageType.LOCAL_FS) - ? null - : storageFactory.createClient(stageInfo, parallel, encMat, session), - session, - command, - parallel, - encMat, - presignedUrl, - queryID)); + downloadFileFutures.add( + threadExecutor.submit( + getDownloadFileCallable( + stageInfo, + srcFile, + localLocation, + fileMetadataMap, + (stageInfo.getStageType() == StageInfo.StageType.LOCAL_FS) + ? null + : storageFactory.createClient(stageInfo, parallel, encMat, session), + session, + command, + parallel, + encMat, + presignedUrl, + queryID))); logger.debug("Submitted download job for: {}", srcFile); } @@ -1792,9 +1795,20 @@ private void downloadFiles() throws SnowflakeSQLException { try { // wait for all threads to complete without timeout threadExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); + for (Future downloadFileFuture : downloadFileFutures) { + if (downloadFileFuture.isDone()) { + downloadFileFuture.get(); + } + } } catch (InterruptedException ex) { throw new SnowflakeSQLLoggedException( queryID, session, ErrorCode.INTERRUPTED.getMessageCode(), SqlState.QUERY_CANCELED); + } catch (ExecutionException ex) { + throw new SnowflakeSQLException( + queryID, + ex.getCause(), + SqlState.INTERNAL_ERROR, + ErrorCode.FILE_OPERATION_DOWNLOAD_ERROR.getMessageCode()); } logger.debug("Done with downloading"); } finally { @@ -1835,6 +1849,7 @@ private void uploadFiles(Set fileList, int parallel) throws SnowflakeSQL threadExecutor = SnowflakeUtil.createDefaultExecutorService("sf-file-upload-worker-", parallel); + List> uploadFileFutures = new LinkedList<>(); for (String srcFile : fileList) { FileMetadata fileMetadata = fileMetadataMap.get(srcFile); @@ -1862,23 +1877,24 @@ private void uploadFiles(Set fileList, int parallel) throws SnowflakeSQL int delay = session.getInjectWaitInPut(); setUploadDelay(delay); - threadExecutor.submit( - getUploadFileCallable( - stageInfo, - srcFile, - fileMetadata, - (stageInfo.getStageType() == StageInfo.StageType.LOCAL_FS) - ? null - : storageFactory.createClient( - stageInfo, parallel, encryptionMaterial.get(0), session), - session, - command, - null, - false, - (parallel > 1 ? 1 : this.parallel), - srcFileObj, - encryptionMaterial.get(0), - queryID)); + uploadFileFutures.add( + threadExecutor.submit( + getUploadFileCallable( + stageInfo, + srcFile, + fileMetadata, + (stageInfo.getStageType() == StageInfo.StageType.LOCAL_FS) + ? null + : storageFactory.createClient( + stageInfo, parallel, encryptionMaterial.get(0), session), + session, + command, + null, + false, + (parallel > 1 ? 1 : this.parallel), + srcFileObj, + encryptionMaterial.get(0), + queryID))); logger.debug("Submitted copy job for: {}", srcFile); } @@ -1889,9 +1905,20 @@ private void uploadFiles(Set fileList, int parallel) throws SnowflakeSQL try { // wait for all threads to complete without timeout threadExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); + for (Future uploadFileFuture : uploadFileFutures) { + if (uploadFileFuture.isDone()) { + uploadFileFuture.get(); + } + } } catch (InterruptedException ex) { throw new SnowflakeSQLLoggedException( queryID, session, ErrorCode.INTERRUPTED.getMessageCode(), SqlState.QUERY_CANCELED); + } catch (ExecutionException ex) { + throw new SnowflakeSQLException( + queryID, + ex.getCause(), + SqlState.INTERNAL_ERROR, + ErrorCode.FILE_OPERATION_UPLOAD_ERROR.getMessageCode()); } logger.debug("Done with uploading"); diff --git a/src/main/java/net/snowflake/client/jdbc/cloud/storage/SnowflakeAzureClient.java b/src/main/java/net/snowflake/client/jdbc/cloud/storage/SnowflakeAzureClient.java index 6e57d3965..bb10bede6 100644 --- a/src/main/java/net/snowflake/client/jdbc/cloud/storage/SnowflakeAzureClient.java +++ b/src/main/java/net/snowflake/client/jdbc/cloud/storage/SnowflakeAzureClient.java @@ -352,13 +352,21 @@ public void download( // Get the user-defined BLOB metadata Map userDefinedMetadata = SnowflakeUtil.createCaseInsensitiveMap(blob.getMetadata()); - AbstractMap.SimpleEntry encryptionData = - parseEncryptionData(userDefinedMetadata.get(AZ_ENCRYPTIONDATAPROP), queryId); - - String key = encryptionData.getKey(); - String iv = encryptionData.getValue(); if (this.isEncrypting() && this.getEncryptionKeySize() <= 256) { + if (!userDefinedMetadata.containsKey(AZ_ENCRYPTIONDATAPROP)) { + throw new SnowflakeSQLLoggedException( + queryId, + session, + ErrorCode.INTERNAL_ERROR.getMessageCode(), + SqlState.INTERNAL_ERROR, + "Encryption data not found in the metadata of a file being downloaded"); + } + AbstractMap.SimpleEntry encryptionData = + parseEncryptionData(userDefinedMetadata.get(AZ_ENCRYPTIONDATAPROP), queryId); + + String key = encryptionData.getKey(); + String iv = encryptionData.getValue(); stopwatch.restart(); if (key == null || iv == null) { throw new SnowflakeSQLLoggedException( @@ -452,12 +460,20 @@ public InputStream downloadToStream( long downloadMillis = stopwatch.elapsedMillis(); Map userDefinedMetadata = SnowflakeUtil.createCaseInsensitiveMap(blob.getMetadata()); - AbstractMap.SimpleEntry encryptionData = - parseEncryptionData(userDefinedMetadata.get(AZ_ENCRYPTIONDATAPROP), queryId); - String key = encryptionData.getKey(); - String iv = encryptionData.getValue(); if (this.isEncrypting() && this.getEncryptionKeySize() <= 256) { + if (!userDefinedMetadata.containsKey(AZ_ENCRYPTIONDATAPROP)) { + throw new SnowflakeSQLLoggedException( + queryId, + session, + ErrorCode.INTERNAL_ERROR.getMessageCode(), + SqlState.INTERNAL_ERROR, + "Encryption data not found in the metadata of a file being downloaded"); + } + AbstractMap.SimpleEntry encryptionData = + parseEncryptionData(userDefinedMetadata.get(AZ_ENCRYPTIONDATAPROP), queryId); + String key = encryptionData.getKey(); + String iv = encryptionData.getValue(); stopwatch.restart(); if (key == null || iv == null) { throw new SnowflakeSQLLoggedException( diff --git a/src/main/resources/net/snowflake/client/jdbc/jdbc_error_messages.properties b/src/main/resources/net/snowflake/client/jdbc/jdbc_error_messages.properties index 3b68fbf69..5e59df8dc 100644 --- a/src/main/resources/net/snowflake/client/jdbc/jdbc_error_messages.properties +++ b/src/main/resources/net/snowflake/client/jdbc/jdbc_error_messages.properties @@ -82,4 +82,7 @@ Error message={3}, Extended error info={4} 200061=GCS operation failed: Operation={0}, Error code={1}, Message={2}, Reason={3} 200062=Authentication timed out. 200063=Invalid data - Cannot be parsed and converted to structured type. - +200064=The values for 'disableOCSPChecks' and 'insecureMode' must be identical. +200065=Too many files to download as stream +200066=JDBC driver file operation error while performing stage upload. +200067=JDBC driver file operation error while performing stage download. diff --git a/src/test/java/net/snowflake/client/jdbc/FileUploaderLatestIT.java b/src/test/java/net/snowflake/client/jdbc/FileUploaderLatestIT.java index a116a794b..bd2d9a5bf 100644 --- a/src/test/java/net/snowflake/client/jdbc/FileUploaderLatestIT.java +++ b/src/test/java/net/snowflake/client/jdbc/FileUploaderLatestIT.java @@ -4,9 +4,10 @@ package net.snowflake.client.jdbc; import static net.snowflake.client.jdbc.SnowflakeUtil.systemGetProperty; -import static org.hamcrest.CoreMatchers.instanceOf; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -174,13 +175,11 @@ public void testGetObjectMetadataFileNotFoundWithGCS() throws Exception { int idx = location.indexOf('/'); String remoteStageLocation = location.substring(0, idx); String path = location.substring(idx + 1) + "wrong_file.csv.gz"; - client.getObjectMetadata(remoteStageLocation, path); - fail("should raise exception"); - } catch (Exception ex) { - assertTrue( - ex instanceof StorageProviderException, - "Wrong type of exception. Message: " + ex.getMessage()); - assertTrue(ex.getMessage().matches(".*Blob.*not found in bucket.*")); + StorageProviderException thrown = + assertThrows( + StorageProviderException.class, + () -> client.getObjectMetadata(remoteStageLocation, path)); + assertTrue(thrown.getMessage().matches(".*Blob.*not found in bucket.*")); } finally { statement.execute("DROP STAGE if exists " + OBJ_META_STAGE); } @@ -212,13 +211,11 @@ public void testGetObjectMetadataStorageExceptionWithGCS() throws Exception { String location = info.getLocation(); int idx = location.indexOf('/'); String remoteStageLocation = location.substring(0, idx); - client.getObjectMetadata(remoteStageLocation, ""); - fail("should raise exception"); - } catch (Exception ex) { - assertTrue( - ex instanceof StorageProviderException, - "Wrong type of exception. Message: " + ex.getMessage()); - assertTrue(ex.getMessage().matches(".*Permission.*denied.*")); + StorageProviderException thrown = + assertThrows( + StorageProviderException.class, + () -> client.getObjectMetadata(remoteStageLocation, "")); + assertTrue(thrown.getMessage().matches(".*Permission.*denied.*")); } finally { statement.execute("DROP STAGE if exists " + OBJ_META_STAGE); } @@ -248,12 +245,13 @@ public void testNullCommand() throws SQLException { try { statement.execute("create or replace stage testStage"); SFSession sfSession = con.unwrap(SnowflakeConnectionV1.class).getSfSession(); - SnowflakeFileTransferAgent sfAgent = - new SnowflakeFileTransferAgent(null, sfSession, new SFStatement(sfSession)); - } catch (SnowflakeSQLException err) { - assertEquals((long) ErrorCode.INTERNAL_ERROR.getMessageCode(), err.getErrorCode()); + SnowflakeSQLException thrown = + assertThrows( + SnowflakeSQLException.class, + () -> new SnowflakeFileTransferAgent(null, sfSession, new SFStatement(sfSession))); assertTrue( - err.getMessage() + thrown + .getMessage() .contains("JDBC driver internal error: Missing sql for statement execution")); } finally { statement.execute("drop stage if exists testStage"); @@ -281,20 +279,24 @@ public void testCompressStreamWithGzipException() throws Exception { String srcPath = getFullPathFileInResource(TEST_DATA_FILE); InputStream inputStream = new FileInputStream(srcPath); - SnowflakeFileTransferAgent.uploadWithoutConnection( - SnowflakeFileTransferConfig.Builder.newInstance() - .setSnowflakeFileTransferMetadata(metadata) - .setUploadStream(inputStream) - .setRequireCompress(true) - .setNetworkTimeoutInMilli(0) - .setOcspMode(OCSPMode.FAIL_OPEN) - .setSFSession(sfSession) - .setCommand(PUT_COMMAND) - .build()); - } catch (SnowflakeSQLException err) { - assertEquals((long) ErrorCode.INTERNAL_ERROR.getMessageCode(), err.getErrorCode()); + SnowflakeSQLException thrown = + assertThrows( + SnowflakeSQLException.class, + () -> + SnowflakeFileTransferAgent.uploadWithoutConnection( + SnowflakeFileTransferConfig.Builder.newInstance() + .setSnowflakeFileTransferMetadata(metadata) + .setUploadStream(inputStream) + .setRequireCompress(true) + .setNetworkTimeoutInMilli(0) + .setOcspMode(OCSPMode.FAIL_OPEN) + .setSFSession(sfSession) + .setCommand(PUT_COMMAND) + .build())); + assertEquals((long) ErrorCode.INTERNAL_ERROR.getMessageCode(), thrown.getErrorCode()); assertTrue( - err.getMessage() + thrown + .getMessage() .contains("JDBC driver internal error: error encountered for compression")); } finally { statement.execute("DROP STAGE if exists testStage"); @@ -325,20 +327,24 @@ public void testCompressStreamWithGzipNoDigestException() throws Exception { String srcPath = getFullPathFileInResource(TEST_DATA_FILE); InputStream inputStream = new FileInputStream(srcPath); - SnowflakeFileTransferAgent.uploadWithoutConnection( - SnowflakeFileTransferConfig.Builder.newInstance() - .setSnowflakeFileTransferMetadata(metadata) - .setUploadStream(inputStream) - .setRequireCompress(true) - .setNetworkTimeoutInMilli(0) - .setOcspMode(OCSPMode.FAIL_OPEN) - .setSFSession(sfSession) - .setCommand(PUT_COMMAND) - .build()); - } catch (SnowflakeSQLException err) { - assertEquals((long) ErrorCode.INTERNAL_ERROR.getMessageCode(), err.getErrorCode()); + SnowflakeSQLException thrown = + assertThrows( + SnowflakeSQLException.class, + () -> + SnowflakeFileTransferAgent.uploadWithoutConnection( + SnowflakeFileTransferConfig.Builder.newInstance() + .setSnowflakeFileTransferMetadata(metadata) + .setUploadStream(inputStream) + .setRequireCompress(true) + .setNetworkTimeoutInMilli(0) + .setOcspMode(OCSPMode.FAIL_OPEN) + .setSFSession(sfSession) + .setCommand(PUT_COMMAND) + .build())); + assertEquals((long) ErrorCode.INTERNAL_ERROR.getMessageCode(), thrown.getErrorCode()); assertTrue( - err.getMessage() + thrown + .getMessage() .contains("JDBC driver internal error: error encountered for compression")); } finally { statement.execute("DROP STAGE if exists testStage"); @@ -369,19 +375,23 @@ public void testUploadWithoutConnectionException() throws Exception { String srcPath = getFullPathFileInResource(TEST_DATA_FILE); InputStream inputStream = new FileInputStream(srcPath); - SnowflakeFileTransferAgent.uploadWithoutConnection( - SnowflakeFileTransferConfig.Builder.newInstance() - .setSnowflakeFileTransferMetadata(metadata) - .setUploadStream(inputStream) - .setRequireCompress(true) - .setNetworkTimeoutInMilli(0) - .setOcspMode(OCSPMode.FAIL_OPEN) - .setSFSession(sfSession) - .setCommand(PUT_COMMAND) - .build()); - } catch (Exception err) { + Exception thrown = + assertThrows( + Exception.class, + () -> + SnowflakeFileTransferAgent.uploadWithoutConnection( + SnowflakeFileTransferConfig.Builder.newInstance() + .setSnowflakeFileTransferMetadata(metadata) + .setUploadStream(inputStream) + .setRequireCompress(true) + .setNetworkTimeoutInMilli(0) + .setOcspMode(OCSPMode.FAIL_OPEN) + .setSFSession(sfSession) + .setCommand(PUT_COMMAND) + .build())); assertTrue( - err.getMessage() + thrown + .getMessage() .contains( "Exception encountered during file upload: failed to push to remote store")); } finally { @@ -401,9 +411,8 @@ public void testInitFileMetadataFileNotFound() throws Exception { SnowflakeFileTransferAgent sfAgent = new SnowflakeFileTransferAgent(PUT_COMMAND, sfSession, new SFStatement(sfSession)); - sfAgent.execute(); - } catch (SnowflakeSQLException err) { - assertEquals(200008, err.getErrorCode()); + SnowflakeSQLException thrown = assertThrows(SnowflakeSQLException.class, sfAgent::execute); + assertEquals(ErrorCode.FILE_NOT_FOUND.getMessageCode(), thrown.getErrorCode()); } finally { statement.execute("DROP STAGE if exists testStage"); } @@ -422,9 +431,8 @@ public void testInitFileMetadataFileIsDirectory() throws Exception { String command = "put file://" + srcPath + " @testStage"; SnowflakeFileTransferAgent sfAgent = new SnowflakeFileTransferAgent(command, sfSession, new SFStatement(sfSession)); - sfAgent.execute(); - } catch (SnowflakeSQLException err) { - assertEquals(200009, err.getErrorCode()); + SnowflakeSQLException thrown = assertThrows(SnowflakeSQLException.class, sfAgent::execute); + assertEquals(ErrorCode.FILE_IS_DIRECTORY.getMessageCode(), thrown.getErrorCode()); } finally { statement.execute("DROP STAGE if exists testStage"); } @@ -445,10 +453,10 @@ public void testCompareAndSkipFilesException() throws Exception { SnowflakeFileTransferAgent sfAgent = new SnowflakeFileTransferAgent(command, sfSession, new SFStatement(sfSession)); - sfAgent.execute(); - } catch (SnowflakeSQLException err) { - assertEquals((long) ErrorCode.INTERNAL_ERROR.getMessageCode(), err.getErrorCode()); - assertTrue(err.getMessage().contains("Error reading:")); + SnowflakeSQLException thrown = assertThrows(SnowflakeSQLException.class, sfAgent::execute); + assertEquals( + (long) ErrorCode.FILE_OPERATION_UPLOAD_ERROR.getMessageCode(), thrown.getErrorCode()); + assertInstanceOf(NoSuchAlgorithmException.class, thrown.getCause().getCause()); } finally { statement.execute("DROP STAGE if exists testStage"); } @@ -466,12 +474,14 @@ public void testParseCommandException() throws SQLException { try { statement.execute("create or replace stage testStage"); SFSession sfSession = con.unwrap(SnowflakeConnectionV1.class).getSfSession(); - SnowflakeFileTransferAgent sfAgent = - new SnowflakeFileTransferAgent(PUT_COMMAND, sfSession, new SFStatement(sfSession)); - - } catch (SnowflakeSQLException err) { - assertEquals((long) ErrorCode.INTERNAL_ERROR.getMessageCode(), err.getErrorCode()); - assertTrue(err.getMessage().contains("Failed to parse the locations")); + SnowflakeSQLException thrown = + assertThrows( + SnowflakeSQLException.class, + () -> + new SnowflakeFileTransferAgent( + PUT_COMMAND, sfSession, new SFStatement(sfSession))); + assertEquals((long) ErrorCode.INTERNAL_ERROR.getMessageCode(), thrown.getErrorCode()); + assertTrue(thrown.getMessage().contains("Failed to parse the locations")); } finally { statement.execute("DROP STAGE if exists testStage"); } @@ -530,10 +540,9 @@ public void testListObjectsStorageException() throws Exception { SnowflakeFileTransferAgent sfAgent = new SnowflakeFileTransferAgent(command, sfSession, new SFStatement(sfSession)); - sfAgent.execute(); - } catch (SnowflakeSQLException err) { - assertEquals(200016, err.getErrorCode()); - assertTrue(err.getMessage().contains("Encountered exception during listObjects")); + SnowflakeSQLException thrown = assertThrows(SnowflakeSQLException.class, sfAgent::execute); + assertEquals(ErrorCode.IO_ERROR.getMessageCode(), thrown.getErrorCode()); + assertTrue(thrown.getMessage().contains("Encountered exception during listObjects")); } finally { statement.execute("DROP STAGE if exists testStage"); } @@ -555,13 +564,19 @@ public void testUploadStreamInterruptedException() throws IOException, SQLExcept outputStream.flush(); // upload the data to user stage under testUploadStream with name hello.txt - connection - .unwrap(SnowflakeConnection.class) - .uploadStream( - "~", DEST_PREFIX, outputStream.asByteSource().openStream(), "hello.txt", false); - - } catch (SnowflakeSQLLoggedException err) { - assertEquals(200003, err.getErrorCode()); + SnowflakeSQLException thrown = + assertThrows( + SnowflakeSQLException.class, + () -> + connection + .unwrap(SnowflakeConnection.class) + .uploadStream( + "~", + DEST_PREFIX, + outputStream.asByteSource().openStream(), + "hello.txt", + false)); + assertEquals(ErrorCode.INTERRUPTED.getMessageCode(), thrown.getErrorCode()); } finally { statement.execute("rm @~/" + DEST_PREFIX); } @@ -632,9 +647,8 @@ public void testUploadFileCallableFileNotFound() throws Exception { String command = "PUT file://" + getFullPathFileInResource(TEST_DATA_FILE) + " @testStage"; SnowflakeFileTransferAgent sfAgent = new SnowflakeFileTransferAgent(command, sfSession, new SFStatement(sfSession)); - sfAgent.execute(); - } catch (Exception err) { - assertEquals(err.getCause(), instanceOf(FileNotFoundException.class)); + Exception thrown = assertThrows(Exception.class, sfAgent::execute); + assertInstanceOf(FileNotFoundException.class, thrown.getCause()); } finally { statement.execute("DROP STAGE if exists testStage"); }