Merge pull request #463 from rubanm/rubanm/try_cascading3

Compatibility with Cascading 3.0 take two

rubanm committed Feb 8, 2016
2 parents 9f622ee + c77ee1f commit 9c4d091
Showing 19 changed files with 633 additions and 7 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -2,6 +2,7 @@
 *.ipr
 *.iws
 *.tmproj
+*.swp
 *~
 .DS_Store
 .classpath
37 changes: 37 additions & 0 deletions cascading-protobuf/pom.xml
@@ -0,0 +1,37 @@
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>com.twitter.elephantbird</groupId>
    <artifactId>elephant-bird</artifactId>
    <version>4.13-SNAPSHOT</version>
    <relativePath>..</relativePath>
  </parent>
  <artifactId>elephant-bird-cascading-protobuf</artifactId>
  <name>Elephant Bird Cascading Protobuf</name>
  <description>Cascading Protobuf utilities.</description>
  <repositories>
    <repository>
      <id>conjars.org</id>
      <url>http://conjars.org/repo</url>
    </repository>
  </repositories>
  <dependencies>
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>${protobuf.version}</version>
    </dependency>
    <dependency>
      <groupId>cascading</groupId>
      <artifactId>cascading-hadoop</artifactId>
      <!-- This module is API compatible with both Cascading 2 and 3. We leave it to consumers to provide it. -->
      <version>${cascading3.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
    </dependency>
  </dependencies>
</project>
@@ -1,4 +1,4 @@
-package com.twitter.elephantbird.cascading2.io.protobuf;
+package com.twitter.elephantbird.cascading.protobuf;

 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -1,4 +1,4 @@
-package com.twitter.elephantbird.cascading2.io.protobuf;
+package com.twitter.elephantbird.cascading.protobuf;

 import java.io.IOException;
 import java.io.InputStream;
@@ -1,4 +1,4 @@
-package com.twitter.elephantbird.cascading2.io.protobuf;
+package com.twitter.elephantbird.cascading.protobuf;

 import java.io.InputStream;
 import java.lang.reflect.InvocationTargetException;
@@ -1,4 +1,4 @@
-package com.twitter.elephantbird.cascading2.io.protobuf;
+package com.twitter.elephantbird.cascading.protobuf;

 import java.util.Comparator;

@@ -1,4 +1,4 @@
-package com.twitter.elephantbird.cascading2.io.protobuf;
+package com.twitter.elephantbird.cascading.protobuf;

 import java.io.IOException;
 import java.io.OutputStream;
2 changes: 2 additions & 0 deletions cascading2/pom.xml
@@ -32,6 +32,8 @@
     <dependency>
       <groupId>cascading</groupId>
       <artifactId>cascading-hadoop</artifactId>
+      <version>${cascading2.version}</version>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>junit</groupId>
43 changes: 43 additions & 0 deletions cascading3/pom.xml
@@ -0,0 +1,43 @@
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>com.twitter.elephantbird</groupId>
    <artifactId>elephant-bird</artifactId>
    <version>4.13-SNAPSHOT</version>
    <relativePath>..</relativePath>
  </parent>
  <artifactId>elephant-bird-cascading3</artifactId>
  <name>Elephant Bird Cascading3</name>
  <description>Cascading utilities.</description>
  <repositories>
    <repository>
      <id>conjars.org</id>
      <url>http://conjars.org/repo</url>
    </repository>
  </repositories>
  <dependencies>
    <dependency>
      <groupId>com.twitter.elephantbird</groupId>
      <artifactId>elephant-bird-core</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
    </dependency>
    <dependency>
      <groupId>cascading</groupId>
      <artifactId>cascading-hadoop</artifactId>
      <version>${cascading3.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
    </dependency>
  </dependencies>
</project>
@@ -0,0 +1,85 @@
package com.twitter.elephantbird.cascading3.scheme;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.SequenceFileInputFormat;

import com.twitter.elephantbird.mapreduce.input.MapReduceInputFormatWrapper;
import com.twitter.elephantbird.mapreduce.input.combine.DelegateCombineFileInputFormat;

import cascading.flow.FlowProcess;
import cascading.scheme.hadoop.SequenceFile;
import cascading.tap.Tap;
import cascading.tuple.Fields;

/**
 * This scheme allows SequenceFile splits to be combined via the DelegateCombineFileInputFormat
 * before they are read. It can be used to combine inputs for intermediate MR jobs in Cascading.
 *
 * To enable, set cascading.flowconnector.intermediateschemeclass to this class in the Hadoop
 * configuration.
 *
 * @author Akihiro Matsukawa
 */
public class CombinedSequenceFile extends SequenceFile {

  private static final String MR_COMPRESS_ENABLE = "mapreduce.output.fileoutputformat.compress";
  public static final String COMPRESS_ENABLE = "elephantbird.cascading.combinedsequencefile.compress.enable";

  private static final String MR_COMPRESS_TYPE = "mapreduce.output.fileoutputformat.compress.type";
  public static final String COMPRESS_TYPE = "elephantbird.cascading.combinedsequencefile.compress.type";

  private static final String MR_COMPRESS_CODEC = "mapreduce.output.fileoutputformat.compress.codec";
  public static final String COMPRESS_CODEC = "elephantbird.cascading.combinedsequencefile.compress.codec";

  protected CombinedSequenceFile() { super(); }

  public CombinedSequenceFile(Fields fields) { super(fields); }

  // Compression settings can be overridden for just this scheme here.
  private void updateJobConfForLocalSettings(Configuration conf) {
    String localSetCompressionEnabled = conf.get(COMPRESS_ENABLE);
    if (localSetCompressionEnabled != null) {
      conf.set(MR_COMPRESS_ENABLE, localSetCompressionEnabled);
    }

    String localSetCompressionType = conf.get(COMPRESS_TYPE);
    if (localSetCompressionType != null) {
      conf.set(MR_COMPRESS_TYPE, localSetCompressionType);
    }

    String localSetCompressionCodec = conf.get(COMPRESS_CODEC);
    if (localSetCompressionCodec != null) {
      conf.set(MR_COMPRESS_CODEC, localSetCompressionCodec);
    }
  }

  @Override
  public void sourceConfInit(
      FlowProcess<? extends Configuration> flowProcess,
      Tap<Configuration, RecordReader, OutputCollector> tap,
      Configuration conf) {
    super.sourceConfInit(flowProcess, tap, conf);

    updateJobConfForLocalSettings(conf);

    // Both the EB combiner and Cascading 3 work over the mapreduce API;
    // however, SequenceFileInputFormat is in the mapred API. In order to use
    // the EB combiner we must wrap the mapred SequenceFileInputFormat with
    // MapReduceInputFormatWrapper and then wrap that in DelegateCombineFileInputFormat.
    MapReduceInputFormatWrapper.setWrappedInputFormat(SequenceFileInputFormat.class, conf);
    DelegateCombineFileInputFormat.setDelegateInputFormatHadoop2(conf, MapReduceInputFormatWrapper.class);
  }

  @Override
  public void sinkConfInit(FlowProcess<? extends Configuration> flowProcess,
      Tap<Configuration, RecordReader, OutputCollector> tap, Configuration conf) {
    super.sinkConfInit(flowProcess, tap, conf);

    updateJobConfForLocalSettings(conf);
  }
}
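For context, a minimal wiring sketch for this scheme. The property key comes from the class javadoc and the COMPRESS_* keys are defined above; the helper class name, the codec choice, and the use of Hadoop2MR1FlowConnector (Cascading 3's MR flow connector) are illustrative assumptions, not part of this commit.

import java.util.Properties;

import cascading.flow.FlowConnector;
import cascading.flow.hadoop2.Hadoop2MR1FlowConnector;

import com.twitter.elephantbird.cascading3.scheme.CombinedSequenceFile;

public class CombinedSequenceFileSetup {
  public static FlowConnector newConnector() {
    Properties props = new Properties();
    // Use CombinedSequenceFile for the intermediate steps of the flow.
    props.setProperty("cascading.flowconnector.intermediateschemeclass",
        CombinedSequenceFile.class.getName());
    // Optional: override output compression for this scheme only.
    props.setProperty(CombinedSequenceFile.COMPRESS_ENABLE, "true");
    props.setProperty(CombinedSequenceFile.COMPRESS_TYPE, "BLOCK");
    props.setProperty(CombinedSequenceFile.COMPRESS_CODEC,
        "org.apache.hadoop.io.compress.SnappyCodec");
    return new Hadoop2MR1FlowConnector(props);
  }
}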
@@ -0,0 +1,85 @@
package com.twitter.elephantbird.cascading3.scheme;

import java.io.IOException;

import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.twitter.elephantbird.mapreduce.io.BinaryWritable;

import cascading.flow.FlowProcess;
import cascading.scheme.Scheme;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.tap.Tap;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

/**
 * Scheme for lzo binary encoded files. Handles both block and base64 encodings.
 * Can be used for Protobuf and Thrift.
 *
 * @author Argyris Zymnis
 */
abstract public class LzoBinaryScheme<M, T extends BinaryWritable<M>> extends
    Scheme<Configuration, RecordReader, OutputCollector, Object[], T> {

  private static final Logger LOG = LoggerFactory.getLogger(LzoBinaryScheme.class);
  private static final long serialVersionUID = -5011096855302946106L;

  @Override
  public void sink(FlowProcess<? extends Configuration> flowProcess, SinkCall<T, OutputCollector> sinkCall)
      throws IOException {
    OutputCollector collector = sinkCall.getOutput();
    TupleEntry entry = sinkCall.getOutgoingEntry();
    T writable = sinkCall.getContext();
    writable.set((M) entry.getTuple().getObject(0));
    collector.collect(null, writable);
  }

  @Override
  public void sinkPrepare(FlowProcess<? extends Configuration> fp, SinkCall<T, OutputCollector> sinkCall) {
    sinkCall.setContext(prepareBinaryWritable());
  }

  protected abstract T prepareBinaryWritable();

  @Override
  public boolean source(FlowProcess<? extends Configuration> flowProcess,
      SourceCall<Object[], RecordReader> sourceCall) throws IOException {

    Object[] context = sourceCall.getContext();
    while (sourceCall.getInput().next(context[0], context[1])) {
      Object out = ((T) context[1]).get();
      if (out != null) {
        sourceCall.getIncomingEntry().setTuple(new Tuple(out));
        return true;
      }
      LOG.warn("failed to decode record");
    }
    return false;
  }

  @Override
  public void sourceCleanup(FlowProcess<? extends Configuration> flowProcess,
      SourceCall<Object[], RecordReader> sourceCall) {
    sourceCall.setContext(null);
  }

  /**
   * This sets up the state between successive calls to source.
   */
  @Override
  public void sourcePrepare(FlowProcess<? extends Configuration> flowProcess,
      SourceCall<Object[], RecordReader> sourceCall) {
    // Hadoop supplies a key-value pair:
    sourceCall.setContext(new Object[2]);
    sourceCall.getContext()[0] = sourceCall.getInput().createKey();
    sourceCall.getContext()[1] = sourceCall.getInput().createValue();
  }
}
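The abstract class leaves record typing to subclasses via prepareBinaryWritable(); LzoByteArrayScheme below is the in-tree example. As a further illustration, here is a hedged sketch of a protobuf-backed subclass: MyProto is a hypothetical protobuf-generated Message class, and the conf wiring simply mirrors LzoByteArrayScheme rather than any scheme shipped in this commit.

package com.twitter.elephantbird.cascading3.scheme;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapreduce.OutputFormat;

import com.twitter.elephantbird.mapreduce.input.MultiInputFormat;
import com.twitter.elephantbird.mapreduce.input.combine.DelegateCombineFileInputFormat;
import com.twitter.elephantbird.mapreduce.io.ProtobufWritable;
import com.twitter.elephantbird.mapreduce.output.LzoProtobufBlockOutputFormat;

import cascading.flow.FlowProcess;
import cascading.tap.Tap;

// Hypothetical: MyProto stands in for any protobuf-generated Message class.
public class LzoMyProtoScheme extends LzoBinaryScheme<MyProto, ProtobufWritable<MyProto>> {

  @Override
  protected ProtobufWritable<MyProto> prepareBinaryWritable() {
    // ProtobufWritable needs the concrete message class to decode records.
    return ProtobufWritable.newInstance(MyProto.class);
  }

  @Override
  public void sourceConfInit(FlowProcess<? extends Configuration> fp,
      Tap<Configuration, RecordReader, OutputCollector> tap,
      Configuration conf) {
    // Register the message type and enable split combining,
    // mirroring LzoByteArrayScheme below.
    MultiInputFormat.setClassConf(MyProto.class, conf);
    DelegateCombineFileInputFormat.setDelegateInputFormatHadoop2(conf, MultiInputFormat.class);
  }

  @Override
  public void sinkConfInit(FlowProcess<? extends Configuration> fp,
      Tap<Configuration, RecordReader, OutputCollector> tap,
      Configuration conf) {
    conf.setClass("mapreduce.outputformat.class",
        LzoProtobufBlockOutputFormat.class, OutputFormat.class);
  }
}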
@@ -0,0 +1,56 @@
/*
 * Copyright 2012 Twitter, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.twitter.elephantbird.cascading3.scheme;

import com.twitter.elephantbird.mapreduce.input.combine.DelegateCombineFileInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapreduce.OutputFormat;

import com.twitter.elephantbird.mapred.output.DeprecatedOutputFormatWrapper;
import com.twitter.elephantbird.mapreduce.input.MultiInputFormat;
import com.twitter.elephantbird.mapreduce.io.RawBytesWritable;
import com.twitter.elephantbird.mapreduce.output.LzoBinaryBlockOutputFormat;

import cascading.flow.FlowProcess;
import cascading.tap.Tap;

/**
 * Scheme for lzo compressed files with binary records.
 *
 * @author Sam Ritchie
 */
public class LzoByteArrayScheme extends LzoBinaryScheme<byte[], RawBytesWritable> {

  @Override
  protected RawBytesWritable prepareBinaryWritable() {
    return new RawBytesWritable();
  }

  @Override
  public void sourceConfInit(FlowProcess<? extends Configuration> fp,
      Tap<Configuration, RecordReader, OutputCollector> tap,
      Configuration conf) {
    MultiInputFormat.setClassConf(byte[].class, conf);
    DelegateCombineFileInputFormat.setDelegateInputFormatHadoop2(conf, MultiInputFormat.class);
  }

  @Override
  public void sinkConfInit(FlowProcess<? extends Configuration> fp,
      Tap<Configuration, RecordReader, OutputCollector> tap,
      Configuration conf) {
    conf.setClass("mapreduce.outputformat.class", LzoBinaryBlockOutputFormat.class, OutputFormat.class);
  }
}
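And a hedged end-to-end sketch of using this scheme: Hfs, Pipe, and Hadoop2MR1FlowConnector are standard Cascading 3 classes rather than part of this commit, and the paths and pipe name are illustrative.

import java.util.Properties;

import cascading.flow.hadoop2.Hadoop2MR1FlowConnector;
import cascading.pipe.Pipe;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;

import com.twitter.elephantbird.cascading3.scheme.LzoByteArrayScheme;

public class LzoByteArrayCopy {
  public static void main(String[] args) {
    // Read lzo-compressed binary records and write them back out unchanged.
    Tap source = new Hfs(new LzoByteArrayScheme(), "hdfs:///input/raw-bytes");
    Tap sink = new Hfs(new LzoByteArrayScheme(), "hdfs:///output/raw-bytes");
    Pipe pipe = new Pipe("copy");
    new Hadoop2MR1FlowConnector(new Properties()).connect(source, sink, pipe).complete();
  }
}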