Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1886186 - gather threadExecutor callables and call Future.get() to prevent silent fails #2035

Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/main/java/net/snowflake/client/jdbc/ErrorCode.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ public enum ErrorCode {
GCP_SERVICE_ERROR(200061, SqlState.SYSTEM_ERROR),
AUTHENTICATOR_REQUEST_TIMEOUT(200062, SqlState.CONNECTION_EXCEPTION),
INVALID_STRUCT_DATA(200063, SqlState.DATA_EXCEPTION),
DISABLEOCSP_INSECUREMODE_VALUE_MISMATCH(200064, SqlState.INVALID_PARAMETER_VALUE);
DISABLEOCSP_INSECUREMODE_VALUE_MISMATCH(200064, SqlState.INVALID_PARAMETER_VALUE),

FILE_OPERATION_UPLOAD_ERROR(200065, SqlState.INTERNAL_ERROR),
FILE_OPERATION_DOWNLOAD_ERROR(200066, SqlState.INTERNAL_ERROR);

public static final String errorMessageResource = "net.snowflake.client.jdbc.jdbc_error_messages";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -1663,11 +1664,12 @@ private void uploadStream() throws SnowflakeSQLException {
throw new SnowflakeSQLLoggedException(
queryID, session, ErrorCode.INTERRUPTED.getMessageCode(), SqlState.QUERY_CANCELED);
} catch (ExecutionException ex) {
logger.error("Exception encountered within threadExecutor flow in uploadStream");
sfc-gh-mkubik marked this conversation as resolved.
Show resolved Hide resolved
throw new SnowflakeSQLException(
queryID,
ex.getCause(),
SqlState.INTERNAL_ERROR,
ErrorCode.INTERNAL_ERROR.getMessageCode());
ErrorCode.FILE_OPERATION_UPLOAD_ERROR.getMessageCode());
}
logger.debug("Done with uploading from a stream");
} finally {
Expand Down Expand Up @@ -1740,6 +1742,7 @@ private void downloadFiles() throws SnowflakeSQLException {
try {
threadExecutor = SnowflakeUtil.createDefaultExecutorService("sf-file-download-worker-", 1);

List<Future<Void>> downloadFileFutures = new LinkedList<>();
for (String srcFile : sourceFiles) {
FileMetadata fileMetadata = fileMetadataMap.get(srcFile);

Expand All @@ -1756,21 +1759,22 @@ private void downloadFiles() throws SnowflakeSQLException {

RemoteStoreFileEncryptionMaterial encMat = srcFileToEncMat.get(srcFile);
String presignedUrl = srcFileToPresignedUrl.get(srcFile);
threadExecutor.submit(
getDownloadFileCallable(
stageInfo,
srcFile,
localLocation,
fileMetadataMap,
(stageInfo.getStageType() == StageInfo.StageType.LOCAL_FS)
? null
: storageFactory.createClient(stageInfo, parallel, encMat, session),
session,
command,
parallel,
encMat,
presignedUrl,
queryID));
downloadFileFutures.add(
threadExecutor.submit(
getDownloadFileCallable(
stageInfo,
srcFile,
localLocation,
fileMetadataMap,
(stageInfo.getStageType() == StageInfo.StageType.LOCAL_FS)
? null
: storageFactory.createClient(stageInfo, parallel, encMat, session),
session,
command,
parallel,
encMat,
presignedUrl,
queryID)));

logger.debug("Submitted download job for: {}", srcFile);
}
Expand All @@ -1780,9 +1784,21 @@ private void downloadFiles() throws SnowflakeSQLException {
try {
// wait for all threads to complete without timeout
threadExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
for (Future<Void> downloadFileFuture : downloadFileFutures) {
sfc-gh-mkubik marked this conversation as resolved.
Show resolved Hide resolved
if (downloadFileFuture.isDone()) {
downloadFileFuture.get();
}
}
} catch (InterruptedException ex) {
throw new SnowflakeSQLLoggedException(
queryID, session, ErrorCode.INTERRUPTED.getMessageCode(), SqlState.QUERY_CANCELED);
} catch (ExecutionException ex) {
sfc-gh-mkubik marked this conversation as resolved.
Show resolved Hide resolved
sfc-gh-mkubik marked this conversation as resolved.
Show resolved Hide resolved
logger.error("Exception encountered within threadExecutor flow in downloadFiles");
sfc-gh-mkubik marked this conversation as resolved.
Show resolved Hide resolved
throw new SnowflakeSQLException(
queryID,
ex.getCause(),
SqlState.INTERNAL_ERROR,
ErrorCode.FILE_OPERATION_DOWNLOAD_ERROR.getMessageCode());
}
logger.debug("Done with downloading");
} finally {
Expand Down Expand Up @@ -1823,6 +1839,7 @@ private void uploadFiles(Set<String> fileList, int parallel) throws SnowflakeSQL
threadExecutor =
SnowflakeUtil.createDefaultExecutorService("sf-file-upload-worker-", parallel);

List<Future<Void>> uploadFileFutures = new LinkedList<>();
for (String srcFile : fileList) {
FileMetadata fileMetadata = fileMetadataMap.get(srcFile);

Expand Down Expand Up @@ -1850,23 +1867,24 @@ private void uploadFiles(Set<String> fileList, int parallel) throws SnowflakeSQL
int delay = session.getInjectWaitInPut();
setUploadDelay(delay);

threadExecutor.submit(
getUploadFileCallable(
stageInfo,
srcFile,
fileMetadata,
(stageInfo.getStageType() == StageInfo.StageType.LOCAL_FS)
? null
: storageFactory.createClient(
stageInfo, parallel, encryptionMaterial.get(0), session),
session,
command,
null,
false,
(parallel > 1 ? 1 : this.parallel),
srcFileObj,
encryptionMaterial.get(0),
queryID));
uploadFileFutures.add(
sfc-gh-pfus marked this conversation as resolved.
Show resolved Hide resolved
threadExecutor.submit(
getUploadFileCallable(
stageInfo,
srcFile,
fileMetadata,
(stageInfo.getStageType() == StageInfo.StageType.LOCAL_FS)
? null
: storageFactory.createClient(
stageInfo, parallel, encryptionMaterial.get(0), session),
session,
command,
null,
false,
(parallel > 1 ? 1 : this.parallel),
srcFileObj,
encryptionMaterial.get(0),
queryID)));

logger.debug("Submitted copy job for: {}", srcFile);
}
Expand All @@ -1877,9 +1895,21 @@ private void uploadFiles(Set<String> fileList, int parallel) throws SnowflakeSQL
try {
// wait for all threads to complete without timeout
threadExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
for (Future<Void> uploadFileFuture : uploadFileFutures) {
if (uploadFileFuture.isDone()) {
uploadFileFuture.get();
}
}
} catch (InterruptedException ex) {
throw new SnowflakeSQLLoggedException(
queryID, session, ErrorCode.INTERRUPTED.getMessageCode(), SqlState.QUERY_CANCELED);
} catch (ExecutionException ex) {
logger.error("Exception encountered within threadExecutor flow in uploadFiles");
sfc-gh-mkubik marked this conversation as resolved.
Show resolved Hide resolved
throw new SnowflakeSQLException(
queryID,
ex.getCause(),
SqlState.INTERNAL_ERROR,
ErrorCode.FILE_OPERATION_UPLOAD_ERROR.getMessageCode());
}
logger.debug("Done with uploading");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,6 @@ Error message={3}, Extended error info={4}
200061=GCS operation failed: Operation={0}, Error code={1}, Message={2}, Reason={3}
200062=Authentication timed out.
200063=Invalid data - Cannot be parsed and converted to structured type.

200064=The values for 'disableOCSPChecks' and 'insecureMode' must be identical.
200065=JDBC driver file operation error while performing stage upload.
200066=JDBC driver file operation error while performing stage download.
Loading
Loading