mirror of
https://github.com/tencentmusic/supersonic.git
synced 2025-12-14 13:47:09 +00:00
(feature)(semantic) add materialization optimizer (#239)
Co-authored-by: jipengli <jipengli@tencent.com>
This commit is contained in:
@@ -19,16 +19,16 @@ public class DetailQuery implements QueryOptimizer {
|
||||
if (Strings.isNullOrEmpty(sqlRaw)) {
|
||||
throw new RuntimeException("sql is empty or null");
|
||||
}
|
||||
log.info("before handleNoMetric, sql:{}", sqlRaw);
|
||||
log.debug("before handleNoMetric, sql:{}", sqlRaw);
|
||||
if (isDetailQuery(queryStructCmd)) {
|
||||
if (queryStructCmd.getMetrics().size() == 0) {
|
||||
if (queryStructCmd.getMetrics().size() == 0 && !CollectionUtils.isEmpty(queryStructCmd.getGroups())) {
|
||||
String sqlForm = "select %s from ( %s ) src_no_metric";
|
||||
String sql = String.format(sqlForm, queryStructCmd.getGroups().stream().collect(
|
||||
Collectors.joining(",")), sqlRaw);
|
||||
queryStatement.setSql(sql);
|
||||
}
|
||||
}
|
||||
log.info("after handleNoMetric, sql:{}", queryStatement.getSql());
|
||||
log.debug("after handleNoMetric, sql:{}", queryStatement.getSql());
|
||||
}
|
||||
|
||||
public boolean isDetailQuery(QueryStructReq queryStructCmd) {
|
||||
|
||||
@@ -0,0 +1,349 @@
|
||||
package com.tencent.supersonic.semantic.query.optimizer;
|
||||
|
||||
import com.tencent.supersonic.common.pojo.enums.TypeEnums;
|
||||
import com.tencent.supersonic.common.util.StringUtil;
|
||||
import com.tencent.supersonic.common.util.calcite.SqlParseUtils;
|
||||
import com.tencent.supersonic.semantic.api.materialization.enums.MaterializedTypeEnum;
|
||||
import com.tencent.supersonic.semantic.api.materialization.response.MaterializationRecordResp;
|
||||
import com.tencent.supersonic.semantic.api.materialization.response.MaterializationResp;
|
||||
import com.tencent.supersonic.semantic.api.model.enums.QueryOptMode;
|
||||
import com.tencent.supersonic.semantic.api.model.response.DimensionResp;
|
||||
import com.tencent.supersonic.semantic.api.model.response.MetricResp;
|
||||
import com.tencent.supersonic.semantic.api.query.enums.AggOption;
|
||||
import com.tencent.supersonic.semantic.api.query.pojo.MetricTable;
|
||||
import com.tencent.supersonic.semantic.api.query.request.MetricReq;
|
||||
import com.tencent.supersonic.semantic.api.query.request.ParseSqlReq;
|
||||
import com.tencent.supersonic.semantic.api.query.request.QueryStructReq;
|
||||
import com.tencent.supersonic.semantic.materialization.domain.MaterializationConfService;
|
||||
import com.tencent.supersonic.semantic.materialization.domain.MaterializationRecordService;
|
||||
import com.tencent.supersonic.semantic.model.domain.Catalog;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.SemanticSchemaManager;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.dsl.DataSource;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.dsl.Materialization;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.dsl.Materialization.TimePartType;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.dsl.MaterializationElement;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.dsl.Metric;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.dsl.SemanticModel;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.dsl.TimeRange;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.planner.AggPlanner;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.planner.MaterializationPlanner;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.schema.SemanticSchema;
|
||||
import com.tencent.supersonic.semantic.query.persistence.pojo.QueryStatement;
|
||||
import com.tencent.supersonic.semantic.query.utils.QueryStructUtils;
|
||||
import com.tencent.supersonic.semantic.query.utils.StatUtils;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.lang3.tuple.ImmutablePair;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
@Slf4j
|
||||
@Component("MaterializationQuery")
|
||||
public class MaterializationQuery implements QueryOptimizer {
|
||||
|
||||
protected final MaterializationConfService materializationConfService;
|
||||
protected final MaterializationRecordService materializationRecordService;
|
||||
protected final Catalog catalog;
|
||||
protected final QueryStructUtils queryStructUtils;
|
||||
protected final SemanticSchemaManager semanticSchemaManager;
|
||||
protected final StatUtils statUtils;
|
||||
|
||||
|
||||
@Value("${materialization.query.enable:false}")
|
||||
private boolean enabled;
|
||||
|
||||
public MaterializationQuery(
|
||||
MaterializationConfService materializationConfService,
|
||||
MaterializationRecordService materializationRecordService,
|
||||
Catalog catalog, QueryStructUtils queryStructUtils,
|
||||
SemanticSchemaManager semanticSchemaManager,
|
||||
StatUtils statUtils) {
|
||||
this.materializationConfService = materializationConfService;
|
||||
this.materializationRecordService = materializationRecordService;
|
||||
this.catalog = catalog;
|
||||
|
||||
this.queryStructUtils = queryStructUtils;
|
||||
this.semanticSchemaManager = semanticSchemaManager;
|
||||
this.statUtils = statUtils;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void rewrite(QueryStructReq queryStructCmd, QueryStatement queryStatement) {
|
||||
if (!enabled) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
if (Objects.isNull(queryStructCmd) || Objects.isNull(queryStatement) || Objects.isNull(
|
||||
queryStructCmd.getModelId()) || Objects.isNull(
|
||||
queryStructCmd.getDateInfo())) {
|
||||
return;
|
||||
}
|
||||
if (Objects.nonNull(queryStatement.getParseSqlReq())) {
|
||||
rewriteSqlReq(queryStructCmd, queryStatement);
|
||||
return;
|
||||
}
|
||||
List<Materialization> materializationList = getMaterializationSchema(queryStructCmd,
|
||||
queryStatement.getMetricReq());
|
||||
if (!CollectionUtils.isEmpty(materializationList)) {
|
||||
if (replan(materializationList, queryStructCmd, queryStatement)) {
|
||||
statUtils.updateQueryOptMode(QueryOptMode.MATERIALIZATION.name());
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("MaterializationQuery error {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
protected void rewriteSqlReq(QueryStructReq queryStructCmd, QueryStatement queryStatement) throws Exception {
|
||||
ParseSqlReq parseSqlReq = queryStatement.getParseSqlReq();
|
||||
String sourceId = queryStatement.getSourceId();
|
||||
String sql = queryStatement.getSql();
|
||||
String parseSql = parseSqlReq.getSql();
|
||||
String msg = queryStatement.getErrMsg();
|
||||
String materializationSourceId = "";
|
||||
String materializationSql = "";
|
||||
String parseSqlReqMaterialization = "";
|
||||
if (!CollectionUtils.isEmpty(parseSqlReq.getTables())) {
|
||||
List<String[]> tables = new ArrayList<>();
|
||||
for (MetricTable metricTable : parseSqlReq.getTables()) {
|
||||
MetricReq metricReq = new MetricReq();
|
||||
metricReq.setMetrics(metricTable.getMetrics());
|
||||
metricReq.setDimensions(metricTable.getDimensions());
|
||||
metricReq.setWhere(StringUtil.formatSqlQuota(metricTable.getWhere()));
|
||||
metricReq.setRootPath(parseSqlReq.getRootPath());
|
||||
List<Materialization> materializationList = getMaterializationSchema(queryStructCmd,
|
||||
metricReq);
|
||||
if (!CollectionUtils.isEmpty(materializationList)) {
|
||||
queryStatement.setMetricReq(metricReq);
|
||||
boolean ok = replan(materializationList, queryStructCmd, queryStatement);
|
||||
if (!ok) {
|
||||
log.info("MaterializationQuery rewriteSqlReq not match {}", metricTable.getAlias());
|
||||
queryStatement.setSql(sql);
|
||||
queryStatement.setSourceId(sourceId);
|
||||
queryStatement.setErrMsg(msg);
|
||||
queryStatement.setMetricReq(null);
|
||||
return;
|
||||
}
|
||||
tables.add(new String[]{metricTable.getAlias(), queryStatement.getSql()});
|
||||
materializationSourceId = queryStatement.getSourceId();
|
||||
parseSqlReqMaterialization = queryStatement.getParseSqlReq().getSql();
|
||||
}
|
||||
}
|
||||
if (!CollectionUtils.isEmpty(tables)) {
|
||||
if (parseSqlReq.isSupportWith()) {
|
||||
materializationSql = "with " + String.join(",",
|
||||
tables.stream().map(t -> String.format("%s as (%s)", t[0], t[1])).collect(
|
||||
Collectors.toList())) + "\n" + parseSqlReqMaterialization;
|
||||
} else {
|
||||
materializationSql = parseSqlReqMaterialization;
|
||||
for (String[] tb : tables) {
|
||||
materializationSql = StringUtils.replace(materializationSql, tb[0],
|
||||
"(" + tb[1] + ") " + (parseSqlReq.isWithAlias() ? "" : tb[0]), -1);
|
||||
}
|
||||
}
|
||||
|
||||
queryStatement.setSql(materializationSql);
|
||||
queryStatement.setSourceId(materializationSourceId);
|
||||
log.info("rewriteSqlReq before[{}] after[{}]", sql, materializationSql);
|
||||
statUtils.updateQueryOptMode(QueryOptMode.MATERIALIZATION.name());
|
||||
}
|
||||
parseSqlReq.setSql(parseSql);
|
||||
}
|
||||
}
|
||||
|
||||
public List<Materialization> getMaterializationSchema(QueryStructReq queryStructReq, MetricReq metricReq)
|
||||
throws Exception {
|
||||
List<Materialization> materializationList = new ArrayList<>();
|
||||
if (Objects.isNull(metricReq)) {
|
||||
return materializationList;
|
||||
}
|
||||
ImmutablePair<String, String> timeRange = queryStructUtils.getBeginEndTime(queryStructReq);
|
||||
String start = timeRange.left;
|
||||
String end = timeRange.right;
|
||||
Long modelId = queryStructReq.getModelId();
|
||||
List<MaterializationResp> materializationResps = materializationConfService.getMaterializationByModel(modelId);
|
||||
List<DimensionResp> dimensionResps = catalog.getDimensions(modelId);
|
||||
List<MetricResp> metrics = catalog.getMetrics(modelId);
|
||||
Set<String> fields = new HashSet<>();
|
||||
|
||||
if (Objects.nonNull(metricReq.getWhere()) && !metricReq.getWhere().isEmpty()) {
|
||||
fields.addAll(SqlParseUtils.getFilterField(metricReq.getWhere()));
|
||||
}
|
||||
if (!CollectionUtils.isEmpty(metricReq.getMetrics())) {
|
||||
fields.addAll(metricReq.getMetrics());
|
||||
}
|
||||
if (!CollectionUtils.isEmpty(metricReq.getDimensions())) {
|
||||
fields.addAll(metricReq.getDimensions());
|
||||
}
|
||||
|
||||
materializationResps.forEach(materializationResp -> {
|
||||
Materialization materialization =
|
||||
Materialization.builder().dateInfo(materializationResp.getDateInfo())
|
||||
.materializationId(materializationResp.getId())
|
||||
.level(materializationResp.getLevel())
|
||||
.timePartType(TimePartType.of(materializationResp.getMaterializedType().name()))
|
||||
.dimensions(new ArrayList<>())
|
||||
.metrics(new ArrayList<>())
|
||||
.entities(materializationResp.getEntities())
|
||||
.destinationTable(materializationResp.getDestinationTable()).modelId(modelId)
|
||||
.dataBase(materializationResp.getDatabaseId()).build();
|
||||
List<Long> sameTableMaterialization = materializationConfService.getMaterializationByTable(
|
||||
materializationResp.getDatabaseId(), materializationResp.getDestinationTable());
|
||||
Set<Long> metricIds = materializationResp.getMaterializationElementRespList().stream()
|
||||
.filter(e -> e.getType().equals(
|
||||
TypeEnums.METRIC)).map(e -> e.getId()).collect(Collectors.toSet());
|
||||
Set<Long> dimensionIds = materializationResp.getMaterializationElementRespList().stream()
|
||||
.filter(e -> e.getType().equals(
|
||||
TypeEnums.DIMENSION)).map(e -> e.getId()).collect(Collectors.toSet());
|
||||
|
||||
dimensionResps.stream().filter(d -> dimensionIds.contains(d.getId()))
|
||||
.filter(d -> fields.contains(d.getBizName())).forEach(d -> {
|
||||
List<MaterializationRecordResp> materializationRecordResps = materializationRecordService
|
||||
.fetchMaterializationDate(sameTableMaterialization, d.getBizName(), start, end);
|
||||
if (!CollectionUtils.isEmpty(materializationRecordResps)) {
|
||||
List<TimeRange> timeRangeList = new ArrayList<>();
|
||||
materializationRecordResps.stream().forEach(t -> timeRangeList.add(
|
||||
TimeRange.builder().start(t.getDataTime()).end(t.getDataTime()).build()));
|
||||
materialization.getDimensions().add(
|
||||
MaterializationElement.builder()
|
||||
.name(d.getBizName())
|
||||
.timeRangeList(timeRangeList)
|
||||
.build()
|
||||
);
|
||||
} else if (MaterializedTypeEnum.FULL.equals(materializationResp.getMaterializedType())) {
|
||||
materialization.getDimensions().add(
|
||||
MaterializationElement.builder()
|
||||
.name(d.getBizName())
|
||||
.timeRangeList(
|
||||
Arrays.asList(TimeRange.builder().start(start).end(end).build()))
|
||||
.build()
|
||||
);
|
||||
}
|
||||
});
|
||||
metrics.stream().filter(m -> metricIds.contains(m.getId())).filter(m -> fields.contains(m.getBizName()))
|
||||
.forEach(m -> {
|
||||
List<MaterializationRecordResp> materializationRecordResps = materializationRecordService
|
||||
.fetchMaterializationDate(sameTableMaterialization, m.getBizName(), start, end);
|
||||
if (!CollectionUtils.isEmpty(materializationRecordResps)) {
|
||||
List<TimeRange> timeRangeList = new ArrayList<>();
|
||||
materializationRecordResps.stream().forEach(t -> timeRangeList.add(
|
||||
TimeRange.builder().start(t.getDataTime()).end(t.getDataTime()).build()));
|
||||
materialization.getMetrics().add(MaterializationElement.builder().name(m.getBizName())
|
||||
.timeRangeList(timeRangeList).build());
|
||||
} else if (MaterializedTypeEnum.FULL.equals(materializationResp.getMaterializedType())) {
|
||||
materialization.getMetrics().add(
|
||||
MaterializationElement.builder()
|
||||
.name(m.getBizName())
|
||||
.timeRangeList(
|
||||
Arrays.asList(TimeRange.builder().start(start).end(end).build()))
|
||||
.build()
|
||||
);
|
||||
}
|
||||
});
|
||||
materializationList.add(materialization);
|
||||
});
|
||||
return materializationList;
|
||||
}
|
||||
|
||||
protected boolean replan(List<Materialization> materializationList, QueryStructReq queryStructReq,
|
||||
QueryStatement queryStatement)
|
||||
throws Exception {
|
||||
log.info("{}", materializationList);
|
||||
SemanticSchema schema = SemanticSchema.newBuilder(queryStatement.getMetricReq().getRootPath()).build();
|
||||
schema.setMaterializationList(materializationList);
|
||||
getTimeRanges(queryStructReq, queryStatement);
|
||||
removeDefaultMetric(queryStructReq, queryStatement.getMetricReq());
|
||||
MaterializationPlanner materializationPlanner = new MaterializationPlanner(schema);
|
||||
materializationPlanner.explain(queryStatement, AggOption.getAggregation(queryStructReq.getNativeQuery()));
|
||||
log.info("optimize {}", materializationPlanner.findBest().getDatasource());
|
||||
SemanticSchema semanticSchema = materializationPlanner.findBest();
|
||||
if (!CollectionUtils.isEmpty(semanticSchema.getDatasource())) {
|
||||
semanticSchema.getSemanticModel().setRootPath(semanticSchema.getRootPath());
|
||||
semanticSchema.setSemanticModel(transform(queryStatement, semanticSchema.getSemanticModel()));
|
||||
int materCnt = semanticSchema.getDatasource().size();
|
||||
if (materCnt == semanticSchema.getDatasource().entrySet().stream()
|
||||
.filter(d -> d.getValue().getTimePartType().equals(TimePartType.ZIPPER)).count()) {
|
||||
doSingleZipperSource(queryStructReq, queryStatement);
|
||||
}
|
||||
AggPlanner aggBuilder = new AggPlanner(semanticSchema);
|
||||
aggBuilder.explain(queryStatement, AggOption.getAggregation(queryStructReq.getNativeQuery()));
|
||||
log.debug("optimize before {} sql {}", queryStatement.getSourceId(), queryStatement.getSql());
|
||||
log.debug("optimize after {} sql {}", aggBuilder.getSourceId(), aggBuilder.getSql());
|
||||
queryStatement.setSourceId(aggBuilder.getSourceId());
|
||||
queryStatement.setSql(aggBuilder.getSql());
|
||||
queryStatement.setStatus(queryStatement.getStatus() + 1);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
protected SemanticModel transform(QueryStatement queryStatement, SemanticModel semanticModel) throws Exception {
|
||||
for (DataSource dataSource : semanticModel.getDatasourceMap().values()) {
|
||||
if (!CollectionUtils.isEmpty(dataSource.getMeasures())) {
|
||||
dataSource.getMeasures().stream().forEach(m -> {
|
||||
m.setExpr(getMetricExpr(semanticModel.getRootPath(), m.getName()));
|
||||
});
|
||||
}
|
||||
}
|
||||
return semanticModel;
|
||||
}
|
||||
|
||||
protected String getMetricExpr(String rootPath, String bizName) {
|
||||
try {
|
||||
SemanticModel oriSemanticModel = semanticSchemaManager.get(rootPath);
|
||||
if (Objects.nonNull(oriSemanticModel)) {
|
||||
Optional<Metric> metric = oriSemanticModel.getMetrics()
|
||||
.stream().filter(m -> m.getName().equalsIgnoreCase(bizName)).findFirst();
|
||||
if (metric.isPresent() && metric.get().getMetricTypeParams().getExpr().contains(getVariablePrefix())) {
|
||||
return metric.get().getMetricTypeParams().getExpr();
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("getMetricExpr {}", e);
|
||||
}
|
||||
return bizName;
|
||||
}
|
||||
|
||||
protected void removeDefaultMetric(QueryStructReq queryStructReq, MetricReq metricReq) {
|
||||
// due to default metrics have no materialization
|
||||
if (CollectionUtils.isEmpty(queryStructReq.getAggregators()) && Objects.nonNull(metricReq)) {
|
||||
metricReq.setMetrics(new ArrayList<>());
|
||||
}
|
||||
}
|
||||
|
||||
protected void doSingleZipperSource(QueryStructReq queryStructReq, QueryStatement queryStatement) {
|
||||
// time field rewrite to start_ end_
|
||||
log.info("doSingleZipperSource {}", queryStatement);
|
||||
|
||||
if (CollectionUtils.isEmpty(queryStructReq.getAggregators()) && CollectionUtils.isEmpty(
|
||||
queryStructReq.getGroups()) && Objects.nonNull(queryStatement.getParseSqlReq())) {
|
||||
String sqlNew = queryStructUtils.generateZipperWhere(queryStatement, queryStructReq);
|
||||
log.info("doSingleZipperSource before[{}] after[{}]", queryStatement.getParseSqlReq().getSql(), sqlNew);
|
||||
queryStatement.getParseSqlReq().setSql(sqlNew);
|
||||
return;
|
||||
}
|
||||
MetricReq metricReq = queryStatement.getMetricReq();
|
||||
String where = queryStructUtils.generateZipperWhere(queryStructReq);
|
||||
if (!where.isEmpty() && Objects.nonNull(metricReq)) {
|
||||
log.info("doSingleZipperSource before[{}] after[{}]", metricReq.getWhere(), where);
|
||||
metricReq.setWhere(where);
|
||||
}
|
||||
}
|
||||
|
||||
protected void getTimeRanges(QueryStructReq queryStructReq, QueryStatement queryStatement) {
|
||||
queryStatement.setTimeRanges(queryStructUtils.getTimeRanges(queryStructReq));
|
||||
}
|
||||
|
||||
protected String getVariablePrefix() {
|
||||
return queryStructUtils.getVariablePrefix();
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.tencent.supersonic.semantic.query.parser;
|
||||
|
||||
import com.tencent.supersonic.common.util.StringUtil;
|
||||
import com.tencent.supersonic.semantic.api.query.enums.AggOption;
|
||||
import com.tencent.supersonic.semantic.api.query.pojo.MetricTable;
|
||||
import com.tencent.supersonic.semantic.api.query.request.MetricReq;
|
||||
@@ -59,7 +60,8 @@ public class QueryParser {
|
||||
MetricReq metricReq = new MetricReq();
|
||||
metricReq.setMetrics(metricTable.getMetrics());
|
||||
metricReq.setDimensions(metricTable.getDimensions());
|
||||
metricReq.setWhere(formatWhere(metricTable.getWhere()));
|
||||
metricReq.setWhere(StringUtil.formatSqlQuota(metricTable.getWhere()));
|
||||
metricReq.setNativeQuery(!AggOption.isAgg(metricTable.getAggOption()));
|
||||
metricReq.setRootPath(sqlCommend.getRootPath());
|
||||
QueryStatement tableSql = parser(metricReq, metricTable.getAggOption());
|
||||
if (!tableSql.isOk()) {
|
||||
@@ -86,6 +88,7 @@ public class QueryParser {
|
||||
}
|
||||
queryStatement.setSql(sql);
|
||||
queryStatement.setSourceId(sourceId);
|
||||
queryStatement.setParseSqlReq(sqlCommend);
|
||||
return queryStatement;
|
||||
}
|
||||
}
|
||||
@@ -118,10 +121,4 @@ public class QueryParser {
|
||||
}
|
||||
|
||||
|
||||
private String formatWhere(String where) {
|
||||
if (StringUtils.isEmpty(where)) {
|
||||
return where;
|
||||
}
|
||||
return where.replace("\"", "\\\\\"");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,9 +27,10 @@ public class CalciteSqlParser implements SqlParser {
|
||||
queryStatement.setErrMsg("semanticSchema not found");
|
||||
return queryStatement;
|
||||
}
|
||||
queryStatement.setMetricReq(metricReq);
|
||||
SemanticSchema semanticSchema = getSemanticSchema(semanticModel);
|
||||
AggPlanner aggBuilder = new AggPlanner(semanticSchema);
|
||||
aggBuilder.explain(metricReq, isAgg);
|
||||
aggBuilder.explain(queryStatement, isAgg);
|
||||
queryStatement.setSql(aggBuilder.getSql());
|
||||
queryStatement.setSourceId(aggBuilder.getSourceId());
|
||||
return queryStatement;
|
||||
|
||||
@@ -1,8 +1,11 @@
|
||||
package com.tencent.supersonic.semantic.query.parser.calcite;
|
||||
|
||||
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.sql.DSLSqlValidatorImpl;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.schema.SemanticSqlDialect;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.sql.DSLSqlValidatorImpl;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import org.apache.calcite.avatica.util.Casing;
|
||||
import org.apache.calcite.avatica.util.Quoting;
|
||||
@@ -10,6 +13,10 @@ import org.apache.calcite.config.CalciteConnectionConfig;
|
||||
import org.apache.calcite.config.CalciteConnectionConfigImpl;
|
||||
import org.apache.calcite.config.CalciteConnectionProperty;
|
||||
import org.apache.calcite.config.Lex;
|
||||
import org.apache.calcite.jdbc.CalciteSchema;
|
||||
import org.apache.calcite.prepare.CalciteCatalogReader;
|
||||
import org.apache.calcite.prepare.Prepare;
|
||||
import org.apache.calcite.rel.hint.HintStrategyTable;
|
||||
import org.apache.calcite.rel.type.RelDataTypeFactory;
|
||||
import org.apache.calcite.rel.type.RelDataTypeSystem;
|
||||
import org.apache.calcite.sql.SqlOperatorTable;
|
||||
@@ -17,8 +24,11 @@ import org.apache.calcite.sql.fun.SqlStdOperatorTable;
|
||||
import org.apache.calcite.sql.parser.SqlParser;
|
||||
import org.apache.calcite.sql.parser.impl.SqlParserImpl;
|
||||
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
|
||||
import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
|
||||
import org.apache.calcite.sql.validate.SqlConformanceEnum;
|
||||
import org.apache.calcite.sql.validate.SqlValidator;
|
||||
import org.apache.calcite.sql.validate.SqlValidatorUtil;
|
||||
import org.apache.calcite.sql2rel.SqlToRelConverter;
|
||||
|
||||
public class Configuration {
|
||||
|
||||
@@ -58,4 +68,33 @@ public class Configuration {
|
||||
return parserConfig.build();
|
||||
}
|
||||
|
||||
public static SqlValidator getSqlValidator(CalciteSchema rootSchema) {
|
||||
List<SqlOperatorTable> tables = new ArrayList<>();
|
||||
tables.add(SqlStdOperatorTable.instance());
|
||||
SqlOperatorTable operatorTable = new ChainedSqlOperatorTable(tables); //.of(SqlStdOperatorTable.instance());
|
||||
//operatorTable.
|
||||
SqlValidator.Config validatorConfig = SqlValidator.Config.DEFAULT
|
||||
.withLenientOperatorLookup(config.lenientOperatorLookup())
|
||||
.withConformance(SemanticSqlDialect.DEFAULT.getConformance())
|
||||
.withDefaultNullCollation(config.defaultNullCollation())
|
||||
.withIdentifierExpansion(true);
|
||||
Prepare.CatalogReader catalogReader = new CalciteCatalogReader(
|
||||
rootSchema,
|
||||
Collections.singletonList(rootSchema.getName()),
|
||||
typeFactory,
|
||||
config
|
||||
);
|
||||
SqlValidator validator = SqlValidatorUtil.newValidator(operatorTable, catalogReader, typeFactory,
|
||||
validatorConfig);
|
||||
return validator;
|
||||
}
|
||||
|
||||
public static SqlToRelConverter.Config getConverterConfig() {
|
||||
HintStrategyTable strategies = HintStrategyTable.builder().build();
|
||||
return SqlToRelConverter.config()
|
||||
.withHintStrategyTable(strategies)
|
||||
.withTrimUnusedFields(true)
|
||||
.withExpand(true);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -107,15 +107,9 @@ public class SemanticSchemaManager {
|
||||
|
||||
|
||||
public static DataSource getDatasource(final DatasourceYamlTpl d) {
|
||||
DataSource datasource = new DataSource();
|
||||
datasource.setSqlQuery(d.getSqlQuery());
|
||||
datasource.setName(d.getName());
|
||||
datasource.setSourceId(d.getSourceId());
|
||||
datasource.setTableQuery(d.getTableQuery());
|
||||
|
||||
datasource.setIdentifiers(getIdentify(d.getIdentifiers()));
|
||||
datasource.setDimensions(getDimensions(d.getDimensions()));
|
||||
datasource.setMeasures(getMeasures(d.getMeasures()));
|
||||
DataSource datasource = DataSource.builder().sourceId(d.getSourceId()).sqlQuery(d.getSqlQuery())
|
||||
.name(d.getName()).tableQuery(d.getTableQuery()).identifiers(getIdentify(d.getIdentifiers()))
|
||||
.measures(getMeasures(d.getMeasures())).dimensions(getDimensions(d.getDimensions())).build();
|
||||
datasource.setAggTime(getDataSourceAggTime(datasource.getDimensions()));
|
||||
return datasource;
|
||||
}
|
||||
@@ -167,7 +161,7 @@ public class SemanticSchemaManager {
|
||||
private static List<Dimension> getDimension(List<DimensionYamlTpl> dimensionYamlTpls) {
|
||||
List<Dimension> dimensions = new ArrayList<>();
|
||||
for (DimensionYamlTpl dimensionYamlTpl : dimensionYamlTpls) {
|
||||
Dimension dimension = new Dimension();
|
||||
Dimension dimension = Dimension.builder().build();
|
||||
dimension.setType(dimensionYamlTpl.getType());
|
||||
dimension.setExpr(dimensionYamlTpl.getExpr());
|
||||
dimension.setName(dimensionYamlTpl.getName());
|
||||
|
||||
@@ -11,6 +11,8 @@ public class Constants {
|
||||
public static final String JOIN_TABLE_LEFT_PREFIX = "src12_";
|
||||
public static final String DIMENSION_TYPE_TIME_GRANULARITY_NONE = "none";
|
||||
public static final String DIMENSION_TYPE_TIME = "time";
|
||||
public static final String MATERIALIZATION_ZIPPER_START = "start_";
|
||||
public static final String MATERIALIZATION_ZIPPER_END = "end_";
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -1,10 +1,13 @@
|
||||
package com.tencent.supersonic.semantic.query.parser.calcite.dsl;
|
||||
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.dsl.Materialization.TimePartType;
|
||||
import java.util.List;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
public class DataSource {
|
||||
|
||||
private String name;
|
||||
@@ -22,4 +25,6 @@ public class DataSource {
|
||||
private List<Measure> measures;
|
||||
|
||||
private String aggTime;
|
||||
|
||||
private TimePartType timePartType = TimePartType.None;
|
||||
}
|
||||
|
||||
@@ -2,10 +2,12 @@ package com.tencent.supersonic.semantic.query.parser.calcite.dsl;
|
||||
|
||||
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.schema.SemanticItem;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
public class Dimension implements SemanticItem {
|
||||
|
||||
String name;
|
||||
|
||||
@@ -0,0 +1,51 @@
|
||||
package com.tencent.supersonic.semantic.query.parser.calcite.dsl;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
public class Materialization {
|
||||
|
||||
public enum TimePartType {
|
||||
/**
|
||||
* partition time type
|
||||
* 1 - FULL, not use partition
|
||||
* 2 - PARTITION , use time list
|
||||
* 3 - ZIPPER, use [startDate, endDate] range time
|
||||
*/
|
||||
FULL("FULL"),
|
||||
PARTITION("PARTITION"),
|
||||
ZIPPER("ZIPPER"),
|
||||
None("");
|
||||
private String name;
|
||||
|
||||
TimePartType(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public static TimePartType of(String name) {
|
||||
for (TimePartType typeEnum : TimePartType.values()) {
|
||||
if (typeEnum.name.equalsIgnoreCase(name)) {
|
||||
return typeEnum;
|
||||
}
|
||||
}
|
||||
return TimePartType.None;
|
||||
}
|
||||
}
|
||||
|
||||
private TimePartType timePartType;
|
||||
private String destinationTable;
|
||||
private String dateInfo;
|
||||
private String entities;
|
||||
private Long modelId;
|
||||
private Long dataBase;
|
||||
private Long materializationId;
|
||||
private Integer level;
|
||||
private List<MaterializationElement> dimensions = new ArrayList<>();
|
||||
private List<MaterializationElement> metrics = new ArrayList<>();
|
||||
|
||||
|
||||
}
|
||||
@@ -0,0 +1,12 @@
|
||||
package com.tencent.supersonic.semantic.query.parser.calcite.dsl;
|
||||
|
||||
import java.util.List;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
public class MaterializationElement {
|
||||
private List<TimeRange> timeRangeList;
|
||||
private String name;
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
package com.tencent.supersonic.semantic.query.parser.calcite.dsl;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@@ -8,6 +9,7 @@ import lombok.NoArgsConstructor;
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
@Builder
|
||||
public class Measure {
|
||||
|
||||
private String name;
|
||||
|
||||
@@ -13,4 +13,5 @@ public class SemanticModel {
|
||||
private List<Metric> metrics = new ArrayList<>();
|
||||
private Map<String, DataSource> datasourceMap = new HashMap<>();
|
||||
private Map<String, List<Dimension>> dimensionMap = new HashMap<>();
|
||||
private List<Materialization> materializationList = new ArrayList<>();
|
||||
}
|
||||
|
||||
@@ -0,0 +1,11 @@
|
||||
package com.tencent.supersonic.semantic.query.parser.calcite.dsl;
|
||||
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
public class TimeRange {
|
||||
private String start;
|
||||
private String end;
|
||||
}
|
||||
@@ -14,6 +14,7 @@ import com.tencent.supersonic.semantic.query.parser.calcite.sql.node.SemanticNod
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.sql.render.FilterRender;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.sql.render.OutputRender;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.sql.render.SourceRender;
|
||||
import com.tencent.supersonic.semantic.query.persistence.pojo.QueryStatement;
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
@@ -84,15 +85,17 @@ public class AggPlanner implements Planner {
|
||||
// default by dataSource time aggregation
|
||||
if (Objects.nonNull(dataSource.getAggTime()) && !dataSource.getAggTime().equalsIgnoreCase(
|
||||
Constants.DIMENSION_TYPE_TIME_GRANULARITY_NONE)) {
|
||||
return true;
|
||||
if (!metricCommand.isNativeQuery()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return isAgg;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void explain(MetricReq metricCommand, AggOption aggOption) throws Exception {
|
||||
this.metricCommand = metricCommand;
|
||||
public void explain(QueryStatement queryStatement, AggOption aggOption) throws Exception {
|
||||
this.metricCommand = queryStatement.getMetricReq();
|
||||
if (metricCommand.getMetrics() == null) {
|
||||
metricCommand.setMetrics(new ArrayList<>());
|
||||
}
|
||||
@@ -117,4 +120,9 @@ public class AggPlanner implements Planner {
|
||||
public String getSourceId() {
|
||||
return sourceId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SemanticSchema findBest() {
|
||||
return schema;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,340 @@
|
||||
package com.tencent.supersonic.semantic.query.parser.calcite.planner;
|
||||
|
||||
import com.tencent.supersonic.common.util.calcite.SqlParseUtils;
|
||||
import com.tencent.supersonic.semantic.api.query.enums.AggOption;
|
||||
import com.tencent.supersonic.semantic.api.query.request.MetricReq;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.Configuration;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.dsl.Constants;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.dsl.DataSource;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.dsl.Dimension;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.dsl.Identify;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.dsl.Materialization;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.dsl.Materialization.TimePartType;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.dsl.MaterializationElement;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.dsl.Measure;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.dsl.TimeRange;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.schema.SchemaBuilder;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.schema.SemanticSchema;
|
||||
import com.tencent.supersonic.semantic.query.persistence.pojo.QueryStatement;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.calcite.adapter.enumerable.EnumerableRules;
|
||||
import org.apache.calcite.config.CalciteConnectionConfigImpl;
|
||||
import org.apache.calcite.jdbc.CalciteSchema;
|
||||
import org.apache.calcite.plan.ConventionTraitDef;
|
||||
import org.apache.calcite.plan.RelOptCluster;
|
||||
import org.apache.calcite.plan.RelOptMaterialization;
|
||||
import org.apache.calcite.plan.RelOptPlanner;
|
||||
import org.apache.calcite.plan.RelOptTable;
|
||||
import org.apache.calcite.plan.hep.HepPlanner;
|
||||
import org.apache.calcite.plan.hep.HepProgramBuilder;
|
||||
import org.apache.calcite.prepare.CalciteCatalogReader;
|
||||
import org.apache.calcite.rel.RelDistributionTraitDef;
|
||||
import org.apache.calcite.rel.RelHomogeneousShuttle;
|
||||
import org.apache.calcite.rel.RelNode;
|
||||
import org.apache.calcite.rel.RelShuttle;
|
||||
import org.apache.calcite.rel.core.RelFactories;
|
||||
import org.apache.calcite.rel.core.TableScan;
|
||||
import org.apache.calcite.rel.rules.materialize.MaterializedViewRules;
|
||||
import org.apache.calcite.rex.RexBuilder;
|
||||
import org.apache.calcite.rex.RexNode;
|
||||
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
|
||||
import org.apache.calcite.sql.parser.SqlParseException;
|
||||
import org.apache.calcite.tools.RelBuilder;
|
||||
import org.apache.commons.lang3.tuple.ImmutablePair;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
@Slf4j
|
||||
public class MaterializationPlanner implements Planner {
|
||||
|
||||
protected SemanticSchema schema;
|
||||
protected CalciteSchema viewSchema;
|
||||
protected HepProgramBuilder hepProgramBuilder;
|
||||
protected RelOptPlanner relOptPlanner;
|
||||
protected RelBuilder relBuilder;
|
||||
protected CalciteCatalogReader calciteCatalogReader;
|
||||
|
||||
protected Comparator materializationSort = new Comparator<Entry<Long, Set<String>>>() {
|
||||
@Override
|
||||
public int compare(Entry<Long, Set<String>> o1, Entry<Long, Set<String>> o2) {
|
||||
if (o1.getValue().size() == o2.getValue().size()) {
|
||||
Optional<Materialization> o1Lever = schema.getMaterializationList().stream()
|
||||
.filter(m -> m.getMaterializationId().equals(o1.getKey())).findFirst();
|
||||
Optional<Materialization> o2Lever = schema.getMaterializationList().stream()
|
||||
.filter(m -> m.getMaterializationId().equals(o2.getKey())).findFirst();
|
||||
if (o1Lever.isPresent() && o2Lever.isPresent()) {
|
||||
return o2Lever.get().getLevel() - o1Lever.get().getLevel();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
return o2.getValue().size() - o1.getValue().size();
|
||||
}
|
||||
};
|
||||
|
||||
public MaterializationPlanner(SemanticSchema schema) {
|
||||
this.schema = schema;
|
||||
init();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void explain(QueryStatement queryStatement, AggOption isAgg) throws Exception {
|
||||
// findMatchMaterialization
|
||||
// checkValid field + time
|
||||
if (CollectionUtils.isEmpty(queryStatement.getTimeRanges())) {
|
||||
//has no matchMaterialization time info
|
||||
return;
|
||||
}
|
||||
Set<String> fields = new HashSet<>();
|
||||
MetricReq metricCommand = queryStatement.getMetricReq();
|
||||
if (!Objects.isNull(metricCommand.getWhere()) && !metricCommand.getWhere().isEmpty()) {
|
||||
fields.addAll(SqlParseUtils.getFilterField(metricCommand.getWhere()));
|
||||
}
|
||||
if (!CollectionUtils.isEmpty(metricCommand.getMetrics())) {
|
||||
fields.addAll(metricCommand.getMetrics());
|
||||
}
|
||||
if (!CollectionUtils.isEmpty(metricCommand.getDimensions())) {
|
||||
fields.addAll(metricCommand.getDimensions());
|
||||
}
|
||||
Map<Long, Set<String>> matchMaterialization = new HashMap<>();
|
||||
Map<Long, Long> materializationDataBase = schema.getMaterializationList().stream()
|
||||
.collect(Collectors.toMap(Materialization::getMaterializationId, Materialization::getDataBase));
|
||||
for (String elem : fields) {
|
||||
boolean checkOk = false;
|
||||
for (Materialization materialization : schema.getMaterializationList()) {
|
||||
if (check(metricCommand, materialization, elem, queryStatement.getTimeRanges())) {
|
||||
if (!matchMaterialization.containsKey(materialization.getMaterializationId())) {
|
||||
matchMaterialization.put(materialization.getMaterializationId(), new HashSet<>());
|
||||
}
|
||||
matchMaterialization.get(materialization.getMaterializationId()).add(elem);
|
||||
checkOk = true;
|
||||
}
|
||||
}
|
||||
if (!checkOk) {
|
||||
log.info("check fail [{}]", elem);
|
||||
}
|
||||
}
|
||||
if (!CollectionUtils.isEmpty(matchMaterialization)) {
|
||||
List<Entry<Long, Set<String>>> sortedMaterialization = new ArrayList<>(matchMaterialization.entrySet());
|
||||
sortedMaterialization.stream().collect(Collectors.toList()).sort(materializationSort);
|
||||
for (Entry<Long, Set<String>> m : sortedMaterialization) {
|
||||
Optional<Materialization> materialization = schema.getMaterializationList().stream()
|
||||
.filter(mz -> mz.getMaterializationId().equals(m.getKey())).findFirst();
|
||||
if (!materialization.isPresent()) {
|
||||
continue;
|
||||
}
|
||||
Set<String> viewField = new HashSet<>(m.getValue());
|
||||
viewField.add(materialization.get().getEntities());
|
||||
viewField.add(materialization.get().getDateInfo());
|
||||
if (materialization.get().getTimePartType().equals(TimePartType.ZIPPER)) {
|
||||
viewField.add(Constants.MATERIALIZATION_ZIPPER_START + materialization.get().getDateInfo());
|
||||
viewField.add(Constants.MATERIALIZATION_ZIPPER_END + materialization.get().getDateInfo());
|
||||
}
|
||||
if (viewField.containsAll(fields)) {
|
||||
addDataSource(materialization.get());
|
||||
break;
|
||||
}
|
||||
List<Entry<Long, Set<String>>> linkMaterialization = new ArrayList<>();
|
||||
for (Entry<Long, Set<String>> mm : sortedMaterialization) {
|
||||
if (mm.getKey().equals(m.getKey())) {
|
||||
continue;
|
||||
}
|
||||
if (materializationDataBase.get(mm.getKey()).equals(materializationDataBase.get(m.getKey()))) {
|
||||
linkMaterialization.add(mm);
|
||||
}
|
||||
}
|
||||
if (!CollectionUtils.isEmpty(linkMaterialization)) {
|
||||
linkMaterialization.sort(materializationSort);
|
||||
for (Entry<Long, Set<String>> mm : linkMaterialization) {
|
||||
Set<String> linkField = new HashSet<>(mm.getValue());
|
||||
linkField.addAll(viewField);
|
||||
if (linkField.containsAll(fields)) {
|
||||
Optional<Materialization> linkMaterial = schema.getMaterializationList().stream()
|
||||
.filter(mz -> mz.getMaterializationId().equals(mm.getKey())).findFirst();
|
||||
if (linkMaterial.isPresent()) {
|
||||
addDataSource(materialization.get());
|
||||
addDataSource(linkMaterial.get());
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void addDataSource(Materialization materialization) {
|
||||
Identify identify = new Identify();
|
||||
identify.setName(materialization.getEntities());
|
||||
List<Measure> metrics = materialization.getMetrics().stream()
|
||||
.map(m -> Measure.builder().name(m.getName()).expr(m.getName()).build()).collect(
|
||||
Collectors.toList());
|
||||
List<Dimension> dimensions = materialization.getDimensions().stream()
|
||||
.map(d -> Dimension.builder().name(d.getName()).expr(d.getName()).build()).collect(
|
||||
Collectors.toList());
|
||||
if (materialization.getTimePartType().equals(TimePartType.ZIPPER)) {
|
||||
dimensions.add(
|
||||
Dimension.builder().name(Constants.MATERIALIZATION_ZIPPER_START + materialization.getDateInfo())
|
||||
.type(Constants.DIMENSION_TYPE_TIME)
|
||||
.expr(Constants.MATERIALIZATION_ZIPPER_START + materialization.getDateInfo()).build());
|
||||
dimensions.add(
|
||||
Dimension.builder().name(Constants.MATERIALIZATION_ZIPPER_END + materialization.getDateInfo())
|
||||
.type(Constants.DIMENSION_TYPE_TIME)
|
||||
.expr(Constants.MATERIALIZATION_ZIPPER_END + materialization.getDateInfo()).build());
|
||||
} else {
|
||||
dimensions.add(Dimension.builder().name(materialization.getDateInfo()).expr(materialization.getDateInfo())
|
||||
.type(Constants.DIMENSION_TYPE_TIME)
|
||||
.build());
|
||||
}
|
||||
|
||||
DataSource dataSource = DataSource.builder().sourceId(materialization.getDataBase())
|
||||
.tableQuery(materialization.getDestinationTable())
|
||||
.timePartType(materialization.getTimePartType())
|
||||
.name("v_" + String.valueOf(materialization.getMaterializationId()))
|
||||
.identifiers(Arrays.asList(identify))
|
||||
.measures(metrics)
|
||||
.dimensions(dimensions)
|
||||
.build();
|
||||
schema.getDatasource().put(dataSource.getName(), dataSource);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getSql() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getSourceId() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SemanticSchema findBest() {
|
||||
return schema;
|
||||
}
|
||||
|
||||
|
||||
private void init() {
|
||||
viewSchema = SchemaBuilder.getMaterializationSchema();
|
||||
hepProgramBuilder = new HepProgramBuilder();
|
||||
hepProgramBuilder.addRuleInstance(MaterializedViewRules.PROJECT_FILTER);
|
||||
relOptPlanner = new HepPlanner(hepProgramBuilder.build());
|
||||
calciteCatalogReader = new CalciteCatalogReader(
|
||||
CalciteSchema.from(viewSchema.plus()),
|
||||
CalciteSchema.from(viewSchema.plus()).path(null),
|
||||
Configuration.typeFactory,
|
||||
new CalciteConnectionConfigImpl(new Properties()));
|
||||
|
||||
relOptPlanner.addRelTraitDef(ConventionTraitDef.INSTANCE);
|
||||
relOptPlanner.addRelTraitDef(RelDistributionTraitDef.INSTANCE);
|
||||
EnumerableRules.rules().forEach(relOptPlanner::addRule);
|
||||
|
||||
RexBuilder rexBuilder = new RexBuilder(Configuration.typeFactory);
|
||||
RelOptCluster relOptCluster = RelOptCluster.create(relOptPlanner, rexBuilder);
|
||||
relBuilder = RelFactories.LOGICAL_BUILDER.create(relOptCluster, calciteCatalogReader);
|
||||
}
|
||||
|
||||
private RexNode getRexNode(List<ImmutablePair<String, String>> timeRanges, String viewField) {
|
||||
RexNode rexNode = null;
|
||||
for (ImmutablePair<String, String> timeRange : timeRanges) {
|
||||
if (rexNode == null) {
|
||||
rexNode = getRexNodeByTimeRange(TimeRange.builder().start(timeRange.left).end(timeRange.right).build(),
|
||||
viewField);
|
||||
continue;
|
||||
}
|
||||
rexNode = relBuilder.call(SqlStdOperatorTable.OR, rexNode,
|
||||
getRexNodeByTimeRange(TimeRange.builder().start(timeRange.left).end(timeRange.right).build(),
|
||||
viewField));
|
||||
}
|
||||
return rexNode;
|
||||
}
|
||||
|
||||
private RexNode getRexNode(Materialization materialization, String elem, String viewField) {
|
||||
Optional<MaterializationElement> dim = materialization.getDimensions()
|
||||
.stream().filter(d -> d.getName().equalsIgnoreCase(elem)).findFirst();
|
||||
if (!dim.isPresent()) {
|
||||
dim = materialization.getMetrics().stream().filter(m -> m.getName().equalsIgnoreCase(elem)).findFirst();
|
||||
}
|
||||
RexNode rexNode = null;
|
||||
if (dim.isPresent()) {
|
||||
for (TimeRange timeRange : dim.get().getTimeRangeList()) {
|
||||
if (rexNode == null) {
|
||||
rexNode = getRexNodeByTimeRange(timeRange, viewField);
|
||||
continue;
|
||||
}
|
||||
rexNode = relBuilder.call(SqlStdOperatorTable.OR, rexNode, getRexNodeByTimeRange(timeRange, viewField));
|
||||
}
|
||||
}
|
||||
return rexNode;
|
||||
}
|
||||
|
||||
private RexNode getRexNodeByTimeRange(TimeRange timeRange, String field) {
|
||||
return relBuilder.call(SqlStdOperatorTable.AND,
|
||||
relBuilder.call(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, relBuilder.field(field),
|
||||
relBuilder.literal(timeRange.getStart())),
|
||||
relBuilder.call(SqlStdOperatorTable.LESS_THAN_OR_EQUAL, relBuilder.field(field),
|
||||
relBuilder.literal(timeRange.getEnd())));
|
||||
}
|
||||
|
||||
public boolean check(MetricReq metricCommand, Materialization materialization, String elem,
|
||||
List<ImmutablePair<String, String>> timeRanges)
|
||||
throws SqlParseException {
|
||||
boolean isMatch = false;
|
||||
try {
|
||||
relBuilder.clear();
|
||||
if (!CollectionUtils.isEmpty(relOptPlanner.getMaterializations())) {
|
||||
relOptPlanner.clear();
|
||||
}
|
||||
String db = SchemaBuilder.MATERIALIZATION_SYS_DB;
|
||||
RelBuilder viewBuilder = relBuilder.scan(Arrays.asList(db, SchemaBuilder.MATERIALIZATION_SYS_SOURCE));
|
||||
RexNode viewFilter = getRexNode(materialization, elem, SchemaBuilder.MATERIALIZATION_SYS_FIELD_DATE);
|
||||
if (viewFilter == null) {
|
||||
return false;
|
||||
}
|
||||
RelNode viewRel = viewBuilder.filter(viewFilter).project(relBuilder.fields()).build();
|
||||
log.debug("view {}", viewRel.explain());
|
||||
List<String> view = Arrays.asList(db, SchemaBuilder.MATERIALIZATION_SYS_VIEW);
|
||||
RelNode replacement = relBuilder.scan(view).build();
|
||||
RelOptMaterialization relOptMaterialization = new RelOptMaterialization(replacement, viewRel, null, view);
|
||||
relOptPlanner.addMaterialization(relOptMaterialization);
|
||||
|
||||
RelNode checkRel = relBuilder.scan(Arrays.asList(db, SchemaBuilder.MATERIALIZATION_SYS_SOURCE))
|
||||
.filter(getRexNode(timeRanges, SchemaBuilder.MATERIALIZATION_SYS_FIELD_DATE))
|
||||
.project(relBuilder.field(SchemaBuilder.MATERIALIZATION_SYS_FIELD_DATE)).build();
|
||||
|
||||
relOptPlanner.setRoot(checkRel);
|
||||
RelNode optRel = relOptPlanner.findBestExp();
|
||||
log.debug("findBestExp {}", optRel.explain());
|
||||
isMatch = !extractTableNames(optRel).contains(SchemaBuilder.MATERIALIZATION_SYS_SOURCE);
|
||||
} catch (Exception e) {
|
||||
log.error("check error {}", e);
|
||||
}
|
||||
return isMatch;
|
||||
}
|
||||
|
||||
public static Set<String> extractTableNames(RelNode relNode) {
|
||||
Set<String> tableNames = new HashSet<>();
|
||||
RelShuttle shuttle = new RelHomogeneousShuttle() {
|
||||
public RelNode visit(TableScan scan) {
|
||||
RelOptTable table = scan.getTable();
|
||||
tableNames.addAll(table.getQualifiedName());
|
||||
return scan;
|
||||
}
|
||||
};
|
||||
relNode.accept(shuttle);
|
||||
return tableNames;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -2,13 +2,16 @@ package com.tencent.supersonic.semantic.query.parser.calcite.planner;
|
||||
|
||||
|
||||
import com.tencent.supersonic.semantic.api.query.enums.AggOption;
|
||||
import com.tencent.supersonic.semantic.api.query.request.MetricReq;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.schema.SemanticSchema;
|
||||
import com.tencent.supersonic.semantic.query.persistence.pojo.QueryStatement;
|
||||
|
||||
public interface Planner {
|
||||
|
||||
public void explain(MetricReq metricCommand, AggOption aggOption) throws Exception;
|
||||
public void explain(QueryStatement queryStatement, AggOption aggOption) throws Exception;
|
||||
|
||||
public String getSql();
|
||||
|
||||
public String getSourceId();
|
||||
|
||||
public SemanticSchema findBest();
|
||||
}
|
||||
|
||||
@@ -10,11 +10,21 @@ import org.apache.calcite.jdbc.CalciteSchema;
|
||||
import org.apache.calcite.prepare.CalciteCatalogReader;
|
||||
import org.apache.calcite.prepare.Prepare;
|
||||
import org.apache.calcite.rel.type.RelDataType;
|
||||
import org.apache.calcite.schema.SchemaPlus;
|
||||
import org.apache.calcite.schema.impl.AbstractSchema;
|
||||
import org.apache.calcite.sql.type.SqlTypeName;
|
||||
import org.apache.calcite.sql.validate.ParameterScope;
|
||||
import org.apache.calcite.sql.validate.SqlValidatorScope;
|
||||
|
||||
public class SchemaBuilder {
|
||||
|
||||
public static final String MATERIALIZATION_SYS_DB = "SYS";
|
||||
public static final String MATERIALIZATION_SYS_SOURCE = "SYS_SOURCE";
|
||||
public static final String MATERIALIZATION_SYS_VIEW = "SYS_VIEW";
|
||||
public static final String MATERIALIZATION_SYS_FIELD_DATE = "C1";
|
||||
public static final String MATERIALIZATION_SYS_FIELD_DATA = "C2";
|
||||
|
||||
|
||||
public static SqlValidatorScope getScope(SemanticSchema schema) throws Exception {
|
||||
Map<String, RelDataType> nameToTypeMap = new HashMap<>();
|
||||
CalciteSchema rootSchema = CalciteSchema.createRootSchema(true, false);
|
||||
@@ -29,4 +39,22 @@ public class SchemaBuilder {
|
||||
Configuration.typeFactory, Configuration.validatorConfig);
|
||||
return new ParameterScope(dslSqlValidator, nameToTypeMap);
|
||||
}
|
||||
|
||||
public static CalciteSchema getMaterializationSchema() {
|
||||
CalciteSchema rootSchema = CalciteSchema.createRootSchema(true, false);
|
||||
SchemaPlus schema = rootSchema.plus().add(MATERIALIZATION_SYS_DB, new AbstractSchema());
|
||||
DataSourceTable srcTable = DataSourceTable.newBuilder(MATERIALIZATION_SYS_SOURCE)
|
||||
.addField(MATERIALIZATION_SYS_FIELD_DATE, SqlTypeName.DATE)
|
||||
.addField(MATERIALIZATION_SYS_FIELD_DATA, SqlTypeName.BIGINT)
|
||||
.withRowCount(1)
|
||||
.build();
|
||||
schema.add(MATERIALIZATION_SYS_SOURCE, srcTable);
|
||||
DataSourceTable viewTable = DataSourceTable.newBuilder(MATERIALIZATION_SYS_VIEW)
|
||||
.addField(MATERIALIZATION_SYS_FIELD_DATE, SqlTypeName.DATE)
|
||||
.addField(MATERIALIZATION_SYS_FIELD_DATA, SqlTypeName.BIGINT)
|
||||
.withRowCount(1)
|
||||
.build();
|
||||
schema.add(MATERIALIZATION_SYS_VIEW, viewTable);
|
||||
return rootSchema;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package com.tencent.supersonic.semantic.query.parser.calcite.schema;
|
||||
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.dsl.DataSource;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.dsl.Dimension;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.dsl.Materialization;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.dsl.Metric;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.dsl.SemanticModel;
|
||||
import java.util.HashMap;
|
||||
@@ -34,6 +35,14 @@ public class SemanticSchema extends AbstractSchema {
|
||||
return rootPath;
|
||||
}
|
||||
|
||||
public void setSemanticModel(SemanticModel semanticModel) {
|
||||
this.semanticModel = semanticModel;
|
||||
}
|
||||
|
||||
public SemanticModel getSemanticModel() {
|
||||
return semanticModel;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Table> getTableMap() {
|
||||
return tableMap;
|
||||
@@ -68,6 +77,13 @@ public class SemanticSchema extends AbstractSchema {
|
||||
semanticModel.setMetrics(metric);
|
||||
}
|
||||
|
||||
public void setMaterializationList(List<Materialization> materializationList) {
|
||||
semanticModel.setMaterializationList(materializationList);
|
||||
}
|
||||
public List<Materialization> getMaterializationList() {
|
||||
return semanticModel.getMaterializationList();
|
||||
}
|
||||
|
||||
|
||||
public static final class Builder {
|
||||
|
||||
|
||||
@@ -25,7 +25,6 @@ public class TableView {
|
||||
|
||||
private String alias;
|
||||
private List<String> primary;
|
||||
|
||||
private DataSource dataSource;
|
||||
|
||||
public SqlNode build() {
|
||||
|
||||
@@ -142,7 +142,7 @@ public class DataSourceNode extends SemanticNode {
|
||||
String.format("not find the match datasource : dimension[%s],measure[%s]", queryDimension,
|
||||
measures));
|
||||
}
|
||||
log.info("linkDataSources {}", linkDataSources);
|
||||
log.debug("linkDataSources {}", linkDataSources);
|
||||
|
||||
dataSources.addAll(linkDataSources);
|
||||
}
|
||||
|
||||
@@ -2,8 +2,8 @@ package com.tencent.supersonic.semantic.query.parser.calcite.sql.node;
|
||||
|
||||
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.Configuration;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.schema.SemanticSqlDialect;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.sql.Optimization;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.schema.SemanticSqlDialect;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
@@ -11,6 +11,8 @@ import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.function.UnaryOperator;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.calcite.jdbc.CalciteSchema;
|
||||
import org.apache.calcite.rel.RelNode;
|
||||
import org.apache.calcite.sql.SqlAsOperator;
|
||||
import org.apache.calcite.sql.SqlBasicCall;
|
||||
import org.apache.calcite.sql.SqlIdentifier;
|
||||
@@ -18,10 +20,13 @@ import org.apache.calcite.sql.SqlKind;
|
||||
import org.apache.calcite.sql.SqlNode;
|
||||
import org.apache.calcite.sql.SqlSelect;
|
||||
import org.apache.calcite.sql.SqlWriterConfig;
|
||||
import org.apache.calcite.sql.parser.SqlParseException;
|
||||
import org.apache.calcite.sql.parser.SqlParser;
|
||||
import org.apache.calcite.sql.parser.SqlParserPos;
|
||||
import org.apache.calcite.sql.pretty.SqlPrettyWriter;
|
||||
import org.apache.calcite.sql.validate.SqlValidator;
|
||||
import org.apache.calcite.sql.validate.SqlValidatorScope;
|
||||
import org.apache.calcite.sql2rel.SqlToRelConverter;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
public abstract class SemanticNode {
|
||||
@@ -118,6 +123,13 @@ public abstract class SemanticNode {
|
||||
return sqlNode;
|
||||
}
|
||||
|
||||
public static RelNode getRelNode(CalciteSchema rootSchema, SqlToRelConverter sqlToRelConverter, String sql)
|
||||
throws SqlParseException {
|
||||
SqlValidator sqlValidator = Configuration.getSqlValidator(rootSchema);
|
||||
return sqlToRelConverter.convertQuery(
|
||||
sqlValidator.validate(SqlParser.create(sql, SqlParser.Config.DEFAULT).parseStmt()), false, true).rel;
|
||||
}
|
||||
|
||||
public void accept(Optimization optimization) {
|
||||
optimization.visit(this);
|
||||
}
|
||||
|
||||
@@ -5,7 +5,10 @@ import com.tencent.supersonic.semantic.query.parser.calcite.dsl.Constants;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.dsl.DataSource;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.dsl.Dimension;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.dsl.Identify;
|
||||
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.dsl.Identify.Type;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.dsl.Materialization.TimePartType;
|
||||
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.dsl.Metric;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.schema.SemanticSchema;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.sql.Renderer;
|
||||
@@ -97,7 +100,9 @@ public class JoinRender extends Renderer {
|
||||
fieldWhere.add(identify.getName());
|
||||
}
|
||||
}
|
||||
TableView tableView = SourceRender.renderOne("", fieldWhere, queryMetrics, queryDimension,
|
||||
List<String> dataSourceWhere = new ArrayList<>(fieldWhere);
|
||||
addZipperField(dataSource, dataSourceWhere);
|
||||
TableView tableView = SourceRender.renderOne("", dataSourceWhere, queryMetrics, queryDimension,
|
||||
metricCommand.getWhere(), dataSources.get(i), scope, schema, true);
|
||||
log.info("tableView {}", tableView.getTable().toString());
|
||||
String alias = Constants.JOIN_TABLE_PREFIX + dataSource.getName();
|
||||
@@ -255,7 +260,10 @@ public class JoinRender extends Renderer {
|
||||
|
||||
private SqlNode getCondition(TableView left, TableView right, DataSource dataSource, SemanticSchema schema,
|
||||
SqlValidatorScope scope) throws Exception {
|
||||
|
||||
if (TimePartType.ZIPPER.equals(left.getDataSource().getTimePartType()) || TimePartType.ZIPPER.equals(
|
||||
right.getDataSource().getTimePartType())) {
|
||||
return getZipperCondition(left, right, dataSource, schema, scope);
|
||||
}
|
||||
Set<String> selectLeft = SemanticNode.getSelect(left.getTable());
|
||||
Set<String> selectRight = SemanticNode.getSelect(right.getTable());
|
||||
selectLeft.retainAll(selectRight);
|
||||
@@ -376,4 +384,95 @@ public class JoinRender extends Renderer {
|
||||
orders.poll();
|
||||
visited.put(id, false);
|
||||
}
|
||||
private void addZipperField(DataSource dataSource, List<String> fields) {
|
||||
if (TimePartType.ZIPPER.equals(dataSource.getTimePartType())) {
|
||||
dataSource.getDimensions().stream()
|
||||
.filter(d -> Constants.DIMENSION_TYPE_TIME.equalsIgnoreCase(d.getType())).forEach(t -> {
|
||||
if (t.getName().startsWith(Constants.MATERIALIZATION_ZIPPER_END)
|
||||
&& !fields.contains(t.getName())
|
||||
) {
|
||||
fields.add(t.getName());
|
||||
}
|
||||
if (t.getName().startsWith(Constants.MATERIALIZATION_ZIPPER_START)
|
||||
&& !fields.contains(t.getName())
|
||||
) {
|
||||
fields.add(t.getName());
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private SqlNode getZipperCondition(TableView left, TableView right, DataSource dataSource, SemanticSchema schema,
|
||||
SqlValidatorScope scope) throws Exception {
|
||||
if (TimePartType.ZIPPER.equals(left.getDataSource().getTimePartType()) && TimePartType.ZIPPER.equals(
|
||||
right.getDataSource().getTimePartType())) {
|
||||
throw new Exception("not support two zipper table");
|
||||
}
|
||||
SqlNode condition = null;
|
||||
Optional<Dimension> leftTime = left.getDataSource().getDimensions().stream()
|
||||
.filter(d -> Constants.DIMENSION_TYPE_TIME.equalsIgnoreCase(d.getType())).findFirst();
|
||||
Optional<Dimension> rightTime = right.getDataSource().getDimensions().stream()
|
||||
.filter(d -> Constants.DIMENSION_TYPE_TIME.equalsIgnoreCase(d.getType())).findFirst();
|
||||
if (leftTime.isPresent() && rightTime.isPresent()) {
|
||||
|
||||
String startTime = "";
|
||||
String endTime = "";
|
||||
String dateTime = "";
|
||||
List<String> primaryZipper = new ArrayList<>();
|
||||
List<String> primaryPartition = new ArrayList<>();
|
||||
|
||||
Optional<Dimension> startTimeOp = (TimePartType.ZIPPER.equals(left.getDataSource().getTimePartType()) ? left
|
||||
: right).getDataSource().getDimensions().stream()
|
||||
.filter(d -> Constants.DIMENSION_TYPE_TIME.equalsIgnoreCase(d.getType()))
|
||||
.filter(d -> d.getName().startsWith(Constants.MATERIALIZATION_ZIPPER_START)).findFirst();
|
||||
Optional<Dimension> endTimeOp = (TimePartType.ZIPPER.equals(left.getDataSource().getTimePartType()) ? left
|
||||
: right).getDataSource().getDimensions().stream()
|
||||
.filter(d -> Constants.DIMENSION_TYPE_TIME.equalsIgnoreCase(d.getType()))
|
||||
.filter(d -> d.getName().startsWith(Constants.MATERIALIZATION_ZIPPER_END)).findFirst();
|
||||
if (startTimeOp.isPresent() && endTimeOp.isPresent()) {
|
||||
TableView zipper = TimePartType.ZIPPER.equals(left.getDataSource().getTimePartType()) ? left : right;
|
||||
TableView partMetric =
|
||||
TimePartType.ZIPPER.equals(left.getDataSource().getTimePartType()) ? right : left;
|
||||
Optional<Dimension> partTime =
|
||||
TimePartType.ZIPPER.equals(left.getDataSource().getTimePartType()) ? rightTime : leftTime;
|
||||
startTime = zipper.getAlias() + "." + startTimeOp.get().getName();
|
||||
endTime = zipper.getAlias() + "." + endTimeOp.get().getName();
|
||||
dateTime = partMetric.getAlias() + "." + partTime.get().getName();
|
||||
primaryZipper = zipper.getDataSource().getIdentifiers().stream().map(i -> i.getName()).collect(
|
||||
Collectors.toList());
|
||||
primaryPartition = partMetric.getDataSource().getIdentifiers().stream().map(i -> i.getName()).collect(
|
||||
Collectors.toList());
|
||||
}
|
||||
primaryZipper.retainAll(primaryPartition);
|
||||
condition =
|
||||
new SqlBasicCall(
|
||||
SqlStdOperatorTable.AND,
|
||||
new ArrayList<SqlNode>(Arrays.asList(new SqlBasicCall(
|
||||
SqlStdOperatorTable.LESS_THAN_OR_EQUAL,
|
||||
new ArrayList<SqlNode>(Arrays.asList(SemanticNode.parse(startTime, scope),
|
||||
SemanticNode.parse(dateTime, scope))),
|
||||
SqlParserPos.ZERO, null), new SqlBasicCall(
|
||||
SqlStdOperatorTable.GREATER_THAN,
|
||||
new ArrayList<SqlNode>(Arrays.asList(SemanticNode.parse(endTime, scope),
|
||||
SemanticNode.parse(dateTime, scope))),
|
||||
SqlParserPos.ZERO, null))),
|
||||
SqlParserPos.ZERO, null);
|
||||
|
||||
for (String p : primaryZipper) {
|
||||
List<SqlNode> ons = new ArrayList<>();
|
||||
ons.add(SemanticNode.parse(left.getAlias() + "." + p, scope));
|
||||
ons.add(SemanticNode.parse(right.getAlias() + "." + p, scope));
|
||||
SqlNode addCondition = new SqlBasicCall(
|
||||
SqlStdOperatorTable.EQUALS,
|
||||
ons,
|
||||
SqlParserPos.ZERO, null);
|
||||
condition = new SqlBasicCall(
|
||||
SqlStdOperatorTable.AND,
|
||||
new ArrayList<>(Arrays.asList(condition, addCondition)),
|
||||
SqlParserPos.ZERO, null);
|
||||
}
|
||||
|
||||
}
|
||||
return condition;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
package com.tencent.supersonic.semantic.query.parser.convert;
|
||||
|
||||
|
||||
import com.tencent.supersonic.common.pojo.Aggregator;
|
||||
import com.tencent.supersonic.common.pojo.enums.AggOperatorEnum;
|
||||
import com.tencent.supersonic.common.util.DateUtils;
|
||||
import com.tencent.supersonic.common.util.jsqlparser.SqlParserReplaceHelper;
|
||||
import com.tencent.supersonic.common.util.jsqlparser.SqlParserSelectFunctionHelper;
|
||||
@@ -13,6 +16,7 @@ import com.tencent.supersonic.semantic.api.query.enums.AggOption;
|
||||
import com.tencent.supersonic.semantic.api.query.pojo.MetricTable;
|
||||
import com.tencent.supersonic.semantic.api.query.request.ParseSqlReq;
|
||||
import com.tencent.supersonic.semantic.api.query.request.QueryDslReq;
|
||||
import com.tencent.supersonic.semantic.api.query.request.QueryStructReq;
|
||||
import com.tencent.supersonic.semantic.model.domain.Catalog;
|
||||
import com.tencent.supersonic.semantic.model.domain.ModelService;
|
||||
import com.tencent.supersonic.semantic.model.domain.adaptor.engineadapter.EngineAdaptor;
|
||||
@@ -51,6 +55,7 @@ public class QueryReqConverter {
|
||||
private Catalog catalog;
|
||||
|
||||
public QueryStatement convert(QueryDslReq databaseReq, ModelSchemaResp modelSchemaResp) throws Exception {
|
||||
|
||||
if (Objects.isNull(modelSchemaResp)) {
|
||||
return new QueryStatement();
|
||||
}
|
||||
@@ -68,6 +73,7 @@ public class QueryReqConverter {
|
||||
//4.build MetricTables
|
||||
List<String> allFields = SqlParserSelectHelper.getAllFields(databaseReq.getSql());
|
||||
List<String> metrics = getMetrics(modelSchemaResp, allFields);
|
||||
QueryStructReq queryStructCmd = new QueryStructReq();
|
||||
MetricTable metricTable = new MetricTable();
|
||||
metricTable.setMetrics(metrics);
|
||||
|
||||
@@ -81,8 +87,13 @@ public class QueryReqConverter {
|
||||
metricTable.setMetrics(new ArrayList<>(Arrays.asList(
|
||||
queryStructUtils.generateInternalMetricName(databaseReq.getModelId(),
|
||||
metricTable.getDimensions()))));
|
||||
} else {
|
||||
queryStructCmd.setAggregators(
|
||||
metricTable.getMetrics().stream().map(m -> new Aggregator(m, AggOperatorEnum.UNKNOWN)).collect(
|
||||
Collectors.toList()));
|
||||
}
|
||||
metricTable.setAggOption(getAggOption(databaseReq));
|
||||
AggOption aggOption = getAggOption(databaseReq);
|
||||
metricTable.setAggOption(aggOption);
|
||||
List<MetricTable> tables = new ArrayList<>();
|
||||
tables.add(metricTable);
|
||||
//4.build ParseSqlReq
|
||||
@@ -97,7 +108,11 @@ public class QueryReqConverter {
|
||||
result.setWithAlias(false);
|
||||
}
|
||||
//5.physicalSql by ParseSqlReq
|
||||
QueryStatement queryStatement = parserService.physicalSql(result);
|
||||
queryStructCmd.setDateInfo(queryStructUtils.getDateConfBySql(databaseReq.getSql()));
|
||||
queryStructCmd.setModelId(databaseReq.getModelId());
|
||||
queryStructCmd.setNativeQuery(!AggOption.isAgg(aggOption));
|
||||
log.info("QueryReqConverter queryStructCmd[{}]", queryStructCmd);
|
||||
QueryStatement queryStatement = parserService.physicalSql(queryStructCmd, result);
|
||||
queryStatement.setSql(String.format(SqlExecuteReq.LIMIT_WRAPPER, queryStatement.getSql()));
|
||||
return queryStatement;
|
||||
}
|
||||
|
||||
@@ -1,14 +1,24 @@
|
||||
package com.tencent.supersonic.semantic.query.persistence.pojo;
|
||||
|
||||
import com.tencent.supersonic.semantic.api.query.request.MetricReq;
|
||||
import com.tencent.supersonic.semantic.api.query.request.ParseSqlReq;
|
||||
import java.util.List;
|
||||
import lombok.Data;
|
||||
import org.apache.commons.lang3.tuple.ImmutablePair;
|
||||
|
||||
@Data
|
||||
public class QueryStatement {
|
||||
|
||||
private Long modelId = 0L;
|
||||
private String sql = "";
|
||||
private String sourceId = "";
|
||||
private String errMsg = "";
|
||||
private Boolean ok;
|
||||
private MetricReq metricReq;
|
||||
private ParseSqlReq parseSqlReq;
|
||||
private Integer status = 0;
|
||||
private List<ImmutablePair<String, String>> timeRanges;
|
||||
|
||||
|
||||
public boolean isOk() {
|
||||
this.ok = "".equals(errMsg) && !"".equals(sql);
|
||||
|
||||
@@ -68,7 +68,8 @@ public class QueryController {
|
||||
public SqlParserResp parseByStruct(@RequestBody ParseSqlReq parseSqlReq,
|
||||
HttpServletRequest request,
|
||||
HttpServletResponse response) throws Exception {
|
||||
QueryStatement queryStatement = semanticQueryEngine.physicalSql(parseSqlReq);
|
||||
QueryStructReq queryStructCmd = new QueryStructReq();
|
||||
QueryStatement queryStatement = semanticQueryEngine.physicalSql(queryStructCmd, parseSqlReq);
|
||||
SqlParserResp sqlParserResp = new SqlParserResp();
|
||||
BeanUtils.copyProperties(queryStatement, sqlParserResp);
|
||||
return sqlParserResp;
|
||||
|
||||
@@ -4,6 +4,8 @@ import com.github.pagehelper.PageInfo;
|
||||
import com.tencent.supersonic.auth.api.authentication.pojo.User;
|
||||
import com.tencent.supersonic.auth.api.authentication.utils.UserHolder;
|
||||
import com.tencent.supersonic.common.pojo.enums.AuthType;
|
||||
import com.tencent.supersonic.semantic.api.materialization.request.MaterializationSourceReq;
|
||||
import com.tencent.supersonic.semantic.api.materialization.response.MaterializationSourceResp;
|
||||
import com.tencent.supersonic.semantic.api.model.request.ModelSchemaFilterReq;
|
||||
import com.tencent.supersonic.semantic.api.model.request.PageDimensionReq;
|
||||
import com.tencent.supersonic.semantic.api.model.request.PageMetricReq;
|
||||
@@ -12,6 +14,7 @@ import com.tencent.supersonic.semantic.api.model.response.DomainResp;
|
||||
import com.tencent.supersonic.semantic.api.model.response.ModelResp;
|
||||
import com.tencent.supersonic.semantic.api.model.response.DimensionResp;
|
||||
import com.tencent.supersonic.semantic.api.model.response.MetricResp;
|
||||
import com.tencent.supersonic.semantic.query.service.MaterializationService;
|
||||
import com.tencent.supersonic.semantic.query.service.SchemaService;
|
||||
import java.util.List;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
@@ -30,6 +33,8 @@ public class SchemaController {
|
||||
|
||||
@Autowired
|
||||
private SchemaService schemaService;
|
||||
@Autowired
|
||||
private MaterializationService materializationService;
|
||||
|
||||
@PostMapping
|
||||
public List<ModelSchemaResp> fetchModelSchema(@RequestBody ModelSchemaFilterReq filter,
|
||||
@@ -71,4 +76,16 @@ public class SchemaController {
|
||||
return schemaService.queryMetric(pageMetricCmd, user);
|
||||
}
|
||||
|
||||
/**
|
||||
* task api
|
||||
*/
|
||||
|
||||
@PostMapping("/materialization/source")
|
||||
MaterializationSourceResp queryDataSource(@RequestBody MaterializationSourceReq materializationSourceReq,
|
||||
HttpServletRequest request,
|
||||
HttpServletResponse response) throws Exception {
|
||||
User user = UserHolder.findUser(request, response);
|
||||
return materializationService.getMaterializationDataSource(materializationSourceReq, user);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,11 @@
|
||||
package com.tencent.supersonic.semantic.query.service;
|
||||
|
||||
import com.tencent.supersonic.auth.api.authentication.pojo.User;
|
||||
import com.tencent.supersonic.semantic.api.materialization.request.MaterializationSourceReq;
|
||||
import com.tencent.supersonic.semantic.api.materialization.response.MaterializationSourceResp;
|
||||
|
||||
public interface MaterializationService {
|
||||
|
||||
MaterializationSourceResp getMaterializationDataSource(MaterializationSourceReq materializationSourceReq,
|
||||
User user) throws Exception;
|
||||
}
|
||||
@@ -0,0 +1,140 @@
|
||||
package com.tencent.supersonic.semantic.query.service;
|
||||
|
||||
import com.tencent.supersonic.auth.api.authentication.pojo.User;
|
||||
import com.tencent.supersonic.semantic.api.materialization.request.MaterializationSourceReq;
|
||||
import com.tencent.supersonic.semantic.api.materialization.response.MaterializationSourceResp;
|
||||
import com.tencent.supersonic.semantic.api.model.pojo.Measure;
|
||||
import com.tencent.supersonic.semantic.api.model.request.ModelSchemaFilterReq;
|
||||
import com.tencent.supersonic.semantic.api.model.response.MeasureResp;
|
||||
import com.tencent.supersonic.semantic.api.model.response.MetricSchemaResp;
|
||||
import com.tencent.supersonic.semantic.api.model.response.ModelSchemaResp;
|
||||
import com.tencent.supersonic.semantic.api.query.request.MetricReq;
|
||||
import com.tencent.supersonic.semantic.materialization.domain.MaterializationConfService;
|
||||
import com.tencent.supersonic.semantic.model.domain.Catalog;
|
||||
import com.tencent.supersonic.semantic.model.domain.DatasourceService;
|
||||
import com.tencent.supersonic.semantic.model.domain.ModelService;
|
||||
import com.tencent.supersonic.semantic.query.persistence.pojo.QueryStatement;
|
||||
import com.tencent.supersonic.semantic.query.utils.QueryStructUtils;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
@Service("MaterializationService")
|
||||
@Slf4j
|
||||
public class MaterializationServiceImpl implements MaterializationService {
|
||||
|
||||
protected final MaterializationConfService materializationConfService;
|
||||
protected final ModelService modelService;
|
||||
protected final DatasourceService datasourceService;
|
||||
protected final Catalog catalog;
|
||||
protected final QueryStructUtils queryStructUtils;
|
||||
protected final QueryService queryService;
|
||||
|
||||
public MaterializationServiceImpl(
|
||||
MaterializationConfService materializationConfService,
|
||||
ModelService modelService, DatasourceService datasourceService,
|
||||
Catalog catalog, QueryStructUtils queryStructUtils,
|
||||
QueryService queryService) {
|
||||
this.materializationConfService = materializationConfService;
|
||||
this.modelService = modelService;
|
||||
this.datasourceService = datasourceService;
|
||||
this.catalog = catalog;
|
||||
this.queryStructUtils = queryStructUtils;
|
||||
this.queryService = queryService;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MaterializationSourceResp getMaterializationDataSource(MaterializationSourceReq materializationSourceReq,
|
||||
User user) throws Exception {
|
||||
|
||||
if (materializationSourceReq.getMaterializationId() <= 0 || materializationSourceReq.getDataSourceId() <= 0) {
|
||||
throw new Exception("MaterializationId and DataSourceId are must");
|
||||
}
|
||||
Long materializationId = materializationSourceReq.getMaterializationId();
|
||||
List<MaterializationSourceResp> materializationList = materializationConfService.getMaterializationSourceResp(
|
||||
materializationId);
|
||||
if (!CollectionUtils.isEmpty(materializationList)) {
|
||||
Optional<MaterializationSourceResp> materializationSourceRespOpt = materializationList.stream()
|
||||
.filter(m -> m.getDataSourceId().equals(materializationSourceReq.getDataSourceId())).findFirst();
|
||||
if (materializationSourceRespOpt.isPresent()) {
|
||||
MaterializationSourceResp materializationSourceResp = materializationSourceRespOpt.get();
|
||||
Set<String> dimensionFields = new HashSet<>();
|
||||
Set<String> metricFields = new HashSet<>();
|
||||
ModelSchemaFilterReq modelFilter = new ModelSchemaFilterReq();
|
||||
modelFilter.setModelIds(Arrays.asList(materializationSourceResp.getModelId()));
|
||||
List<ModelSchemaResp> modelSchemaRespList = modelService.fetchModelSchema(modelFilter);
|
||||
List<MeasureResp> measureRespList = datasourceService.getMeasureListOfModel(
|
||||
materializationSourceResp.getModelId());
|
||||
modelSchemaRespList.stream().forEach(m -> {
|
||||
m.getDimensions().stream()
|
||||
.filter(mm -> mm.getDatasourceId().equals(materializationSourceReq.getDataSourceId())
|
||||
&& materializationSourceResp.getDimensions().keySet().contains(mm.getId())
|
||||
).forEach(mm -> {
|
||||
dimensionFields.add(mm.getBizName());
|
||||
});
|
||||
for (MetricSchemaResp metricSchemaResp : m.getMetrics()) {
|
||||
if (!materializationSourceResp.getMetrics().keySet().contains(metricSchemaResp.getId())) {
|
||||
continue;
|
||||
}
|
||||
Long dataSourceId = 0L;
|
||||
for (Measure measure : metricSchemaResp.getTypeParams().getMeasures()) {
|
||||
dataSourceId = materializationConfService.getSourceIdByMeasure(measureRespList,
|
||||
measure.getBizName());
|
||||
if (dataSourceId > 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (dataSourceId.equals(materializationSourceReq.getDataSourceId())) {
|
||||
metricFields.addAll(getMetric(metricSchemaResp));
|
||||
}
|
||||
}
|
||||
});
|
||||
if (!dimensionFields.isEmpty() || !metricFields.isEmpty()) {
|
||||
materializationSourceResp.setFields(new ArrayList<>(dimensionFields));
|
||||
materializationSourceResp.getFields().addAll(metricFields);
|
||||
|
||||
MetricReq metricReq = new MetricReq();
|
||||
metricReq.setRootPath(catalog.getModelFullPath(materializationSourceResp.getModelId()));
|
||||
metricReq.setMetrics(new ArrayList<>(metricFields));
|
||||
metricReq.setDimensions(new ArrayList<>(dimensionFields));
|
||||
metricReq.getDimensions().add(materializationSourceResp.getDateInfo());
|
||||
metricReq.getDimensions().add(materializationSourceResp.getEntities());
|
||||
metricReq.setNativeQuery(true);
|
||||
if (CollectionUtils.isEmpty(metricReq.getMetrics())) {
|
||||
String internalMetricName = queryStructUtils.generateInternalMetricName(
|
||||
materializationSourceResp.getModelId(), metricReq.getDimensions());
|
||||
metricReq.getMetrics().add(internalMetricName);
|
||||
}
|
||||
try {
|
||||
QueryStatement queryStatement = queryService.parseMetricReq(metricReq);
|
||||
if (queryStatement.isOk()) {
|
||||
materializationSourceResp.setSql(queryStatement.getSql());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("getMaterializationDataSource sql error [{}]", e);
|
||||
}
|
||||
}
|
||||
return materializationSourceResp;
|
||||
}
|
||||
}
|
||||
throw new Exception("cant find MaterializationSource");
|
||||
}
|
||||
|
||||
|
||||
protected List<String> getMetric(MetricSchemaResp metricSchemaResp) {
|
||||
if (Objects.nonNull(metricSchemaResp.getTypeParams()) && metricSchemaResp.getTypeParams().getExpr()
|
||||
.contains(queryStructUtils.getVariablePrefix())) {
|
||||
return metricSchemaResp.getTypeParams().getMeasures().stream().map(m -> m.getBizName())
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
return Arrays.asList(metricSchemaResp.getBizName());
|
||||
}
|
||||
}
|
||||
@@ -5,6 +5,7 @@ import com.tencent.supersonic.semantic.api.model.response.ExplainResp;
|
||||
import com.tencent.supersonic.semantic.api.model.response.QueryResultWithSchemaResp;
|
||||
import com.tencent.supersonic.semantic.api.query.request.ExplainSqlReq;
|
||||
import com.tencent.supersonic.semantic.api.query.request.ItemUseReq;
|
||||
import com.tencent.supersonic.semantic.api.query.request.MetricReq;
|
||||
import com.tencent.supersonic.semantic.api.query.request.QueryDimValueReq;
|
||||
import com.tencent.supersonic.semantic.api.query.request.QueryDslReq;
|
||||
import com.tencent.supersonic.semantic.api.query.request.QueryMultiStructReq;
|
||||
@@ -25,6 +26,7 @@ public interface QueryService {
|
||||
|
||||
QueryResultWithSchemaResp queryByMultiStruct(QueryMultiStructReq queryMultiStructCmd, User user) throws Exception;
|
||||
|
||||
|
||||
QueryResultWithSchemaResp queryDimValue(QueryDimValueReq queryDimValueReq, User user);
|
||||
|
||||
Object queryByQueryStatement(QueryStatement queryStatement);
|
||||
@@ -32,4 +34,8 @@ public interface QueryService {
|
||||
List<ItemUseResp> getStatInfo(ItemUseReq itemUseCommend);
|
||||
|
||||
<T> ExplainResp explain(ExplainSqlReq<T> explainSqlReq, User user) throws Exception;
|
||||
|
||||
QueryStatement parseMetricReq(MetricReq metricReq) throws Exception;
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@ import com.tencent.supersonic.semantic.api.query.pojo.Cache;
|
||||
import com.tencent.supersonic.semantic.api.query.pojo.Filter;
|
||||
import com.tencent.supersonic.semantic.api.query.request.ExplainSqlReq;
|
||||
import com.tencent.supersonic.semantic.api.query.request.ItemUseReq;
|
||||
import com.tencent.supersonic.semantic.api.query.request.MetricReq;
|
||||
import com.tencent.supersonic.semantic.api.query.request.QueryDimValueReq;
|
||||
import com.tencent.supersonic.semantic.api.query.request.QueryDslReq;
|
||||
import com.tencent.supersonic.semantic.api.query.request.QueryMultiStructReq;
|
||||
@@ -256,6 +257,11 @@ public class QueryServiceImpl implements QueryService {
|
||||
}
|
||||
|
||||
|
||||
public QueryStatement parseMetricReq(MetricReq metricReq) throws Exception {
|
||||
QueryStructReq queryStructCmd = new QueryStructReq();
|
||||
return semanticQueryEngine.physicalSql(queryStructCmd, metricReq);
|
||||
}
|
||||
|
||||
private boolean isCache(QueryStructReq queryStructCmd) {
|
||||
if (!cacheEnable) {
|
||||
return false;
|
||||
@@ -314,4 +320,5 @@ public class QueryServiceImpl implements QueryService {
|
||||
return queryStructReq;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package com.tencent.supersonic.semantic.query.service;
|
||||
|
||||
import com.tencent.supersonic.semantic.api.model.response.QueryResultWithSchemaResp;
|
||||
import com.tencent.supersonic.semantic.api.query.request.MetricReq;
|
||||
import com.tencent.supersonic.semantic.api.query.request.ParseSqlReq;
|
||||
import com.tencent.supersonic.semantic.api.query.request.QueryStructReq;
|
||||
import com.tencent.supersonic.semantic.query.executor.QueryExecutor;
|
||||
@@ -14,5 +15,7 @@ public interface SemanticQueryEngine {
|
||||
|
||||
QueryResultWithSchemaResp execute(QueryStatement queryStatement);
|
||||
|
||||
QueryStatement physicalSql(ParseSqlReq sqlCommend) throws Exception;
|
||||
QueryStatement physicalSql(QueryStructReq queryStructCmd, ParseSqlReq sqlCommend) throws Exception;
|
||||
|
||||
QueryStatement physicalSql(QueryStructReq queryStructCmd, MetricReq sqlCommend) throws Exception;
|
||||
}
|
||||
|
||||
@@ -48,6 +48,10 @@ public class SemanticQueryEngineImpl implements SemanticQueryEngine {
|
||||
queryUtils.checkSqlParse(queryStatement);
|
||||
queryStatement.setModelId(queryStructCmd.getModelId());
|
||||
log.info("queryStatement:{}", queryStatement);
|
||||
return optimize(queryStructCmd, queryStatement);
|
||||
}
|
||||
|
||||
public QueryStatement optimize(QueryStructReq queryStructCmd, QueryStatement queryStatement) {
|
||||
for (QueryOptimizer queryOptimizer : ComponentFactory.getQueryOptimizers()) {
|
||||
queryOptimizer.rewrite(queryStructCmd, queryStatement);
|
||||
}
|
||||
@@ -65,12 +69,12 @@ public class SemanticQueryEngineImpl implements SemanticQueryEngine {
|
||||
|
||||
|
||||
@Override
|
||||
public QueryStatement physicalSql(ParseSqlReq sqlCommend) throws Exception {
|
||||
return queryParser.parser(sqlCommend);
|
||||
public QueryStatement physicalSql(QueryStructReq queryStructCmd, ParseSqlReq sqlCommend) throws Exception {
|
||||
return optimize(queryStructCmd, queryParser.parser(sqlCommend));
|
||||
}
|
||||
|
||||
|
||||
public QueryStatement physicalSql(MetricReq metricCommand) throws Exception {
|
||||
public QueryStatement physicalSql(QueryStructReq queryStructCmd, MetricReq metricCommand) throws Exception {
|
||||
return queryParser.parser(metricCommand);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import com.tencent.supersonic.common.util.ContextUtils;
|
||||
import com.tencent.supersonic.semantic.query.executor.JdbcExecutor;
|
||||
import com.tencent.supersonic.semantic.query.executor.QueryExecutor;
|
||||
import com.tencent.supersonic.semantic.query.optimizer.DetailQuery;
|
||||
import com.tencent.supersonic.semantic.query.optimizer.MaterializationQuery;
|
||||
import com.tencent.supersonic.semantic.query.optimizer.QueryOptimizer;
|
||||
import com.tencent.supersonic.semantic.query.parser.SemanticConverter;
|
||||
import com.tencent.supersonic.semantic.query.parser.SqlParser;
|
||||
@@ -13,13 +14,16 @@ import com.tencent.supersonic.semantic.query.parser.convert.DefaultDimValueConve
|
||||
import com.tencent.supersonic.semantic.query.parser.convert.MultiSourceJoin;
|
||||
import com.tencent.supersonic.semantic.query.parser.convert.ParserDefaultConverter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class ComponentFactory {
|
||||
|
||||
private static List<SemanticConverter> semanticConverters = new ArrayList<>();
|
||||
private static List<QueryExecutor> queryExecutors = new ArrayList<>();
|
||||
private static List<QueryOptimizer> queryOptimizers = new ArrayList<>();
|
||||
private static Map<String, QueryOptimizer> queryOptimizers = new HashMap<>();
|
||||
private static SqlParser sqlParser;
|
||||
|
||||
static {
|
||||
@@ -46,7 +50,7 @@ public class ComponentFactory {
|
||||
if (queryOptimizers.isEmpty()) {
|
||||
initQueryOptimizer();
|
||||
}
|
||||
return queryOptimizers;
|
||||
return queryOptimizers.values().stream().collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public static SqlParser getSqlParser() {
|
||||
@@ -59,9 +63,20 @@ public class ComponentFactory {
|
||||
public static void setSqlParser(SqlParser parser) {
|
||||
sqlParser = parser;
|
||||
}
|
||||
private static void initQueryOptimizer() {
|
||||
queryOptimizers.add(getBean("DetailQuery", DetailQuery.class));
|
||||
|
||||
public static void addQueryOptimizer(String name, QueryOptimizer queryOptimizer) {
|
||||
queryOptimizers.put(name, queryOptimizer);
|
||||
}
|
||||
|
||||
public static <T> T getBean(String name, Class<T> tClass) {
|
||||
return ContextUtils.getContext().getBean(name, tClass);
|
||||
}
|
||||
|
||||
private static void initQueryOptimizer() {
|
||||
queryOptimizers.put("MaterializationQuery", getBean("MaterializationQuery", MaterializationQuery.class));
|
||||
queryOptimizers.put("DetailQuery", getBean("DetailQuery", DetailQuery.class));
|
||||
}
|
||||
|
||||
private static void initSemanticConverter() {
|
||||
semanticConverters.add(getBean("DefaultDimValueConverter", DefaultDimValueConverter.class));
|
||||
semanticConverters.add(getBean("CalculateAggConverter", CalculateAggConverter.class));
|
||||
@@ -73,7 +88,5 @@ public class ComponentFactory {
|
||||
queryExecutors.add(ContextUtils.getContext().getBean("JdbcExecutor", JdbcExecutor.class));
|
||||
}
|
||||
|
||||
public static <T> T getBean(String name, Class<T> tClass) {
|
||||
return ContextUtils.getContext().getBean(name, tClass);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -16,10 +16,13 @@ import java.time.format.DateTimeFormatter;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.time.temporal.TemporalAdjusters;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.StringJoiner;
|
||||
import java.util.regex.Pattern;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.tuple.ImmutablePair;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
@@ -36,6 +39,11 @@ public class DateUtils {
|
||||
@Value("${query.parameter.sys.month:sys_imp_week}")
|
||||
private String sysDateWeekCol;
|
||||
|
||||
@Value("${query.parameter.sys.zipper.begin:start_}")
|
||||
private String sysZipperDateColBegin;
|
||||
@Value("${query.parameter.sys.zipper.end:end_}")
|
||||
private String sysZipperDateColEnd;
|
||||
|
||||
public Boolean recentMode(DateConf dateInfo) {
|
||||
if (Objects.nonNull(dateInfo) && DateConf.DateMode.RECENT == dateInfo.getDateMode()
|
||||
&& DAY.equalsIgnoreCase(dateInfo.getPeriod()) && Objects.nonNull(dateInfo.getUnit())) {
|
||||
@@ -118,6 +126,11 @@ public class DateUtils {
|
||||
}
|
||||
|
||||
public String recentDayStr(ItemDateResp dateDate, DateConf dateInfo) {
|
||||
ImmutablePair<String, String> dayRange = recentDay(dateDate, dateInfo);
|
||||
return String.format("(%s >= '%s' and %s <= '%s')", sysDateCol, dayRange.left, sysDateCol, dayRange.right);
|
||||
}
|
||||
|
||||
public ImmutablePair<String, String> recentDay(ItemDateResp dateDate, DateConf dateInfo) {
|
||||
String dateFormatStr = dateDate.getDateFormat();
|
||||
if (Strings.isNullOrEmpty(dateFormatStr)) {
|
||||
dateFormatStr = DAY_FORMAT;
|
||||
@@ -128,7 +141,7 @@ public class DateUtils {
|
||||
|
||||
Integer unit = dateInfo.getUnit() - 1;
|
||||
String start = end.minusDays(unit).format(formatter);
|
||||
return String.format("(%s >= '%s' and %s <= '%s')", sysDateCol, start, sysDateCol, dateDate.getEndDate());
|
||||
return ImmutablePair.of(start, dateDate.getEndDate());
|
||||
}
|
||||
|
||||
public String recentMonthStr(LocalDate endData, Long unit, String dateFormatStr) {
|
||||
@@ -139,16 +152,40 @@ public class DateUtils {
|
||||
}
|
||||
|
||||
public String recentMonthStr(ItemDateResp dateDate, DateConf dateInfo) {
|
||||
List<ImmutablePair<String, String>> range = recentMonth(dateDate, dateInfo);
|
||||
if (range.size() == 1) {
|
||||
return String.format("(%s >= '%s' and %s <= '%s')", sysDateMonthCol, range.get(0).left, sysDateMonthCol,
|
||||
range.get(0).right);
|
||||
}
|
||||
if (range.size() > 0) {
|
||||
StringJoiner joiner = new StringJoiner(",");
|
||||
range.stream().forEach(month -> joiner.add("'" + month.left + "'"));
|
||||
return String.format("(%s in (%s))", sysDateCol, joiner.toString());
|
||||
}
|
||||
return "";
|
||||
}
|
||||
|
||||
public List<ImmutablePair<String, String>> recentMonth(ItemDateResp dateDate, DateConf dateInfo) {
|
||||
LocalDate endData = LocalDate.parse(dateDate.getEndDate(),
|
||||
DateTimeFormatter.ofPattern(dateDate.getDateFormat()));
|
||||
List<ImmutablePair<String, String>> ret = new ArrayList<>();
|
||||
if (dateDate.getDatePeriod() != null && MONTH.equalsIgnoreCase(dateDate.getDatePeriod())) {
|
||||
Long unit = getInterval(dateInfo.getStartDate(), dateInfo.getEndDate(), dateDate.getDateFormat(),
|
||||
ChronoUnit.MONTHS);
|
||||
return generateMonthSql(endData, unit, dateDate.getDateFormat());
|
||||
LocalDate dateMax = endData;
|
||||
List<String> months = generateMonthStr(dateMax, unit, dateDate.getDateFormat());
|
||||
if (!CollectionUtils.isEmpty(months)) {
|
||||
months.stream().forEach(m -> ret.add(ImmutablePair.of(m, m)));
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
String dateFormatStr = MONTH_FORMAT;
|
||||
Integer unit = dateInfo.getUnit() - 1;
|
||||
return recentMonthStr(endData, Long.valueOf(unit), dateFormatStr);
|
||||
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(dateFormatStr);
|
||||
String endStr = endData.format(formatter);
|
||||
String start = endData.minusMonths(unit).format(formatter);
|
||||
ret.add(ImmutablePair.of(start, endStr));
|
||||
return ret;
|
||||
}
|
||||
|
||||
public String recentWeekStr(LocalDate endData, Long unit) {
|
||||
@@ -159,6 +196,12 @@ public class DateUtils {
|
||||
}
|
||||
|
||||
public String recentWeekStr(ItemDateResp dateDate, DateConf dateInfo) {
|
||||
ImmutablePair<String, String> dayRange = recentWeek(dateDate, dateInfo);
|
||||
return String.format("(%s >= '%s' and %s <= '%s')", sysDateWeekCol, dayRange.left, sysDateWeekCol,
|
||||
dayRange.right);
|
||||
}
|
||||
|
||||
public ImmutablePair<String, String> recentWeek(ItemDateResp dateDate, DateConf dateInfo) {
|
||||
String dateFormatStr = dateDate.getDateFormat();
|
||||
if (Strings.isNullOrEmpty(dateFormatStr)) {
|
||||
dateFormatStr = DAY_FORMAT;
|
||||
@@ -166,7 +209,8 @@ public class DateUtils {
|
||||
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(dateFormatStr);
|
||||
LocalDate end = LocalDate.parse(dateDate.getEndDate(), formatter);
|
||||
Integer unit = dateInfo.getUnit() - 1;
|
||||
return recentWeekStr(end, Long.valueOf(unit));
|
||||
String start = end.minusDays(unit * 7).format(formatter);
|
||||
return ImmutablePair.of(start, end.format(formatter));
|
||||
}
|
||||
|
||||
private Long getInterval(String startDate, String endDate, String dateFormat, ChronoUnit chronoUnit) {
|
||||
@@ -294,6 +338,28 @@ public class DateUtils {
|
||||
return dateStr;
|
||||
}
|
||||
|
||||
public String getDateWhereStr(DateConf dateConf, ImmutablePair<String, String> range) {
|
||||
if (DAY.equalsIgnoreCase(dateConf.getPeriod()) || WEEK.equalsIgnoreCase(dateConf.getPeriod())) {
|
||||
if (range.left.equals(range.right)) {
|
||||
return String.format("(%s <= '%s' and %s > '%s')", sysZipperDateColBegin + sysDateCol, range.left,
|
||||
sysZipperDateColEnd + sysDateCol, range.left);
|
||||
}
|
||||
return String.format("( '%s' <= %s and '%s' >= %s)", range.left, sysZipperDateColEnd + sysDateCol,
|
||||
range.right, sysZipperDateColBegin + sysDateCol);
|
||||
}
|
||||
|
||||
if (MONTH.equalsIgnoreCase(dateConf.getPeriod())) {
|
||||
if (range.left.equals(range.right)) {
|
||||
return String.format("(%s <= '%s' and %s > '%s')", sysZipperDateColBegin + sysDateMonthCol, range.left,
|
||||
sysZipperDateColEnd + sysDateMonthCol, range.left);
|
||||
}
|
||||
return String.format("( '%s' <= %s and '%s' >= %s)", range.left, sysZipperDateColEnd + sysDateMonthCol,
|
||||
range.right, sysZipperDateColBegin + sysDateMonthCol);
|
||||
|
||||
}
|
||||
return "";
|
||||
}
|
||||
|
||||
public String getSysDateCol(DateConf dateInfo) {
|
||||
if (DAY.equalsIgnoreCase(dateInfo.getPeriod())) {
|
||||
return sysDateCol;
|
||||
@@ -306,4 +372,34 @@ public class DateUtils {
|
||||
}
|
||||
return "";
|
||||
}
|
||||
|
||||
public boolean isDateStr(String date) {
|
||||
return Pattern.matches("[\\d\\s-:]+", date);
|
||||
}
|
||||
|
||||
public String getPeriodByCol(String col) {
|
||||
if (sysDateCol.equalsIgnoreCase(col)) {
|
||||
return DAY;
|
||||
}
|
||||
if (sysDateWeekCol.equalsIgnoreCase(col)) {
|
||||
return WEEK;
|
||||
}
|
||||
if (sysDateMonthCol.equalsIgnoreCase(col)) {
|
||||
return MONTH;
|
||||
}
|
||||
return "";
|
||||
}
|
||||
|
||||
public String getDateColBegin(DateConf dateInfo) {
|
||||
return sysZipperDateColBegin + getSysDateCol(dateInfo);
|
||||
}
|
||||
|
||||
public String getDateColEnd(DateConf dateInfo) {
|
||||
return sysZipperDateColEnd + getSysDateCol(dateInfo);
|
||||
}
|
||||
|
||||
public List<String> getDateCol() {
|
||||
return Arrays.asList(sysDateCol, sysDateMonthCol, sysDateWeekCol);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,38 +1,50 @@
|
||||
package com.tencent.supersonic.semantic.query.utils;
|
||||
|
||||
import static com.tencent.supersonic.common.pojo.Constants.DAY;
|
||||
import static com.tencent.supersonic.common.pojo.Constants.DAY_FORMAT;
|
||||
import static com.tencent.supersonic.common.pojo.Constants.MONTH;
|
||||
import static com.tencent.supersonic.common.pojo.Constants.UNDERLINE;
|
||||
import static com.tencent.supersonic.common.pojo.Constants.WEEK;
|
||||
|
||||
import com.tencent.supersonic.auth.api.authentication.pojo.User;
|
||||
import com.tencent.supersonic.common.pojo.DateConf.DateMode;
|
||||
import com.tencent.supersonic.common.pojo.enums.TypeEnums;
|
||||
import com.tencent.supersonic.common.pojo.Aggregator;
|
||||
import com.tencent.supersonic.common.pojo.DateConf;
|
||||
import com.tencent.supersonic.common.pojo.DateConf.DateMode;
|
||||
import com.tencent.supersonic.common.pojo.enums.TypeEnums;
|
||||
import com.tencent.supersonic.common.util.jsqlparser.FilterExpression;
|
||||
import com.tencent.supersonic.common.util.jsqlparser.SqlParserSelectHelper;
|
||||
import com.tencent.supersonic.semantic.api.model.pojo.SchemaItem;
|
||||
import com.tencent.supersonic.common.util.jsqlparser.SqlParserRemoveHelper;
|
||||
import com.tencent.supersonic.common.util.jsqlparser.SqlParserAddHelper;
|
||||
import com.tencent.supersonic.semantic.api.model.pojo.ItemDateFilter;
|
||||
import com.tencent.supersonic.semantic.api.model.pojo.SchemaItem;
|
||||
import com.tencent.supersonic.semantic.api.model.request.ModelSchemaFilterReq;
|
||||
import com.tencent.supersonic.semantic.api.model.response.DimSchemaResp;
|
||||
import com.tencent.supersonic.semantic.api.model.response.DimensionResp;
|
||||
import com.tencent.supersonic.semantic.api.model.response.ItemDateResp;
|
||||
import com.tencent.supersonic.semantic.api.model.response.MetricResp;
|
||||
import com.tencent.supersonic.semantic.api.model.response.ModelSchemaResp;
|
||||
import com.tencent.supersonic.semantic.api.model.response.MetricSchemaResp;
|
||||
import com.tencent.supersonic.semantic.api.model.response.DimSchemaResp;
|
||||
import com.tencent.supersonic.semantic.api.model.response.ModelSchemaResp;
|
||||
import com.tencent.supersonic.semantic.api.query.request.QueryDslReq;
|
||||
import com.tencent.supersonic.semantic.api.query.request.QueryStructReq;
|
||||
import com.tencent.supersonic.semantic.model.domain.Catalog;
|
||||
|
||||
import com.tencent.supersonic.semantic.model.domain.pojo.EngineTypeEnum;
|
||||
import com.tencent.supersonic.semantic.query.persistence.pojo.QueryStatement;
|
||||
import com.tencent.supersonic.semantic.query.service.SchemaService;
|
||||
import java.time.LocalDate;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import com.tencent.supersonic.semantic.query.service.SchemaService;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.tuple.ImmutablePair;
|
||||
import org.apache.logging.log4j.util.Strings;
|
||||
import org.assertj.core.util.Lists;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
@@ -64,6 +76,8 @@ public class QueryStructUtils {
|
||||
@Autowired
|
||||
private SchemaService schemaService;
|
||||
|
||||
private String variablePrefix = "'${";
|
||||
|
||||
public QueryStructUtils(
|
||||
DateUtils dateUtils,
|
||||
SqlFilterUtils sqlFilterUtils, Catalog catalog) {
|
||||
@@ -147,6 +161,11 @@ public class QueryStructUtils {
|
||||
public String generateWhere(QueryStructReq queryStructCmd) {
|
||||
String whereClauseFromFilter = sqlFilterUtils.getWhereClause(queryStructCmd.getOriginalFilter());
|
||||
String whereFromDate = getDateWhereClause(queryStructCmd);
|
||||
return mergeDateWhereClause(queryStructCmd, whereClauseFromFilter, whereFromDate);
|
||||
}
|
||||
|
||||
public String mergeDateWhereClause(QueryStructReq queryStructCmd, String whereClauseFromFilter,
|
||||
String whereFromDate) {
|
||||
if (Strings.isNotEmpty(whereFromDate) && Strings.isNotEmpty(whereClauseFromFilter)) {
|
||||
return String.format("%s AND (%s)", whereFromDate, whereClauseFromFilter);
|
||||
} else if (Strings.isEmpty(whereFromDate) && Strings.isNotEmpty(whereClauseFromFilter)) {
|
||||
@@ -247,5 +266,203 @@ public class QueryStructUtils {
|
||||
return true;
|
||||
}
|
||||
|
||||
public String generateZipperWhere(QueryStructReq queryStructCmd) {
|
||||
String whereClauseFromFilter = sqlFilterUtils.getWhereClause(queryStructCmd.getOriginalFilter());
|
||||
String whereFromDate = getZipperDateWhereClause(queryStructCmd);
|
||||
return mergeDateWhereClause(queryStructCmd, whereClauseFromFilter, whereFromDate);
|
||||
}
|
||||
|
||||
public String generateZipperWhere(QueryStatement queryStatement, QueryStructReq queryStructReq) {
|
||||
if (Objects.nonNull(queryStatement.getParseSqlReq().getSql())) {
|
||||
String sql = SqlParserRemoveHelper.removeWhere(queryStatement.getParseSqlReq().getSql(),
|
||||
dateUtils.getDateCol());
|
||||
if (!CollectionUtils.isEmpty(queryStatement.getMetricReq().getDimensions())) {
|
||||
List<String> dimension = queryStatement.getMetricReq().getDimensions().stream()
|
||||
.filter(d -> !dateUtils.getDateCol().contains(d.toLowerCase())).collect(
|
||||
Collectors.toList());
|
||||
dimension.add(dateUtils.getDateColBegin(queryStructReq.getDateInfo()));
|
||||
dimension.add(dateUtils.getDateColEnd(queryStructReq.getDateInfo()));
|
||||
queryStatement.getMetricReq().setDimensions(dimension);
|
||||
}
|
||||
return SqlParserAddHelper.addWhere(sql,
|
||||
SqlParserSelectHelper.getTimeFilter(queryStatement.getTimeRanges(),
|
||||
dateUtils.getDateColBegin(queryStructReq.getDateInfo()),
|
||||
dateUtils.getDateColEnd(queryStructReq.getDateInfo())));
|
||||
}
|
||||
return queryStatement.getSql();
|
||||
}
|
||||
|
||||
|
||||
public String getZipperDateWhereClause(QueryStructReq queryStructCmd) {
|
||||
List<ImmutablePair<String, String>> timeRanges = getTimeRanges(queryStructCmd);
|
||||
List<String> wheres = new ArrayList<>();
|
||||
if (!CollectionUtils.isEmpty(timeRanges)) {
|
||||
for (ImmutablePair<String, String> range : timeRanges) {
|
||||
String strWhere = dateUtils.getDateWhereStr(queryStructCmd.getDateInfo(), range);
|
||||
if (!strWhere.isEmpty()) {
|
||||
wheres.add(strWhere);
|
||||
}
|
||||
}
|
||||
if (!wheres.isEmpty()) {
|
||||
return wheres.stream().collect(Collectors.joining(" or ", "(", ")"));
|
||||
}
|
||||
}
|
||||
return "";
|
||||
}
|
||||
|
||||
public ImmutablePair<String, String> getBeginEndTime(QueryStructReq queryStructCmd) {
|
||||
DateConf dateConf = queryStructCmd.getDateInfo();
|
||||
switch (dateConf.getDateMode()) {
|
||||
case AVAILABLE:
|
||||
case BETWEEN:
|
||||
return ImmutablePair.of(dateConf.getStartDate(), dateConf.getEndDate());
|
||||
case LIST:
|
||||
return ImmutablePair.of(Collections.min(dateConf.getDateList()),
|
||||
Collections.max(dateConf.getDateList()));
|
||||
case RECENT:
|
||||
ItemDateResp dateDate = getItemDateResp(queryStructCmd);
|
||||
LocalDate dateMax = LocalDate.now().minusDays(1);
|
||||
LocalDate dateMin = dateMax.minusDays(dateConf.getUnit() - 1);
|
||||
if (Objects.isNull(dateDate)) {
|
||||
return ImmutablePair.of(dateMin.format(DateTimeFormatter.ofPattern(DAY_FORMAT)),
|
||||
dateMax.format(DateTimeFormatter.ofPattern(DAY_FORMAT)));
|
||||
}
|
||||
switch (dateConf.getPeriod()) {
|
||||
case DAY:
|
||||
return dateUtils.recentDay(dateDate, dateConf);
|
||||
case WEEK:
|
||||
return dateUtils.recentWeek(dateDate, dateConf);
|
||||
case MONTH:
|
||||
List<ImmutablePair<String, String>> rets = dateUtils.recentMonth(dateDate, dateConf);
|
||||
Optional<String> minBegins = rets.stream().map(i -> i.left).sorted().findFirst();
|
||||
Optional<String> maxBegins = rets.stream().map(i -> i.right).sorted(Comparator.reverseOrder())
|
||||
.findFirst();
|
||||
if (minBegins.isPresent() && maxBegins.isPresent()) {
|
||||
return ImmutablePair.of(minBegins.get(), maxBegins.get());
|
||||
}
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
|
||||
}
|
||||
return ImmutablePair.of("", "");
|
||||
}
|
||||
|
||||
public List<ImmutablePair<String, String>> getTimeRanges(QueryStructReq queryStructCmd) {
|
||||
List<ImmutablePair<String, String>> ret = new ArrayList<>();
|
||||
if (Objects.isNull(queryStructCmd) || Objects.isNull(queryStructCmd.getDateInfo())) {
|
||||
return ret;
|
||||
}
|
||||
DateConf dateConf = queryStructCmd.getDateInfo();
|
||||
switch (dateConf.getDateMode()) {
|
||||
case AVAILABLE:
|
||||
case BETWEEN:
|
||||
ret.add(ImmutablePair.of(dateConf.getStartDate(), dateConf.getEndDate()));
|
||||
break;
|
||||
case LIST:
|
||||
for (String date : dateConf.getDateList()) {
|
||||
ret.add(ImmutablePair.of(date, date));
|
||||
}
|
||||
break;
|
||||
case RECENT:
|
||||
ItemDateResp dateDate = getItemDateResp(queryStructCmd);
|
||||
LocalDate dateMax = LocalDate.now().minusDays(1);
|
||||
LocalDate dateMin = dateMax.minusDays(dateConf.getUnit() - 1);
|
||||
if (Objects.isNull(dateDate)) {
|
||||
ret.add(ImmutablePair.of(dateMin.format(DateTimeFormatter.ofPattern(DAY_FORMAT)),
|
||||
dateMax.format(DateTimeFormatter.ofPattern(DAY_FORMAT))));
|
||||
break;
|
||||
}
|
||||
switch (dateConf.getPeriod()) {
|
||||
case DAY:
|
||||
ret.add(dateUtils.recentDay(dateDate, dateConf));
|
||||
break;
|
||||
case WEEK:
|
||||
ret.add(dateUtils.recentWeek(dateDate, dateConf));
|
||||
break;
|
||||
case MONTH:
|
||||
List<ImmutablePair<String, String>> rets = dateUtils.recentMonth(dateDate, dateConf);
|
||||
ret.addAll(rets);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
private ItemDateResp getItemDateResp(QueryStructReq queryStructCmd) {
|
||||
List<Long> dimensionIds = getDimensionIds(queryStructCmd);
|
||||
List<Long> metricIds = getMetricIds(queryStructCmd);
|
||||
ItemDateResp dateDate = catalog.getItemDate(
|
||||
new ItemDateFilter(dimensionIds, TypeEnums.DIMENSION.getName()),
|
||||
new ItemDateFilter(metricIds, TypeEnums.METRIC.getName()));
|
||||
return dateDate;
|
||||
}
|
||||
|
||||
public DateConf getDateConfBySql(String sql) {
|
||||
List<FilterExpression> filterExpressions = SqlParserSelectHelper.getFilterExpression(sql);
|
||||
if (!CollectionUtils.isEmpty(filterExpressions)) {
|
||||
Set<String> dateList = new HashSet<>();
|
||||
String startDate = "";
|
||||
String endDate = "";
|
||||
String period = "";
|
||||
for (FilterExpression f : filterExpressions) {
|
||||
if (Objects.isNull(f.getFieldName()) || !internalCols.contains(f.getFieldName().toLowerCase())) {
|
||||
continue;
|
||||
}
|
||||
if (Objects.isNull(f.getFieldValue()) || !dateUtils.isDateStr(f.getFieldValue().toString())) {
|
||||
continue;
|
||||
}
|
||||
period = dateUtils.getPeriodByCol(f.getFieldName().toLowerCase());
|
||||
if ("".equals(period)) {
|
||||
continue;
|
||||
}
|
||||
if ("=".equals(f.getOperator())) {
|
||||
dateList.add(f.getFieldValue().toString());
|
||||
} else if ("<".equals(f.getOperator()) || "<=".equals(f.getOperator())) {
|
||||
if (!"".equals(startDate) && startDate.compareTo(f.getFieldValue().toString()) > 0) {
|
||||
startDate = f.getFieldValue().toString();
|
||||
}
|
||||
} else if (">".equals(f.getOperator()) || ">=".equals(f.getOperator())) {
|
||||
if (!"".equals(endDate) && endDate.compareTo(f.getFieldValue().toString()) < 0) {
|
||||
endDate = f.getFieldValue().toString();
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!"".equals(period)) {
|
||||
DateConf dateConf = new DateConf();
|
||||
dateConf.setPeriod(period);
|
||||
if (!CollectionUtils.isEmpty(dateList)) {
|
||||
dateConf.setDateList(new ArrayList<>(dateList));
|
||||
dateConf.setDateMode(DateMode.LIST);
|
||||
return dateConf;
|
||||
}
|
||||
if (!"".equals(startDate) && !"".equals(endDate)) {
|
||||
dateConf.setStartDate(startDate);
|
||||
dateConf.setEndDate(endDate);
|
||||
dateConf.setDateMode(DateMode.BETWEEN);
|
||||
return dateConf;
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public List<String> getDateCol() {
|
||||
return dateUtils.getDateCol();
|
||||
}
|
||||
|
||||
public String getVariablePrefix() {
|
||||
return variablePrefix;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -5,10 +5,10 @@ import static com.tencent.supersonic.common.pojo.Constants.PARENTHESES_START;
|
||||
import static com.tencent.supersonic.common.pojo.Constants.SPACE;
|
||||
import static com.tencent.supersonic.common.pojo.Constants.SYS_VAR;
|
||||
|
||||
import com.tencent.supersonic.common.pojo.Constants;
|
||||
import com.tencent.supersonic.semantic.api.query.enums.FilterOperatorEnum;
|
||||
import com.tencent.supersonic.semantic.api.query.pojo.Criterion;
|
||||
import com.tencent.supersonic.semantic.api.query.pojo.Filter;
|
||||
import com.tencent.supersonic.common.pojo.Constants;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
@@ -16,19 +16,24 @@ import com.tencent.supersonic.semantic.api.query.request.QueryDslReq;
|
||||
import com.tencent.supersonic.semantic.api.query.request.QueryStructReq;
|
||||
import com.tencent.supersonic.semantic.api.query.response.ItemUseResp;
|
||||
import com.tencent.supersonic.semantic.model.domain.ModelService;
|
||||
import com.tencent.supersonic.semantic.api.model.enums.QueryOptMode;
|
||||
|
||||
import com.tencent.supersonic.semantic.query.persistence.repository.StatRepository;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.codec.digest.DigestUtils;
|
||||
import org.apache.logging.log4j.util.Strings;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
public class StatUtils {
|
||||
@@ -116,9 +121,10 @@ public class StatUtils {
|
||||
log.error("initStatInfo:{}", e);
|
||||
}
|
||||
StatUtils.set(queryStatInfo);
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
public void initStatInfo(QueryStructReq queryStructCmd, User facadeUser) {
|
||||
QueryStat queryStatInfo = new QueryStat();
|
||||
String traceId = "";
|
||||
@@ -146,7 +152,8 @@ public class StatUtils {
|
||||
.setUseResultCache(true)
|
||||
.setUseSqlCache(true)
|
||||
.setMetrics(objectMapper.writeValueAsString(metrics))
|
||||
.setDimensions(objectMapper.writeValueAsString(dimensions));
|
||||
.setDimensions(objectMapper.writeValueAsString(dimensions))
|
||||
.setQueryOptMode(QueryOptMode.NONE.name());
|
||||
} catch (JsonProcessingException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
@@ -171,7 +178,10 @@ public class StatUtils {
|
||||
: "Admin";
|
||||
}
|
||||
|
||||
|
||||
public Boolean updateQueryOptMode(String mode) {
|
||||
STATS.get().setQueryOptMode(mode);
|
||||
return true;
|
||||
}
|
||||
|
||||
public List<ItemUseResp> getStatInfo(ItemUseReq itemUseCommend) {
|
||||
return statRepository.getStatInfo(itemUseCommend);
|
||||
|
||||
@@ -36,6 +36,7 @@
|
||||
<result column="use_sql_cache" property="useSqlCache"/>
|
||||
<result column="sql_cache_key" property="sqlCacheKey"/>
|
||||
<result column="result_cache_key" property="resultCacheKey"/>
|
||||
<result column="query_opt_mode" property="queryOptMode"/>
|
||||
</resultMap>
|
||||
|
||||
<insert id="createRecord">
|
||||
@@ -43,13 +44,13 @@
|
||||
(
|
||||
trace_id, model_id, `user`, query_type, query_type_back, query_sql_cmd, sql_cmd_md5, query_struct_cmd, struct_cmd_md5, `sql`, sql_md5, query_engine,
|
||||
elapsed_ms, query_state, native_query, start_date, end_date, dimensions, metrics, select_cols, agg_cols, filter_cols, group_by_cols,
|
||||
order_by_cols, use_result_cache, use_sql_cache, sql_cache_key, result_cache_key
|
||||
order_by_cols, use_result_cache, use_sql_cache, sql_cache_key, result_cache_key, query_opt_mode
|
||||
)
|
||||
values
|
||||
(
|
||||
#{traceId}, #{modelId}, #{user}, #{queryType}, #{queryTypeBack}, #{querySqlCmd}, #{querySqlCmdMd5}, #{queryStructCmd}, #{queryStructCmdMd5}, #{sql}, #{sqlMd5}, #{queryEngine},
|
||||
#{elapsedMs}, #{queryState}, #{nativeQuery}, #{startDate}, #{endDate}, #{dimensions}, #{metrics}, #{selectCols}, #{aggCols}, #{filterCols}, #{groupByCols},
|
||||
#{orderByCols}, #{useResultCache}, #{useSqlCache}, #{sqlCacheKey}, #{resultCacheKey}
|
||||
#{orderByCols}, #{useResultCache}, #{useSqlCache}, #{sqlCacheKey}, #{resultCacheKey}, #{queryOptMode}
|
||||
)
|
||||
</insert>
|
||||
|
||||
|
||||
@@ -0,0 +1,57 @@
|
||||
package com.tencent.supersonic.semantic.query.domain.calcite;
|
||||
|
||||
import com.tencent.supersonic.common.pojo.Aggregator;
|
||||
import com.tencent.supersonic.common.pojo.DateConf;
|
||||
import com.tencent.supersonic.common.pojo.DateConf.DateMode;
|
||||
import com.tencent.supersonic.common.pojo.enums.AggOperatorEnum;
|
||||
import com.tencent.supersonic.semantic.api.query.request.QueryStructReq;
|
||||
import com.tencent.supersonic.semantic.query.optimizer.QueryOptimizer;
|
||||
import com.tencent.supersonic.semantic.query.parser.QueryParser;
|
||||
import com.tencent.supersonic.semantic.query.persistence.pojo.QueryStatement;
|
||||
import com.tencent.supersonic.semantic.query.utils.ComponentFactory;
|
||||
import com.tencent.supersonic.semantic.query.utils.QueryUtils;
|
||||
import java.util.Arrays;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
public class MaterializationQueryTest {
|
||||
|
||||
private final QueryParser queryParser;
|
||||
private final QueryUtils queryUtils;
|
||||
|
||||
public MaterializationQueryTest(QueryParser queryParser,
|
||||
QueryUtils queryUtils) {
|
||||
this.queryParser = queryParser;
|
||||
this.queryUtils = queryUtils;
|
||||
}
|
||||
|
||||
public void test() {
|
||||
QueryStructReq queryStructReq = new QueryStructReq();
|
||||
queryStructReq.setModelId(1L);
|
||||
|
||||
Aggregator aggregator = new Aggregator();
|
||||
aggregator.setFunc(AggOperatorEnum.UNKNOWN);
|
||||
aggregator.setColumn("pv");
|
||||
queryStructReq.setAggregators(Arrays.asList(aggregator));
|
||||
|
||||
queryStructReq.setGroups(Arrays.asList("department"));
|
||||
DateConf dateConf = new DateConf();
|
||||
dateConf.setDateMode(DateMode.LIST);
|
||||
dateConf.setDateList(Arrays.asList("2023-08-01"));
|
||||
queryStructReq.setDateInfo(dateConf);
|
||||
|
||||
try {
|
||||
QueryStatement queryStatement = queryParser.logicSql(queryStructReq);
|
||||
queryUtils.checkSqlParse(queryStatement);
|
||||
queryStatement.setModelId(queryStructReq.getModelId());
|
||||
log.info("queryStatement:{}", queryStatement);
|
||||
for (QueryOptimizer queryOptimizer : ComponentFactory.getQueryOptimizers()) {
|
||||
queryOptimizer.rewrite(queryStructReq, queryStatement);
|
||||
}
|
||||
//queryParser.test(queryStructReq,metricReq);
|
||||
log.info("queryStatement:{}", queryStatement);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,7 @@
|
||||
package com.tencent.supersonic.semantic.query.domain.calcite;
|
||||
|
||||
import com.tencent.supersonic.common.pojo.ColumnOrder;
|
||||
import com.tencent.supersonic.semantic.api.model.response.SqlParserResp;
|
||||
import com.tencent.supersonic.semantic.api.model.yaml.DatasourceYamlTpl;
|
||||
import com.tencent.supersonic.semantic.api.model.yaml.DimensionTimeTypeParamsTpl;
|
||||
import com.tencent.supersonic.semantic.api.model.yaml.DimensionYamlTpl;
|
||||
@@ -7,13 +9,12 @@ import com.tencent.supersonic.semantic.api.model.yaml.IdentifyYamlTpl;
|
||||
import com.tencent.supersonic.semantic.api.model.yaml.MeasureYamlTpl;
|
||||
import com.tencent.supersonic.semantic.api.model.yaml.MetricTypeParamsYamlTpl;
|
||||
import com.tencent.supersonic.semantic.api.model.yaml.MetricYamlTpl;
|
||||
import com.tencent.supersonic.semantic.api.model.response.SqlParserResp;
|
||||
import com.tencent.supersonic.semantic.api.query.enums.AggOption;
|
||||
import com.tencent.supersonic.semantic.api.query.request.MetricReq;
|
||||
import com.tencent.supersonic.common.pojo.ColumnOrder;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.SemanticSchemaManager;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.planner.AggPlanner;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.schema.SemanticSchema;
|
||||
import com.tencent.supersonic.semantic.query.persistence.pojo.QueryStatement;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
@@ -38,7 +39,9 @@ class SemanticParserServiceTest {
|
||||
return sqlParser;
|
||||
}
|
||||
AggPlanner aggBuilder = new AggPlanner(semanticSchema);
|
||||
aggBuilder.explain(metricCommand, AggOption.getAggregation(!isAgg));
|
||||
QueryStatement queryStatement = new QueryStatement();
|
||||
queryStatement.setMetricReq(metricCommand);
|
||||
aggBuilder.explain(queryStatement, AggOption.getAggregation(!isAgg));
|
||||
sqlParser.setSql(aggBuilder.getSql());
|
||||
sqlParser.setSourceId(aggBuilder.getSourceId());
|
||||
} catch (Exception e) {
|
||||
@@ -48,7 +51,6 @@ class SemanticParserServiceTest {
|
||||
return sqlParser;
|
||||
}
|
||||
|
||||
//@Test
|
||||
public void test() throws Exception {
|
||||
|
||||
DatasourceYamlTpl datasource = new DatasourceYamlTpl();
|
||||
|
||||
Reference in New Issue
Block a user