Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ public synchronized void setConf(final StatefulMongoDBRdfConfiguration conf) {
auths = conf.getAuthorizations();
flushEachUpdate.set(conf.flushEachUpdate());
}



public void setDB(final DB db) {
this.db = db;
}
Expand All @@ -107,7 +106,7 @@ public void init() throws RyaDAOException {
index.setConf(conf);
}

db = mongoClient.getDB(conf.get(MongoDBRdfConfiguration.MONGO_DB_NAME));
db = mongoClient.getDB(conf.getRyaInstanceName());
coll = db.getCollection(conf.getTriplesCollectionName());
nameSpaceManager = new SimpleMongoDBNamespaceManager(db.getCollection(conf.getNameSpacesCollectionName()));
queryEngine = new MongoDBQueryEngine();
Expand Down Expand Up @@ -307,4 +306,4 @@ private void flushIndexers() throws RyaDAOException {
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ public static DocumentVisibility toDocumentVisibility(final DBObject mongoObj) t
documentVisibilityArray = (Object[]) documentVisibilityObject;
} else if (documentVisibilityObject instanceof BasicDBList) {
documentVisibilityArray = DocumentVisibilityUtil.convertBasicDBListToObjectArray((BasicDBList) documentVisibilityObject);
} else {
documentVisibilityArray = new String[] {""};
}

