Hadoop DSL Language Reference
- What the Hadoop DSL is Not
- Version Updates
- Building the Hadoop DSL
- Apache Oozie Status
- Hadoop DSL Source Code
- Hadoop DSL GroovyDoc and Sources Jars
- Hadoop DSL Syntax Auto-Completion in IntelliJ IDEA
- Learning the Hadoop DSL By Example
- Job Functions
- Complete Pig Workflow by Example
- Expert Hadoop DSL Functionality by Example
- Declaring Reusable Hadoop DSL Blocks by Example
- Building Multi-Grid Workflows by Example
- Hadoop DSL Automatic Builds for Simple Multi-Grid Builds
- Applying Hadoop DSL User Profiles by Example
- Programming with Classes and Objects by Example
- Hadoop DSL Language Reference FAQ
This is the language reference for the LinkedIn Gradle DSL for Apache Hadoop. For the sake of brevity, we shall refer to the DSL as simply the "Hadoop DSL".
The Hadoop DSL is a language for specifying jobs and workflows for Hadoop workflow schedulers like Azkaban and Apache Oozie. The Hadoop DSL makes it easy to express your Hadoop workflows in a consistent fashion. With the Hadoop DSL:
- You can easily describe your jobs in a single file (or several files), instead of having to create one Azkaban job file per job
- The Hadoop DSL is an embedded Groovy DSL, so it is (basically) just Groovy. You can use Groovy code anywhere you want throughout the DSL.
- While you will mostly use the DSL to specify your workflows and jobs, you can also drop down and manually work with the classes and objects that define the DSL
- The DSL is statically compiled into job files at build time. Since it's statically compiled, it can be statically checked! The static checker will catch all kinds of potential problems with your jobs at build time. This will end the edit-upload-run-repeat cycle of having to edit your job files until you finally get them all specified correctly.
While the DSL should be convenient for all Hadoop users, the true win from the DSL is for large Hadoop workflows, i.e. workflows with dozens or even hundreds of jobs. In projects like these, users actually need code to generate and control their workflows. This is exactly what the Hadoop DSL provides.
The DSL lets users think about their Hadoop workflows as being "programmable" (which includes statically checking them), rather than simply thinking about their workflows as a loose collection of property files.
The Hadoop DSL is not a DSL for writing Hadoop jobs. Plenty of tools and frameworks for this already exist, like Apache Pig, Apache Hive, Java Map-Reduce, etc.
The following versions of the Hadoop Plugin contain feature updates to the Hadoop DSL:
Version | Description |
---|---|
0.13.x | Latest development series |
0.13.2 | (Recommended) Add checkExactPath method to HdfsWaitJob job type and add more options to Hadoop DSL Automatic Builds |
0.13.1 | Hadoop DSL Automatic Builds for simple multi-grid builds |
0.12.9 | New Hadoop DSL applyUserProfile method is available |
0.12.4 | Add HdfsWaitJob job type that waits for fresh data in HDFS |
0.12.2 | Add TableauJob job type support. Fix issue with missing isGrouping property on cloned workflows. |
0.12.0 | Add initial Hadoop Plugin example project |
0.11.19 | Prevent core-site.xml from polluting the IDE classpath |
0.11.18 | Add support for a required parameters check |
0.11.16 | Support for PinotBuildAndPushJob job type. Support for groupJobs declaration for child workflows that generates child workflows as Azkaban embedded flows. |
0.11.4 | Automatic syntax completion for the Hadoop DSL in IntelliJ IDEA available |
0.10.15 | Last recommended version in the 0.10.x series that includes Azkaban CLI tasks and improved azkabanUpload features |
0.9.17 | Last recommended version in the 0.9.x series that includes a number of minor fixes |
0.9.5 | Added SqlJob job type |
0.9.1 | Add extra typed methods for Hadoop DSL SparkJob properties |
0.8.14 | Last recommended version in the 0.8.x series that includes a number of minor fixes |
0.7.11 | New job types for Espresso, Gobblin and Teradata. Last recommended version in the 0.7.x series. |
0.7.10 | New HadoopShellJob job type |
0.6.10 | Added applyProfile method. Last recommended version in the 0.6.x series. |
0.6.5 | Support for SparkJob job type for Apache Spark |
0.5.17 | Hadoop Plugin zip extension and azkabanUpload task work correctly with hadoopClosure. Last recommended version in the 0.5.x series. |
0.5.15 | The nameNode property on KafkaPushJob is no longer required. |
0.5.8 | Support for hadoopClosure |
0.5.3 | Support for definitionSet. The nameNode property on KafkaPushJob types is now required. |
0.5.2 | Support for namespace |
0.4.7 | Last stable version before 0.5.x features |
0.4.5 | You can use objects as values with set <properties, jvmProperties, confProperties>. They will be converted to strings at build time. Added usesDisableAuditing method for the KafkaPushJob job type. |
0.4.2 | Ability to clear workflow targets and job dependencies; read-before-write static check is now a warning instead of an error |
0.4.1 | Support for child workflows |
0.3.6 | Support for propertySet for property sets and set confProperties for Hadoop configuration properties |
0.2.9 | Initial stable release |
To compile the Hadoop DSL into Azkaban job and property files, see Building the Hadoop DSL for Azkaban.
Right now, Azkaban is the only Hadoop workflow scheduler which can be targeted by the Hadoop DSL, but eventually the Hadoop Plugin may include compilers for other schedulers such as Apache Oozie.
Although we started on a Hadoop DSL compiler for Apache Oozie, we did not complete it, and it is currently not in a usable form. We are not currently working on it and it is unlikely to be completed.
If you need to know exactly how something works, you can check the Hadoop DSL source code. You can check the source code on GitHub at https://github.com/linkedin/linkedin-gradle-plugin-for-apache-hadoop/tree/master/hadoop-plugin/src/main/groovy/com/linkedin/gradle/hadoopdsl.
The Hadoop Plugin includes a complete set of GroovyDoc and sources jars for all the classes in the Hadoop Plugin, including the Hadoop DSL classes.
These jars are published to Bintray at https://bintray.com/convexquad/maven/linkedin-gradle-plugin-for-apache-hadoop/view#files/com/linkedin/hadoop-plugin/hadoop-plugin. Once you download the jar you want, explode it using jar xvf <jarFile> to get the actual GroovyDoc or sources.
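If you prefer to do this from Gradle instead of the command line, here is a minimal sketch that explodes a downloaded jar using Gradle's built-in zipTree. The jar file name and output directory are placeholders, not part of the plugin:
// A minimal sketch - the jar path and output directory below are placeholders
task explodeHadoopPluginSources(type: Copy) {
  from zipTree('hadoop-plugin-sources.jar')   // The sources or GroovyDoc jar you downloaded
  into "${buildDir}/hadoop-plugin-sources"    // Where to write the exploded files
}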
The Hadoop DSL supports automatic syntax completion in all recent versions of IntelliJ IDEA. Simply run ./gradlew idea from the root directory of any project for which you apply the Hadoop Plugin (and the idea and java plugins, which are required for syntax completion), open the project in IDEA, and wait for indexing to complete. Then syntax completion will be available for the Hadoop DSL in any .gradle file.
Additionally, you can CTRL-click on any Hadoop DSL language feature to click through to the Hadoop DSL source for that feature. This makes it very easy to check the extensive GroovyDoc that is available for every Hadoop DSL language feature.
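For reference, here is a minimal build.gradle sketch. The buildscript coordinates are an assumption based on the Bintray location above; follow the Hadoop Plugin setup instructions for the exact way to apply the plugin itself:
buildscript {
  repositories { jcenter() }
  dependencies {
    // Assumed artifact coordinates - use the Hadoop Plugin version your project actually needs
    classpath 'com.linkedin.hadoop-plugin:hadoop-plugin:0.13.2'
  }
}
// Apply the Hadoop Plugin here per its setup instructions, along with these two plugins,
// which are required for Hadoop DSL syntax completion:
apply plugin: 'java'
apply plugin: 'idea'
// Then run ./gradlew idea and open the project in IntelliJ IDEA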
The basic idea of the Hadoop DSL is to write down your Hadoop workflows, jobs and their dependencies.
hadoop {
// Write down where you want the job files to be generated!
buildPath "azkaban"
// Write down your workflows!
workflow('workflow1') {
hiveJob('job1') { // job1 has no other jobs on which it depends
uses 'hello.q'
}
pigJob('job2') {
uses 'src/main/pig/pigScript.pig'
depends 'job1' // job2 depends on job1
}
targets 'job2' // workflow1 executes job2, which depends on job1, etc.
}
// Write down another workflow!
workflow('workflow2') {
// More jobs here
}
}
When the task that compiles the DSL into Azkaban job files runs, it doesn't just build all the jobs you have declared. The build process does the following:
- Finds the workflows added to the hadoop block
- Examines the jobs on which the workflows depend
- Transitively walks the graph of job dependencies, and builds all the jobs it encounters
No other jobs in the DSL get built. The reason for this is so that you can declare jobs and workflows in the global scope (outside of a hadoop block) and then easily manipulate them later - just like global variables!
We'll see an example of how this is useful once we discuss cloning jobs and workflows. If you are just getting started, you probably want to add all your workflows within a hadoop block (and all your jobs within those workflows), so that all the workflows you declare get built.
// NOT built - not in a workflow!
pigJob('job1') {
}
// NOT built - not in a hadoop block!
workflow('workflow1') {
pigJob('job1') {
}
targets 'job1'
}
// NOT built - not in a hadoop block!
propertyFile('propertyFile1') {
set properties: [
'foo' : 'bar'
]
}
hadoop {
// Specify where to generate the job files
buildPath "azkaban"
workflow('workflow1') {
pigJob('job1') { // NOT built - nothing transitively depends on it!
uses 'src/main/pig/pigScript.pig'
}
pigJob('job2') { // Built
uses 'src/main/pig/pigScript.pig'
}
pigJob('job3') {
uses 'src/main/pig/pigScript.pig'
depends 'job2' // Causes job2 to be built
}
hiveJob('job4') {
uses 'src/main/hive/hello.q'
depends 'job2' // Causes job2 to be built
}
targets 'job3', 'job4' // workflow1 targets job3 and job4, this causes job3 and job4 to be built
}
// NOT built - not in a workflow!
pigJob('job1') {
uses 'src/main/pig/pigScript.pig'
}
workflow('workflow2') {
// More jobs here
}
}
// You can have more than one hadoop block if you like, perhaps it makes your DSL code easier to read.
// You don't need to re-specify the buildPath if you already specified it from another hadoop block.
hadoop {
workflow('workflow3') {
// More jobs here
}
}
The DSL is just fancy Groovy + Gradle. Since it's an embedded DSL, you can use Groovy anywhere, or access Gradle functionality by writing code in-line in the DSL.
hadoop {
buildPath "azkaban"
}
// It's just Groovy! You can write down Groovy directly in the DSL.
def myVariable1 = true
def myVariable2 = true
def myFunction() {
// do something
}
def pigSrcDir = 'src/main/pig'
def countByCountryJob3 = 'count_by_country3.pig'
workflow('workflow1') {
if (myVariable1) { // No, REALLY it's just Groovy. You can write down code blocks, import classes,
pigJob('job1') { // Define and call functions, etc. in the DSL.
uses 'src/main/pig/count_by_country1.pig' // Groovy can use single-quoted strings, but note the gotcha below!
}
}
else {
pigJob('job2') {
if (myVariable2) {
uses "${pigSrcDir}/count_by_country2.job" // Groovy GOTCHA: to do ${dollar sign replacement}, you MUST use a double-quoted string.
}
else {
uses "${pigSrcDir}/count_by_country3.job"
}
}
}
// Since it's Groovy, you can do things like add jobs in a loop, and declare that the workflow targets all of them.
for (int i = 0; i < 3; i++) {
pigJob("myPigJob${i}") {
uses "${pigSrcDir}/pigScript${i}.pig"
}
targets "myPigJob${i}" // The workflow targets method is additive, so the workflow will depend on all the jobs in the loop
}
myFunction() // Invoke the Groovy function we defined earlier
}
// REALLY, REALLY it's just Groovy. The variable workflow2 is now a Workflow object (that contains a Pig job) that you can
// manipulate programmatically. We'll get into this later.
def workflow2 = workflow('workflow2') { pigJob('job3') { } }
If you were paying close attention, you might have wondered how the DSL keywords like workflow or pigJob are implemented. These are just Groovy methods exposed by the Hadoop Plugin that set properties that can be later used by the Plugin during its build phase. Remember, it's just Groovy!
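To make this concrete, here is a purely illustrative sketch (not the actual Hadoop Plugin source) of how such a DSL "keyword" can be implemented as a Groovy method that takes a name and a configuration closure:
// Illustrative only: a toy "job" keyword. The real Hadoop DSL classes are more involved.
class SketchJob {
  String name
  Map jobProperties = [:]
  void set(Map args) { jobProperties.putAll(args['properties'] ?: [:]) }
}
def sketchJob(String name, Closure configure) {
  def job = new SketchJob(name: name)
  configure.delegate = job                           // Nested calls resolve against the job object
  configure.resolveStrategy = Closure.DELEGATE_FIRST
  configure()
  return job                                         // The plugin would record this object and use it at build time
}
def myJob = sketchJob('job1') {
  set properties: ['foo': 'bar']
}
assert myJob.jobProperties['foo'] == 'bar'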
Gradle and Groovy each have a few painful idiosyncrasies that are guaranteed to cause you problems if you don't know about them.
- Groovy def Variables and Double-Quoted Strings
Groovy def variables are limited to the scope of the Gradle file in which you declare them. Read about Hadoop DSL definition sets to solve this problem. When declaring strings, remember in Groovy that dollar-sign variable replacement requires the use of double-quotes as in "${myVariable}". If you use single-quoted strings, Groovy will not resolve any dollar-sign variables within the string.
def myVariable1 = "src/main/pig"
def myVariable2 = "${myVariable1}" // Groovy will resolve this double-quoted dollar-sign variable to the string "src/main/pig"
def myVariable3 = '${voldemort.dev.cluster}' // Since you used a single-quoted string, you will get back '${voldemort.dev.cluster}'
- Azkaban Parameter Substitution
Azkaban also supports a variable replacement mechanism, and unfortunately it also uses dollar-sign notation, which can be extremely confusing. You should read the section entitled "Parameter Substitution" at https://azkaban.github.io/azkaban/docs/latest/#job-configuration. This means that if you use Groovy single quotes to declare a dollar-sign string, it will be resolved at execution time by Azkaban.
def myVariable3 = '${voldemort.dev.cluster}' // This variable is resolved at execution time by Azkaban
- Groovy each Closures
Another potential source of issues is with Groovy each closures. If you try to end the evaluation of a Groovy each iteration by returning a result, the loop actually will not terminate, as the result is being returned from the closure, not the loop itself.
Additionally, Groovy each closures have an interaction issue with Gradle that causes each closures declared inside a hadoop block to evaluate in the context of hadoop scope. This means that any Hadoop DSL declarations you make inside the closure may take effect in hadoop scope, rather than the scope in which the closure is declared or evaluated.
For these reasons, we suggest you don't use Groovy each closures. Just use regular for loops instead.
// WARNING: This will run five times, because we return out of the closure and not out of the loop
[1, 2, 3, 4, 5].each { index ->
println index
if (index == 1) return index;
}
hadoop {
buildPath "azkaban"
workflow('workflow1') {
// WARNING: Since the closure is nested underneath a hadoop block it will evaluate in hadoop scope instead of the workflow scope
[1, 2, 3, 4, 5].each { index ->
job("jobName${index}") { }
println(lookup("hadoop.jobName${index}").name) // The jobs got added to hadoop scope instead of the workflow
}
// Just use regular for loops instead
for (index in [1, 2, 3, 4, 5]) {
job("jobName${index}") { }
println(lookup("workflow1.jobName${index}").name) // Now the jobs have been added to the workflow
}
}
}
// Lesson: Don't use Groovy each closures, just use regular for loops instead
- Groovy def Closures
Groovy def closures (functions) also have an interaction issue with Gradle that causes the Hadoop DSL scope of the closure to be set to the scope in which the closure is declared, instead of the scope in which the closure is invoked. This means that if you declare a Groovy def function in global scope, it will always evaluate in the context of global scope, even if you call the function from another scope.
To get around this problem, declare Groovy def functions in the scope in which you wish to use them, or use the Hadoop DSL lookup function inside your Groovy def function and pass the fully qualified name of a Hadoop DSL object as an argument to your function.
hadoop {
buildPath "azkaban"
}
// WARNING: calling this will create jobs in global scope, even if called from within another scope
def globalScopeDef = { index ->
job("jobName${index}") { }
}
workflow('workflow1') {
// WARNING: These jobs declared in global scope!
for (int index = 1; index <= 5; index++) {
globalScopeDef(index);
println(lookup(".jobName${index}").name)
}
// Since this def is created in workflow scope, it will create jobs in the workflow
def localScopeDef = { index ->
job("jobName${index}") { }
}
for (int index = 1; index <= 5; index++) {
localScopeDef(index); // OK: These jobs declared in workflow1 scope like you intended
println(lookup(".workflow1.jobName${index}").name)
}
}
// ANOTHER WAY: Use the lookup function inside your Groovy def function and pass a fully qualified name as an argument to your function
def updateJob(fullyQualifiedJobName) {
lookup(fullyQualifiedJobName) {
set properties: [
'foo':'bar'
]
}
}
// Pass the fully qualified workflow name to your Groovy def function so that it can lookup the desired object
updateJob('.workflow1.jobName1')
// Lesson: define Groovy def functions in the scope in which you want to use them, or have your Groovy def functions use lookup with a fully-qualified name
- Use Explicit Parens to Pass Groovy Lists
Sometimes when using Groovy and Gradle together, the compiler has issues with functions that take lists. If you write a function or use a Hadoop DSL method that takes a list, you might need to use explicit parentheses to get the compiler to understand your code.
hadoop {
buildPath "azkaban"
}
def myFunction = { myList ->
println myList
}
myFunction [1, 2, 3, 4] // Error: Could not find method getAt() for arguments [[1, 2, 3, 4]]
myFunction([1, 2, 3, 4]) // Prints [1, 2, 3, 4]
You might have noticed that whenever we have created workflows or jobs in the DSL, we have named them with a string - for example, pigJob('job2'). As a programming language, the most interesting thing about the Hadoop DSL is that it has explicit (static) scope, but dynamic name resolution at build time.
This feature lets you declare objects in a convenient way and have the names be resolved by the DSL at build time. This is more easily understood with an example. In a normal programming language (with static scope), you might have:
int x = 1;
int y = x + z; // ERROR: z not defined
int z = 2;
However, in the Hadoop DSL, the names will be resolved at build time for you:
workflow('workflow1') {
targets 'job1' // job1 is not defined yet, but that's ok
job('job1') { depends 'job2' } // Now job1 is defined - job2 is not defined yet, but that's ok
job('job2') { } // Now both job1 and job2 are defined - at build time, everything will resolve correctly
}
The DSL has proper scoping, similar to a programming language. In addition to the local name, each object named in the DSL has an explicit fully qualified name. Fully qualified names start with a dot (to distinguish them from locally qualified names). Most users won't need to use fully qualified names, but advanced users may find them helpful. You can learn more about the name resolution process for fully qualified and locally qualified names in the section on qualified names.
workflow('workflow1') { } // The name workflow1 is now bound in global scope with the fully-qualified name workflow1
hiveJob('job1') { } // The name job1 is now bound in global scope with the fully-qualified name job1
// workflow('workflow1') { } // Error! You can't redefine workflow1 in global scope!
// hiveJob('job1') { } // Error! You can't redefine job1 in global scope!
hadoop {
buildPath "azkaban"
workflow('workflow1') { // The name workflow1 is now bound in hadoop scope with the fully-qualified name .hadoop.workflow1. The name workflow1 now hides the previously-declared workflow1 within the global scope.
pigJob('job1') { // The name job1 is now bound in .hadoop.workflow1 scope with the fully-qualified name .hadoop.workflow1.job1. The name job1 now hides the previously-declared job1 within the global scope.
uses 'pigScript.pig'
}
targets 'job1' // This job1 will resolve at build time to .hadoop.workflow1.job1
}
// workflow('workflow1') { // Error! You can't redefine workflow1 in hadoop scope!
// }
workflow('workflow2') {
addJob('job1') { // This job1 will resolve to job1 in global scope. This method adds a clone of job1 to workflow2 and binds it in workflow2 scope.
uses 'hello.q'
}
targets 'job1' // This job1 then resolves to the job1 we just added to workflow2 scope.
}
}
As you have seen, workflows in the Hadoop DSL are collections of jobs and property files. The workflow declares job targets, and the jobs have dependencies between them that determine the topological order in which the jobs in the workflow will execute.
workflow('workflowName') {
// Declare your jobs
job('job1') { }
job('job2') { }
// Then declare the target jobs on which your workflow depends. The build process will transitively walk these dependencies when it builds the jobs.
targets 'job1', 'job2'
}
// You can also declare workflows inside of workflows! See the section on child workflows.
(Since version 0.4.1) You can declare workflows inside of workflows and declare dependencies between the parent and child workflows. This may allow you to express your overall workflows in a more natural and readable fashion.
Another way this can be useful is to declare a workflow once in global scope, then use cloning to add it as a child workflow to many other workflows, overriding some of the properties of the new child workflow each time. For more information about this, see the section on cloning jobs and workflows.
For Azkaban, the Hadoop DSL will build job files for child workflows in subfolders of the folder in which the parent flow is built. We made the design decision that the child workflows do not actually produce Azkaban "embedded" flow types. Rather, when you examine the parent workflow in Azkaban, the entire flow (including the child workflows) will appear in Azkaban as one seamless flow (although you will see obvious places in the flow where the child workflows begin and end).
(Since version 0.11.16) A community member contributed an option to generate Azkaban "embedded" flow types for child workflows. In a child workflow, make the optional declaration groupJobs true, and you will get an Azkaban embedded flow for the child workflow. If you look at the Azkaban user interface page that shows your workflow DAG, you will see your child workflow grouped under an icon in the DAG.
Note that an unfortunate (and unavoidable) side-effect of using Azkaban embedded flows is that the embedded flow itself also shows up as a top-level flow in the Azkaban project page for your project.
hadoop {
buildPath "azkaban"
}
workflow('parentFlow') {
job('job1') { }
// Declare a child workflow
workflow('childFlow1') {
groupJobs true // Optionally generate the child workflow as an Azkaban embedded flow
flowDepends 'job1' // Use the flowDepends method to declare dependencies on jobs (and other child workflows) in the parent workflow
job('job2') { }
targets 'job2'
}
job('job2') {
depends 'childFlow1' // Jobs can now depend on child workflows
}
workflow('childFlow2') {
flowDepends 'childFlow1' // Child workflows can depend on other child workflows (in the parent workflow)
// You can declare workflows inside of workflows inside of workflows, etc.
workflow('grandChildFlow1') {
// ...
}
targets 'grandChildFlow1'
}
targets 'job2', 'childFlow2' // The overall parent workflow can depend on its child workflows
}
You can easily build property files. Like anything else, you have to add them to a hadoop block for them to actually get built. Property files can inherit from Hadoop DSL property sets using the baseProperties method - see the section on property sets for more information.
You can use set properties, set jvmProperties, and either set confProperties or set hadoopProperties with property files. The last two both set Hadoop job configuration properties and are synonymous.
hadoop {
buildPath "azkaban"
// This will build the file common.properties
propertyFile('common') {
// baseProperties 'common' // Inherit properties from a Hadoop DSL property set - see property sets for more information
set properties: [ // Sets myPropertyA=valA and myPropertyB=valB in the properties file
'myPropertyA' : 'valA',
'myPropertyB' : 'valB'
]
set hadoopProperties: [ // Sets hadoop-inject.mapred.foo.bar=foobar in the properties file
'mapred.foo.bar' : 'foobar'
]
set jvmProperties: [ // Sets jvm.args=-DjvmPropertyName1=jvmPropertyValue1 -DjvmPropertyName2=jvmPropertyValue2 in the properties file
'jvmPropertyName1' : 'jvmPropertyValue1',
'jvmPropertyName2' : 'jvmPropertyValue2'
]
}
}
(Since version 0.3.7) Property sets are an easy way to declare sets of properties. The nice thing about property sets is that jobs, property files and other property sets can all inherit from a property set using the baseProperties declaration. This makes it really easy to build different groups of properties and then apply them in different workflows, jobs and property files.
Property sets are in-memory abstractions only. To actually write properties to disk, declare a propertyFile or job that inherits from your property set and build that instead.
You can use set properties, set jvmProperties, and either set confProperties or set hadoopProperties with property sets. The last two both set Hadoop job configuration properties and are synonymous.
propertySet('common') {
set properties: [
'myPropertyA' : 'valA',
'myPropertyB' : 'valB'
]
}
propertySet('javaCommon') {
baseProperties 'common' // Inherit from the property set named common
set jvmProperties: [
'jvmPropertyName1' : 'jvmPropertyValue1'
]
set properties: [
'myPropertyB' : 'valBUpdated' // Override an inherited property
]
}
propertySet('hadoopCommon') {
baseProperties 'javaCommon' // Inherit from javaCommon, which inherits from common
set jvmProperties: [
'jvmPropertyName1' : 'jvmPropertyValue1Updated',
'jvmPropertyName2' : 'jvmPropertyValue2'
]
set hadoopProperties: [
'mapred.foo.bar' : 'foobar'
]
}
hadoop {
buildPath "azkaban"
// This will build the file global.properties containing all the properties of the propertySet 'common'
propertyFile('global') {
baseProperties 'common'
set properties: [
'myPropertyA' : 'valAUpdated' // Override an inherited property
]
}
workflow('workflow1') {
hadoopJavaJob('job1') {
uses 'com.linkedin.foo.HelloHadoopJavaJob'
baseProperties 'hadoopCommon' // job1 now inherits all properties from the propertySet 'hadoopCommon'
set hadoopProperties: [
'mapred.foo.bar' : 'foobarUpdated' // Override this inherited property
]
}
targets 'job1'
}
}
You can easily declare different types of jobs using the Hadoop DSL. When the Hadoop DSL is built, it will compile the jobs into Azkaban job files. Here is a complete reference for the different types of jobs you can declare and how they are compiled into job files.
The Job class is intended as a base class - you will probably never declare a pure job in your DSL. However, all job types inherit the methods and properties of the Job class.
(Since version 0.11.16) The Job class now supports a required parameters declaration that specifies the properties the job must declare.
// Job is the base class for all other job types. You will probably never declare a pure Job
// in your DSL, but all job types support the following syntax and properties.
job('jobName') {
baseProperties 'common' // Inherit properties from a Hadoop DSL property set - see property sets for more information
reads files: [
'foo' : '/data/databases/foo', // Declares that this job reads this HDFS path. Will be used during static checking of the DSL.
'bar' : '/data/databases/bar',
]
writes files: [
'bazz' : '/user/bazz/foobar' // Declares that this job writes this HDFS path. Will be used during static checking of the DSL.
]
required parameters: [
'bazz', 'propertyName1' // Declares that this job must declare the properties named 'bazz' and 'propertyName1'
]
set properties: [
'propertyName1' : 'propertyValue1' // Sets propertyName1=propertyValue1 in the job file
]
depends 'job1' // Sets dependencies=job1 in the job file
}
// About reads and writes: these are optional, but highly recommended methods you can use to declare the HDFS paths your job reads and
// writes. If you use these, the static checker will verify that if you have a workflow with job A that writes a certain path and job
// B that reads it, that job B is (directly or transitively) dependent on job A, or it will issue a warning. This is a common (and
// hard to diagnose) race condition that can be prevented by this static check.
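Here is a minimal sketch of the read-before-write check in action (the job class names and HDFS paths are placeholders): because 'consumer' reads a path that 'producer' writes, it must depend on 'producer' or the static checker will warn about a potential race condition.
workflow('readsWritesExample') {
  hadoopJavaJob('producer') {
    uses 'com.example.ProducerJob'                        // Placeholder class name
    writes files: [ 'output' : '/data/derived/example' ]  // Placeholder HDFS path
  }
  hadoopJavaJob('consumer') {
    uses 'com.example.ConsumerJob'                        // Placeholder class name
    reads files: [ 'input' : '/data/derived/example' ]    // Reads the path written by producer
    depends 'producer'                                    // Without this dependency, the static checker issues a warning
  }
  targets 'consumer'
}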
Extends the Job class. Intended as cron-like functionality to run non-Hadoop scripts or Unix commands. To run Hadoop commands, use HadoopShellJob instead.
def commands = ['echo "hello"', 'echo "This is how one runs a command job"', 'whoami']
commandJob('jobName') { // Sets type=command in the job file
uses 'echo "hello world"' // Exactly one of uses or usesCommands is required
usesCommands commands // Exactly one of uses or usesCommands is required
// All other Job syntax is supported
}
(Since version 0.3.6) Extends the JavaProcessJob class. This is an abstract base class for all Hadoop-related jobs implemented in Java. All concrete subclasses of this class have access to the declarative syntax below.
concreteHadoopJavaProcessJob('jobName') { // This class is abstract. HadoopJavaJob, JavaJob, HiveJob, and PigJob are all concrete subclasses of HadoopJavaProcessJob.
caches files: [
'foo.jar' : '/user/bazz/foo.jar' // Adds /user/bazz/foo.jar#foo.jar to mapred.cache.files and sets mapred.create.symlink to yes
]
cachesArchive files: [
'foobar' : '/user/bazz/foobar.zip' // Adds /user/bazz/foobar.zip#foobar to mapred.cache.archives and sets mapred.create.symlink to yes
]
set confProperties: [
'mapred.property1' : 'value1', // Sets hadoop-inject.mapred.property1=value1 in the job file
'mapred.property2' : 'value2' // Sets hadoop-inject.mapred.property2=value2 in the job file
]
queue 'marathon' // Sets mapred.job.queue.name=marathon in the job file
// All other JavaProcessJob syntax is supported
}
Extends the HadoopJavaProcessJob class. This is the main job type you should use for Java Map-Reduce jobs.
hadoopJavaJob('jobName') { // Sets type=hadoopJava in the job file
uses 'com.linkedin.foo.HelloHadoopJavaJob' // Required. Sets job.class=com.linkedin.foo.HelloHadoopJavaJob in the job file
// All other HadoopJavaProcessJob syntax is supported
}
(Since version 0.7.10) Extends the CommandJob class. HadoopShellJob is a Hadoop security enabled "command" job type.
def commands = ['echo "hello"', 'hadoop fs -ls', 'hadoop fs -cp /user/foo/bar /user/foo/bazz']
hadoopShellJob('jobName') { // Sets type=hadoopShell in the job file
uses 'hadoop fs -ls' // Exactly one of uses or usesCommands is required
usesCommands commands // Exactly one of uses or usesCommands is required
// All other Job syntax is supported
}
(Set parameters method since version 0.3.6) Extends the HadoopJavaProcessJob class. The job type for Apache Hive scripts.
hiveJob('jobName') { // Sets type=hive in the job file
uses 'hello.q' // Sets hive.script=hello.q in the job file
set parameters: [
'param1' : 'val1', // Sets hivevar.param1=val1 in the job file
'param2' : 'val2' // Sets hivevar.param2=val2 in the job file
]
// All other HadoopJavaProcessJob syntax is supported
}
Extends the HadoopJavaProcessJob class. Don't use javaJob as it has been deprecated. Use hadoopJavaJob if your job needs to access HDFS, or javaProcessJob if your job consists of plain Java code (i.e. it's not a Hadoop job).
javaJob('jobName') { // Sets type=java in the job file
uses 'com.linkedin.foo.HelloJavaJob' // Required. Sets job.class=com.linkedin.foo.HelloJavaJob in the job file
// All other HadoopJavaProcessJob syntax is supported
}
(jvmClasspath, set jvmProperties, Xms and Xmx methods since version 0.3.6) Extends the Job class. The job type for Java jobs that do not require secure access to HDFS. This job type adds the ability to set JVM parameters and to set the memory properties of the JVM that runs the process.
Please note that the JVM-related syntax for this job type affects only the JVM for the process started by Azkaban to run the job. In particular, this functionality DOES NOT set JVM properties for the map and reduce tasks of Hadoop jobs.
javaProcessJob('jobName') { // Sets type=javaprocess in the job file
uses 'com.linkedin.foo.HelloJavaProcessJob' // Required. Sets java.class=com.linkedin.foo.HelloJavaProcessJob in the job file
jvmClasspath './*:./lib/*' // Sets the classpath of the JVM started by Azkaban to run the job
set jvmProperties: [ // Sets jvm.args=-DjvmPropertyName1=jvmPropertyValue1 -DjvmPropertyName2=jvmPropertyValue2 in the job file.
'jvmPropertyName1' : 'jvmPropertyValue1', // These arguments are passed to the JVM started by Azkaban to run the job.
'jvmPropertyName2' : 'jvmPropertyValue2'
]
Xms 96 // Sets -Xms for the JVM started by Azkaban to run the job
Xmx 384 // Sets -Xmx for the JVM started by Azkaban to run the job
// All other Job syntax is supported
}
Extends the Job class. Using noOpJob allows you to create dependencies on multiple sub-jobs.
noOpJob('jobName') { // Sets type=noop in the job file
depends 'job1', 'job2' // Typically in a NoOpJob the only thing you will ever declare are job dependencies
}
Extends the HadoopJavaProcessJob class. The job type for Apache Pig scripts.
pigJob('jobName') { // Sets type=pig in the job file
uses 'myScript.pig' // Required. Sets pig.script=myScript.pig in the job file
reads files: [
'foo' : '/data/databases/foo' // Declares that this job reads this HDFS path and sets the Pig parameter param.foo=/data/databases/foo in the job file
]
writes files: [
'bazz' : '/user/bazz/foobar' // Declares that this job writes this HDFS path and sets the Pig parameter param.bazz=/user/bazz/foobar in the job file
]
set parameters: [
'param1' : 'val1', // Sets the Pig parameter param.param1=val1 in the job file
'param2' : 'val2' // Sets the Pig parameter param.param2=val2 in the job file
]
// All other HadoopJavaProcessJob syntax is supported
}
(Since version 0.6.5. Support for declaring driverMemory, executorMemory, numExecutors, executorCores and jars since version 0.9.1. Support for PySpark since version 0.9.11) Extends the HadoopJavaProcessJob class. The job type for Apache Spark jobs.
def flags = ['verbose', 'version'] // Set of flags to pass to spark-submit
def params = ['param1', 'param2', 'param3'] // Ordered list of params to pass to the app jar
sparkJob('jobName') {
uses 'com.linkedin.hello.HelloSparkJob' // Required if executing a jar. Sets class=com.linkedin.hello.HelloSparkJob in the job file.
executes 'mySparkJob.jar' // Required. Sets execution-jar=mySparkJob.jar in the job file.
// You can also execute PySpark instead of a jar. If you execute PySpark, do not declare a "uses" declaration.
// executes 'hello_pyspark.py' // Required. Sets execution-jar=hello_pyspark.py in the job file.
driverMemory '2G' // Sets driver-memory=2G in the job file. This sets the amount of memory used by the Spark driver.
executorMemory '4G' // Sets executor-memory=4G in the job file. This sets the amount of memory used by Spark executors.
numExecutors 400 // Sets num-executors=400 in the job file. This sets the number of Spark executors.
executorCores 2 // Sets executor-cores=2 in the job file. This sets the number of executor cores in each Spark executor.
jars 'foo.jar,bar.jar' // Sets jars=foo.jar,bar.jar in the job file. This specifies the jars which should be added to the classpath of Spark jobs during execution.
enableFlags flags // Sets flag.verbose=true and flag.version=true in the job file. Multiple flags can be passed in this way.
set sparkConfs: [
'spark.authenticate': 'true', // Sets conf.spark.authenticate=true in the job file
'key1': 'value1', // Other configurations can be passed in similar way
'key2': 'value2'
]
appParams params // Sets params=param1 param2 param3 in the job file. Usually you would pass input and output file paths.
queue 'default' // Sets queue=default in the job file
// All other HadoopJavaProcessJob syntax is supported
}
Extends the HadoopJavaJob class. The job type for Kafka build-and-push jobs.
kafkaPushJob('jobName') {
usesInputPath '/data/databases/MEMBER2/MEMBER_PROFILE/#LATEST' // Required
usesTopic 'kafkatestpush' // Required
usesNameNode 'hdfs://theNameNode.linkedin.com:9000' // Optional (Required in versions 0.5.3 through 0.5.14)
usesBatchNumBytes 1000000 // Optional
usesDisableSchemaRegistration true // Optional
usesKafkaUrl 'theKafkaNode.linkedin.com:10251' // Optional
usesSchemaRegistryUrl 'http://theKafkaNode:10252/schemaRegistry/schemas' // Optional
usesDisableAuditing true // (Since version 0.4.5) Optional
// All other HadoopJavaJob syntax is supported
}
Extends the HadoopJavaJob class. The job type for Voldemort build-and-push jobs.
// The property values that appear here are all just examples and not necessarily the values you should use!
voldemortBuildPushJob('jobName') {
usesStoreName 'yourStoreName' // Required
usesClusterName 'voldemortClusterUrl' // Required
usesInputPath 'hdfsInputPath' // Required
usesOutputPath 'hdfsOutputPath' // Required
usesStoreOwners 'yourTeamMembersEmails' // Required
usesStoreDesc 'Short description of your store' // Required
usesRepFactor 2 // Optional, but you should ALWAYS set this to 2
usesCompressValue true // Optional, but you should ALWAYS set this to true
usesAvroSerializerVersioned true // Optional, but you should ALWAYS set this to true
usesAvroData true // Optional, but you should ALWAYS set this to true
usesAvroKeyField 'yourKeyField' // Required (unless isAvroData is false, in legacy stores)
usesAvroValueField 'yourValueField' // Required (unless isAvroData is false, in legacy stores)
// Legacy fields - DO NOT USE FOR NEW STORES
usesKeySelection 'memberId' // Optional, legacy config used for JSON stores (now unsupported)
usesValueSelection 'lastName' // Optional, legacy config used for JSON stores (now unsupported)
usesBuildStore true // Optional, legacy config used for having one BnP job per destination colo. Now useless as you should always have just one BnP job.
usesPushStore true // Optional, legacy config used for having one BnP job per destination colo. Now useless as you should always have just one BnP job.
// Do not mess with the configs below unless you are told to by the Voldemort team
usesNumChunks(-1) // Optional
usesChunkSize 1073741824 // Optional
usesKeepOutput false // Optional
}
(Since version 0.7.11) Extends the Job class. The job type to launch Gobblin in Azkaban.
// Below is the MySQL -> HDFS use case, for which you can use a config preset:
gobblinJob('jobName') {
work_dir '/hdfs/directory' // Optional
config_preset 'mysqlToHdfs' // Optional
set properties: [ // Optional - Add Gobblin parameters along with other Azkaban parameters such as user.to.proxy
'user.to.proxy' : 'userid',
'source.querybased.schema' : 'database_name',
'source.entity' : 'table_name',
'source.conn.host' : 'eat1-db1234.corp.com',
'source.conn.username' : 'user_name',
'source.conn.password' : 'ENC(encrypted_password)',
'encrypt.key.loc' : '/file/in/hdfs.key',
'extract.table.type' : 'snapshot_only',
'extract.is.full' : true,
'data.publisher.replace.final.dir' : true,
'data.publisher.final.dir' : '${gobblin.work_dir}/job-output'
]
}
// Below is the HDFS -> MySQL use case, for which you can use a config preset:
gobblinJob('jobName') {
work_dir '/hdfs/directory' // Optional
config_preset 'hdfsToMysql' // Optional
set properties: [ // Optional - Add Gobblin parameters along with other Azkaban parameters such as user.to.proxy
'user.to.proxy' : 'userid',
'source.filebased.data.directory' : '/source/path/in/hdfs',
'jdbc.publisher.table_name' : 'test_user',
'jdbc.publisher.database_name' : 'test_database',
'jdbc.publisher.username' : 'db_user_id',
'jdbc.publisher.password' : 'ENC(encrypted_password)',
'jdbc.publisher.url' : 'jdbc:mysql://test.corp.linkedin.com',
'jdbc.publisher.encrypt_key_loc' : '/file/in/hdfs.key',
'converter.avro.jdbc.entry_fields_pairs' : '{"userId":"user_id" , "memberId":"member_id"}',
'jdbc.publisher.replace_table' : true,
'extract.table.type' : 'snapshot_only'
]
}
(Since version 0.7.11) Extends the Job class. The job type for loading data from HDFS to Espresso.
hdfsToEspressoJob('jobName') {
qps 1000 // Required. Up to 6,000, consult with DBA to decide the QPS.
sourceHdfsPath '/hdfs/directory' // Required
espressoEndpoint 'http://eat1-app1234.stg.linkedin.com:12345' // Required
espressoDatabaseName 'espresso_database_name' // Required
espressoTableName 'espresso_table_name' // Required
espressoContentType 'APPLICATION_JSON' // Required
espressoKey 'key_column_name' // Required
espressoSubkeys 'sub_key_column_name1,sub_key_column_name2' // Optional
espressoOperation 'put' // Required
errorHdfsPath '/hdfs/error/directory' // Required
set properties: [
'force.error.overwrite' : true // Optional. Add other properties defined in go/HdfsToEspresso.
]
}
(Since version 0.7.11) Extends the Job class. The job type for loading data from HDFS to Teradata.
hdfsToTeradataJob('jobName') {
hostName 'dw.foo.com' // Required
userId 'scott' // Required
credentialName 'com.linkedin.teradata.scott' // Deprecated
cryptoKeyFilePath '/hdfs/path/to/passphrase/file' // Since version 0.9.5
encryptedCredential 'eyJ2YWwiOiJiQzVCU09HbDVwYndxNFRXV00y3UzhQdFdjPSIsInZlciI6IjEuMSJ9' // Since version 0.9.5
sourceHdfsPath '/job/data/src' // Required
targetTable 'teradataTable' // Required
avroSchemaPath '/job/data/src/avro.avsc'
avroSchemaInline '{"type":"record", "namespace":"com.example", "name":"FullName", "fields":[{"name":"first","type":"string"},{"name":"last","type":"string"}]}'
set hadoopProperties: [
'hadoopPropertyName1' : 'hadoopPropertyValue1',
'hadoopPropertyName2' : 'hadoopPropertyValue2'
]
}
// Hive support has been added since version 0.9.15
hdfsToTeradataJob('jobName') {
hostName 'dw.foo.com' // Required
userId 'scott' // Required
cryptoKeyFilePath '/hdfs/path/to/passphrase/file'
encryptedCredential 'eyJ2YWwiOiJiQzVCU09HbDVwYndxNFRXV00y3UzhQdFdjPSIsInZlciI6IjEuMSJ9'
sourceHiveDatabase 'hive_database' // Required
sourceHiveTable 'hive_table' // Required
targetTable 'teradataTable'
set hadoopProperties: [
'hadoopPropertyName1' : 'hadoopPropertyValue1',
'hadoopPropertyName2' : 'hadoopPropertyValue2'
]
}
(Since version 0.7.11) Extends the Job class. The job type for loading data from Teradata to HDFS.
teradataToHdfsJob('jobName') {
hostName 'dw.foo.com' // Required
userId 'scott' // Required
credentialName 'com.linkedin.teradata.scott' // Deprecated
cryptoKeyFilePath '/hdfs/path/to/passphrase/file' // Since version 0.9.5
encryptedCredential 'eyJ2YWwiOiJiQzVCU09HbDVwYndxNFRXV00y3UzhQdFdjPSIsInZlciI6IjEuMSJ9' // Since version 0.9.5
sourceTable 'person' // Either sourceTable or sourceQuery is required
sourceQuery 'select * from person;' // Either sourceTable or sourceQuery is required
targetHdfsPath '/job/data/test/output' // Required
avroSchemaPath '/job/data/src/avro.avsc'
avroSchemaInline '{"type":"record", "namespace":"com.example", "name":"FullName", "fields":[{"name":"first","type":"string"},{"name":"last","type":"string"}]}'
set hadoopProperties: [
'hadoopPropertyName1' : 'hadoopPropertyValue1',
'hadoopPropertyName2' : 'hadoopPropertyValue2'
]
}
(Since version 0.9.5) Extends the Job class. The job type for executing SQL statements (currently only supports Teradata).
sqlJob('jobName') {
jdbcDriverClass 'com.teradata.jdbc.TeraDriver' // Required
jdbcUrl '/job/data/src' // Required
jdbcUserId 'foo' // Required
jdbcEncryptedCredential 'eyJ2YWwiOiJiQzVCU09HbDVwYndxNFRXV00yZ' // Required
jdbcCryptoKeyPath '/hdfs/path/to/cryptokey/file' // Required
set properties: [
'user.to.proxy' : 'testUser',
'jdbc.sql.1' : 'DELETE test_table_publish ALL;',
'jdbc.sql.2' : 'INSERT INTO test_table_publish SELECT * FROM test_table;'
]
}
(Since version 0.11.16) Extends the HadoopJavaJob class. The job type to generate segments and push data to Pinot.
pinotBuildAndPushJob('jobName') {
usesTableName 'internalTesting' // Required
usesInputPath '/user/input' // Required
usesPushLocation '${fabric}' // Required
}
(Since version 0.12.1) Extends the HadoopJavaJob class. The job type for Tableau jobs that refreshes the specified Tableau workbook.
tableauJob('jobName') {
usesWorkbookName 'workbookName' // Required
}
(Since version 0.12.4) Extends the HadoopJavaJob class. This job type waits for fresh data in HDFS. The job will poll the given directory for any subdirectories created since the specified freshness duration.
The directoryFreshness, timeoutAfter and sleepTime parameters all take a duration specified in the desired number of days, hours, minutes and seconds, e.g. 26H 10D 37M = 26 hours, 10 days, and 37 minutes; 40H 22M = 40 hours and 22 minutes; and 20S = 20 seconds.
hdfsWaitJob('jobName') {
jvmClasspath './lib/*' // Sets the classpath of the JVM started by Azkaban to run the job
directoryPath '/data/foo/bar' // Required. Specifies the target directory in HDFS under which to check for fresh data.
directoryFreshness '15H 4M 10S' // Required. Specifies how recently a subdirectory in the target directory must have been created for the freshness check to pass.
timeoutAfter '1D 11S' // Required. Specifies how long the job should continue to run before it times out.
failOnTimeout true // Required. Whether or not the job should fail if it times out.
checkExactPath true // Optional. Specifies whether to check for the exact target directory path or for files underneath it.
sleepTime '2M 10S' // Optional. How long to wait between polling HDFS to make a check for fresh data.
}
Here is a complete example of a workflow declaring Pig jobs from the Hadoop Starter Kit multiproduct (example Hadoop code internal to LinkedIn).
def memberSummary = '/data/derived/member/summary#LATEST'
def memberAccount = '/data/databases/MEMBER2/MEMBER_ACCOUNT/#LATEST'
hadoop {
buildPath "azkaban"
workflow('countFlow') {
pigJob('countByCountry') { // Sets type=pig in the job file
uses 'src/main/pig/count_by_country.pig' // Sets pig.script=src/main/pig/count_by_country.pig in the job file
reads files: [
'member_summary': memberSummary // Declares that this job reads this HDFS path and sets the Pig parameter param.member_summary=/data/derived/member/summary#LATEST in the job file
]
writes files: [
// Declares that this job writes this HDFS path and sets the Pig parameter param.output_path=hadoop-starter-kit/hello-pig-azkaban/count_by_country in the job file
'output_path': 'hadoop-starter-kit/hello-pig-azkaban/count_by_country'
]
set properties: [
'pig.additional.jars': "hello-pig-udf-${project.version}.jar" // Sets pig.additional.jars=hello-pig-udf-x.y.z.jar in the job file
]
}
pigJob('memberEventCount') {
uses 'src/main/pig/member_event_count.pig'
reads files: [
'event_path': '/data/tracking/EndorsementsEndorseEvent'
]
writes files: [
// Declares that this job writes this HDFS path and sets the Pig parameter param.output_path=hadoop-starter-kit/hello-pig-azkaban/member_event_count in the job file
'output_path': 'hadoop-starter-kit/hello-pig-azkaban/member_event_count'
]
set parameters: [
'num_days': 2 // Sets the Pig parameter param.num_days=2 in the job file
]
}
targets 'countByCountry', 'memberEventCount'
}
}
One of the most powerful features of the Hadoop DSL is the ability to define jobs and workflows (often in global scope) and then clone them into additional places using the addJob and addWorkflow functions.
When you clone objects, the DSL gives you a very convenient syntax to override things from the original object with new values. This is an incredibly useful way to define workflows and jobs once in global scope and then quickly build multiple (slightly different) versions of them by cloning them into a hadoop block.
Cloning is a quick way to reduce the amount of code you need to create a few similar Hadoop DSL objects. However, if you find yourself spending a lot of time cloning and customizing objects, consider using the Hadoop DSL definitionSet and hadoopClosure language features to declare reusable Hadoop DSL code blocks instead.
(Since version 0.5.3) You can additionally clone entire namespaces with the addNamespace function. This will clone everything under the namespace.
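Here is a minimal sketch of cloning a namespace (the names are placeholders); see the section on the lookup function below for an example that also overrides values inside the cloned namespace.
// Declare a namespace in global scope, then clone everything under it into hadoop scope
namespace('commonStuff') {
  workflow('workflow1') {
    job('job1') { }
    targets 'job1'
  }
}
hadoop {
  buildPath "azkaban"
  addNamespace('commonStuff') {
    // Use lookup inside the clone to override values from the original namespace
  }
}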
// Example of cloning jobs
// Declare job1 in global scope
job('job1') {
set properties: [
'foo': '1'
]
}
hadoop {
buildPath "azkaban"
workflow('workflow1') {
addJob('job1', 'job0') { // Directly clone job1 from global scope into workflow1 and rename it job0
set properties: [ // When cloning a job, you can use job syntax inside the brackets to easily override anything from the original job
'foo': '2',
'bar': '1'
]
}
addJob('job1') { // Directly clone job1 from global scope into workflow1
depends 'job0' // Make job1 depend on the job0 you previously cloned into workflow1
}
targets 'job1'
}
}
You can clone workflows similarly using the addWorkflow function. One very helpful pattern is to declare a workflow in global scope, and then clone it as a child workflow into many other workflows. This can help you eliminate a lot of redundant DSL code.
// Example of cloning workflows
// Declare commonWorkflow in global scope
workflow('commonWorkflow') {
job('job1') {
// ...
}
targets 'job1'
}
hadoop {
buildPath "azkaban"
// Directly clone commonWorkflow from global scope into hadoop scope and give it the new name workflow1
addWorkflow('commonWorkflow', 'workflow1') {
}
workflow('workflow2') {
job('job1') { }
// Directly clone commonWorkflow from global scope as a child workflow of workflow2
addWorkflow('commonWorkflow') {
flowDepends 'job1'
}
job('job2') {
depends 'commonWorkflow'
}
targets 'job2'
}
}
(Since version 0.4.2) When cloning jobs or workflows, you might want to clear previously declared job dependencies or workflow targets. The DSL supports an easy syntactic way to do so.
// Example of clearing job dependencies and workflow targets
workflow('workflow1') {
job('job1') { }
job('job2') { }
job('job3') { depends 'job1' }
job('job4') { }
targets 'job4'
}
addWorkflow('workflow1', 'workflow2') {
lookup('job3') {
depends clear: 'true', targetNames: ['job2'] // Clears the previously declared dependency of job3 on job1
}
targets clear: 'true', targetNames: ['job3'] // Clears the previously declared workflow target job4
}
// When cloning child workflows, you can also clear previous flow dependencies the same way, e.g. flowDepends clear: 'true', targetNames: ['job1']
Using qualified names, you can directly refer to Hadoop DSL elements declared in any particular scope. This means you can easily clone objects declared in any scope (instead of only being able to clone objects that are declared in global scope). A qualified name is a name that contains a "." character. Qualified names come in two flavors, fully qualified names and locally qualified names.
A fully qualified name is a name that starts with a "." character and that has the form .outerScopeName.innerScopeName1.innerScopeNameN.objectName. Fully qualified name resolution always begins from global scope. From global scope, name resolution will look for a scope named outerScopeName, and then will look for the nested scopes innerScopeName1 through innerScopeNameN, and finally will look in innerScopeNameN for the name objectName.
// Declare job1 in global scope
job('job1') {
set properties: [
'foo': '1'
]
}
hadoop {
buildPath "azkaban"
workflow('workflow1') {
job('job1') { // This job1 now "hides" the name job1 from global scope. If you refer to job1 inside workflow1 scope, you will get this job1.
}
addJob('.job1', 'job2') { // Directly clone job1 from global scope into workflow1. To get the job1 from global scope, use its fully qualified name ".job1".
set properties: [ // When cloning a job, you can use job syntax inside the brackets to easily override anything from the original job
'foo': '2',
'bar': '1'
]
}
}
addWorkflow('.hadoop.workflow1', 'workflow2') { // You could have just said 'workflow1' here, since they are in the same scope, but just for an example we are using workflow1's fully qualified name
// See the section on the lookup function on how to customize the workflow when you clone it using the addWorkflow function
}
}
// You could also clone things the other way, i.e. from within hadoop scope to global scope
addJob('.hadoop.workflow2.job1', 'job2') { // Remember that workflow2 has a job1, because workflow2 was cloned from workflow1, which had a job1
}
All other names that do not begin with a "." are locally qualified names. Locally qualified names have the form outerScopeName.innerScopeName1.innerScopeNameN.objectName. Locally qualified name resolution begins from the current scope. Name resolution will look in the current scope for a scope named outerScopeName. If it does not find one, it will search outwards towards global scope until it finds such a scope. Once it finds outerScopeName, name resolution will then look for the nested scopes innerScopeName1 through innerScopeNameN, and finally will look in innerScopeNameN for the name objectName.
hadoop {
buildPath "azkaban"
// Declare workflow1 with job1
workflow('workflow1') {
job('job1') {
}
}
workflow('workflow2') {
// ERROR: can't find the locally qualified name job1, since it's in workflow1
// addJob('job1') {
// }
// CORRECT: Use the locally qualified name workflow1.job1 to clone a job into workflow2
addJob('workflow1.job1', 'job1') {
}
// CORRECT: The locally qualified name hadoop.workflow1.job1 would have found the same job
addJob('hadoop.workflow1.job1', 'job2') {
}
// CORRECT: You could have also used the fully-qualified name to refer to the same job
addJob('.hadoop.workflow1.job1', 'job3') {
}
}
}
The lookup function is an extremely helpful function that looks up (returns) a named object. When you use the lookup function, you can use DSL syntax to easily alter the returned object. This can be extremely powerful when used together with cloning, as you can clone an object and then use lookup to customize nested elements of the cloned object. You can use either fully qualified or locally qualified names with lookup.
pigJob('pigJob') {
uses 'src/main/foo.pig'
}
// Once you lookup an object, you can use that object's native DSL syntax to alter it, i.e. the
// same syntax used in the cloning section.
lookup('pigJob') {
uses 'src/main/bar.pig'
}
// This is particularly helpful when cloning jobs, workflows and namespaces! Here's an example.
workflow('workflow1') {
pigJob('job1') {
uses 'src/main/foo.pig'
set parameters: [
'bazz' : 'barbazz'
]
}
}
namespace('namespace1') {
workflow('wf1') {
job('j1') {
set properties: [
'foo' : 'bar'
]
}
}
}
hadoop {
buildPath "azkaban"
addWorkflow('workflow1') { // Clone workflow1 into hadoop scope
lookup('job1') { // Now lookup job1 in the cloned workflow and syntactically update its values!
uses 'src/main/bar.pig'
set parameters: [
'bazz' : 'barbazz1'
]
}
}
// When cloning namespaces, you'll often need a series of nested lookups to get
// down to the right object you want to update.
addNamespace('namespace1') { // Clone namespace1 into hadoop scope and do a couple of nested lookups to update j1
lookup('wf1') {
lookup('j1') {
set properties: [
'foo' : 'BAR'
]
}
}
// OR you could do a locally-qualified lookup instead:
lookup('wf1.j1').setJobProperty('foo', 'BAR')
}
// OR you could do a fully-qualified lookup instead:
lookup('.hadoop.namespace1.wf1.j1').setJobProperty('foo', 'BAR')
}
You can also use lookup as an easy entry point for programming with the actual underlying classes and objects in the DSL, since lookup simply returns the underlying object. See the section below on Programming with Classes and Objects for more information.
pigJob('pigJob') {
uses 'src/main/foo.pig'
}
// Lookup the PigJob object and set its API script property directly instead of with DSL syntax
lookup('pigJob').uses 'src/main/bar.pig'
// Another way you could do this is to set the object to a def and then manipulate that
def myJob = lookup('pigJob')
myJob.uses 'src/main/bazz.pig'
The lookupRef method is just like lookup, except that it returns a NamedScopeReference object. This reference has fields for the name, entry, qualifiedName and declaringScope of the object. While this method is primarily used internally in the Hadoop DSL, it has one useful feature. Unlike lookup, the lookupRef method will return null if you lookupRef a name that is not bound in scope (lookup will throw an exception). This means you can check if a name is bound in scope by checking whether lookupRef returns null or not.
hadoop {
buildPath "azkaban"
pigJob('pigJob') {
uses 'src/main/foo.pig'
}
}
println lookupRef('hadoop.pigJob').name // Prints pigJob
println lookupRef('hadoop.pigJob').entry.script // Prints src/main/foo.pig
println lookupRef('hadoop.pigJob').qualifiedName // Prints hadoop.pigJob
println lookupRef('hadoop.pigJob').declaringScope.levelName // Prints hadoop
// You can use lookupRef to determine if a name is bound in scope
println lookupRef('foobar') // null - name is unbound in scope
// println lookup('foobar') // Exception!
As you use the Hadoop DSL more and more, you will often find yourself in the situation where you would like to declare a Hadoop DSL code block once, but have the logic of the code block or the values it uses depend on some parameters.
While you could use Hadoop DSL cloning or write your own Groovy functions to do something similar, the Hadoop DSL provides a series of language features that help you achieve this in a declarative and consistent fashion. These language features are especially useful in the case where you want to customize your Hadoop DSL workflows and jobs for specific Hadoop grids (for example, a development grid versus a production grid).
(Since version 0.5.3) Hadoop DSL definition sets provide a way to declare a set of definitions intended to serve as configurable parameters.
Unlike regular Groovy defs, which are limited to the scope of the file in which they are declared, once you have declared a definition set you can refer to it from any subsequent Hadoop DSL code in any file. You can declare unnamed definition sets, in which case the definitions are added to the default definition set (called "default"), or you can declare a named definition set.
The values in a definition set can be retrieved with the lookupDef method. You can use lookupDef purely to retrieve values that you think of as Hadoop DSL property values, or you can use it directly with Groovy logic that affects how you construct Hadoop DSL elements.
When you use lookupDef to retrieve the value of a definition, it will first look for the definition in the current (named) definition set. If the definition is not found there, it will then check the default definition set.
Internally, the Hadoop DSL changes the current definition set with the setDefinitionSet method, and resets the current definition set to the default definition set by calling setDefinitionSet with the name "default". However, instead of calling setDefinitionSet directly, we recommend that you declare and evaluate hadoopClosure blocks. The mechanism for evaluating Hadoop closures will call setDefinitionSet internally as appropriate.
In your Hadoop closures, you can retrieve the current definition set name with the definitionSetName method. This allows you to have Groovy logic in your closures that is customized for each definition set.
Since definition sets are meta-level constructs (you can use them to affect your Groovy logic), they are not technically named Hadoop DSL elements. Thus, definition sets are not added to Hadoop DSL scopes, and you cannot use them with the lookup function. Definition sets always exist in their own plugin-wide scope, regardless of where they are declared.
When you declare a definitionSet, it returns a Groovy Map, which allows you to easily combine previous definitions to form new definition sets using the Groovy + operator.
// Add definitions to the default definition set (called "default")
definitionSet defs: [
var1: "val1",
var2: "val2"
]
// Create a new named definition set and save a reference to the Groovy Map it returns
def contextOneMap = definitionSet name: "context1", defs: [
var2: 'VAL2',
var3: 3
]
// Another way to derive new definition sets is to save a reference to the Groovy Map returned when declaring
// a previous definitionSet and then append it to the new defs with the Groovy plus operator
definitionSet name: "anotherDefinitionSet", defs: contextOneMap + [
var2: 'VAL2_UPDATED',
var4: false
]
// If you redeclare an existing definition set, the existing one is updated. This allows you to extend
// definition sets across more than one file.
definitionSet name: "context1", defs: [
var4: true
]
// Now you can use the lookupDef method to lookup the values in any Gradle file
assert(lookupDef("var1") == "val1") // From the default definition set
// Remember - we don't recommend that you use setDefinitionSet directly. Rather, use hadoopClosure
// to express your logic in a declarative fashion. Calling it here is just to make the example work.
setDefinitionSet "context1" // Set the current definition set to be context1
assert(lookupDef("var1") == "val1") // var1 is not in the definition set context1, but is in the default set
assert(lookupDef("var2") == "VAL2") // var2 is found in the definition set context1
assert(lookupDef("var3") == 3) // var3 is found in the definition set context1
setDefinitionSet "default" // Reset the current definition set to be the default set
assert(lookupDef("var2") == "val2") // This time we find var2 in the default definition set
// One use case of lookupDef is to set configurable property values
job('job1') {
set properties: [
'property1': lookupDef('var1')
]
}
// Remember - we don't recommend that you use setDefinitionSet directly. Rather, use hadoopClosure
// to express your logic in a declarative fashion. Calling it here is just to make the example work.
setDefinitionSet 'context1'
// Another is to use lookupDef to affect your actual Groovy logic
if (lookupDef('var4') == true) {
// Do something
} else {
// Do something else
}
// You can use the definitionSetName method to get the name of the current definition set. Often, you
// will want to encapsulate definitionSet-specific logic within its own namespace to avoid name clashes.
namespace(definitionSetName()) {
if (definitionSetName() == "cluster1") {
// cluster1-specific logic
}
else if (definitionSetName() == "default") {
// etc.
}
}
(Since version 0.5.1) To prevent name clashes within a scope, you can nest Hadoop DSL elements within a namespace. You can then clone a namespace (including all of its child elements) into a new namespace and customize elements within the new namespace using the lookup function.
hadoop {
buildPath "azkaban"
namespace('namespace1') {
workflow('workflow1') { // Bound in the namespace1 scope with fully qualified name .hadoop.namespace1.workflow1
      pigJob('pigJob1') { // Fully qualified name of the job is .hadoop.namespace1.workflow1.pigJob1
uses 'src/main/pig/pigScript1.pig'
}
targets 'pigJob1'
}
}
// For convenience, if you declare the same namespace more than once, it will continue
// the previous namespace
namespace('namespace1') {
workflow('workflow2') {
pigJob('pigJob2') {
uses 'src/main/pig/pigScript2.pig'
}
targets 'pigJob2'
}
}
// Clone the entire namespace1 into namespace2 and customize the jobs there
addNamespace('namespace1', 'namespace2') {
lookup('workflow2') {
lookup('pigJob2') {
uses 'src/main/pig/PIGSCRIPT2.pig'
}
}
}
}
(Since version 0.5.8) Hadoop closures make it easy to declare a Hadoop DSL code block once and re-evaluate it against several different definition sets.
The basic idea is that since closures are first-class constructs in Groovy, we can save a reference to each closure and evaluate it once per definition set. To make sure the results of applying the closure more than once don't cause name clashes, you generally want to encapsulate each invocation of the closure within its own Hadoop DSL namespace.
There are two types of Hadoop closures: anonymous (unnamed) closures and named closures. When you want to evaluate Hadoop closures, use evalHadoopClosures to evaluate all of your anonymous Hadoop closures against a particular definition set, or use evalHadoopClosure to evaluate a particular named closure.
Similar to definition sets, Hadoop closures are meta-level constructs that always exist in their own plugin-wide scope, regardless of where they are declared. Although you can use lookup inside a Hadoop closure, you cannot lookup the closure itself. However, you can get a reference to the Groovy closure for the hadoopClosure when you declare it.
In this first example, we'll demonstrate some of the basic ways you can declare and evaluate Hadoop closures.
// Add to the default definition set
definitionSet defs: [
jobName : 'DefaultName'
]
// Add a named definition set
definitionSet name: 'customDefs', defs: [
jobName : 'CustomName'
]
// Declare an anonymous Hadoop closure
hadoopClosure closure: {
job("anon${lookupDef('jobName')}") { }
}
// Declare a named Hadoop closure and save a reference to the Groovy closure it returns
def theNamedClosure = hadoopClosure name: 'namedClosure', closure: {
job("named${lookupDef('jobName')}") { }
}
// Evaluate all the anonymous Hadoop closures against the default definition set.
// For this one, you need to add the parentheses, so that Groovy will find the right overload.
evalHadoopClosures() // Declares the job anonDefaultName
// Evaluate all the anonymous Hadoop closures against a named definition set
evalHadoopClosures defs: 'customDefs' // Declares the job anonCustomName
// Evaluate a named Hadoop closure against the specified definition set
evalHadoopClosure name: 'namedClosure', defs: 'default' // Declares the job namedDefaultName
evalHadoopClosure name: 'namedClosure', defs: 'customDefs' // Declares the job namedCustomName
One of the most useful features of Hadoop closures is that if you evaluate a Hadoop closure from within a scope container such as a hadoop block or a workflow, the closure will be evaluated in the context of that scope container. It will be as if the code from the Hadoop closure had been declared directly inside your scope container. Alternatively, instead of nesting the evaluation inside the scope container, you can name a specific object to use as the evaluation context using the targetName declaration.
In Groovy terms, the scope container (or evaluation target) is set as the closure delegate. This means that you can use Hadoop closures to declare partial functions: if your Hadoop closure refers to an undefined property or method, Groovy will look to see if the closure delegate contains the property or method.
// Add to the default definition set
definitionSet defs: [
jobName : 'DefaultName'
]
// Add a named definition set
definitionSet name: 'customDefs', defs: [
jobName : 'CustomName'
]
// Declare an anonymous Hadoop closure
hadoopClosure closure: {
job("anon${lookupDef('jobName')}") { }
}
// Declare a named Hadoop closure
hadoopClosure name: 'namedClosure', closure: {
job("named${lookupDef('jobName')}") { }
}
workflow('workflow1') {
// Evaluations of Hadoop closures will be done in the context of workflow1
evalHadoopClosures defs: 'customDefs' // Declares the job anonCustomName in workflow1
}
// Instead of evaluating the closure inside workflow1, we can declare workflow1 as the evaluation target
evalHadoopClosure name: 'namedClosure', defs: 'customDefs', targetName: 'workflow1' // Declares the job namedCustomName in workflow1
hadoop {
// For this one, you need to add the parentheses, so that Groovy will find the right overload
evalHadoopClosures() // Declares the job hadoop.anonDefaultName
workflow('workflow1') {
evalHadoopClosures defs: 'customDefs' // Declares the job hadoop.workflow1.anonCustomName
}
evalHadoopClosure name: 'namedClosure', defs: 'customDefs', targetName: 'workflow1' // Declares the job hadoop.workflow1.namedCustomName
}
// You can treat Hadoop closures as partial functions, and evaluate them in the context of another object
hadoopClosure name: 'partialClosure', closure: {
job("partial${lookupDef('jobName')}") {
// ...
}
targets "partial${lookupDef('jobName')}" // We'll get the undefined targets function from the evaluation context
}
// ERROR: "targets" undefined
// evalHadoopClosure name: 'partialClosure', defs: 'customDefs'
hadoop {
buildPath "azkaban"
// ERROR: "targets" undefined
// evalHadoopClosure name: 'partialClosure', defs: 'customDefs'
workflow('workflow2') {
// Declares the job hadoop.workflow2.partialCustomName and causes workflow2 to target that job.
// When the closure was evaluated, the "targets" function was found on the surrounding workflow.
evalHadoopClosure name: 'partialClosure', defs: 'customDefs'
}
}
(Hadoop DSL Automatic Builds since version 0.13.1) Try using Hadoop DSL Automatic Builds as described in the next section to easily build workflows customized for multiple grids! We have left the original documentation for this section in place for reference (or for users who want to manually orchestrate their Hadoop DSL workflows).
When used together, the combination of definition sets, namespaces and Hadoop closures allows you to easily declare and build grid-specific Hadoop DSL workflows without having to copy and paste any Hadoop DSL code for each grid. To build multi-grid Hadoop DSL workflows using definition sets, namespaces and Hadoop closures, do the following:
- Declare a named definition set for each grid with definitionSet. If some of your definitions have default values, you can add them to the default (unnamed) definition set.
- Use a (named or anonymous) hadoopClosure block to declare the Hadoop DSL code you would like re-evaluated for each definition set.
- Within the hadoopClosure block, declare a namespace (using the current definition set name, which you can get with the definitionSetName method) so that each time the closure is re-evaluated, the generated Hadoop DSL elements will have unique fully qualified names.
- Inside the hadoop block, invoke evalHadoopClosures (for anonymous closures) or evalHadoopClosure (for named closures) to evaluate your closures against the definition sets you declared (or the default definition set).
One of the advantages of Hadoop DSL namespaces is that the compiled Hadoop DSL output for each namespace is written to its own subdirectory (based on the namespace name). When you build zip artifacts for each grid in a multi-grid build, this makes it easy to use Gradle CopySpec declarations that pick out the correct subdirectory for each particular grid; a sketch of this follows the example below.
// Example of using Hadoop closures for multi-grid builds. First, declare definition sets for each grid.
definitionSet name: 'dev', defs: [
var1 : 'val1_dev',
var2 : true
]
definitionSet name: 'prod', defs: [
var1 : 'val1_prod',
var2 : false
]
// Now declare hadoopClosure blocks for each set of Hadoop DSL code blocks you want re-evaluated per grid.
// The hadoopClosure block is just like a regular hadoop block except that its evaluation is deferred.
hadoopClosure closure: {
  // You should usually encapsulate Hadoop DSL code in a hadoopClosure within a namespace
  // so that the fully-qualified element names will be unique.
namespace(definitionSetName()) { // Returns the current definition set name!
workflow('workflow1') {
pigJob('pigJob1') {
uses 'src/main/pig/pigScript.pig'
}
// Another use case is to customize runtime logic per definition set
if (lookupDef('var2')) {
noOpJob('noOpJob1') {
depends 'pigJob1'
}
targets 'noOpJob1'
}
else {
targets 'pigJob1'
}
}
}
}
// Then have a regular hadoop block. You can have regular Hadoop DSL code in it like normal, except
// that it is evaluated immediately (like normal) instead of the deferred evaluation you get with
// hadoopClosure blocks. You might want to leave some of your code in a regular hadoop block; it's
// up to you!
hadoop {
buildPath "azkaban"
cleanPath false
// Evaluate your anonymous Hadoop closures against your named definition sets
evalHadoopClosures defs: 'dev'
evalHadoopClosures defs: 'prod'
}
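The sketch below shows one way to package the per-grid output from the example above. It assumes plain Gradle Zip tasks (rather than any zip support the Hadoop Plugin itself may offer) and assumes the compiled output lands in azkaban/dev and azkaban/prod subdirectories, matching the buildPath and namespace names used above; adjust the paths and artifact names to your own project.
// Hypothetical per-grid zip tasks; each "from" line is a CopySpec declaration that picks
// out one grid's namespace subdirectory from the compiled Hadoop DSL output.
task devAzkabanZip(type: Zip) {
  archiveBaseName = 'azkaban-dev'   // on older Gradle versions, use baseName instead
  from 'azkaban/dev'                // only the output compiled under the 'dev' namespace
}
task prodAzkabanZip(type: Zip) {
  archiveBaseName = 'azkaban-prod'
  from 'azkaban/prod'               // only the output compiled under the 'prod' namespace
}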
(Since version 0.13.1) Instead of using the Hadoop DSL namespace, hadoopClosure and evalHadoopClosure language features to declare multi-grid builds (as described in the previous section), you can now use Hadoop DSL Automatic Builds as described at Hadoop DSL Automatic Builds to easily build workflows customized for multiple grids.
Add your Hadoop DSL definitionSet declarations for each grid to src/main/definitions/<gridName>.gradle, your Hadoop DSL user profile scripts to src/main/profiles/<userName>.gradle and your Hadoop DSL workflow files to src/main/gradle. Then Hadoop DSL Automatic Builds will rebuild the Hadoop DSL for each definition set file by automatically applying the definition set file, your user profile file (if it exists) and all of the Hadoop DSL workflow files.
In your workflow files, you don't need to wrap your Hadoop DSL code in namespace or hadoopClosure declarations or call evalHadoopClosures, as the automatic build process takes care of this for you. Of course, you can still use all of these Hadoop DSL language features in your code; they remain very useful for advanced users who wish to declare reusable Hadoop DSL blocks or otherwise manually orchestrate their Hadoop DSL workflows.
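As a rough sketch of this layout (the grid names, definition values and file names below are hypothetical):
// src/main/definitions/dev.gradle -- definitions applied when building for the 'dev' grid
definitionSet defs: [
  'hdfsRootPath' : '/data/dev'
]
// src/main/definitions/prod.gradle -- definitions applied when building for the 'prod' grid
definitionSet defs: [
  'hdfsRootPath' : '/data/prod'
]
// src/main/gradle/workflows.gradle -- a normal workflow file; no namespace, hadoopClosure or
// evalHadoopClosures wrappers are needed, since the automatic build re-evaluates this file
// once per definition set file
workflow('workflow1') {
  pigJob('pigJob1') {
    uses 'src/main/pig/pigScript.pig'
    set parameters: [
      'inputPath' : lookupDef('hdfsRootPath')
    ]
  }
  targets 'pigJob1'
}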
(Since version 0.6.10. Support for the applyUserProfile method since version 0.12.9.) Hadoop DSL profiles are simply Gradle scripts in which users apply their own definition set overrides. Users often want to override definition set parameters like HDFS input paths, home directory paths, job configuration values, etc.
We recommend that you store user profile scripts in the src/main/profiles directory with the name userName.gradle and that you keep them in source control. Then you can easily apply user profile scripts with the Hadoop DSL applyUserProfile and applyProfile helper functions, as shown in the example below.
We recommend that you use the newer applyUserProfile method. By default, this method looks for the file userName.gradle in the src/main/profiles directory. You can override this behavior by specifying the optional profileName and pathName parameters, or by specifying -PprofileName=<profileName> and -PpathName=<pathName> on the Gradle command line.
To skip applying the user profile, you can specify the optional skipProfile parameter or pass -PskipProfile=true on the command line. You can also cause the profile to be skipped by specifying -PprofileName with no value on the command line.
// In your regular Hadoop DSL scripts you specify definition sets
definitionSet defs: [
'hdfsHomePath' : '/jobs/projectResults',
'notify.email' : '[email protected]'
]
// Now use applyUserProfile to apply each user's profile script, if it exists. By default,
// applyUserProfile looks for the file <userName>.gradle in the "src/main/profiles" directory.
applyUserProfile()
// You can also override the optional profileName and pathName parameters if you
// want to use the applyUserProfile method to apply a particular profile script.
// applyUserProfile profileName: 'ackermann.gradle', profilePath: 'src/main/otherProfiles', skipProfile: false
// You can also use the old applyProfile method if you want to specify the complete path to the
// profile script
// applyProfile from: "${project.projectDir}/src/main/profiles/${System.properties['user.name']}.gradle"
// Now in the file src/main/profiles/userName.gradle each user keeps their own definitionSet overrides.
// We recommend that you keep profile scripts in source control so people only need to create them once.
definitionSet defs: [
'hdfsHomePath' : '/user/joe_smith/projectResults',
'notify.email' : '[email protected]'
]
The DSL is defined by classes and objects, just like anything else in Groovy. It's possible to ignore the DSL syntax and write your workflows and jobs using the API with straight Groovy code.
Additionally, you can save a reference to your object as a Groovy def or use lookup to get a reference to the objects you have declared, and then manipulate them with the API.
The best way to understand the API is to check the Hadoop DSL source code at https://github.com/linkedin/linkedin-gradle-plugin-for-apache-hadoop/tree/master/hadoop-plugin/src/main/groovy/com/linkedin/gradle/hadoopdsl.
// Add the Hadoop Plugin classes to the buildscript so we can use the API
buildscript {
repositories {
maven {
url "https://plugins.gradle.org/m2/"
}
}
dependencies {
classpath "gradle.plugin.com.linkedin.hadoop-plugin:hadoop-plugin:0.8.6"
}
}
apply plugin: "com.linkedin.gradle.hadoop.HadoopPlugin"
// Use the DSL to create a PigJob and then use the API to update it
def job1 = pigJob('job1') { }
job1.uses("src/main/pig/count_by_country.pig")
job1.setParameter("member_summary", "/data/derived/member/summary#LATEST")
job1.setParameter("output", "count_by_country.dat")
// Use the DSL to create a Property File and then use the API to update it
def propertyFile1 = propertyFile('properties1') { }
propertyFile1.setJvmProperty("jvmProperty1", "jvm1")
propertyFile1.setJobProperty("myPropertyA", "valA")
propertyFile1.setJobProperty("myPropertyB", "valB")
// Use the DSL to create a workflow and then use the API to add the pigJob and propertyFile to it
def workflow1 = workflow('classes1Workflow1') { }
workflow1.jobs.add(job1)
workflow1.properties.add(propertyFile1)
workflow1.targets 'job1'
// Add the workflow to the HadoopDslExtension so it will be built
hadoop.buildPath "azkaban"
hadoop.cleanPath false
hadoop.workflows.add(workflow1)
For frequently asked questions, check the Hadoop DSL Language Reference FAQ.