# MoveToHive
Currently the Windowing Engine runs on top of Hive. An obvious question is whether this functionality can be folded into Hive. There are undeniable reasons for doing this; briefly:
- end users would want this functionality inside Hive for reasons of consistent behavior, support, etc.
- use of a consistent expression language, for example reuse of Hive functions in Windowing clauses.
- Implementation-wise:
- Windowing is orders of magnitude simpler than Hive, and can benefit from using equivalent components that are in Hive. More on this in the next section.
- Avoid the trap of constantly chasing changes in the Hive code base.
- Folding in Table function mechanics may open up optimizations not possible with the approach today.
Here, we initially summarize how the Windowing Engine works and map it to concepts/components in Hive. Then we list a multi-stage process for moving Windowing and Table functionality into Hive. I am no expert on Hive, so I have erred on the side of being non-intrusive. There is a good chance that there are better approaches to some of the later steps; I am open to comments/suggestions from the community.
First of all, to avoid confusion, I am going to call Table functions defined in the Windowing Engine Partitioned Table Functions (PTFs). This distinguishes them from UDTFs in Hive, and captures the notion that instances of these functions operate on Partitions of the input dataset. Also, "Windowing Engine" is a misnomer (the central concept is the PTF), but to avoid confusion, and because of how this project evolved, we have stuck with the name Windowing Engine.
The central concepts in the Windowing Engine are:
- Partition: a container of Writables, associated with a hive.serde2.oi.ObjectInspector. An input dataset is divided into Partitions. A Partition exposes Row objects through Groovy’s index and iteration protocols. A Row is a Groovy Binding which exposes the cells in the Partition through the Binding protocol. Data is transformed into Java objects only at the time a cell is read.
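To make the Partition/Row contract concrete, here is a minimal Python sketch. The class and attribute names are hypothetical; the engine actually stores Writables and resolves cells lazily through an ObjectInspector and Groovy's Binding protocol:

```python
# Conceptual sketch of the Partition abstraction described above.
# Hypothetical names; the real engine stores Writables and uses an
# ObjectInspector plus Groovy's Binding protocol for lazy cell access.

class Partition:
    """A container of rows sharing one partition key, iterable by index."""

    def __init__(self, rows):
        self._rows = list(rows)          # raw storage (Writables in the engine)

    def __len__(self):
        return len(self._rows)

    def __getitem__(self, i):            # index protocol
        return Row(self._rows[i])

    def __iter__(self):                  # iteration protocol
        return (Row(r) for r in self._rows)


class Row:
    """Exposes cells by name; values are materialized only on access."""

    def __init__(self, raw):
        self._raw = raw

    def __getattr__(self, name):         # cell lookup, akin to a Groovy Binding
        try:
            return self._raw[name]
        except KeyError:
            raise AttributeError(name)


p = Partition([{"symbol": "AAPL", "price": 10}, {"symbol": "AAPL", "price": 12}])
print(len(p), p[1].price)                # → 2 12
```

The point of the sketch is the shape of the contract: indexed and iterable access to Rows, with cell materialization deferred until a get.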
- Partition Table Function: responsible for taking in a Partition and producing a Partition.
- The user invokes a PTF as:
FuncName(tableSpecification, arg1, arg2, ... argN) [partition by .. order by ..] [windowing expression]
- the input can be a Hive Query/Table or another table function, enabling chaining of Functions.
- The user specifies how the input dataset should be partitioned and ordered before being made available to instances of the Function. Optionally user may specify operating on a Window when evaluating expressions on a Row.
- A PTF is responsible for describing the Shape of the output Partition. A PTF may also indicate that it wants to reshape input data before it is partitioned. Again, the contract with the engine is that if it is given a Partition of raw data, it must return a Partition of data that is then run through the partitioning/ordering mechanics. If the PTF chooses to reshape raw data it must also describe the new Shape of the raw data.
- This contract is codified in the FuncDef and ArgDef annotations, the FunctionRegistry, and the AbstractTableFunction abstract class. At a high level this is similar to the function mechanics in Hive. The shape of the data is described using Hive’s hive.serde2.typeinfo.TypeInfo abstraction.
- The windowing clauses in a Query are handed over to a special Windowing Table Function, which is always added to the end of the PTF chain. Windowing.pdf describes in detail how Windowing clauses are evaluated; as far as the execution flow goes, it appears as though just another Table Function is being processed.
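The PTF contract described above can be sketched as follows. This is a hypothetical Python rendering; the real contract is expressed in Java/Groovy via the FuncDef/ArgDef annotations, the FunctionRegistry, and AbstractTableFunction:

```python
# Hypothetical sketch of the PTF contract (the real one is codified via the
# FuncDef/ArgDef annotations and the AbstractTableFunction class).

class AbstractPTF:
    def output_shape(self, input_shape):
        """Describe the Shape (schema) of the output Partition."""
        return input_shape                # default: same shape as the input

    def reshapes_raw_input(self):
        """Return True if the PTF wants the raw data before partitioning."""
        return False

    def execute(self, partition):
        """Take a Partition, return a Partition."""
        raise NotImplementedError


class NoisePTF(AbstractPTF):
    """Toy PTF: drop rows whose value is below a threshold."""

    def __init__(self, threshold):
        self.threshold = threshold

    def execute(self, partition):
        return [row for row in partition if row["v"] >= self.threshold]


out = NoisePTF(5).execute([{"v": 3}, {"v": 7}, {"v": 9}])
print(out)   # → [{'v': 7}, {'v': 9}]
```

Partitions are modeled here as plain lists to keep the sketch short; only the three-method contract (describe output shape, optionally reshape raw input, transform a Partition) is the point.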
- Runtime: execution of a Chain of Functions
- Users can chain PTFs together in one query. More importantly we plan to support Iterative functions which repeatedly process the input until a fixed point is reached.
- These are translated to a list of Job Specifications. Each Job executes part of the entire Function Chain (see the javadoc for details of the rules used to decompose a Chain into Jobs). Each Job is an MR job where:
- On the Map side the first function in the subchain can optionally reshape the raw data.
- The Reduce side streams partitions through a chain of PTFs.
- The output is written to HDFS and exposed as a Temporary Table to be used by the next Job in the Chain.
- The execution mechanics are provided by classes in the runtime.mr package. The Job class conceptually plays the role of ql.exec.MapRedTask; the Map and Reduce classes play the roles of ql.exec.ExecMapper and ExecReducer; TableFunctions play the role of Operators. There is no concept of Work (this currently lives in the Query object itself), and there is no explicit concept of a Stage.
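As an illustration of the Job decomposition, here is one plausible rule rendered in Python. The actual rules live in the project's javadoc; the rule assumed here (a new Job starts whenever a function introduces its own partitioning, since repartitioning forces a shuffle) is my assumption, not the documented algorithm:

```python
# Sketch of decomposing a PTF chain into MR Jobs. Hypothetical rule: a new
# Job starts whenever a function asks for a different partitioning, because
# repartitioning requires another shuffle.

def decompose(chain):
    """chain: list of (func_name, partition_spec) tuples, in invocation order.
    partition_spec is None when the function inherits its input's partitioning.
    Returns a list of Jobs, each a sub-chain executed as one MR job."""
    jobs, current = [], []
    for func, spec in chain:
        if current and spec is not None:   # repartitioning forces a shuffle,
            jobs.append(current)           # hence a new Job boundary
            current = []
        current.append((func, spec))
    if current:
        jobs.append(current)
    return jobs


chain = [("load", "by symbol"), ("noise", None), ("windowing", "by sector")]
print(decompose(chain))
# → [[('load', 'by symbol'), ('noise', None)], [('windowing', 'by sector')]]
```

Each resulting sub-chain maps to one MR job: the first function may reshape raw data on the Map side, and the Reduce side streams partitions through the rest of the sub-chain.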
- The Interfaces to the Engine
- the engine is usable from a CLI and programmatically.
- WindowingCLIDriver extends hive.cli.CLIDriver and allows users to use all of the Hive CLI functionality, with additions to run windowing queries. As of this writing the implementation has been cleaned up: no more spawning of processes, and no requirement that a Metastore server be available. Much of this stemmed from the use of a different version of Antlr; Windowing now uses the same version of Antlr as Hive. The documentation will reflect this change shortly.
- The API is extremely simple: create a WindowingShell and invoke executeQuery on it. We plan to remove the explicit Thrift connection step; it will happen implicitly behind the scenes.
To summarize, here is a table that maps (at a very high level) concepts in the Windowing Engine to components in Hive:
| Windowing Component | Hive component |
|---|---|
| mr package: Job, Map, Reduce classes | ql.exec.MapRedTask, ExecMapper, ExecReducer |
| Table Function annotations | ExplainPlan annotations |
| Table Function classes | Operator classes, User Defined functions. |
| Groovy evaluator mechanics | Hive Expression Evaluation |
| CompositeKey (used as the Key for the MR shuffle) | HiveKey |
| Language Grammar, Parsing | Hive.g, ql.parse package |
| WindowingCLIDriver | hive.cli.CLIDriver |
The proposal can be summarized as:
- Use Hive’s MR runtime for Windowing Jobs. This seems like a no brainer; having a separate micro MR framework is pointless.
- Migrate to Hive’s Expression execution model. This is the key step before Table functions and Windowing clauses can be exposed in HQL.
- expose Table functions and Windowing clauses in HQL.
- This entails some easy steps and some subtle changes:
- Replace windowing.mr package artifacts with MapRedTask, ExecMapper, ExecReducer.
- Introduction of PTFMapOperator and PTFReduceOperator. Today the function’s business logic and Partition management responsibilities are encapsulated in one concept, the ‘TableFunction’. This change will force the separation of the two, with Partition management moved to the Operators and made reusable by any PTF. It will then be possible to leverage the sophisticated techniques already in place for memory management, e.g. in the GroupByOperator. Currently we don’t support function-level partitioning and ordering in Windowing Clauses; code in the GroupByOperator may be reusable for function-level ordering.
- no change to the end user as far as the Windowing language is concerned.
- may provide better integration at CLI/API level; but these are cosmetic changes.
- implementation better integrated into Hive.
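The Operator split proposed above (Partition management in an Operator, business logic in the PTF) might look like the following sketch. PTFReduceOperator and its methods are hypothetical names, not existing Hive classes:

```python
# Sketch of the proposed separation: a (hypothetical) PTFReduceOperator owns
# partition management, while PTFs only transform complete partitions.

class PTFReduceOperator:
    def __init__(self, ptf_chain, key_of):
        self.ptf_chain = ptf_chain       # functions: partition -> partition
        self.key_of = key_of             # partition key extractor
        self._key, self._rows, self.out = None, [], []

    def process(self, row):              # rows arrive sorted by key (shuffle)
        k = self.key_of(row)
        if self._key is not None and k != self._key:
            self._flush()                # key change: partition is complete
        self._key = k
        self._rows.append(row)

    def close(self):
        if self._rows:
            self._flush()

    def _flush(self):
        part = self._rows
        for ptf in self.ptf_chain:       # stream the partition through the chain
            part = ptf(part)
        self.out.extend(part)
        self._rows = []


# toy PTF that keeps only the last row of each partition
op = PTFReduceOperator([lambda p: p[-1:]], key_of=lambda r: r["k"])
for r in [{"k": 1, "v": 1}, {"k": 1, "v": 2}, {"k": 2, "v": 3}]:
    op.process(r)
op.close()
print(op.out)   # → [{'k': 1, 'v': 2}, {'k': 2, 'v': 3}]
```

With partition accumulation living in the Operator, any memory-management technique (e.g. spilling, as in GroupByOperator) benefits every PTF at once.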
- The Windowing Engine uses Hive’s SerDe mechanics but has its own Expression Evaluation mechanics.
- It uses Groovy for this. The choice was deliberate: it gave a big jumpstart, and the goal was never to have a proprietary evaluation layer long term.
- Moving to Hive’s Expression runtime is the natural choice. The big advantage to the User is the ability to use Hive expressions in the Windowing language (windowing clauses, PTF parameters, etc.); all Hive functions become available in the Windowing language.
- The exact procedure to achieve this is not clear at this time:
- whether it is possible to continue to maintain a separate Windowing grammar and parser and reuse/callout to Hive grammar/parser for expression components.
- or whether this entails folding the Windowing clauses into the Hive grammar. If this is the case then Steps 2 and 3 have to be done as one step.
- In either case this involves rewriting the translator to generate Hive Expressions instead of Groovy and proprietary Windowing data structs.
- This will also involve enabling UDAFs to work in this mode: both on complete partitions and on Windows.
- The Hive Expression model will need to support navigation operators/functions (lead/lag functions at the very least) in this mode.
- Users will find better integration with HQL: expressions in the Windowing language will be equivalent to HQL expressions, and HQL Functions will be available in the Windowing language.
- This is major surgery under the covers; except for the Parser and Translation layer, everything else will be native Hive code.
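To illustrate why navigation support matters: lead/lag must read rows other than the current one, which a purely row-at-a-time expression evaluator cannot do. A minimal sketch follows; the function signatures are hypothetical, not Hive's API:

```python
# Hypothetical lead/lag over an in-memory partition, to show why the
# expression runtime needs partition-level (not just row-level) access.

def lag(partition, i, col, amt=1, default=None):
    """Value of `col` from the row `amt` positions before row i."""
    j = i - amt
    return partition[j][col] if j >= 0 else default

def lead(partition, i, col, amt=1, default=None):
    """Value of `col` from the row `amt` positions after row i."""
    j = i + amt
    return partition[j][col] if j < len(partition) else default


# classic time-series use: per-row price change within a partition
prices = [{"p": 10}, {"p": 12}, {"p": 11}]
deltas = [r["p"] - lag(prices, i, "p", default=r["p"])
          for i, r in enumerate(prices)]
print(deltas)   # → [0, 2, -1]
```

Note that `lag` takes the whole partition plus the current row index; any evaluator that only sees one row at a time cannot express this, which is exactly the extension the Hive Expression model would need.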
The next 3 steps involve integration early in the translation process. These proposals reflect my lack of expertise in the Hive codebase and hence the urge to be conservative; there are probably better ways to do this. The reason for tighter integration would be the possibility of more optimal execution plans, but this has to be balanced against introducing too much complexity. People more intimate with Hive’s codebase will be able to make a better judgment call.
- The first step in introducing PTFs into Hive is via an ‘exec’ command; semantically this would imply execute a Function Chain and optionally store results in a Hive Table/Partition or hdfs:
exec TableFunc(tableInput, arg1,..,argN) [partition by .. order by..] [windowing expression] [into table...]
- tableInput could be a hive Query, Table or another PTF.
- the exact syntax can be discussed, but the idea is to enable users to invoke chains of PTFs in Hive. (What does this mean in JDBC/ODBC?)
- this also involves enabling registering of PTFs and making the PTF contract publicly usable.
- this would involve extending HQL grammar to allow exec calls and table function expressions.
- We propose extending the AST generated to have new nodes for PTableFunctions that are children of the Root Query Node.
- This would allow for a non invasive way to extend the translation layer of Hive to handle PTFs: add Stages for PTFs that execute after the Stages of the base Query.
- Users have PTFs in HQL. Registration mechanism parallels UDF/UDAF/UDTFs.
- Introduction is mostly through additions to the Hive codebase: to the grammar, to the translation layer. Be as non-invasive as possible. (I don’t know if this is a good thing or a bad thing?)
- We still need to keep the Windowing language around for expressing Windowing clauses and, possibly, for post-filtering.
- Users would obviously want to be able to use Table functions wherever Tables/SubSelects appear.
- A conceptually simple way to introduce this is to transform any such select into an ‘exec’ expression, followed by a select:
select ... from tableFunc(tableInput ...) .... becomes: exec tableFunc(tableInput) into TempTable; select ... from TempTable ...;
- this involves extending the grammar and parser, and doing an AST transformation very early in the translation process, possibly generating 2 ASTs; to the rest of the process it potentially appears as though 2 Queries are being executed.
- almost there: to the user, PTableFunctions begin to behave like true Table Functions.
- But windowing clauses are still not available.
- Though non-invasive, I am not sure if this is the right long-term approach.
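The select-to-exec transformation above can be sketched as follows. This is purely illustrative and string-based; the real rewrite would operate on the Hive AST, and the syntax and temp-table naming here are assumptions:

```python
# Illustrative sketch of the 'select ... from tableFunc(...)' rewrite.
# Real work happens on the Hive AST, not on query strings.

import re

def rewrite(query, temp="TempTable_1"):
    """Split a select over a table function into an exec + a plain select."""
    m = re.match(r"select (.+) from (\w+\(.*?\))\s*(.*)", query, re.I)
    if not m:
        return [query]                    # no table function: nothing to rewrite
    cols, func_call, rest = m.groups()
    return ["exec %s into %s" % (func_call, temp),
            ("select %s from %s %s" % (cols, temp, rest)).strip()]


for stmt in rewrite("select sym, px from noise(stockQuotes, 5) where px > 10"):
    print(stmt)
# → exec noise(stockQuotes, 5) into TempTable_1
# → select sym, px from TempTable_1 where px > 10
```

To the rest of the compiler the result looks like two queries, which matches the "possibly generating 2 ASTs" observation above.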
- To complete the Syntax-plus-AST-transformation story:
- introduce Windowing clauses with supported UDAFs.
- Apply a simple AST transformation to convert to an exec statement on the Windowing Table Function after the Query:
select ... rank() over..., sum(..) over.. from table .... becomes: insert into TempTable select ... from Table ...; exec WindowingTableFunc(TempTable, <an internal representation of Windowing Clauses>) partition by.. order by...
- Will involve restrictions on how windowing clauses can appear in the Select statement.
- An obvious one is that (at least as of now) they all must have the same partitioning and order specification.
- Admittedly hand waving here; assumption is that there is a semantically clean way to extract windowing clauses from the select statement.
- Todo: what is the story on post-filtering? lead/lag in the where clause is critical for Time Series analysis and Top-N analysis. The easy answer is some language extension.
- this would be the end of the Windowing code.
- Users would get Windowing and Table Functions completely integrated in HQL.
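The extraction of windowing clauses from a select list, including the restriction that all clauses share one partition/order specification, can be sketched as below. The data structures are hypothetical; the real transformation would operate on the Hive AST:

```python
# Sketch of pulling windowing clauses out of a select list and enforcing
# the restriction that they all share one partition/order specification.
# Hypothetical structures; the real version works on the Hive AST.

def extract_windowing(select_list):
    """select_list: (expr, over_spec-or-None) pairs.
    Returns (plain exprs, windowing exprs, the single shared spec)."""
    plain = [e for e, spec in select_list if spec is None]
    windowed = [(e, spec) for e, spec in select_list if spec is not None]
    specs = {spec for _, spec in windowed}
    if len(specs) > 1:
        raise ValueError("all windowing clauses must share one "
                         "partition/order specification: %s" % sorted(specs))
    shared = specs.pop() if specs else None
    return plain, [e for e, _ in windowed], shared


sl = [("sym", None),
      ("rank()", "partition by sector order by px"),
      ("sum(px)", "partition by sector order by px")]
print(extract_windowing(sl))
# → (['sym'], ['rank()', 'sum(px)'], 'partition by sector order by px')
```

The shared spec would become the `partition by .. order by ..` of the generated `exec WindowingTableFunc(...)` statement; mixed specs are rejected, matching the restriction stated above.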
