
Commit 2af763d

Add Cassandra3InputFormatIT, remove JanusGraph copyright from CqlBridgeRecordReader and clean up Cassandra3BinaryInputFormat.
Signed-off-by: sjudeng <[email protected]>
1 parent bc54b9e commit 2af763d

8 files changed: +79 -89 lines changed

NOTICE.txt

+1
@@ -28,6 +28,7 @@ It also includes software from other open source projects including, but not lim
 * Apache Solr [http://lucene.apache.org/solr/]
 * Apache TinkerPop [http://tinkerpop.apache.org/]
 * Astyanax [https://github.com/Netflix/astyanax]
+* DataStax Driver for Apache Cassandra [https://github.com/datastax/java-driver]
 * EasyMock [http://easymock.org/]
 * Elasticsearch [https://www.elastic.co/]
 * Google Guava [https://github.com/google/guava]

TESTING.md

+9
@@ -86,3 +86,12 @@ To run Elasicsearch tests using an embedded Elasticsearch Docker container, use
 ```bash
 mvn clean install -pl janusgraph-es -Pes-docker
 ```
+
+### Running Hadoop tests with Cassandra-3 using the CQL record reader (requires Docker)
+
+To run Hadoop tests with Cassandra-3 using the CQL record reader, start a Cassandra-3 Docker container and run the tests with `-DskipCassandra3=false`. Note that the core HBase and Cassandra-2 Hadoop tests must be skipped while an external Cassandra instance is running.
+
+```bash
+docker run --name jg-cassandra -p 9160:9160 -p 9042:9042 -e CASSANDRA_START_RPC=true -d cassandra:3.10
+mvn clean install -pl :janusgraph-hadoop-2 -DskipHBase -DskipCassandra -DskipCassandra3=false
+```

janusgraph-hadoop-parent/janusgraph-hadoop-core/src/main/java/org/janusgraph/hadoop/formats/cassandra/Cassandra3BinaryInputFormat.java

+1 -70

@@ -14,53 +14,18 @@
 
 package org.janusgraph.hadoop.formats.cassandra;
 
-import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
-import org.apache.cassandra.hadoop.ColumnFamilyRecordReader;
-import org.apache.cassandra.hadoop.ConfigHelper;
-import org.apache.cassandra.thrift.SlicePredicate;
-import org.apache.cassandra.thrift.SliceRange;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.janusgraph.diskstorage.Entry;
 import org.janusgraph.diskstorage.StaticBuffer;
-import org.janusgraph.diskstorage.cassandra.AbstractCassandraStoreManager;
-import org.janusgraph.diskstorage.keycolumnvalue.SliceQuery;
-import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration;
-import org.janusgraph.hadoop.config.JanusGraphHadoopConfiguration;
-import org.janusgraph.hadoop.formats.util.AbstractBinaryInputFormat;
-import org.janusgraph.hadoop.formats.util.input.JanusGraphHadoopSetupCommon;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.List;
 
 /**
  * Wraps a ColumnFamilyInputFormat and converts CFIF's binary types to JanusGraph's binary types.
  */
-public class Cassandra3BinaryInputFormat extends AbstractBinaryInputFormat {
-
-    private static final Logger log = LoggerFactory.getLogger(Cassandra3BinaryInputFormat.class);
-
-    // Copied these private constants from Cassandra's ConfigHelper circa 2.0.9
-    private static final String INPUT_WIDEROWS_CONFIG = "cassandra.input.widerows";
-    private static final String RANGE_BATCH_SIZE_CONFIG = "cassandra.range.batch.size";
-
-    private final ColumnFamilyInputFormat columnFamilyInputFormat = new ColumnFamilyInputFormat();
-    private ColumnFamilyRecordReader columnFamilyRecordReader;
-    private RecordReader<StaticBuffer, Iterable<Entry>> janusgraphRecordReader;
-
-    public RecordReader<StaticBuffer, Iterable<Entry>> getRecordReader() {
-        return janusgraphRecordReader;
-    }
-
-    @Override
-    public List<InputSplit> getSplits(final JobContext jobContext) throws IOException, InterruptedException {
-        return this.columnFamilyInputFormat.getSplits(jobContext);
-    }
+public class Cassandra3BinaryInputFormat extends CassandraBinaryInputFormat {
 
     @Override
     public RecordReader<StaticBuffer, Iterable<Entry>> createRecordReader(final InputSplit inputSplit, final TaskAttemptContext taskAttemptContext)
@@ -69,39 +34,5 @@ public RecordReader<StaticBuffer, Iterable<Entry>> createRecordReader(final Inpu
         return janusgraphRecordReader;
     }
 
-    @Override
-    public void setConf(final Configuration config) {
-        super.setConf(config);
-
-        // Copy some JanusGraph configuration keys to the Hadoop Configuration keys used by Cassandra's ColumnFamilyInputFormat
-        ConfigHelper.setInputInitialAddress(config, janusgraphConf.get(GraphDatabaseConfiguration.STORAGE_HOSTS)[0]);
-        if (janusgraphConf.has(GraphDatabaseConfiguration.STORAGE_PORT))
-            ConfigHelper.setInputRpcPort(config, String.valueOf(janusgraphConf.get(GraphDatabaseConfiguration.STORAGE_PORT)));
-        if (janusgraphConf.has(GraphDatabaseConfiguration.AUTH_USERNAME))
-            ConfigHelper.setInputKeyspaceUserName(config, janusgraphConf.get(GraphDatabaseConfiguration.AUTH_USERNAME));
-        if (janusgraphConf.has(GraphDatabaseConfiguration.AUTH_PASSWORD))
-            ConfigHelper.setInputKeyspacePassword(config, janusgraphConf.get(GraphDatabaseConfiguration.AUTH_PASSWORD));
-
-        // Copy keyspace, force the CF setting to edgestore, honor widerows when set
-        final boolean wideRows = config.getBoolean(INPUT_WIDEROWS_CONFIG, false);
-        // Use the setInputColumnFamily overload that includes a widerows argument; using the overload without this argument forces it false
-        ConfigHelper.setInputColumnFamily(config, janusgraphConf.get(AbstractCassandraStoreManager.CASSANDRA_KEYSPACE),
-                mrConf.get(JanusGraphHadoopConfiguration.COLUMN_FAMILY_NAME), wideRows);
-        log.debug("Set keyspace: {}", janusgraphConf.get(AbstractCassandraStoreManager.CASSANDRA_KEYSPACE));
-
-        // Set the column slice bounds via Faunus's vertex query filter
-        final SlicePredicate predicate = new SlicePredicate();
-        final int rangeBatchSize = config.getInt(RANGE_BATCH_SIZE_CONFIG, Integer.MAX_VALUE);
-        predicate.setSlice_range(getSliceRange(JanusGraphHadoopSetupCommon.DEFAULT_SLICE_QUERY, rangeBatchSize)); // TODO stop slicing the whole row
-        ConfigHelper.setInputSlicePredicate(config, predicate);
-    }
-
-    private SliceRange getSliceRange(final SliceQuery slice, final int limit) {
-        final SliceRange sliceRange = new SliceRange();
-        sliceRange.setStart(slice.getSliceStart().asByteBuffer());
-        sliceRange.setFinish(slice.getSliceEnd().asByteBuffer());
-        sliceRange.setCount(Math.min(limit, slice.getLimit()));
-        return sliceRange;
-    }
 }

janusgraph-hadoop-parent/janusgraph-hadoop-core/src/main/java/org/janusgraph/hadoop/formats/cassandra/CassandraBinaryInputFormat.java

+1 -2

@@ -51,7 +51,7 @@ public class CassandraBinaryInputFormat extends AbstractBinaryInputFormat {
 
     private final ColumnFamilyInputFormat columnFamilyInputFormat = new ColumnFamilyInputFormat();
     private ColumnFamilyRecordReader columnFamilyRecordReader;
-    private RecordReader<StaticBuffer, Iterable<Entry>> janusgraphRecordReader;
+    RecordReader<StaticBuffer, Iterable<Entry>> janusgraphRecordReader;
 
     public RecordReader<StaticBuffer, Iterable<Entry>> getRecordReader() {
         return janusgraphRecordReader;
@@ -65,7 +65,6 @@ public List<InputSplit> getSplits(final JobContext jobContext) throws IOExceptio
     @Override
     public RecordReader<StaticBuffer, Iterable<Entry>> createRecordReader(final InputSplit inputSplit, final TaskAttemptContext taskAttemptContext)
             throws IOException, InterruptedException {
-        // the default case, Cassandra 2.1.9
         columnFamilyRecordReader =
                 (ColumnFamilyRecordReader) columnFamilyInputFormat.createRecordReader(inputSplit, taskAttemptContext);
         janusgraphRecordReader =
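
The change above widens `janusgraphRecordReader` from private to package-private so that a subclass such as `Cassandra3BinaryInputFormat` can reuse the inherited split computation while assigning its own RecordReader. A minimal, self-contained sketch of that pattern follows; every class name in it is a hypothetical stand-in, not one of the JanusGraph classes.

```java
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Trivial reader emitting a single key/value pair; stands in for the Thrift- and CQL-backed readers.
class SingleValueReader extends RecordReader<String, String> {
    private final String value;
    private boolean consumed;

    SingleValueReader(String value) { this.value = value; }

    @Override public void initialize(InputSplit split, TaskAttemptContext ctx) { }
    @Override public boolean nextKeyValue() {
        if (consumed) return false;
        consumed = true;
        return true;
    }
    @Override public String getCurrentKey() { return "key"; }
    @Override public String getCurrentValue() { return value; }
    @Override public float getProgress() { return consumed ? 1.0f : 0.0f; }
    @Override public void close() { }
}

class BaseBinaryFormat extends InputFormat<String, String> {
    RecordReader<String, String> reader;  // package-private: subclasses may assign it

    @Override
    public List<InputSplit> getSplits(JobContext ctx) {
        return Collections.emptyList();   // stand-in for the wrapped format's split logic
    }

    @Override
    public RecordReader<String, String> createRecordReader(InputSplit split, TaskAttemptContext ctx) {
        reader = new SingleValueReader("default reader");
        return reader;
    }
}

class CqlStyleFormat extends BaseBinaryFormat {
    @Override
    public RecordReader<String, String> createRecordReader(InputSplit split, TaskAttemptContext ctx) {
        reader = new SingleValueReader("substituted CQL-style reader");
        return reader;                    // split computation is inherited unchanged
    }
}
```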

janusgraph-hadoop-parent/janusgraph-hadoop-core/src/main/java/org/janusgraph/hadoop/formats/cassandra/CqlBridgeRecordReader.java

-15
@@ -1,18 +1,3 @@
-// Copyright 2017 JanusGraph Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// Brought in the following copyright header from Cassandra-3 GitHub Repo.
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
janusgraph-hadoop-parent/janusgraph-hadoop-core/src/test/java/org/janusgraph/hadoop/Cassandra3InputFormatIT.java

+39

@@ -0,0 +1,39 @@
+// Copyright 2017 JanusGraph Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.janusgraph.hadoop;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.janusgraph.CassandraStorageSetup;
+import org.janusgraph.diskstorage.configuration.ModifiableConfiguration;
+import org.janusgraph.diskstorage.configuration.WriteConfiguration;
+
+import java.io.IOException;
+
+public class Cassandra3InputFormatIT extends CassandraInputFormatIT {
+
+    protected PropertiesConfiguration getGraphConfiguration() throws ConfigurationException, IOException {
+        final PropertiesConfiguration config = super.getGraphConfiguration();
+        config.setProperty("gremlin.hadoop.graphInputFormat", "org.janusgraph.hadoop.formats.cassandra.Cassandra3InputFormat");
+        return config;
+    }
+
+    @Override
+    public WriteConfiguration getConfiguration() {
+        String className = CassandraInputFormatIT.class.getSimpleName();
+        ModifiableConfiguration mc = CassandraStorageSetup.getCassandraThriftConfiguration(className);
+        return mc.getConfiguration();
+    }
+}
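
The new IT differs from its parent only in the `gremlin.hadoop.graphInputFormat` property it sets. A minimal sketch of standalone usage, assuming the same `cassandra-read.properties` file that `CassandraInputFormatIT` (below) loads; this snippet is illustrative and not part of the commit.

```java
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;

public class OpenWithCqlInputFormat {
    public static void main(String[] args) throws Exception {
        // Load the same Hadoop/JanusGraph read configuration the integration tests use.
        PropertiesConfiguration config =
                new PropertiesConfiguration("target/test-classes/cassandra-read.properties");
        // Point TinkerPop's HadoopGraph at the CQL-backed input format exercised by this commit.
        config.setProperty("gremlin.hadoop.graphInputFormat",
                "org.janusgraph.hadoop.formats.cassandra.Cassandra3InputFormat");
        Graph graph = GraphFactory.open(config);
        try {
            // Run OLAP/OLTP reads against the graph here.
            System.out.println("Opened " + graph);
        } finally {
            graph.close();
        }
    }
}
```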

janusgraph-hadoop-parent/janusgraph-hadoop-core/src/test/java/org/janusgraph/hadoop/CassandraInputFormatIT.java

+6 -2

@@ -29,13 +29,17 @@
 
 public class CassandraInputFormatIT extends AbstractInputFormatIT {
 
-    protected Graph getGraph() throws ConfigurationException, IOException {
+    protected PropertiesConfiguration getGraphConfiguration() throws ConfigurationException, IOException {
         final PropertiesConfiguration config = new PropertiesConfiguration("target/test-classes/cassandra-read.properties");
         Path baseOutDir = Paths.get((String) config.getProperty("gremlin.hadoop.outputLocation"));
         baseOutDir.toFile().mkdirs();
         String outDir = Files.createTempDirectory(baseOutDir, null).toAbsolutePath().toString();
         config.setProperty("gremlin.hadoop.outputLocation", outDir);
-        return GraphFactory.open(config);
+        return config;
+    }
+
+    protected Graph getGraph() throws ConfigurationException, IOException {
+        return GraphFactory.open(getGraphConfiguration());
     }
 
     @Override

janusgraph-hadoop-parent/pom.xml

+22
@@ -14,6 +14,9 @@
     <properties>
         <top.level.basedir>${project.parent.basedir}</top.level.basedir>
         <skipCassandra>${skipTests}</skipCassandra>
+        <!-- Cassandra-3 testing requires an external Cassandra instance.
+             See TESTING.md for more information. -->
+        <skipCassandra3>true</skipCassandra3>
         <skipHBase>${skipTests}</skipHBase>
         <skipPipeline>${skipTests}</skipPipeline>
     </properties>
@@ -322,12 +325,31 @@
                         <includes>
                             <include>**/*Cassandra*IT.java</include>
                         </includes>
+                        <excludes>
+                            <exclude>**/*Cassandra3*IT.java</exclude>
+                        </excludes>
                         <skipTests>${skipCassandra}</skipTests>
                         <reuseForks>false</reuseForks>
                         <summaryFile>target/failsafe-reports/failsafe-janusgraph-cassandra.xml</summaryFile>
                         <argLine>-Dtest.cassandra.confdir=${project.build.directory}/cassandra/conf/localhost-murmur -Dtest.cassandra.datadir=${project.build.directory}/cassandra/data/localhost-murmur</argLine>
                     </configuration>
                 </execution>
+                <!-- Cassandra-3 testing requires an external Cassandra instance.
+                     See TESTING.md for more information. -->
+                <execution>
+                    <id>janusgraph-cassandra3-test</id>
+                    <goals>
+                        <goal>integration-test</goal>
+                    </goals>
+                    <configuration>
+                        <includes>
+                            <include>**/*Cassandra3*IT.java</include>
+                        </includes>
+                        <skipTests>${skipCassandra3}</skipTests>
+                        <reuseForks>false</reuseForks>
+                        <summaryFile>target/failsafe-reports/failsafe-janusgraph-cassandra3.xml</summaryFile>
+                    </configuration>
+                </execution>
                 <execution>
                     <id>janusgraph-hbase-test</id>
                     <goals>
