mirror of
https://github.com/tencentmusic/supersonic.git
synced 2025-12-10 11:07:06 +00:00
(improvement)(headless) add sql optimizer and push minMax date filter down: (#567)
Co-authored-by: jipengli <jipengli@tencent.com>
This commit is contained in:
@@ -1,8 +1,5 @@
|
||||
package com.tencent.supersonic.headless.query.parser;
|
||||
|
||||
import com.tencent.supersonic.headless.api.query.request.MetricReq;
|
||||
import com.tencent.supersonic.headless.api.query.request.ParseSqlReq;
|
||||
import com.tencent.supersonic.headless.api.query.request.QueryStructReq;
|
||||
import com.tencent.supersonic.headless.model.domain.Catalog;
|
||||
import com.tencent.supersonic.headless.query.persistence.pojo.QueryStatement;
|
||||
|
||||
@@ -10,7 +7,7 @@ public interface HeadlessConverter {
|
||||
|
||||
boolean accept(QueryStatement queryStatement);
|
||||
|
||||
void converter(Catalog catalog, QueryStructReq queryStructCmd, ParseSqlReq sqlCommend, MetricReq metricCommand)
|
||||
void converter(Catalog catalog, QueryStatement queryStatement)
|
||||
throws Exception;
|
||||
|
||||
}
|
||||
|
||||
@@ -43,8 +43,7 @@ public class QueryParser {
|
||||
for (HeadlessConverter headlessConverter : ComponentFactory.getSemanticConverters()) {
|
||||
if (headlessConverter.accept(queryStatement)) {
|
||||
log.info("SemanticConverter accept [{}]", headlessConverter.getClass().getName());
|
||||
headlessConverter.converter(catalog, queryStructReq, queryStatement.getParseSqlReq(),
|
||||
queryStatement.getMetricReq());
|
||||
headlessConverter.converter(catalog, queryStatement);
|
||||
}
|
||||
}
|
||||
log.info("SemanticConverter after {} {} {}", queryStructReq, queryStatement.getParseSqlReq(),
|
||||
|
||||
@@ -3,15 +3,17 @@ package com.tencent.supersonic.headless.query.parser.calcite;
|
||||
import com.tencent.supersonic.headless.api.query.enums.AggOption;
|
||||
import com.tencent.supersonic.headless.api.query.request.MetricReq;
|
||||
import com.tencent.supersonic.headless.model.domain.Catalog;
|
||||
import com.tencent.supersonic.headless.query.persistence.pojo.QueryStatement;
|
||||
import com.tencent.supersonic.headless.query.parser.SqlParser;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.planner.AggPlanner;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.s2sql.HeadlessModel;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.schema.HeadlessSchema;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.schema.RuntimeOptions;
|
||||
import com.tencent.supersonic.headless.query.persistence.pojo.QueryStatement;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component("CalciteSqlParser")
|
||||
public class CalciteSqlParser implements SqlParser {
|
||||
|
||||
private final HeadlessSchemaManager headlessSchemaManager;
|
||||
|
||||
public CalciteSqlParser(
|
||||
@@ -28,7 +30,7 @@ public class CalciteSqlParser implements SqlParser {
|
||||
return queryStatement;
|
||||
}
|
||||
queryStatement.setMetricReq(metricReq);
|
||||
HeadlessSchema headlessSchema = getSemanticSchema(headlessModel);
|
||||
HeadlessSchema headlessSchema = getSemanticSchema(headlessModel, queryStatement);
|
||||
AggPlanner aggBuilder = new AggPlanner(headlessSchema);
|
||||
aggBuilder.explain(queryStatement, isAgg);
|
||||
queryStatement.setSql(aggBuilder.getSql());
|
||||
@@ -36,12 +38,14 @@ public class CalciteSqlParser implements SqlParser {
|
||||
return queryStatement;
|
||||
}
|
||||
|
||||
private HeadlessSchema getSemanticSchema(HeadlessModel headlessModel) {
|
||||
private HeadlessSchema getSemanticSchema(HeadlessModel headlessModel, QueryStatement queryStatement) {
|
||||
HeadlessSchema headlessSchema = HeadlessSchema.newBuilder(headlessModel.getRootPath()).build();
|
||||
headlessSchema.setDatasource(headlessModel.getDatasourceMap());
|
||||
headlessSchema.setDimension(headlessModel.getDimensionMap());
|
||||
headlessSchema.setMetric(headlessModel.getMetrics());
|
||||
headlessSchema.setJoinRelations(headlessModel.getJoinRelations());
|
||||
headlessSchema.setRuntimeOptions(RuntimeOptions.builder().minMaxTime(queryStatement.getMinMaxTime())
|
||||
.enableOptimize(queryStatement.getEnableOptimize()).build());
|
||||
return headlessSchema;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package com.tencent.supersonic.headless.query.parser.calcite;
|
||||
|
||||
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.schema.SemanticSqlDialect;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.schema.ViewExpanderImpl;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
@@ -13,11 +14,15 @@ import org.apache.calcite.config.CalciteConnectionConfigImpl;
|
||||
import org.apache.calcite.config.CalciteConnectionProperty;
|
||||
import org.apache.calcite.config.Lex;
|
||||
import org.apache.calcite.jdbc.CalciteSchema;
|
||||
import org.apache.calcite.plan.RelOptCluster;
|
||||
import org.apache.calcite.plan.RelOptPlanner;
|
||||
import org.apache.calcite.prepare.CalciteCatalogReader;
|
||||
import org.apache.calcite.prepare.Prepare;
|
||||
import org.apache.calcite.prepare.Prepare.CatalogReader;
|
||||
import org.apache.calcite.rel.hint.HintStrategyTable;
|
||||
import org.apache.calcite.rel.type.RelDataTypeFactory;
|
||||
import org.apache.calcite.rel.type.RelDataTypeSystem;
|
||||
import org.apache.calcite.rex.RexBuilder;
|
||||
import org.apache.calcite.sql.SqlOperatorTable;
|
||||
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
|
||||
import org.apache.calcite.sql.parser.SqlParser;
|
||||
@@ -26,8 +31,11 @@ import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
|
||||
import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
|
||||
import org.apache.calcite.sql.validate.SqlConformanceEnum;
|
||||
import org.apache.calcite.sql.validate.SqlValidator;
|
||||
import org.apache.calcite.sql.validate.SqlValidatorScope;
|
||||
import org.apache.calcite.sql.validate.SqlValidatorUtil;
|
||||
import org.apache.calcite.sql2rel.SqlToRelConverter;
|
||||
import org.apache.calcite.tools.FrameworkConfig;
|
||||
import org.apache.calcite.tools.Frameworks;
|
||||
|
||||
public class Configuration {
|
||||
|
||||
@@ -36,15 +44,15 @@ public class Configuration {
|
||||
public static SqlOperatorTable operatorTable = SqlStdOperatorTable.instance();
|
||||
public static CalciteConnectionConfig config = new CalciteConnectionConfigImpl(configProperties);
|
||||
public static SqlValidator.Config validatorConfig = SqlValidator.Config.DEFAULT
|
||||
.withLenientOperatorLookup(config.lenientOperatorLookup())
|
||||
.withSqlConformance(SemanticSqlDialect.DEFAULT.getConformance())
|
||||
.withConformance(SemanticSqlDialect.DEFAULT.getConformance())
|
||||
.withDefaultNullCollation(config.defaultNullCollation())
|
||||
.withIdentifierExpansion(true);
|
||||
.withLenientOperatorLookup(true);
|
||||
|
||||
static {
|
||||
configProperties.put(CalciteConnectionProperty.CASE_SENSITIVE.camelName(), Boolean.TRUE.toString());
|
||||
configProperties.put(CalciteConnectionProperty.UNQUOTED_CASING.camelName(), Casing.UNCHANGED.toString());
|
||||
configProperties.put(CalciteConnectionProperty.QUOTED_CASING.camelName(), Casing.TO_LOWER.toString());
|
||||
|
||||
}
|
||||
|
||||
public static SqlParser.Config getParserConfig() {
|
||||
@@ -71,11 +79,6 @@ public class Configuration {
|
||||
tables.add(SqlStdOperatorTable.instance());
|
||||
SqlOperatorTable operatorTable = new ChainedSqlOperatorTable(tables);
|
||||
//operatorTable.
|
||||
SqlValidator.Config validatorConfig = SqlValidator.Config.DEFAULT
|
||||
.withLenientOperatorLookup(config.lenientOperatorLookup())
|
||||
.withConformance(SemanticSqlDialect.DEFAULT.getConformance())
|
||||
.withDefaultNullCollation(config.defaultNullCollation())
|
||||
.withIdentifierExpansion(true);
|
||||
Prepare.CatalogReader catalogReader = new CalciteCatalogReader(
|
||||
rootSchema,
|
||||
Collections.singletonList(rootSchema.getName()),
|
||||
@@ -95,4 +98,18 @@ public class Configuration {
|
||||
.withExpand(true);
|
||||
}
|
||||
|
||||
public static SqlToRelConverter getSqlToRelConverter(SqlValidatorScope scope, SqlValidator sqlValidator,
|
||||
RelOptPlanner relOptPlanner) {
|
||||
RexBuilder rexBuilder = new RexBuilder(typeFactory);
|
||||
RelOptCluster cluster = RelOptCluster.create(relOptPlanner, rexBuilder);
|
||||
FrameworkConfig fromworkConfig = Frameworks.newConfigBuilder()
|
||||
.parserConfig(getParserConfig())
|
||||
.defaultSchema(scope.getValidator().getCatalogReader().getRootSchema().plus())
|
||||
.build();
|
||||
return new SqlToRelConverter(new ViewExpanderImpl(),
|
||||
sqlValidator,
|
||||
(CatalogReader) scope.getValidator().getCatalogReader(), cluster, fromworkConfig.getConvertletTable(),
|
||||
getConverterConfig());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -3,28 +3,39 @@ package com.tencent.supersonic.headless.query.parser.calcite.planner;
|
||||
|
||||
import com.tencent.supersonic.headless.api.query.enums.AggOption;
|
||||
import com.tencent.supersonic.headless.api.query.request.MetricReq;
|
||||
import com.tencent.supersonic.headless.query.persistence.pojo.QueryStatement;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.Configuration;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.s2sql.Constants;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.s2sql.DataSource;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.schema.SchemaBuilder;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.schema.HeadlessSchema;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.schema.SchemaBuilder;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.schema.SemanticSqlDialect;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.sql.Renderer;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.sql.TableView;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.sql.node.DataSourceNode;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.sql.node.SemanticNode;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.sql.optimizer.FilterToGroupScanRule;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.sql.render.FilterRender;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.sql.render.OutputRender;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.sql.render.SourceRender;
|
||||
import org.apache.calcite.sql.SqlNode;
|
||||
import org.apache.calcite.sql.validate.SqlValidatorScope;
|
||||
|
||||
import com.tencent.supersonic.headless.query.persistence.pojo.QueryStatement;
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.ListIterator;
|
||||
import java.util.Objects;
|
||||
import java.util.Stack;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.calcite.plan.RelOptPlanner;
|
||||
import org.apache.calcite.plan.hep.HepPlanner;
|
||||
import org.apache.calcite.plan.hep.HepProgramBuilder;
|
||||
import org.apache.calcite.rel.RelNode;
|
||||
import org.apache.calcite.rel.rel2sql.RelToSqlConverter;
|
||||
import org.apache.calcite.sql.SqlNode;
|
||||
import org.apache.calcite.sql.validate.SqlValidator;
|
||||
import org.apache.calcite.sql.validate.SqlValidatorScope;
|
||||
import org.apache.calcite.sql2rel.SqlToRelConverter;
|
||||
|
||||
@Slf4j
|
||||
public class AggPlanner implements Planner {
|
||||
|
||||
private MetricReq metricReq;
|
||||
@@ -107,6 +118,34 @@ public class AggPlanner implements Planner {
|
||||
// build a parse Node
|
||||
parse();
|
||||
// optimizer
|
||||
optimize();
|
||||
}
|
||||
|
||||
public void optimize() {
|
||||
if (Objects.isNull(schema.getRuntimeOptions()) || Objects.isNull(schema.getRuntimeOptions().getEnableOptimize())
|
||||
|| !schema.getRuntimeOptions().getEnableOptimize()) {
|
||||
return;
|
||||
}
|
||||
HepProgramBuilder hepProgramBuilder = new HepProgramBuilder();
|
||||
hepProgramBuilder.addRuleInstance(new FilterToGroupScanRule(FilterToGroupScanRule.DEFAULT, schema));
|
||||
RelOptPlanner relOptPlanner = new HepPlanner(hepProgramBuilder.build());
|
||||
RelToSqlConverter converter = new RelToSqlConverter(SemanticSqlDialect.DEFAULT);
|
||||
SqlValidator sqlValidator = Configuration.getSqlValidator(
|
||||
scope.getValidator().getCatalogReader().getRootSchema());
|
||||
try {
|
||||
log.info("before optimize {}", SemanticNode.getSql(parserNode));
|
||||
SqlToRelConverter sqlToRelConverter = Configuration.getSqlToRelConverter(scope, sqlValidator,
|
||||
relOptPlanner);
|
||||
RelNode sqlRel = sqlToRelConverter.convertQuery(
|
||||
sqlValidator.validate(parserNode), false, true).rel;
|
||||
log.debug("RelNode optimize {}", SemanticNode.getSql(converter.visitRoot(sqlRel).asStatement()));
|
||||
relOptPlanner.setRoot(sqlRel);
|
||||
RelNode relNode = relOptPlanner.findBestExp();
|
||||
parserNode = converter.visitRoot(relNode).asStatement();
|
||||
log.debug("after optimize {}", SemanticNode.getSql(parserNode));
|
||||
} catch (Exception e) {
|
||||
log.error("optimize error {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -14,6 +14,8 @@ public class Constants {
|
||||
public static final String DIMENSION_ARRAY_SINGLE_SUFFIX = "_sgl";
|
||||
public static final String MATERIALIZATION_ZIPPER_START = "start_";
|
||||
public static final String MATERIALIZATION_ZIPPER_END = "end_";
|
||||
|
||||
public static final String SQL_PARSER_TABLE = "parsed_tb";
|
||||
public static final String SQL_PARSER_DB = "parsed_db";
|
||||
public static final String SQL_PARSER_FIELD = "parsed_field";
|
||||
|
||||
}
|
||||
|
||||
@@ -25,6 +25,8 @@ public class HeadlessSchema extends AbstractSchema {
|
||||
|
||||
private List<JoinRelation> joinRelations;
|
||||
|
||||
private RuntimeOptions runtimeOptions;
|
||||
|
||||
|
||||
private HeadlessSchema(String rootPath, Map<String, Table> tableMap) {
|
||||
this.rootPath = rootPath;
|
||||
@@ -97,6 +99,13 @@ public class HeadlessSchema extends AbstractSchema {
|
||||
return joinRelations;
|
||||
}
|
||||
|
||||
public void setRuntimeOptions(RuntimeOptions runtimeOptions) {
|
||||
this.runtimeOptions = runtimeOptions;
|
||||
}
|
||||
|
||||
public RuntimeOptions getRuntimeOptions() {
|
||||
return runtimeOptions;
|
||||
}
|
||||
|
||||
public static final class Builder {
|
||||
|
||||
|
||||
@@ -152,7 +152,7 @@ public class HeadlessSqlConformance implements SqlConformance {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean allowCoercionStringToArray() {
|
||||
public boolean allowLenientCoercion() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,13 @@
|
||||
package com.tencent.supersonic.headless.query.parser.calcite.schema;
|
||||
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import org.apache.commons.lang3.tuple.Triple;
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
public class RuntimeOptions {
|
||||
private Triple<String, String, String> minMaxTime;
|
||||
private Boolean enableOptimize;
|
||||
|
||||
}
|
||||
@@ -6,6 +6,8 @@ import com.tencent.supersonic.headless.query.parser.calcite.sql.S2SQLSqlValidato
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import org.apache.calcite.jdbc.CalciteSchema;
|
||||
import org.apache.calcite.prepare.CalciteCatalogReader;
|
||||
import org.apache.calcite.prepare.Prepare;
|
||||
@@ -56,4 +58,33 @@ public class SchemaBuilder {
|
||||
schema.add(MATERIALIZATION_SYS_VIEW, viewTable);
|
||||
return rootSchema;
|
||||
}
|
||||
|
||||
public static void addSourceView(CalciteSchema viewSchema, String dbSrc, String tbSrc, Set<String> dates,
|
||||
Set<String> dimensions, Set<String> metrics) {
|
||||
String tb = tbSrc.toLowerCase();
|
||||
String db = dbSrc.toLowerCase();
|
||||
DataSourceTable.Builder builder = DataSourceTable.newBuilder(tb);
|
||||
for (String date : dates) {
|
||||
builder.addField(date.toLowerCase(), SqlTypeName.VARCHAR);
|
||||
}
|
||||
for (String dim : dimensions) {
|
||||
builder.addField(dim.toLowerCase(), SqlTypeName.VARCHAR);
|
||||
}
|
||||
for (String metric : metrics) {
|
||||
builder.addField(metric.toLowerCase(), SqlTypeName.BIGINT);
|
||||
}
|
||||
DataSourceTable srcTable = builder
|
||||
.withRowCount(1)
|
||||
.build();
|
||||
if (Objects.nonNull(db) && !db.isEmpty()) {
|
||||
SchemaPlus schemaPlus = viewSchema.plus().getSubSchema(db);
|
||||
if (Objects.isNull(schemaPlus)) {
|
||||
viewSchema.plus().add(db, new AbstractSchema());
|
||||
schemaPlus = viewSchema.plus().getSubSchema(db);
|
||||
}
|
||||
schemaPlus.add(tb, srcTable);
|
||||
} else {
|
||||
viewSchema.add(tb, srcTable);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -34,12 +34,16 @@ public class SemanticSqlDialect extends SqlDialect {
|
||||
writer.newlineAndIndent();
|
||||
fetchFrame = writer.startList(SqlWriter.FrameTypeEnum.OFFSET);
|
||||
writer.keyword("LIMIT");
|
||||
boolean hasOffset = false;
|
||||
if (offset != null) {
|
||||
offset.unparse(writer, -1, -1);
|
||||
hasOffset = true;
|
||||
}
|
||||
|
||||
if (fetch != null) {
|
||||
writer.keyword(",");
|
||||
if (hasOffset) {
|
||||
writer.keyword(",");
|
||||
}
|
||||
fetch.unparse(writer, -1, -1);
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,17 @@
|
||||
package com.tencent.supersonic.headless.query.parser.calcite.schema;
|
||||
|
||||
import java.util.List;
|
||||
import org.apache.calcite.plan.RelOptTable;
|
||||
import org.apache.calcite.rel.RelRoot;
|
||||
import org.apache.calcite.rel.type.RelDataType;
|
||||
|
||||
public class ViewExpanderImpl implements RelOptTable.ViewExpander {
|
||||
public ViewExpanderImpl() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RelRoot expandView(RelDataType rowType, String queryString, List<String> schemaPath,
|
||||
List<String> viewPath) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@@ -1,9 +1,10 @@
|
||||
package com.tencent.supersonic.headless.query.parser.calcite.sql;
|
||||
|
||||
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.sql.node.SemanticNode;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.schema.HeadlessSchema;
|
||||
|
||||
|
||||
public interface Optimization {
|
||||
|
||||
public void visit(SemanticNode semanticNode);
|
||||
public void visit(HeadlessSchema headlessSchema);
|
||||
}
|
||||
|
||||
@@ -4,23 +4,15 @@ package com.tencent.supersonic.headless.query.parser.calcite.sql.node;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.tencent.supersonic.headless.api.query.request.MetricReq;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.Configuration;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.sql.node.extend.LateralViewExplodeNode;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.s2sql.Constants;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.s2sql.DataSource;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.s2sql.Dimension;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.s2sql.Identify;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.s2sql.JoinRelation;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.s2sql.Measure;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.schema.HeadlessSchema;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.calcite.sql.SqlBasicCall;
|
||||
import org.apache.calcite.sql.SqlDataTypeSpec;
|
||||
import org.apache.calcite.sql.SqlNode;
|
||||
import org.apache.calcite.sql.SqlNodeList;
|
||||
import org.apache.calcite.sql.SqlUserDefinedTypeNameSpec;
|
||||
import org.apache.calcite.sql.parser.SqlParser;
|
||||
import org.apache.calcite.sql.parser.SqlParserPos;
|
||||
import org.apache.calcite.sql.validate.SqlValidatorScope;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.schema.SchemaBuilder;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.sql.node.extend.LateralViewExplodeNode;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Comparator;
|
||||
@@ -32,6 +24,16 @@ import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.calcite.sql.SqlBasicCall;
|
||||
import org.apache.calcite.sql.SqlDataTypeSpec;
|
||||
import org.apache.calcite.sql.SqlNode;
|
||||
import org.apache.calcite.sql.SqlNodeList;
|
||||
import org.apache.calcite.sql.SqlUserDefinedTypeNameSpec;
|
||||
import org.apache.calcite.sql.parser.SqlParser;
|
||||
import org.apache.calcite.sql.parser.SqlParserPos;
|
||||
import org.apache.calcite.sql.validate.SqlValidatorScope;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
@Slf4j
|
||||
public class DataSourceNode extends SemanticNode {
|
||||
@@ -46,7 +48,48 @@ public class DataSourceNode extends SemanticNode {
|
||||
if (sqlTable.isEmpty()) {
|
||||
throw new Exception("DatasourceNode build error [tableSqlNode not found]");
|
||||
}
|
||||
return buildAs(datasource.getName(), getTable(sqlTable, scope));
|
||||
SqlNode source = getTable(sqlTable, scope);
|
||||
addSchema(scope, datasource, source);
|
||||
return buildAs(datasource.getName(), source);
|
||||
}
|
||||
|
||||
private static void addSchema(SqlValidatorScope scope, DataSource datasource, SqlNode table) throws Exception {
|
||||
Map<String, String> parseInfo = SemanticNode.getDbTable(table);
|
||||
if (!parseInfo.isEmpty() && parseInfo.containsKey(Constants.SQL_PARSER_TABLE)) {
|
||||
Set<String> dateInfo = new HashSet<>();
|
||||
Set<String> dimensions = new HashSet<>();
|
||||
Set<String> metrics = new HashSet<>();
|
||||
String db = parseInfo.containsKey(Constants.SQL_PARSER_DB) ? parseInfo.get(Constants.SQL_PARSER_DB) : "";
|
||||
String tb = parseInfo.get(Constants.SQL_PARSER_TABLE);
|
||||
for (Dimension d : datasource.getDimensions()) {
|
||||
List<SqlNode> identifiers = expand(SemanticNode.parse(d.getExpr(), scope), scope);
|
||||
identifiers.stream().forEach(i -> dimensions.add(i.toString()));
|
||||
dimensions.add(d.getName());
|
||||
}
|
||||
if (parseInfo.containsKey(Constants.SQL_PARSER_FIELD)) {
|
||||
for (String field : parseInfo.get(Constants.SQL_PARSER_FIELD).split(",")) {
|
||||
dimensions.add(field);
|
||||
}
|
||||
}
|
||||
for (Identify i : datasource.getIdentifiers()) {
|
||||
dimensions.add(i.getName());
|
||||
}
|
||||
for (Measure m : datasource.getMeasures()) {
|
||||
List<SqlNode> identifiers = expand(SemanticNode.parse(m.getExpr(), scope), scope);
|
||||
identifiers.stream().forEach(i -> {
|
||||
if (!dimensions.contains(i.toString())) {
|
||||
metrics.add(i.toString());
|
||||
}
|
||||
}
|
||||
);
|
||||
if (!dimensions.contains(m.getName())) {
|
||||
metrics.add(m.getName());
|
||||
}
|
||||
}
|
||||
SchemaBuilder.addSourceView(scope.getValidator().getCatalogReader().getRootSchema(), db,
|
||||
tb, dateInfo,
|
||||
dimensions, metrics);
|
||||
}
|
||||
}
|
||||
|
||||
public static SqlNode buildExtend(DataSource datasource, Set<String> exprList,
|
||||
|
||||
@@ -2,19 +2,35 @@ package com.tencent.supersonic.headless.query.parser.calcite.sql.node;
|
||||
|
||||
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.Configuration;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.s2sql.Constants;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.schema.SemanticSqlDialect;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.sql.Optimization;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.function.UnaryOperator;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.calcite.jdbc.CalciteSchema;
|
||||
import org.apache.calcite.rel.RelNode;
|
||||
import org.apache.calcite.sql.JoinType;
|
||||
import org.apache.calcite.sql.SqlAsOperator;
|
||||
import org.apache.calcite.sql.SqlBasicCall;
|
||||
import org.apache.calcite.sql.SqlBinaryOperator;
|
||||
import org.apache.calcite.sql.SqlCall;
|
||||
import org.apache.calcite.sql.SqlIdentifier;
|
||||
import org.apache.calcite.sql.SqlJoin;
|
||||
import org.apache.calcite.sql.SqlKind;
|
||||
import org.apache.calcite.sql.SqlLiteral;
|
||||
import org.apache.calcite.sql.SqlNode;
|
||||
import org.apache.calcite.sql.SqlNodeList;
|
||||
import org.apache.calcite.sql.SqlOperator;
|
||||
import org.apache.calcite.sql.SqlSelect;
|
||||
import org.apache.calcite.sql.SqlWith;
|
||||
import org.apache.calcite.sql.SqlWriterConfig;
|
||||
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
|
||||
import org.apache.calcite.sql.parser.SqlParseException;
|
||||
@@ -26,17 +42,25 @@ import org.apache.calcite.sql.validate.SqlValidatorScope;
|
||||
import org.apache.calcite.sql2rel.SqlToRelConverter;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.function.UnaryOperator;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public abstract class SemanticNode {
|
||||
|
||||
public static Set<SqlKind> AGGREGATION_KIND = new HashSet<>();
|
||||
public static Set<String> AGGREGATION_FUNC = new HashSet<>();
|
||||
|
||||
static {
|
||||
AGGREGATION_KIND.add(SqlKind.AVG);
|
||||
AGGREGATION_KIND.add(SqlKind.COUNT);
|
||||
AGGREGATION_KIND.add(SqlKind.SUM);
|
||||
AGGREGATION_KIND.add(SqlKind.MAX);
|
||||
AGGREGATION_KIND.add(SqlKind.MIN);
|
||||
AGGREGATION_KIND.add(SqlKind.OTHER_FUNCTION); // more
|
||||
AGGREGATION_FUNC.add("sum");
|
||||
AGGREGATION_FUNC.add("count");
|
||||
AGGREGATION_FUNC.add("max");
|
||||
AGGREGATION_FUNC.add("avg");
|
||||
AGGREGATION_FUNC.add("min");
|
||||
}
|
||||
|
||||
public static SqlNode parse(String expression, SqlValidatorScope scope) throws Exception {
|
||||
SqlParser sqlParser = SqlParser.create(expression, Configuration.getParserConfig());
|
||||
SqlNode sqlNode = sqlParser.parseExpression();
|
||||
@@ -129,6 +153,150 @@ public abstract class SemanticNode {
|
||||
return sqlNode;
|
||||
}
|
||||
|
||||
private static void sqlVisit(SqlNode sqlNode, Map<String, String> parseInfo) {
|
||||
SqlKind kind = sqlNode.getKind();
|
||||
switch (kind) {
|
||||
case SELECT:
|
||||
queryVisit(sqlNode, parseInfo);
|
||||
break;
|
||||
case AS:
|
||||
SqlBasicCall sqlBasicCall = (SqlBasicCall) sqlNode;
|
||||
sqlVisit(sqlBasicCall.getOperandList().get(0), parseInfo);
|
||||
break;
|
||||
case JOIN:
|
||||
SqlJoin sqlJoin = (SqlJoin) sqlNode;
|
||||
sqlVisit(sqlJoin.getLeft(), parseInfo);
|
||||
sqlVisit(sqlJoin.getRight(), parseInfo);
|
||||
break;
|
||||
case UNION:
|
||||
((SqlBasicCall) sqlNode).getOperandList().forEach(node -> {
|
||||
sqlVisit(node, parseInfo);
|
||||
});
|
||||
break;
|
||||
case WITH:
|
||||
SqlWith sqlWith = (SqlWith) sqlNode;
|
||||
sqlVisit(sqlWith.body, parseInfo);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
private static void queryVisit(SqlNode select, Map<String, String> parseInfo) {
|
||||
if (select == null) {
|
||||
return;
|
||||
}
|
||||
SqlSelect sqlSelect = (SqlSelect) select;
|
||||
SqlNodeList selectList = sqlSelect.getSelectList();
|
||||
selectList.getList().forEach(list -> {
|
||||
fieldVisit(list, parseInfo, "");
|
||||
});
|
||||
fromVisit(sqlSelect.getFrom(), parseInfo);
|
||||
}
|
||||
|
||||
private static void fieldVisit(SqlNode field, Map<String, String> parseInfo, String func) {
|
||||
if (field == null) {
|
||||
return;
|
||||
}
|
||||
SqlKind kind = field.getKind();
|
||||
//System.out.println(kind);
|
||||
// aggfunction
|
||||
if (AGGREGATION_KIND.contains(kind)) {
|
||||
SqlOperator sqlCall = ((SqlCall) field).getOperator();
|
||||
if (AGGREGATION_FUNC.contains(sqlCall.toString().toLowerCase())) {
|
||||
List<SqlNode> operandList = ((SqlBasicCall) field).getOperandList();
|
||||
for (int i = 0; i < operandList.size(); i++) {
|
||||
fieldVisit(operandList.get(i), parseInfo, sqlCall.toString().toUpperCase());
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (kind.equals(SqlKind.IDENTIFIER)) {
|
||||
addTagField(field.toString(), parseInfo, func);
|
||||
return;
|
||||
}
|
||||
if (kind.equals(SqlKind.AS)) {
|
||||
List<SqlNode> operandList1 = ((SqlBasicCall) field).getOperandList();
|
||||
SqlNode left = operandList1.get(0);
|
||||
fieldVisit(left, parseInfo, "");
|
||||
return;
|
||||
}
|
||||
if (field instanceof SqlBasicCall) {
|
||||
List<SqlNode> operandList = ((SqlBasicCall) field).getOperandList();
|
||||
for (int i = 0; i < operandList.size(); i++) {
|
||||
fieldVisit(operandList.get(i), parseInfo, "");
|
||||
}
|
||||
}
|
||||
if (field instanceof SqlNodeList) {
|
||||
((SqlNodeList) field).getList().forEach(node -> {
|
||||
fieldVisit(node, parseInfo, "");
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private static void addTagField(String exp, Map<String, String> parseInfo, String func) {
|
||||
Set<String> fields = new HashSet<>();
|
||||
for (String f : exp.split("[^\\w]+")) {
|
||||
if (Pattern.matches("(?i)[a-z\\d_]+", f)) {
|
||||
fields.add(f);
|
||||
}
|
||||
}
|
||||
if (!fields.isEmpty()) {
|
||||
parseInfo.put(Constants.SQL_PARSER_FIELD, fields.stream().collect(Collectors.joining(",")));
|
||||
}
|
||||
}
|
||||
|
||||
private static void fromVisit(SqlNode from, Map<String, String> parseInfo) {
|
||||
SqlKind kind = from.getKind();
|
||||
switch (kind) {
|
||||
case IDENTIFIER:
|
||||
SqlIdentifier sqlIdentifier = (SqlIdentifier) from;
|
||||
addTableName(sqlIdentifier.toString(), parseInfo);
|
||||
break;
|
||||
case AS:
|
||||
SqlBasicCall sqlBasicCall = (SqlBasicCall) from;
|
||||
SqlNode selectNode1 = sqlBasicCall.getOperandList().get(0);
|
||||
if (!SqlKind.UNION.equals(selectNode1.getKind())) {
|
||||
if (!SqlKind.SELECT.equals(selectNode1.getKind())) {
|
||||
addTableName(selectNode1.toString(), parseInfo);
|
||||
}
|
||||
}
|
||||
sqlVisit(selectNode1, parseInfo);
|
||||
break;
|
||||
case JOIN:
|
||||
SqlJoin sqlJoin = (SqlJoin) from;
|
||||
sqlVisit(sqlJoin.getLeft(), parseInfo);
|
||||
sqlVisit(sqlJoin.getRight(), parseInfo);
|
||||
break;
|
||||
case SELECT:
|
||||
sqlVisit(from, parseInfo);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
private static void addTableName(String exp, Map<String, String> parseInfo) {
|
||||
if (exp.indexOf(" ") > 0) {
|
||||
return;
|
||||
}
|
||||
if (exp.indexOf("_") > 0) {
|
||||
if (exp.split("_").length > 1) {
|
||||
String[] dbTb = exp.split("\\.");
|
||||
if (Objects.nonNull(dbTb) && dbTb.length > 0) {
|
||||
parseInfo.put(Constants.SQL_PARSER_TABLE, dbTb.length > 1 ? dbTb[1] : dbTb[0]);
|
||||
parseInfo.put(Constants.SQL_PARSER_DB, dbTb.length > 1 ? dbTb[0] : "");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static Map<String, String> getDbTable(SqlNode sqlNode) {
|
||||
Map<String, String> parseInfo = new HashMap<>();
|
||||
sqlVisit(sqlNode, parseInfo);
|
||||
return parseInfo;
|
||||
}
|
||||
|
||||
public static RelNode getRelNode(CalciteSchema rootSchema, SqlToRelConverter sqlToRelConverter, String sql)
|
||||
throws SqlParseException {
|
||||
SqlValidator sqlValidator = Configuration.getSqlValidator(rootSchema);
|
||||
@@ -176,8 +344,4 @@ public abstract class SemanticNode {
|
||||
return SqlLiteral.createSymbol(JoinType.INNER, SqlParserPos.ZERO);
|
||||
}
|
||||
|
||||
public void accept(Optimization optimization) {
|
||||
optimization.visit(this);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,99 @@
|
||||
package com.tencent.supersonic.headless.query.parser.calcite.sql.optimizer;
|
||||
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.schema.HeadlessSchema;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import org.apache.calcite.plan.RelOptRuleCall;
|
||||
import org.apache.calcite.plan.RelRule;
|
||||
import org.apache.calcite.rel.core.Aggregate;
|
||||
import org.apache.calcite.rel.core.AggregateCall;
|
||||
import org.apache.calcite.rel.core.Filter;
|
||||
import org.apache.calcite.rel.core.Project;
|
||||
import org.apache.calcite.rel.logical.LogicalAggregate;
|
||||
import org.apache.calcite.rel.logical.LogicalFilter;
|
||||
import org.apache.calcite.rel.logical.LogicalProject;
|
||||
import org.apache.calcite.rel.rules.FilterTableScanRule;
|
||||
import org.apache.calcite.rel.rules.FilterTableScanRule.Config;
|
||||
import org.apache.calcite.rel.rules.TransformationRule;
|
||||
import org.apache.calcite.rex.RexNode;
|
||||
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
|
||||
import org.apache.calcite.tools.RelBuilder;
|
||||
import org.apache.calcite.util.ImmutableBitSet;
|
||||
import org.apache.calcite.util.Pair;
|
||||
import org.apache.commons.lang3.tuple.Triple;
|
||||
|
||||
public class FilterToGroupScanRule extends RelRule<Config>
|
||||
implements TransformationRule {
|
||||
|
||||
public static FilterTableScanRule.Config DEFAULT = FilterTableScanRule.Config.DEFAULT.withOperandSupplier((b0) -> {
|
||||
return b0.operand(LogicalFilter.class).oneInput((b1) -> {
|
||||
return b1.operand(LogicalProject.class).oneInput((b2) -> {
|
||||
return b2.operand(LogicalAggregate.class).oneInput((b3) -> {
|
||||
return b3.operand(LogicalProject.class).anyInputs();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
}).as(FilterTableScanRule.Config.class);
|
||||
|
||||
private HeadlessSchema headlessSchema;
|
||||
|
||||
public FilterToGroupScanRule(FilterTableScanRule.Config config, HeadlessSchema headlessSchema) {
|
||||
super(config);
|
||||
this.headlessSchema = headlessSchema;
|
||||
}
|
||||
|
||||
public void onMatch(RelOptRuleCall call) {
|
||||
if (call.rels.length != 4) {
|
||||
return;
|
||||
}
|
||||
if (Objects.isNull(headlessSchema.getRuntimeOptions()) || Objects.isNull(
|
||||
headlessSchema.getRuntimeOptions().getMinMaxTime()) || headlessSchema.getRuntimeOptions()
|
||||
.getMinMaxTime().getLeft().isEmpty()) {
|
||||
return;
|
||||
}
|
||||
Triple<String, String, String> minMax = headlessSchema.getRuntimeOptions().getMinMaxTime();
|
||||
Filter filter = (Filter) call.rel(0);
|
||||
Project project0 = (Project) call.rel(1);
|
||||
Project project1 = (Project) call.rel(3);
|
||||
Aggregate logicalAggregate = (Aggregate) call.rel(2);
|
||||
Optional<Pair<RexNode, String>> isIn = project1.getNamedProjects()
|
||||
.stream().filter(i -> i.right.equalsIgnoreCase(minMax.getLeft())).findFirst();
|
||||
if (!isIn.isPresent()) {
|
||||
return;
|
||||
}
|
||||
|
||||
RelBuilder relBuilder = call.builder();
|
||||
relBuilder.push(project1);
|
||||
RexNode addPartitionCondition = getRexNodeByTimeRange(relBuilder, minMax.getLeft(), minMax.getMiddle(),
|
||||
minMax.getRight());
|
||||
relBuilder.filter(new RexNode[]{addPartitionCondition});
|
||||
relBuilder.project(project1.getProjects());
|
||||
ImmutableBitSet newGroupSet = logicalAggregate.getGroupSet();
|
||||
int newGroupCount = newGroupSet.cardinality();
|
||||
int groupCount = logicalAggregate.getGroupCount();
|
||||
List<AggregateCall> newAggCalls = new ArrayList();
|
||||
Iterator var = logicalAggregate.getAggCallList().iterator();
|
||||
while (var.hasNext()) {
|
||||
AggregateCall aggCall = (AggregateCall) var.next();
|
||||
newAggCalls.add(
|
||||
aggCall.adaptTo(project1, aggCall.getArgList(), aggCall.filterArg, groupCount, newGroupCount));
|
||||
}
|
||||
relBuilder.aggregate(relBuilder.groupKey(newGroupSet), newAggCalls);
|
||||
relBuilder.project(project0.getProjects());
|
||||
relBuilder.filter(new RexNode[]{filter.getCondition()});
|
||||
call.transformTo(relBuilder.build());
|
||||
}
|
||||
|
||||
private RexNode getRexNodeByTimeRange(RelBuilder relBuilder, String dateField, String start, String end) {
|
||||
return relBuilder.call(SqlStdOperatorTable.AND,
|
||||
relBuilder.call(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, relBuilder.field(dateField),
|
||||
relBuilder.literal(start)),
|
||||
relBuilder.call(SqlStdOperatorTable.LESS_THAN_OR_EQUAL, relBuilder.field(dateField),
|
||||
relBuilder.literal(end)));
|
||||
}
|
||||
|
||||
}
|
||||
@@ -2,22 +2,22 @@ package com.tencent.supersonic.headless.query.parser.calcite.sql.render;
|
||||
|
||||
|
||||
import com.tencent.supersonic.headless.api.query.request.MetricReq;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.s2sql.Constants;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.s2sql.DataSource;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.s2sql.Dimension;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.s2sql.Identify;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.s2sql.Materialization.TimePartType;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.s2sql.Measure;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.s2sql.Metric;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.schema.HeadlessSchema;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.sql.Renderer;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.sql.TableView;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.sql.node.DataSourceNode;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.sql.node.DimensionNode;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.sql.node.FilterNode;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.sql.node.IdentifyNode;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.sql.node.MetricNode;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.sql.node.SemanticNode;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.s2sql.Constants;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.s2sql.DataSource;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.s2sql.Dimension;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.s2sql.Identify;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.s2sql.Metric;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.schema.HeadlessSchema;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.sql.Renderer;
|
||||
import com.tencent.supersonic.headless.query.parser.calcite.sql.TableView;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
|
||||
@@ -9,7 +9,6 @@ import com.tencent.supersonic.common.util.DateModeUtils;
|
||||
import com.tencent.supersonic.headless.api.model.response.DatabaseResp;
|
||||
import com.tencent.supersonic.headless.api.query.enums.AggOption;
|
||||
import com.tencent.supersonic.headless.api.query.pojo.MetricTable;
|
||||
import com.tencent.supersonic.headless.api.query.request.MetricReq;
|
||||
import com.tencent.supersonic.headless.api.query.request.ParseSqlReq;
|
||||
import com.tencent.supersonic.headless.api.query.request.QueryStructReq;
|
||||
import com.tencent.supersonic.headless.model.domain.Catalog;
|
||||
@@ -122,8 +121,9 @@ public class CalculateAggConverter implements HeadlessConverter {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void converter(Catalog catalog, QueryStructReq queryStructCmd, ParseSqlReq sqlCommend,
|
||||
MetricReq metricCommand) throws Exception {
|
||||
public void converter(Catalog catalog, QueryStatement queryStatement) throws Exception {
|
||||
QueryStructReq queryStructCmd = queryStatement.getQueryStructReq();
|
||||
ParseSqlReq sqlCommend = queryStatement.getParseSqlReq();
|
||||
DatabaseResp databaseResp = catalog.getDatabaseByModelId(queryStructCmd.getModelIds().get(0));
|
||||
ParseSqlReq parseSqlReq = generateSqlCommend(queryStructCmd,
|
||||
EngineTypeEnum.valueOf(databaseResp.getType().toUpperCase()), databaseResp.getVersion());
|
||||
|
||||
@@ -3,8 +3,6 @@ package com.tencent.supersonic.headless.query.parser.convert;
|
||||
import com.tencent.supersonic.common.pojo.Filter;
|
||||
import com.tencent.supersonic.common.pojo.enums.FilterOperatorEnum;
|
||||
import com.tencent.supersonic.headless.api.model.response.DimensionResp;
|
||||
import com.tencent.supersonic.headless.api.query.request.MetricReq;
|
||||
import com.tencent.supersonic.headless.api.query.request.ParseSqlReq;
|
||||
import com.tencent.supersonic.headless.api.query.request.QueryStructReq;
|
||||
import com.tencent.supersonic.headless.model.domain.Catalog;
|
||||
import com.tencent.supersonic.headless.query.parser.HeadlessConverter;
|
||||
@@ -29,8 +27,8 @@ public class DefaultDimValueConverter implements HeadlessConverter {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void converter(Catalog catalog, QueryStructReq queryStructCmd,
|
||||
ParseSqlReq sqlCommend, MetricReq metricCommand) throws Exception {
|
||||
public void converter(Catalog catalog, QueryStatement queryStatement) throws Exception {
|
||||
QueryStructReq queryStructCmd = queryStatement.getQueryStructReq();
|
||||
List<DimensionResp> dimensionResps = catalog.getDimensions(queryStructCmd.getModelIds());
|
||||
//dimension which has default values
|
||||
dimensionResps = dimensionResps.stream()
|
||||
|
||||
@@ -4,8 +4,6 @@ import com.tencent.supersonic.common.pojo.Filter;
|
||||
import com.tencent.supersonic.common.pojo.exception.InvalidArgumentException;
|
||||
import com.tencent.supersonic.headless.api.model.response.DimensionResp;
|
||||
import com.tencent.supersonic.headless.api.model.response.MetricResp;
|
||||
import com.tencent.supersonic.headless.api.query.request.MetricReq;
|
||||
import com.tencent.supersonic.headless.api.query.request.ParseSqlReq;
|
||||
import com.tencent.supersonic.headless.api.query.request.QueryStructReq;
|
||||
import com.tencent.supersonic.headless.model.domain.Catalog;
|
||||
import com.tencent.supersonic.headless.query.parser.HeadlessConverter;
|
||||
@@ -37,8 +35,8 @@ public class MetricCheckConverter implements HeadlessConverter {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void converter(Catalog catalog, QueryStructReq queryStructReq, ParseSqlReq sqlCommend,
|
||||
MetricReq metricCommand) throws Exception {
|
||||
public void converter(Catalog catalog, QueryStatement queryStatement) throws Exception {
|
||||
QueryStructReq queryStructReq = queryStatement.getQueryStructReq();
|
||||
List<MetricResp> metricResps = catalog.getMetrics(queryStructReq.getModelIds());
|
||||
List<DimensionResp> dimensionResps = catalog.getDimensions(queryStructReq.getModelIds());
|
||||
Map<Long, DimensionResp> dimensionMap = dimensionResps.stream()
|
||||
|
||||
@@ -3,7 +3,6 @@ package com.tencent.supersonic.headless.query.parser.convert;
|
||||
import com.tencent.supersonic.common.pojo.ColumnOrder;
|
||||
import com.tencent.supersonic.headless.api.query.pojo.Param;
|
||||
import com.tencent.supersonic.headless.api.query.request.MetricReq;
|
||||
import com.tencent.supersonic.headless.api.query.request.ParseSqlReq;
|
||||
import com.tencent.supersonic.headless.api.query.request.QueryStructReq;
|
||||
import com.tencent.supersonic.headless.model.domain.Catalog;
|
||||
import com.tencent.supersonic.headless.query.parser.HeadlessConverter;
|
||||
@@ -40,10 +39,11 @@ public class ParserDefaultConverter implements HeadlessConverter {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void converter(Catalog catalog, QueryStructReq queryStructCmd, ParseSqlReq sqlCommend,
|
||||
MetricReq metricCommand)
|
||||
throws Exception {
|
||||
public void converter(Catalog catalog, QueryStatement queryStatement) throws Exception {
|
||||
QueryStructReq queryStructCmd = queryStatement.getQueryStructReq();
|
||||
MetricReq metricCommand = queryStatement.getMetricReq();
|
||||
MetricReq metricReq = generateSqlCommand(catalog, queryStructCmd);
|
||||
queryStatement.setMinMaxTime(queryStructUtils.getBeginEndTime(queryStructCmd));
|
||||
BeanUtils.copyProperties(metricReq, metricCommand);
|
||||
}
|
||||
|
||||
|
||||
@@ -121,6 +121,7 @@ public class QueryReqConverter {
|
||||
queryStatement.setQueryStructReq(queryStructCmd);
|
||||
queryStatement.setParseSqlReq(result);
|
||||
queryStatement.setIsS2SQL(true);
|
||||
queryStatement.setMinMaxTime(queryStructUtils.getBeginEndTime(queryStructCmd));
|
||||
queryStatement = parserService.plan(queryStatement);
|
||||
queryStatement.setSql(String.format(SqlExecuteReq.LIMIT_WRAPPER, queryStatement.getSql()));
|
||||
return queryStatement;
|
||||
|
||||
@@ -6,8 +6,8 @@ import com.tencent.supersonic.headless.api.query.request.MetricReq;
|
||||
import com.tencent.supersonic.headless.api.query.request.ParseSqlReq;
|
||||
import com.tencent.supersonic.headless.api.query.request.QueryStructReq;
|
||||
import com.tencent.supersonic.headless.model.domain.Catalog;
|
||||
import com.tencent.supersonic.headless.query.persistence.pojo.QueryStatement;
|
||||
import com.tencent.supersonic.headless.query.parser.HeadlessConverter;
|
||||
import com.tencent.supersonic.headless.query.persistence.pojo.QueryStatement;
|
||||
import com.tencent.supersonic.headless.query.utils.QueryStructUtils;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
@@ -48,8 +48,10 @@ public class ZipperModelConverter implements HeadlessConverter {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void converter(Catalog catalog, QueryStructReq queryStructCmd, ParseSqlReq sqlCommend,
|
||||
MetricReq metricCommand) throws Exception {
|
||||
public void converter(Catalog catalog, QueryStatement queryStatement) throws Exception {
|
||||
QueryStructReq queryStructCmd = queryStatement.getQueryStructReq();
|
||||
ParseSqlReq sqlCommend = queryStatement.getParseSqlReq();
|
||||
MetricReq metricCommand = queryStatement.getMetricReq();
|
||||
doSingleZipperSource(queryStructCmd, sqlCommend, metricCommand);
|
||||
}
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ import lombok.Data;
|
||||
import org.apache.commons.lang3.tuple.ImmutablePair;
|
||||
|
||||
import java.util.List;
|
||||
import org.apache.commons.lang3.tuple.Triple;
|
||||
|
||||
@Data
|
||||
public class QueryStatement {
|
||||
@@ -22,6 +23,8 @@ public class QueryStatement {
|
||||
private Integer status = 0;
|
||||
private Boolean isS2SQL = false;
|
||||
private List<ImmutablePair<String, String>> timeRanges;
|
||||
private Boolean enableOptimize = true;
|
||||
private Triple<String, String, String> minMaxTime;
|
||||
|
||||
public boolean isOk() {
|
||||
this.ok = "".equals(errMsg) && !"".equals(sql);
|
||||
|
||||
@@ -44,6 +44,7 @@ public class HeadlessQueryEngineImpl implements HeadlessQueryEngine {
|
||||
}
|
||||
|
||||
public QueryStatement plan(QueryStatement queryStatement) throws Exception {
|
||||
queryStatement.setEnableOptimize(queryUtils.enableOptimize());
|
||||
queryStatement = queryParser.logicSql(queryStatement);
|
||||
queryUtils.checkSqlParse(queryStatement);
|
||||
queryStatement.setModelIds(queryStatement.getQueryStructReq().getModelIds());
|
||||
|
||||
@@ -49,6 +49,7 @@ import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.tuple.ImmutablePair;
|
||||
import org.apache.commons.lang3.tuple.Triple;
|
||||
import org.apache.logging.log4j.util.Strings;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
@@ -320,35 +321,44 @@ public class QueryStructUtils {
|
||||
return "";
|
||||
}
|
||||
|
||||
public ImmutablePair<String, String> getBeginEndTime(QueryStructReq queryStructCmd) {
|
||||
public Triple<String, String, String> getBeginEndTime(QueryStructReq queryStructCmd) {
|
||||
if (Objects.isNull(queryStructCmd.getDateInfo())) {
|
||||
return Triple.of("", "", "");
|
||||
}
|
||||
DateConf dateConf = queryStructCmd.getDateInfo();
|
||||
String dateInfo = dateModeUtils.getSysDateCol(dateConf);
|
||||
if (dateInfo.isEmpty()) {
|
||||
return Triple.of("", "", "");
|
||||
}
|
||||
switch (dateConf.getDateMode()) {
|
||||
case AVAILABLE:
|
||||
case BETWEEN:
|
||||
return ImmutablePair.of(dateConf.getStartDate(), dateConf.getEndDate());
|
||||
return Triple.of(dateInfo, dateConf.getStartDate(), dateConf.getEndDate());
|
||||
case LIST:
|
||||
return ImmutablePair.of(Collections.min(dateConf.getDateList()),
|
||||
return Triple.of(dateInfo, Collections.min(dateConf.getDateList()),
|
||||
Collections.max(dateConf.getDateList()));
|
||||
case RECENT:
|
||||
ItemDateResp dateDate = getItemDateResp(queryStructCmd);
|
||||
LocalDate dateMax = LocalDate.now().minusDays(1);
|
||||
LocalDate dateMin = dateMax.minusDays(dateConf.getUnit() - 1);
|
||||
if (Objects.isNull(dateDate)) {
|
||||
return ImmutablePair.of(dateMin.format(DateTimeFormatter.ofPattern(DAY_FORMAT)),
|
||||
return Triple.of(dateInfo, dateMin.format(DateTimeFormatter.ofPattern(DAY_FORMAT)),
|
||||
dateMax.format(DateTimeFormatter.ofPattern(DAY_FORMAT)));
|
||||
}
|
||||
switch (dateConf.getPeriod()) {
|
||||
case DAY:
|
||||
return dateModeUtils.recentDay(dateDate, dateConf);
|
||||
ImmutablePair<String, String> dayInfo = dateModeUtils.recentDay(dateDate, dateConf);
|
||||
return Triple.of(dateInfo, dayInfo.left, dayInfo.right);
|
||||
case WEEK:
|
||||
return dateModeUtils.recentWeek(dateDate, dateConf);
|
||||
ImmutablePair<String, String> weekInfo = dateModeUtils.recentWeek(dateDate, dateConf);
|
||||
return Triple.of(dateInfo, weekInfo.left, weekInfo.right);
|
||||
case MONTH:
|
||||
List<ImmutablePair<String, String>> rets = dateModeUtils.recentMonth(dateDate, dateConf);
|
||||
Optional<String> minBegins = rets.stream().map(i -> i.left).sorted().findFirst();
|
||||
Optional<String> maxBegins = rets.stream().map(i -> i.right).sorted(Comparator.reverseOrder())
|
||||
.findFirst();
|
||||
if (minBegins.isPresent() && maxBegins.isPresent()) {
|
||||
return ImmutablePair.of(minBegins.get(), maxBegins.get());
|
||||
return Triple.of(dateInfo, minBegins.get(), maxBegins.get());
|
||||
}
|
||||
break;
|
||||
default:
|
||||
@@ -359,7 +369,7 @@ public class QueryStructUtils {
|
||||
break;
|
||||
|
||||
}
|
||||
return ImmutablePair.of("", "");
|
||||
return Triple.of("", "", "");
|
||||
}
|
||||
|
||||
public List<ImmutablePair<String, String>> getTimeRanges(QueryStructReq queryStructCmd) {
|
||||
@@ -438,11 +448,11 @@ public class QueryStructUtils {
|
||||
if ("=".equals(f.getOperator())) {
|
||||
dateList.add(f.getFieldValue().toString());
|
||||
} else if ("<".equals(f.getOperator()) || "<=".equals(f.getOperator())) {
|
||||
if (!"".equals(startDate) && startDate.compareTo(f.getFieldValue().toString()) > 0) {
|
||||
if (startDate.isEmpty() || startDate.compareTo(f.getFieldValue().toString()) > 0) {
|
||||
startDate = f.getFieldValue().toString();
|
||||
}
|
||||
} else if (">".equals(f.getOperator()) || ">=".equals(f.getOperator())) {
|
||||
if (!"".equals(endDate) && endDate.compareTo(f.getFieldValue().toString()) < 0) {
|
||||
if (endDate.isEmpty() || endDate.compareTo(f.getFieldValue().toString()) < 0) {
|
||||
endDate = f.getFieldValue().toString();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -45,6 +45,10 @@ public class QueryUtils {
|
||||
private final Set<Pattern> patterns = new HashSet<>();
|
||||
@Value("${query.cache.enable:true}")
|
||||
private Boolean cacheEnable;
|
||||
|
||||
@Value("${query.optimizer.enable:true}")
|
||||
private Boolean optimizeEnable;
|
||||
|
||||
private final CacheUtils cacheUtils;
|
||||
private final StatUtils statUtils;
|
||||
|
||||
@@ -241,4 +245,8 @@ public class QueryUtils {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public Boolean enableOptimize() {
|
||||
return optimizeEnable;
|
||||
}
|
||||
}
|
||||
|
||||
2
pom.xml
2
pom.xml
@@ -58,7 +58,7 @@
|
||||
<commons.compress.version>1.21</commons.compress.version>
|
||||
<jetty.util.version>6.1.26</jetty.util.version>
|
||||
<spring.version>2.5.1</spring.version>
|
||||
<calcite.version>1.34.0</calcite.version>
|
||||
<calcite.version>1.35.0</calcite.version>
|
||||
<calcite.avatica.version>1.23.0</calcite.avatica.version>
|
||||
<xk.time.version>3.2.4</xk.time.version>
|
||||
<mockito-inline.version>4.5.1</mockito-inline.version>
|
||||
|
||||
Reference in New Issue
Block a user