final String documentVisibilityString = DocumentVisibilityUtil.multidimensionalArrayToBooleanString(documentVisibilityArray);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,9 @@ protected void beforeTest() throws Exception {
// Setup the configuration that will be used within the test.
final MongoDBRdfConfiguration conf = new MongoDBRdfConfiguration( new Configuration() );
conf.setBoolean("sc.useMongo", true);
conf.setTablePrefix("test_");
conf.setMongoDBName(conf.getRyaInstanceName());
conf.setMongoHostname( super.getMongoHostname() );
conf.setMongoPort("" + super.getMongoPort());
conf.setRyaInstanceName("mongo_test");
conf.setMongoHostname(getMongoHostname());
conf.setMongoPort(getMongoPort() + "");

// Let tests update the configuration.
updateConfiguration(conf);
Expand Down Expand Up @@ -77,4 +76,4 @@ public MongoCollection<Document> getRyaCollection() {
public DBCollection getRyaDbCollection() {
return getMongoClient().getDB(conf.getMongoDBName()).getCollection(conf.getTriplesCollectionName());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,19 +80,25 @@ private static Sail getRyaSail(final Configuration config) throws InferenceEngin

final String user;
final String pswd;
// XXX Should(?) be MongoDBRdfConfiguration.MONGO_COLLECTION_PREFIX inside the if below. RYA-135
final String ryaInstance = config.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX);
Objects.requireNonNull(ryaInstance, "RyaInstance or table prefix is missing from configuration."+RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX);
final String ryaInstance;

if(ConfigUtils.getUseMongo(config)) {
// Get a reference to a Mongo DB configuration object.
final MongoDBRdfConfiguration mongoConfig = (config instanceof MongoDBRdfConfiguration) ?
final MongoDBRdfConfiguration mongoConfig = config instanceof MongoDBRdfConfiguration ?
(MongoDBRdfConfiguration)config : new MongoDBRdfConfiguration(config);

ryaInstance = mongoConfig.getRyaInstanceName();

requireNonNull(ryaInstance, "RyaInstance is missing from configuration." + MongoDBRdfConfiguration.RYA_INSTANCE_NAME);

// Instantiate a Mongo client and Mongo DAO.
dao = getMongoDAO(mongoConfig);
// Then use the DAO's newly-created stateful conf in place of the original
rdfConfig = dao.getConf();
} else {
ryaInstance = config.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX);
Objects.requireNonNull(ryaInstance, "RyaInstance or table prefix is missing from configuration."+RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX);

rdfConfig = new AccumuloRdfConfiguration(config);
user = rdfConfig.get(ConfigUtils.CLOUDBASE_USER);
pswd = rdfConfig.get(ConfigUtils.CLOUDBASE_PASSWORD);
Expand Down Expand Up @@ -221,7 +227,7 @@ public static void updateAccumuloConfig(final AccumuloRdfConfiguration config, f
* @return - MongoDBRyaDAO with Indexers configured according to user's specification
* @throws RyaDAOException if the DAO can't be initialized
*/
public static MongoDBRyaDAO getMongoDAO(MongoDBRdfConfiguration mongoConfig) throws RyaDAOException {
public static MongoDBRyaDAO getMongoDAO(final MongoDBRdfConfiguration mongoConfig) throws RyaDAOException {
// Create the MongoClient that will be used by the Sail object's components.
final MongoClient client = createMongoClient(mongoConfig);

Expand Down
42 changes: 0 additions & 42 deletions extras/rya.export/export.accumulo/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,46 +84,4 @@ under the License.
<artifactId>libthrift</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>jaxb2-maven-plugin</artifactId>
<executions>
<execution>
<id>xjc</id>
<goals>
<goal>xjc</goal>
</goals>
</execution>
</executions>
<configuration>
<packageName>org.apache.rya.export</packageName>
</configuration>
</plugin>

<plugin>
<groupId>com.mycila</groupId>
<artifactId>license-maven-plugin</artifactId>
<configuration>
<header>${project.basedir}/src/license/header.txt</header>
</configuration>
<executions>
<execution>
<id>update-generated-source-headers</id>
<configuration>
<basedir>${project.build.directory}/generated-sources</basedir>
<mapping>
<sun-jaxb.episode>XML_STYLE</sun-jaxb.episode>
</mapping>
</configuration>
<phase>process-sources</phase>
<goals>
<goal>format</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.apache.rya.export.api.store.UpdateStatementException;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;

import com.google.common.base.Function;
import com.google.common.collect.Iterators;

/**
Expand All @@ -74,6 +73,7 @@ public class AccumuloRyaStatementStore implements RyaStatementStore {
private final Set<IteratorSetting> iteratorSettings = new HashSet<>();
private final AccumuloParentMetadataRepository metadataRepo;

private final String ryaInstanceName;
/**
* Creates a new instance of {@link AccumuloRyaStatementStore}.
* @param dao the {@AccumuloRyaDAO}.
Expand All @@ -87,6 +87,7 @@ public AccumuloRyaStatementStore(final AccumuloRyaDAO dao, final String tablePre
}
accumuloRyaDao = dao;
metadataRepo = new AccumuloParentMetadataRepository(dao);
ryaInstanceName = ryaInstance;
}

@Override
Expand All @@ -105,9 +106,7 @@ public Iterator<RyaStatement> fetchStatements() throws FetchStatementException {
}
// Convert Entry iterator to RyaStatement iterator
final Iterator<Entry<Key, Value>> entryIter = scanner.iterator();
final Iterator<RyaStatement> ryaStatementIter = Iterators.transform(entryIter, new Function<Entry<Key, Value>, RyaStatement>() {
@Override
public RyaStatement apply(final Entry<Key, Value> entry) {
final Iterator<RyaStatement> ryaStatementIter = Iterators.transform(entryIter, entry -> {
final Key key = entry.getKey();
final Value value = entry.getValue();
RyaStatement ryaStatement = null;
Expand All @@ -117,14 +116,19 @@ public RyaStatement apply(final Entry<Key, Value> entry) {
log.error("Unable to convert the key/value pair into a Rya Statement", e);
}
return ryaStatement;
}
});
});
return ryaStatementIter;
} catch (final Exception e) {
throw new FetchStatementException("Failed to fetch statements.", e);
}
}

@Override
public long count() {
//accumulo cannot count.
return -1;
}

@Override
public void addStatement(final RyaStatement statement) throws AddStatementException {
try {
Expand All @@ -141,6 +145,16 @@ public void addStatement(final RyaStatement statement) throws AddStatementExcept
}
}

@Override
public void addStatements(final Iterator<RyaStatement> statements) throws AddStatementException {
try {
accumuloRyaDao.add(statements);
accumuloRyaDao.flush();
} catch (final RyaDAOException e) {
throw new AddStatementException("Unable to add the Rya Statement", e);
}
}

@Override
public void removeStatement(final RyaStatement statement) throws RemoveStatementException {
try {
Expand Down Expand Up @@ -216,4 +230,9 @@ public void addIterator(final IteratorSetting iteratorSetting) {
checkNotNull(iteratorSetting);
iteratorSettings.add(iteratorSetting);
}

@Override
public String getRyaInstanceName() {
return ryaInstanceName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.log4j.Logger;
import org.apache.rya.accumulo.mr.MRUtils;
import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
import org.apache.rya.export.InstanceType;
import org.apache.rya.indexing.accumulo.ConfigUtils;

import com.google.common.collect.ImmutableList;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.apache.rya.export.accumulo.conf;

public enum InstanceType {
MOCK,
MINI,
DISTRIBUTION;
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ private MergeParentMetadata getMetadataFromTable() throws ParentMetadataDoesNotE
// Fetch the metadata from the entries.
String ryaInstanceName = null;
Date timestamp = null;
Date filterTimestamp = null;
long filterTimestamp = -1L;
Long parentTimeOffset = null;

while (entries.hasNext()) {
Expand All @@ -154,7 +154,7 @@ private MergeParentMetadata getMetadataFromTable() throws ParentMetadataDoesNotE
} else if (columnQualifier.equals(MERGE_PARENT_METADATA_TIMESTAMP)) {
timestamp = DATE_LEXICODER.decode(value);
} else if (columnQualifier.equals(MERGE_PARENT_METADATA_FILTER_TIMESTAMP)) {
filterTimestamp = DATE_LEXICODER.decode(value);
filterTimestamp = LONG_LEXICODER.decode(value);
} else if (columnQualifier.equals(MERGE_PARENT_METADATA_PARENT_TIME_OFFSET)) {
parentTimeOffset = LONG_LEXICODER.decode(value);
}
Expand Down Expand Up @@ -220,8 +220,8 @@ private static List<Mutation> makeWriteMetadataMutations(final MergeParentMetada
mutations.add(timestampMutation);

// Filter Timestamp
if (metadata.getFilterTimestamp() != null) {
final Mutation filterTimestampMutation = makeFieldMutation(metadata.getFilterTimestamp(), DATE_LEXICODER, MERGE_PARENT_METADATA_FILTER_TIMESTAMP);
if (metadata.getFilterTimestamp() != -1L) {
final Mutation filterTimestampMutation = makeFieldMutation(metadata.getFilterTimestamp(), LONG_LEXICODER, MERGE_PARENT_METADATA_FILTER_TIMESTAMP);
mutations.add(filterTimestampMutation);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@
*/
package org.apache.rya.export.accumulo.policy;

import java.util.Date;
import java.util.Iterator;

import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.iterators.user.TimestampFilter;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.export.accumulo.AccumuloRyaStatementStore;
import org.apache.rya.export.api.conf.policy.TimestampPolicyStatementStore;
import org.apache.rya.export.api.policy.TimestampPolicyStatementStore;
import org.apache.rya.export.api.store.FetchStatementException;
import org.apache.rya.export.api.store.RyaStatementStore;

Expand All @@ -34,31 +33,32 @@
* filter statements based on a timestamp.
*/
public class TimestampPolicyAccumuloRyaStatementStore extends TimestampPolicyStatementStore {

//an instance is held onto to be able to add iterators to.
private final AccumuloRyaStatementStore store;
/**
* Creates a new {@link TimestampPolicyAccumuloRyaStatementStore}
* @param store
* @param timestamp
*/
public TimestampPolicyAccumuloRyaStatementStore(final AccumuloRyaStatementStore store, final Date timestamp) {
public TimestampPolicyAccumuloRyaStatementStore(final AccumuloRyaStatementStore store, final long timestamp) {
super(store, timestamp);
store.addIterator(getStartTimeSetting(timestamp));
this.store = store;
}

/**
* Creates an {@link IteratorSetting} with a time stamp filter that starts with the specified data.
* @param time the start time of the filter.
* @return the {@link IteratorSetting}.
*/
private static IteratorSetting getStartTimeSetting(final Date time) {
private IteratorSetting getStartTimeSetting() {
final IteratorSetting setting = new IteratorSetting(1, "startTimeIterator", TimestampFilter.class);
TimestampFilter.setStart(setting, time.getTime(), true);
TimestampFilter.setStart(setting, timestamp, true);
TimestampFilter.setEnd(setting, Long.MAX_VALUE, true);
return setting;
}

@Override
public Iterator<RyaStatement> fetchStatements() throws FetchStatementException {
store.addIterator(getStartTimeSetting());
return store.fetchStatements();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@
import org.apache.rya.api.RdfCloudTripleStoreConstants;
import org.apache.rya.api.path.PathUtils;
import org.apache.rya.api.persist.RyaDAOException;
import org.apache.rya.export.InstanceType;
import org.apache.rya.export.accumulo.conf.AccumuloExportConstants;
import org.apache.rya.export.accumulo.conf.InstanceType;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -118,7 +118,7 @@ public class AccumuloInstanceDriver {

/**
* Creates a new instance of {@link AccumuloInstanceDriver}.
*
*
* @param driverName
* the name used to identify this driver in the logs. (not {@code null})
* @param instanceType
Expand Down
Loading