Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.construction;

import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

/**
* Configures how the files to stage are selected.
* TODO: allow to select in parent classloaders as well as current one.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment around the order in which the filters are applied.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The TODO should really be in PipelineResources.

*/
public interface FilterableStagingFilesPipelineOptions extends PipelineOptions {
/**
* A regular expression selecting the files found on the class loader that are kept
* for staging.
* The regex is matched against the file name only, and must match it entirely.
*/
@Description("Include regex for file staging.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Be explicit in this description since this appears in the output of --help.
Ditto on the exclude filter.

String getClassLoaderIncludeFilter();
void setClassLoaderIncludeFilter(String value);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we just accept a list to start, since it's likely people will want to be able to specify more than one include and exclude instead of writing a giant regex or filter?


/**
* A regular expression selecting the files found on the class loader that are skipped
* when staging.
* The regex is matched against the file name only, and must match it entirely.
*/
@Description("Exclude regex for file staging.")
String getClassLoaderExcludeFilter();
void setClassLoaderExcludeFilter(String value);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,41 +17,85 @@
*/
package org.apache.beam.runners.core.construction;

import static java.util.Collections.emptySet;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;

import java.io.File;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import org.apache.beam.runners.core.construction.classloader.Classloaders;
import org.apache.beam.sdk.options.PipelineOptions;

/** Utilities for working with classpath resources for pipelines. */
/**
* Utilities for working with classpath resources for pipelines.
*/
public class PipelineResources {

/**
* Attempts to detect all the resources the class loader has access to. This does not recurse
* to class loader parents stopping it from pulling in resources from the system class loader.
*
* @param classLoader The URLClassLoader to use to detect resources to stage.
* @throws IllegalArgumentException If either the class loader is not a URLClassLoader or one
* of the resources the class loader exposes is not a file resource.
* @return A list of absolute paths to the resources the class loader uses.
*/
public static List<String> detectClassPathResourcesToStage(ClassLoader classLoader) {
if (!(classLoader instanceof URLClassLoader)) {
String message = String.format("Unable to use ClassLoader to detect classpath elements. "
+ "Current ClassLoader is %s, only URLClassLoaders are supported.", classLoader);
throw new IllegalArgumentException(message);
/**
 * Lists the classpath entries visible through {@code classLoader} that should be staged.
 *
 * <p>Entries that are also exposed by the parent class loader are dropped, and the remaining
 * files are narrowed by the include/exclude filters read from {@code options}. Each surviving
 * file is reported by its canonical path, falling back to its absolute path when the canonical
 * form cannot be resolved.
 *
 * @param options The pipeline options carrying the staging filters; {@code null} disables
 *     filtering.
 * @param classLoader The class loader to scan; {@code null} produces an empty result.
 * @return The paths of the files to stage.
 * @throws IllegalStateException If scanning the class loader fails with an {@link IOException}.
 * @throws IllegalArgumentException If the class loader exposes an entry that cannot be
 *     converted to a file.
 */
public static List<String> detectClassPathResourcesToStage(final PipelineOptions options,
    final ClassLoader classLoader) {
  try {
    // Files already provided by the parent loader must not be staged again.
    final Set<File> inherited;
    if (classLoader == null) {
      inherited = emptySet();
    } else {
      inherited = Classloaders.toFiles(classLoader.getParent()).collect(toSet());
    }
    final Predicate<File> selected = toFilter(options);
    return Classloaders.toFiles(classLoader)
        .filter(candidate -> !inherited.contains(candidate) && selected.test(candidate))
        .map(candidate -> {
          try {
            return candidate.getCanonicalPath();
          } catch (IOException ignored) {
            // Best effort: an unresolvable canonical path degrades to the absolute path.
            return candidate.getAbsolutePath();
          }
        })
        .collect(toList());
  } catch (final IOException e) {
    throw new IllegalStateException(e);
  }
}

/**
 * Builds the file predicate described by the staging options.
 *
 * <p>A file is accepted when its name fully matches the include regex (if one is set) and does
 * not fully match the exclude regex (if one is set). Without options, or without any regex
 * configured, every file is accepted.
 *
 * @param options The pipeline options to read the filters from, possibly {@code null}.
 * @return The predicate deciding whether a file is staged.
 */
private static Predicate<File> toFilter(final PipelineOptions options) {
  if (options == null) {
    return file -> true;
  }

  final FilterableStagingFilesPipelineOptions stagingOptions =
      options.as(FilterableStagingFilesPipelineOptions.class);
  final String includeRegex = stagingOptions.getClassLoaderIncludeFilter();
  final String excludeRegex = stagingOptions.getClassLoaderExcludeFilter();
  if (includeRegex == null && excludeRegex == null) {
    return file -> true;
  }

  // A missing include regex accepts every name.
  final Predicate<String> accepted;
  if (includeRegex == null) {
    accepted = name -> true;
  } else {
    accepted = new PatternFilter(includeRegex);
  }

  // A missing exclude regex rejects no name.
  final Predicate<String> rejected;
  if (excludeRegex == null) {
    rejected = name -> false;
  } else {
    rejected = new PatternFilter(excludeRegex);
  }

  return file -> {
    final String name = file.getName();
    return accepted.test(name) && !rejected.test(name);
  };
}

List<String> files = new ArrayList<>();
for (URL url : ((URLClassLoader) classLoader).getURLs()) {
try {
files.add(new File(url.toURI()).getAbsolutePath());
} catch (IllegalArgumentException | URISyntaxException e) {
String message = String.format("Unable to convert url (%s) to file.", url);
throw new IllegalArgumentException(message, e);
}
/** Predicate accepting the strings that match a regular expression in their entirety. */
private static class PatternFilter implements Predicate<String> {
  /** The expression, compiled once when the filter is created. */
  private final Pattern regex;

  private PatternFilter(final String expression) {
    regex = Pattern.compile(expression);
  }

  /**
   * Tells whether {@code value} as a whole matches the expression.
   *
   * @param value The string to check.
   * @return {@code true} only on a full match; a partial match is not enough.
   */
  @Override
  public boolean test(final String value) {
    final boolean fullMatch = regex.matcher(value).matches();
    return fullMatch;
  }
}
return files;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.construction.classloader;

import static java.util.Collections.list;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.stream.Stream;

/**
* Inspired by xbean; keeps the classpath "scanning" logic portable and independent of the class loader implementation.
*/
public final class Classloaders {
private static final ClassLoader SYSTEM = ClassLoader.getSystemClassLoader();

public static Stream<File> toFiles(final ClassLoader classLoader) throws IOException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's hide all this logic and make it private within PipelineResources.

if (classLoader == null) {
return Stream.empty();
}
return Stream.concat(
list(classLoader.getResources("")).stream(),
list(classLoader.getResources("META-INF")).stream()
.map(url -> {
final String externalForm = url.toExternalForm();
try {
return new URL(externalForm.substring(0, externalForm.lastIndexOf("META-INF")));
} catch (final MalformedURLException e) {
throw new IllegalArgumentException(e);
}
}))
.map(Classloaders::toFile);
}

private static boolean isSystemParent(final ClassLoader classLoader) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: rename to isAncestorOfSystemClassLoader

ClassLoader current = SYSTEM.getParent();
while (current != null) {
if (current == classLoader) {
return true;
}
current = current.getParent();
}
return false;
}

/**
 * Converts a classpath URL into the file that backs it.
 *
 * <p>For a {@code jar} URL, the part up to and including the {@code '!'} separator is parsed
 * as a nested URL and converted recursively. For a {@code file} URL, the path is
 * percent-decoded and a trailing {@code '!'} (left over from a jar URL) is removed.
 *
 * @param url The URL to convert.
 * @return The backing file, or {@code null} when a {@code jar} URL contains no {@code '!'}
 *     separator.
 * @throws IllegalArgumentException If the protocol is neither {@code jar} nor {@code file}, or
 *     if the nested URL of a {@code jar} URL is malformed (the parsing failure is attached as
 *     the cause).
 */
private static File toFile(final URL url) {
  final String protocol = url.getProtocol();
  if ("jar".equals(protocol)) {
    final String spec = url.getFile();
    final int separator = spec.indexOf('!');
    if (separator == -1) {
      return null;
    }
    try {
      // The '!' is kept here on purpose; the "file" branch strips it.
      return toFile(new URL(spec.substring(0, separator + 1)));
    } catch (final MalformedURLException e) {
      // Same exception type and message as before, but no longer hiding the root cause.
      throw new IllegalArgumentException("Unsupported entry: " + url.toExternalForm(), e);
    }
  }
  if ("file".equals(protocol)) {
    String path = decode(url.getFile());
    if (path.endsWith("!")) {
      path = path.substring(0, path.length() - 1);
    }
    return new File(path);
  }
  throw new IllegalArgumentException("Unsupported entry: " + url.toExternalForm());
}

private static String decode(String fileName) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use an implementation that already exists instead of re-implementing this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any one you are thinking about? Don't think we want xbean here (another shady dep)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From memory, the rules were not 100% the same, but that would need to be checked; it has been years now.

if (fileName.indexOf('%') == -1) {
return fileName;
}

final StringBuilder result = new StringBuilder(fileName.length());
final ByteArrayOutputStream out = new ByteArrayOutputStream();
for (int i = 0; i < fileName.length(); ) {
final char c = fileName.charAt(i);
if (c == '%') {
out.reset();
do {
if (i + 2 >= fileName.length()) {
throw new IllegalArgumentException("Incomplete % sequence at: " + i);
}
final int d1 = Character.digit(fileName.charAt(i + 1), 16);
final int d2 = Character.digit(fileName.charAt(i + 2), 16);
if (d1 == -1 || d2 == -1) {
throw new IllegalArgumentException("Invalid % sequence ("
+ fileName.substring(i, i + 3) + ") at: " + String.valueOf(i));
}
out.write((byte) ((d1 << 4) + d2));
i += 3;
} while (i < fileName.length() && fileName.charAt(i) == '%');
result.append(out.toString());
} else {
result.append(c);
i++;
}
}
return result.toString();
}

/** Not instantiable: this final class only exposes static helpers. */
private Classloaders() {
// no-op: the private constructor exists only to prevent instantiation
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* Utilities for inspecting class loaders to find the classpath files they expose.
*/
package org.apache.beam.runners.core.construction.classloader;
Loading