Skip to content

Commit a4ecc5d

Browse files
committed
[CDAP-20910] Reuse the Spark ClassLoader in app-fabric
- For non local spark program execution purposes, we don’t need to isolate the Spark classloader, since there is no creation of the SparkContext object.
1 parent 1be1fa0 commit a4ecc5d

File tree

7 files changed

+459
-355
lines changed

7 files changed

+459
-355
lines changed

cdap-app-fabric/src/main/java/io/cdap/cdap/app/runtime/ProgramClassLoaderProvider.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import io.cdap.cdap.internal.app.runtime.ProgramClassLoader;
2020

2121
/**
22-
* A provider for for program classloading creation.
22+
* A provider for program classloading creation.
2323
*/
2424
public interface ProgramClassLoaderProvider {
2525

cdap-app-fabric/src/main/java/io/cdap/cdap/app/runtime/ProgramRuntimeProvider.java

+13-17
Original file line numberDiff line numberDiff line change
@@ -24,29 +24,22 @@
2424
import java.lang.annotation.RetentionPolicy;
2525
import java.lang.annotation.Target;
2626

27-
/**
28-
* Provider for runtime system of programs.
29-
*/
27+
/** Provider for runtime system of programs. */
3028
public interface ProgramRuntimeProvider {
3129

32-
/**
33-
* Annotation for implementation to specify what are the supported {@link ProgramType}.
34-
*/
30+
/** Annotation for implementation to specify what are the supported {@link ProgramType}. */
3531
@Retention(RetentionPolicy.RUNTIME)
3632
@Target(ElementType.TYPE)
3733
@interface SupportedProgramType {
3834

39-
/**
40-
* Returns the list of supported {@link ProgramType}.
41-
*/
35+
/** Returns the list of supported {@link ProgramType}. */
4236
ProgramType[] value();
4337
}
4438

45-
/**
46-
* The execution mode of the program runtime system.
47-
*/
39+
/** The execution mode of the program runtime system. */
4840
enum Mode {
49-
LOCAL, DISTRIBUTED
41+
LOCAL,
42+
DISTRIBUTED
5043
}
5144

5245
/**
@@ -70,12 +63,15 @@ enum Mode {
7063
boolean isSupported(ProgramType programType, CConfiguration cConf);
7164

7265
/**
73-
* Creates a ClassLoader for the given program type. This is useful if you need the program class
74-
* loader but do not need to run a program.
66+
* Returns a ClassLoader for the given program type. This is useful if you only need the runtime
67+
* classloader for the given program type, but not for program execution.
7568
*
76-
* @param cConf The configuration to use
7769
* @param programType The type of program
70+
* @param cConf The configuration to use
7871
* @return a {@link ClassLoader} for the given program runner
72+
* @throws UnsupportedOperationException if the given program type is not supported by this
73+
* provider. Caller can use the {@link #isSupported(ProgramType, CConfiguration)} method to
74+
* check.
7975
*/
80-
ClassLoader createProgramClassLoader(CConfiguration cConf, ProgramType programType);
76+
ClassLoader getRuntimeClassLoader(ProgramType programType, CConfiguration cConf);
8177
}

cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/runtime/artifact/ArtifactClassLoaderFactory.java

+46-45
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import io.cdap.cdap.internal.app.runtime.ProgramRuntimeProviderLoader;
3232
import io.cdap.cdap.proto.ProgramType;
3333
import io.cdap.cdap.security.impersonation.EntityImpersonator;
34-
import java.io.Closeable;
3534
import java.io.File;
3635
import java.util.Iterator;
3736
import javax.annotation.Nullable;
@@ -48,21 +47,21 @@ final class ArtifactClassLoaderFactory {
4847
private static final Logger LOG = LoggerFactory.getLogger(ArtifactClassLoaderFactory.class);
4948

5049
private final CConfiguration cConf;
51-
@Nullable
52-
private final ProgramRuntimeProviderLoader programRuntimeProviderLoader;
50+
@Nullable private final ProgramRuntimeProviderLoader programRuntimeProviderLoader;
5351
private final File tmpDir;
5452

5553
@VisibleForTesting
5654
ArtifactClassLoaderFactory(CConfiguration cConf) {
5755
this(cConf, null);
5856
}
5957

60-
ArtifactClassLoaderFactory(CConfiguration cConf,
61-
@Nullable ProgramRuntimeProviderLoader programRuntimeProviderLoader) {
58+
ArtifactClassLoaderFactory(
59+
CConfiguration cConf, @Nullable ProgramRuntimeProviderLoader programRuntimeProviderLoader) {
6260
this.cConf = cConf;
6361
this.programRuntimeProviderLoader = programRuntimeProviderLoader;
64-
this.tmpDir = new File(cConf.get(Constants.CFG_LOCAL_DATA_DIR),
65-
cConf.get(Constants.AppFabric.TEMP_DIR)).getAbsoluteFile();
62+
this.tmpDir =
63+
new File(cConf.get(Constants.CFG_LOCAL_DATA_DIR), cConf.get(Constants.AppFabric.TEMP_DIR))
64+
.getAbsoluteFile();
6665
}
6766

6867
/**
@@ -77,35 +76,30 @@ final class ArtifactClassLoaderFactory {
7776
* {@link ClassLoader}, all temporary resources created for the classloader will be removed
7877
*/
7978
CloseableClassLoader createClassLoader(File unpackDir) {
80-
ClassLoader sparkClassLoader = null;
79+
ClassLoader parentClassLoader = null;
8180
if (programRuntimeProviderLoader != null) {
8281
try {
8382
// Try to create a ProgramClassLoader from the Spark runtime system if it is available.
8483
// It is needed because we don't know what program types that an artifact might have.
8584
// TODO: CDAP-5613. We shouldn't always expose the Spark classes.
86-
sparkClassLoader = programRuntimeProviderLoader.get(ProgramType.SPARK)
87-
.createProgramClassLoader(cConf, ProgramType.SPARK);
85+
parentClassLoader =
86+
programRuntimeProviderLoader
87+
.get(ProgramType.SPARK)
88+
.getRuntimeClassLoader(ProgramType.SPARK, cConf);
8889
} catch (Exception e) {
8990
// If Spark is not supported, exception is expected. We'll use the default filter.
9091
LOG.warn("Spark is not supported. Not using ProgramClassLoader from Spark");
9192
LOG.trace("Failed to create spark program runner with error:", e);
9293
}
9394
}
94-
95-
ProgramClassLoader programClassLoader = null;
96-
if (sparkClassLoader != null) {
97-
programClassLoader = new ProgramClassLoader(cConf, unpackDir, sparkClassLoader);
98-
} else {
99-
programClassLoader = new ProgramClassLoader(cConf, unpackDir,
100-
FilterClassLoader.create(getClass().getClassLoader()));
95+
if (parentClassLoader == null) {
96+
parentClassLoader = FilterClassLoader.create(getClass().getClassLoader());
10197
}
10298

103-
final ClassLoader finalProgramClassLoader = programClassLoader;
104-
return new CloseableClassLoader(programClassLoader, () -> {
105-
if (finalProgramClassLoader instanceof Closeable) {
106-
Closeables.closeQuietly((Closeable) finalProgramClassLoader);
107-
}
108-
});
99+
ProgramClassLoader programClassLoader =
100+
new ProgramClassLoader(cConf, unpackDir, parentClassLoader);
101+
return new CloseableClassLoader(
102+
programClassLoader, () -> Closeables.closeQuietly(programClassLoader));
109103
}
110104

111105
/**
@@ -117,18 +111,22 @@ CloseableClassLoader createClassLoader(File unpackDir) {
117111
* {@link ClassLoader}, all temporary resources created for the classloader will be removed
118112
* @see #createClassLoader(File)
119113
*/
120-
CloseableClassLoader createClassLoader(Location artifactLocation,
121-
EntityImpersonator entityImpersonator) {
114+
CloseableClassLoader createClassLoader(
115+
Location artifactLocation, EntityImpersonator entityImpersonator) {
122116
try {
123-
ClassLoaderFolder classLoaderFolder = entityImpersonator.impersonate(
124-
() -> BundleJarUtil.prepareClassLoaderFolder(artifactLocation,
125-
() -> DirUtils.createTempDir(tmpDir)));
117+
ClassLoaderFolder classLoaderFolder =
118+
entityImpersonator.impersonate(
119+
() ->
120+
BundleJarUtil.prepareClassLoaderFolder(
121+
artifactLocation, () -> DirUtils.createTempDir(tmpDir)));
126122

127123
CloseableClassLoader classLoader = createClassLoader(classLoaderFolder.getDir());
128-
return new CloseableClassLoader(classLoader, () -> {
129-
Closeables.closeQuietly(classLoader);
130-
Closeables.closeQuietly(classLoaderFolder);
131-
});
124+
return new CloseableClassLoader(
125+
classLoader,
126+
() -> {
127+
Closeables.closeQuietly(classLoader);
128+
Closeables.closeQuietly(classLoaderFolder);
129+
});
132130
} catch (Exception e) {
133131
throw Throwables.propagate(e);
134132
}
@@ -143,8 +141,8 @@ CloseableClassLoader createClassLoader(Location artifactLocation,
143141
* {@link ClassLoader}, all temporary resources created for the classloader will be removed
144142
* @see #createClassLoader(File)
145143
*/
146-
CloseableClassLoader createClassLoader(Iterator<Location> artifactLocations,
147-
EntityImpersonator entityImpersonator) {
144+
CloseableClassLoader createClassLoader(
145+
Iterator<Location> artifactLocations, EntityImpersonator entityImpersonator) {
148146
if (!artifactLocations.hasNext()) {
149147
throw new IllegalArgumentException("Cannot create a classloader without an artifact.");
150148
}
@@ -155,17 +153,20 @@ CloseableClassLoader createClassLoader(Iterator<Location> artifactLocations,
155153
}
156154

157155
try {
158-
ClassLoaderFolder classLoaderFolder = entityImpersonator.impersonate(
159-
() -> BundleJarUtil.prepareClassLoaderFolder(artifactLocation,
160-
() -> DirUtils.createTempDir(tmpDir)));
161-
162-
CloseableClassLoader parentClassLoader = createClassLoader(artifactLocations,
163-
entityImpersonator);
164-
return new CloseableClassLoader(new DirectoryClassLoader(classLoaderFolder.getDir(),
165-
parentClassLoader, "lib"), () -> {
166-
Closeables.closeQuietly(parentClassLoader);
167-
Closeables.closeQuietly(classLoaderFolder);
168-
});
156+
ClassLoaderFolder classLoaderFolder =
157+
entityImpersonator.impersonate(
158+
() ->
159+
BundleJarUtil.prepareClassLoaderFolder(
160+
artifactLocation, () -> DirUtils.createTempDir(tmpDir)));
161+
162+
CloseableClassLoader parentClassLoader =
163+
createClassLoader(artifactLocations, entityImpersonator);
164+
return new CloseableClassLoader(
165+
new DirectoryClassLoader(classLoaderFolder.getDir(), parentClassLoader, "lib"),
166+
() -> {
167+
Closeables.closeQuietly(parentClassLoader);
168+
Closeables.closeQuietly(classLoaderFolder);
169+
});
169170
} catch (Exception e) {
170171
throw Throwables.propagate(e);
171172
}

cdap-app-fabric/src/main/java/io/cdap/cdap/master/environment/DefaultMasterEnvironmentRunnableContext.java

+18-19
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,7 @@
3434
import org.apache.twill.filesystem.LocationFactory;
3535
import org.apache.twill.internal.utils.Instances;
3636

37-
/**
38-
* Default implementation of {@link MasterEnvironmentRunnableContext}.
39-
*/
37+
/** Default implementation of {@link MasterEnvironmentRunnableContext}. */
4038
public class DefaultMasterEnvironmentRunnableContext implements MasterEnvironmentRunnableContext {
4139

4240
private final LocationFactory locationFactory;
@@ -46,13 +44,14 @@ public class DefaultMasterEnvironmentRunnableContext implements MasterEnvironmen
4644

4745
private ClassLoader extensionCombinedClassLoader;
4846

49-
public DefaultMasterEnvironmentRunnableContext(LocationFactory locationFactory,
47+
public DefaultMasterEnvironmentRunnableContext(
48+
LocationFactory locationFactory,
5049
RemoteClientFactory remoteClientFactory,
5150
CConfiguration cConf) {
5251
this.locationFactory = locationFactory;
53-
this.remoteClient = remoteClientFactory.createRemoteClient(
54-
Constants.Service.APP_FABRIC_HTTP,
55-
new DefaultHttpRequestConfig(false), "");
52+
this.remoteClient =
53+
remoteClientFactory.createRemoteClient(
54+
Constants.Service.APP_FABRIC_HTTP, new DefaultHttpRequestConfig(false), "");
5655
this.cConf = cConf;
5756
this.programRuntimeProviderLoader = new ProgramRuntimeProviderLoader(cConf);
5857
this.extensionCombinedClassLoader = null;
@@ -63,9 +62,7 @@ public LocationFactory getLocationFactory() {
6362
return locationFactory;
6463
}
6564

66-
/**
67-
* Opens a {@link HttpURLConnection} for the given resource path.
68-
*/
65+
/** Opens a {@link HttpURLConnection} for the given resource path. */
6966
@Override
7067
public HttpURLConnection openHttpURLConnection(String resource) throws IOException {
7168
return remoteClient.openConnection(resource);
@@ -79,20 +76,22 @@ public TwillRunnable instantiateTwillRunnable(String className) {
7976
} catch (ClassNotFoundException e) {
8077
// Try loading the class from the runtime extensions.
8178
if (extensionCombinedClassLoader == null) {
82-
Map<ProgramType, ProgramRuntimeProvider> classLoaderProviderMap = programRuntimeProviderLoader.getAll();
83-
extensionCombinedClassLoader = new CombineClassLoader(getClass().getClassLoader(),
84-
classLoaderProviderMap.entrySet().stream()
85-
.map(entry -> entry.getValue()
86-
.createProgramClassLoader(cConf, entry.getKey()))
87-
.collect(Collectors.toList())
88-
.toArray(new ClassLoader[0]));
79+
Map<ProgramType, ProgramRuntimeProvider> classLoaderProviderMap =
80+
programRuntimeProviderLoader.getAll();
81+
extensionCombinedClassLoader =
82+
new CombineClassLoader(
83+
getClass().getClassLoader(),
84+
classLoaderProviderMap.entrySet().stream()
85+
.map(entry -> entry.getValue().getRuntimeClassLoader(entry.getKey(), cConf))
86+
.collect(Collectors.toList()));
8987
}
9088
try {
9189
runnableClass = extensionCombinedClassLoader.loadClass(className);
9290
} catch (ClassNotFoundException cnfe) {
9391
throw new RuntimeException(
94-
String.format("Failed to load twill runnable class from runtime extension '%s'",
95-
className), cnfe);
92+
String.format(
93+
"Failed to load twill runnable class from runtime extension '%s'", className),
94+
cnfe);
9695
}
9796
}
9897
if (!TwillRunnable.class.isAssignableFrom(runnableClass)) {

0 commit comments

Comments
 (0)