[improvement][headless]Clean code logic of headless translator.

This commit is contained in:
jerryjzhang
2024-11-20 00:52:59 +08:00
parent eb502e740c
commit d7586a5d3b
37 changed files with 651 additions and 926 deletions

View File

@@ -7,14 +7,11 @@ import com.tencent.supersonic.headless.api.pojo.enums.SchemaType;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static com.tencent.supersonic.common.pojo.Constants.UNDERLINE;
@Data
@AllArgsConstructor
@NoArgsConstructor
@@ -32,13 +29,6 @@ public class SemanticSchemaResp {
private DatabaseResp databaseResp;
private QueryType queryType;
public String getSchemaKey() {
if (dataSetId == null) {
return String.format("%s_%s", schemaType, StringUtils.join(modelIds, UNDERLINE));
}
return String.format("%s_%s", schemaType, dataSetId);
}
public MetricSchemaResp getMetric(String bizName) {
return metrics.stream().filter(metric -> bizName.equalsIgnoreCase(metric.getBizName()))
.findFirst().orElse(null);

View File

@@ -21,12 +21,6 @@ public abstract class DetailSemanticQuery extends RuleSemanticQuery {
super();
}
@Override
public List<SchemaElementMatch> match(List<SchemaElementMatch> candidateElementMatches,
ChatQueryContext queryCtx) {
return super.match(candidateElementMatches, queryCtx);
}
@Override
public void fillParseInfo(ChatQueryContext chatQueryContext, Long dataSetId) {
super.fillParseInfo(chatQueryContext, dataSetId);

View File

@@ -25,12 +25,6 @@ public abstract class MetricSemanticQuery extends RuleSemanticQuery {
queryMatcher.addOption(METRIC, REQUIRED, AT_LEAST, 1);
}
@Override
public List<SchemaElementMatch> match(List<SchemaElementMatch> candidateElementMatches,
ChatQueryContext queryCtx) {
return super.match(candidateElementMatches, queryCtx);
}
@Override
public void fillParseInfo(ChatQueryContext chatQueryContext, Long dataSetId) {
super.fillParseInfo(chatQueryContext, dataSetId);

View File

@@ -4,9 +4,9 @@ import com.tencent.supersonic.common.calcite.Configuration;
import com.tencent.supersonic.common.jsqlparser.SqlSelectHelper;
import com.tencent.supersonic.headless.core.pojo.Materialization;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.TimeRange;
import com.tencent.supersonic.headless.core.translator.calcite.schema.DataSourceTable;
import com.tencent.supersonic.headless.core.translator.calcite.schema.DataSourceTable.Builder;
import com.tencent.supersonic.headless.core.translator.calcite.schema.SchemaBuilder;
import com.tencent.supersonic.headless.core.translator.calcite.sql.S2CalciteTable;
import com.tencent.supersonic.headless.core.translator.calcite.sql.S2CalciteTable.Builder;
import com.tencent.supersonic.headless.core.translator.calcite.sql.SchemaBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.calcite.adapter.enumerable.EnumerableRules;
import org.apache.calcite.config.CalciteConnectionConfigImpl;
@@ -156,14 +156,14 @@ public abstract class AbstractAccelerator implements QueryAccelerator {
String[] dbTable = materialization.getName().split("\\.");
String tb = dbTable[1].toLowerCase();
String db = dbTable[0].toLowerCase();
Builder builder = DataSourceTable.newBuilder(tb);
Builder builder = S2CalciteTable.newBuilder(tb);
for (String f : materialization.getColumns()) {
builder.addField(f, SqlTypeName.VARCHAR);
}
if (StringUtils.isNotBlank(materialization.getPartitionName())) {
builder.addField(materialization.getPartitionName(), SqlTypeName.VARCHAR);
}
DataSourceTable srcTable = builder.withRowCount(1L).build();
S2CalciteTable srcTable = builder.withRowCount(1L).build();
if (Objects.nonNull(db) && !db.isEmpty()) {
SchemaPlus schemaPlus = dataSetSchema.plus().getSubSchema(db);
if (Objects.isNull(schemaPlus)) {

View File

@@ -1,34 +1,72 @@
package com.tencent.supersonic.headless.core.translator;
import com.tencent.supersonic.common.calcite.SqlMergeWithUtils;
import com.tencent.supersonic.common.jsqlparser.SqlRemoveHelper;
import com.tencent.supersonic.common.jsqlparser.SqlReplaceHelper;
import com.tencent.supersonic.common.jsqlparser.SqlSelectFunctionHelper;
import com.tencent.supersonic.common.jsqlparser.SqlSelectHelper;
import com.tencent.supersonic.common.pojo.Aggregator;
import com.tencent.supersonic.common.pojo.Constants;
import com.tencent.supersonic.common.pojo.enums.AggOperatorEnum;
import com.tencent.supersonic.common.pojo.enums.EngineType;
import com.tencent.supersonic.common.pojo.enums.QueryType;
import com.tencent.supersonic.common.pojo.enums.TimeDimensionEnum;
import com.tencent.supersonic.common.util.StringUtil;
import com.tencent.supersonic.headless.api.pojo.Measure;
import com.tencent.supersonic.headless.api.pojo.MetricTable;
import com.tencent.supersonic.headless.api.pojo.QueryParam;
import com.tencent.supersonic.headless.api.pojo.SchemaItem;
import com.tencent.supersonic.headless.api.pojo.enums.AggOption;
import com.tencent.supersonic.headless.api.pojo.enums.MetricType;
import com.tencent.supersonic.headless.api.pojo.request.QueryStructReq;
import com.tencent.supersonic.headless.api.pojo.response.DatabaseResp;
import com.tencent.supersonic.headless.api.pojo.response.DimSchemaResp;
import com.tencent.supersonic.headless.api.pojo.response.MetricResp;
import com.tencent.supersonic.headless.api.pojo.response.MetricSchemaResp;
import com.tencent.supersonic.headless.api.pojo.response.ModelResp;
import com.tencent.supersonic.headless.api.pojo.response.SemanticSchemaResp;
import com.tencent.supersonic.headless.core.adaptor.db.DbAdaptor;
import com.tencent.supersonic.headless.core.adaptor.db.DbAdaptorFactory;
import com.tencent.supersonic.headless.core.pojo.DataSetQueryParam;
import com.tencent.supersonic.headless.core.pojo.MetricQueryParam;
import com.tencent.supersonic.headless.core.pojo.QueryStatement;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Ontology;
import com.tencent.supersonic.headless.core.translator.converter.QueryConverter;
import com.tencent.supersonic.headless.core.utils.ComponentFactory;
import com.tencent.supersonic.headless.core.utils.SqlGenerateUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@Component
@Slf4j
public class DefaultSemanticTranslator implements SemanticTranslator {
@Autowired
private SqlGenerateUtils sqlGenerateUtils;
public void translate(QueryStatement queryStatement) {
if (queryStatement.isTranslated()) {
return;
}
try {
preprocess(queryStatement);
parse(queryStatement);
optimize(queryStatement);
} catch (Exception e) {
@@ -36,12 +74,6 @@ public class DefaultSemanticTranslator implements SemanticTranslator {
}
}
public void optimize(QueryStatement queryStatement) {
for (QueryOptimizer queryOptimizer : ComponentFactory.getQueryOptimizers()) {
queryOptimizer.rewrite(queryStatement);
}
}
private void parse(QueryStatement queryStatement) throws Exception {
QueryParam queryParam = queryStatement.getQueryParam();
if (Objects.isNull(queryStatement.getDataSetQueryParam())) {
@@ -50,6 +82,7 @@ public class DefaultSemanticTranslator implements SemanticTranslator {
if (Objects.isNull(queryStatement.getMetricQueryParam())) {
queryStatement.setMetricQueryParam(new MetricQueryParam());
}
log.debug("SemanticConverter before [{}]", queryParam);
for (QueryConverter headlessConverter : ComponentFactory.getQueryConverters()) {
if (headlessConverter.accept(queryStatement)) {
@@ -59,6 +92,7 @@ public class DefaultSemanticTranslator implements SemanticTranslator {
}
log.debug("SemanticConverter after {} {} {}", queryParam,
queryStatement.getDataSetQueryParam(), queryStatement.getMetricQueryParam());
if (!queryStatement.getDataSetQueryParam().getSql().isEmpty()) {
doParse(queryStatement.getDataSetQueryParam(), queryStatement);
} else {
@@ -67,6 +101,7 @@ public class DefaultSemanticTranslator implements SemanticTranslator {
doParse(queryStatement,
AggOption.getAggregation(queryStatement.getMetricQueryParam().isNativeQuery()));
}
if (StringUtils.isEmpty(queryStatement.getSql())) {
throw new RuntimeException("parse Exception: " + queryStatement.getErrMsg());
}
@@ -147,14 +182,15 @@ public class DefaultSemanticTranslator implements SemanticTranslator {
private QueryStatement parserSql(MetricTable metricTable, Boolean isSingleMetricTable,
DataSetQueryParam dataSetQueryParam, QueryStatement queryStatement) throws Exception {
MetricQueryParam metricReq = new MetricQueryParam();
metricReq.setMetrics(metricTable.getMetrics());
metricReq.setDimensions(metricTable.getDimensions());
metricReq.setWhere(StringUtil.formatSqlQuota(metricTable.getWhere()));
metricReq.setNativeQuery(!AggOption.isAgg(metricTable.getAggOption()));
MetricQueryParam metricQueryParam = new MetricQueryParam();
metricQueryParam.setMetrics(metricTable.getMetrics());
metricQueryParam.setDimensions(metricTable.getDimensions());
metricQueryParam.setWhere(StringUtil.formatSqlQuota(metricTable.getWhere()));
metricQueryParam.setNativeQuery(!AggOption.isAgg(metricTable.getAggOption()));
QueryStatement tableSql = new QueryStatement();
tableSql.setIsS2SQL(false);
tableSql.setMetricQueryParam(metricReq);
tableSql.setMetricQueryParam(metricQueryParam);
tableSql.setMinMaxTime(queryStatement.getMinMaxTime());
tableSql.setEnableOptimize(queryStatement.getEnableOptimize());
tableSql.setDataSetId(queryStatement.getDataSetId());
@@ -170,4 +206,302 @@ public class DefaultSemanticTranslator implements SemanticTranslator {
}
return tableSql;
}
private void optimize(QueryStatement queryStatement) {
for (QueryOptimizer queryOptimizer : ComponentFactory.getQueryOptimizers()) {
queryOptimizer.rewrite(queryStatement);
}
}
private void preprocess(QueryStatement queryStatement) {
if (StringUtils.isBlank(queryStatement.getSql())) {
return;
}
SemanticSchemaResp semanticSchemaResp = queryStatement.getSemanticSchemaResp();
convertNameToBizName(queryStatement);
rewriteFunction(queryStatement);
queryStatement.setSql(SqlRemoveHelper.removeUnderscores(queryStatement.getSql()));
String tableName = SqlSelectHelper.getTableName(queryStatement.getSql());
if (StringUtils.isEmpty(tableName)) {
return;
}
// correct order item is same as agg alias
String reqSql = queryStatement.getSql();
queryStatement.setSql(SqlReplaceHelper.replaceAggAliasOrderItem(queryStatement.getSql()));
log.debug("replaceOrderAggSameAlias {} -> {}", reqSql, queryStatement.getSql());
// 5.build MetricTables
List<String> allFields = SqlSelectHelper.getAllSelectFields(queryStatement.getSql());
List<MetricSchemaResp> metricSchemas = getMetrics(semanticSchemaResp, allFields);
List<String> metrics =
metricSchemas.stream().map(SchemaItem::getBizName).collect(Collectors.toList());
Set<String> dimensions = getDimensions(semanticSchemaResp, allFields);
QueryStructReq queryStructReq = new QueryStructReq();
MetricTable metricTable = new MetricTable();
metricTable.getMetrics().addAll(metrics);
metricTable.getDimensions().addAll(dimensions);
metricTable.setAlias(tableName.toLowerCase());
// if metric empty , fill model default
if (CollectionUtils.isEmpty(metricTable.getMetrics())) {
metricTable.getMetrics().add(sqlGenerateUtils.generateInternalMetricName(
getDefaultModel(semanticSchemaResp, metricTable.getDimensions())));
} else {
queryStructReq.getAggregators()
.addAll(metricTable.getMetrics().stream()
.map(m -> new Aggregator(m, AggOperatorEnum.UNKNOWN))
.collect(Collectors.toList()));
}
AggOption aggOption = getAggOption(queryStatement, metricSchemas);
metricTable.setAggOption(aggOption);
List<MetricTable> tables = new ArrayList<>();
tables.add(metricTable);
// 6.build ParseSqlReq
DataSetQueryParam datasetQueryParam = new DataSetQueryParam();
datasetQueryParam.setTables(tables);
datasetQueryParam.setSql(queryStatement.getSql());
DatabaseResp database = semanticSchemaResp.getDatabaseResp();
if (!sqlGenerateUtils.isSupportWith(EngineType.fromString(database.getType().toUpperCase()),
database.getVersion())) {
datasetQueryParam.setSupportWith(false);
datasetQueryParam.setWithAlias(false);
}
// 7. do deriveMetric
generateDerivedMetric(semanticSchemaResp, aggOption, datasetQueryParam);
// 8.physicalSql by ParseSqlReq
// queryStructReq.setDateInfo(queryStructUtils.getDateConfBySql(queryStatement.getSql()));
queryStructReq.setDataSetId(queryStatement.getDataSetId());
queryStructReq.setQueryType(getQueryType(aggOption));
log.debug("QueryReqConverter queryStructReq[{}]", queryStructReq);
QueryParam queryParam = new QueryParam();
BeanUtils.copyProperties(queryStructReq, queryParam);
queryStatement.setQueryParam(queryParam);
queryStatement.setDataSetQueryParam(datasetQueryParam);
// queryStatement.setMinMaxTime(queryStructUtils.getBeginEndTime(queryStructReq));
}
private AggOption getAggOption(QueryStatement queryStatement,
List<MetricSchemaResp> metricSchemas) {
String sql = queryStatement.getSql();
if (!SqlSelectFunctionHelper.hasAggregateFunction(sql) && !SqlSelectHelper.hasGroupBy(sql)
&& !SqlSelectHelper.hasWith(sql) && !SqlSelectHelper.hasSubSelect(sql)) {
log.debug("getAggOption simple sql set to DEFAULT");
return AggOption.DEFAULT;
}
// if there is no group by in S2SQL,set MetricTable's aggOption to "NATIVE"
// if there is count() in S2SQL,set MetricTable's aggOption to "NATIVE"
if (!SqlSelectFunctionHelper.hasAggregateFunction(sql)
|| SqlSelectFunctionHelper.hasFunction(sql, "count")
|| SqlSelectFunctionHelper.hasFunction(sql, "count_distinct")) {
return AggOption.OUTER;
}
// if (queryStatement.isInnerLayerNative()) {
// return AggOption.NATIVE;
// }
if (SqlSelectHelper.hasSubSelect(sql) || SqlSelectHelper.hasWith(sql)
|| SqlSelectHelper.hasGroupBy(sql)) {
return AggOption.OUTER;
}
long defaultAggNullCnt = metricSchemas.stream().filter(
m -> Objects.isNull(m.getDefaultAgg()) || StringUtils.isBlank(m.getDefaultAgg()))
.count();
if (defaultAggNullCnt > 0) {
log.debug("getAggOption find null defaultAgg metric set to NATIVE");
return AggOption.OUTER;
}
return AggOption.DEFAULT;
}
private void convertNameToBizName(QueryStatement queryStatement) {
SemanticSchemaResp semanticSchemaResp = queryStatement.getSemanticSchemaResp();
Map<String, String> fieldNameToBizNameMap = getFieldNameToBizNameMap(semanticSchemaResp);
String sql = queryStatement.getSql();
log.debug("dataSetId:{},convert name to bizName before:{}", queryStatement.getDataSetId(),
sql);
sql = SqlReplaceHelper.replaceSqlByPositions(sql);
log.debug("replaceSqlByPositions:{}", sql);
sql = SqlReplaceHelper.replaceFields(sql, fieldNameToBizNameMap, true);
log.debug("dataSetId:{},convert name to bizName after:{}", queryStatement.getDataSetId(),
sql);
sql = SqlReplaceHelper.replaceTable(sql,
Constants.TABLE_PREFIX + queryStatement.getDataSetId());
log.debug("replaceTableName after:{}", sql);
queryStatement.setSql(sql);
}
private Set<String> getDimensions(SemanticSchemaResp semanticSchemaResp,
List<String> allFields) {
Map<String, String> dimensionLowerToNameMap = semanticSchemaResp.getDimensions().stream()
.collect(Collectors.toMap(entry -> entry.getBizName().toLowerCase(),
SchemaItem::getBizName, (k1, k2) -> k1));
dimensionLowerToNameMap.put(TimeDimensionEnum.DAY.getName(),
TimeDimensionEnum.DAY.getName());
return allFields.stream()
.filter(entry -> dimensionLowerToNameMap.containsKey(entry.toLowerCase()))
.map(entry -> dimensionLowerToNameMap.get(entry.toLowerCase()))
.collect(Collectors.toSet());
}
private List<MetricSchemaResp> getMetrics(SemanticSchemaResp semanticSchemaResp,
List<String> allFields) {
Map<String, MetricSchemaResp> metricLowerToNameMap =
semanticSchemaResp.getMetrics().stream().collect(Collectors
.toMap(entry -> entry.getBizName().toLowerCase(), entry -> entry));
return allFields.stream()
.filter(entry -> metricLowerToNameMap.containsKey(entry.toLowerCase()))
.map(entry -> metricLowerToNameMap.get(entry.toLowerCase()))
.collect(Collectors.toList());
}
private void rewriteFunction(QueryStatement queryStatement) {
SemanticSchemaResp semanticSchemaResp = queryStatement.getSemanticSchemaResp();
DatabaseResp database = semanticSchemaResp.getDatabaseResp();
if (Objects.isNull(database) || Objects.isNull(database.getType())) {
return;
}
String type = database.getType();
DbAdaptor engineAdaptor = DbAdaptorFactory.getEngineAdaptor(type.toLowerCase());
if (Objects.nonNull(engineAdaptor)) {
String functionNameCorrector =
engineAdaptor.functionNameCorrector(queryStatement.getSql());
queryStatement.setSql(functionNameCorrector);
}
}
protected Map<String, String> getFieldNameToBizNameMap(SemanticSchemaResp semanticSchemaResp) {
// support fieldName and field alias to bizName
Map<String, String> dimensionResults = semanticSchemaResp.getDimensions().stream().flatMap(
entry -> getPairStream(entry.getAlias(), entry.getName(), entry.getBizName()))
.collect(Collectors.toMap(Pair::getLeft, Pair::getRight, (k1, k2) -> k1));
Map<String, String> metricResults = semanticSchemaResp.getMetrics().stream().flatMap(
entry -> getPairStream(entry.getAlias(), entry.getName(), entry.getBizName()))
.collect(Collectors.toMap(Pair::getLeft, Pair::getRight, (k1, k2) -> k1));
dimensionResults.putAll(TimeDimensionEnum.getChNameToNameMap());
dimensionResults.putAll(TimeDimensionEnum.getNameToNameMap());
dimensionResults.putAll(metricResults);
return dimensionResults;
}
private Stream<Pair<String, String>> getPairStream(String aliasStr, String name,
String bizName) {
Set<Pair<String, String>> elements = new HashSet<>();
elements.add(Pair.of(name, bizName));
if (StringUtils.isNotBlank(aliasStr)) {
List<String> aliasList = SchemaItem.getAliasList(aliasStr);
for (String alias : aliasList) {
elements.add(Pair.of(alias, bizName));
}
}
return elements.stream();
}
private QueryType getQueryType(AggOption aggOption) {
boolean isAgg = AggOption.isAgg(aggOption);
QueryType queryType = QueryType.DETAIL;
if (isAgg) {
queryType = QueryType.AGGREGATE;
}
return queryType;
}
private void generateDerivedMetric(SemanticSchemaResp semanticSchemaResp, AggOption aggOption,
DataSetQueryParam viewQueryParam) {
String sql = viewQueryParam.getSql();
for (MetricTable metricTable : viewQueryParam.getTables()) {
Set<String> measures = new HashSet<>();
Map<String, String> replaces = generateDerivedMetric(semanticSchemaResp, aggOption,
metricTable.getMetrics(), metricTable.getDimensions(), measures);
if (!CollectionUtils.isEmpty(replaces)) {
// metricTable sql use measures replace metric
sql = SqlReplaceHelper.replaceSqlByExpression(sql, replaces);
metricTable.setAggOption(AggOption.NATIVE);
// metricTable use measures replace metric
if (!CollectionUtils.isEmpty(measures)) {
metricTable.setMetrics(new ArrayList<>(measures));
} else {
// empty measure , fill default
metricTable.setMetrics(new ArrayList<>());
metricTable.getMetrics().add(sqlGenerateUtils.generateInternalMetricName(
getDefaultModel(semanticSchemaResp, metricTable.getDimensions())));
}
}
}
viewQueryParam.setSql(sql);
}
private Map<String, String> generateDerivedMetric(SemanticSchemaResp semanticSchemaResp,
AggOption aggOption, List<String> metrics, List<String> dimensions,
Set<String> measures) {
Map<String, String> result = new HashMap<>();
List<MetricSchemaResp> metricResps = semanticSchemaResp.getMetrics();
List<DimSchemaResp> dimensionResps = semanticSchemaResp.getDimensions();
// Check if any metric is derived
boolean hasDerivedMetrics =
metricResps.stream().anyMatch(m -> metrics.contains(m.getBizName()) && MetricType
.isDerived(m.getMetricDefineType(), m.getMetricDefineByMeasureParams()));
if (!hasDerivedMetrics) {
return result;
}
log.debug("begin to generateDerivedMetric {} [{}]", aggOption, metrics);
Set<String> allFields = new HashSet<>();
Map<String, Measure> allMeasures = new HashMap<>();
semanticSchemaResp.getModelResps().forEach(modelResp -> {
allFields.addAll(modelResp.getFieldList());
if (modelResp.getModelDetail().getMeasures() != null) {
modelResp.getModelDetail().getMeasures()
.forEach(measure -> allMeasures.put(measure.getBizName(), measure));
}
});
Set<String> derivedDimensions = new HashSet<>();
Set<String> derivedMetrics = new HashSet<>();
Map<String, String> visitedMetrics = new HashMap<>();
for (MetricResp metricResp : metricResps) {
if (metrics.contains(metricResp.getBizName())) {
boolean isDerived = MetricType.isDerived(metricResp.getMetricDefineType(),
metricResp.getMetricDefineByMeasureParams());
if (isDerived) {
String expr = sqlGenerateUtils.generateDerivedMetric(metricResps, allFields,
allMeasures, dimensionResps, sqlGenerateUtils.getExpr(metricResp),
metricResp.getMetricDefineType(), aggOption, visitedMetrics,
derivedMetrics, derivedDimensions);
result.put(metricResp.getBizName(), expr);
log.debug("derived metric {}->{}", metricResp.getBizName(), expr);
} else {
measures.add(metricResp.getBizName());
}
}
}
measures.addAll(derivedMetrics);
derivedDimensions.stream().filter(dimension -> !dimensions.contains(dimension))
.forEach(dimensions::add);
return result;
}
private String getDefaultModel(SemanticSchemaResp semanticSchemaResp, List<String> dimensions) {
if (!CollectionUtils.isEmpty(dimensions)) {
Map<String, Long> modelMatchCnt = new HashMap<>();
for (ModelResp modelResp : semanticSchemaResp.getModelResps()) {
modelMatchCnt.put(modelResp.getBizName(), modelResp.getModelDetail().getDimensions()
.stream().filter(d -> dimensions.contains(d.getBizName())).count());
}
return modelMatchCnt.entrySet().stream()
.sorted(Map.Entry.comparingByValue(Comparator.reverseOrder()))
.map(m -> m.getKey()).findFirst().orElse("");
}
return semanticSchemaResp.getModelResps().get(0).getBizName();
}
}

View File

@@ -1,22 +1,15 @@
package com.tencent.supersonic.headless.core.translator.calcite;
import com.tencent.supersonic.common.calcite.SqlMergeWithUtils;
import com.tencent.supersonic.common.pojo.enums.EngineType;
import com.tencent.supersonic.headless.api.pojo.enums.AggOption;
import com.tencent.supersonic.headless.core.pojo.MetricQueryParam;
import com.tencent.supersonic.headless.core.pojo.QueryStatement;
import com.tencent.supersonic.headless.core.translator.QueryParser;
import com.tencent.supersonic.headless.core.translator.calcite.planner.AggPlanner;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Ontology;
import com.tencent.supersonic.headless.core.translator.calcite.schema.RuntimeOptions;
import com.tencent.supersonic.headless.core.translator.calcite.schema.S2SemanticSchema;
import com.tencent.supersonic.headless.core.translator.calcite.sql.RuntimeOptions;
import com.tencent.supersonic.headless.core.translator.calcite.sql.S2CalciteSchema;
import com.tencent.supersonic.headless.core.translator.calcite.sql.SqlBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.calcite.sql.parser.SqlParseException;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.Objects;
/** the calcite parse implements */
@Component("CalciteQueryParser")
@Slf4j
@@ -24,54 +17,19 @@ public class CalciteQueryParser implements QueryParser {
@Override
public void parse(QueryStatement queryStatement, AggOption isAgg) throws Exception {
MetricQueryParam metricReq = queryStatement.getMetricQueryParam();
Ontology ontology = queryStatement.getOntology();
if (ontology == null) {
queryStatement.setErrMsg("semanticSchema not found");
queryStatement.setErrMsg("No ontology could be found");
return;
}
queryStatement.setMetricQueryParam(metricReq);
S2SemanticSchema semanticSchema = getSemanticSchema(ontology, queryStatement);
AggPlanner aggPlanner = new AggPlanner(semanticSchema);
aggPlanner.plan(queryStatement, isAgg);
EngineType engineType = EngineType.fromString(ontology.getDatabase().getType());
queryStatement.setSql(aggPlanner.getSql(engineType));
if (Objects.nonNull(queryStatement.getEnableOptimize())
&& queryStatement.getEnableOptimize()
&& Objects.nonNull(queryStatement.getDataSetAlias())
&& !queryStatement.getDataSetAlias().isEmpty()) {
// simplify model sql with query sql
String simplifySql = aggPlanner.simplify(
getSqlByDataSet(engineType, aggPlanner.getSql(engineType),
queryStatement.getDataSetSql(), queryStatement.getDataSetAlias()),
engineType);
if (Objects.nonNull(simplifySql) && !simplifySql.isEmpty()) {
log.debug("simplifySql [{}]", simplifySql);
queryStatement.setDataSetSimplifySql(simplifySql);
}
}
S2CalciteSchema semanticSchema = S2CalciteSchema.builder()
.schemaKey("DATASET_" + queryStatement.getDataSetId()).ontology(ontology)
.runtimeOptions(RuntimeOptions.builder().minMaxTime(queryStatement.getMinMaxTime())
.enableOptimize(queryStatement.getEnableOptimize()).build())
.build();
SqlBuilder sqlBuilder = new SqlBuilder(semanticSchema);
sqlBuilder.build(queryStatement, isAgg);
}
private S2SemanticSchema getSemanticSchema(Ontology ontology, QueryStatement queryStatement) {
S2SemanticSchema semanticSchema =
S2SemanticSchema.newBuilder(ontology.getSchemaKey()).build();
semanticSchema.setSemanticModel(ontology);
semanticSchema.setDatasource(ontology.getDatasourceMap());
semanticSchema.setDimension(ontology.getDimensionMap());
semanticSchema.setMetric(ontology.getMetrics());
semanticSchema.setJoinRelations(ontology.getJoinRelations());
semanticSchema.setRuntimeOptions(
RuntimeOptions.builder().minMaxTime(queryStatement.getMinMaxTime())
.enableOptimize(queryStatement.getEnableOptimize()).build());
return semanticSchema;
}
private String getSqlByDataSet(EngineType engineType, String parentSql, String dataSetSql,
String parentAlias) throws SqlParseException {
if (!SqlMergeWithUtils.hasWith(engineType, dataSetSql)) {
return String.format("with %s as (%s) %s", parentAlias, parentSql, dataSetSql);
}
return SqlMergeWithUtils.mergeWith(engineType, dataSetSql,
Collections.singletonList(parentSql), Collections.singletonList(parentAlias));
}
}

View File

@@ -1,17 +0,0 @@
package com.tencent.supersonic.headless.core.translator.calcite.planner;
import com.tencent.supersonic.common.pojo.enums.EngineType;
import com.tencent.supersonic.headless.api.pojo.enums.AggOption;
import com.tencent.supersonic.headless.core.pojo.QueryStatement;
/** parse and generate SQL and other execute information */
public interface Planner {
void plan(QueryStatement queryStatement, AggOption aggOption) throws Exception;
String getSql(EngineType enginType);
String getSourceId();
String simplify(String sql, EngineType engineType);
}

View File

@@ -13,7 +13,7 @@ public class DataModel {
private String name;
private Long sourceId;
private Long modelId;
private String type;

View File

@@ -1,6 +1,5 @@
package com.tencent.supersonic.headless.core.translator.calcite.s2sql;
import com.tencent.supersonic.headless.core.translator.calcite.schema.SemanticItem;
import lombok.Builder;
import lombok.Data;

View File

@@ -1,6 +1,5 @@
package com.tencent.supersonic.headless.core.translator.calcite.s2sql;
import com.tencent.supersonic.headless.core.translator.calcite.schema.SemanticItem;
import lombok.Data;
import java.util.List;

View File

@@ -13,7 +13,6 @@ import java.util.stream.Collectors;
@Data
public class Ontology {
private String schemaKey;
private List<Metric> metrics = new ArrayList<>();
private Map<String, DataModel> datasourceMap = new HashMap<>();
private Map<String, List<Dimension>> dimensionMap = new HashMap<>();

View File

@@ -1,6 +1,7 @@
package com.tencent.supersonic.headless.core.translator.calcite.schema;
package com.tencent.supersonic.headless.core.translator.calcite.s2sql;
public interface SemanticItem {
String getName();
public String getName();
String getType();
}

View File

@@ -1,137 +0,0 @@
package com.tencent.supersonic.headless.core.translator.calcite.schema;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataModel;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Dimension;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.JoinRelation;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Materialization;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Metric;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Ontology;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaVersion;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class S2SemanticSchema extends AbstractSchema {
private final String schemaKey;
private final Map<String, Table> tableMap;
private Ontology ontology = new Ontology();
private List<JoinRelation> joinRelations;
private RuntimeOptions runtimeOptions;
private S2SemanticSchema(String schemaKey, Map<String, Table> tableMap) {
this.schemaKey = schemaKey;
this.tableMap = tableMap;
}
public static Builder newBuilder(String schemaKey) {
return new Builder(schemaKey);
}
public String getSchemaKey() {
return schemaKey;
}
public void setSemanticModel(Ontology ontology) {
this.ontology = ontology;
}
public Ontology getSemanticModel() {
return ontology;
}
@Override
public Map<String, Table> getTableMap() {
return tableMap;
}
@Override
public Schema snapshot(SchemaVersion version) {
return this;
}
public Map<String, DataModel> getDatasource() {
return ontology.getDatasourceMap();
}
public void setDatasource(Map<String, DataModel> datasource) {
ontology.setDatasourceMap(datasource);
}
public Map<String, List<Dimension>> getDimension() {
return ontology.getDimensionMap();
}
public void setDimension(Map<String, List<Dimension>> dimensions) {
ontology.setDimensionMap(dimensions);
}
public List<Metric> getMetrics() {
return ontology.getMetrics();
}
public void setMetric(List<Metric> metric) {
ontology.setMetrics(metric);
}
public void setMaterializationList(List<Materialization> materializationList) {
ontology.setMaterializationList(materializationList);
}
public List<Materialization> getMaterializationList() {
return ontology.getMaterializationList();
}
public void setJoinRelations(List<JoinRelation> joinRelations) {
this.joinRelations = joinRelations;
}
public List<JoinRelation> getJoinRelations() {
return joinRelations;
}
public void setRuntimeOptions(RuntimeOptions runtimeOptions) {
this.runtimeOptions = runtimeOptions;
}
public RuntimeOptions getRuntimeOptions() {
return runtimeOptions;
}
public static final class Builder {
private final String schemaKey;
private final Map<String, Table> tableMap = new HashMap<>();
private Builder(String schemaKey) {
if (schemaKey == null) {
throw new IllegalArgumentException("Schema name cannot be null or empty");
}
this.schemaKey = schemaKey;
}
public Builder addTable(DataSourceTable table) {
if (tableMap.containsKey(table.getTableName())) {
throw new IllegalArgumentException(
"Table already defined: " + table.getTableName());
}
tableMap.put(table.getTableName(), table);
return this;
}
public S2SemanticSchema build() {
return new S2SemanticSchema(schemaKey, tableMap);
}
}
}

View File

@@ -1,6 +1,5 @@
package com.tencent.supersonic.headless.core.translator.calcite.sql.optimizer;
package com.tencent.supersonic.headless.core.translator.calcite.sql;
import com.tencent.supersonic.headless.core.translator.calcite.schema.S2SemanticSchema;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.rel.core.Aggregate;
@@ -40,24 +39,23 @@ public class FilterToGroupScanRule extends RelRule<Config> implements Transforma
});
}).as(FilterTableScanRule.Config.class);
private S2SemanticSchema semanticSchema;
private S2CalciteSchema schema;
public FilterToGroupScanRule(FilterTableScanRule.Config config,
S2SemanticSchema semanticSchema) {
public FilterToGroupScanRule(FilterTableScanRule.Config config, S2CalciteSchema schema) {
super(config);
this.semanticSchema = semanticSchema;
this.schema = schema;
}
public void onMatch(RelOptRuleCall call) {
if (call.rels.length != 4) {
return;
}
if (Objects.isNull(semanticSchema.getRuntimeOptions())
|| Objects.isNull(semanticSchema.getRuntimeOptions().getMinMaxTime())
|| semanticSchema.getRuntimeOptions().getMinMaxTime().getLeft().isEmpty()) {
if (Objects.isNull(schema.getRuntimeOptions())
|| Objects.isNull(schema.getRuntimeOptions().getMinMaxTime())
|| schema.getRuntimeOptions().getMinMaxTime().getLeft().isEmpty()) {
return;
}
Triple<String, String, String> minMax = semanticSchema.getRuntimeOptions().getMinMaxTime();
Triple<String, String, String> minMax = schema.getRuntimeOptions().getMinMaxTime();
Filter filter = (Filter) call.rel(0);
Project project0 = (Project) call.rel(1);
Project project1 = (Project) call.rel(3);

View File

@@ -1,8 +0,0 @@
package com.tencent.supersonic.headless.core.translator.calcite.sql;
import com.tencent.supersonic.headless.core.translator.calcite.schema.S2SemanticSchema;
public interface Optimization {
public void visit(S2SemanticSchema semanticSchema);
}

View File

@@ -1,4 +1,4 @@
package com.tencent.supersonic.headless.core.translator.calcite.schema;
package com.tencent.supersonic.headless.core.translator.calcite.sql;
import lombok.Builder;
import lombok.Data;

View File

@@ -0,0 +1,48 @@
package com.tencent.supersonic.headless.core.translator.calcite.sql;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataModel;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Dimension;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.JoinRelation;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Metric;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Ontology;
import lombok.Builder;
import lombok.Data;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaVersion;
import org.apache.calcite.schema.impl.AbstractSchema;
import java.util.List;
import java.util.Map;
@Data
@Builder
public class S2CalciteSchema extends AbstractSchema {
private String schemaKey;
private Ontology ontology;
private RuntimeOptions runtimeOptions;
@Override
public Schema snapshot(SchemaVersion version) {
return this;
}
public Map<String, DataModel> getDatasource() {
return ontology.getDatasourceMap();
}
public Map<String, List<Dimension>> getDimension() {
return ontology.getDimensionMap();
}
public List<JoinRelation> getJoinRelations() {
return ontology.getJoinRelations();
}
public List<Metric> getMetrics() {
return ontology.getMetrics();
}
}

View File

@@ -1,4 +1,4 @@
package com.tencent.supersonic.headless.core.translator.calcite.schema;
package com.tencent.supersonic.headless.core.translator.calcite.sql;
import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.Enumerable;
@@ -23,7 +23,7 @@ import java.util.ArrayList;
import java.util.List;
/** customize the AbstractTable */
public class DataSourceTable extends AbstractTable implements ScannableTable, TranslatableTable {
public class S2CalciteTable extends AbstractTable implements ScannableTable, TranslatableTable {
private final String tableName;
private final List<String> fieldNames;
@@ -32,7 +32,7 @@ public class DataSourceTable extends AbstractTable implements ScannableTable, Tr
private RelDataType rowType;
private DataSourceTable(String tableName, List<String> fieldNames, List<SqlTypeName> fieldTypes,
private S2CalciteTable(String tableName, List<String> fieldNames, List<SqlTypeName> fieldTypes,
Statistic statistic) {
this.tableName = tableName;
this.fieldNames = fieldNames;
@@ -116,7 +116,7 @@ public class DataSourceTable extends AbstractTable implements ScannableTable, Tr
return this;
}
public DataSourceTable build() {
public S2CalciteTable build() {
if (fieldNames.isEmpty()) {
throw new IllegalStateException("Table must have at least one field");
}
@@ -125,7 +125,7 @@ public class DataSourceTable extends AbstractTable implements ScannableTable, Tr
throw new IllegalStateException("Table must have positive row count");
}
return new DataSourceTable(tableName, fieldNames, fieldTypes,
return new S2CalciteTable(tableName, fieldNames, fieldTypes,
Statistics.of(rowCount, null));
}
}

View File

@@ -1,8 +1,7 @@
package com.tencent.supersonic.headless.core.translator.calcite.schema;
package com.tencent.supersonic.headless.core.translator.calcite.sql;
import com.tencent.supersonic.common.calcite.Configuration;
import com.tencent.supersonic.common.pojo.enums.EngineType;
import com.tencent.supersonic.headless.core.translator.calcite.sql.S2SQLSqlValidatorImpl;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.prepare.CalciteCatalogReader;
import org.apache.calcite.prepare.Prepare;
@@ -27,15 +26,14 @@ public class SchemaBuilder {
public static final String MATERIALIZATION_SYS_FIELD_DATE = "C1";
public static final String MATERIALIZATION_SYS_FIELD_DATA = "C2";
public static SqlValidatorScope getScope(S2SemanticSchema schema) throws Exception {
public static SqlValidatorScope getScope(S2CalciteSchema schema) throws Exception {
Map<String, RelDataType> nameToTypeMap = new HashMap<>();
CalciteSchema rootSchema = CalciteSchema.createRootSchema(true, false);
rootSchema.add(schema.getSchemaKey(), schema);
Prepare.CatalogReader catalogReader = new CalciteCatalogReader(rootSchema,
Collections.singletonList(schema.getSchemaKey()), Configuration.typeFactory,
Configuration.config);
EngineType engineType =
EngineType.fromString(schema.getSemanticModel().getDatabase().getType());
EngineType engineType = EngineType.fromString(schema.getOntology().getDatabase().getType());
S2SQLSqlValidatorImpl s2SQLSqlValidator =
new S2SQLSqlValidatorImpl(Configuration.operatorTable, catalogReader,
Configuration.typeFactory, Configuration.getValidatorConfig(engineType));
@@ -45,12 +43,12 @@ public class SchemaBuilder {
public static CalciteSchema getMaterializationSchema() {
CalciteSchema rootSchema = CalciteSchema.createRootSchema(true, false);
SchemaPlus schema = rootSchema.plus().add(MATERIALIZATION_SYS_DB, new AbstractSchema());
DataSourceTable srcTable = DataSourceTable.newBuilder(MATERIALIZATION_SYS_SOURCE)
S2CalciteTable srcTable = S2CalciteTable.newBuilder(MATERIALIZATION_SYS_SOURCE)
.addField(MATERIALIZATION_SYS_FIELD_DATE, SqlTypeName.DATE)
.addField(MATERIALIZATION_SYS_FIELD_DATA, SqlTypeName.BIGINT).withRowCount(1)
.build();
schema.add(MATERIALIZATION_SYS_SOURCE, srcTable);
DataSourceTable dataSetTable = DataSourceTable.newBuilder(MATERIALIZATION_SYS_VIEW)
S2CalciteTable dataSetTable = S2CalciteTable.newBuilder(MATERIALIZATION_SYS_VIEW)
.addField(MATERIALIZATION_SYS_FIELD_DATE, SqlTypeName.DATE)
.addField(MATERIALIZATION_SYS_FIELD_DATA, SqlTypeName.BIGINT).withRowCount(1)
.build();
@@ -62,7 +60,7 @@ public class SchemaBuilder {
Set<String> dates, Set<String> dimensions, Set<String> metrics) {
String tb = tbSrc;
String db = dbSrc;
DataSourceTable.Builder builder = DataSourceTable.newBuilder(tb);
S2CalciteTable.Builder builder = S2CalciteTable.newBuilder(tb);
for (String date : dates) {
builder.addField(date, SqlTypeName.VARCHAR);
}
@@ -72,7 +70,7 @@ public class SchemaBuilder {
for (String metric : metrics) {
builder.addField(metric, SqlTypeName.ANY);
}
DataSourceTable srcTable = builder.withRowCount(1).build();
S2CalciteTable srcTable = builder.withRowCount(1).build();
if (Objects.nonNull(db) && !db.isEmpty()) {
SchemaPlus dbPs = dataSetSchema.plus();
for (String d : db.split("\\.")) {

View File

@@ -1,6 +1,7 @@
package com.tencent.supersonic.headless.core.translator.calcite.planner;
package com.tencent.supersonic.headless.core.translator.calcite.sql;
import com.tencent.supersonic.common.calcite.Configuration;
import com.tencent.supersonic.common.calcite.SqlMergeWithUtils;
import com.tencent.supersonic.common.pojo.enums.EngineType;
import com.tencent.supersonic.headless.api.pojo.enums.AggOption;
import com.tencent.supersonic.headless.core.pojo.Database;
@@ -8,20 +9,20 @@ import com.tencent.supersonic.headless.core.pojo.MetricQueryParam;
import com.tencent.supersonic.headless.core.pojo.QueryStatement;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Constants;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataModel;
import com.tencent.supersonic.headless.core.translator.calcite.schema.S2SemanticSchema;
import com.tencent.supersonic.headless.core.translator.calcite.schema.SchemaBuilder;
import com.tencent.supersonic.headless.core.translator.calcite.sql.Renderer;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.DataSourceNode;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.DataModelNode;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.SemanticNode;
import com.tencent.supersonic.headless.core.translator.calcite.sql.render.FilterRender;
import com.tencent.supersonic.headless.core.translator.calcite.sql.render.OutputRender;
import com.tencent.supersonic.headless.core.translator.calcite.sql.render.Renderer;
import com.tencent.supersonic.headless.core.translator.calcite.sql.render.SourceRender;
import lombok.extern.slf4j.Slf4j;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.validate.SqlValidatorScope;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
@@ -29,29 +30,62 @@ import java.util.Objects;
/** parsing from query dimensions and metrics */
@Slf4j
public class AggPlanner implements Planner {
public class SqlBuilder {
private MetricQueryParam metricReq;
private final S2SemanticSchema schema;
private MetricQueryParam metricQueryParam;
private final S2CalciteSchema schema;
private SqlValidatorScope scope;
private SqlNode parserNode;
private String sourceId;
private boolean isAgg = false;
private AggOption aggOption = AggOption.DEFAULT;
public AggPlanner(S2SemanticSchema schema) {
public SqlBuilder(S2CalciteSchema schema) {
this.schema = schema;
}
private void parse() throws Exception {
public void build(QueryStatement queryStatement, AggOption aggOption) throws Exception {
this.metricQueryParam = queryStatement.getMetricQueryParam();
if (metricQueryParam.getMetrics() == null) {
metricQueryParam.setMetrics(new ArrayList<>());
}
if (metricQueryParam.getDimensions() == null) {
metricQueryParam.setDimensions(new ArrayList<>());
}
if (metricQueryParam.getLimit() == null) {
metricQueryParam.setLimit(0L);
}
this.aggOption = aggOption;
buildParseNode();
Database database = queryStatement.getOntology().getDatabase();
EngineType engineType = EngineType.fromString(database.getType());
optimizeParseNode(engineType);
String sql = getSql(engineType);
queryStatement.setSql(sql);
if (Objects.nonNull(queryStatement.getEnableOptimize())
&& queryStatement.getEnableOptimize()
&& Objects.nonNull(queryStatement.getDataSetAlias())
&& !queryStatement.getDataSetAlias().isEmpty()) {
// simplify model sql with query sql
String simplifySql = rewrite(getSqlByDataSet(engineType, sql,
queryStatement.getDataSetSql(), queryStatement.getDataSetAlias()), engineType);
if (Objects.nonNull(simplifySql) && !simplifySql.isEmpty()) {
log.debug("simplifySql [{}]", simplifySql);
queryStatement.setDataSetSimplifySql(simplifySql);
}
}
}
private void buildParseNode() throws Exception {
// find the match Datasource
scope = SchemaBuilder.getScope(schema);
List<DataModel> datasource = getMatchDataSource(scope);
if (datasource == null || datasource.isEmpty()) {
throw new Exception("datasource not found");
List<DataModel> dataModels =
DataModelNode.getRelatedDataModels(scope, schema, metricQueryParam);
if (dataModels == null || dataModels.isEmpty()) {
throw new Exception("data model not found");
}
isAgg = getAgg(datasource.get(0));
sourceId = String.valueOf(datasource.get(0).getSourceId());
isAgg = getAgg(dataModels.get(0));
// build level by level
LinkedList<Renderer> builders = new LinkedList<>();
@@ -64,21 +98,17 @@ public class AggPlanner implements Planner {
while (it.hasNext()) {
Renderer renderer = it.next();
if (previous != null) {
previous.render(metricReq, datasource, scope, schema, !isAgg);
previous.render(metricQueryParam, dataModels, scope, schema, !isAgg);
renderer.setTable(previous
.builderAs(DataSourceNode.getNames(datasource) + "_" + String.valueOf(i)));
.builderAs(DataModelNode.getNames(dataModels) + "_" + String.valueOf(i)));
i++;
}
previous = renderer;
}
builders.getLast().render(metricReq, datasource, scope, schema, !isAgg);
builders.getLast().render(metricQueryParam, dataModels, scope, schema, !isAgg);
parserNode = builders.getLast().builder();
}
private List<DataModel> getMatchDataSource(SqlValidatorScope scope) throws Exception {
return DataSourceNode.getMatchDataSources(scope, schema, metricReq);
}
private boolean getAgg(DataModel dataModel) {
if (!AggOption.DEFAULT.equals(aggOption)) {
return AggOption.isAgg(aggOption);
@@ -86,46 +116,18 @@ public class AggPlanner implements Planner {
// default by dataModel time aggregation
if (Objects.nonNull(dataModel.getAggTime()) && !dataModel.getAggTime()
.equalsIgnoreCase(Constants.DIMENSION_TYPE_TIME_GRANULARITY_NONE)) {
if (!metricReq.isNativeQuery()) {
if (!metricQueryParam.isNativeQuery()) {
return true;
}
}
return isAgg;
}
@Override
public void plan(QueryStatement queryStatement, AggOption aggOption) throws Exception {
this.metricReq = queryStatement.getMetricQueryParam();
if (metricReq.getMetrics() == null) {
metricReq.setMetrics(new ArrayList<>());
}
if (metricReq.getDimensions() == null) {
metricReq.setDimensions(new ArrayList<>());
}
if (metricReq.getLimit() == null) {
metricReq.setLimit(0L);
}
this.aggOption = aggOption;
// build a parse Node
parse();
// optimizer
Database database = queryStatement.getOntology().getDatabase();
EngineType engineType = EngineType.fromString(database.getType());
optimize(engineType);
}
@Override
public String getSql(EngineType engineType) {
return SemanticNode.getSql(parserNode, engineType);
}
@Override
public String getSourceId() {
return sourceId;
}
@Override
public String simplify(String sql, EngineType engineType) {
private String rewrite(String sql, EngineType engineType) {
try {
SqlNode sqlNode =
SqlParser.create(sql, Configuration.getParserConfig(engineType)).parseStmt();
@@ -139,7 +141,7 @@ public class AggPlanner implements Planner {
return "";
}
private void optimize(EngineType engineType) {
private void optimizeParseNode(EngineType engineType) {
if (Objects.isNull(schema.getRuntimeOptions())
|| Objects.isNull(schema.getRuntimeOptions().getEnableOptimize())
|| !schema.getRuntimeOptions().getEnableOptimize()) {
@@ -162,4 +164,13 @@ public class AggPlanner implements Planner {
}
}
private String getSqlByDataSet(EngineType engineType, String parentSql, String dataSetSql,
String parentAlias) throws SqlParseException {
if (!SqlMergeWithUtils.hasWith(engineType, dataSetSql)) {
return String.format("with %s as (%s) %s", parentAlias, parentSql, dataSetSql);
}
return SqlMergeWithUtils.mergeWith(engineType, dataSetSql,
Collections.singletonList(parentSql), Collections.singletonList(parentAlias));
}
}

View File

@@ -11,9 +11,8 @@ import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Dimension;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Identify;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.JoinRelation;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Measure;
import com.tencent.supersonic.headless.core.translator.calcite.schema.S2SemanticSchema;
import com.tencent.supersonic.headless.core.translator.calcite.schema.SchemaBuilder;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.extend.LateralViewExplodeNode;
import com.tencent.supersonic.headless.core.translator.calcite.sql.S2CalciteSchema;
import com.tencent.supersonic.headless.core.translator.calcite.sql.SchemaBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlDataTypeSpec;
@@ -38,27 +37,27 @@ import java.util.Set;
import java.util.stream.Collectors;
@Slf4j
public class DataSourceNode extends SemanticNode {
public class DataModelNode extends SemanticNode {
public static SqlNode build(DataModel datasource, SqlValidatorScope scope) throws Exception {
public static SqlNode build(DataModel dataModel, SqlValidatorScope scope) throws Exception {
String sqlTable = "";
if (datasource.getSqlQuery() != null && !datasource.getSqlQuery().isEmpty()) {
sqlTable = datasource.getSqlQuery();
} else if (datasource.getTableQuery() != null && !datasource.getTableQuery().isEmpty()) {
if (datasource.getType().equalsIgnoreCase(EngineType.POSTGRESQL.getName())) {
String fullTableName = Arrays.stream(datasource.getTableQuery().split("\\."))
if (dataModel.getSqlQuery() != null && !dataModel.getSqlQuery().isEmpty()) {
sqlTable = dataModel.getSqlQuery();
} else if (dataModel.getTableQuery() != null && !dataModel.getTableQuery().isEmpty()) {
if (dataModel.getType().equalsIgnoreCase(EngineType.POSTGRESQL.getName())) {
String fullTableName = Arrays.stream(dataModel.getTableQuery().split("\\."))
.collect(Collectors.joining(".public."));
sqlTable = "select * from " + fullTableName;
} else {
sqlTable = "select * from " + datasource.getTableQuery();
sqlTable = "select * from " + dataModel.getTableQuery();
}
}
if (sqlTable.isEmpty()) {
throw new Exception("DatasourceNode build error [tableSqlNode not found]");
}
SqlNode source = getTable(sqlTable, scope, EngineType.fromString(datasource.getType()));
addSchema(scope, datasource, sqlTable);
return buildAs(datasource.getName(), source);
SqlNode source = getTable(sqlTable, scope, EngineType.fromString(dataModel.getType()));
addSchema(scope, dataModel, sqlTable);
return buildAs(dataModel.getName(), source);
}
private static void addSchema(SqlValidatorScope scope, DataModel datasource, String table)
@@ -150,7 +149,7 @@ public class DataSourceNode extends SemanticNode {
return dataModelList.stream().map(d -> d.getName()).collect(Collectors.joining("_"));
}
public static void getQueryDimensionMeasure(S2SemanticSchema schema,
public static void getQueryDimensionMeasure(S2CalciteSchema schema,
MetricQueryParam metricCommand, Set<String> queryDimension, List<String> measures) {
queryDimension.addAll(metricCommand.getDimensions().stream()
.map(d -> d.contains(Constants.DIMENSION_IDENTIFY)
@@ -166,11 +165,10 @@ public class DataSourceNode extends SemanticNode {
.forEach(m -> measures.add(m));
}
public static void mergeQueryFilterDimensionMeasure(S2SemanticSchema schema,
public static void mergeQueryFilterDimensionMeasure(S2CalciteSchema schema,
MetricQueryParam metricCommand, Set<String> queryDimension, List<String> measures,
SqlValidatorScope scope) throws Exception {
EngineType engineType =
EngineType.fromString(schema.getSemanticModel().getDatabase().getType());
EngineType engineType = EngineType.fromString(schema.getOntology().getDatabase().getType());
if (Objects.nonNull(metricCommand.getWhere()) && !metricCommand.getWhere().isEmpty()) {
Set<String> filterConditions = new HashSet<>();
FilterNode.getFilterField(parse(metricCommand.getWhere(), scope, engineType),
@@ -193,8 +191,8 @@ public class DataSourceNode extends SemanticNode {
}
}
public static List<DataModel> getMatchDataSources(SqlValidatorScope scope,
S2SemanticSchema schema, MetricQueryParam metricCommand) throws Exception {
public static List<DataModel> getRelatedDataModels(SqlValidatorScope scope,
S2CalciteSchema schema, MetricQueryParam metricCommand) throws Exception {
List<DataModel> dataModels = new ArrayList<>();
// check by metric
@@ -232,7 +230,7 @@ public class DataSourceNode extends SemanticNode {
filterMeasure.addAll(sourceMeasure);
filterMeasure.addAll(dimension);
EngineType engineType =
EngineType.fromString(schema.getSemanticModel().getDatabase().getType());
EngineType.fromString(schema.getOntology().getDatabase().getType());
mergeQueryFilterDimensionMeasure(schema, metricCommand, queryDimension, measures,
scope);
boolean isAllMatch = checkMatch(sourceMeasure, queryDimension, measures, dimension,
@@ -302,7 +300,7 @@ public class DataSourceNode extends SemanticNode {
}
private static List<DataModel> getLinkDataSourcesByJoinRelation(Set<String> queryDimension,
List<String> measures, DataModel baseDataModel, S2SemanticSchema schema) {
List<String> measures, DataModel baseDataModel, S2CalciteSchema schema) {
Set<String> linkDataSourceName = new HashSet<>();
List<DataModel> linkDataModels = new ArrayList<>();
Set<String> before = new HashSet<>();
@@ -387,7 +385,7 @@ public class DataSourceNode extends SemanticNode {
private static List<DataModel> getLinkDataSources(Set<String> baseIdentifiers,
Set<String> queryDimension, List<String> measures, DataModel baseDataModel,
S2SemanticSchema schema) {
S2CalciteSchema schema) {
Set<String> linkDataSourceName = new HashSet<>();
List<DataModel> linkDataModels = new ArrayList<>();
for (Map.Entry<String, DataModel> entry : schema.getDatasource().entrySet()) {

View File

@@ -1,13 +0,0 @@
package com.tencent.supersonic.headless.core.translator.calcite.sql.node;
import lombok.Data;
import org.apache.calcite.sql.SqlNode;
@Data
public class JoinNode extends SemanticNode {
private SqlNode join;
private SqlNode on;
private SqlNode left;
private SqlNode right;
}

View File

@@ -1,6 +1,5 @@
package com.tencent.supersonic.headless.core.translator.calcite.sql.node.extend;
package com.tencent.supersonic.headless.core.translator.calcite.sql.node;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.ExtendNode;
import org.apache.calcite.linq4j.Ord;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlIdentifier;

View File

@@ -2,7 +2,7 @@ package com.tencent.supersonic.headless.core.translator.calcite.sql.node;
import com.tencent.supersonic.common.pojo.enums.EngineType;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Metric;
import com.tencent.supersonic.headless.core.translator.calcite.schema.S2SemanticSchema;
import com.tencent.supersonic.headless.core.translator.calcite.sql.S2CalciteSchema;
import lombok.Data;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.validate.SqlValidatorScope;
@@ -30,7 +30,7 @@ public class MetricNode extends SemanticNode {
return buildAs(metric.getName(), sqlNode);
}
public static Boolean isMetricField(String name, S2SemanticSchema schema) {
public static Boolean isMetricField(String name, S2CalciteSchema schema) {
Optional<Metric> metric = schema.getMetrics().stream()
.filter(m -> m.getName().equalsIgnoreCase(name)).findFirst();
return metric.isPresent() && metric.get().getMetricTypeParams().isFieldMetric();

View File

@@ -5,8 +5,8 @@ import com.tencent.supersonic.common.calcite.SemanticSqlDialect;
import com.tencent.supersonic.common.calcite.SqlDialectFactory;
import com.tencent.supersonic.common.pojo.enums.EngineType;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Constants;
import com.tencent.supersonic.headless.core.translator.calcite.schema.S2SemanticSchema;
import com.tencent.supersonic.headless.core.translator.calcite.sql.optimizer.FilterToGroupScanRule;
import com.tencent.supersonic.headless.core.translator.calcite.sql.FilterToGroupScanRule;
import com.tencent.supersonic.headless.core.translator.calcite.sql.S2CalciteSchema;
import lombok.extern.slf4j.Slf4j;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.hep.HepPlanner;
@@ -397,8 +397,8 @@ public abstract class SemanticNode {
return parseInfo;
}
public static SqlNode optimize(SqlValidatorScope scope, S2SemanticSchema schema,
SqlNode sqlNode, EngineType engineType) {
public static SqlNode optimize(SqlValidatorScope scope, S2CalciteSchema schema, SqlNode sqlNode,
EngineType engineType) {
try {
HepProgramBuilder hepProgramBuilder = new HepProgramBuilder();
SemanticSqlDialect sqlDialect = SqlDialectFactory.getSqlDialect(engineType);

View File

@@ -5,8 +5,7 @@ import com.tencent.supersonic.headless.core.pojo.MetricQueryParam;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Constants;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataModel;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Metric;
import com.tencent.supersonic.headless.core.translator.calcite.schema.S2SemanticSchema;
import com.tencent.supersonic.headless.core.translator.calcite.sql.Renderer;
import com.tencent.supersonic.headless.core.translator.calcite.sql.S2CalciteSchema;
import com.tencent.supersonic.headless.core.translator.calcite.sql.TableView;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.FilterNode;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.MetricNode;
@@ -28,13 +27,12 @@ public class FilterRender extends Renderer {
@Override
public void render(MetricQueryParam metricCommand, List<DataModel> dataModels,
SqlValidatorScope scope, S2SemanticSchema schema, boolean nonAgg) throws Exception {
SqlValidatorScope scope, S2CalciteSchema schema, boolean nonAgg) throws Exception {
TableView tableView = super.tableView;
SqlNode filterNode = null;
List<String> queryMetrics = new ArrayList<>(metricCommand.getMetrics());
List<String> queryDimensions = new ArrayList<>(metricCommand.getDimensions());
EngineType engineType =
EngineType.fromString(schema.getSemanticModel().getDatabase().getType());
EngineType engineType = EngineType.fromString(schema.getOntology().getDatabase().getType());
if (metricCommand.getWhere() != null && !metricCommand.getWhere().isEmpty()) {
filterNode = SemanticNode.parse(metricCommand.getWhere(), scope, engineType);

View File

@@ -9,11 +9,10 @@ import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Identify;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.JoinRelation;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Materialization;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Metric;
import com.tencent.supersonic.headless.core.translator.calcite.schema.S2SemanticSchema;
import com.tencent.supersonic.headless.core.translator.calcite.sql.Renderer;
import com.tencent.supersonic.headless.core.translator.calcite.sql.S2CalciteSchema;
import com.tencent.supersonic.headless.core.translator.calcite.sql.TableView;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.AggFunctionNode;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.DataSourceNode;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.DataModelNode;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.FilterNode;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.IdentifyNode;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.MetricNode;
@@ -49,10 +48,9 @@ public class JoinRender extends Renderer {
@Override
public void render(MetricQueryParam metricCommand, List<DataModel> dataModels,
SqlValidatorScope scope, S2SemanticSchema schema, boolean nonAgg) throws Exception {
SqlValidatorScope scope, S2CalciteSchema schema, boolean nonAgg) throws Exception {
String queryWhere = metricCommand.getWhere();
EngineType engineType =
EngineType.fromString(schema.getSemanticModel().getDatabase().getType());
EngineType engineType = EngineType.fromString(schema.getOntology().getDatabase().getType());
Set<String> whereFields = new HashSet<>();
List<String> fieldWhere = new ArrayList<>();
if (queryWhere != null && !queryWhere.isEmpty()) {
@@ -62,7 +60,7 @@ public class JoinRender extends Renderer {
}
Set<String> queryAllDimension = new HashSet<>();
List<String> measures = new ArrayList<>();
DataSourceNode.getQueryDimensionMeasure(schema, metricCommand, queryAllDimension, measures);
DataModelNode.getQueryDimensionMeasure(schema, metricCommand, queryAllDimension, measures);
SqlNode left = null;
TableView leftTable = null;
TableView innerView = new TableView();
@@ -145,11 +143,10 @@ public class JoinRender extends Renderer {
private void doMetric(Map<String, SqlNode> innerSelect, TableView filterView,
List<String> queryMetrics, List<String> reqMetrics, DataModel dataModel,
Set<String> sourceMeasure, SqlValidatorScope scope, S2SemanticSchema schema,
Set<String> sourceMeasure, SqlValidatorScope scope, S2CalciteSchema schema,
boolean nonAgg) throws Exception {
String alias = Constants.JOIN_TABLE_PREFIX + dataModel.getName();
EngineType engineType =
EngineType.fromString(schema.getSemanticModel().getDatabase().getType());
EngineType engineType = EngineType.fromString(schema.getOntology().getDatabase().getType());
for (String m : reqMetrics) {
if (getMatchMetric(schema, sourceMeasure, m, queryMetrics)) {
MetricNode metricNode = buildMetricNode(m, dataModel, scope, schema, nonAgg, alias);
@@ -181,11 +178,10 @@ public class JoinRender extends Renderer {
private void doDimension(Map<String, SqlNode> innerSelect, Set<String> filterDimension,
List<String> queryDimension, List<String> reqDimensions, DataModel dataModel,
Set<String> dimension, SqlValidatorScope scope, S2SemanticSchema schema)
Set<String> dimension, SqlValidatorScope scope, S2CalciteSchema schema)
throws Exception {
String alias = Constants.JOIN_TABLE_PREFIX + dataModel.getName();
EngineType engineType =
EngineType.fromString(schema.getSemanticModel().getDatabase().getType());
EngineType engineType = EngineType.fromString(schema.getOntology().getDatabase().getType());
for (String d : reqDimensions) {
if (getMatchDimension(schema, dimension, dataModel, d, queryDimension)) {
if (d.contains(Constants.DIMENSION_IDENTIFY)) {
@@ -208,7 +204,7 @@ public class JoinRender extends Renderer {
.collect(Collectors.toSet());
}
private boolean getMatchMetric(S2SemanticSchema schema, Set<String> sourceMeasure, String m,
private boolean getMatchMetric(S2CalciteSchema schema, Set<String> sourceMeasure, String m,
List<String> queryMetrics) {
Optional<Metric> metric = schema.getMetrics().stream()
.filter(mm -> mm.getName().equalsIgnoreCase(m)).findFirst();
@@ -229,7 +225,7 @@ public class JoinRender extends Renderer {
return isAdd;
}
private boolean getMatchDimension(S2SemanticSchema schema, Set<String> sourceDimension,
private boolean getMatchDimension(S2CalciteSchema schema, Set<String> sourceDimension,
DataModel dataModel, String d, List<String> queryDimension) {
String oriDimension = d;
boolean isAdd = false;
@@ -263,10 +259,9 @@ public class JoinRender extends Renderer {
}
private SqlNode buildJoin(SqlNode left, TableView leftTable, TableView tableView,
Map<String, String> before, DataModel dataModel, S2SemanticSchema schema,
Map<String, String> before, DataModel dataModel, S2CalciteSchema schema,
SqlValidatorScope scope) throws Exception {
EngineType engineType =
EngineType.fromString(schema.getSemanticModel().getDatabase().getType());
EngineType engineType = EngineType.fromString(schema.getOntology().getDatabase().getType());
SqlNode condition =
getCondition(leftTable, tableView, dataModel, schema, scope, engineType);
SqlLiteral sqlLiteral = SemanticNode.getJoinSqlLiteral("");
@@ -298,7 +293,7 @@ public class JoinRender extends Renderer {
}
private JoinRelation getMatchJoinRelation(Map<String, String> before, TableView tableView,
S2SemanticSchema schema) {
S2CalciteSchema schema) {
JoinRelation matchJoinRelation = JoinRelation.builder().build();
if (!CollectionUtils.isEmpty(schema.getJoinRelations())) {
for (JoinRelation joinRelation : schema.getJoinRelations()) {
@@ -338,7 +333,7 @@ public class JoinRender extends Renderer {
}
private SqlNode getCondition(TableView left, TableView right, DataModel dataModel,
S2SemanticSchema schema, SqlValidatorScope scope, EngineType engineType)
S2CalciteSchema schema, SqlValidatorScope scope, EngineType engineType)
throws Exception {
Set<String> selectLeft = SemanticNode.getSelect(left.getTable());
@@ -413,7 +408,7 @@ public class JoinRender extends Renderer {
}
private SqlNode getZipperCondition(TableView left, TableView right, DataModel dataModel,
S2SemanticSchema schema, SqlValidatorScope scope) throws Exception {
S2CalciteSchema schema, SqlValidatorScope scope) throws Exception {
if (Materialization.TimePartType.ZIPPER.equals(left.getDataModel().getTimePartType())
&& Materialization.TimePartType.ZIPPER
.equals(right.getDataModel().getTimePartType())) {
@@ -460,7 +455,7 @@ public class JoinRender extends Renderer {
dateTime = partMetric.getAlias() + "." + partTime.get().getName();
}
EngineType engineType =
EngineType.fromString(schema.getSemanticModel().getDatabase().getType());
EngineType.fromString(schema.getOntology().getDatabase().getType());
ArrayList<SqlNode> operandList =
new ArrayList<>(Arrays.asList(SemanticNode.parse(endTime, scope, engineType),
SemanticNode.parse(dateTime, scope, engineType)));

View File

@@ -4,8 +4,7 @@ import com.tencent.supersonic.common.pojo.ColumnOrder;
import com.tencent.supersonic.common.pojo.enums.EngineType;
import com.tencent.supersonic.headless.core.pojo.MetricQueryParam;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataModel;
import com.tencent.supersonic.headless.core.translator.calcite.schema.S2SemanticSchema;
import com.tencent.supersonic.headless.core.translator.calcite.sql.Renderer;
import com.tencent.supersonic.headless.core.translator.calcite.sql.S2CalciteSchema;
import com.tencent.supersonic.headless.core.translator.calcite.sql.TableView;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.MetricNode;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.SemanticNode;
@@ -24,10 +23,9 @@ public class OutputRender extends Renderer {
@Override
public void render(MetricQueryParam metricCommand, List<DataModel> dataModels,
SqlValidatorScope scope, S2SemanticSchema schema, boolean nonAgg) throws Exception {
SqlValidatorScope scope, S2CalciteSchema schema, boolean nonAgg) throws Exception {
TableView selectDataSet = super.tableView;
EngineType engineType =
EngineType.fromString(schema.getSemanticModel().getDatabase().getType());
EngineType engineType = EngineType.fromString(schema.getOntology().getDatabase().getType());
for (String dimension : metricCommand.getDimensions()) {
selectDataSet.getMeasure().add(SemanticNode.parse(dimension, scope, engineType));
}

View File

@@ -1,4 +1,4 @@
package com.tencent.supersonic.headless.core.translator.calcite.sql;
package com.tencent.supersonic.headless.core.translator.calcite.sql.render;
import com.tencent.supersonic.common.pojo.enums.EngineType;
import com.tencent.supersonic.headless.core.pojo.MetricQueryParam;
@@ -7,7 +7,8 @@ import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Dimension;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Identify;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Measure;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Metric;
import com.tencent.supersonic.headless.core.translator.calcite.schema.S2SemanticSchema;
import com.tencent.supersonic.headless.core.translator.calcite.sql.S2CalciteSchema;
import com.tencent.supersonic.headless.core.translator.calcite.sql.TableView;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.MeasureNode;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.MetricNode;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.SemanticNode;
@@ -37,7 +38,7 @@ public abstract class Renderer {
.findFirst();
}
public static Optional<Metric> getMetricByName(String name, S2SemanticSchema schema) {
public static Optional<Metric> getMetricByName(String name, S2CalciteSchema schema) {
Optional<Metric> metric = schema.getMetrics().stream()
.filter(m -> m.getName().equalsIgnoreCase(name)).findFirst();
return metric;
@@ -49,7 +50,7 @@ public abstract class Renderer {
}
public static MetricNode buildMetricNode(String metric, DataModel datasource,
SqlValidatorScope scope, S2SemanticSchema schema, boolean nonAgg, String alias)
SqlValidatorScope scope, S2CalciteSchema schema, boolean nonAgg, String alias)
throws Exception {
Optional<Metric> metricOpt = getMetricByName(metric, schema);
MetricNode metricNode = new MetricNode();
@@ -114,5 +115,5 @@ public abstract class Renderer {
}
public abstract void render(MetricQueryParam metricCommand, List<DataModel> dataModels,
SqlValidatorScope scope, S2SemanticSchema schema, boolean nonAgg) throws Exception;
SqlValidatorScope scope, S2CalciteSchema schema, boolean nonAgg) throws Exception;
}

View File

@@ -9,10 +9,9 @@ import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Identify;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Materialization;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Measure;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Metric;
import com.tencent.supersonic.headless.core.translator.calcite.schema.S2SemanticSchema;
import com.tencent.supersonic.headless.core.translator.calcite.sql.Renderer;
import com.tencent.supersonic.headless.core.translator.calcite.sql.S2CalciteSchema;
import com.tencent.supersonic.headless.core.translator.calcite.sql.TableView;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.DataSourceNode;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.DataModelNode;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.DimensionNode;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.FilterNode;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.IdentifyNode;
@@ -43,7 +42,7 @@ public class SourceRender extends Renderer {
public static TableView renderOne(String alias, List<String> fieldWheres,
List<String> reqMetrics, List<String> reqDimensions, String queryWhere,
DataModel datasource, SqlValidatorScope scope, S2SemanticSchema schema, boolean nonAgg)
DataModel datasource, SqlValidatorScope scope, S2CalciteSchema schema, boolean nonAgg)
throws Exception {
TableView dataSet = new TableView();
@@ -96,7 +95,7 @@ public class SourceRender extends Renderer {
output.setMeasure(SemanticNode.deduplicateNode(output.getMeasure()));
dataSet.setMeasure(SemanticNode.deduplicateNode(dataSet.getMeasure()));
SqlNode tableNode = DataSourceNode.buildExtend(datasource, extendFields, scope);
SqlNode tableNode = DataModelNode.buildExtend(datasource, extendFields, scope);
dataSet.setTable(tableNode);
output.setTable(
SemanticNode.buildAs(Constants.DATASOURCE_TABLE_OUT_PREFIX + datasource.getName()
@@ -107,11 +106,10 @@ public class SourceRender extends Renderer {
private static void buildDimension(String alias, String dimension, DataModel datasource,
S2SemanticSchema schema, boolean nonAgg, Map<String, String> extendFields,
S2CalciteSchema schema, boolean nonAgg, Map<String, String> extendFields,
TableView dataSet, TableView output, SqlValidatorScope scope) throws Exception {
List<Dimension> dimensionList = schema.getDimension().get(datasource.getName());
EngineType engineType =
EngineType.fromString(schema.getSemanticModel().getDatabase().getType());
EngineType engineType = EngineType.fromString(schema.getOntology().getDatabase().getType());
boolean isAdd = false;
if (!CollectionUtils.isEmpty(dimensionList)) {
for (Dimension dim : dimensionList) {
@@ -186,11 +184,10 @@ public class SourceRender extends Renderer {
private static List<SqlNode> getWhereMeasure(List<String> fields, List<String> queryMetrics,
List<String> queryDimensions, Map<String, String> extendFields, DataModel datasource,
SqlValidatorScope scope, S2SemanticSchema schema, boolean nonAgg) throws Exception {
SqlValidatorScope scope, S2CalciteSchema schema, boolean nonAgg) throws Exception {
Iterator<String> iterator = fields.iterator();
List<SqlNode> whereNode = new ArrayList<>();
EngineType engineType =
EngineType.fromString(schema.getSemanticModel().getDatabase().getType());
EngineType engineType = EngineType.fromString(schema.getOntology().getDatabase().getType());
while (iterator.hasNext()) {
String cur = iterator.next();
if (queryDimensions.contains(cur) || queryMetrics.contains(cur)) {
@@ -229,7 +226,7 @@ public class SourceRender extends Renderer {
private static void mergeWhere(List<String> fields, TableView dataSet, TableView outputSet,
List<String> queryMetrics, List<String> queryDimensions,
Map<String, String> extendFields, DataModel datasource, SqlValidatorScope scope,
S2SemanticSchema schema, boolean nonAgg) throws Exception {
S2CalciteSchema schema, boolean nonAgg) throws Exception {
List<SqlNode> whereNode = getWhereMeasure(fields, queryMetrics, queryDimensions,
extendFields, datasource, scope, schema, nonAgg);
dataSet.getMeasure().addAll(whereNode);
@@ -237,7 +234,7 @@ public class SourceRender extends Renderer {
}
public static void whereDimMetric(List<String> fields, List<String> queryMetrics,
List<String> queryDimensions, DataModel datasource, S2SemanticSchema schema,
List<String> queryDimensions, DataModel datasource, S2CalciteSchema schema,
Set<String> dimensions, Set<String> metrics) {
for (String field : fields) {
if (queryDimensions.contains(field) || queryMetrics.contains(field)) {
@@ -252,7 +249,7 @@ public class SourceRender extends Renderer {
}
private static void addField(String field, String oriField, DataModel datasource,
S2SemanticSchema schema, Set<String> dimensions, Set<String> metrics) {
S2CalciteSchema schema, Set<String> dimensions, Set<String> metrics) {
Optional<Dimension> dimension = datasource.getDimensions().stream()
.filter(d -> d.getName().equalsIgnoreCase(field)).findFirst();
if (dimension.isPresent()) {
@@ -292,7 +289,7 @@ public class SourceRender extends Renderer {
}
}
public static boolean isDimension(String name, DataModel datasource, S2SemanticSchema schema) {
public static boolean isDimension(String name, DataModel datasource, S2CalciteSchema schema) {
Optional<Dimension> dimension = datasource.getDimensions().stream()
.filter(d -> d.getName().equalsIgnoreCase(name)).findFirst();
if (dimension.isPresent()) {
@@ -340,12 +337,11 @@ public class SourceRender extends Renderer {
}
public void render(MetricQueryParam metricQueryParam, List<DataModel> dataModels,
SqlValidatorScope scope, S2SemanticSchema schema, boolean nonAgg) throws Exception {
SqlValidatorScope scope, S2CalciteSchema schema, boolean nonAgg) throws Exception {
String queryWhere = metricQueryParam.getWhere();
Set<String> whereFields = new HashSet<>();
List<String> fieldWhere = new ArrayList<>();
EngineType engineType =
EngineType.fromString(schema.getSemanticModel().getDatabase().getType());
EngineType engineType = EngineType.fromString(schema.getOntology().getDatabase().getType());
if (queryWhere != null && !queryWhere.isEmpty()) {
SqlNode sqlNode = SemanticNode.parse(queryWhere, scope, engineType);
FilterNode.getFilterField(sqlNode, whereFields);

View File

@@ -34,7 +34,6 @@ import com.tencent.supersonic.headless.core.cache.QueryCache;
import com.tencent.supersonic.headless.core.executor.QueryExecutor;
import com.tencent.supersonic.headless.core.pojo.QueryStatement;
import com.tencent.supersonic.headless.core.translator.SemanticTranslator;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Ontology;
import com.tencent.supersonic.headless.core.utils.ComponentFactory;
import com.tencent.supersonic.headless.server.annotation.S2DataPermission;
import com.tencent.supersonic.headless.server.facade.service.SemanticLayerService;
@@ -44,7 +43,6 @@ import com.tencent.supersonic.headless.server.service.DimensionService;
import com.tencent.supersonic.headless.server.service.MetricService;
import com.tencent.supersonic.headless.server.service.SchemaService;
import com.tencent.supersonic.headless.server.utils.MetricDrillDownChecker;
import com.tencent.supersonic.headless.server.utils.QueryReqConverter;
import com.tencent.supersonic.headless.server.utils.QueryUtils;
import com.tencent.supersonic.headless.server.utils.StatUtils;
import lombok.SneakyThrows;
@@ -68,7 +66,6 @@ public class S2SemanticLayerService implements SemanticLayerService {
private final StatUtils statUtils;
private final QueryUtils queryUtils;
private final QueryReqConverter queryReqConverter;
private final SemanticSchemaManager semanticSchemaManager;
private final DataSetService dataSetService;
private final SchemaService schemaService;
@@ -81,14 +78,13 @@ public class S2SemanticLayerService implements SemanticLayerService {
private final List<QueryExecutor> queryExecutors = ComponentFactory.getQueryExecutors();
public S2SemanticLayerService(StatUtils statUtils, QueryUtils queryUtils,
QueryReqConverter queryReqConverter, SemanticSchemaManager semanticSchemaManager,
DataSetService dataSetService, SchemaService schemaService,
SemanticTranslator semanticTranslator, MetricDrillDownChecker metricDrillDownChecker,
SemanticSchemaManager semanticSchemaManager, DataSetService dataSetService,
SchemaService schemaService, SemanticTranslator semanticTranslator,
MetricDrillDownChecker metricDrillDownChecker,
KnowledgeBaseService knowledgeBaseService, MetricService metricService,
DimensionService dimensionService) {
this.statUtils = statUtils;
this.queryUtils = queryUtils;
this.queryReqConverter = queryReqConverter;
this.semanticSchemaManager = semanticSchemaManager;
this.dataSetService = dataSetService;
this.schemaService = schemaService;
@@ -123,7 +119,6 @@ public class S2SemanticLayerService implements SemanticLayerService {
statUtils.initStatInfo(queryReq, user);
// 2.query from cache
String cacheKey = queryCache.getCacheKey(queryReq);
Object query = queryCache.query(queryReq, cacheKey);
if (Objects.nonNull(query)) {
@@ -137,16 +132,16 @@ public class S2SemanticLayerService implements SemanticLayerService {
}
StatUtils.get().setUseResultCache(false);
// 3 query
// 3 translate query
QueryStatement queryStatement = buildQueryStatement(queryReq, user);
semanticTranslator.translate(queryStatement);
// Check whether the dimensions of the metric drill-down are correct temporarily,
// add the abstraction of a validator later.
metricDrillDownChecker.checkQuery(queryStatement);
// 4.execute query
SemanticQueryResp queryResp = null;
// skip translation if already done.
if (!queryStatement.isTranslated()) {
semanticTranslator.translate(queryStatement);
}
queryPreCheck(queryStatement);
for (QueryExecutor queryExecutor : queryExecutors) {
if (queryExecutor.accept(queryStatement)) {
queryResp = queryExecutor.execute(queryStatement);
@@ -155,7 +150,7 @@ public class S2SemanticLayerService implements SemanticLayerService {
}
}
// 4 reset cache and set stateInfo
// 5.reset cache and set stateInfo
Boolean setCacheSuccess = queryCache.put(cacheKey, queryResp);
if (setCacheSuccess) {
// if result is not null, update cache data
@@ -186,7 +181,7 @@ public class S2SemanticLayerService implements SemanticLayerService {
List<String> dimensionValues = getDimensionValuesFromDict(dimensionValueReq, dataSetIds);
// If the search results are null, search dimensionValue from the database
// try to query dimensionValue from the database.
if (CollectionUtils.isEmpty(dimensionValues)) {
return getDimensionValuesFromDb(dimensionValueReq, user);
}
@@ -219,9 +214,29 @@ public class S2SemanticLayerService implements SemanticLayerService {
.map(MapResult::getName).collect(Collectors.toList());
}
private SemanticQueryResp getDimensionValuesFromDb(DimensionValueReq dimensionValueReq,
private SemanticQueryResp getDimensionValuesFromDb(DimensionValueReq queryDimValueReq,
User user) {
QuerySqlReq querySqlReq = buildQuerySqlReq(dimensionValueReq);
QuerySqlReq querySqlReq = new QuerySqlReq();
List<ModelResp> modelResps =
schemaService.getModelList(Lists.newArrayList(queryDimValueReq.getModelId()));
DimensionResp dimensionResp = schemaService.getDimension(queryDimValueReq.getBizName(),
queryDimValueReq.getModelId());
ModelResp modelResp = modelResps.get(0);
String sql = String.format("select distinct %s from %s where 1=1", dimensionResp.getName(),
modelResp.getName());
List<Dim> timeDims = modelResp.getTimeDimension();
if (CollectionUtils.isNotEmpty(timeDims)) {
sql = String.format("%s and %s >= '%s' and %s <= '%s'", sql,
TimeDimensionEnum.DAY.getName(), queryDimValueReq.getDateInfo().getStartDate(),
TimeDimensionEnum.DAY.getName(), queryDimValueReq.getDateInfo().getEndDate());
}
if (StringUtils.isNotBlank(queryDimValueReq.getValue())) {
sql += " AND " + queryDimValueReq.getBizName() + " LIKE '%"
+ queryDimValueReq.getValue() + "%'";
}
querySqlReq.setModelIds(Sets.newHashSet(queryDimValueReq.getModelId()));
querySqlReq.setSql(sql);
return queryByReq(querySqlReq, user);
}
@@ -272,18 +287,16 @@ public class S2SemanticLayerService implements SemanticLayerService {
return metricService.getMetrics(metaFilter);
}
private QueryStatement buildQueryStatement(SemanticQueryReq semanticQueryReq, User user)
throws Exception {
private QueryStatement buildQueryStatement(SemanticQueryReq semanticQueryReq, User user) {
QueryStatement queryStatement = null;
if (semanticQueryReq instanceof QuerySqlReq) {
queryStatement = buildSqlQueryStatement((QuerySqlReq) semanticQueryReq, user);
}
if (semanticQueryReq instanceof QueryStructReq) {
queryStatement = buildStructQueryStatement((QueryStructReq) semanticQueryReq);
queryStatement = buildStructQueryStatement(semanticQueryReq);
}
if (semanticQueryReq instanceof QueryMultiStructReq) {
queryStatement =
buildMultiStructQueryStatement((QueryMultiStructReq) semanticQueryReq, user);
queryStatement = buildMultiStructQueryStatement((QueryMultiStructReq) semanticQueryReq);
}
if (Objects.nonNull(queryStatement) && Objects.nonNull(semanticQueryReq.getSqlInfo())
&& StringUtils.isNotBlank(semanticQueryReq.getSqlInfo().getQuerySQL())) {
@@ -300,77 +313,40 @@ public class S2SemanticLayerService implements SemanticLayerService {
Long dataSetId = dataSetService.getDataSetIdFromSql(querySqlReq.getSql(), user);
querySqlReq.setDataSetId(dataSetId);
}
SchemaFilterReq filter = buildSchemaFilterReq(querySqlReq);
SemanticSchemaResp semanticSchemaResp = schemaService.fetchSemanticSchema(filter);
return queryReqConverter.buildQueryStatement(querySqlReq, semanticSchemaResp);
QueryStatement queryStatement = buildStructQueryStatement(querySqlReq);
queryStatement.setIsS2SQL(true);
queryStatement.setSql(querySqlReq.getSql());
return queryStatement;
}
private QueryStatement buildStructQueryStatement(QueryStructReq queryStructReq) {
SchemaFilterReq filter = buildSchemaFilterReq(queryStructReq);
SemanticSchemaResp semanticSchemaResp = schemaService.fetchSemanticSchema(filter);
private QueryStatement buildStructQueryStatement(SemanticQueryReq queryReq) {
SchemaFilterReq schemaFilterReq = new SchemaFilterReq();
schemaFilterReq.setDataSetId(queryReq.getDataSetId());
schemaFilterReq.setModelIds(queryReq.getModelIds());
SemanticSchemaResp semanticSchemaResp = schemaService.fetchSemanticSchema(schemaFilterReq);
QueryStatement queryStatement = new QueryStatement();
QueryParam queryParam = new QueryParam();
BeanUtils.copyProperties(queryStructReq, queryParam);
BeanUtils.copyProperties(queryReq, queryParam);
queryStatement.setQueryParam(queryParam);
queryStatement.setIsS2SQL(false);
queryStatement.setModelIds(queryReq.getModelIds());
queryStatement.setEnableOptimize(queryUtils.enableOptimize());
queryStatement.setDataSetId(queryStructReq.getDataSetId());
queryStatement.setDataSetId(queryReq.getDataSetId());
queryStatement.setSemanticSchemaResp(semanticSchemaResp);
queryStatement.setOntology(semanticSchemaManager.buildOntology(semanticSchemaResp));
return queryStatement;
}
private QueryStatement buildMultiStructQueryStatement(QueryMultiStructReq queryMultiStructReq,
User user) throws Exception {
List<QueryStatement> sqlParsers = new ArrayList<>();
private QueryStatement buildMultiStructQueryStatement(QueryMultiStructReq queryMultiStructReq) {
List<QueryStatement> queryStatements = new ArrayList<>();
for (QueryStructReq queryStructReq : queryMultiStructReq.getQueryStructReqs()) {
QueryStatement queryStatement = buildQueryStatement(queryStructReq, user);
Ontology ontology = queryStatement.getOntology();
queryStatement.setModelIds(queryStructReq.getModelIds());
queryStatement.setOntology(ontology);
queryStatement.setEnableOptimize(queryUtils.enableOptimize());
QueryStatement queryStatement = buildStructQueryStatement(queryStructReq);
semanticTranslator.translate(queryStatement);
sqlParsers.add(queryStatement);
queryStatements.add(queryStatement);
}
log.info("multi sqlParser:{}", sqlParsers);
return queryUtils.sqlParserUnion(queryMultiStructReq, sqlParsers);
}
private SchemaFilterReq buildSchemaFilterReq(SemanticQueryReq semanticQueryReq) {
SchemaFilterReq schemaFilterReq = new SchemaFilterReq();
schemaFilterReq.setDataSetId(semanticQueryReq.getDataSetId());
schemaFilterReq.setModelIds(semanticQueryReq.getModelIds());
return schemaFilterReq;
}
private QuerySqlReq buildQuerySqlReq(DimensionValueReq queryDimValueReq) {
QuerySqlReq querySqlReq = new QuerySqlReq();
List<ModelResp> modelResps =
schemaService.getModelList(Lists.newArrayList(queryDimValueReq.getModelId()));
DimensionResp dimensionResp = schemaService.getDimension(queryDimValueReq.getBizName(),
queryDimValueReq.getModelId());
ModelResp modelResp = modelResps.get(0);
String sql = String.format("select distinct %s from %s where 1=1", dimensionResp.getName(),
modelResp.getName());
List<Dim> timeDims = modelResp.getTimeDimension();
if (CollectionUtils.isNotEmpty(timeDims)) {
sql = String.format("%s and %s >= '%s' and %s <= '%s'", sql,
TimeDimensionEnum.DAY.getName(), queryDimValueReq.getDateInfo().getStartDate(),
TimeDimensionEnum.DAY.getName(), queryDimValueReq.getDateInfo().getEndDate());
}
if (StringUtils.isNotBlank(queryDimValueReq.getValue())) {
sql += " AND " + queryDimValueReq.getBizName() + " LIKE '%"
+ queryDimValueReq.getValue() + "%'";
}
querySqlReq.setModelIds(Sets.newHashSet(queryDimValueReq.getModelId()));
querySqlReq.setSql(sql);
return querySqlReq;
}
private void queryPreCheck(QueryStatement queryStatement) {
// Check whether the dimensions of the metric drill-down are correct temporarily,
// add the abstraction of a validator later.
metricDrillDownChecker.checkQuery(queryStatement);
log.info("Union multiple query statements:{}", queryStatements);
return queryUtils.unionAll(queryMultiStructReq, queryStatements);
}
}

View File

@@ -19,7 +19,7 @@ import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Measure;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Metric;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.MetricTypeParams;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Ontology;
import com.tencent.supersonic.headless.core.translator.calcite.schema.S2SemanticSchema;
import com.tencent.supersonic.headless.core.translator.calcite.sql.S2CalciteSchema;
import com.tencent.supersonic.headless.server.pojo.yaml.DataModelYamlTpl;
import com.tencent.supersonic.headless.server.pojo.yaml.DimensionTimeTypeParamsTpl;
import com.tencent.supersonic.headless.server.pojo.yaml.DimensionYamlTpl;
@@ -59,7 +59,6 @@ public class SemanticSchemaManager {
public Ontology buildOntology(SemanticSchemaResp semanticSchemaResp) {
Ontology ontology = new Ontology();
ontology.setSchemaKey(semanticSchemaResp.getSchemaKey());
Map<String, List<DimensionYamlTpl>> dimensionYamlTpls = new HashMap<>();
List<DataModelYamlTpl> dataModelYamlTpls = new ArrayList<>();
List<MetricYamlTpl> metricYamlTpls = new ArrayList<>();
@@ -177,7 +176,7 @@ public class SemanticSchemaManager {
}
public static DataModel getDatasource(final DataModelYamlTpl d) {
DataModel datasource = DataModel.builder().id(d.getId()).sourceId(d.getSourceId())
DataModel datasource = DataModel.builder().id(d.getId()).modelId(d.getSourceId())
.type(d.getType()).sqlQuery(d.getSqlQuery()).name(d.getName())
.tableQuery(d.getTableQuery()).identifiers(getIdentify(d.getIdentifiers()))
.measures(getMeasureParams(d.getMeasures()))
@@ -354,13 +353,13 @@ public class SemanticSchemaManager {
return joinRelations;
}
public static void update(S2SemanticSchema schema, List<Metric> metric) throws Exception {
public static void update(S2CalciteSchema schema, List<Metric> metric) throws Exception {
if (schema != null) {
updateMetric(metric, schema.getMetrics());
}
}
public static void update(S2SemanticSchema schema, DataModel datasourceYamlTpl)
public static void update(S2CalciteSchema schema, DataModel datasourceYamlTpl)
throws Exception {
if (schema != null) {
String dataSourceName = datasourceYamlTpl.getName();
@@ -375,7 +374,7 @@ public class SemanticSchemaManager {
}
}
public static void update(S2SemanticSchema schema, String datasourceBizName,
public static void update(S2CalciteSchema schema, String datasourceBizName,
List<Dimension> dimensionYamlTpls) throws Exception {
if (schema != null) {
Optional<Map.Entry<String, List<Dimension>>> datasourceYamlTplMap = schema

View File

@@ -32,7 +32,7 @@ public class MetricDrillDownChecker {
public void checkQuery(QueryStatement queryStatement) {
SemanticSchemaResp semanticSchemaResp = queryStatement.getSemanticSchemaResp();
String sql = queryStatement.getDataSetQueryParam().getSql();
String sql = queryStatement.getSql();
if (StringUtils.isBlank(sql)) {
return;
}

View File

@@ -1,378 +0,0 @@
package com.tencent.supersonic.headless.server.utils;
import com.tencent.supersonic.common.jsqlparser.SqlRemoveHelper;
import com.tencent.supersonic.common.jsqlparser.SqlReplaceHelper;
import com.tencent.supersonic.common.jsqlparser.SqlSelectFunctionHelper;
import com.tencent.supersonic.common.jsqlparser.SqlSelectHelper;
import com.tencent.supersonic.common.pojo.Aggregator;
import com.tencent.supersonic.common.pojo.Constants;
import com.tencent.supersonic.common.pojo.enums.AggOperatorEnum;
import com.tencent.supersonic.common.pojo.enums.EngineType;
import com.tencent.supersonic.common.pojo.enums.QueryType;
import com.tencent.supersonic.common.pojo.enums.TimeDimensionEnum;
import com.tencent.supersonic.headless.api.pojo.Measure;
import com.tencent.supersonic.headless.api.pojo.MetricTable;
import com.tencent.supersonic.headless.api.pojo.QueryParam;
import com.tencent.supersonic.headless.api.pojo.SchemaItem;
import com.tencent.supersonic.headless.api.pojo.enums.AggOption;
import com.tencent.supersonic.headless.api.pojo.enums.MetricType;
import com.tencent.supersonic.headless.api.pojo.request.QuerySqlReq;
import com.tencent.supersonic.headless.api.pojo.request.QueryStructReq;
import com.tencent.supersonic.headless.api.pojo.response.DatabaseResp;
import com.tencent.supersonic.headless.api.pojo.response.DimSchemaResp;
import com.tencent.supersonic.headless.api.pojo.response.MetricResp;
import com.tencent.supersonic.headless.api.pojo.response.MetricSchemaResp;
import com.tencent.supersonic.headless.api.pojo.response.ModelResp;
import com.tencent.supersonic.headless.api.pojo.response.SemanticSchemaResp;
import com.tencent.supersonic.headless.core.adaptor.db.DbAdaptor;
import com.tencent.supersonic.headless.core.adaptor.db.DbAdaptorFactory;
import com.tencent.supersonic.headless.core.pojo.DataSetQueryParam;
import com.tencent.supersonic.headless.core.pojo.QueryStatement;
import com.tencent.supersonic.headless.core.utils.SqlGenerateUtils;
import com.tencent.supersonic.headless.server.manager.SemanticSchemaManager;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@Component
@Slf4j
public class QueryReqConverter {
@Autowired
private QueryStructUtils queryStructUtils;
@Autowired
private SqlGenerateUtils sqlGenerateUtils;
@Autowired
private QueryUtils queryUtils;
@Autowired
private SemanticSchemaManager semanticSchemaManager;
public QueryStatement buildQueryStatement(QuerySqlReq querySQLReq,
SemanticSchemaResp semanticSchemaResp) {
if (semanticSchemaResp == null) {
return new QueryStatement();
}
// 1.convert name to bizName
convertNameToBizName(querySQLReq, semanticSchemaResp);
// 2.functionName corrector
functionNameCorrector(querySQLReq, semanticSchemaResp);
// 3.correct tableName
correctTableName(querySQLReq);
// 4.remove Underscores
querySQLReq.setSql(SqlRemoveHelper.removeUnderscores(querySQLReq.getSql()));
String tableName = SqlSelectHelper.getTableName(querySQLReq.getSql());
if (StringUtils.isEmpty(tableName)) {
return new QueryStatement();
}
// correct order item is same as agg alias
String reqSql = querySQLReq.getSql();
querySQLReq.setSql(SqlReplaceHelper.replaceAggAliasOrderItem(querySQLReq.getSql()));
log.debug("replaceOrderAggSameAlias {} -> {}", reqSql, querySQLReq.getSql());
// 5.build MetricTables
List<String> allFields = SqlSelectHelper.getAllSelectFields(querySQLReq.getSql());
List<MetricSchemaResp> metricSchemas = getMetrics(semanticSchemaResp, allFields);
List<String> metrics =
metricSchemas.stream().map(m -> m.getBizName()).collect(Collectors.toList());
QueryStructReq queryStructReq = new QueryStructReq();
MetricTable metricTable = new MetricTable();
metricTable.getMetrics().addAll(metrics);
Set<String> dimensions = getDimensions(semanticSchemaResp, allFields);
metricTable.getDimensions().addAll(dimensions);
metricTable.setAlias(tableName.toLowerCase());
// if metric empty , fill model default
if (CollectionUtils.isEmpty(metricTable.getMetrics())) {
metricTable.getMetrics().add(sqlGenerateUtils.generateInternalMetricName(
getDefaultModel(semanticSchemaResp, metricTable.getDimensions())));
} else {
queryStructReq.setAggregators(metricTable.getMetrics().stream()
.map(m -> new Aggregator(m, AggOperatorEnum.UNKNOWN))
.collect(Collectors.toList()));
}
AggOption aggOption = getAggOption(querySQLReq, metricSchemas);
metricTable.setAggOption(aggOption);
List<MetricTable> tables = new ArrayList<>();
tables.add(metricTable);
// 6.build ParseSqlReq
DataSetQueryParam result = new DataSetQueryParam();
BeanUtils.copyProperties(querySQLReq, result);
result.setTables(tables);
DatabaseResp database = semanticSchemaResp.getDatabaseResp();
if (!sqlGenerateUtils.isSupportWith(EngineType.fromString(database.getType().toUpperCase()),
database.getVersion())) {
result.setSupportWith(false);
result.setWithAlias(false);
}
// 7. do deriveMetric
generateDerivedMetric(semanticSchemaResp, aggOption, result);
// 8.physicalSql by ParseSqlReq
queryStructReq.setDateInfo(queryStructUtils.getDateConfBySql(querySQLReq.getSql()));
queryStructReq.setDataSetId(querySQLReq.getDataSetId());
queryStructReq.setQueryType(getQueryType(aggOption));
log.debug("QueryReqConverter queryStructReq[{}]", queryStructReq);
QueryParam queryParam = new QueryParam();
BeanUtils.copyProperties(queryStructReq, queryParam);
QueryStatement queryStatement = new QueryStatement();
queryStatement.setQueryParam(queryParam);
queryStatement.setDataSetQueryParam(result);
queryStatement.setIsS2SQL(true);
queryStatement.setMinMaxTime(queryStructUtils.getBeginEndTime(queryStructReq));
queryStatement.setDataSetId(querySQLReq.getDataSetId());
queryStatement.setLimit(querySQLReq.getLimit());
queryStatement.setModelIds(querySQLReq.getModelIds());
queryStatement.setEnableOptimize(queryUtils.enableOptimize());
queryStatement.setSemanticSchemaResp(semanticSchemaResp);
queryStatement.setOntology(semanticSchemaManager.buildOntology(semanticSchemaResp));
return queryStatement;
}
private AggOption getAggOption(QuerySqlReq databaseReq, List<MetricSchemaResp> metricSchemas) {
String sql = databaseReq.getSql();
if (!SqlSelectFunctionHelper.hasAggregateFunction(sql) && !SqlSelectHelper.hasGroupBy(sql)
&& !SqlSelectHelper.hasWith(sql) && !SqlSelectHelper.hasSubSelect(sql)) {
log.debug("getAggOption simple sql set to DEFAULT");
return AggOption.DEFAULT;
}
// if there is no group by in S2SQL,set MetricTable's aggOption to "NATIVE"
// if there is count() in S2SQL,set MetricTable's aggOption to "NATIVE"
if (!SqlSelectFunctionHelper.hasAggregateFunction(sql)
|| SqlSelectFunctionHelper.hasFunction(sql, "count")
|| SqlSelectFunctionHelper.hasFunction(sql, "count_distinct")) {
return AggOption.OUTER;
}
if (databaseReq.isInnerLayerNative()) {
return AggOption.NATIVE;
}
if (SqlSelectHelper.hasSubSelect(sql) || SqlSelectHelper.hasWith(sql)
|| SqlSelectHelper.hasGroupBy(sql)) {
return AggOption.OUTER;
}
long defaultAggNullCnt = metricSchemas.stream().filter(
m -> Objects.isNull(m.getDefaultAgg()) || StringUtils.isBlank(m.getDefaultAgg()))
.count();
if (defaultAggNullCnt > 0) {
log.debug("getAggOption find null defaultAgg metric set to NATIVE");
return AggOption.OUTER;
}
return AggOption.DEFAULT;
}
private void convertNameToBizName(QuerySqlReq querySqlReq,
SemanticSchemaResp semanticSchemaResp) {
Map<String, String> fieldNameToBizNameMap = getFieldNameToBizNameMap(semanticSchemaResp);
String sql = querySqlReq.getSql();
log.debug("dataSetId:{},convert name to bizName before:{}", querySqlReq.getDataSetId(),
sql);
sql = SqlReplaceHelper.replaceSqlByPositions(sql);
log.debug("replaceSqlByPositions:{}", sql);
String replaceFields = SqlReplaceHelper.replaceFields(sql, fieldNameToBizNameMap, true);
log.debug("dataSetId:{},convert name to bizName after:{}", querySqlReq.getDataSetId(),
replaceFields);
querySqlReq.setSql(replaceFields);
}
private Set<String> getDimensions(SemanticSchemaResp semanticSchemaResp,
List<String> allFields) {
Map<String, String> dimensionLowerToNameMap = semanticSchemaResp.getDimensions().stream()
.collect(Collectors.toMap(entry -> entry.getBizName().toLowerCase(),
SchemaItem::getBizName, (k1, k2) -> k1));
Map<String, String> internalLowerToNameMap = QueryStructUtils.internalCols.stream()
.collect(Collectors.toMap(String::toLowerCase, a -> a));
dimensionLowerToNameMap.putAll(internalLowerToNameMap);
return allFields.stream()
.filter(entry -> dimensionLowerToNameMap.containsKey(entry.toLowerCase()))
.map(entry -> dimensionLowerToNameMap.get(entry.toLowerCase()))
.collect(Collectors.toSet());
}
private List<MetricSchemaResp> getMetrics(SemanticSchemaResp semanticSchemaResp,
List<String> allFields) {
Map<String, MetricSchemaResp> metricLowerToNameMap =
semanticSchemaResp.getMetrics().stream().collect(Collectors
.toMap(entry -> entry.getBizName().toLowerCase(), entry -> entry));
return allFields.stream()
.filter(entry -> metricLowerToNameMap.containsKey(entry.toLowerCase()))
.map(entry -> metricLowerToNameMap.get(entry.toLowerCase()))
.collect(Collectors.toList());
}
private void functionNameCorrector(QuerySqlReq databaseReq,
SemanticSchemaResp semanticSchemaResp) {
DatabaseResp database = semanticSchemaResp.getDatabaseResp();
if (Objects.isNull(database) || Objects.isNull(database.getType())) {
return;
}
String type = database.getType();
DbAdaptor engineAdaptor = DbAdaptorFactory.getEngineAdaptor(type.toLowerCase());
if (Objects.nonNull(engineAdaptor)) {
String functionNameCorrector =
engineAdaptor.functionNameCorrector(databaseReq.getSql());
databaseReq.setSql(functionNameCorrector);
}
}
protected Map<String, String> getFieldNameToBizNameMap(SemanticSchemaResp semanticSchemaResp) {
// support fieldName and field alias to bizName
Map<String, String> dimensionResults = semanticSchemaResp.getDimensions().stream().flatMap(
entry -> getPairStream(entry.getAlias(), entry.getName(), entry.getBizName()))
.collect(Collectors.toMap(Pair::getLeft, Pair::getRight, (k1, k2) -> k1));
Map<String, String> metricResults = semanticSchemaResp.getMetrics().stream().flatMap(
entry -> getPairStream(entry.getAlias(), entry.getName(), entry.getBizName()))
.collect(Collectors.toMap(Pair::getLeft, Pair::getRight, (k1, k2) -> k1));
dimensionResults.putAll(TimeDimensionEnum.getChNameToNameMap());
dimensionResults.putAll(TimeDimensionEnum.getNameToNameMap());
dimensionResults.putAll(metricResults);
return dimensionResults;
}
private Stream<Pair<String, String>> getPairStream(String aliasStr, String name,
String bizName) {
Set<Pair<String, String>> elements = new HashSet<>();
elements.add(Pair.of(name, bizName));
if (StringUtils.isNotBlank(aliasStr)) {
List<String> aliasList = SchemaItem.getAliasList(aliasStr);
for (String alias : aliasList) {
elements.add(Pair.of(alias, bizName));
}
}
return elements.stream();
}
public void correctTableName(QuerySqlReq querySqlReq) {
String sql = querySqlReq.getSql();
sql = SqlReplaceHelper.replaceTable(sql,
Constants.TABLE_PREFIX + querySqlReq.getDataSetId());
log.debug("correctTableName after:{}", sql);
querySqlReq.setSql(sql);
}
private QueryType getQueryType(AggOption aggOption) {
boolean isAgg = AggOption.isAgg(aggOption);
QueryType queryType = QueryType.DETAIL;
if (isAgg) {
queryType = QueryType.AGGREGATE;
}
return queryType;
}
private void generateDerivedMetric(SemanticSchemaResp semanticSchemaResp, AggOption aggOption,
DataSetQueryParam viewQueryParam) {
String sql = viewQueryParam.getSql();
for (MetricTable metricTable : viewQueryParam.getTables()) {
Set<String> measures = new HashSet<>();
Map<String, String> replaces = generateDerivedMetric(semanticSchemaResp, aggOption,
metricTable.getMetrics(), metricTable.getDimensions(), measures);
if (!CollectionUtils.isEmpty(replaces)) {
// metricTable sql use measures replace metric
sql = SqlReplaceHelper.replaceSqlByExpression(sql, replaces);
metricTable.setAggOption(AggOption.NATIVE);
// metricTable use measures replace metric
if (!CollectionUtils.isEmpty(measures)) {
metricTable.setMetrics(new ArrayList<>(measures));
} else {
// empty measure , fill default
metricTable.setMetrics(new ArrayList<>());
metricTable.getMetrics().add(sqlGenerateUtils.generateInternalMetricName(
getDefaultModel(semanticSchemaResp, metricTable.getDimensions())));
}
}
}
viewQueryParam.setSql(sql);
}
private Map<String, String> generateDerivedMetric(SemanticSchemaResp semanticSchemaResp,
AggOption aggOption, List<String> metrics, List<String> dimensions,
Set<String> measures) {
Map<String, String> result = new HashMap<>();
List<MetricSchemaResp> metricResps = semanticSchemaResp.getMetrics();
List<DimSchemaResp> dimensionResps = semanticSchemaResp.getDimensions();
// Check if any metric is derived
boolean hasDerivedMetrics =
metricResps.stream().anyMatch(m -> metrics.contains(m.getBizName()) && MetricType
.isDerived(m.getMetricDefineType(), m.getMetricDefineByMeasureParams()));
if (!hasDerivedMetrics) {
return result;
}
log.debug("begin to generateDerivedMetric {} [{}]", aggOption, metrics);
Set<String> allFields = new HashSet<>();
Map<String, Measure> allMeasures = new HashMap<>();
semanticSchemaResp.getModelResps().forEach(modelResp -> {
allFields.addAll(modelResp.getFieldList());
if (modelResp.getModelDetail().getMeasures() != null) {
modelResp.getModelDetail().getMeasures()
.forEach(measure -> allMeasures.put(measure.getBizName(), measure));
}
});
Set<String> derivedDimensions = new HashSet<>();
Set<String> derivedMetrics = new HashSet<>();
Map<String, String> visitedMetrics = new HashMap<>();
for (MetricResp metricResp : metricResps) {
if (metrics.contains(metricResp.getBizName())) {
boolean isDerived = MetricType.isDerived(metricResp.getMetricDefineType(),
metricResp.getMetricDefineByMeasureParams());
if (isDerived) {
String expr = sqlGenerateUtils.generateDerivedMetric(metricResps, allFields,
allMeasures, dimensionResps, sqlGenerateUtils.getExpr(metricResp),
metricResp.getMetricDefineType(), aggOption, visitedMetrics,
derivedMetrics, derivedDimensions);
result.put(metricResp.getBizName(), expr);
log.debug("derived metric {}->{}", metricResp.getBizName(), expr);
} else {
measures.add(metricResp.getBizName());
}
}
}
measures.addAll(derivedMetrics);
derivedDimensions.stream().filter(dimension -> !dimensions.contains(dimension))
.forEach(dimensions::add);
return result;
}
private String getDefaultModel(SemanticSchemaResp semanticSchemaResp, List<String> dimensions) {
if (!CollectionUtils.isEmpty(dimensions)) {
Map<String, Long> modelMatchCnt = new HashMap<>();
for (ModelResp modelResp : semanticSchemaResp.getModelResps()) {
modelMatchCnt.put(modelResp.getBizName(), modelResp.getModelDetail().getDimensions()
.stream().filter(d -> dimensions.contains(d.getBizName())).count());
}
return modelMatchCnt.entrySet().stream()
.sorted(Map.Entry.comparingByValue(Comparator.reverseOrder()))
.map(m -> m.getKey()).findFirst().orElse("");
}
return semanticSchemaResp.getModelResps().get(0).getBizName();
}
}

View File

@@ -140,15 +140,15 @@ public class QueryUtils {
return null;
}
public QueryStatement sqlParserUnion(QueryMultiStructReq queryMultiStructCmd,
List<QueryStatement> sqlParsers) {
public QueryStatement unionAll(QueryMultiStructReq queryMultiStructCmd,
List<QueryStatement> queryStatements) {
QueryStatement sqlParser = new QueryStatement();
StringBuilder unionSqlBuilder = new StringBuilder();
for (int i = 0; i < sqlParsers.size(); i++) {
for (int i = 0; i < queryStatements.size(); i++) {
String selectStr = SqlGenerateUtils
.getUnionSelect(queryMultiStructCmd.getQueryStructReqs().get(i));
unionSqlBuilder.append(String.format("select %s from ( %s ) sub_sql_%s", selectStr,
sqlParsers.get(i).getSql(), i));
queryStatements.get(i).getSql(), i));
unionSqlBuilder.append(UNIONALL);
}
String unionSql = unionSqlBuilder.substring(0,

View File

@@ -6,8 +6,8 @@ import com.tencent.supersonic.headless.api.pojo.enums.AggOption;
import com.tencent.supersonic.headless.api.pojo.response.SqlParserResp;
import com.tencent.supersonic.headless.core.pojo.MetricQueryParam;
import com.tencent.supersonic.headless.core.pojo.QueryStatement;
import com.tencent.supersonic.headless.core.translator.calcite.planner.AggPlanner;
import com.tencent.supersonic.headless.core.translator.calcite.schema.S2SemanticSchema;
import com.tencent.supersonic.headless.core.translator.calcite.sql.S2CalciteSchema;
import com.tencent.supersonic.headless.core.translator.calcite.sql.SqlBuilder;
import com.tencent.supersonic.headless.server.manager.SemanticSchemaManager;
import com.tencent.supersonic.headless.server.pojo.yaml.DataModelYamlTpl;
import com.tencent.supersonic.headless.server.pojo.yaml.DimensionTimeTypeParamsTpl;
@@ -20,16 +20,12 @@ import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Slf4j
class HeadlessParserServiceTest {
private static Map<String, S2SemanticSchema> headlessSchemaMap = new HashMap<>();
public static SqlParserResp parser(S2SemanticSchema semanticSchema,
public static SqlParserResp parser(S2CalciteSchema semanticSchema,
MetricQueryParam metricQueryParam, boolean isAgg) {
SqlParserResp sqlParser = new SqlParserResp();
try {
@@ -37,14 +33,13 @@ class HeadlessParserServiceTest {
sqlParser.setErrMsg("headlessSchema not found");
return sqlParser;
}
AggPlanner aggBuilder = new AggPlanner(semanticSchema);
SqlBuilder aggBuilder = new SqlBuilder(semanticSchema);
QueryStatement queryStatement = new QueryStatement();
queryStatement.setMetricQueryParam(metricQueryParam);
aggBuilder.plan(queryStatement, AggOption.getAggregation(!isAgg));
EngineType engineType = EngineType
.fromString(semanticSchema.getSemanticModel().getDatabase().getType());
aggBuilder.build(queryStatement, AggOption.getAggregation(!isAgg));
EngineType engineType =
EngineType.fromString(semanticSchema.getOntology().getDatabase().getType());
sqlParser.setSql(aggBuilder.getSql(engineType));
sqlParser.setSourceId(aggBuilder.getSourceId());
} catch (Exception e) {
sqlParser.setErrMsg(e.getMessage());
log.error("parser error metricQueryReq[{}] error [{}]", metricQueryParam, e);
@@ -122,7 +117,7 @@ class HeadlessParserServiceTest {
identify.setType("primary");
identifies.add(identify);
datasource.setIdentifiers(identifies);
S2SemanticSchema semanticSchema = S2SemanticSchema.newBuilder("1").build();
S2CalciteSchema semanticSchema = S2CalciteSchema.builder().build();
SemanticSchemaManager.update(semanticSchema,
SemanticSchemaManager.getDatasource(datasource));
@@ -192,7 +187,7 @@ class HeadlessParserServiceTest {
System.out.println(parser(semanticSchema, metricCommand2, true));
}
private static void addDepartment(S2SemanticSchema semanticSchema) {
private static void addDepartment(S2CalciteSchema semanticSchema) {
DataModelYamlTpl datasource = new DataModelYamlTpl();
datasource.setName("user_department");
datasource.setSourceId(1L);

View File

@@ -134,7 +134,7 @@ public class S2VisitsDemo extends S2BaseDemo {
private void addSampleChats(Integer agentId) {
Long chatId = chatManageService.addChat(defaultUser, "样例对话1", agentId);
submitText(chatId.intValue(), agentId, "超音数 访问次数");
submitText(chatId.intValue(), agentId, "访问过超音数的部门有哪些");
submitText(chatId.intValue(), agentId, "按部门统计近7天访问次数");
submitText(chatId.intValue(), agentId, "alice 停留时长");
}
@@ -446,7 +446,7 @@ public class S2VisitsDemo extends S2BaseDemo {
termReq1.setDescription("用户为tom和lucy");
termReq1.setAlias(Lists.newArrayList("VIP用户"));
termReq1.setDomainId(s2Domain.getId());
termService.saveOrUpdate(termReq, defaultUser);
termService.saveOrUpdate(termReq1, defaultUser);
}
private void addAuthGroup_1(ModelResp stayTimeModel) {