Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add --since and --until to "pg export" #17

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,17 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import javax.annotation.Nullable;

import org.geotools.data.DataStore;
import org.geotools.data.simple.SimpleFeatureCollection;
import org.geotools.data.simple.SimpleFeatureSource;
import org.geotools.data.simple.SimpleFeatureStore;
import org.geotools.feature.DefaultFeatureCollection;
import org.geotools.feature.FeatureCollection;
import org.geotools.feature.NameImpl;
import org.geotools.feature.simple.SimpleFeatureTypeImpl;
import org.locationtech.geogig.api.GeoGIG;
Expand All @@ -32,13 +36,16 @@
import org.locationtech.geogig.api.plumbing.ResolveTreeish;
import org.locationtech.geogig.api.plumbing.RevObjectParse;
import org.locationtech.geogig.api.plumbing.RevParse;
import org.locationtech.geogig.api.plumbing.diff.DiffEntry;
import org.locationtech.geogig.api.porcelain.DiffOp;
import org.locationtech.geogig.cli.CLICommand;
import org.locationtech.geogig.cli.CommandFailedException;
import org.locationtech.geogig.cli.GeogigCLI;
import org.locationtech.geogig.cli.InvalidParameterException;
import org.locationtech.geogig.cli.annotation.ReadOnly;
import org.locationtech.geogig.geotools.plumbing.ExportOp;
import org.locationtech.geogig.geotools.plumbing.GeoToolsOpException;
import org.opengis.feature.simple.SimpleFeature;
import org.opengis.feature.simple.SimpleFeatureType;
import org.opengis.filter.Filter;

