Hive Source Code Study: Job Submission

Core Components

SQL Parser: the parser
Physical Plan: the compiler
Query Optimizer: the optimizer
Execution: the executor

How HQL Is Compiled into MapReduce

Looking at the flow diagram, the overall process can be summarized as follows:

  1. Submit the job and enter the program; using the HQL grammar rules, parse the statement and convert the HQL into an AST (abstract syntax tree).
  2. Walk the AST and abstract out the QueryBlocks, the basic building units of a query.
  3. Walk the QueryBlocks and convert them into an OperatorTree (the operator tree, i.e. the logical execution plan).
  4. Apply the logical optimizer to the OperatorTree.
  5. Walk the OperatorTree and convert it into a TaskTree, i.e. translate the logical execution plan into a physical execution plan made of MR tasks.
  6. Apply the physical optimizer to the TaskTree.
  7. Generate the final execution plan and submit the tasks to the Hadoop cluster (a programmatic sketch of this pipeline follows the list).
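
As a minimal sketch of these steps (not Hive source; assuming Hive 3.x jars on the classpath, a configured metastore, and an existing table named src, with constructor and method signatures varying slightly across versions), the same pipeline can be driven programmatically through org.apache.hadoop.hive.ql.Driver: compilation covers the parse/optimize steps above, and execution submits the resulting jobs.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;

public class HqlPipelineDemo {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf(SessionState.class);
    SessionState.start(conf);                        // a session, much like CliDriver creates

    Driver driver = new Driver(conf);
    // run() = compile (parse -> QueryBlock -> OperatorTree -> optimize -> TaskTree)
    //         + execute (submit the generated jobs).
    CommandProcessorResponse resp = driver.run("select count(*) from src");
    if (resp.getResponseCode() != 0) {
      throw new RuntimeException("query failed: " + resp.getErrorMessage());
    }

    List<String> rows = new ArrayList<>();
    while (driver.getResults(rows)) {                // fetch result batches until exhausted
      rows.forEach(System.out::println);
      rows.clear();
    }
    driver.close();
  }
}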

Source Code Walkthrough

Following the flow above, let's look at the actual source code.

Creating the CliDriver

CliDriver is mainly responsible for:

  • Parsing client options such as -e and -f
  • Setting up the standard input and output streams
  • Defining the Hive CLI prompt prefix
  • Splitting the HQL input and executing each statement

A typical way to submit a query is:

bin/hive -e "select * from table"

Looking at the hive shell script, the program entry point is org.apache.hadoop.hive.cli.CliDriver.

Core source code:

// Program entry point: find the main method
public static void main(String[] args) throws Exception {
int ret = new CliDriver().run(args);
System.exit(ret);
}
// main delegates to the run method.
public int run(String[] args) throws Exception {
OptionsProcessor oproc = new OptionsProcessor();
// Parse options such as -hiveconf
if (!oproc.process_stage1(args)) {
return 1;
}

boolean logInitFailed = false;
String logInitDetailMessage;
try {
logInitDetailMessage = LogUtils.initHiveLog4j();
} catch (LogInitializationException e) {
logInitFailed = true;
logInitDetailMessage = e.getMessage();
}

CliSessionState ss = new CliSessionState(new HiveConf(SessionState.class));
// Set up the standard input/output streams
ss.in = System.in;
try {
ss.out = new PrintStream(System.out, true, "UTF-8");
ss.info = new PrintStream(System.err, true, "UTF-8");
ss.err = new CachingPrintStream(System.err, true, "UTF-8");
} catch (UnsupportedEncodingException e) {
return 3;
}

// Parse options such as -e and -f.
if (!oproc.process_stage2(ss)) {
return 2;
}

if (!ss.getIsSilent()) {
if (logInitFailed) {
System.err.println(logInitDetailMessage);
} else {
SessionState.getConsole().printInfo(logInitDetailMessage);
}
}

// set all properties specified via command line
HiveConf conf = ss.getConf();
for (Map.Entry<Object, Object> item : ss.cmdProperties.entrySet()) {
conf.set((String) item.getKey(), (String) item.getValue());
ss.getOverriddenConfigurations().put((String) item.getKey(), (String) item.getValue());
}

// Define the CLI prompt prefix; the default is "hive", shown as hive> in the client.
// read prompt configuration and substitute variables.
prompt = conf.getVar(HiveConf.ConfVars.CLIPROMPT);
prompt = new VariableSubstitution(new HiveVariableSource() {
@Override
public Map<String, String> getHiveVariable() {
return SessionState.get().getHiveVariables();
}
}).substitute(conf, prompt);
prompt2 = spacesForString(prompt);

if (HiveConf.getBoolVar(conf, ConfVars.HIVE_CLI_TEZ_SESSION_ASYNC)) {
// Start the session in a fire-and-forget manner. When the asynchronously initialized parts of
// the session are needed, the corresponding getters and other methods will wait as needed.
SessionState.beginStart(ss, console);
} else {
SessionState.start(ss);
}

ss.updateThreadName();

// Create views registry
HiveMaterializedViewsRegistry.get().init();

// execute cli driver work
try {
// This starts a Hive CLI client.
return executeDriver(ss, conf, oproc);
} finally {
ss.resetThreadName();
ss.close();
}
}

Initializing the Hive CLI Client

This step mainly covers:

1. Creating a CliDriver instance and setting the Hive variables
2. Setting up the console reader and the prompt (including the current database)
3. Reading input line by line, skipping -- comments, and accumulating lines until a statement ends with ;
4. Handing each complete statement to processLine

// Initialize the Hive CLI client and process SQL statements.
private int executeDriver(CliSessionState ss, HiveConf conf, OptionsProcessor oproc)
throws Exception {

CliDriver cli = new CliDriver();
cli.setHiveVariables(oproc.getHiveVariables());

// TODO
// Print the MR engine deprecation warning
if ("mr".equals(HiveConf.getVar(conf, ConfVars.HIVE_EXECUTION_ENGINE))) {
console.printInfo(HiveConf.generateMrDeprecationWarning());
}

// Initialize the console reader
setupConsoleReader();

String line;
int ret = 0;
String prefix = "";
String curDB = getFormattedDb(conf, ss);
String curPrompt = prompt + curDB;
String dbSpaces = spacesForString(curDB);

while ((line = reader.readLine(curPrompt + "> ")) != null) {
if (!prefix.equals("")) {
prefix += '\n';
}
// In the Hive CLI, -- marks a comment line; skip it.
if (line.trim().startsWith("--")) {
continue;
}
// A trailing ; marks the end of a statement; process it.
if (line.trim().endsWith(";") && !line.trim().endsWith("\\;")) {
line = prefix + line;
ret = cli.processLine(line, true);
prefix = "";
curDB = getFormattedDb(conf, ss);
curPrompt = prompt + curDB;
dbSpaces = dbSpaces.length() == curDB.length() ? dbSpaces : spacesForString(curDB);
} else {
prefix = prefix + line;
curPrompt = prompt2 + dbSpaces;
continue;
}
}

return ret;
}

// processLine splits the input into individual statements and hands each one to processCmd (abridged).
public int processLine(String line, boolean allowInterrupting) {
int lastRet = 0, ret = 0;
// ... for each command obtained by splitting line on unescaped ';' ...
ret = processCmd(command);
command = "";
lastRet = ret;
// ...
return lastRet;
}
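
Before dispatching, processLine has to split a line such as use db1; select 1; into individual statements while keeping an escaped \; as a literal semicolon. The following is a simplified, self-contained sketch of that splitting logic (illustration only; Hive's real code also deals with quoting and other edge cases):

import java.util.ArrayList;
import java.util.List;

public class SplitDemo {
  // Split a CLI input line into statements: ';' ends a statement, "\;" is a literal ';'.
  static List<String> splitStatements(String line) {
    List<String> commands = new ArrayList<>();
    StringBuilder current = new StringBuilder();
    for (int i = 0; i < line.length(); i++) {
      char c = line.charAt(i);
      if (c == '\\' && i + 1 < line.length() && line.charAt(i + 1) == ';') {
        current.append(';');                         // escaped semicolon, not a terminator
        i++;
      } else if (c == ';') {
        if (current.toString().trim().length() > 0) {
          commands.add(current.toString().trim());   // one complete statement
        }
        current.setLength(0);
      } else {
        current.append(c);
      }
    }
    if (current.toString().trim().length() > 0) {
      commands.add(current.toString().trim());
    }
    return commands;
  }

  public static void main(String[] args) {
    // Prints: [use db1, select ';' as c, show tables]
    System.out.println(splitStatements("use db1; select '\\;' as c; show tables"));
  }
}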

Handling HQL by Case

processCmd mainly covers:

1. quit or exit: close the client and exit
2. source: execute an HQL file from inside the Hive CLI
3. A leading ! : execute a shell command
4. Anything else: treat it as standard HQL and execute it

/**
* Process a command according to its type.
* @param cmd
* @return
*/
public int processCmd(String cmd) {
CliSessionState ss = (CliSessionState) SessionState.get();
ss.setLastCommand(cmd);

ss.updateThreadName();

// Flush the print stream, so it doesn't include output from the last command
ss.err.flush();
String cmd_trimmed = HiveStringUtils.removeComments(cmd).trim();
String[] tokens = tokenizeCmd(cmd_trimmed);
int ret = 0;

// quit or exit: close the client and exit.
if (cmd_trimmed.toLowerCase().equals("quit") || cmd_trimmed.toLowerCase().equals("exit")) {

// if we have come this far - either the previous commands
// are all successful or this is command line. in either case
// this counts as a successful run
ss.close();
System.exit(0);

// source: execute an HQL file inside the Hive CLI
} else if (tokens[0].equalsIgnoreCase("source")) {
String cmd_1 = getFirstCmd(cmd_trimmed, tokens[0].length());
cmd_1 = new VariableSubstitution(new HiveVariableSource() {
@Override
public Map<String, String> getHiveVariable() {
return SessionState.get().getHiveVariables();
}
}).substitute(ss.getConf(), cmd_1);

File sourceFile = new File(cmd_1);
if (! sourceFile.isFile()){
console.printError("File: "+ cmd_1 + " is not a file.");
ret = 1;
} else {
try {
ret = processFile(cmd_1);
} catch (IOException e) {
console.printError("Failed processing file "+ cmd_1 +" "+ e.getLocalizedMessage(),
stringifyException(e));
ret = 1;
}
}
// A leading ! means a shell command
} else if (cmd_trimmed.startsWith("!")) {
// for shell commands, use unstripped command
String shell_cmd = cmd.trim().substring(1);
shell_cmd = new VariableSubstitution(new HiveVariableSource() {
@Override
public Map<String, String> getHiveVariable() {
return SessionState.get().getHiveVariables();
}
}).substitute(ss.getConf(), shell_cmd);

// shell_cmd = "/bin/bash -c \'" + shell_cmd + "\'";
try {
ShellCmdExecutor executor = new ShellCmdExecutor(shell_cmd, ss.out, ss.err);
ret = executor.execute();
if (ret != 0) {
console.printError("Command failed with exit code = " + ret);
}
} catch (Exception e) {
console.printError("Exception raised from Shell command " + e.getLocalizedMessage(),
stringifyException(e));
ret = 1;
}
} else { // local mode
try {
// Everything else is treated as a SQL command
try (CommandProcessor proc = CommandProcessorFactory.get(tokens, (HiveConf) conf)) {
if (proc instanceof IDriver) {
// Let Driver strip comments using sql parser
ret = processLocalCmd(cmd, proc, ss);
} else {
ret = processLocalCmd(cmd_trimmed, proc, ss);
}
}
} catch (SQLException e) {
console.printError("Failed processing command " + tokens[0] + " " + e.getLocalizedMessage(),
org.apache.hadoop.util.StringUtils.stringifyException(e));
ret = 1;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}

ss.resetThreadName();
return ret;
}
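
For the local-mode branch above, the interesting piece is the CommandProcessorFactory dispatch: commands such as set or dfs get dedicated processors, while everything else falls through to the SQL Driver (an IDriver). A small sketch of that dispatch, assuming Hive 3.x APIs and a HiveConf that can start a session (nothing is compiled or executed here):

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.IDriver;
import org.apache.hadoop.hive.ql.processors.CommandProcessor;
import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory;
import org.apache.hadoop.hive.ql.session.SessionState;

public class DispatchDemo {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf(SessionState.class);
    SessionState.start(conf);

    // "set" resolves to a dedicated processor, not the SQL Driver.
    CommandProcessor setProc =
        CommandProcessorFactory.get(new String[] {"set", "hive.execution.engine"}, conf);
    System.out.println(setProc.getClass().getSimpleName()
        + " isDriver=" + (setProc instanceof IDriver));

    // A SELECT resolves to the Driver, which is why processCmd checks "proc instanceof IDriver".
    CommandProcessor queryProc = CommandProcessorFactory.get(new String[] {"select"}, conf);
    System.out.println(queryProc.getClass().getSimpleName()
        + " isDriver=" + (queryProc instanceof IDriver));
  }
}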

Creating the Driver

This mainly covers:

1. Parsing and compiling the HQL (parse, compile, optimize)
2. Executing the optimized physical plan (execute)

The flow: for standard HQL, follow the interface method down to its concrete implementation in the Driver class.

// Inside processLocalCmd(cmd, proc, ss), the processor's run method is called.
ret = qp.run(cmd).getResponseCode();

// The concrete implementation of run:
public CommandProcessorResponse run(String command, boolean alreadyCompiled) {

try {
// This is where the SQL command actually runs; alreadyCompiled is false at this point.
runInternal(command, alreadyCompiled);
return createProcessorResponse(0);
} catch (CommandProcessorResponse cpr) {
// ... error handling omitted ...
}
}

// runInternal is the entry point to SQL compilation.
private void runInternal(String command, boolean alreadyCompiled) throws CommandProcessorResponse {
errorMessage = null;
SQLState = null;
downstreamError = null;
LockedDriverState.setLockedDriverState(lDrvState);

// TODO

if (!alreadyCompiled) {
// Compile the SQL.
// compile internal will automatically reset the perf logger
compileInternal(command, true);
// then we continue to use this perf logger
perfLogger = SessionState.getPerfLogger();
} else {
// reuse existing perf logger.
perfLogger = SessionState.getPerfLogger();
// Since we're reusing the compiled plan, we need to update its start time for current run
plan.setQueryStartTime(perfLogger.getStartTime(PerfLogger.DRIVER_RUN));
}
// TODO
try {
// Execute only after compilation has finished.
execute();
} catch (CommandProcessorResponse cpr) {
rollback(cpr);
throw cpr;
}
// TODO
}
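
A hedged sketch of the compile-then-execute split that runInternal makes (assuming Hive 3.x, a configured metastore, and an existing table src): compile() alone runs the parser, semantic analyzer and optimizers, leaving a QueryPlan whose task tree can be inspected before anything executes.

import java.io.Serializable;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.session.SessionState;

public class CompileOnlyDemo {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf(SessionState.class);
    SessionState.start(conf);

    Driver driver = new Driver(conf);
    // Compile only: parse -> semantic analysis -> logical/physical optimization -> TaskTree.
    int rc = driver.compile("select count(*) from src");
    if (rc != 0) {
      throw new RuntimeException("compilation failed, rc=" + rc);
    }

    // Inspect the physical plan without executing it.
    for (Task<? extends Serializable> task : driver.getPlan().getRootTasks()) {
      System.out.println("root task: " + task.getClass().getSimpleName());
    }
    // The same helper execute() later uses to report "Total jobs".
    System.out.println("MR tasks = "
        + Utilities.getMRTasks(driver.getPlan().getRootTasks()).size());
    driver.close();
  }
}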

Parsing, Compiling, and Optimizing HQL (Core Source)

This happens in two steps:
1. The parser converts the HQL into an AST
2. The compiler and optimizer take the AST and produce an optimized plan

HQL to AST
public ASTNode parse(String command, Context ctx, String viewFullyQualifiedName)
throws ParseException {
if (LOG.isDebugEnabled()) {
LOG.debug("Parsing command: " + command);
}

// Build the lexer
HiveLexerX lexer = new HiveLexerX(new ANTLRNoCaseStringStream(command));
TokenRewriteStream tokens = new TokenRewriteStream(lexer);
if (ctx != null) {
if (viewFullyQualifiedName == null) {
// Top level query
ctx.setTokenRewriteStream(tokens);
} else {
// It is a view
ctx.addViewTokenRewriteStream(viewFullyQualifiedName, tokens);
}
lexer.setHiveConf(ctx.getConf());
}
// Build the parser over the token stream (the HQL keywords have become tokens)
HiveParser parser = new HiveParser(tokens);
if (ctx != null) {
parser.setHiveConf(ctx.getConf());
}
parser.setTreeAdaptor(adaptor);
HiveParser.statement_return r = null;
try {
// Run the grammar and produce the final AST.
r = parser.statement();
} catch (RecognitionException e) {
e.printStackTrace();
throw new ParseException(parser.errors);
}

if (lexer.getErrors().size() == 0 && parser.errors.size() == 0) {
LOG.debug("Parse Completed");
} else if (lexer.getErrors().size() != 0) {
throw new ParseException(lexer.getErrors());
} else {
throw new ParseException(parser.errors);
}

ASTNode tree = (ASTNode) r.getTree();
tree.setUnknownTokenBoundaries();
return tree;
}
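
A usage sketch for the parse() method above (assuming Hive's ql jar on the classpath; the available parse() overloads vary a little between versions). Note that producing the AST needs no metastore:

import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.ParseDriver;
import org.apache.hadoop.hive.ql.parse.ParseException;

public class ParseDemo {
  public static void main(String[] args) throws ParseException {
    ParseDriver pd = new ParseDriver();
    ASTNode ast = pd.parse("select * from src where id > 10");
    // Dump the tree; it looks roughly like
    // (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME src))) (TOK_INSERT ... (TOK_SELECT ...) (TOK_WHERE ...)))
    System.out.println(ast.dump());
  }
}
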
Analyzing the AST

This mainly involves:

  1. Converting the AST into QueryBlocks and then into an OperatorTree
  2. Logical optimization of the OperatorTree (LogicalOptimizer)
  3. Converting the OperatorTree into a TaskTree
  4. Physical optimization of the TaskTree (PhysicalOptimizer)

Following the abstract method leads to analyzeInternal in the implementation class SemanticAnalyzer. The comments there describe the process in detail, in 11 steps:
1. Generate Resolved Parse tree from syntax tree
2. Gen OP Tree from resolved Parse Tree (generate the OperatorTree)
3. Deduce Resultset Schema
4. Generate Parse Context for Optimizer & Physical compiler
5. Take care of view creation
6. Generate table access stats if required
7. Perform Logical optimization
8. Generate column access stats if required - wait until column pruning
9. Optimize Physical op tree & Translate to target execution engine (MR, TEZ, ...)
10. Attach CTAS/Insert-Commit-hooks for Storage Handlers
11. Put accessed columns to readEntity

The source is as follows:

void analyzeInternal(ASTNode ast, PlannerContextFactory pcf) throws SemanticException {
LOG.info("Starting Semantic Analysis");
// 1. Generate Resolved Parse tree from syntax tree
boolean needsTransform = needsTransform();
//change the location of position alias process here
processPositionAlias(ast);
PlannerContext plannerCtx = pcf.create();
if (!genResolvedParseTree(ast, plannerCtx)) {
return;
}

if (HiveConf.getBoolVar(conf, ConfVars.HIVE_REMOVE_ORDERBY_IN_SUBQUERY)) {
for (String alias : qb.getSubqAliases()) {
removeOBInSubQuery(qb.getSubqForAlias(alias));
}
}

// Check query results cache.
// If no masking/filtering required, then we can check the cache now, before
// generating the operator tree and going through CBO.
// Otherwise we have to wait until after the masking/filtering step.
boolean isCacheEnabled = isResultsCacheEnabled();
QueryResultsCache.LookupInfo lookupInfo = null;
if (isCacheEnabled && !needsTransform && queryTypeCanUseCache()) {
lookupInfo = createLookupInfoForQuery(ast);
if (checkResultsCache(lookupInfo)) {
return;
}
}

ASTNode astForMasking;
if (isCBOExecuted() && needsTransform &&
(qb.isCTAS() || qb.isView() || qb.isMaterializedView() || qb.isMultiDestQuery())) {
// If we use CBO and we may apply masking/filtering policies, we create a copy of the ast.
// The reason is that the generation of the operator tree may modify the initial ast,
// but if we need to parse for a second time, we would like to parse the unmodified ast.
astForMasking = (ASTNode) ParseDriver.adaptor.dupTree(ast);
} else {
astForMasking = ast;
}

// 2. Gen OP Tree from resolved Parse Tree
Operator sinkOp = genOPTree(ast, plannerCtx);

boolean usesMasking = false;
if (!unparseTranslator.isEnabled() &&
(tableMask.isEnabled() && analyzeRewrite == null)) {
// Here we rewrite the * and also the masking table
ASTNode rewrittenAST = rewriteASTWithMaskAndFilter(tableMask, astForMasking, ctx.getTokenRewriteStream(),
ctx, db, tabNameToTabObject, ignoredTokens);
if (astForMasking != rewrittenAST) {
usesMasking = true;
plannerCtx = pcf.create();
ctx.setSkipTableMasking(true);
init(true);
//change the location of position alias process here
processPositionAlias(rewrittenAST);
genResolvedParseTree(rewrittenAST, plannerCtx);
if (this instanceof CalcitePlanner) {
((CalcitePlanner) this).resetCalciteConfiguration();
}
sinkOp = genOPTree(rewrittenAST, plannerCtx);
}
}

// Check query results cache
// In the case that row or column masking/filtering was required, we do not support caching.
// TODO: Enable caching for queries with masking/filtering
if (isCacheEnabled && needsTransform && !usesMasking && queryTypeCanUseCache()) {
lookupInfo = createLookupInfoForQuery(ast);
if (checkResultsCache(lookupInfo)) {
return;
}
}

// 3. Deduce Resultset Schema
if (createVwDesc != null && !this.ctx.isCboSucceeded()) {
resultSchema = convertRowSchemaToViewSchema(opParseCtx.get(sinkOp).getRowResolver());
} else {
// resultSchema will be null if
// (1) cbo is disabled;
// (2) or cbo is enabled with AST return path (whether succeeded or not,
// resultSchema will be re-initialized)
// It will only be not null if cbo is enabled with new return path and it
// succeeds.
if (resultSchema == null) {
resultSchema = convertRowSchemaToResultSetSchema(opParseCtx.get(sinkOp).getRowResolver(),
HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_RESULTSET_USE_UNIQUE_COLUMN_NAMES));
}
}

// 4. Generate Parse Context for Optimizer & Physical compiler
copyInfoToQueryProperties(queryProperties);
ParseContext pCtx = new ParseContext(queryState, opToPartPruner, opToPartList, topOps,
new HashSet<JoinOperator>(joinContext.keySet()),
new HashSet<SMBMapJoinOperator>(smbMapJoinContext.keySet()),
loadTableWork, loadFileWork, columnStatsAutoGatherContexts, ctx, idToTableNameMap, destTableId, uCtx,
listMapJoinOpsNoReducer, prunedPartitions, tabNameToTabObject, opToSamplePruner,
globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner,
viewAliasToInput, reduceSinkOperatorsAddedByEnforceBucketingSorting,
analyzeRewrite, tableDesc, createVwDesc, materializedViewUpdateDesc,
queryProperties, viewProjectToTableSchema, acidFileSinks);

// Set the semijoin hints in parse context
pCtx.setSemiJoinHints(parseSemiJoinHint(getQB().getParseInfo().getHintList()));
// Set the mapjoin hint if it needs to be disabled.
pCtx.setDisableMapJoin(disableMapJoinWithHint(getQB().getParseInfo().getHintList()));

// 5. Take care of view creation
if (createVwDesc != null) {
if (ctx.getExplainAnalyze() == AnalyzeState.RUNNING) {
return;
}

if (!ctx.isCboSucceeded()) {
saveViewDefinition();
}

// validate the create view statement at this point, the createVwDesc gets
// all the information for semanticcheck
validateCreateView();

if (createVwDesc.isMaterialized()) {
createVwDesc.setTablesUsed(getTablesUsed(pCtx));
} else {
// Since we're only creating a view (not executing it), we don't need to
// optimize or translate the plan (and in fact, those procedures can
// interfere with the view creation). So skip the rest of this method.
ctx.setResDir(null);
ctx.setResFile(null);

try {
PlanUtils.addInputsForView(pCtx);
} catch (HiveException e) {
throw new SemanticException(e);
}

// Generate lineage info for create view statements
// if LineageLogger hook is configured.
// Add the transformation that computes the lineage information.
Set<String> postExecHooks = Sets.newHashSet(Splitter.on(",").trimResults()
.omitEmptyStrings()
.split(Strings.nullToEmpty(HiveConf.getVar(conf, HiveConf.ConfVars.POSTEXECHOOKS))));
if (postExecHooks.contains("org.apache.hadoop.hive.ql.hooks.PostExecutePrinter")
|| postExecHooks.contains("org.apache.hadoop.hive.ql.hooks.LineageLogger")
|| postExecHooks.contains("org.apache.atlas.hive.hook.HiveHook")) {
ArrayList<Transform> transformations = new ArrayList<Transform>();
transformations.add(new HiveOpConverterPostProc());
transformations.add(new Generator(postExecHooks));
for (Transform t : transformations) {
pCtx = t.transform(pCtx);
}
// we just use view name as location.
queryState.getLineageState()
.mapDirToOp(new Path(createVwDesc.getViewName()), sinkOp);
}
return;
}
}

// 6. Generate table access stats if required
if (HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_TABLEKEYS)) {
TableAccessAnalyzer tableAccessAnalyzer = new TableAccessAnalyzer(pCtx);
setTableAccessInfo(tableAccessAnalyzer.analyzeTableAccess());
}

// 7. Perform Logical optimization
if (LOG.isDebugEnabled()) {
LOG.debug("Before logical optimization\n" + Operator.toString(pCtx.getTopOps().values()));
}
Optimizer optm = new Optimizer();
optm.setPctx(pCtx);
optm.initialize(conf);
pCtx = optm.optimize();
if (pCtx.getColumnAccessInfo() != null) {
// set ColumnAccessInfo for view column authorization
setColumnAccessInfo(pCtx.getColumnAccessInfo());
}
if (LOG.isDebugEnabled()) {
LOG.debug("After logical optimization\n" + Operator.toString(pCtx.getTopOps().values()));
}

// 8. Generate column access stats if required - wait until column pruning
// takes place during optimization
boolean isColumnInfoNeedForAuth = SessionState.get().isAuthorizationModeV2()
&& HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED);
if (isColumnInfoNeedForAuth
|| HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_SCANCOLS)) {
ColumnAccessAnalyzer columnAccessAnalyzer = new ColumnAccessAnalyzer(pCtx);
// view column access info is carried by this.getColumnAccessInfo().
setColumnAccessInfo(columnAccessAnalyzer.analyzeColumnAccess(this.getColumnAccessInfo()));
}

// 9. Optimize Physical op tree & Translate to target execution engine (MR,
// TEZ..)
if (!ctx.getExplainLogical()) {
TaskCompiler compiler = TaskCompilerFactory.getCompiler(conf, pCtx);
compiler.init(queryState, console, db);
compiler.compile(pCtx, rootTasks, inputs, outputs);
fetchTask = pCtx.getFetchTask();
}
//find all Acid FileSinkOperatorS
QueryPlanPostProcessor qp = new QueryPlanPostProcessor(rootTasks, acidFileSinks, ctx.getExecutionId());

// 10. Attach CTAS/Insert-Commit-hooks for Storage Handlers
final Optional<TezTask> optionalTezTask =
rootTasks.stream().filter(task -> task instanceof TezTask).map(task -> (TezTask) task)
.findFirst();
if (optionalTezTask.isPresent()) {
final TezTask tezTask = optionalTezTask.get();
rootTasks.stream()
.filter(task -> task.getWork() instanceof DDLWork)
.map(task -> (DDLWork) task.getWork())
.filter(ddlWork -> ddlWork.getPreInsertTableDesc() != null)
.map(ddlWork -> ddlWork.getPreInsertTableDesc())
.map(ddlPreInsertTask -> new InsertCommitHookDesc(ddlPreInsertTask.getTable(),
ddlPreInsertTask.isOverwrite()))
.forEach(insertCommitHookDesc -> tezTask.addDependentTask(
TaskFactory.get(new DDLWork(getInputs(), getOutputs(), insertCommitHookDesc), conf)));
}

LOG.info("Completed plan generation");

// 11. put accessed columns to readEntity
if (HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_SCANCOLS)) {
putAccessedColumnsToReadEntity(inputs, columnAccessInfo);
}

if (isCacheEnabled && lookupInfo != null) {
if (queryCanBeCached()) {
QueryResultsCache.QueryInfo queryInfo = createCacheQueryInfoForQuery(lookupInfo);

// Specify that the results of this query can be cached.
setCacheUsage(new CacheUsage(
CacheUsage.CacheStatus.CAN_CACHE_QUERY_RESULTS, queryInfo));
}
}
}
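
Step 7 above (the Optimizer block in the code) is essentially a chain of Transform rules, each taking a ParseContext and returning a possibly rewritten one, just like the pCtx = t.transform(pCtx) loop in the view-lineage branch. As a shape-only sketch (a hypothetical rule, not part of Hive, and assuming Hive 3.x where Transform is an abstract class), a logical optimization rule looks like this:

import org.apache.hadoop.hive.ql.optimizer.Transform;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;

// Hypothetical no-op rule that only reports the operator tree roots it was given.
public class LoggingTransform extends Transform {
  @Override
  public ParseContext transform(ParseContext pctx) throws SemanticException {
    // A real rule (predicate pushdown, column pruning, ...) would walk and rewrite the
    // operator tree reachable from pctx.getTopOps(); this one returns it unchanged.
    System.out.println("operator tree roots: " + pctx.getTopOps().keySet());
    return pctx;
  }
}

The built-in rules are registered inside Optimizer.initialize(); the sketch only shows the contract those rules implement.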

Executing the Optimized Physical Plan

This mainly involves:

  1. Building MR jobs from the TaskTree
  2. Running the MR jobs

The source is as follows:

private void execute() throws CommandProcessorResponse {
PerfLogger perfLogger = SessionState.getPerfLogger();
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_EXECUTE);
// TODO
setQueryDisplays(plan.getRootTasks());
// Count the MR jobs produced from the TaskTree
int mrJobs = Utilities.getMRTasks(plan.getRootTasks()).size();
int jobs = mrJobs + Utilities.getTezTasks(plan.getRootTasks()).size()
+ Utilities.getSparkTasks(plan.getRootTasks()).size();
if (jobs > 0) {
logMrWarning(mrJobs);
console.printInfo("Query ID = " + queryId);
console.printInfo("Total jobs = " + jobs);
}
if (SessionState.get() != null) {
SessionState.get().getHiveHistory().setQueryProperty(queryId, Keys.QUERY_NUM_TASKS,
String.valueOf(jobs));
SessionState.get().getHiveHistory().setIdToTableMap(plan.getIdToTableNameMap());
}
String jobname = Utilities.abbreviate(queryStr, maxlen - 6);

// A runtime that launches runnable tasks as separate Threads through
// TaskRunners
// As soon as a task isRunnable, it is put in a queue
// At any time, at most maxthreads tasks can be running
// The main thread polls the TaskRunners to check if they have finished.

checkInterrupted("before running tasks.", hookContext, perfLogger);

DriverContext driverCxt = new DriverContext(ctx);
driverCxt.prepare(plan);

ctx.setHDFSCleanup(true);
this.driverCxt = driverCxt; // for canceling the query (should be bound to session?)

SessionState.get().setMapRedStats(new LinkedHashMap<>());
SessionState.get().setStackTraces(new HashMap<>());
SessionState.get().setLocalMapRedErrors(new HashMap<>());

// Add root Tasks to runnable
for (Task<? extends Serializable> tsk : plan.getRootTasks()) {
// This should never happen, if it does, it's a bug with the potential to produce
// incorrect results.
assert tsk.getParentTasks() == null || tsk.getParentTasks().isEmpty();
driverCxt.addToRunnable(tsk);

if (metrics != null) {
tsk.updateTaskMetrics(metrics);
}
}

preExecutionCacheActions();

perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RUN_TASKS);
// Loop while you either have tasks running, or tasks queued up
while (driverCxt.isRunning()) {
// Launch upto maxthreads tasks
Task<? extends Serializable> task;
while ((task = driverCxt.getRunnable(maxthreads)) != null) {
// Launch the task, which runs the MR job
TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt);
if (!runner.isRunning()) {
break;
}
}

// ... (poll completed task runners, handle failures and hooks; omitted)
}

// ...
// Print OK on the console.
if (console != null) {
console.printInfo("OK");
}
}