/**
 * Creates the collector that feeds the result of a table function into {@code downstream}.
 *
 * <p>The function is looked up by the phase's function name, executed with the phase's
 * arguments, and every row it yields is mapped through the inputs built from
 * {@code toCollect()}. When the phase carries an order-by, the rows are additionally handed
 * to {@link SystemCollectSource#sortRows} before being collected.
 *
 * @param collectPhase      the phase to collect for; cast to {@link TableFunctionCollectPhase},
 *                          so any other phase type fails with a {@link ClassCastException}
 * @param downstream        receiver that the created {@link RowsCollector} is constructed with
 * @param jobCollectContext not used by this implementation
 * @return a single-element list holding the {@link RowsCollector} for this phase
 */
@Override
public Collection<CrateCollector> getCollectors(CollectPhase collectPhase,
                                                RowReceiver downstream,
                                                JobCollectContext jobCollectContext) {
    TableFunctionCollectPhase tableFunctionPhase = (TableFunctionCollectPhase) collectPhase;
    TableFunctionImplementation tableFunction =
        functions.getTableFunctionSafe(tableFunctionPhase.functionName());
    TableInfo functionTable = tableFunction.createTableInfo(
        clusterService, Symbols.extractTypes(tableFunctionPhase.arguments()));

    // Only literals can be passed to table functions; anything else is invalid SQL.
    // That is why the argument symbols are treated directly as inputs here.
    //noinspection unchecked
    List<Input<?>> functionArguments = (List<Input<?>>) (List) tableFunctionPhase.arguments();

    // The context is seeded with a copy of the columns of the function's table.
    final Context context = new Context(new ArrayList<>(functionTable.columns()));

    // Build one input per symbol the phase wants to collect.
    List<Input<?>> outputInputs = new ArrayList<>(tableFunctionPhase.toCollect().size());
    for (Symbol outputSymbol : tableFunctionPhase.toCollect()) {
        Input<?> outputInput = implementationVisitor.process(outputSymbol, context);
        outputInputs.add(outputInput);
    }

    // Iterables.transform returns a view: the function is executed here, while the
    // per-row mapping is applied as the rows are iterated.
    Iterable<Row> resultRows = Iterables.transform(
        tableFunction.execute(functionArguments),
        InputRow.toInputRowFunction(outputInputs, context.collectExpressions));

    if (tableFunctionPhase.orderBy() != null) {
        // Rows go through Row.MATERIALIZE before being passed to sortRows.
        resultRows = SystemCollectSource.sortRows(
            Iterables.transform(resultRows, Row.MATERIALIZE), tableFunctionPhase);
    }

    return Collections.<CrateCollector>singletonList(new RowsCollector(downstream, resultRows));
}
TableFunctionCollectSource.java 文件源码
java
阅读 30
收藏 0
点赞 0
评论 0
项目:Elasticsearch
作者:
评论列表
文章目录