Expand Down Expand Up @@ -71,6 +78,12 @@ public class PGExport extends AbstractPGCommand implements CLICommand {
@Nullable
public String sFeatureTypeId;

@Parameter(names = { "--since" }, description = "Only export changes that happend after the given commit")
public String since;

@Parameter(names = { "--until" }, description = "Only export changes until the given commit")
public String until;

/**
* Executes the export command using the provided options.
*/
Expand All @@ -86,6 +99,7 @@ protected void runInternal(GeogigCLI cli) throws IOException {
String tableName = args.get(1);

checkParameter(tableName != null && !tableName.isEmpty(), "No table name specified");
checkParameter(!overwrite || ((since == null && until == null)), "--overwrite can not be specified together with --since/--until");

DataStore dataStore = getDataStore();

Expand Down Expand Up @@ -122,7 +136,7 @@ protected void runInternal(GeogigCLI cli) throws IOException {
throw new CommandFailedException("Cannot create new table in database", e);
}
} else {
if (!overwrite) {
if (!overwrite && since == null) {
throw new InvalidParameterException(
"The selected table already exists. Use -o to overwrite");
}
Expand All @@ -143,7 +157,7 @@ protected void runInternal(GeogigCLI cli) throws IOException {
throw new CommandFailedException("Error trying to remove features", e);
}
}
ExportOp op = cli.getGeogig().command(ExportOp.class).setFeatureStore(featureStore)
ExportOp op = cli.getGeogig().command(ExportOp.class).setOldRef(since).setNewRef(until).setFeatureStore(featureStore)
.setPath(path).setFilterFeatureTypeId(featureTypeId).setAlter(alter);
if (defaultType) {
op.exportDefaultFeatureType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,59 +9,45 @@
*/
package org.locationtech.geogig.geotools.plumbing;

import static com.google.common.base.Preconditions.checkArgument;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import javax.annotation.Nullable;

import com.google.common.base.*;
import com.google.common.base.Optional;
import com.google.common.collect.*;
import org.geotools.data.DefaultTransaction;
import org.geotools.data.Transaction;
import org.geotools.data.simple.SimpleFeatureCollection;
import org.geotools.data.simple.SimpleFeatureStore;
import org.geotools.factory.CommonFactoryFinder;
import org.geotools.factory.Hints;
import org.geotools.feature.DefaultFeatureCollection;
import org.geotools.feature.FeatureCollection;
import org.geotools.feature.FeatureIterator;
import org.geotools.feature.collection.BaseFeatureCollection;
import org.geotools.feature.collection.DelegateFeatureIterator;
import org.locationtech.geogig.api.AbstractGeoGigOp;
import org.locationtech.geogig.api.FeatureBuilder;
import org.locationtech.geogig.api.NodeRef;
import org.locationtech.geogig.api.ObjectId;
import org.locationtech.geogig.api.ProgressListener;
import org.locationtech.geogig.api.RevFeature;
import org.locationtech.geogig.api.RevFeatureImpl;
import org.locationtech.geogig.api.RevFeatureType;
import org.locationtech.geogig.api.RevObject;
import org.locationtech.geogig.api.*;
import org.locationtech.geogig.api.RevObject.TYPE;
import org.locationtech.geogig.api.RevTree;
import org.locationtech.geogig.api.hooks.Hookable;
import org.locationtech.geogig.api.plumbing.FindTreeChild;
import org.locationtech.geogig.api.plumbing.ResolveTreeish;
import org.locationtech.geogig.api.plumbing.diff.DepthTreeIterator;
import org.locationtech.geogig.api.plumbing.diff.DepthTreeIterator.Strategy;
import org.locationtech.geogig.api.plumbing.diff.DiffEntry;
import org.locationtech.geogig.api.porcelain.DiffOp;
import org.locationtech.geogig.geotools.plumbing.GeoToolsOpException.StatusCode;
import org.locationtech.geogig.storage.ObjectDatabase;
import org.opengis.feature.Feature;
import org.opengis.feature.simple.SimpleFeature;
import org.opengis.feature.simple.SimpleFeatureType;
import org.opengis.feature.type.PropertyDescriptor;
import org.opengis.filter.Filter;
import org.opengis.filter.FilterFactory;
import org.opengis.filter.identity.Identifier;

import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.UnmodifiableIterator;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;

import static com.google.common.base.Preconditions.checkArgument;

/**
* Internal operation for creating a FeatureCollection from a tree content.
Expand Down Expand Up @@ -94,6 +80,10 @@ public Optional<Feature> apply(@Nullable Feature feature) {

private boolean transactional;

private String oldRef;

private String newRef;

/**
* Constructs a new export operation.
*/
Expand All @@ -114,7 +104,8 @@ protected SimpleFeatureStore _call() {
checkArgument(filterType instanceof RevFeatureType,
"Provided filter feature type is does not exist");
}

checkArgument((oldRef == null) == (newRef == null), "--since and --until must both be set or none of them");
boolean isChangeExport = (oldRef != null || newRef != null);
final SimpleFeatureStore targetStore = getTargetStore();

final String refspec = resolveRefSpec();
Expand All @@ -131,33 +122,11 @@ protected SimpleFeatureStore _call() {
progressListener.started();
progressListener.setDescription("Exporting from " + path + " to "
+ targetStore.getName().getLocalPart() + "... ");

FeatureCollection<SimpleFeatureType, SimpleFeature> asFeatureCollection = new BaseFeatureCollection<SimpleFeatureType, SimpleFeature>() {

@Override
public FeatureIterator<SimpleFeature> features() {

final Iterator<SimpleFeature> plainFeatures = getFeatures(typeTree, database,
defaultMetadataId, progressListener);

Iterator<SimpleFeature> adaptedFeatures = adaptToArguments(plainFeatures,
defaultMetadataId);

Iterator<Optional<Feature>> transformed = Iterators.transform(adaptedFeatures,
ExportOp.this.function);

Iterator<SimpleFeature> filtered = Iterators.filter(Iterators.transform(
transformed, new Function<Optional<Feature>, SimpleFeature>() {
@Override
public SimpleFeature apply(Optional<Feature> input) {
return (SimpleFeature) (input.isPresent() ? input.get() : null);
}
}), Predicates.notNull());

return new DelegateFeatureIterator<SimpleFeature>(filtered);
}
};

FeatureCollection<SimpleFeatureType, SimpleFeature> asFeatureCollection = null;
if(!isChangeExport) {
final Iterator<SimpleFeature> plainFeatures = getFeatures(typeTree, database, defaultMetadataId, progressListener);
asFeatureCollection = getFeatureCollection(defaultMetadataId, plainFeatures);
}
// add the feature collection to the feature store
final Transaction transaction;
if (transactional) {
Expand All @@ -168,7 +137,11 @@ public SimpleFeature apply(Optional<Feature> input) {
try {
targetStore.setTransaction(transaction);
try {
targetStore.addFeatures(asFeatureCollection);
if(isChangeExport) {
exportChanges(database, defaultMetadataId, targetStore, progressListener);
} else {
targetStore.addFeatures(asFeatureCollection);
}
transaction.commit();
} catch (final Exception e) {
if (transactional) {
Expand All @@ -182,33 +155,80 @@ public SimpleFeature apply(Optional<Feature> input) {
} catch (IOException e) {
throw new GeoToolsOpException(e, StatusCode.UNABLE_TO_ADD);
}

progressListener.complete();

return targetStore;

}

private static Iterator<SimpleFeature> getFeatures(final RevTree typeTree,
final ObjectDatabase database, final ObjectId defaultMetadataId,
final ProgressListener progressListener) {
private FeatureCollection<SimpleFeatureType, SimpleFeature>
getFeatureCollection(final ObjectId defaultMetadataId, final Iterator<SimpleFeature> plainFeatures) {
return new BaseFeatureCollection<SimpleFeatureType, SimpleFeature>() {

Iterator<NodeRef> nodes = new DepthTreeIterator("", defaultMetadataId, typeTree, database,
Strategy.FEATURES_ONLY);
@Override
public FeatureIterator<SimpleFeature> features() {

// progress reporting
nodes = Iterators.transform(nodes, new Function<NodeRef, NodeRef>() {
Iterator<SimpleFeature> adaptedFeatures = adaptToArguments(plainFeatures,
defaultMetadataId);

private AtomicInteger count = new AtomicInteger();
Iterator<Optional<Feature>> transformed = Iterators.transform(adaptedFeatures,
ExportOp.this.function);

Iterator<SimpleFeature> filtered = Iterators.filter(Iterators.transform(
transformed, new Function<Optional<Feature>, SimpleFeature>() {
@Override
public SimpleFeature apply(Optional<Feature> input) {
return (SimpleFeature) (input.isPresent() ? input.get() : null);
}
}), Predicates.notNull());

return new DelegateFeatureIterator<SimpleFeature>(filtered);
}
};
}

private static Filter createFeatureFilter(List<String> ids) {
FilterFactory ff = CommonFactoryFinder.getFilterFactory2();
Set<Identifier> fids = new HashSet<>();
for(String id : ids) {
fids.add(ff.featureId(id));
}
return ff.id(fids);
}

private SimpleFeatureStore exportChanges(ObjectDatabase database, final ObjectId defaultMetadataId, SimpleFeatureStore targetStore, final ProgressListener progressListener) throws IOException{
Function<NodeRef, SimpleFeature> node2feat = convertFunction(database);
Iterator<DiffEntry> it = command(DiffOp.class).setOldVersion(oldRef).setNewVersion(newRef).setFilter(path).call();
progressListener.setProgress(20);
final List<SimpleFeature> toAdd = new ArrayList<SimpleFeature>();
List<String> toRemove = new ArrayList<String>();
while(it.hasNext()) {
final DiffEntry entry = it.next();
if(entry.isDelete() || entry.isChange()) {
toRemove.add(entry.getOldObject().name());
}
if(entry.isAdd() || entry.isChange()) {
toAdd.add(node2feat.apply(entry.getNewObject()));
}
}
targetStore.removeFeatures(createFeatureFilter(toRemove));
progressListener.setProgress(60);
Function<SimpleFeature, SimpleFeature> progressFunction = new Function<SimpleFeature, SimpleFeature>() {
AtomicInteger count = new AtomicInteger();
@Nullable
@Override
public NodeRef apply(NodeRef input) {
progressListener.setProgress((count.incrementAndGet() * 100.f) / typeTree.size());
public SimpleFeature apply(@Nullable SimpleFeature input) {
progressListener.setProgress(60.0f + (count.incrementAndGet() * 40.0f) / toAdd.size());
return input;
}
});
};
targetStore.addFeatures(getFeatureCollection(defaultMetadataId,
Iterators.filter(Iterators.transform(toAdd.iterator(), progressFunction), Predicates.notNull())));
return targetStore;
}

Function<NodeRef, SimpleFeature> asFeature = new Function<NodeRef, SimpleFeature>() {
private static Function<NodeRef, SimpleFeature> convertFunction(final ObjectDatabase database) {
return new Function<NodeRef, SimpleFeature>() {

private Map<ObjectId, FeatureBuilder> ftCache = Maps.newHashMap();

Expand Down Expand Up @@ -240,6 +260,28 @@ private FeatureBuilder getBuilderFor(final ObjectId metadataId) {
return featureBuilder;
}
};
}

private static Iterator<SimpleFeature> getFeatures(final RevTree typeTree,
final ObjectDatabase database, final ObjectId defaultMetadataId,
final ProgressListener progressListener) {

Iterator<NodeRef> nodes = new DepthTreeIterator("", defaultMetadataId, typeTree, database,
Strategy.FEATURES_ONLY);

// progress reporting
nodes = Iterators.transform(nodes, new Function<NodeRef, NodeRef>() {

private AtomicInteger count = new AtomicInteger();

@Override
public NodeRef apply(NodeRef input) {
progressListener.setProgress((count.incrementAndGet() * 100.f) / typeTree.size());
return input;
}
});

Function<NodeRef, SimpleFeature> asFeature = convertFunction(database);

Iterator<SimpleFeature> asFeatures = Iterators.transform(nodes, asFeature);

Expand Down Expand Up @@ -496,4 +538,14 @@ public ExportOp setTransactional(boolean transactional) {
this.transactional = transactional;
return this;
}

public ExportOp setOldRef(String oldRef) {
this.oldRef = oldRef;
return this;
}

public ExportOp setNewRef(String newRef) {
this.newRef = newRef;
return this;
}
}
Loading