Hadoop DSL Language FAQ
- How do I clear previously declared job dependencies or workflow targets?
- How do I set Hadoop job configuration parameters (like the amount of memory my jobs use)?
- How can I affect the JVM properties of my Azkaban job?
- Can I split my DSL code into more than one file? For example, I would like to have one DSL file per workflow.
- If I have my code split into more than one file, can I define something in one file and then refer to it in another?
- How can I escape strings in the Hadoop DSL?
- I can't seem to define jobs in a Groovy each closure. What's wrong?
- Is there a way to determine if a name is bound in scope?
- How can I programmatically add properties to all the jobs in a workflow?
- How can I make a job or workflow conditionally depend on several other jobs that I need to check in a loop?
- How can I add things to Hadoop Distributed Cache? Can I use this to specify the jars my Hadoop Java Job needs?
- Can I use Hadoop Closures inside of Hadoop Closures?
- Can a Hadoop DSL definition set inherit from another definition set?
How do I clear previously declared job dependencies or workflow targets?
See Clearing Job Dependencies and Workflow Targets While Cloning.
How do I set Hadoop job configuration parameters (like the amount of memory my jobs use)?
(Since version 0.3.6) If you are using a subclass of the HadoopJavaProcessJob class (such as hadoopJavaJob, hiveJob, javaJob, pigJob, kafkaPushJob or voldemortBuildPushJob), you can set Hadoop job configuration parameters using the following syntax:
hadoopJavaJob('jobName') {  // Or another subtype of HadoopJavaProcessJob
  set confProperties: [
    'mapreduce.map.memory.mb': 2048,          // Sets the amount of physical RAM allocated to the map tasks of your job. Should be in increments of 2048.
    'mapreduce.map.java.opts': '-Xmx1536m',   // Sets the Xmx for your map tasks. Should be less than mapreduce.map.memory.mb to accommodate stack and code size.
    'mapreduce.reduce.memory.mb': 4096,       // Sets the amount of physical RAM allocated to the reduce tasks of your job. Should be in increments of 2048.
    'mapreduce.reduce.java.opts': '-Xmx3584m' // Sets the Xmx for your reduce tasks. Should be less than mapreduce.reduce.memory.mb to accommodate stack and code size.
  ]
}
Using set confProperties causes properties prefixed with hadoop-inject to be written to your Azkaban job file. Azkaban automatically injects any properties it sees that are prefixed with hadoop-inject into the Hadoop job configuration object.
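For example, the confProperties block above would result in entries along these lines in the compiled Azkaban job file (illustrative only; the exact lines depend on your job type):
hadoop-inject.mapreduce.map.memory.mb=2048
hadoop-inject.mapreduce.map.java.opts=-Xmx1536m
hadoop-inject.mapreduce.reduce.memory.mb=4096
hadoop-inject.mapreduce.reduce.java.opts=-Xmx3584m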
How can I affect the JVM properties of my Azkaban job?
(Since version 0.3.6) For any subclass of the JavaProcessJob class, you can set the classpath, JVM properties, -Xms and -Xmx of the client JVM started by Azkaban to run the job:
javaProcessJob('jobName') {
  uses 'com.linkedin.foo.HelloJavaProcessJob'  // Required. Sets java.class=com.linkedin.foo.HelloJavaProcessJob in the job file
  jvmClasspath './*:./lib/*'                   // Sets the classpath of the JVM started by Azkaban to run the job
  set jvmProperties: [                         // Sets jvm.args=-DjvmPropertyName1=jvmPropertyValue1 -DjvmPropertyName2=jvmPropertyValue2 in the job file.
    'jvmPropertyName1': 'jvmPropertyValue1',   // These arguments are passed to the JVM started by Azkaban to run the job.
    'jvmPropertyName2': 'jvmPropertyValue2'
  ]
  Xms 96   // Sets -Xms for the JVM started by Azkaban to run the job
  Xmx 384  // Sets -Xmx for the JVM started by Azkaban to run the job
}
These properties affect only the JVM of the client process that Azkaban starts to run the job. In particular, they do NOT affect the JVMs that run the map and reduce tasks of Hadoop jobs. If you are running a map-reduce job, you usually do not need to increase the amount of memory for the Azkaban client process. To affect the JVMs that run map and reduce tasks, set the appropriate Hadoop job configuration parameters using set confProperties.
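To make the distinction concrete, here is a hedged sketch that combines both mechanisms in a single job. The job name and class are hypothetical, and it assumes the job type is both a JavaProcessJob subclass (so Xmx applies to the client JVM) and a HadoopJavaProcessJob subclass (so confProperties applies to the tasks), as hadoopJavaJob is:
hadoopJavaJob('memoryExample') {
  uses 'com.example.hello.MemoryExampleJob'  // Hypothetical job class
  Xmx 512                                    // Affects only the client JVM that Azkaban starts to run the job
  set confProperties: [
    'mapreduce.map.memory.mb': 2048,         // Affects the map task containers on the cluster
    'mapreduce.map.java.opts': '-Xmx1536m'   // Affects the map task JVMs on the cluster
  ]
}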
Can I split my DSL code into more than one file? For example, I would like to have one DSL file per workflow.
Yes, you can. For projects with large workflows, splitting your DSL code into one file per workflow is a reasonable approach.
// In your build.gradle, you can apply all the DSL files you want
apply from: 'src/main/gradle/workflow1.gradle'
apply from: 'src/main/gradle/workflow2.gradle'
apply from: 'src/main/gradle/workflow3.gradle'
If I have my code split into more than one file, can I define something in one file and then refer to it in another?
Yes and no. Unfortunately, Groovy def variables (e.g. def foo = "bar") are limited to the scope of the file in which they are declared. However, you can use the Hadoop DSL definitionSet feature to work around this problem.
The Hadoop DSL has its own explicit name resolution, so you can refer to named Hadoop DSL elements (such as jobs and workflows) interchangeably across files. Any named elements you declare in a particular scope can then be used in that scope, even from a different file. Since names are resolved at build time (see the section in the language reference on Variables, Names and Scope), you can even refer to a job or workflow that you haven't defined yet (but define later, perhaps even in another file). Here's a quick example:
// In the file workflow1.gradle
hadoopJavaJob('cacheLibraries') {
// 'cacheLibraries' now bound in Hadoop DSL global scope
uses 'com.linkedin.drama.mapred.CacheLibraryJob'
}
hadoop {
buildPath "azkaban"
workflow('countByCountryFlow') {
// Workflow 'countByCountryFlow' now bound in hadoop scope
hadoopJavaJob('countByCountry') {
uses 'com.linkedin.hello.mapreduce.CountByCountryJob'
// Job 'countByCountry' now bound in the workflow scope for countByCountryFlow
}
targets 'countByCountry'
}
}
// In the file workflow2.gradle
hadoop {
workflow('anotherExampleFlow') {
addJob('cacheLibraries') { // Lookup 'cacheLibraries' in Hadoop DSL global scope works even across files
}
addJob('.hadoop.countByCountryFlow.countByCountry') { // Do a fully-qualified lookup and clone of the countByCountry job from workflow1.gradle
}
targets 'countByCountry'
}
}
How can I escape strings in the Hadoop DSL?
You can use Groovy language features and API functions to help you escape strings in the Hadoop DSL. Groovy distinguishes between single-quoted strings, double-quoted strings, triple-double-quoted strings, slashy strings, dollar-slashy strings and more. This should allow you to easily define things like Avro schemas without making your code impossible to read.
WANTED: If someone has a recommendation for the best way to define an Avro schema in the Hadoop DSL (or better ways to escape other kinds of commonly-used strings), please let us know.
// http://docs.groovy-lang.org/latest/html/gapi/groovy/json/StringEscapeUtils.html
def escape = { s -> groovy.json.StringEscapeUtils.escapeJava(s) }
// Read http://mrhaki.blogspot.com/2009/08/groovy-goodness-string-strings-strings.html for more information on all the types of Groovy strings!
def schema = escape("""{
"type" : "record",
"name" : "member_summary",
"namespace" : "com.linkedin.data.derived",
"fields" : [ {
"name" : "memberId",
"type" : [ "null", "long" ]
} ]
}""");
noOpJob('test') {
set properties: [
'test1' : escape("line1\nline2"),
'test2' : '"mySchema": "foo": [ "bar": "bazz"]',
'test3' : "\"${escape('"mySchema": "foo": [ "bar": "bazz"]')}\"",
'test4' : "\"${schema}\""
]
}
// This results in the following output in the compiled Azkaban job file:
// test1=line1\nline2
// test2="mySchema": "foo": [ "bar": "bazz"]
// test3="\"mySchema\": \"foo\": [ \"bar\": \"bazz\"]"
// test4="{\n \"type\" : \"record\",\n \"name\" : \"member_summary\",\n \"namespace\" : \"com.linkedin.data.derived\",\n \"fields\" : [ {\n \"name\" : \"memberId\",\n \"type\" : [ \"null\", \"long\" ]\n } ]\n }"
I can't seem to define jobs in a Groovy each closure. What's wrong?
Tyler E. at LinkedIn noticed that he could not define jobs in a Groovy each closure. The following example failed:
hadoop {
buildPath "azkaban"
workflow('foo') {
['a': 1].each {
hadoopJavaJob('b') {
uses 'reminder.metrics.AvroMetricToAMFJob'
}
targets 'b'
}
}
}
// Fails with WorkflowJobChecker ERROR: Workflow foo declares that it targets b, but this target does not exist in the workflow.
The problem is a subtle interaction between Gradle and Groovy closures (the use of each results in the application of a closure): the delegate of the closure resolves to the top-level Hadoop DSL scope instead of the workflow scope. To solve this problem (and get the jobs into the scope of the workflow in which they are declared), use a regular loop instead of an each closure.
For more gotchas like this, see Gradle and Groovy Problems in the language reference.
hadoop {
buildPath "azkaban"
workflow('foo') {
def m = ['a': 1]
for (entry in m) {
hadoopJavaJob('b') {
uses 'reminder.metrics.AvroMetricToAMFJob'
}
}
targets 'b'
}
}
Is there a way to determine if a name is bound in scope?
See the section on using lookupRef at Using lookupRef.
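As a minimal sketch, assuming lookupRef returns null when a name is not bound in scope (check the Using lookupRef section for the exact semantics):
// Hedged sketch: declare the job only if the name is not already bound in scope
if (lookupRef('cacheLibraries') == null) {
  hadoopJavaJob('cacheLibraries') {
    uses 'com.linkedin.drama.mapred.CacheLibraryJob'
  }
}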
How can I programmatically add properties to all the jobs in a workflow?
First, see if you can solve your problem by declaring a propertySet and declaring baseProperties for your jobs. See the section on Property Sets for more information. If that doesn't solve your problem, you can loop through the jobs programmatically and update them:
workflow('w1') {
// Declares a bunch of jobs
job('job1') { }
job('job2') { }
}
for (job in lookup('w1').jobs) {
job.setJobProperty("foo", "bar");
}
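For reference, the propertySet approach mentioned above might look roughly like the following. This is a hedged sketch: the property set name and property values are hypothetical, and the exact syntax is documented in the Property Sets section.
propertySet('commonProperties') {
  set properties: [
    'foo': 'bar'
  ]
}

workflow('w1') {
  job('job1') {
    baseProperties 'commonProperties'  // Each job declares the property set as its base properties
  }
  job('job2') {
    baseProperties 'commonProperties'
  }
}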
How can I make a job or workflow conditionally depend on several other jobs that I need to check in a loop?
Sometimes you want to declare a job and then, depending on your Hadoop DSL logic, declare that it depends on a series of other jobs.
def someCondition = true
def someOtherCondition = true
workflow('workflow1') {
// Declare jobs 1 through N
job('job1') { }
job('job2') { }
// Declare the finalJob, which might depend on some subset of jobs 1 through N depending on some condition
job('finalJob') {
// Remember, use regular for loops, not Groovy each closure loops
for (int i = 1; i <= 2; i++) {
if (someCondition) {
depends "job${i}" // That was easy!
}
}
}
// Let's do the same for the workflow targets
for (int i = 1; i <= 2; i++) {
if (someOtherCondition) {
targets "job${i}" // Also easy!
}
}
}
How can I add things to Hadoop Distributed Cache? Can I use this to specify the jars my Hadoop Java Job needs?
You can use the caches and cachesArchives declarations on any subclass of HadoopJavaProcessJob to add things to Hadoop Distributed Cache. See the section on the HadoopJavaProcessJob for more information.
While this can be helpful for resource files or archives, it unfortunately doesn't help much with jars, because it doesn't add them to the classpath of your tasks. To get jars both added to Distributed Cache and added to the classpath of your tasks, you have to add them to Distributed Cache programmatically in the actual code of your Hadoop Java jobs. For LinkedIn users, the Hadoop Starter Kit's hello-mapreduce-azkaban classes all do this.
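As a hedged sketch of the caches declaration (the file name and HDFS path here are hypothetical, and the exact syntax is documented in the HadoopJavaProcessJob section):
hadoopJavaJob('jobName') {
  uses 'com.linkedin.hello.mapreduce.CountByCountryJob'
  caches files: [
    'lookupData.avro': '/user/someuser/lookupData.avro'  // Symlink name mapped to a path on HDFS (hypothetical)
  ]
}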
Can I use Hadoop Closures inside of Hadoop Closures?
Although you can declare a hadoopClosure inside of another hadoopClosure, the Hadoop DSL effectively considers all Hadoop Closures as if they were defined in a single plugin-wide scope, so it is hard to imagine any situation in which this would be advantageous.
On the other hand, you can definitely evaluate one hadoopClosure inside of another. The issue becomes which Hadoop definitionSet to use for the inner evaluation. The most natural choice is the same definitionSet being used to evaluate the outer hadoopClosure.
To use the same definitionSet for both, you can use the Hadoop DSL definitionSetName method to get the name of the current definitionSet when you evaluate the inner hadoopClosure.
definitionSet name: 'myDefs', defs: [
  'foo': 'val1',
  'bar': 'val2'
]
hadoopClosure name: 'closure1', closure: {
println lookupDef('foo')
}
hadoopClosure name: 'closure2', closure: {
println lookupDef('bar')
evalHadoopClosure name: 'closure1', defs: definitionSetName() // Evaluate closure1 against the same definition set being used to evaluate closure2
}
evalHadoopClosure name: 'closure2', defs: 'myDefs'
// Prints 'val2' and 'val1' on separate lines
Can a Hadoop DSL definition set inherit from another definition set?
There is no inheritance model for definition sets. However, when you declare a definitionSet, it returns a Groovy Map. This makes it easy to combine previous definitions to form new definition sets using the Groovy + operator.
// Create a new named definition set and save a reference to the Groovy Map it returns
def contextOneMap = definitionSet name: "context1", defs: [
var2: 'VAL2',
var3: 3
]
// Perhaps you have some more things declared in a regular Groovy Map
def anotherMap = [
'anotherVar1': 'anotherVal1'
]
// Another way to derive new definition sets is to save a reference to the Groovy Map returned when declaring
// a previous definitionSet and then append it to the new defs with the Groovy plus operator
definitionSet name: "anotherDefinitionSet", defs: contextOneMap + anotherMap + [
var2: 'VAL2_UPDATED',
var4: false
]