WorkflowHiveQLAdaptor


Overview

This adaptor, which is external to the adaptors offered by the WorkflowEngine, constructs an Execution Plan from a query defined in HiveQL. The query can range from a simple transformation job to a complex distributed MapReduce job. The adaptor parses the query and processes the parsed information to create the Execution Plan. The Search Operators are used during plan creation.

HiveQL

Although based on SQL, HiveQL offers extensions such as loading data from and exporting data to external sources and CREATE TABLE AS SELECT, but it provides only basic support for indexes. HiveQL also lacks support for transactions and materialized views, and offers only limited subquery support. Nevertheless, its main strength is the ease of writing queries that can be translated into a DAG of jobs complying with the MapReduce programming model.

Query Example

The following is a simple example of a job expressed in HiveQL and composed of multiple statements. In brief, data stored at a given local path are loaded and distributed by a particular field, and each record is then transformed by the provided script. Finally, the output is merged and written to the given local location. All import/export paths can also be URIs, such as ftp or jdbc.

CREATE TABLE u_data (
  userid INT,
  movieid INT,
  rating INT,
  unixtime STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;
 
LOAD DATA LOCAL INPATH '/home/user/u.data'
OVERWRITE INTO TABLE u_data;
 
ADD FILE /home/user/weekday_mapper.py;
 
CREATE TABLE u_data_new (
  userid INT,
  movieid INT,
  rating INT,
  weekday INT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t';
 
FROM u_data
INSERT OVERWRITE TABLE u_data_new
SELECT
 TRANSFORM(userid, movieid, rating, unixtime)
 USING 'python weekday_mapper.py'
 AS userid, movieid, rating, weekday
CLUSTER BY movieid;
 
INSERT OVERWRITE LOCAL DIRECTORY '/tmp/u_data_new'
SELECT u_data_new.*
FROM u_data_new;

Operator Plan

<Operation>
  <Functionality>DATASINK</Functionality>
  <Indications>
  	schema-[userid, movieid, rating, weekday], 
  	tableName-u_data_new, 
  	sink-file:/tmp/u_data_new
  </Indications>
  <Children>
    <Operation>
      <Functionality>SELECT</Functionality>
      <Indications>
      	schema-[userid, movieid, rating, weekday], 
      	filterMask-[0, 1, 2, 3]
      </Indications>
      <Children>
        <Operation>
          <Functionality>MERGE</Functionality>
          <Indications>schema-[_col0, _col1, _col2, _col3]</Indications>
          <Children>
            <Operation>
              <Functionality>SCRIPT</Functionality>
              <Indications>
              	schema-[_col0, _col1, _col2, _col3], 
              	source-/home/user/weekday_mapper.py, 
              	scriptCmd-python weekday_mapper.py
              </Indications>
              <Children>
                <Operation>
                  <Functionality>PARTITION</Functionality>
                  <Indications>
                    schema-[userid, movieid, rating, unixtime], 
                    clusterBy-[1], 
                    order-+
                  </Indications>
                  <Children>
                    <DataSource>
                      <Source>u_data</Source>
                      <Indications>
                        schema-[userid, movieid, rating, unixtime], 
                        input-file:/home/user/u.data, 
                        filterMask-[0, 1, 2, 3]
                      </Indications>
                    </DataSource>
                  </Children>
                </Operation>
              </Children>
            </Operation>
          </Children>
        </Operation>
      </Children>
    </Operation>
  </Children>
</Operation>

The constructed plan is shown in the following figure:

Constructed DAG Plan
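
The operator plan nests each operation inside its consumer, so the data flows from the innermost DataSource outwards to the DATASINK. As a minimal sketch of how to read it (not part of the adaptor itself), the following Python snippet walks an abbreviated copy of the plan, with the Indications omitted, and lists the operators in the order the data passes through them. The function name execution_order and the DATASOURCE(...) label are illustrative assumptions.

```python
import xml.etree.ElementTree as ET

# Abbreviated copy of the operator plan; Indications are omitted for brevity.
PLAN_XML = """
<Operation>
  <Functionality>DATASINK</Functionality>
  <Children>
    <Operation>
      <Functionality>SELECT</Functionality>
      <Children>
        <Operation>
          <Functionality>MERGE</Functionality>
          <Children>
            <Operation>
              <Functionality>SCRIPT</Functionality>
              <Children>
                <Operation>
                  <Functionality>PARTITION</Functionality>
                  <Children>
                    <DataSource>
                      <Source>u_data</Source>
                    </DataSource>
                  </Children>
                </Operation>
              </Children>
            </Operation>
          </Children>
        </Operation>
      </Children>
    </Operation>
  </Children>
</Operation>
"""


def execution_order(node):
    """Return operator labels in post-order: children first, then the node."""
    order = []
    children = node.find("Children")
    if children is not None:
        for child in children:
            order.extend(execution_order(child))
    if node.tag == "DataSource":
        # Hypothetical label, used only to make the leaf readable.
        order.append("DATASOURCE(" + node.findtext("Source").strip() + ")")
    else:
        order.append(node.findtext("Functionality").strip())
    return order


steps = execution_order(ET.fromstring(PLAN_XML))
print(" -> ".join(steps))
# prints: DATASOURCE(u_data) -> PARTITION -> SCRIPT -> MERGE -> SELECT -> DATASINK
```

Read this way, the plan mirrors the query: the table is read, partitioned by movieid, transformed by the script, merged, projected, and finally written to the sink.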

Highlights

Parallelization factor

The adaptor creates the ExecutionPlan that orchestrates the execution of a DAG of jobs as a series of Sequence elements. Depending on the transformation query, the adaptor may create anything from a single sequential plan to multiple chains. For example, if the query specifies a “Distribute by” command with an integer value, the query is executed with a parallelization factor equal to that value. Alternatively, a table field may be specified, in which case parallelization follows that field's cardinality: for each unique value of the field, a new sequential plan is created to handle the corresponding records.
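
The two distribution modes can be illustrated with a small Python sketch. This is not the adaptor's code: the function partition_records, the round-robin assignment used for the integer case, and the sample records are all assumptions made for illustration. The source only states that an integer fixes the number of chains and that a field name yields one chain per distinct value.

```python
from collections import defaultdict


def partition_records(records, distribute_by, field_names):
    """Group records into chains.

    distribute_by is either an int (a fixed number of chains) or a field
    name (one chain per distinct value of that field).
    """
    chains = defaultdict(list)
    if isinstance(distribute_by, int):
        # Assumption: records are spread round-robin over the chains.
        for i, rec in enumerate(records):
            chains[i % distribute_by].append(rec)
    else:
        idx = field_names.index(distribute_by)
        for rec in records:
            chains[rec[idx]].append(rec)
    return dict(chains)


fields = ["userid", "movieid", "rating", "unixtime"]
# Illustrative records in the layout of the u_data table.
records = [
    ("196", "242", "3", "881250949"),
    ("186", "302", "3", "891717742"),
    ("22", "242", "1", "878887116"),
    ("244", "51", "2", "880606923"),
]

by_factor = partition_records(records, 2, fields)
by_field = partition_records(records, "movieid", fields)
print(len(by_factor), len(by_field))  # prints: 2 3
```

With the integer 2 the four records end up in two chains, whereas distributing by movieid produces three chains because the sample contains three distinct movie ids.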

User Defined Function

Users can provide a script of their choice to be used during transformation. The script must consume data from standard input and write its results to standard output. Each record occupies a separate line, and the fields of each record are delimited by the tab ('\t') character.

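import sys
import datetime
 
# Read one tab-delimited record per line from standard input.
for line in sys.stdin:
  line = line.strip()
  userid, movieid, rating, unixtime = line.split('\t')
  # Convert the Unix timestamp to an ISO weekday (1 = Monday ... 7 = Sunday).
  weekday = datetime.datetime.fromtimestamp(float(unixtime)).isoweekday()
  # The parenthesized form works under both Python 2 and Python 3.
  print('\t'.join([userid, movieid, rating, str(weekday)]))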
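
Before handing a script to the adaptor, it can help to exercise the same input/output contract locally. The sketch below wraps the per-record logic in a function, here called transform (a hypothetical name), and feeds it two illustrative tab-delimited records from an in-memory stream instead of standard input. The weekday value depends on the local time zone, so no fixed output is shown.

```python
import datetime
import io


def transform(stream):
    """Yield one tab-delimited output line per tab-delimited input line."""
    for line in stream:
        userid, movieid, rating, unixtime = line.strip().split("\t")
        weekday = datetime.datetime.fromtimestamp(float(unixtime)).isoweekday()
        yield "\t".join([userid, movieid, rating, str(weekday)])


# Illustrative input: two records, one per line, fields separated by tabs.
sample = io.StringIO("196\t242\t3\t881250949\n186\t302\t3\t891717742\n")
output = list(transform(sample))
for row in output:
    print(row)
```

Each output row keeps the first three fields unchanged and replaces the timestamp with a weekday number between 1 and 7, which matches the schema of the u_data_new table.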

Usage

The following snippet demonstrates the usage of the adaptor as a library:

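// The HiveQL query is passed as the first command-line argument.
String query = args[0];
WorkflowHiveQLAdaptor adaptor = new WorkflowHiveQLAdaptor();
 
// Parse the query and optimize the resulting operator plan.
HiveQLPlanner planner = new HiveQLPlanner();
planner.processLine(query);
PlanNode plan = OperatorAnalyzer.optimizePlan(planner.getCreatedPlan());
 
// Hand the plan to the adaptor, then build and run the Execution Plan.
adaptor.setPlanDesc(plan);
adaptor.CreatePlan();
String output = adaptor.ExecutePlan();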