mirror of
https://github.com/tencentmusic/supersonic.git
synced 2025-12-10 19:51:00 +00:00
[improvement][semantic] refactor zipper source type (#518)
Co-authored-by: jipengli <jipengli@tencent.com>
This commit is contained in:
@@ -137,6 +137,7 @@ CREATE TABLE IF NOT EXISTS `s2_model` (
|
||||
`database_id` INT NOT NULL ,
|
||||
`model_detail` LONGVARCHAR NOT NULL ,
|
||||
`depends` varchar(500) DEFAULT NULL ,
|
||||
`source_type` varchar(128) DEFAULT NULL ,
|
||||
`filter_sql` varchar(1000) DEFAULT NULL ,
|
||||
PRIMARY KEY (`id`)
|
||||
);
|
||||
|
||||
@@ -324,6 +324,7 @@ CREATE TABLE `s2_model` (
|
||||
`drill_down_dimensions` varchar(500) DEFAULT NULL,
|
||||
`database_id` INT NOT NULL ,
|
||||
`model_detail` text NOT NULL ,
|
||||
`source_type` varchar(128) DEFAULT NULL ,
|
||||
`depends` varchar(500) DEFAULT NULL ,
|
||||
`filter_sql` varchar(1000) DEFAULT NULL ,
|
||||
PRIMARY KEY (`id`)
|
||||
|
||||
@@ -145,4 +145,5 @@ CREATE TABLE `s2_metric_query_default_config`
|
||||
)ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
|
||||
|
||||
--20231214
|
||||
alter table s2_chat_query add column `similar_queries` varchar(1024) DEFAULT '';
|
||||
alter table s2_chat_query add column `similar_queries` varchar(1024) DEFAULT '';
|
||||
alter table s2_model add column `source_type` varchar(128) DEFAULT NULL;
|
||||
@@ -137,6 +137,7 @@ CREATE TABLE IF NOT EXISTS `s2_model` (
|
||||
`database_id` INT NOT NULL ,
|
||||
`model_detail` LONGVARCHAR NOT NULL ,
|
||||
`depends` varchar(500) DEFAULT NULL ,
|
||||
`source_type` varchar(128) DEFAULT NULL ,
|
||||
`filter_sql` varchar(1000) DEFAULT NULL ,
|
||||
PRIMARY KEY (`id`)
|
||||
);
|
||||
|
||||
@@ -1,7 +0,0 @@
|
||||
package com.tencent.supersonic.semantic.api.materialization.enums;
|
||||
|
||||
public enum MaterializedTypeEnum {
|
||||
FULL,
|
||||
PARTITION,
|
||||
ZIPPER
|
||||
}
|
||||
@@ -1,6 +1,6 @@
|
||||
package com.tencent.supersonic.semantic.api.materialization.pojo;
|
||||
|
||||
import com.tencent.supersonic.semantic.api.materialization.enums.MaterializedTypeEnum;
|
||||
import com.tencent.supersonic.semantic.api.model.enums.ModelSourceTypeEnum;
|
||||
import com.tencent.supersonic.semantic.api.materialization.enums.UpdateCycleEnum;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
@@ -15,7 +15,7 @@ public class MaterializationFilter {
|
||||
|
||||
private Long materializationId;
|
||||
private String name;
|
||||
private MaterializedTypeEnum materializedType;
|
||||
private ModelSourceTypeEnum materializedType;
|
||||
private UpdateCycleEnum updateCycle;
|
||||
private Long modelId;
|
||||
private Long databaseId;
|
||||
|
||||
@@ -2,7 +2,7 @@ package com.tencent.supersonic.semantic.api.materialization.request;
|
||||
|
||||
import com.tencent.supersonic.common.pojo.RecordInfo;
|
||||
import com.tencent.supersonic.common.pojo.enums.StatusEnum;
|
||||
import com.tencent.supersonic.semantic.api.materialization.enums.MaterializedTypeEnum;
|
||||
import com.tencent.supersonic.semantic.api.model.enums.ModelSourceTypeEnum;
|
||||
import com.tencent.supersonic.semantic.api.materialization.enums.UpdateCycleEnum;
|
||||
import lombok.Data;
|
||||
|
||||
@@ -12,7 +12,7 @@ import java.util.List;
|
||||
public class MaterializationReq extends RecordInfo {
|
||||
private Long id;
|
||||
private String name;
|
||||
private MaterializedTypeEnum materializedType;
|
||||
private ModelSourceTypeEnum materializedType;
|
||||
private UpdateCycleEnum updateCycle;
|
||||
private Long modelId;
|
||||
private Long databaseId;
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
package com.tencent.supersonic.semantic.api.materialization.response;
|
||||
|
||||
import com.tencent.supersonic.common.pojo.RecordInfo;
|
||||
import com.tencent.supersonic.semantic.api.materialization.enums.MaterializedTypeEnum;
|
||||
import com.tencent.supersonic.semantic.api.model.enums.ModelSourceTypeEnum;
|
||||
import com.tencent.supersonic.semantic.api.materialization.enums.UpdateCycleEnum;
|
||||
import lombok.Data;
|
||||
|
||||
@@ -11,7 +11,7 @@ import java.util.List;
|
||||
public class MaterializationResp extends RecordInfo {
|
||||
private Long id;
|
||||
private String name;
|
||||
private MaterializedTypeEnum materializedType;
|
||||
private ModelSourceTypeEnum materializedType;
|
||||
private UpdateCycleEnum updateCycle;
|
||||
private Long modelId;
|
||||
private Long databaseId;
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
package com.tencent.supersonic.semantic.api.materialization.response;
|
||||
|
||||
import com.tencent.supersonic.semantic.api.materialization.enums.MaterializedTypeEnum;
|
||||
import com.tencent.supersonic.semantic.api.model.enums.ModelSourceTypeEnum;
|
||||
import com.tencent.supersonic.semantic.api.materialization.enums.UpdateCycleEnum;
|
||||
import com.tencent.supersonic.semantic.api.model.response.DatabaseResp;
|
||||
import java.util.List;
|
||||
@@ -26,7 +26,7 @@ public class MaterializationSourceResp {
|
||||
|
||||
private String dateInfo;
|
||||
private String entities;
|
||||
private MaterializedTypeEnum materializedType;
|
||||
private ModelSourceTypeEnum materializedType;
|
||||
private UpdateCycleEnum updateCycle;
|
||||
private DatabaseResp databaseResp;
|
||||
private String depends;
|
||||
|
||||
@@ -0,0 +1,27 @@
|
||||
package com.tencent.supersonic.semantic.api.model.enums;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
public enum ModelSourceTypeEnum {
|
||||
FULL,
|
||||
PARTITION,
|
||||
ZIPPER;
|
||||
|
||||
public static ModelSourceTypeEnum of(String src) {
|
||||
for (ModelSourceTypeEnum modelSourceTypeEnum : ModelSourceTypeEnum.values()) {
|
||||
if (Objects.nonNull(src) && src.equalsIgnoreCase(modelSourceTypeEnum.name())) {
|
||||
return modelSourceTypeEnum;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public static boolean isZipper(ModelSourceTypeEnum modelSourceTypeEnum) {
|
||||
return Objects.nonNull(modelSourceTypeEnum) && ZIPPER.equals(modelSourceTypeEnum);
|
||||
}
|
||||
|
||||
public static boolean isZipper(String str) {
|
||||
ModelSourceTypeEnum modelSourceTypeEnum = of(str);
|
||||
return Objects.nonNull(modelSourceTypeEnum) && isZipper(modelSourceTypeEnum);
|
||||
}
|
||||
}
|
||||
@@ -30,6 +30,8 @@ public class ModelReq extends SchemaItem {
|
||||
|
||||
private String alias;
|
||||
|
||||
private String sourceType;
|
||||
|
||||
private ModelDetail modelDetail;
|
||||
|
||||
private List<String> viewers = new ArrayList<>();
|
||||
|
||||
@@ -22,6 +22,8 @@ public class ModelResp extends SchemaItem {
|
||||
|
||||
private String depends;
|
||||
|
||||
private String sourceType;
|
||||
|
||||
private String filterSql;
|
||||
|
||||
private List<String> viewers = new ArrayList<>();
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.tencent.supersonic.semantic.api.model.yaml;
|
||||
|
||||
import com.tencent.supersonic.semantic.api.model.enums.ModelSourceTypeEnum;
|
||||
import java.util.List;
|
||||
import lombok.Data;
|
||||
|
||||
@@ -21,6 +22,8 @@ public class DataModelYamlTpl {
|
||||
|
||||
private List<MeasureYamlTpl> measures;
|
||||
|
||||
private ModelSourceTypeEnum modelSourceTypeEnum;
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@ package com.tencent.supersonic.semantic.materialization.domain.pojo;
|
||||
|
||||
import com.tencent.supersonic.common.pojo.RecordInfo;
|
||||
import com.tencent.supersonic.common.pojo.enums.StatusEnum;
|
||||
import com.tencent.supersonic.semantic.api.materialization.enums.MaterializedTypeEnum;
|
||||
import com.tencent.supersonic.semantic.api.model.enums.ModelSourceTypeEnum;
|
||||
import com.tencent.supersonic.semantic.api.materialization.enums.UpdateCycleEnum;
|
||||
import lombok.Data;
|
||||
|
||||
@@ -14,7 +14,7 @@ public class Materialization extends RecordInfo {
|
||||
|
||||
private Long id;
|
||||
private String name;
|
||||
private MaterializedTypeEnum materializedType;
|
||||
private ModelSourceTypeEnum materializedType;
|
||||
private UpdateCycleEnum updateCycle;
|
||||
private Long modelId;
|
||||
private Long databaseId;
|
||||
|
||||
@@ -5,7 +5,7 @@ import com.tencent.supersonic.common.util.BeanMapper;
|
||||
import com.tencent.supersonic.common.util.JsonUtil;
|
||||
import com.tencent.supersonic.semantic.api.materialization.enums.ElementFrequencyEnum;
|
||||
import com.tencent.supersonic.semantic.api.materialization.enums.ElementTypeEnum;
|
||||
import com.tencent.supersonic.semantic.api.materialization.enums.MaterializedTypeEnum;
|
||||
import com.tencent.supersonic.semantic.api.model.enums.ModelSourceTypeEnum;
|
||||
import com.tencent.supersonic.semantic.api.materialization.enums.UpdateCycleEnum;
|
||||
import com.tencent.supersonic.semantic.api.materialization.request.MaterializationElementReq;
|
||||
import com.tencent.supersonic.semantic.api.materialization.request.MaterializationReq;
|
||||
@@ -115,7 +115,7 @@ public class MaterializationConverter {
|
||||
MaterializationResp materializationResp = new MaterializationResp();
|
||||
BeanUtils.copyProperties(materializationDO, materializationResp);
|
||||
if (Strings.isNotEmpty(materializationDO.getMaterializedType())) {
|
||||
materializationResp.setMaterializedType(Enum.valueOf(MaterializedTypeEnum.class,
|
||||
materializationResp.setMaterializedType(Enum.valueOf(ModelSourceTypeEnum.class,
|
||||
materializationDO.getMaterializedType()));
|
||||
}
|
||||
if (Strings.isNotEmpty(materializationDO.getUpdateCycle())) {
|
||||
|
||||
@@ -18,13 +18,14 @@ import com.tencent.supersonic.semantic.model.domain.MetricService;
|
||||
import com.tencent.supersonic.semantic.model.domain.ModelRelaService;
|
||||
import com.tencent.supersonic.semantic.model.domain.ModelService;
|
||||
import com.tencent.supersonic.semantic.model.domain.pojo.MetaFilter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.collections4.CollectionUtils;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@@ -99,6 +100,17 @@ public class CatalogImpl implements Catalog {
|
||||
return datasourceService.getItemDate(dimension, metric);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ModelResp> getModelList(List<Long> modelIds) {
|
||||
List<ModelResp> modelRespList = new ArrayList<>();
|
||||
if (!CollectionUtils.isEmpty(modelIds)) {
|
||||
modelIds.stream().forEach(m -> {
|
||||
modelRespList.add(modelService.getModel(m));
|
||||
});
|
||||
}
|
||||
return modelRespList;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void getModelYamlTplByModelIds(Set<Long> modelIds, Map<String, List<DimensionYamlTpl>> dimensionYamlMap,
|
||||
List<DataModelYamlTpl> dataModelYamlTplList, List<MetricYamlTpl> metricYamlTplList,
|
||||
|
||||
@@ -6,6 +6,7 @@ import com.tencent.supersonic.semantic.api.model.pojo.ItemDateFilter;
|
||||
import com.tencent.supersonic.semantic.api.model.response.DatabaseResp;
|
||||
import com.tencent.supersonic.semantic.api.model.response.DimensionResp;
|
||||
import com.tencent.supersonic.semantic.api.model.response.MetricResp;
|
||||
import com.tencent.supersonic.semantic.api.model.response.ModelResp;
|
||||
import com.tencent.supersonic.semantic.api.model.yaml.DataModelYamlTpl;
|
||||
import com.tencent.supersonic.semantic.api.model.yaml.DimensionYamlTpl;
|
||||
import com.tencent.supersonic.semantic.api.model.yaml.MetricYamlTpl;
|
||||
@@ -38,4 +39,6 @@ public interface Catalog {
|
||||
|
||||
ItemDateResp getItemDate(ItemDateFilter dimension, ItemDateFilter metric);
|
||||
|
||||
List<ModelResp> getModelList(List<Long> modelIds);
|
||||
|
||||
}
|
||||
|
||||
@@ -53,4 +53,6 @@ public class ModelDO {
|
||||
|
||||
private String alias;
|
||||
|
||||
private String sourceType;
|
||||
|
||||
}
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.tencent.supersonic.semantic.model.domain.manager;
|
||||
|
||||
import com.tencent.supersonic.semantic.api.model.enums.ModelSourceTypeEnum;
|
||||
import com.tencent.supersonic.semantic.api.model.pojo.ModelDetail;
|
||||
import com.tencent.supersonic.semantic.api.model.pojo.Dim;
|
||||
import com.tencent.supersonic.semantic.api.model.pojo.Identify;
|
||||
@@ -42,6 +43,7 @@ public class DatasourceYamlManager {
|
||||
.collect(Collectors.toList()));
|
||||
dataModelYamlTpl.setName(datasource.getBizName());
|
||||
dataModelYamlTpl.setSourceId(datasource.getDatabaseId());
|
||||
dataModelYamlTpl.setModelSourceTypeEnum(ModelSourceTypeEnum.of(datasource.getSourceType()));
|
||||
if (datasourceDetail.getQueryType().equalsIgnoreCase(DatasourceQueryEnum.SQL_QUERY.getName())) {
|
||||
dataModelYamlTpl.setSqlQuery(datasourceDetail.getSqlQuery());
|
||||
} else {
|
||||
|
||||
@@ -15,6 +15,8 @@ public class Datasource extends SchemaItem {
|
||||
|
||||
private ModelDetail datasourceDetail;
|
||||
|
||||
private String sourceType;
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -43,7 +43,7 @@ public class SysTimeDimensionBuilder {
|
||||
dim.setType(DimensionTypeEnum.time.name());
|
||||
dim.setExpr(generateTimeExpr(timeDim, TimeDimensionEnum.WEEK.name().toLowerCase(), engineAdaptor));
|
||||
DimensionTimeTypeParams typeParams = new DimensionTimeTypeParams();
|
||||
typeParams.setTimeGranularity(TimeDimensionEnum.DAY.name().toLowerCase());
|
||||
typeParams.setTimeGranularity(TimeDimensionEnum.WEEK.name().toLowerCase());
|
||||
typeParams.setIsPrimary("false");
|
||||
dim.setTypeParams(typeParams);
|
||||
return dim;
|
||||
@@ -55,7 +55,7 @@ public class SysTimeDimensionBuilder {
|
||||
dim.setType(DimensionTypeEnum.time.name());
|
||||
dim.setExpr(generateTimeExpr(timeDim, TimeDimensionEnum.MONTH.name().toLowerCase(), engineAdaptor));
|
||||
DimensionTimeTypeParams typeParams = new DimensionTimeTypeParams();
|
||||
typeParams.setTimeGranularity(TimeDimensionEnum.DAY.name().toLowerCase());
|
||||
typeParams.setTimeGranularity(TimeDimensionEnum.MONTH.name().toLowerCase());
|
||||
typeParams.setIsPrimary("false");
|
||||
dim.setTypeParams(typeParams);
|
||||
return dim;
|
||||
|
||||
@@ -1,392 +0,0 @@
|
||||
package com.tencent.supersonic.semantic.query.optimizer;
|
||||
|
||||
import com.tencent.supersonic.common.pojo.enums.TypeEnums;
|
||||
import com.tencent.supersonic.common.util.StringUtil;
|
||||
import com.tencent.supersonic.common.util.calcite.SqlParseUtils;
|
||||
import com.tencent.supersonic.semantic.api.materialization.enums.MaterializedTypeEnum;
|
||||
import com.tencent.supersonic.semantic.api.materialization.response.MaterializationRecordResp;
|
||||
import com.tencent.supersonic.semantic.api.materialization.response.MaterializationResp;
|
||||
import com.tencent.supersonic.semantic.api.model.enums.QueryOptMode;
|
||||
import com.tencent.supersonic.semantic.api.model.response.DimensionResp;
|
||||
import com.tencent.supersonic.semantic.api.model.response.MetricResp;
|
||||
import com.tencent.supersonic.semantic.api.query.enums.AggOption;
|
||||
import com.tencent.supersonic.semantic.api.query.pojo.MetricTable;
|
||||
import com.tencent.supersonic.semantic.api.query.request.MetricReq;
|
||||
import com.tencent.supersonic.semantic.api.query.request.ParseSqlReq;
|
||||
import com.tencent.supersonic.semantic.api.query.request.QueryStructReq;
|
||||
import com.tencent.supersonic.semantic.materialization.domain.MaterializationConfService;
|
||||
import com.tencent.supersonic.semantic.materialization.domain.MaterializationRecordService;
|
||||
import com.tencent.supersonic.semantic.model.domain.Catalog;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.SemanticSchemaManager;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.planner.AggPlanner;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.planner.MaterializationPlanner;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.s2sql.DataSource;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.s2sql.DataType;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.s2sql.Dimension;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.s2sql.Materialization;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.s2sql.Materialization.TimePartType;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.s2sql.MaterializationElement;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.s2sql.Measure;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.s2sql.Metric;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.s2sql.SemanticModel;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.s2sql.TimeRange;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.schema.SemanticSchema;
|
||||
import com.tencent.supersonic.semantic.query.persistence.pojo.QueryStatement;
|
||||
import com.tencent.supersonic.semantic.query.utils.QueryStructUtils;
|
||||
import com.tencent.supersonic.semantic.query.utils.StatUtils;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.lang3.tuple.ImmutablePair;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
@Slf4j
|
||||
@Component("MaterializationQuery")
|
||||
public class MaterializationQuery implements QueryOptimizer {
|
||||
|
||||
protected final MaterializationConfService materializationConfService;
|
||||
protected final MaterializationRecordService materializationRecordService;
|
||||
protected final Catalog catalog;
|
||||
protected final QueryStructUtils queryStructUtils;
|
||||
protected final SemanticSchemaManager semanticSchemaManager;
|
||||
protected final StatUtils statUtils;
|
||||
|
||||
|
||||
@Value("${materialization.query.enable:false}")
|
||||
private boolean enabled;
|
||||
|
||||
public MaterializationQuery(
|
||||
MaterializationConfService materializationConfService,
|
||||
MaterializationRecordService materializationRecordService,
|
||||
Catalog catalog, QueryStructUtils queryStructUtils,
|
||||
SemanticSchemaManager semanticSchemaManager,
|
||||
StatUtils statUtils) {
|
||||
this.materializationConfService = materializationConfService;
|
||||
this.materializationRecordService = materializationRecordService;
|
||||
this.catalog = catalog;
|
||||
|
||||
this.queryStructUtils = queryStructUtils;
|
||||
this.semanticSchemaManager = semanticSchemaManager;
|
||||
this.statUtils = statUtils;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void rewrite(QueryStructReq queryStructCmd, QueryStatement queryStatement) {
|
||||
if (!enabled) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
if (Objects.isNull(queryStructCmd) || Objects.isNull(queryStatement)
|
||||
|| CollectionUtils.isEmpty(queryStructCmd.getModelIds())
|
||||
|| Objects.isNull(
|
||||
queryStructCmd.getDateInfo())) {
|
||||
return;
|
||||
}
|
||||
if (Objects.nonNull(queryStatement.getParseSqlReq())) {
|
||||
rewriteSqlReq(queryStructCmd, queryStatement);
|
||||
return;
|
||||
}
|
||||
List<Materialization> materializationList = getMaterializationSchema(queryStructCmd,
|
||||
queryStatement.getMetricReq());
|
||||
if (!CollectionUtils.isEmpty(materializationList)) {
|
||||
if (replan(materializationList, queryStructCmd, queryStatement)) {
|
||||
statUtils.updateQueryOptMode(QueryOptMode.MATERIALIZATION.name());
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("MaterializationQuery error {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
protected void rewriteSqlReq(QueryStructReq queryStructCmd, QueryStatement queryStatement) throws Exception {
|
||||
ParseSqlReq parseSqlReq = queryStatement.getParseSqlReq();
|
||||
String sourceId = queryStatement.getSourceId();
|
||||
String sql = queryStatement.getSql();
|
||||
String parseSql = parseSqlReq.getSql();
|
||||
String msg = queryStatement.getErrMsg();
|
||||
String materializationSourceId = "";
|
||||
String materializationSql = "";
|
||||
String parseSqlReqMaterialization = "";
|
||||
if (!CollectionUtils.isEmpty(parseSqlReq.getTables())) {
|
||||
List<String[]> tables = new ArrayList<>();
|
||||
for (MetricTable metricTable : parseSqlReq.getTables()) {
|
||||
MetricReq metricReq = new MetricReq();
|
||||
metricReq.setMetrics(metricTable.getMetrics());
|
||||
metricReq.setDimensions(metricTable.getDimensions());
|
||||
metricReq.setWhere(StringUtil.formatSqlQuota(metricTable.getWhere()));
|
||||
metricReq.setRootPath(parseSqlReq.getRootPath());
|
||||
List<Materialization> materializationList = getMaterializationSchema(queryStructCmd,
|
||||
metricReq);
|
||||
if (!CollectionUtils.isEmpty(materializationList)) {
|
||||
queryStatement.setMetricReq(metricReq);
|
||||
boolean ok = replan(materializationList, queryStructCmd, queryStatement);
|
||||
if (!ok) {
|
||||
log.info("MaterializationQuery rewriteSqlReq not match {}", metricTable.getAlias());
|
||||
queryStatement.setSql(sql);
|
||||
queryStatement.setSourceId(sourceId);
|
||||
queryStatement.setErrMsg(msg);
|
||||
queryStatement.setMetricReq(null);
|
||||
return;
|
||||
}
|
||||
tables.add(new String[]{metricTable.getAlias(), queryStatement.getSql()});
|
||||
materializationSourceId = queryStatement.getSourceId();
|
||||
parseSqlReqMaterialization = queryStatement.getParseSqlReq().getSql();
|
||||
}
|
||||
}
|
||||
if (!CollectionUtils.isEmpty(tables)) {
|
||||
if (parseSqlReq.isSupportWith()) {
|
||||
materializationSql = "with " + String.join(",",
|
||||
tables.stream().map(t -> String.format("%s as (%s)", t[0], t[1])).collect(
|
||||
Collectors.toList())) + "\n" + parseSqlReqMaterialization;
|
||||
} else {
|
||||
materializationSql = parseSqlReqMaterialization;
|
||||
for (String[] tb : tables) {
|
||||
materializationSql = StringUtils.replace(materializationSql, tb[0],
|
||||
"(" + tb[1] + ") " + (parseSqlReq.isWithAlias() ? "" : tb[0]), -1);
|
||||
}
|
||||
}
|
||||
|
||||
queryStatement.setSql(materializationSql);
|
||||
queryStatement.setSourceId(materializationSourceId);
|
||||
log.info("rewriteSqlReq before[{}] after[{}]", sql, materializationSql);
|
||||
statUtils.updateQueryOptMode(QueryOptMode.MATERIALIZATION.name());
|
||||
}
|
||||
parseSqlReq.setSql(parseSql);
|
||||
}
|
||||
}
|
||||
|
||||
public List<Materialization> getMaterializationSchema(QueryStructReq queryStructReq, MetricReq metricReq)
|
||||
throws Exception {
|
||||
List<Materialization> materializationList = new ArrayList<>();
|
||||
if (Objects.isNull(metricReq)) {
|
||||
return materializationList;
|
||||
}
|
||||
ImmutablePair<String, String> timeRange = queryStructUtils.getBeginEndTime(queryStructReq);
|
||||
String start = timeRange.left;
|
||||
String end = timeRange.right;
|
||||
//todo
|
||||
Long modelId = 1L;
|
||||
List<MaterializationResp> materializationResps = materializationConfService.getMaterializationByModel(modelId);
|
||||
List<DimensionResp> dimensionResps = catalog.getDimensions(queryStructReq.getModelIds());
|
||||
List<MetricResp> metrics = catalog.getMetrics(queryStructReq.getModelIds());
|
||||
Set<String> fields = new HashSet<>();
|
||||
|
||||
if (Objects.nonNull(metricReq.getWhere()) && !metricReq.getWhere().isEmpty()) {
|
||||
fields.addAll(SqlParseUtils.getFilterField(metricReq.getWhere()));
|
||||
}
|
||||
if (!CollectionUtils.isEmpty(metricReq.getMetrics())) {
|
||||
fields.addAll(metricReq.getMetrics());
|
||||
}
|
||||
if (!CollectionUtils.isEmpty(metricReq.getDimensions())) {
|
||||
fields.addAll(metricReq.getDimensions());
|
||||
}
|
||||
|
||||
materializationResps.forEach(materializationResp -> {
|
||||
Materialization materialization =
|
||||
Materialization.builder().dateInfo(materializationResp.getDateInfo())
|
||||
.materializationId(materializationResp.getId())
|
||||
.level(materializationResp.getLevel())
|
||||
.timePartType(TimePartType.of(materializationResp.getMaterializedType().name()))
|
||||
.dimensions(new ArrayList<>())
|
||||
.metrics(new ArrayList<>())
|
||||
.entities(materializationResp.getEntities())
|
||||
.destinationTable(materializationResp.getDestinationTable()).modelId(modelId)
|
||||
.dataBase(materializationResp.getDatabaseId()).build();
|
||||
List<Long> sameTableMaterialization = materializationConfService.getMaterializationByTable(
|
||||
materializationResp.getDatabaseId(), materializationResp.getDestinationTable());
|
||||
Set<Long> metricIds = materializationResp.getMaterializationElementRespList().stream()
|
||||
.filter(e -> e.getType().equals(
|
||||
TypeEnums.METRIC)).map(e -> e.getId()).collect(Collectors.toSet());
|
||||
Set<Long> dimensionIds = materializationResp.getMaterializationElementRespList().stream()
|
||||
.filter(e -> e.getType().equals(
|
||||
TypeEnums.DIMENSION)).map(e -> e.getId()).collect(Collectors.toSet());
|
||||
|
||||
dimensionResps.stream().filter(d -> dimensionIds.contains(d.getId()))
|
||||
.filter(d -> fields.contains(d.getBizName())).forEach(d -> {
|
||||
List<MaterializationRecordResp> materializationRecordResps = materializationRecordService
|
||||
.fetchMaterializationDate(sameTableMaterialization, d.getBizName(), start, end);
|
||||
if (!CollectionUtils.isEmpty(materializationRecordResps)) {
|
||||
List<TimeRange> timeRangeList = new ArrayList<>();
|
||||
materializationRecordResps.stream().forEach(t -> timeRangeList.add(
|
||||
TimeRange.builder().start(t.getDataTime()).end(t.getDataTime()).build()));
|
||||
materialization.getDimensions().add(
|
||||
MaterializationElement.builder()
|
||||
.name(d.getBizName())
|
||||
.timeRangeList(timeRangeList)
|
||||
.build()
|
||||
);
|
||||
} else if (MaterializedTypeEnum.FULL.equals(materializationResp.getMaterializedType())) {
|
||||
materialization.getDimensions().add(
|
||||
MaterializationElement.builder()
|
||||
.name(d.getBizName())
|
||||
.timeRangeList(
|
||||
Arrays.asList(TimeRange.builder().start(start).end(end).build()))
|
||||
.build()
|
||||
);
|
||||
}
|
||||
});
|
||||
metrics.stream().filter(m -> metricIds.contains(m.getId())).filter(m -> fields.contains(m.getBizName()))
|
||||
.forEach(m -> {
|
||||
List<MaterializationRecordResp> materializationRecordResps = materializationRecordService
|
||||
.fetchMaterializationDate(sameTableMaterialization, m.getBizName(), start, end);
|
||||
if (!CollectionUtils.isEmpty(materializationRecordResps)) {
|
||||
List<TimeRange> timeRangeList = new ArrayList<>();
|
||||
materializationRecordResps.stream().forEach(t -> timeRangeList.add(
|
||||
TimeRange.builder().start(t.getDataTime()).end(t.getDataTime()).build()));
|
||||
materialization.getMetrics().add(MaterializationElement.builder().name(m.getBizName())
|
||||
.timeRangeList(timeRangeList).build());
|
||||
} else if (MaterializedTypeEnum.FULL.equals(materializationResp.getMaterializedType())) {
|
||||
materialization.getMetrics().add(
|
||||
MaterializationElement.builder()
|
||||
.name(m.getBizName())
|
||||
.timeRangeList(
|
||||
Arrays.asList(TimeRange.builder().start(start).end(end).build()))
|
||||
.build()
|
||||
);
|
||||
}
|
||||
});
|
||||
materializationList.add(materialization);
|
||||
});
|
||||
return materializationList;
|
||||
}
|
||||
|
||||
protected boolean replan(List<Materialization> materializationList, QueryStructReq queryStructReq,
|
||||
QueryStatement queryStatement)
|
||||
throws Exception {
|
||||
log.info("{}", materializationList);
|
||||
SemanticSchema schema = SemanticSchema.newBuilder(queryStatement.getMetricReq().getRootPath()).build();
|
||||
schema.setMaterializationList(materializationList);
|
||||
getTimeRanges(queryStructReq, queryStatement);
|
||||
removeDefaultMetric(queryStructReq, queryStatement.getMetricReq());
|
||||
MaterializationPlanner materializationPlanner = new MaterializationPlanner(schema);
|
||||
materializationPlanner.explain(queryStatement,
|
||||
AggOption.getAggregation(queryStructReq.getQueryType().isNativeAggQuery()));
|
||||
log.info("optimize {}", materializationPlanner.findBest().getDatasource());
|
||||
SemanticSchema semanticSchema = materializationPlanner.findBest();
|
||||
if (!CollectionUtils.isEmpty(semanticSchema.getDatasource())) {
|
||||
semanticSchema.getSemanticModel().setRootPath(semanticSchema.getRootPath());
|
||||
semanticSchema.setSemanticModel(transform(queryStatement, semanticSchema.getSemanticModel()));
|
||||
int materCnt = semanticSchema.getDatasource().size();
|
||||
if (materCnt == semanticSchema.getDatasource().entrySet().stream()
|
||||
.filter(d -> d.getValue().getTimePartType().equals(TimePartType.ZIPPER)).count()) {
|
||||
doSingleZipperSource(queryStructReq, queryStatement);
|
||||
}
|
||||
AggPlanner aggBuilder = new AggPlanner(semanticSchema);
|
||||
aggBuilder.explain(queryStatement,
|
||||
AggOption.getAggregation(queryStructReq.getQueryType().isNativeAggQuery()));
|
||||
log.debug("optimize before {} sql {}", queryStatement.getSourceId(), queryStatement.getSql());
|
||||
log.debug("optimize after {} sql {}", aggBuilder.getSourceId(), aggBuilder.getSql());
|
||||
queryStatement.setSourceId(aggBuilder.getSourceId());
|
||||
queryStatement.setSql(aggBuilder.getSql());
|
||||
queryStatement.setStatus(queryStatement.getStatus() + 1);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
protected SemanticModel transform(QueryStatement queryStatement, SemanticModel semanticModel) throws Exception {
|
||||
for (DataSource dataSource : semanticModel.getDatasourceMap().values()) {
|
||||
if (!CollectionUtils.isEmpty(dataSource.getMeasures())) {
|
||||
dataSource.getMeasures().stream().forEach(m -> {
|
||||
setMetricExpr(semanticModel.getRootPath(), m.getName(), m);
|
||||
});
|
||||
}
|
||||
if (!CollectionUtils.isEmpty(dataSource.getDimensions())) {
|
||||
dataSource.getDimensions().stream().forEach(d -> {
|
||||
setDimension(semanticModel.getRootPath(), d.getName(), d);
|
||||
});
|
||||
}
|
||||
}
|
||||
return semanticModel;
|
||||
}
|
||||
|
||||
protected void setDimension(String rootPath, String bizName, Dimension dimension) {
|
||||
try {
|
||||
dimension.setDataType(DataType.UNKNOWN);
|
||||
SemanticModel oriSemanticModel = semanticSchemaManager.get(rootPath);
|
||||
if (Objects.nonNull(oriSemanticModel)) {
|
||||
for (List<Dimension> dimensions : oriSemanticModel.getDimensionMap().values()) {
|
||||
Optional<Dimension> dim = dimensions.stream()
|
||||
.filter(d -> d.getName().equalsIgnoreCase(bizName)).findFirst();
|
||||
if (dim.isPresent()) {
|
||||
dimension.setDataType(dim.get().getDataType());
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("getMetricExpr {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
protected void setMetricExpr(String rootPath, String bizName, Measure measure) {
|
||||
try {
|
||||
measure.setExpr(bizName);
|
||||
SemanticModel oriSemanticModel = semanticSchemaManager.get(rootPath);
|
||||
if (Objects.nonNull(oriSemanticModel)) {
|
||||
Optional<Metric> metric = oriSemanticModel.getMetrics()
|
||||
.stream().filter(m -> m.getName().equalsIgnoreCase(bizName)).findFirst();
|
||||
if (metric.isPresent()) {
|
||||
if (metric.get().getMetricTypeParams().getExpr().contains(getVariablePrefix())) {
|
||||
measure.setExpr(metric.get().getMetricTypeParams().getExpr());
|
||||
}
|
||||
if (!CollectionUtils.isEmpty(metric.get().getMetricTypeParams().getMeasures())) {
|
||||
String measureParam = metric.get().getMetricTypeParams().getMeasures().get(0).getName();
|
||||
for (DataSource dataSource : oriSemanticModel.getDatasourceMap().values()) {
|
||||
Optional<Measure> measureOpt = dataSource.getMeasures().stream()
|
||||
.filter(mm -> mm.getName().equalsIgnoreCase(measureParam)).findFirst();
|
||||
if (measureOpt.isPresent()) {
|
||||
measure.setAgg(measureOpt.get().getAgg());
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("getMetricExpr {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
protected void removeDefaultMetric(QueryStructReq queryStructReq, MetricReq metricReq) {
|
||||
// due to default metrics have no materialization
|
||||
if (CollectionUtils.isEmpty(queryStructReq.getAggregators()) && Objects.nonNull(metricReq)) {
|
||||
metricReq.setMetrics(new ArrayList<>());
|
||||
}
|
||||
}
|
||||
|
||||
protected void doSingleZipperSource(QueryStructReq queryStructReq, QueryStatement queryStatement) {
|
||||
// time field rewrite to start_ end_
|
||||
log.info("doSingleZipperSource {}", queryStatement);
|
||||
|
||||
if (CollectionUtils.isEmpty(queryStructReq.getAggregators()) && CollectionUtils.isEmpty(
|
||||
queryStructReq.getGroups()) && Objects.nonNull(queryStatement.getParseSqlReq())) {
|
||||
String sqlNew = queryStructUtils.generateZipperWhere(queryStatement, queryStructReq);
|
||||
log.info("doSingleZipperSource before[{}] after[{}]", queryStatement.getParseSqlReq().getSql(), sqlNew);
|
||||
queryStatement.getParseSqlReq().setSql(sqlNew);
|
||||
return;
|
||||
}
|
||||
MetricReq metricReq = queryStatement.getMetricReq();
|
||||
String where = queryStructUtils.generateZipperWhere(queryStructReq);
|
||||
if (!where.isEmpty() && Objects.nonNull(metricReq)) {
|
||||
log.info("doSingleZipperSource before[{}] after[{}]", metricReq.getWhere(), where);
|
||||
metricReq.setWhere(where);
|
||||
}
|
||||
}
|
||||
|
||||
protected void getTimeRanges(QueryStructReq queryStructReq, QueryStatement queryStatement) {
|
||||
queryStatement.setTimeRanges(queryStructUtils.getTimeRanges(queryStructReq));
|
||||
}
|
||||
|
||||
protected String getVariablePrefix() {
|
||||
return queryStructUtils.getVariablePrefix();
|
||||
}
|
||||
}
|
||||
@@ -11,6 +11,7 @@ import com.tencent.supersonic.semantic.query.persistence.pojo.QueryStatement;
|
||||
import com.tencent.supersonic.semantic.query.utils.ComponentFactory;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
@@ -30,28 +31,35 @@ public class QueryParser {
|
||||
this.catalog = catalog;
|
||||
}
|
||||
|
||||
public QueryStatement logicSql(QueryStructReq queryStructReq) throws Exception {
|
||||
ParseSqlReq parseSqlReq = new ParseSqlReq();
|
||||
MetricReq metricReq = new MetricReq();
|
||||
public QueryStatement logicSql(QueryStatement queryStatement) throws Exception {
|
||||
QueryStructReq queryStructReq = queryStatement.getQueryStructReq();
|
||||
if (Objects.isNull(queryStatement.getParseSqlReq())) {
|
||||
queryStatement.setParseSqlReq(new ParseSqlReq());
|
||||
}
|
||||
if (Objects.isNull(queryStatement.getMetricReq())) {
|
||||
queryStatement.setMetricReq(new MetricReq());
|
||||
}
|
||||
log.info("SemanticConverter before [{}]", queryStructReq);
|
||||
for (SemanticConverter semanticConverter : ComponentFactory.getSemanticConverters()) {
|
||||
if (semanticConverter.accept(queryStructReq)) {
|
||||
if (semanticConverter.accept(queryStatement)) {
|
||||
log.info("SemanticConverter accept [{}]", semanticConverter.getClass().getName());
|
||||
semanticConverter.converter(catalog, queryStructReq, parseSqlReq, metricReq);
|
||||
semanticConverter.converter(catalog, queryStructReq, queryStatement.getParseSqlReq(),
|
||||
queryStatement.getMetricReq());
|
||||
}
|
||||
}
|
||||
log.info("SemanticConverter after {} {} {}", queryStructReq, metricReq, parseSqlReq);
|
||||
if (!parseSqlReq.getSql().isEmpty()) {
|
||||
return parser(parseSqlReq);
|
||||
log.info("SemanticConverter after {} {} {}", queryStructReq, queryStatement.getParseSqlReq(),
|
||||
queryStatement.getMetricReq());
|
||||
if (!queryStatement.getParseSqlReq().getSql().isEmpty()) {
|
||||
return parser(queryStatement.getParseSqlReq(), queryStatement);
|
||||
}
|
||||
|
||||
metricReq.setNativeQuery(queryStructReq.getQueryType().isNativeAggQuery());
|
||||
return parser(metricReq);
|
||||
queryStatement.getMetricReq().setNativeQuery(queryStructReq.getQueryType().isNativeAggQuery());
|
||||
return parser(queryStatement);
|
||||
|
||||
}
|
||||
|
||||
public QueryStatement parser(ParseSqlReq sqlCommend) {
|
||||
public QueryStatement parser(ParseSqlReq sqlCommend, QueryStatement queryStatement) {
|
||||
log.info("parser MetricReq [{}] ", sqlCommend);
|
||||
QueryStatement queryStatement = new QueryStatement();
|
||||
try {
|
||||
if (!CollectionUtils.isEmpty(sqlCommend.getTables())) {
|
||||
List<String[]> tables = new ArrayList<>();
|
||||
@@ -63,7 +71,10 @@ public class QueryParser {
|
||||
metricReq.setWhere(StringUtil.formatSqlQuota(metricTable.getWhere()));
|
||||
metricReq.setNativeQuery(!AggOption.isAgg(metricTable.getAggOption()));
|
||||
metricReq.setRootPath(sqlCommend.getRootPath());
|
||||
QueryStatement tableSql = parser(metricReq, metricTable.getAggOption());
|
||||
QueryStatement tableSql = new QueryStatement();
|
||||
tableSql.setIsS2SQL(false);
|
||||
tableSql.setMetricReq(metricReq);
|
||||
tableSql = parser(tableSql, metricTable.getAggOption());
|
||||
if (!tableSql.isOk()) {
|
||||
queryStatement.setErrMsg(String.format("parser table [%s] error [%s]", metricTable.getAlias(),
|
||||
tableSql.getErrMsg()));
|
||||
@@ -99,19 +110,19 @@ public class QueryParser {
|
||||
return queryStatement;
|
||||
}
|
||||
|
||||
public QueryStatement parser(MetricReq metricCommand) {
|
||||
return parser(metricCommand, AggOption.getAggregation(metricCommand.isNativeQuery()));
|
||||
public QueryStatement parser(QueryStatement queryStatement) {
|
||||
return parser(queryStatement, AggOption.getAggregation(queryStatement.getMetricReq().isNativeQuery()));
|
||||
}
|
||||
|
||||
public QueryStatement parser(MetricReq metricCommand, AggOption isAgg) {
|
||||
public QueryStatement parser(QueryStatement queryStatement, AggOption isAgg) {
|
||||
MetricReq metricCommand = queryStatement.getMetricReq();
|
||||
log.info("parser MetricReq [{}] isAgg [{}]", metricCommand, isAgg);
|
||||
QueryStatement queryStatement = new QueryStatement();
|
||||
if (metricCommand.getRootPath().isEmpty()) {
|
||||
queryStatement.setErrMsg("rootPath empty");
|
||||
return queryStatement;
|
||||
}
|
||||
try {
|
||||
queryStatement = ComponentFactory.getSqlParser().explain(metricCommand, isAgg, catalog);
|
||||
queryStatement = ComponentFactory.getSqlParser().explain(queryStatement, isAgg, catalog);
|
||||
return queryStatement;
|
||||
} catch (Exception e) {
|
||||
queryStatement.setErrMsg(e.getMessage());
|
||||
|
||||
@@ -4,10 +4,11 @@ import com.tencent.supersonic.semantic.api.query.request.MetricReq;
|
||||
import com.tencent.supersonic.semantic.api.query.request.ParseSqlReq;
|
||||
import com.tencent.supersonic.semantic.api.query.request.QueryStructReq;
|
||||
import com.tencent.supersonic.semantic.model.domain.Catalog;
|
||||
import com.tencent.supersonic.semantic.query.persistence.pojo.QueryStatement;
|
||||
|
||||
public interface SemanticConverter {
|
||||
|
||||
boolean accept(QueryStructReq queryStructCmd);
|
||||
boolean accept(QueryStatement queryStatement);
|
||||
|
||||
void converter(Catalog catalog, QueryStructReq queryStructCmd, ParseSqlReq sqlCommend, MetricReq metricCommand)
|
||||
throws Exception;
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
package com.tencent.supersonic.semantic.query.parser;
|
||||
|
||||
import com.tencent.supersonic.semantic.api.query.enums.AggOption;
|
||||
import com.tencent.supersonic.semantic.api.query.request.MetricReq;
|
||||
import com.tencent.supersonic.semantic.model.domain.Catalog;
|
||||
import com.tencent.supersonic.semantic.query.persistence.pojo.QueryStatement;
|
||||
|
||||
public interface SqlParser {
|
||||
QueryStatement explain(MetricReq metricReq, AggOption aggOption, Catalog catalog) throws Exception;
|
||||
|
||||
QueryStatement explain(QueryStatement queryStatement, AggOption aggOption, Catalog catalog) throws Exception;
|
||||
}
|
||||
|
||||
@@ -20,8 +20,8 @@ public class CalciteSqlParser implements SqlParser {
|
||||
}
|
||||
|
||||
@Override
|
||||
public QueryStatement explain(MetricReq metricReq, AggOption isAgg, Catalog catalog) throws Exception {
|
||||
QueryStatement queryStatement = new QueryStatement();
|
||||
public QueryStatement explain(QueryStatement queryStatement, AggOption isAgg, Catalog catalog) throws Exception {
|
||||
MetricReq metricReq = queryStatement.getMetricReq();
|
||||
SemanticModel semanticModel = semanticSchemaManager.get(metricReq.getRootPath());
|
||||
if (semanticModel == null) {
|
||||
queryStatement.setErrMsg("semanticSchema not found");
|
||||
|
||||
@@ -20,6 +20,7 @@ import com.tencent.supersonic.semantic.query.parser.calcite.s2sql.Dimension;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.s2sql.DimensionTimeTypeParams;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.s2sql.Identify;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.s2sql.JoinRelation;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.s2sql.Materialization.TimePartType;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.s2sql.Measure;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.s2sql.Metric;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.s2sql.MetricTypeParams;
|
||||
@@ -62,10 +63,6 @@ public class SemanticSchemaManager {
|
||||
public SemanticModel reload(String rootPath) {
|
||||
SemanticModel semanticModel = new SemanticModel();
|
||||
semanticModel.setRootPath(rootPath);
|
||||
//Map<Long, String> modelFullPathMap = catalog.getModelFullPath();
|
||||
//log.info("modelFullPathMap {}", modelFullPathMap);
|
||||
//Set<Long> modelIds = modelFullPathMap.entrySet().stream().filter(e -> e.getValue().startsWith(rootPath))
|
||||
// .map(Entry::getKey).collect(Collectors.toSet());
|
||||
Set<Long> modelIds = Arrays.stream(rootPath.split(",")).map(s -> Long.parseLong(s.trim()))
|
||||
.collect(Collectors.toSet());
|
||||
if (modelIds.isEmpty()) {
|
||||
@@ -122,6 +119,9 @@ public class SemanticSchemaManager {
|
||||
.name(d.getName()).tableQuery(d.getTableQuery()).identifiers(getIdentify(d.getIdentifiers()))
|
||||
.measures(getMeasures(d.getMeasures())).dimensions(getDimensions(d.getDimensions())).build();
|
||||
datasource.setAggTime(getDataSourceAggTime(datasource.getDimensions()));
|
||||
if (Objects.nonNull(d.getModelSourceTypeEnum())) {
|
||||
datasource.setTimePartType(TimePartType.of(d.getModelSourceTypeEnum().name()));
|
||||
}
|
||||
return datasource;
|
||||
}
|
||||
|
||||
|
||||
@@ -5,7 +5,6 @@ import com.tencent.supersonic.semantic.query.parser.calcite.s2sql.Constants;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.s2sql.DataSource;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.s2sql.Dimension;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.s2sql.Identify;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.s2sql.Identify.Type;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.s2sql.JoinRelation;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.s2sql.Materialization.TimePartType;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.s2sql.Metric;
|
||||
@@ -18,6 +17,17 @@ import com.tencent.supersonic.semantic.query.parser.calcite.sql.node.FilterNode;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.sql.node.IdentifyNode;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.sql.node.MetricNode;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.sql.node.SemanticNode;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.calcite.sql.JoinConditionType;
|
||||
import org.apache.calcite.sql.SqlBasicCall;
|
||||
@@ -30,21 +40,6 @@ import org.apache.calcite.sql.validate.SqlValidatorScope;
|
||||
import org.apache.commons.lang3.tuple.Triple;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Slf4j
|
||||
public class JoinRender extends Renderer {
|
||||
|
||||
@@ -259,14 +254,26 @@ public class JoinRender extends Renderer {
|
||||
throws Exception {
|
||||
SqlNode condition = getCondition(leftTable, tableView, dataSource, schema, scope);
|
||||
SqlLiteral sqlLiteral = SemanticNode.getJoinSqlLiteral("");
|
||||
if (!TimePartType.ZIPPER.equals(leftTable.getDataSource().getTimePartType()) && !TimePartType.ZIPPER.equals(
|
||||
JoinRelation matchJoinRelation = getMatchJoinRelation(before, tableView, schema);
|
||||
SqlNode joinRelationCondition = null;
|
||||
if (!CollectionUtils.isEmpty(matchJoinRelation.getJoinCondition())) {
|
||||
sqlLiteral = SemanticNode.getJoinSqlLiteral(matchJoinRelation.getJoinType());
|
||||
joinRelationCondition = getCondition(matchJoinRelation, scope);
|
||||
condition = joinRelationCondition;
|
||||
}
|
||||
if (TimePartType.ZIPPER.equals(leftTable.getDataSource().getTimePartType()) || TimePartType.ZIPPER.equals(
|
||||
tableView.getDataSource().getTimePartType())) {
|
||||
JoinRelation matchJoinRelation = getMatchJoinRelation(before, tableView, schema);
|
||||
if (!CollectionUtils.isEmpty(matchJoinRelation.getJoinCondition())) {
|
||||
sqlLiteral = SemanticNode.getJoinSqlLiteral(matchJoinRelation.getJoinType());
|
||||
condition = getCondition(matchJoinRelation, scope);
|
||||
SqlNode zipperCondition = getZipperCondition(leftTable, tableView, dataSource, schema, scope);
|
||||
if (Objects.nonNull(joinRelationCondition)) {
|
||||
condition = new SqlBasicCall(
|
||||
SqlStdOperatorTable.AND,
|
||||
new ArrayList<>(Arrays.asList(zipperCondition, joinRelationCondition)),
|
||||
SqlParserPos.ZERO, null);
|
||||
} else {
|
||||
condition = zipperCondition;
|
||||
}
|
||||
}
|
||||
|
||||
return new SqlJoin(
|
||||
SqlParserPos.ZERO,
|
||||
left,
|
||||
@@ -324,10 +331,7 @@ public class JoinRender extends Renderer {
|
||||
|
||||
private SqlNode getCondition(TableView left, TableView right, DataSource dataSource, SemanticSchema schema,
|
||||
SqlValidatorScope scope) throws Exception {
|
||||
if (TimePartType.ZIPPER.equals(left.getDataSource().getTimePartType()) || TimePartType.ZIPPER.equals(
|
||||
right.getDataSource().getTimePartType())) {
|
||||
return getZipperCondition(left, right, dataSource, schema, scope);
|
||||
}
|
||||
|
||||
Set<String> selectLeft = SemanticNode.getSelect(left.getTable());
|
||||
Set<String> selectRight = SemanticNode.getSelect(right.getTable());
|
||||
selectLeft.retainAll(selectRight);
|
||||
@@ -368,67 +372,6 @@ public class JoinRender extends Renderer {
|
||||
return condition;
|
||||
}
|
||||
|
||||
private List<DataSource> getOrderSource(List<DataSource> dataSources) throws Exception {
|
||||
if (CollectionUtils.isEmpty(dataSources) || dataSources.size() <= 2) {
|
||||
return dataSources;
|
||||
}
|
||||
Map<String, Set<String>> next = new HashMap<>();
|
||||
Map<String, Boolean> visited = new HashMap<>();
|
||||
Map<String, List<Identify>> dataSourceIdentifies = new HashMap<>();
|
||||
dataSources.stream().forEach(d -> {
|
||||
next.put(d.getName(), new HashSet<>());
|
||||
visited.put(d.getName(), false);
|
||||
dataSourceIdentifies.put(d.getName(), d.getIdentifiers());
|
||||
});
|
||||
int cnt = dataSources.size();
|
||||
List<Map.Entry<String, List<Identify>>> dataSourceIdentifyList = dataSourceIdentifies.entrySet().stream()
|
||||
.collect(Collectors.toList());
|
||||
for (int i = 0; i < cnt; i++) {
|
||||
for (int j = i + 1; j < cnt; j++) {
|
||||
Set<String> primaries = IdentifyNode.getIdentifyNames(dataSourceIdentifyList.get(i).getValue(),
|
||||
Type.PRIMARY);
|
||||
Set<String> foreign = IdentifyNode.getIdentifyNames(dataSourceIdentifyList.get(i).getValue(),
|
||||
Type.FOREIGN);
|
||||
Set<String> nextPrimaries = IdentifyNode.getIdentifyNames(dataSourceIdentifyList.get(j).getValue(),
|
||||
Type.PRIMARY);
|
||||
Set<String> nextForeign = IdentifyNode.getIdentifyNames(dataSourceIdentifyList.get(j).getValue(),
|
||||
Type.FOREIGN);
|
||||
Set<String> nextAll = new HashSet<>();
|
||||
nextAll.addAll(nextPrimaries);
|
||||
nextAll.addAll(nextForeign);
|
||||
primaries.retainAll(nextPrimaries);
|
||||
foreign.retainAll(nextPrimaries);
|
||||
if (primaries.size() > 0 || foreign.size() > 0) {
|
||||
next.get(dataSourceIdentifyList.get(i).getKey()).add(dataSourceIdentifyList.get(j).getKey());
|
||||
next.get(dataSourceIdentifyList.get(j).getKey()).add(dataSourceIdentifyList.get(i).getKey());
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
Queue<String> paths = new ArrayDeque<>();
|
||||
for (String id : visited.keySet()) {
|
||||
if (!visited.get(id)) {
|
||||
joinOrder(cnt, id, next, paths, visited);
|
||||
if (paths.size() >= cnt) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (paths.size() < cnt) {
|
||||
throw new Exception("datasource cant join,pls check identify :" + dataSources.stream()
|
||||
.map(d -> d.getName()).collect(
|
||||
Collectors.joining(",")));
|
||||
}
|
||||
List<String> orderList = new ArrayList<>(paths);
|
||||
Collections.sort(dataSources, new Comparator<DataSource>() {
|
||||
@Override
|
||||
public int compare(DataSource o1, DataSource o2) {
|
||||
return orderList.indexOf(o1.getName()) - orderList.indexOf(o2.getName());
|
||||
}
|
||||
});
|
||||
return dataSources;
|
||||
}
|
||||
|
||||
private static void joinOrder(int cnt, String id, Map<String, Set<String>> next, Queue<String> orders,
|
||||
Map<String, Boolean> visited) {
|
||||
visited.put(id, true);
|
||||
@@ -482,8 +425,6 @@ public class JoinRender extends Renderer {
|
||||
String startTime = "";
|
||||
String endTime = "";
|
||||
String dateTime = "";
|
||||
List<String> primaryZipper = new ArrayList<>();
|
||||
List<String> primaryPartition = new ArrayList<>();
|
||||
|
||||
Optional<Dimension> startTimeOp = (TimePartType.ZIPPER.equals(left.getDataSource().getTimePartType()) ? left
|
||||
: right).getDataSource().getDimensions().stream()
|
||||
@@ -502,12 +443,8 @@ public class JoinRender extends Renderer {
|
||||
startTime = zipper.getAlias() + "." + startTimeOp.get().getName();
|
||||
endTime = zipper.getAlias() + "." + endTimeOp.get().getName();
|
||||
dateTime = partMetric.getAlias() + "." + partTime.get().getName();
|
||||
primaryZipper = zipper.getDataSource().getIdentifiers().stream().map(i -> i.getName()).collect(
|
||||
Collectors.toList());
|
||||
primaryPartition = partMetric.getDataSource().getIdentifiers().stream().map(i -> i.getName()).collect(
|
||||
Collectors.toList());
|
||||
}
|
||||
primaryZipper.retainAll(primaryPartition);
|
||||
|
||||
condition =
|
||||
new SqlBasicCall(
|
||||
SqlStdOperatorTable.AND,
|
||||
@@ -522,20 +459,6 @@ public class JoinRender extends Renderer {
|
||||
SqlParserPos.ZERO, null))),
|
||||
SqlParserPos.ZERO, null);
|
||||
|
||||
for (String p : primaryZipper) {
|
||||
List<SqlNode> ons = new ArrayList<>();
|
||||
ons.add(SemanticNode.parse(left.getAlias() + "." + p, scope));
|
||||
ons.add(SemanticNode.parse(right.getAlias() + "." + p, scope));
|
||||
SqlNode addCondition = new SqlBasicCall(
|
||||
SqlStdOperatorTable.EQUALS,
|
||||
ons,
|
||||
SqlParserPos.ZERO, null);
|
||||
condition = new SqlBasicCall(
|
||||
SqlStdOperatorTable.AND,
|
||||
new ArrayList<>(Arrays.asList(condition, addCondition)),
|
||||
SqlParserPos.ZERO, null);
|
||||
}
|
||||
|
||||
}
|
||||
return condition;
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import com.tencent.supersonic.semantic.query.parser.calcite.s2sql.Constants;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.s2sql.DataSource;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.s2sql.Dimension;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.s2sql.Identify;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.s2sql.Materialization.TimePartType;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.s2sql.Measure;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.s2sql.Metric;
|
||||
import com.tencent.supersonic.semantic.query.parser.calcite.schema.SemanticSchema;
|
||||
@@ -55,7 +56,7 @@ public class SourceRender extends Renderer {
|
||||
datasource, scope,
|
||||
schema, nonAgg);
|
||||
}
|
||||
|
||||
addTimeDimension(datasource, queryDimensions);
|
||||
for (String metric : queryMetrics) {
|
||||
MetricNode metricNode = buildMetricNode(metric, datasource, scope, schema, nonAgg, alias);
|
||||
if (!metricNode.getAggNode().isEmpty()) {
|
||||
@@ -287,14 +288,26 @@ public class SourceRender extends Renderer {
|
||||
return false;
|
||||
}
|
||||
|
||||
private static void expandWhere(MetricReq metricCommand, TableView tableView, SqlValidatorScope scope)
|
||||
throws Exception {
|
||||
if (metricCommand.getWhere() != null && !metricCommand.getWhere().isEmpty()) {
|
||||
SqlNode sqlNode = SemanticNode.parse(metricCommand.getWhere(), scope);
|
||||
Set<String> fieldWhere = new HashSet<>();
|
||||
FilterNode.getFilterField(sqlNode, fieldWhere);
|
||||
//super.tableView.getFilter().add(sqlNode);
|
||||
tableView.getFilter().add(sqlNode);
|
||||
private static void addTimeDimension(DataSource dataSource, List<String> queryDimension) {
|
||||
if (TimePartType.ZIPPER.equals(dataSource.getTimePartType())) {
|
||||
Optional<Dimension> startTimeOp = dataSource.getDimensions().stream()
|
||||
.filter(d -> Constants.DIMENSION_TYPE_TIME.equalsIgnoreCase(d.getType()))
|
||||
.filter(d -> d.getName().startsWith(Constants.MATERIALIZATION_ZIPPER_START)).findFirst();
|
||||
Optional<Dimension> endTimeOp = dataSource.getDimensions().stream()
|
||||
.filter(d -> Constants.DIMENSION_TYPE_TIME.equalsIgnoreCase(d.getType()))
|
||||
.filter(d -> d.getName().startsWith(Constants.MATERIALIZATION_ZIPPER_END)).findFirst();
|
||||
if (startTimeOp.isPresent() && !queryDimension.contains(startTimeOp.get().getName())) {
|
||||
queryDimension.add(startTimeOp.get().getName());
|
||||
}
|
||||
if (endTimeOp.isPresent() && !queryDimension.contains(endTimeOp.get().getName())) {
|
||||
queryDimension.add(endTimeOp.get().getName());
|
||||
}
|
||||
} else {
|
||||
Optional<Dimension> timeOp = dataSource.getDimensions().stream()
|
||||
.filter(d -> Constants.DIMENSION_TYPE_TIME.equalsIgnoreCase(d.getType())).findFirst();
|
||||
if (timeOp.isPresent() && !queryDimension.contains(timeOp.get().getName())) {
|
||||
queryDimension.add(timeOp.get().getName());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -15,20 +15,20 @@ import com.tencent.supersonic.semantic.api.query.request.QueryStructReq;
|
||||
import com.tencent.supersonic.semantic.model.domain.Catalog;
|
||||
import com.tencent.supersonic.semantic.model.domain.pojo.EngineTypeEnum;
|
||||
import com.tencent.supersonic.semantic.query.parser.SemanticConverter;
|
||||
import com.tencent.supersonic.semantic.query.persistence.pojo.QueryStatement;
|
||||
import com.tencent.supersonic.semantic.query.service.SemanticQueryEngine;
|
||||
import com.tencent.supersonic.semantic.query.utils.QueryStructUtils;
|
||||
import com.tencent.supersonic.semantic.query.utils.SqlGenerateUtils;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
|
||||
@Component("CalculateAggConverter")
|
||||
@@ -94,7 +94,11 @@ public class CalculateAggConverter implements SemanticConverter {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean accept(QueryStructReq queryStructCmd) {
|
||||
public boolean accept(QueryStatement queryStatement) {
|
||||
if (Objects.isNull(queryStatement.getQueryStructReq()) || queryStatement.getIsS2SQL()) {
|
||||
return false;
|
||||
}
|
||||
QueryStructReq queryStructCmd = queryStatement.getQueryStructReq();
|
||||
if (queryStructCmd.getQueryType().isNativeAggQuery()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -1,31 +1,36 @@
|
||||
package com.tencent.supersonic.semantic.query.parser.convert;
|
||||
|
||||
import com.tencent.supersonic.semantic.api.model.response.DimensionResp;
|
||||
import com.tencent.supersonic.common.pojo.enums.FilterOperatorEnum;
|
||||
import com.tencent.supersonic.common.pojo.Filter;
|
||||
import com.tencent.supersonic.common.pojo.enums.FilterOperatorEnum;
|
||||
import com.tencent.supersonic.semantic.api.model.response.DimensionResp;
|
||||
import com.tencent.supersonic.semantic.api.query.request.MetricReq;
|
||||
import com.tencent.supersonic.semantic.api.query.request.ParseSqlReq;
|
||||
import com.tencent.supersonic.semantic.api.query.request.QueryStructReq;
|
||||
import com.tencent.supersonic.semantic.model.domain.Catalog;
|
||||
import com.tencent.supersonic.semantic.query.parser.SemanticConverter;
|
||||
import com.tencent.supersonic.semantic.query.persistence.pojo.QueryStatement;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Slf4j
|
||||
@Component("DefaultDimValueConverter")
|
||||
public class DefaultDimValueConverter implements SemanticConverter {
|
||||
|
||||
@Override
|
||||
public boolean accept(QueryStructReq queryStructCmd) {
|
||||
public boolean accept(QueryStatement queryStatement) {
|
||||
if (Objects.isNull(queryStatement.getQueryStructReq()) || queryStatement.getIsS2SQL()) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void converter(Catalog catalog, QueryStructReq queryStructCmd,
|
||||
ParseSqlReq sqlCommend, MetricReq metricCommand) throws Exception {
|
||||
ParseSqlReq sqlCommend, MetricReq metricCommand) throws Exception {
|
||||
List<DimensionResp> dimensionResps = catalog.getDimensions(queryStructCmd.getModelIds());
|
||||
//dimension which has default values
|
||||
dimensionResps = dimensionResps.stream()
|
||||
|
||||
@@ -1,21 +1,23 @@
|
||||
package com.tencent.supersonic.semantic.query.parser.convert;
|
||||
|
||||
import com.tencent.supersonic.common.pojo.Filter;
|
||||
import com.tencent.supersonic.common.pojo.exception.InvalidArgumentException;
|
||||
import com.tencent.supersonic.semantic.api.model.response.DimensionResp;
|
||||
import com.tencent.supersonic.semantic.api.model.response.MetricResp;
|
||||
import com.tencent.supersonic.common.pojo.Filter;
|
||||
import com.tencent.supersonic.semantic.api.query.request.MetricReq;
|
||||
import com.tencent.supersonic.semantic.api.query.request.ParseSqlReq;
|
||||
import com.tencent.supersonic.semantic.api.query.request.QueryStructReq;
|
||||
import com.tencent.supersonic.semantic.model.domain.Catalog;
|
||||
import com.tencent.supersonic.semantic.query.parser.SemanticConverter;
|
||||
import com.tencent.supersonic.semantic.query.persistence.pojo.QueryStatement;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
||||
@Component("MetricCheckConverter")
|
||||
@@ -23,7 +25,11 @@ import java.util.stream.Collectors;
|
||||
public class MetricCheckConverter implements SemanticConverter {
|
||||
|
||||
@Override
|
||||
public boolean accept(QueryStructReq queryStructCmd) {
|
||||
public boolean accept(QueryStatement queryStatement) {
|
||||
if (Objects.isNull(queryStatement.getQueryStructReq()) || queryStatement.getIsS2SQL()) {
|
||||
return false;
|
||||
}
|
||||
QueryStructReq queryStructCmd = queryStatement.getQueryStructReq();
|
||||
if (queryStructCmd.getQueryType().isNativeAggQuery()) {
|
||||
return false;
|
||||
}
|
||||
@@ -41,9 +47,9 @@ public class MetricCheckConverter implements SemanticConverter {
|
||||
List<String> dimensionFilterBizNames = queryStructReq.getDimensionFilters().stream()
|
||||
.map(Filter::getBizName).collect(Collectors.toList());
|
||||
List<MetricResp> metricToQuery = metricResps.stream().filter(metricResp ->
|
||||
metricBizNames.contains(metricResp.getBizName())).collect(Collectors.toList());
|
||||
metricBizNames.contains(metricResp.getBizName())).collect(Collectors.toList());
|
||||
List<Long> dimensionToFilter = dimensionResps.stream().filter(dimensionResp ->
|
||||
dimensionFilterBizNames.contains(dimensionResp.getBizName()))
|
||||
dimensionFilterBizNames.contains(dimensionResp.getBizName()))
|
||||
.map(DimensionResp::getId).collect(Collectors.toList());
|
||||
for (MetricResp metricResp : metricToQuery) {
|
||||
Set<Long> necessaryDimensionIds = metricResp.getNecessaryDimensionIds();
|
||||
|
||||
@@ -7,7 +7,9 @@ import com.tencent.supersonic.semantic.api.query.request.ParseSqlReq;
|
||||
import com.tencent.supersonic.semantic.api.query.request.QueryStructReq;
|
||||
import com.tencent.supersonic.semantic.model.domain.Catalog;
|
||||
import com.tencent.supersonic.semantic.query.parser.SemanticConverter;
|
||||
import com.tencent.supersonic.semantic.query.persistence.pojo.QueryStatement;
|
||||
import com.tencent.supersonic.semantic.query.utils.QueryStructUtils;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
@@ -30,8 +32,11 @@ public class ParserDefaultConverter implements SemanticConverter {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean accept(QueryStructReq queryStructCmd) {
|
||||
return !calculateCoverterAgg.accept(queryStructCmd);
|
||||
public boolean accept(QueryStatement queryStatement) {
|
||||
if (Objects.isNull(queryStatement.getQueryStructReq()) || queryStatement.getIsS2SQL()) {
|
||||
return false;
|
||||
}
|
||||
return !calculateCoverterAgg.accept(queryStatement);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -3,8 +3,8 @@ package com.tencent.supersonic.semantic.query.parser.convert;
|
||||
|
||||
import com.tencent.supersonic.common.pojo.Aggregator;
|
||||
import com.tencent.supersonic.common.pojo.Constants;
|
||||
import com.tencent.supersonic.common.pojo.enums.QueryType;
|
||||
import com.tencent.supersonic.common.pojo.enums.AggOperatorEnum;
|
||||
import com.tencent.supersonic.common.pojo.enums.QueryType;
|
||||
import com.tencent.supersonic.common.pojo.enums.TimeDimensionEnum;
|
||||
import com.tencent.supersonic.common.util.jsqlparser.SqlParserReplaceHelper;
|
||||
import com.tencent.supersonic.common.util.jsqlparser.SqlParserSelectFunctionHelper;
|
||||
@@ -26,14 +26,6 @@ import com.tencent.supersonic.semantic.model.domain.pojo.EngineTypeEnum;
|
||||
import com.tencent.supersonic.semantic.query.persistence.pojo.QueryStatement;
|
||||
import com.tencent.supersonic.semantic.query.service.SemanticQueryEngine;
|
||||
import com.tencent.supersonic.semantic.query.utils.QueryStructUtils;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
@@ -42,6 +34,13 @@ import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
@@ -118,7 +117,11 @@ public class QueryReqConverter {
|
||||
queryStructCmd.setModelIds(databaseReq.getModelIds().stream().collect(Collectors.toSet()));
|
||||
queryStructCmd.setQueryType(getQueryType(aggOption));
|
||||
log.info("QueryReqConverter queryStructCmd[{}]", queryStructCmd);
|
||||
QueryStatement queryStatement = parserService.physicalSql(queryStructCmd, result);
|
||||
QueryStatement queryStatement = new QueryStatement();
|
||||
queryStatement.setQueryStructReq(queryStructCmd);
|
||||
queryStatement.setParseSqlReq(result);
|
||||
queryStatement.setIsS2SQL(true);
|
||||
queryStatement = parserService.plan(queryStatement);
|
||||
queryStatement.setSql(String.format(SqlExecuteReq.LIMIT_WRAPPER, queryStatement.getSql()));
|
||||
return queryStatement;
|
||||
}
|
||||
|
||||
@@ -0,0 +1,72 @@
|
||||
package com.tencent.supersonic.semantic.query.parser.convert;
|
||||
|
||||
import com.tencent.supersonic.semantic.api.model.enums.ModelSourceTypeEnum;
|
||||
import com.tencent.supersonic.semantic.api.model.response.ModelResp;
|
||||
import com.tencent.supersonic.semantic.api.query.request.MetricReq;
|
||||
import com.tencent.supersonic.semantic.api.query.request.ParseSqlReq;
|
||||
import com.tencent.supersonic.semantic.api.query.request.QueryStructReq;
|
||||
import com.tencent.supersonic.semantic.model.domain.Catalog;
|
||||
import com.tencent.supersonic.semantic.query.parser.SemanticConverter;
|
||||
import com.tencent.supersonic.semantic.query.persistence.pojo.QueryStatement;
|
||||
import com.tencent.supersonic.semantic.query.utils.QueryStructUtils;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.collections4.CollectionUtils;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* correct the Query parameters when the model source type is zipper
|
||||
*/
|
||||
@Component("ZipperModelConverter")
|
||||
@Slf4j
|
||||
public class ZipperModelConverter implements SemanticConverter {
|
||||
|
||||
private final QueryStructUtils queryStructUtils;
|
||||
private final Catalog catalog;
|
||||
|
||||
public ZipperModelConverter(QueryStructUtils queryStructUtils,
|
||||
Catalog catalog) {
|
||||
this.queryStructUtils = queryStructUtils;
|
||||
this.catalog = catalog;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean accept(QueryStatement queryStatement) {
|
||||
if (Objects.isNull(queryStatement.getQueryStructReq())) {
|
||||
return false;
|
||||
}
|
||||
QueryStructReq queryStructCmd = queryStatement.getQueryStructReq();
|
||||
List<ModelResp> modelRespList = catalog.getModelList(queryStructCmd.getModelIds());
|
||||
if (!CollectionUtils.isEmpty(modelRespList)) {
|
||||
// all data sources are zipper tables
|
||||
long zipperCnt = modelRespList.stream().filter(m -> ModelSourceTypeEnum.isZipper(m.getSourceType()))
|
||||
.count();
|
||||
return modelRespList.size() == zipperCnt;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void converter(Catalog catalog, QueryStructReq queryStructCmd, ParseSqlReq sqlCommend,
|
||||
MetricReq metricCommand) throws Exception {
|
||||
doSingleZipperSource(queryStructCmd, sqlCommend, metricCommand);
|
||||
}
|
||||
|
||||
protected void doSingleZipperSource(QueryStructReq queryStructCmd, ParseSqlReq sqlCommend,
|
||||
MetricReq metricCommand) {
|
||||
// all data sources are zipper tables
|
||||
// request time field rewrite to start_ end_
|
||||
if (!sqlCommend.getSql().isEmpty()) {
|
||||
String sqlNew = queryStructUtils.generateZipperWhere(queryStructCmd, sqlCommend);
|
||||
log.info("doSingleZipperSource before[{}] after[{}]", sqlCommend.getSql(), sqlNew);
|
||||
sqlCommend.setSql(sqlNew);
|
||||
} else {
|
||||
String where = queryStructUtils.generateZipperWhere(queryStructCmd);
|
||||
if (!where.isEmpty() && Objects.nonNull(metricCommand)) {
|
||||
log.info("doSingleZipperSource before[{}] after[{}]", metricCommand.getWhere(), where);
|
||||
metricCommand.setWhere(where);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2,6 +2,7 @@ package com.tencent.supersonic.semantic.query.persistence.pojo;
|
||||
|
||||
import com.tencent.supersonic.semantic.api.query.request.MetricReq;
|
||||
import com.tencent.supersonic.semantic.api.query.request.ParseSqlReq;
|
||||
import com.tencent.supersonic.semantic.api.query.request.QueryStructReq;
|
||||
import lombok.Data;
|
||||
import org.apache.commons.lang3.tuple.ImmutablePair;
|
||||
|
||||
@@ -15,9 +16,11 @@ public class QueryStatement {
|
||||
private String sourceId = "";
|
||||
private String errMsg = "";
|
||||
private Boolean ok;
|
||||
private QueryStructReq queryStructReq;
|
||||
private MetricReq metricReq;
|
||||
private ParseSqlReq parseSqlReq;
|
||||
private Integer status = 0;
|
||||
private Boolean isS2SQL = false;
|
||||
private List<ImmutablePair<String, String>> timeRanges;
|
||||
|
||||
public boolean isOk() {
|
||||
|
||||
@@ -4,8 +4,6 @@ import com.github.pagehelper.PageInfo;
|
||||
import com.tencent.supersonic.auth.api.authentication.pojo.User;
|
||||
import com.tencent.supersonic.auth.api.authentication.utils.UserHolder;
|
||||
import com.tencent.supersonic.common.pojo.enums.AuthType;
|
||||
import com.tencent.supersonic.semantic.api.materialization.request.MaterializationSourceReq;
|
||||
import com.tencent.supersonic.semantic.api.materialization.response.MaterializationSourceResp;
|
||||
import com.tencent.supersonic.semantic.api.model.request.ModelSchemaFilterReq;
|
||||
import com.tencent.supersonic.semantic.api.model.request.PageDimensionReq;
|
||||
import com.tencent.supersonic.semantic.api.model.request.PageMetricReq;
|
||||
@@ -14,8 +12,10 @@ import com.tencent.supersonic.semantic.api.model.response.DomainResp;
|
||||
import com.tencent.supersonic.semantic.api.model.response.MetricResp;
|
||||
import com.tencent.supersonic.semantic.api.model.response.ModelResp;
|
||||
import com.tencent.supersonic.semantic.api.model.response.ModelSchemaResp;
|
||||
import com.tencent.supersonic.semantic.query.service.MaterializationService;
|
||||
import com.tencent.supersonic.semantic.query.service.SchemaService;
|
||||
import java.util.List;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
@@ -24,39 +24,33 @@ import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
import java.util.List;
|
||||
|
||||
@RestController
|
||||
@RequestMapping("/api/semantic/schema")
|
||||
public class SchemaController {
|
||||
|
||||
@Autowired
|
||||
private SchemaService schemaService;
|
||||
@Autowired
|
||||
private MaterializationService materializationService;
|
||||
|
||||
@PostMapping
|
||||
public List<ModelSchemaResp> fetchModelSchema(@RequestBody ModelSchemaFilterReq filter,
|
||||
HttpServletRequest request,
|
||||
HttpServletResponse response) {
|
||||
HttpServletRequest request,
|
||||
HttpServletResponse response) {
|
||||
User user = UserHolder.findUser(request, response);
|
||||
return schemaService.fetchModelSchema(filter, user);
|
||||
}
|
||||
|
||||
@GetMapping("/domain/list")
|
||||
public List<DomainResp> getDomainList(HttpServletRequest request,
|
||||
HttpServletResponse response) {
|
||||
HttpServletResponse response) {
|
||||
User user = UserHolder.findUser(request, response);
|
||||
return schemaService.getDomainList(user);
|
||||
}
|
||||
|
||||
@GetMapping("/model/list")
|
||||
public List<ModelResp> getModelList(@RequestParam("domainId") Long domainId,
|
||||
@RequestParam("authType") String authType,
|
||||
HttpServletRequest request,
|
||||
HttpServletResponse response) {
|
||||
@RequestParam("authType") String authType,
|
||||
HttpServletRequest request,
|
||||
HttpServletResponse response) {
|
||||
User user = UserHolder.findUser(request, response);
|
||||
return schemaService.getModelList(user, AuthType.valueOf(authType), domainId);
|
||||
}
|
||||
@@ -77,16 +71,4 @@ public class SchemaController {
|
||||
return schemaService.queryMetric(pageMetricCmd, user);
|
||||
}
|
||||
|
||||
/**
|
||||
* task api
|
||||
*/
|
||||
|
||||
@PostMapping("/materialization/source")
|
||||
MaterializationSourceResp queryDataSource(@RequestBody MaterializationSourceReq materializationSourceReq,
|
||||
HttpServletRequest request,
|
||||
HttpServletResponse response) throws Exception {
|
||||
User user = UserHolder.findUser(request, response);
|
||||
return materializationService.getMaterializationDataSource(materializationSourceReq, user);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1,11 +0,0 @@
|
||||
package com.tencent.supersonic.semantic.query.service;
|
||||
|
||||
import com.tencent.supersonic.auth.api.authentication.pojo.User;
|
||||
import com.tencent.supersonic.semantic.api.materialization.request.MaterializationSourceReq;
|
||||
import com.tencent.supersonic.semantic.api.materialization.response.MaterializationSourceResp;
|
||||
|
||||
public interface MaterializationService {
|
||||
|
||||
MaterializationSourceResp getMaterializationDataSource(MaterializationSourceReq materializationSourceReq,
|
||||
User user) throws Exception;
|
||||
}
|
||||
@@ -1,140 +0,0 @@
|
||||
package com.tencent.supersonic.semantic.query.service;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.tencent.supersonic.auth.api.authentication.pojo.User;
|
||||
import com.tencent.supersonic.semantic.api.materialization.request.MaterializationSourceReq;
|
||||
import com.tencent.supersonic.semantic.api.materialization.response.MaterializationSourceResp;
|
||||
import com.tencent.supersonic.semantic.api.model.pojo.Measure;
|
||||
import com.tencent.supersonic.semantic.api.model.request.ModelSchemaFilterReq;
|
||||
import com.tencent.supersonic.semantic.api.model.response.MeasureResp;
|
||||
import com.tencent.supersonic.semantic.api.model.response.MetricSchemaResp;
|
||||
import com.tencent.supersonic.semantic.api.model.response.ModelSchemaResp;
|
||||
import com.tencent.supersonic.semantic.api.query.request.MetricReq;
|
||||
import com.tencent.supersonic.semantic.materialization.domain.MaterializationConfService;
|
||||
import com.tencent.supersonic.semantic.model.domain.Catalog;
|
||||
import com.tencent.supersonic.semantic.model.domain.ModelService;
|
||||
import com.tencent.supersonic.semantic.query.persistence.pojo.QueryStatement;
|
||||
import com.tencent.supersonic.semantic.query.utils.QueryStructUtils;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Service("MaterializationService")
|
||||
@Slf4j
|
||||
public class MaterializationServiceImpl implements MaterializationService {
|
||||
|
||||
protected final MaterializationConfService materializationConfService;
|
||||
protected final ModelService modelService;
|
||||
protected final Catalog catalog;
|
||||
protected final QueryStructUtils queryStructUtils;
|
||||
protected final QueryService queryService;
|
||||
|
||||
public MaterializationServiceImpl(
|
||||
MaterializationConfService materializationConfService,
|
||||
ModelService modelService,
|
||||
Catalog catalog, QueryStructUtils queryStructUtils,
|
||||
QueryService queryService) {
|
||||
this.materializationConfService = materializationConfService;
|
||||
this.modelService = modelService;
|
||||
this.catalog = catalog;
|
||||
this.queryStructUtils = queryStructUtils;
|
||||
this.queryService = queryService;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MaterializationSourceResp getMaterializationDataSource(MaterializationSourceReq materializationSourceReq,
|
||||
User user) throws Exception {
|
||||
|
||||
if (materializationSourceReq.getMaterializationId() <= 0 || materializationSourceReq.getDataSourceId() <= 0) {
|
||||
throw new Exception("MaterializationId and DataSourceId are must");
|
||||
}
|
||||
Long materializationId = materializationSourceReq.getMaterializationId();
|
||||
List<MaterializationSourceResp> materializationList = materializationConfService.getMaterializationSourceResp(
|
||||
materializationId);
|
||||
if (!CollectionUtils.isEmpty(materializationList)) {
|
||||
Optional<MaterializationSourceResp> materializationSourceRespOpt = materializationList.stream()
|
||||
.filter(m -> m.getDataSourceId().equals(materializationSourceReq.getDataSourceId())).findFirst();
|
||||
if (materializationSourceRespOpt.isPresent()) {
|
||||
MaterializationSourceResp materializationSourceResp = materializationSourceRespOpt.get();
|
||||
Set<String> dimensionFields = new HashSet<>();
|
||||
Set<String> metricFields = new HashSet<>();
|
||||
ModelSchemaFilterReq modelFilter = new ModelSchemaFilterReq();
|
||||
modelFilter.setModelIds(Arrays.asList(materializationSourceResp.getModelId()));
|
||||
List<ModelSchemaResp> modelSchemaRespList = modelService.fetchModelSchema(modelFilter);
|
||||
//todo
|
||||
List<MeasureResp> measureRespList = modelService.getMeasureListOfModel(
|
||||
Lists.newArrayList(materializationSourceResp.getModelId()));
|
||||
modelSchemaRespList.stream().forEach(m -> {
|
||||
m.getDimensions().stream()
|
||||
.filter(mm -> mm.getModelId().equals(materializationSourceReq.getDataSourceId())
|
||||
&& materializationSourceResp.getDimensions().keySet().contains(mm.getId())
|
||||
).forEach(mm -> {
|
||||
dimensionFields.add(mm.getBizName());
|
||||
});
|
||||
for (MetricSchemaResp metricSchemaResp : m.getMetrics()) {
|
||||
if (!materializationSourceResp.getMetrics().keySet().contains(metricSchemaResp.getId())) {
|
||||
continue;
|
||||
}
|
||||
Long dataSourceId = 0L;
|
||||
for (Measure measure : metricSchemaResp.getTypeParams().getMeasures()) {
|
||||
dataSourceId = materializationConfService.getSourceIdByMeasure(measureRespList,
|
||||
measure.getBizName());
|
||||
if (dataSourceId > 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (dataSourceId.equals(materializationSourceReq.getDataSourceId())) {
|
||||
metricFields.addAll(getMetric(metricSchemaResp));
|
||||
}
|
||||
}
|
||||
});
|
||||
if (!dimensionFields.isEmpty() || !metricFields.isEmpty()) {
|
||||
materializationSourceResp.setFields(new ArrayList<>(dimensionFields));
|
||||
materializationSourceResp.getFields().addAll(metricFields);
|
||||
|
||||
MetricReq metricReq = new MetricReq();
|
||||
metricReq.setRootPath(catalog.getModelFullPath(materializationSourceResp.getModelId()));
|
||||
metricReq.setMetrics(new ArrayList<>(metricFields));
|
||||
metricReq.setDimensions(new ArrayList<>(dimensionFields));
|
||||
metricReq.getDimensions().add(materializationSourceResp.getDateInfo());
|
||||
metricReq.getDimensions().add(materializationSourceResp.getEntities());
|
||||
metricReq.setNativeQuery(true);
|
||||
if (CollectionUtils.isEmpty(metricReq.getMetrics())) {
|
||||
String internalMetricName = queryStructUtils.generateInternalMetricName(
|
||||
materializationSourceResp.getModelId(), metricReq.getDimensions());
|
||||
metricReq.getMetrics().add(internalMetricName);
|
||||
}
|
||||
try {
|
||||
QueryStatement queryStatement = queryService.parseMetricReq(metricReq);
|
||||
if (queryStatement.isOk()) {
|
||||
materializationSourceResp.setSql(queryStatement.getSql());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("getMaterializationDataSource sql error [{}]", e);
|
||||
}
|
||||
}
|
||||
return materializationSourceResp;
|
||||
}
|
||||
}
|
||||
throw new Exception("cant find MaterializationSource");
|
||||
}
|
||||
|
||||
protected List<String> getMetric(MetricSchemaResp metricSchemaResp) {
|
||||
if (Objects.nonNull(metricSchemaResp.getTypeParams()) && metricSchemaResp.getTypeParams().getExpr()
|
||||
.contains(queryStructUtils.getVariablePrefix())) {
|
||||
return metricSchemaResp.getTypeParams().getMeasures().stream().map(m -> m.getBizName())
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
return Arrays.asList(metricSchemaResp.getBizName());
|
||||
}
|
||||
|
||||
}
|
||||
@@ -31,17 +31,17 @@ import com.tencent.supersonic.semantic.query.persistence.pojo.QueryStatement;
|
||||
import com.tencent.supersonic.semantic.query.utils.QueryUtils;
|
||||
import com.tencent.supersonic.semantic.query.utils.S2SQLPermissionAnnotation;
|
||||
import com.tencent.supersonic.semantic.query.utils.StatUtils;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
|
||||
@Service
|
||||
@@ -123,7 +123,10 @@ public class QueryServiceImpl implements QueryService {
|
||||
}
|
||||
}
|
||||
StatUtils.get().setUseResultCache(false);
|
||||
QueryStatement queryStatement = semanticQueryEngine.plan(queryStructCmd);
|
||||
QueryStatement queryStatement = new QueryStatement();
|
||||
queryStatement.setQueryStructReq(queryStructCmd);
|
||||
queryStatement.setIsS2SQL(false);
|
||||
queryStatement = semanticQueryEngine.plan(queryStatement);
|
||||
QueryExecutor queryExecutor = semanticQueryEngine.route(queryStatement);
|
||||
if (queryExecutor != null) {
|
||||
queryResultWithColumns = semanticQueryEngine.execute(queryStatement);
|
||||
@@ -183,7 +186,10 @@ public class QueryServiceImpl implements QueryService {
|
||||
private QueryStatement getQueryStatementByMultiStruct(QueryMultiStructReq queryMultiStructReq) throws Exception {
|
||||
List<QueryStatement> sqlParsers = new ArrayList<>();
|
||||
for (QueryStructReq queryStructCmd : queryMultiStructReq.getQueryStructReqs()) {
|
||||
QueryStatement queryStatement = semanticQueryEngine.plan(queryStructCmd);
|
||||
QueryStatement queryStatement = new QueryStatement();
|
||||
queryStatement.setQueryStructReq(queryStructCmd);
|
||||
queryStatement.setIsS2SQL(false);
|
||||
queryStatement = semanticQueryEngine.plan(queryStatement);
|
||||
queryUtils.checkSqlParse(queryStatement);
|
||||
sqlParsers.add(queryStatement);
|
||||
}
|
||||
@@ -229,7 +235,10 @@ public class QueryServiceImpl implements QueryService {
|
||||
return getExplainResp(queryStatement);
|
||||
}
|
||||
if (QueryTypeEnum.STRUCT.equals(queryTypeEnum) && queryReq instanceof QueryStructReq) {
|
||||
QueryStatement queryStatement = semanticQueryEngine.plan((QueryStructReq) queryReq);
|
||||
QueryStatement queryStatement = new QueryStatement();
|
||||
queryStatement.setQueryStructReq((QueryStructReq) queryReq);
|
||||
queryStatement.setIsS2SQL(false);
|
||||
queryStatement = semanticQueryEngine.plan(queryStatement);
|
||||
return getExplainResp(queryStatement);
|
||||
}
|
||||
if (QueryTypeEnum.STRUCT.equals(queryTypeEnum) && queryReq instanceof QueryMultiStructReq) {
|
||||
|
||||
@@ -9,7 +9,7 @@ import com.tencent.supersonic.semantic.query.persistence.pojo.QueryStatement;
|
||||
|
||||
public interface SemanticQueryEngine {
|
||||
|
||||
QueryStatement plan(QueryStructReq queryStructCmd) throws Exception;
|
||||
QueryStatement plan(QueryStatement queryStatement) throws Exception;
|
||||
|
||||
QueryExecutor route(QueryStatement queryStatement);
|
||||
|
||||
|
||||
@@ -43,12 +43,12 @@ public class SemanticQueryEngineImpl implements SemanticQueryEngine {
|
||||
return queryResultWithColumns;
|
||||
}
|
||||
|
||||
public QueryStatement plan(QueryStructReq queryStructCmd) throws Exception {
|
||||
QueryStatement queryStatement = queryParser.logicSql(queryStructCmd);
|
||||
public QueryStatement plan(QueryStatement queryStatement) throws Exception {
|
||||
queryStatement = queryParser.logicSql(queryStatement);
|
||||
queryUtils.checkSqlParse(queryStatement);
|
||||
queryStatement.setModelIds(queryStructCmd.getModelIds());
|
||||
queryStatement.setModelIds(queryStatement.getQueryStructReq().getModelIds());
|
||||
log.info("queryStatement:{}", queryStatement);
|
||||
return optimize(queryStructCmd, queryStatement);
|
||||
return optimize(queryStatement.getQueryStructReq(), queryStatement);
|
||||
}
|
||||
|
||||
public QueryStatement optimize(QueryStructReq queryStructCmd, QueryStatement queryStatement) {
|
||||
@@ -69,11 +69,19 @@ public class SemanticQueryEngineImpl implements SemanticQueryEngine {
|
||||
|
||||
@Override
|
||||
public QueryStatement physicalSql(QueryStructReq queryStructCmd, ParseSqlReq sqlCommend) throws Exception {
|
||||
return optimize(queryStructCmd, queryParser.parser(sqlCommend));
|
||||
QueryStatement queryStatement = new QueryStatement();
|
||||
queryStatement.setQueryStructReq(queryStructCmd);
|
||||
queryStatement.setParseSqlReq(sqlCommend);
|
||||
queryStatement.setIsS2SQL(true);
|
||||
return optimize(queryStructCmd, queryParser.parser(sqlCommend, queryStatement));
|
||||
}
|
||||
|
||||
public QueryStatement physicalSql(QueryStructReq queryStructCmd, MetricReq metricCommand) throws Exception {
|
||||
return queryParser.parser(metricCommand);
|
||||
QueryStatement queryStatement = new QueryStatement();
|
||||
queryStatement.setQueryStructReq(queryStructCmd);
|
||||
queryStatement.setMetricReq(metricCommand);
|
||||
queryStatement.setIsS2SQL(false);
|
||||
return queryParser.parser(queryStatement);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -4,7 +4,6 @@ import com.tencent.supersonic.common.util.ContextUtils;
|
||||
import com.tencent.supersonic.semantic.query.executor.JdbcExecutor;
|
||||
import com.tencent.supersonic.semantic.query.executor.QueryExecutor;
|
||||
import com.tencent.supersonic.semantic.query.optimizer.DetailQuery;
|
||||
import com.tencent.supersonic.semantic.query.optimizer.MaterializationQuery;
|
||||
import com.tencent.supersonic.semantic.query.optimizer.QueryOptimizer;
|
||||
import com.tencent.supersonic.semantic.query.parser.SemanticConverter;
|
||||
import com.tencent.supersonic.semantic.query.parser.SqlParser;
|
||||
@@ -13,6 +12,7 @@ import com.tencent.supersonic.semantic.query.parser.convert.CalculateAggConverte
|
||||
import com.tencent.supersonic.semantic.query.parser.convert.DefaultDimValueConverter;
|
||||
import com.tencent.supersonic.semantic.query.parser.convert.MetricCheckConverter;
|
||||
import com.tencent.supersonic.semantic.query.parser.convert.ParserDefaultConverter;
|
||||
import com.tencent.supersonic.semantic.query.parser.convert.ZipperModelConverter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
@@ -73,7 +73,6 @@ public class ComponentFactory {
|
||||
}
|
||||
|
||||
private static void initQueryOptimizer() {
|
||||
queryOptimizers.put("MaterializationQuery", getBean("MaterializationQuery", MaterializationQuery.class));
|
||||
queryOptimizers.put("DetailQuery", getBean("DetailQuery", DetailQuery.class));
|
||||
}
|
||||
|
||||
@@ -82,6 +81,7 @@ public class ComponentFactory {
|
||||
semanticConverters.add(getBean("DefaultDimValueConverter", DefaultDimValueConverter.class));
|
||||
semanticConverters.add(getBean("CalculateAggConverter", CalculateAggConverter.class));
|
||||
semanticConverters.add(getBean("ParserDefaultConverter", ParserDefaultConverter.class));
|
||||
semanticConverters.add(getBean("ZipperModelConverter", ZipperModelConverter.class));
|
||||
}
|
||||
|
||||
private static void initQueryExecutors() {
|
||||
|
||||
@@ -28,11 +28,11 @@ import com.tencent.supersonic.semantic.api.model.response.DimensionResp;
|
||||
import com.tencent.supersonic.semantic.api.model.response.MetricResp;
|
||||
import com.tencent.supersonic.semantic.api.model.response.MetricSchemaResp;
|
||||
import com.tencent.supersonic.semantic.api.model.response.ModelSchemaResp;
|
||||
import com.tencent.supersonic.semantic.api.query.request.ParseSqlReq;
|
||||
import com.tencent.supersonic.semantic.api.query.request.QueryS2SQLReq;
|
||||
import com.tencent.supersonic.semantic.api.query.request.QueryStructReq;
|
||||
import com.tencent.supersonic.semantic.model.domain.Catalog;
|
||||
import com.tencent.supersonic.semantic.model.domain.pojo.EngineTypeEnum;
|
||||
import com.tencent.supersonic.semantic.query.persistence.pojo.QueryStatement;
|
||||
import com.tencent.supersonic.semantic.query.service.SchemaService;
|
||||
import java.time.LocalDate;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
@@ -277,24 +277,30 @@ public class QueryStructUtils {
|
||||
return mergeDateWhereClause(queryStructCmd, whereClauseFromFilter, whereFromDate);
|
||||
}
|
||||
|
||||
public String generateZipperWhere(QueryStatement queryStatement, QueryStructReq queryStructReq) {
|
||||
if (Objects.nonNull(queryStatement.getParseSqlReq().getSql())) {
|
||||
String sql = SqlParserRemoveHelper.removeWhere(queryStatement.getParseSqlReq().getSql(),
|
||||
public String generateZipperWhere(QueryStructReq queryStructCmd, ParseSqlReq parseSqlReq) {
|
||||
if (Objects.nonNull(parseSqlReq.getSql()) && !CollectionUtils.isEmpty(parseSqlReq.getTables())
|
||||
&& Objects.nonNull(queryStructCmd.getDateInfo())) {
|
||||
String sql = SqlParserRemoveHelper.removeWhere(parseSqlReq.getSql(),
|
||||
dateModeUtils.getDateCol());
|
||||
if (!CollectionUtils.isEmpty(queryStatement.getMetricReq().getDimensions())) {
|
||||
List<String> dimension = queryStatement.getMetricReq().getDimensions().stream()
|
||||
.filter(d -> !dateModeUtils.getDateCol().contains(d.toLowerCase())).collect(
|
||||
Collectors.toList());
|
||||
dimension.add(dateModeUtils.getDateColBegin(queryStructReq.getDateInfo()));
|
||||
dimension.add(dateModeUtils.getDateColEnd(queryStructReq.getDateInfo()));
|
||||
queryStatement.getMetricReq().setDimensions(dimension);
|
||||
}
|
||||
parseSqlReq.getTables().stream().forEach(t -> {
|
||||
if (Objects.nonNull(t)) {
|
||||
List<String> dimensions = new ArrayList<>();
|
||||
if (!CollectionUtils.isEmpty(t.getDimensions())) {
|
||||
dimensions.addAll(t.getDimensions().stream()
|
||||
.filter(d -> !dateModeUtils.getDateCol().contains(d.toLowerCase())).collect(
|
||||
Collectors.toList()));
|
||||
}
|
||||
dimensions.add(dateModeUtils.getDateColBegin(queryStructCmd.getDateInfo()));
|
||||
dimensions.add(dateModeUtils.getDateColEnd(queryStructCmd.getDateInfo()));
|
||||
t.setDimensions(dimensions);
|
||||
}
|
||||
});
|
||||
return SqlParserAddHelper.addWhere(sql,
|
||||
SqlParserSelectHelper.getTimeFilter(queryStatement.getTimeRanges(),
|
||||
dateModeUtils.getDateColBegin(queryStructReq.getDateInfo()),
|
||||
dateModeUtils.getDateColEnd(queryStructReq.getDateInfo())));
|
||||
SqlParserSelectHelper.getTimeFilter(getTimeRanges(queryStructCmd),
|
||||
dateModeUtils.getDateColBegin(queryStructCmd.getDateInfo()),
|
||||
dateModeUtils.getDateColEnd(queryStructCmd.getDateInfo())));
|
||||
}
|
||||
return queryStatement.getSql();
|
||||
return parseSqlReq.getSql();
|
||||
}
|
||||
|
||||
public String getZipperDateWhereClause(QueryStructReq queryStructCmd) {
|
||||
|
||||
@@ -42,7 +42,10 @@ public class MaterializationQueryTest {
|
||||
queryStructReq.setDateInfo(dateConf);
|
||||
|
||||
try {
|
||||
QueryStatement queryStatement = queryParser.logicSql(queryStructReq);
|
||||
QueryStatement queryStatement = new QueryStatement();
|
||||
queryStatement.setQueryStructReq(queryStructReq);
|
||||
queryStatement.setIsS2SQL(false);
|
||||
queryStatement = queryParser.logicSql(queryStatement);
|
||||
queryUtils.checkSqlParse(queryStatement);
|
||||
queryStatement.setModelIds(queryStructReq.getModelIds());
|
||||
log.info("queryStatement:{}", queryStatement);
|
||||
|
||||
Reference in New Issue
Block a user