diff --git a/.gitignore b/.gitignore
index a6cfe9a4d..d415250f7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,6 +2,7 @@
 *.ipr
 *.iws
 *.tmproj
+*.swp
 *~
 .DS_Store
 .classpath
diff --git a/cascading-protobuf/pom.xml b/cascading-protobuf/pom.xml
new file mode 100644
index 000000000..d88fd1c86
--- /dev/null
+++ b/cascading-protobuf/pom.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>com.twitter.elephantbird</groupId>
+    <artifactId>elephant-bird</artifactId>
+    <version>4.13-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+  <artifactId>elephant-bird-cascading-protobuf</artifactId>
+  <name>Elephant Bird Cascading Protobuf</name>
+  <description>Cascading Protobuf utilities.</description>
+
+  <repositories>
+    <repository>
+      <id>conjars.org</id>
+      <url>http://conjars.org/repo</url>
+    </repository>
+  </repositories>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>${protobuf.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>cascading</groupId>
+      <artifactId>cascading-hadoop</artifactId>
+      <version>${cascading3.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/cascading2/src/main/java/com/twitter/elephantbird/cascading2/io/protobuf/ProtobufComparator.java b/cascading-protobuf/src/main/java/com/twitter/elephantbird/cascading/protobuf/ProtobufComparator.java
similarity index 97%
rename from cascading2/src/main/java/com/twitter/elephantbird/cascading2/io/protobuf/ProtobufComparator.java
rename to cascading-protobuf/src/main/java/com/twitter/elephantbird/cascading/protobuf/ProtobufComparator.java
index e21fd3766..9fbe086dd 100644
--- a/cascading2/src/main/java/com/twitter/elephantbird/cascading2/io/protobuf/ProtobufComparator.java
+++ b/cascading-protobuf/src/main/java/com/twitter/elephantbird/cascading/protobuf/ProtobufComparator.java
@@ -1,4 +1,4 @@
-package com.twitter.elephantbird.cascading2.io.protobuf;
+package com.twitter.elephantbird.cascading.protobuf;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
diff --git a/cascading2/src/main/java/com/twitter/elephantbird/cascading2/io/protobuf/ProtobufDeserializer.java b/cascading-protobuf/src/main/java/com/twitter/elephantbird/cascading/protobuf/ProtobufDeserializer.java
similarity index 93%
rename from cascading2/src/main/java/com/twitter/elephantbird/cascading2/io/protobuf/ProtobufDeserializer.java
rename to cascading-protobuf/src/main/java/com/twitter/elephantbird/cascading/protobuf/ProtobufDeserializer.java
index 713d46e28..d9a4599e6 100644
--- a/cascading2/src/main/java/com/twitter/elephantbird/cascading2/io/protobuf/ProtobufDeserializer.java
+++ b/cascading-protobuf/src/main/java/com/twitter/elephantbird/cascading/protobuf/ProtobufDeserializer.java
@@ -1,4 +1,4 @@
-package com.twitter.elephantbird.cascading2.io.protobuf;
+package com.twitter.elephantbird.cascading.protobuf;
 
 import java.io.IOException;
 import java.io.InputStream;
diff --git a/cascading2/src/main/java/com/twitter/elephantbird/cascading2/io/protobuf/ProtobufReflectionUtil.java b/cascading-protobuf/src/main/java/com/twitter/elephantbird/cascading/protobuf/ProtobufReflectionUtil.java
similarity index 96%
rename from cascading2/src/main/java/com/twitter/elephantbird/cascading2/io/protobuf/ProtobufReflectionUtil.java
rename to cascading-protobuf/src/main/java/com/twitter/elephantbird/cascading/protobuf/ProtobufReflectionUtil.java
index dd4f6ea10..ee8847f82 100644
--- a/cascading2/src/main/java/com/twitter/elephantbird/cascading2/io/protobuf/ProtobufReflectionUtil.java
+++ b/cascading-protobuf/src/main/java/com/twitter/elephantbird/cascading/protobuf/ProtobufReflectionUtil.java
@@ -1,4 +1,4 @@
-package com.twitter.elephantbird.cascading2.io.protobuf;
+package com.twitter.elephantbird.cascading.protobuf;
 
 import java.io.InputStream;
 import java.lang.reflect.InvocationTargetException;
diff --git a/cascading2/src/main/java/com/twitter/elephantbird/cascading2/io/protobuf/ProtobufSerialization.java b/cascading-protobuf/src/main/java/com/twitter/elephantbird/cascading/protobuf/ProtobufSerialization.java
similarity index 94%
rename from cascading2/src/main/java/com/twitter/elephantbird/cascading2/io/protobuf/ProtobufSerialization.java
rename to cascading-protobuf/src/main/java/com/twitter/elephantbird/cascading/protobuf/ProtobufSerialization.java
index 7da6d8f87..b5ad0c461 100644
--- a/cascading2/src/main/java/com/twitter/elephantbird/cascading2/io/protobuf/ProtobufSerialization.java
+++ b/cascading-protobuf/src/main/java/com/twitter/elephantbird/cascading/protobuf/ProtobufSerialization.java
@@ -1,4 +1,4 @@
-package com.twitter.elephantbird.cascading2.io.protobuf;
+package com.twitter.elephantbird.cascading.protobuf;
 
 import java.util.Comparator;
 
diff --git a/cascading2/src/main/java/com/twitter/elephantbird/cascading2/io/protobuf/ProtobufSerializer.java b/cascading-protobuf/src/main/java/com/twitter/elephantbird/cascading/protobuf/ProtobufSerializer.java
similarity index 91%
rename from cascading2/src/main/java/com/twitter/elephantbird/cascading2/io/protobuf/ProtobufSerializer.java
rename to cascading-protobuf/src/main/java/com/twitter/elephantbird/cascading/protobuf/ProtobufSerializer.java
index 835261085..14b898950 100644
--- a/cascading2/src/main/java/com/twitter/elephantbird/cascading2/io/protobuf/ProtobufSerializer.java
+++ b/cascading-protobuf/src/main/java/com/twitter/elephantbird/cascading/protobuf/ProtobufSerializer.java
@@ -1,4 +1,4 @@
-package com.twitter.elephantbird.cascading2.io.protobuf;
+package com.twitter.elephantbird.cascading.protobuf;
 
 import java.io.IOException;
 import java.io.OutputStream;
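The relocated ProtobufSerialization is a Hadoop Serialization implementation, so jobs opt in through Hadoop's standard io.serializations list. A minimal sketch of wiring it in, assuming the renamed package above (the helper class and method name are illustrative, not part of this diff):

```java
import org.apache.hadoop.conf.Configuration;

import com.twitter.elephantbird.cascading.protobuf.ProtobufSerialization;

// Hypothetical helper: appends ProtobufSerialization to Hadoop's
// io.serializations list so protobuf Messages can be (de)serialized
// between tasks.
public class ProtobufSerializationSupport {
  public static void register(Configuration conf) {
    String existing = conf.get("io.serializations", "");
    String name = ProtobufSerialization.class.getName();
    conf.set("io.serializations",
        existing.isEmpty() ? name : existing + "," + name);
  }
}
```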
diff --git a/cascading2/pom.xml b/cascading2/pom.xml
index 481ffbf8e..2eafe5620 100644
--- a/cascading2/pom.xml
+++ b/cascading2/pom.xml
@@ -32,6 +32,8 @@
     <dependency>
       <groupId>cascading</groupId>
       <artifactId>cascading-hadoop</artifactId>
+      <version>${cascading2.version}</version>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>junit</groupId>
diff --git a/cascading3/pom.xml b/cascading3/pom.xml
new file mode 100644
index 000000000..b28cb1640
--- /dev/null
+++ b/cascading3/pom.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>com.twitter.elephantbird</groupId>
+    <artifactId>elephant-bird</artifactId>
+    <version>4.13-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+  <artifactId>elephant-bird-cascading3</artifactId>
+  <name>Elephant Bird Cascading3</name>
+  <description>Cascading utilities.</description>
+  <repositories>
+    <repository>
+      <id>conjars.org</id>
+      <url>http://conjars.org/repo</url>
+    </repository>
+  </repositories>
+  <dependencies>
+    <dependency>
+      <groupId>com.twitter.elephantbird</groupId>
+      <artifactId>elephant-bird-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-simple</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>cascading</groupId>
+      <artifactId>cascading-hadoop</artifactId>
+      <version>${cascading3.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/cascading3/src/main/java/com/twitter/elephantbird/cascading3/scheme/CombinedSequenceFile.java b/cascading3/src/main/java/com/twitter/elephantbird/cascading3/scheme/CombinedSequenceFile.java
new file mode 100644
index 000000000..f056131fc
--- /dev/null
+++ b/cascading3/src/main/java/com/twitter/elephantbird/cascading3/scheme/CombinedSequenceFile.java
@@ -0,0 +1,85 @@
+package com.twitter.elephantbird.cascading3.scheme;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+
+import com.twitter.elephantbird.mapreduce.input.MapReduceInputFormatWrapper;
+import com.twitter.elephantbird.mapreduce.input.combine.DelegateCombineFileInputFormat;
+
+import cascading.flow.FlowProcess;
+import cascading.scheme.hadoop.SequenceFile;
+import cascading.tap.Tap;
+import cascading.tuple.Fields;
+
+/**
+ * This scheme allows SequenceFile splits to be combined via the DelegateCombineFileInputFormat
+ * before they are read. It can be used to combine inputs for intermediate MR jobs in Cascading.
+ *
+ * To enable, set cascading.flowconnector.intermediateschemeclass to this class in the Hadoop
+ * configuration.
+ *
+ * @author Akihiro Matsukawa
+ */
+public class CombinedSequenceFile extends SequenceFile {
+
+  private static final String MR_COMPRESS_ENABLE = "mapreduce.output.fileoutputformat.compress";
+  public static final String COMPRESS_ENABLE = "elephantbird.cascading.combinedsequencefile.compress.enable";
+
+  private static final String MR_COMPRESS_TYPE = "mapreduce.output.fileoutputformat.compress.type";
+  public static final String COMPRESS_TYPE = "elephantbird.cascading.combinedsequencefile.compress.type";
+
+  private static final String MR_COMPRESS_CODEC = "mapreduce.output.fileoutputformat.compress.codec";
+  public static final String COMPRESS_CODEC = "elephantbird.cascading.combinedsequencefile.compress.codec";
+
+  protected CombinedSequenceFile() { super(); }
+
+  public CombinedSequenceFile(Fields fields) { super(fields); }
+
+  // Allow overriding the compression settings for just this scheme
+  private void updateJobConfForLocalSettings(Configuration conf) {
+    String localSetCompressionEnabled = conf.get(COMPRESS_ENABLE);
+    if(localSetCompressionEnabled != null) {
+      conf.set(MR_COMPRESS_ENABLE, localSetCompressionEnabled);
+    }
+
+    String localSetCompressionType = conf.get(COMPRESS_TYPE);
+    if(localSetCompressionType != null) {
+      conf.set(MR_COMPRESS_TYPE, localSetCompressionType);
+    }
+
+    String localSetCompressionCodec = conf.get(COMPRESS_CODEC);
+    if(localSetCompressionCodec != null) {
+      conf.set(MR_COMPRESS_CODEC, localSetCompressionCodec);
+    }
+  }
+
+  @Override
+  public void sourceConfInit(
+      FlowProcess<? extends Configuration> flowProcess,
+      Tap<Configuration, RecordReader, OutputCollector> tap,
+      Configuration conf ) {
+    super.sourceConfInit(flowProcess, tap, conf);
+
+    updateJobConfForLocalSettings(conf);
+
+    // Both the EB combiner and Cascading3 work over the mapreduce API;
+    // however, SequenceFileInputFormat is in the mapred API.
+    // In order to use the EB combiner we must wrap the mapred SequenceFileInputFormat
+    // with the MapReduceInputFormatWrapper and then wrap it in the DelegateCombineFileInputFormat.
+    MapReduceInputFormatWrapper.setWrappedInputFormat(SequenceFileInputFormat.class, conf);
+    DelegateCombineFileInputFormat.setDelegateInputFormatHadoop2(conf, MapReduceInputFormatWrapper.class);
+  }
+
+  @Override
+  public void sinkConfInit( FlowProcess<? extends Configuration> flowProcess,
+      Tap<Configuration, RecordReader, OutputCollector> tap, Configuration conf )
+  {
+    super.sinkConfInit(flowProcess, tap, conf);
+
+    updateJobConfForLocalSettings(conf);
+  }
+}
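The javadoc above names the property that turns this on. A sketch of enabling it for a flow, assuming the stock cascading.flow.hadoop.HadoopFlowConnector; the compression overrides are optional:

```java
import java.util.Properties;

import cascading.flow.FlowConnector;
import cascading.flow.hadoop.HadoopFlowConnector;

import com.twitter.elephantbird.cascading3.scheme.CombinedSequenceFile;

public class CombinedIntermediateSetup {
  public static FlowConnector newConnector() {
    Properties props = new Properties();
    // Route intermediate sequence files through the combining scheme.
    props.setProperty("cascading.flowconnector.intermediateschemeclass",
        CombinedSequenceFile.class.getName());
    // Optional: override compression for just the combined intermediates.
    props.setProperty(CombinedSequenceFile.COMPRESS_ENABLE, "true");
    props.setProperty(CombinedSequenceFile.COMPRESS_TYPE, "BLOCK");
    return new HadoopFlowConnector(props);
  }
}
```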
diff --git a/cascading3/src/main/java/com/twitter/elephantbird/cascading3/scheme/LzoBinaryScheme.java b/cascading3/src/main/java/com/twitter/elephantbird/cascading3/scheme/LzoBinaryScheme.java
new file mode 100644
index 000000000..6cf28c91b
--- /dev/null
+++ b/cascading3/src/main/java/com/twitter/elephantbird/cascading3/scheme/LzoBinaryScheme.java
@@ -0,0 +1,85 @@
+package com.twitter.elephantbird.cascading3.scheme;
+
+import java.io.IOException;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.twitter.elephantbird.mapreduce.io.BinaryWritable;
+
+import cascading.flow.FlowProcess;
+import cascading.scheme.Scheme;
+import cascading.scheme.SinkCall;
+import cascading.scheme.SourceCall;
+import cascading.tap.Tap;
+import cascading.tuple.Tuple;
+import cascading.tuple.TupleEntry;
+
+/**
+ * Scheme for lzo binary encoded files. Handles both block and base64 encodings.
+ * Can be used for Protobuf and Thrift.
+ *
+ * @author Argyris Zymnis
+ */
+abstract public class LzoBinaryScheme<M, T extends BinaryWritable<M>> extends
+  Scheme<Configuration, RecordReader, OutputCollector, Object[], T> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LzoBinaryScheme.class);
+  private static final long serialVersionUID = -5011096855302946106L;
+
+  @Override
+  public void sink(FlowProcess<? extends Configuration> flowProcess, SinkCall<T, OutputCollector> sinkCall)
+    throws IOException {
+    OutputCollector collector = sinkCall.getOutput();
+    TupleEntry entry = sinkCall.getOutgoingEntry();
+    T writable = sinkCall.getContext();
+    writable.set((M) entry.getTuple().getObject(0));
+    collector.collect(null, writable);
+  }
+
+  @Override
+  public void sinkPrepare( FlowProcess<? extends Configuration> fp, SinkCall<T, OutputCollector> sinkCall ) {
+    sinkCall.setContext(prepareBinaryWritable());
+  }
+
+  protected abstract T prepareBinaryWritable();
+
+  @Override
+  public boolean source(FlowProcess<? extends Configuration> flowProcess,
+      SourceCall<Object[], RecordReader> sourceCall) throws IOException {
+
+    Object[] context = sourceCall.getContext();
+    while(sourceCall.getInput().next(context[0], context[1])) {
+      Object out = ((T) context[1]).get();
+      if(out != null) {
+        sourceCall.getIncomingEntry().setTuple(new Tuple(out));
+        return true;
+      }
+      LOG.warn("failed to decode record");
+    }
+    return false;
+  }
+
+  @Override
+  public void sourceCleanup(FlowProcess<? extends Configuration> flowProcess,
+      SourceCall<Object[], RecordReader> sourceCall) {
+    sourceCall.setContext(null);
+  }
+
+  /**
+   * This sets up the state between successive calls to source.
+   */
+  @Override
+  public void sourcePrepare(FlowProcess<? extends Configuration> flowProcess,
+      SourceCall<Object[], RecordReader> sourceCall) {
+    // Hadoop sets a key value pair:
+    sourceCall.setContext(new Object[2]);
+    sourceCall.getContext()[0] = sourceCall.getInput().createKey();
+    sourceCall.getContext()[1] = sourceCall.getInput().createValue();
+  }
+}
diff --git a/cascading3/src/main/java/com/twitter/elephantbird/cascading3/scheme/LzoByteArrayScheme.java b/cascading3/src/main/java/com/twitter/elephantbird/cascading3/scheme/LzoByteArrayScheme.java
new file mode 100644
index 000000000..ef67a6c63
--- /dev/null
+++ b/cascading3/src/main/java/com/twitter/elephantbird/cascading3/scheme/LzoByteArrayScheme.java
@@ -0,0 +1,56 @@
+/*
+Copyright 2012 Twitter, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package com.twitter.elephantbird.cascading3.scheme;
+
+import com.twitter.elephantbird.mapreduce.input.combine.DelegateCombineFileInputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapreduce.OutputFormat;
+
+import com.twitter.elephantbird.mapred.output.DeprecatedOutputFormatWrapper;
+import com.twitter.elephantbird.mapreduce.input.MultiInputFormat;
+import com.twitter.elephantbird.mapreduce.io.RawBytesWritable;
+import com.twitter.elephantbird.mapreduce.output.LzoBinaryBlockOutputFormat;
+
+import cascading.flow.FlowProcess;
+import cascading.tap.Tap;
+
+/**
+ * Scheme for lzo compressed files with binary records.
+ *
+ * @author Sam Ritchie
+ */
+public class LzoByteArrayScheme extends LzoBinaryScheme<byte[], RawBytesWritable> {
+  @Override protected RawBytesWritable prepareBinaryWritable() {
+    return new RawBytesWritable();
+  }
+
+  @Override public void sourceConfInit(FlowProcess<? extends Configuration> fp,
+      Tap<Configuration, RecordReader, OutputCollector> tap,
+      Configuration conf) {
+    MultiInputFormat.setClassConf(byte[].class, conf);
+    DelegateCombineFileInputFormat.setDelegateInputFormatHadoop2(conf, MultiInputFormat.class);
+  }
+
+  @Override public void sinkConfInit(FlowProcess<? extends Configuration> fp,
+      Tap<Configuration, RecordReader, OutputCollector> tap,
+      Configuration conf) {
+    conf.setClass("mapreduce.outputformat.class", LzoBinaryBlockOutputFormat.class, OutputFormat.class);
+  }
+}
diff --git a/cascading3/src/main/java/com/twitter/elephantbird/cascading3/scheme/LzoProtobufScheme.java b/cascading3/src/main/java/com/twitter/elephantbird/cascading3/scheme/LzoProtobufScheme.java
new file mode 100644
index 000000000..351689bd1
--- /dev/null
+++ b/cascading3/src/main/java/com/twitter/elephantbird/cascading3/scheme/LzoProtobufScheme.java
@@ -0,0 +1,53 @@
+package com.twitter.elephantbird.cascading3.scheme;
+
+import com.twitter.elephantbird.mapreduce.input.combine.DelegateCombineFileInputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapreduce.OutputFormat;
+
+import com.google.protobuf.Message;
+
+import com.twitter.elephantbird.mapred.output.DeprecatedOutputFormatWrapper;
+import com.twitter.elephantbird.mapreduce.input.MultiInputFormat;
+import com.twitter.elephantbird.mapreduce.io.ProtobufWritable;
+import com.twitter.elephantbird.mapreduce.output.LzoProtobufBlockOutputFormat;
+import com.twitter.elephantbird.util.Protobufs;
+import com.twitter.elephantbird.util.TypeRef;
+
+import cascading.flow.FlowProcess;
+import cascading.tap.Tap;
+
+/**
+ * Scheme for Protobuf lzo compressed files.
+ *
+ * @author Avi Bryant, Ning Liang
+ */
+public class LzoProtobufScheme<M extends Message> extends
+    LzoBinaryScheme<M, ProtobufWritable<M>> {
+
+  private static final long serialVersionUID = -5011096855302946105L;
+  private Class protoClass;
+
+  public LzoProtobufScheme(Class protoClass) {
+    this.protoClass = protoClass;
+  }
+
+  protected ProtobufWritable<M> prepareBinaryWritable() {
+    TypeRef<M> typeRef = (TypeRef<M>) Protobufs.getTypeRef(protoClass.getName());
+    return new ProtobufWritable<M>(typeRef);
+  }
+
+  @Override
+  public void sinkConfInit(FlowProcess<? extends Configuration> hfp,
+      Tap<Configuration, RecordReader, OutputCollector> tap, Configuration conf) {
+    LzoProtobufBlockOutputFormat.setClassConf(protoClass, conf);
+    conf.setClass("mapreduce.outputformat.class", LzoProtobufBlockOutputFormat.class, OutputFormat.class);
+  }
+
+  @Override
+  public void sourceConfInit(FlowProcess<? extends Configuration> hfp,
+      Tap<Configuration, RecordReader, OutputCollector> tap, Configuration conf) {
+    MultiInputFormat.setClassConf(protoClass, conf);
+    DelegateCombineFileInputFormat.setDelegateInputFormatHadoop2(conf, MultiInputFormat.class);
+  }
+}
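A usage sketch: pairing the scheme with an Hfs tap. The type parameter is any protoc-generated Message class, and the path is a placeholder:

```java
import com.google.protobuf.Message;

import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;

import com.twitter.elephantbird.cascading3.scheme.LzoProtobufScheme;

public class ProtobufTapFactory {
  // clazz is any generated protobuf class, e.g. a hypothetical Person.
  public static <M extends Message> Tap lzoProtobufTap(Class<M> clazz, String path) {
    return new Hfs(new LzoProtobufScheme<M>(clazz), path);
  }
}
```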
diff --git a/cascading3/src/main/java/com/twitter/elephantbird/cascading3/scheme/LzoTextDelimited.java b/cascading3/src/main/java/com/twitter/elephantbird/cascading3/scheme/LzoTextDelimited.java
new file mode 100644
index 000000000..43c7d1958
--- /dev/null
+++ b/cascading3/src/main/java/com/twitter/elephantbird/cascading3/scheme/LzoTextDelimited.java
@@ -0,0 +1,84 @@
+package com.twitter.elephantbird.cascading3.scheme;
+
+import com.twitter.elephantbird.mapreduce.input.LzoTextInputFormat;
+import com.twitter.elephantbird.mapreduce.input.combine.DelegateCombineFileInputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordReader;
+
+import com.twitter.elephantbird.mapred.output.DeprecatedLzoTextOutputFormat;
+
+import cascading.flow.FlowProcess;
+import cascading.scheme.hadoop.TextDelimited;
+import cascading.tap.Tap;
+import cascading.tuple.Fields;
+
+/**
+ * Scheme for LZO encoded TSV files.
+ *
+ * @author Ning Liang
+ */
+public class LzoTextDelimited extends TextDelimited {
+
+  public LzoTextDelimited(Fields fields, String delimiter) {
+    super(fields, delimiter);
+  }
+
+  public LzoTextDelimited(Fields fields, boolean skipHeader, String delimiter) {
+    super(fields, skipHeader, delimiter);
+  }
+
+  public LzoTextDelimited(Fields fields, String delimiter, Class[] types) {
+    super(fields, delimiter, types);
+  }
+
+  public LzoTextDelimited(Fields fields, boolean skipHeader, String delimiter, Class[] types) {
+    super(fields, skipHeader, delimiter, types);
+  }
+
+  public LzoTextDelimited(Fields fields, String delimiter, String quote, Class[] types) {
+    super(fields, delimiter, quote, types);
+  }
+
+  public LzoTextDelimited(Fields fields, boolean skipHeader, String delimiter,
+      String quote, Class[] types) {
+    super(fields, skipHeader, delimiter, quote, types);
+  }
+
+  public LzoTextDelimited(Fields fields, String delimiter,
+      String quote, Class[] types, boolean safe) {
+    super(fields, delimiter, quote, types, safe);
+  }
+
+  public LzoTextDelimited(Fields fields, boolean skipHeader, boolean writeHeader, String delimiter,
+      boolean strict, String quote, Class[] types, boolean safe) {
+    // Pass null for the compression setting: this class handles compression itself.
+    super(fields, null, skipHeader, writeHeader, delimiter, strict, quote, types, safe);
+  }
+
+  public LzoTextDelimited(Fields fields, boolean skipHeader, String delimiter,
+      String quote, Class[] types, boolean safe) {
+    super(fields, skipHeader, delimiter, quote, types, safe);
+  }
+
+  public LzoTextDelimited(Fields fields, String delimiter, String quote) {
+    super(fields, delimiter, quote);
+  }
+
+  public LzoTextDelimited(Fields fields, boolean skipHeader, String delimiter, String quote) {
+    super(fields, skipHeader, delimiter, quote);
+  }
+
+  @Override
+  public void sourceConfInit(FlowProcess<? extends Configuration> flowProcess,
+      Tap<Configuration, RecordReader, OutputCollector> tap, Configuration conf ) {
+    conf.setClass("mapred.input.format.class", LzoTextInputFormat.class, InputFormat.class);
+  }
+
+  @Override
+  public void sinkConfInit(FlowProcess<? extends Configuration> flowProcess,
+      Tap<Configuration, RecordReader, OutputCollector> tap, Configuration conf ) {
+    conf.setClass("mapred.output.format.class", DeprecatedLzoTextOutputFormat.class, OutputFormat.class);
+  }
+}
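A sketch of a typed LZO TSV tap built on the constructors above; the field names and types are illustrative:

```java
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

import com.twitter.elephantbird.cascading3.scheme.LzoTextDelimited;

public class LzoTsvTapFactory {
  public static Tap usersTap(String path) {
    // Hypothetical two-column layout: a numeric id and a name.
    Fields fields = new Fields("id", "name");
    Class[] types = new Class[] { long.class, String.class };
    return new Hfs(new LzoTextDelimited(fields, "\t", types), path);
  }
}
```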
diff --git a/cascading3/src/main/java/com/twitter/elephantbird/cascading3/scheme/LzoTextLine.java b/cascading3/src/main/java/com/twitter/elephantbird/cascading3/scheme/LzoTextLine.java
new file mode 100644
index 000000000..e0fea41cd
--- /dev/null
+++ b/cascading3/src/main/java/com/twitter/elephantbird/cascading3/scheme/LzoTextLine.java
@@ -0,0 +1,59 @@
+package com.twitter.elephantbird.cascading3.scheme;
+
+import com.twitter.elephantbird.mapreduce.input.LzoTextInputFormat;
+import com.twitter.elephantbird.mapreduce.input.combine.DelegateCombineFileInputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordReader;
+
+import com.twitter.elephantbird.mapred.output.DeprecatedLzoTextOutputFormat;
+
+import cascading.flow.FlowProcess;
+import cascading.scheme.hadoop.TextLine;
+import cascading.tap.Tap;
+import cascading.tuple.Fields;
+
+/**
+ * Scheme for LZO encoded text files.
+ *
+ * @author Ning Liang
+ */
+public class LzoTextLine extends TextLine {
+
+  public LzoTextLine() {
+    super();
+  }
+
+  public LzoTextLine(int numSinkParts) {
+    super(numSinkParts);
+  }
+
+  public LzoTextLine(Fields sourceFields, Fields sinkFields) {
+    super(sourceFields, sinkFields);
+  }
+
+  public LzoTextLine(Fields sourceFields, Fields sinkFields, int numSinkParts) {
+    super(sourceFields, sinkFields, numSinkParts);
+  }
+
+  public LzoTextLine(Fields sourceFields) {
+    super(sourceFields);
+  }
+
+  public LzoTextLine(Fields sourceFields, int numSinkParts) {
+    super(sourceFields, numSinkParts);
+  }
+
+  @Override
+  public void sourceConfInit(FlowProcess<? extends Configuration> flowProcess,
+      Tap<Configuration, RecordReader, OutputCollector> tap, Configuration conf ) {
+    conf.setClass("mapred.input.format.class", LzoTextInputFormat.class, InputFormat.class);
+  }
+
+  @Override
+  public void sinkConfInit(FlowProcess<? extends Configuration> flowProcess,
+      Tap<Configuration, RecordReader, OutputCollector> tap, Configuration conf ) {
+    conf.setClass("mapred.output.format.class", DeprecatedLzoTextOutputFormat.class, OutputFormat.class);
+  }
+}
diff --git a/cascading3/src/main/java/com/twitter/elephantbird/cascading3/scheme/LzoThriftScheme.java b/cascading3/src/main/java/com/twitter/elephantbird/cascading3/scheme/LzoThriftScheme.java
new file mode 100644
index 000000000..f3243ba5b
--- /dev/null
+++ b/cascading3/src/main/java/com/twitter/elephantbird/cascading3/scheme/LzoThriftScheme.java
@@ -0,0 +1,53 @@
+package com.twitter.elephantbird.cascading3.scheme;
+
+import com.twitter.elephantbird.mapreduce.input.combine.DelegateCombineFileInputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapreduce.OutputFormat;
+
+import com.twitter.elephantbird.mapred.output.DeprecatedOutputFormatWrapper;
+import com.twitter.elephantbird.mapreduce.input.MultiInputFormat;
+import com.twitter.elephantbird.mapreduce.io.ThriftWritable;
+import com.twitter.elephantbird.mapreduce.output.LzoThriftBlockOutputFormat;
+import com.twitter.elephantbird.util.ThriftUtils;
+import com.twitter.elephantbird.util.TypeRef;
+
+import cascading.flow.FlowProcess;
+import cascading.tap.Tap;
+
+import org.apache.thrift.TBase;
+
+/**
+ * Scheme for Thrift lzo compressed files.
+ *
+ * @author Argyris Zymnis
+ */
+public class LzoThriftScheme<M extends TBase<?, ?>> extends
+    LzoBinaryScheme<M, ThriftWritable<M>> {
+
+  private static final long serialVersionUID = -5011096855302946109L;
+  private Class thriftClass;
+
+  public LzoThriftScheme(Class thriftClass) {
+    this.thriftClass = thriftClass;
+  }
+
+  @Override
+  public void sinkConfInit(FlowProcess<? extends Configuration> hfp,
+      Tap<Configuration, RecordReader, OutputCollector> tap, Configuration conf) {
+    LzoThriftBlockOutputFormat.setClassConf(thriftClass, conf);
+    conf.setClass("mapreduce.outputformat.class", LzoThriftBlockOutputFormat.class, OutputFormat.class);
+  }
+
+  protected ThriftWritable<M> prepareBinaryWritable() {
+    TypeRef<M> typeRef = (TypeRef<M>) ThriftUtils.getTypeRef(thriftClass);
+    return new ThriftWritable<M>(typeRef);
+  }
+
+  @Override
+  public void sourceConfInit(FlowProcess<? extends Configuration> hfp,
+      Tap<Configuration, RecordReader, OutputCollector> tap, Configuration conf) {
+    MultiInputFormat.setClassConf(thriftClass, conf);
+    DelegateCombineFileInputFormat.setDelegateInputFormatHadoop2(conf, MultiInputFormat.class);
+  }
+}
diff --git a/cascading3/src/test/java/com/twitter/elephantbird/cascading3/scheme/TestCombinedSequenceFile.java b/cascading3/src/test/java/com/twitter/elephantbird/cascading3/scheme/TestCombinedSequenceFile.java
new file mode 100644
index 000000000..729778a6f
--- /dev/null
+++ b/cascading3/src/test/java/com/twitter/elephantbird/cascading3/scheme/TestCombinedSequenceFile.java
@@ -0,0 +1,49 @@
+package com.twitter.elephantbird.cascading3.scheme;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.junit.Test;
+
+import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatWrapper;
+import com.twitter.elephantbird.mapreduce.input.MapReduceInputFormatWrapper;
+import com.twitter.elephantbird.mapreduce.input.combine.DelegateCombineFileInputFormat;
+
+import cascading.flow.FlowProcess;
+import cascading.flow.hadoop.HadoopFlowProcess;
+import cascading.tap.Tap;
+import cascading.tap.hadoop.util.TempHfs;
+import cascading.tuple.Fields;
+import static org.junit.Assert.assertEquals;
+
+public class TestCombinedSequenceFile {
+
+  @Test
+  public void testHadoopConf() {
+    CombinedSequenceFile csfScheme = new CombinedSequenceFile(Fields.ALL);
+    JobConf conf = new JobConf();
+    FlowProcess fp = new HadoopFlowProcess();
+    Tap<Configuration, RecordReader, OutputCollector> tap =
+      new TempHfs(conf, "test", CombinedSequenceFile.class, false);
+
+    csfScheme.sourceConfInit(fp, tap, conf);
+
+    assertEquals(
+        "MapReduceInputFormatWrapper should wrap mapred.SequenceFileInputFormat",
+        "org.apache.hadoop.mapred.SequenceFileInputFormat",
+        conf.get(MapReduceInputFormatWrapper.CLASS_CONF_KEY)
+    );
+    assertEquals(
+        "Delegate combiner should wrap MapReduceInputFormatWrapper",
+        "com.twitter.elephantbird.mapreduce.input.MapReduceInputFormatWrapper",
+        conf.get(DelegateCombineFileInputFormat.COMBINED_INPUT_FORMAT_DELEGATE)
+    );
+    assertEquals(
+        "Delegate combiner should be set without any deprecated wrapper",
+        "com.twitter.elephantbird.mapreduce.input.combine.DelegateCombineFileInputFormat",
+        conf.get("mapreduce.inputformat.class")
+    );
+  }
+
+}
diff --git a/core/src/main/java/com/twitter/elephantbird/mapreduce/input/combine/DelegateCombineFileInputFormat.java b/core/src/main/java/com/twitter/elephantbird/mapreduce/input/combine/DelegateCombineFileInputFormat.java
index 06f40e5f3..ba2f3a01f 100644
--- a/core/src/main/java/com/twitter/elephantbird/mapreduce/input/combine/DelegateCombineFileInputFormat.java
+++ b/core/src/main/java/com/twitter/elephantbird/mapreduce/input/combine/DelegateCombineFileInputFormat.java
@@ -46,8 +46,13 @@ public static void setCombinedInputFormatDelegate(Configuration conf, Class inputFormat) {
     HadoopUtils.setClassConf(conf, COMBINED_INPUT_FORMAT_DELEGATE, inputFormat);
   }
 
-  public static void setDelegateInputFormat(JobConf conf, Class inputFormat) {
-    DeprecatedInputFormatWrapper.setInputFormat(DelegateCombineFileInputFormat.class, conf);
+  public static void setDelegateInputFormat(JobConf jobConf, Class inputFormat) {
+    DeprecatedInputFormatWrapper.setInputFormat(DelegateCombineFileInputFormat.class, jobConf);
+    setCombinedInputFormatDelegate(jobConf, inputFormat);
+  }
+
+  public static void setDelegateInputFormatHadoop2(Configuration conf, Class inputFormat) {
+    conf.setClass("mapreduce.inputformat.class", DelegateCombineFileInputFormat.class, InputFormat.class);
     setCombinedInputFormatDelegate(conf, inputFormat);
   }
 
diff --git a/pom.xml b/pom.xml
index 40b23b3dd..91bfb7617 100644
--- a/pom.xml
+++ b/pom.xml
@@ -86,6 +86,8 @@
     <hadoop-lzo.version>0.4.19</hadoop-lzo.version>
     <crunch.version>0.7.0</crunch.version>
     <thrift.executable>thrift</thrift.executable>
+    <cascading2.version>2.5.2</cascading2.version>
+    <cascading3.version>3.0.2</cascading3.version>
   </properties>
 
   <dependencyManagement>
@@ -107,6 +109,16 @@
       <artifactId>elephant-bird-cascading2</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>elephant-bird-cascading3</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>elephant-bird-cascading-protobuf</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>elephant-bird-core</artifactId>
@@ -665,6 +677,8 @@
 
   <modules>
     <module>cascading2</module>
+    <module>cascading3</module>
+    <module>cascading-protobuf</module>
    <module>crunch</module>
     <module>core</module>
     <module>hadoop-compat</module>
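For jobs that configure split combining directly rather than through a scheme, the two entry points in the hunk above split along API lines. A sketch, using MultiInputFormat as the delegate just as the cascading3 schemes do:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;

import com.twitter.elephantbird.mapreduce.input.MultiInputFormat;
import com.twitter.elephantbird.mapreduce.input.combine.DelegateCombineFileInputFormat;

public class CombineSetup {
  // mapred-era jobs: the combiner is installed via DeprecatedInputFormatWrapper.
  public static void forMapred(JobConf jobConf) {
    DelegateCombineFileInputFormat.setDelegateInputFormat(jobConf, MultiInputFormat.class);
  }

  // mapreduce-era jobs (and the cascading3 schemes above): the combiner is set
  // as mapreduce.inputformat.class, delegating to MultiInputFormat.
  public static void forHadoop2(Configuration conf) {
    DelegateCombineFileInputFormat.setDelegateInputFormatHadoop2(conf, MultiInputFormat.class);
  }
}
```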