[improvement][Chat] Support agent permission management (#1923)

* [improvement][Chat] Support agent permission management #1143

* [improvement][chat]Iterate LLM prompts of parsing and correction.

* [improvement][headless-fe] Added null-check conditions to the data formatting function.

* [improvement][headless]Clean code logic of headless translator.

---------

Co-authored-by: lxwcodemonkey <jolunoluo@tencent.com>
Co-authored-by: tristanliu <tristanliu@tencent.com>
This commit is contained in:
Jun Zhang
2024-11-23 09:09:04 +08:00
committed by GitHub
parent 244052e806
commit cb183b7ac8
66 changed files with 1023 additions and 1233 deletions

View File

@@ -4,9 +4,9 @@ import com.tencent.supersonic.common.calcite.Configuration;
import com.tencent.supersonic.common.jsqlparser.SqlSelectHelper;
import com.tencent.supersonic.headless.core.pojo.Materialization;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.TimeRange;
import com.tencent.supersonic.headless.core.translator.calcite.schema.DataSourceTable;
import com.tencent.supersonic.headless.core.translator.calcite.schema.DataSourceTable.Builder;
import com.tencent.supersonic.headless.core.translator.calcite.schema.SchemaBuilder;
import com.tencent.supersonic.headless.core.translator.calcite.sql.S2CalciteTable;
import com.tencent.supersonic.headless.core.translator.calcite.sql.S2CalciteTable.Builder;
import com.tencent.supersonic.headless.core.translator.calcite.sql.SchemaBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.calcite.adapter.enumerable.EnumerableRules;
import org.apache.calcite.config.CalciteConnectionConfigImpl;
@@ -156,14 +156,14 @@ public abstract class AbstractAccelerator implements QueryAccelerator {
String[] dbTable = materialization.getName().split("\\.");
String tb = dbTable[1].toLowerCase();
String db = dbTable[0].toLowerCase();
Builder builder = DataSourceTable.newBuilder(tb);
Builder builder = S2CalciteTable.newBuilder(tb);
for (String f : materialization.getColumns()) {
builder.addField(f, SqlTypeName.VARCHAR);
}
if (StringUtils.isNotBlank(materialization.getPartitionName())) {
builder.addField(materialization.getPartitionName(), SqlTypeName.VARCHAR);
}
DataSourceTable srcTable = builder.withRowCount(1L).build();
S2CalciteTable srcTable = builder.withRowCount(1L).build();
if (Objects.nonNull(db) && !db.isEmpty()) {
SchemaPlus schemaPlus = dataSetSchema.plus().getSubSchema(db);
if (Objects.isNull(schemaPlus)) {

View File

@@ -38,7 +38,7 @@ public class JdbcExecutor implements QueryExecutor {
SqlUtils sqlUtils = ContextUtils.getBean(SqlUtils.class);
String sql = StringUtils.normalizeSpace(queryStatement.getSql());
log.info("executing SQL: {}", sql);
Database database = queryStatement.getSemanticModel().getDatabase();
Database database = queryStatement.getOntology().getDatabase();
SemanticQueryResp queryResultWithColumns = new SemanticQueryResp();
try {
SqlUtils sqlUtil = sqlUtils.init(database);

View File

@@ -2,7 +2,7 @@ package com.tencent.supersonic.headless.core.pojo;
import com.tencent.supersonic.headless.api.pojo.QueryParam;
import com.tencent.supersonic.headless.api.pojo.response.SemanticSchemaResp;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.SemanticModel;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Ontology;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
@@ -29,7 +29,7 @@ public class QueryStatement {
private String dataSetAlias;
private String dataSetSimplifySql;
private Boolean enableLimitWrapper = false;
private SemanticModel semanticModel;
private Ontology ontology;
private SemanticSchemaResp semanticSchemaResp;
private Integer limit = 1000;
private Boolean isTranslated = false;

View File

@@ -1,34 +1,72 @@
package com.tencent.supersonic.headless.core.translator;
import com.tencent.supersonic.common.calcite.SqlMergeWithUtils;
import com.tencent.supersonic.common.jsqlparser.SqlRemoveHelper;
import com.tencent.supersonic.common.jsqlparser.SqlReplaceHelper;
import com.tencent.supersonic.common.jsqlparser.SqlSelectFunctionHelper;
import com.tencent.supersonic.common.jsqlparser.SqlSelectHelper;
import com.tencent.supersonic.common.pojo.Aggregator;
import com.tencent.supersonic.common.pojo.Constants;
import com.tencent.supersonic.common.pojo.enums.AggOperatorEnum;
import com.tencent.supersonic.common.pojo.enums.EngineType;
import com.tencent.supersonic.common.pojo.enums.QueryType;
import com.tencent.supersonic.common.pojo.enums.TimeDimensionEnum;
import com.tencent.supersonic.common.util.StringUtil;
import com.tencent.supersonic.headless.api.pojo.Measure;
import com.tencent.supersonic.headless.api.pojo.MetricTable;
import com.tencent.supersonic.headless.api.pojo.QueryParam;
import com.tencent.supersonic.headless.api.pojo.SchemaItem;
import com.tencent.supersonic.headless.api.pojo.enums.AggOption;
import com.tencent.supersonic.headless.api.pojo.enums.MetricType;
import com.tencent.supersonic.headless.api.pojo.request.QueryStructReq;
import com.tencent.supersonic.headless.api.pojo.response.DatabaseResp;
import com.tencent.supersonic.headless.api.pojo.response.DimSchemaResp;
import com.tencent.supersonic.headless.api.pojo.response.MetricResp;
import com.tencent.supersonic.headless.api.pojo.response.MetricSchemaResp;
import com.tencent.supersonic.headless.api.pojo.response.ModelResp;
import com.tencent.supersonic.headless.api.pojo.response.SemanticSchemaResp;
import com.tencent.supersonic.headless.core.adaptor.db.DbAdaptor;
import com.tencent.supersonic.headless.core.adaptor.db.DbAdaptorFactory;
import com.tencent.supersonic.headless.core.pojo.DataSetQueryParam;
import com.tencent.supersonic.headless.core.pojo.MetricQueryParam;
import com.tencent.supersonic.headless.core.pojo.QueryStatement;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.SemanticModel;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Ontology;
import com.tencent.supersonic.headless.core.translator.converter.QueryConverter;
import com.tencent.supersonic.headless.core.utils.ComponentFactory;
import com.tencent.supersonic.headless.core.utils.SqlGenerateUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@Component
@Slf4j
public class DefaultSemanticTranslator implements SemanticTranslator {
@Autowired
private SqlGenerateUtils sqlGenerateUtils;
public void translate(QueryStatement queryStatement) {
if (queryStatement.isTranslated()) {
return;
}
try {
preprocess(queryStatement);
parse(queryStatement);
optimize(queryStatement);
} catch (Exception e) {
@@ -36,13 +74,7 @@ public class DefaultSemanticTranslator implements SemanticTranslator {
}
}
public void optimize(QueryStatement queryStatement) {
for (QueryOptimizer queryOptimizer : ComponentFactory.getQueryOptimizers()) {
queryOptimizer.rewrite(queryStatement);
}
}
public void parse(QueryStatement queryStatement) throws Exception {
private void parse(QueryStatement queryStatement) throws Exception {
QueryParam queryParam = queryStatement.getQueryParam();
if (Objects.isNull(queryStatement.getDataSetQueryParam())) {
queryStatement.setDataSetQueryParam(new DataSetQueryParam());
@@ -50,6 +82,7 @@ public class DefaultSemanticTranslator implements SemanticTranslator {
if (Objects.isNull(queryStatement.getMetricQueryParam())) {
queryStatement.setMetricQueryParam(new MetricQueryParam());
}
log.debug("SemanticConverter before [{}]", queryParam);
for (QueryConverter headlessConverter : ComponentFactory.getQueryConverters()) {
if (headlessConverter.accept(queryStatement)) {
@@ -59,13 +92,16 @@ public class DefaultSemanticTranslator implements SemanticTranslator {
}
log.debug("SemanticConverter after {} {} {}", queryParam,
queryStatement.getDataSetQueryParam(), queryStatement.getMetricQueryParam());
if (!queryStatement.getDataSetQueryParam().getSql().isEmpty()) {
doParse(queryStatement.getDataSetQueryParam(), queryStatement);
} else {
queryStatement.getMetricQueryParam()
.setNativeQuery(queryParam.getQueryType().isNativeAggQuery());
doParse(queryStatement);
doParse(queryStatement,
AggOption.getAggregation(queryStatement.getMetricQueryParam().isNativeQuery()));
}
if (StringUtils.isEmpty(queryStatement.getSql())) {
throw new RuntimeException("parse Exception: " + queryStatement.getErrMsg());
}
@@ -77,11 +113,11 @@ public class DefaultSemanticTranslator implements SemanticTranslator {
}
}
public QueryStatement doParse(DataSetQueryParam dataSetQueryParam,
private QueryStatement doParse(DataSetQueryParam dataSetQueryParam,
QueryStatement queryStatement) {
log.info("parse dataSetQuery [{}] ", dataSetQueryParam);
SemanticModel semanticModel = queryStatement.getSemanticModel();
EngineType engineType = EngineType.fromString(semanticModel.getDatabase().getType());
Ontology ontology = queryStatement.getOntology();
EngineType engineType = EngineType.fromString(ontology.getDatabase().getType());
try {
if (!CollectionUtils.isEmpty(dataSetQueryParam.getTables())) {
List<String[]> tables = new ArrayList<>();
@@ -132,12 +168,7 @@ public class DefaultSemanticTranslator implements SemanticTranslator {
return queryStatement;
}
public QueryStatement doParse(QueryStatement queryStatement) {
return doParse(queryStatement,
AggOption.getAggregation(queryStatement.getMetricQueryParam().isNativeQuery()));
}
public QueryStatement doParse(QueryStatement queryStatement, AggOption isAgg) {
private QueryStatement doParse(QueryStatement queryStatement, AggOption isAgg) {
MetricQueryParam metricQueryParam = queryStatement.getMetricQueryParam();
log.info("parse metricQuery [{}] isAgg [{}]", metricQueryParam, isAgg);
try {
@@ -151,18 +182,19 @@ public class DefaultSemanticTranslator implements SemanticTranslator {
private QueryStatement parserSql(MetricTable metricTable, Boolean isSingleMetricTable,
DataSetQueryParam dataSetQueryParam, QueryStatement queryStatement) throws Exception {
MetricQueryParam metricReq = new MetricQueryParam();
metricReq.setMetrics(metricTable.getMetrics());
metricReq.setDimensions(metricTable.getDimensions());
metricReq.setWhere(StringUtil.formatSqlQuota(metricTable.getWhere()));
metricReq.setNativeQuery(!AggOption.isAgg(metricTable.getAggOption()));
MetricQueryParam metricQueryParam = new MetricQueryParam();
metricQueryParam.setMetrics(metricTable.getMetrics());
metricQueryParam.setDimensions(metricTable.getDimensions());
metricQueryParam.setWhere(StringUtil.formatSqlQuota(metricTable.getWhere()));
metricQueryParam.setNativeQuery(!AggOption.isAgg(metricTable.getAggOption()));
QueryStatement tableSql = new QueryStatement();
tableSql.setIsS2SQL(false);
tableSql.setMetricQueryParam(metricReq);
tableSql.setMetricQueryParam(metricQueryParam);
tableSql.setMinMaxTime(queryStatement.getMinMaxTime());
tableSql.setEnableOptimize(queryStatement.getEnableOptimize());
tableSql.setDataSetId(queryStatement.getDataSetId());
tableSql.setSemanticModel(queryStatement.getSemanticModel());
tableSql.setOntology(queryStatement.getOntology());
if (isSingleMetricTable) {
tableSql.setDataSetSql(dataSetQueryParam.getSql());
tableSql.setDataSetAlias(metricTable.getAlias());
@@ -174,4 +206,302 @@ public class DefaultSemanticTranslator implements SemanticTranslator {
}
return tableSql;
}
private void optimize(QueryStatement queryStatement) {
for (QueryOptimizer queryOptimizer : ComponentFactory.getQueryOptimizers()) {
queryOptimizer.rewrite(queryStatement);
}
}
private void preprocess(QueryStatement queryStatement) {
if (StringUtils.isBlank(queryStatement.getSql())) {
return;
}
SemanticSchemaResp semanticSchemaResp = queryStatement.getSemanticSchemaResp();
convertNameToBizName(queryStatement);
rewriteFunction(queryStatement);
queryStatement.setSql(SqlRemoveHelper.removeUnderscores(queryStatement.getSql()));
String tableName = SqlSelectHelper.getTableName(queryStatement.getSql());
if (StringUtils.isEmpty(tableName)) {
return;
}
// correct order item is same as agg alias
String reqSql = queryStatement.getSql();
queryStatement.setSql(SqlReplaceHelper.replaceAggAliasOrderItem(queryStatement.getSql()));
log.debug("replaceOrderAggSameAlias {} -> {}", reqSql, queryStatement.getSql());
// 5.build MetricTables
List<String> allFields = SqlSelectHelper.getAllSelectFields(queryStatement.getSql());
List<MetricSchemaResp> metricSchemas = getMetrics(semanticSchemaResp, allFields);
List<String> metrics =
metricSchemas.stream().map(SchemaItem::getBizName).collect(Collectors.toList());
Set<String> dimensions = getDimensions(semanticSchemaResp, allFields);
QueryStructReq queryStructReq = new QueryStructReq();
MetricTable metricTable = new MetricTable();
metricTable.getMetrics().addAll(metrics);
metricTable.getDimensions().addAll(dimensions);
metricTable.setAlias(tableName.toLowerCase());
// if metric empty , fill model default
if (CollectionUtils.isEmpty(metricTable.getMetrics())) {
metricTable.getMetrics().add(sqlGenerateUtils.generateInternalMetricName(
getDefaultModel(semanticSchemaResp, metricTable.getDimensions())));
} else {
queryStructReq.getAggregators()
.addAll(metricTable.getMetrics().stream()
.map(m -> new Aggregator(m, AggOperatorEnum.UNKNOWN))
.collect(Collectors.toList()));
}
AggOption aggOption = getAggOption(queryStatement, metricSchemas);
metricTable.setAggOption(aggOption);
List<MetricTable> tables = new ArrayList<>();
tables.add(metricTable);
// 6.build ParseSqlReq
DataSetQueryParam datasetQueryParam = new DataSetQueryParam();
datasetQueryParam.setTables(tables);
datasetQueryParam.setSql(queryStatement.getSql());
DatabaseResp database = semanticSchemaResp.getDatabaseResp();
if (!sqlGenerateUtils.isSupportWith(EngineType.fromString(database.getType().toUpperCase()),
database.getVersion())) {
datasetQueryParam.setSupportWith(false);
datasetQueryParam.setWithAlias(false);
}
// 7. do deriveMetric
generateDerivedMetric(semanticSchemaResp, aggOption, datasetQueryParam);
// 8.physicalSql by ParseSqlReq
// queryStructReq.setDateInfo(queryStructUtils.getDateConfBySql(queryStatement.getSql()));
queryStructReq.setDataSetId(queryStatement.getDataSetId());
queryStructReq.setQueryType(getQueryType(aggOption));
log.debug("QueryReqConverter queryStructReq[{}]", queryStructReq);
QueryParam queryParam = new QueryParam();
BeanUtils.copyProperties(queryStructReq, queryParam);
queryStatement.setQueryParam(queryParam);
queryStatement.setDataSetQueryParam(datasetQueryParam);
// queryStatement.setMinMaxTime(queryStructUtils.getBeginEndTime(queryStructReq));
}
private AggOption getAggOption(QueryStatement queryStatement,
List<MetricSchemaResp> metricSchemas) {
String sql = queryStatement.getSql();
if (!SqlSelectFunctionHelper.hasAggregateFunction(sql) && !SqlSelectHelper.hasGroupBy(sql)
&& !SqlSelectHelper.hasWith(sql) && !SqlSelectHelper.hasSubSelect(sql)) {
log.debug("getAggOption simple sql set to DEFAULT");
return AggOption.DEFAULT;
}
// if there is no group by in S2SQL,set MetricTable's aggOption to "NATIVE"
// if there is count() in S2SQL,set MetricTable's aggOption to "NATIVE"
if (!SqlSelectFunctionHelper.hasAggregateFunction(sql)
|| SqlSelectFunctionHelper.hasFunction(sql, "count")
|| SqlSelectFunctionHelper.hasFunction(sql, "count_distinct")) {
return AggOption.OUTER;
}
// if (queryStatement.isInnerLayerNative()) {
// return AggOption.NATIVE;
// }
if (SqlSelectHelper.hasSubSelect(sql) || SqlSelectHelper.hasWith(sql)
|| SqlSelectHelper.hasGroupBy(sql)) {
return AggOption.OUTER;
}
long defaultAggNullCnt = metricSchemas.stream().filter(
m -> Objects.isNull(m.getDefaultAgg()) || StringUtils.isBlank(m.getDefaultAgg()))
.count();
if (defaultAggNullCnt > 0) {
log.debug("getAggOption find null defaultAgg metric set to NATIVE");
return AggOption.OUTER;
}
return AggOption.DEFAULT;
}
private void convertNameToBizName(QueryStatement queryStatement) {
SemanticSchemaResp semanticSchemaResp = queryStatement.getSemanticSchemaResp();
Map<String, String> fieldNameToBizNameMap = getFieldNameToBizNameMap(semanticSchemaResp);
String sql = queryStatement.getSql();
log.debug("dataSetId:{},convert name to bizName before:{}", queryStatement.getDataSetId(),
sql);
sql = SqlReplaceHelper.replaceSqlByPositions(sql);
log.debug("replaceSqlByPositions:{}", sql);
sql = SqlReplaceHelper.replaceFields(sql, fieldNameToBizNameMap, true);
log.debug("dataSetId:{},convert name to bizName after:{}", queryStatement.getDataSetId(),
sql);
sql = SqlReplaceHelper.replaceTable(sql,
Constants.TABLE_PREFIX + queryStatement.getDataSetId());
log.debug("replaceTableName after:{}", sql);
queryStatement.setSql(sql);
}
private Set<String> getDimensions(SemanticSchemaResp semanticSchemaResp,
List<String> allFields) {
Map<String, String> dimensionLowerToNameMap = semanticSchemaResp.getDimensions().stream()
.collect(Collectors.toMap(entry -> entry.getBizName().toLowerCase(),
SchemaItem::getBizName, (k1, k2) -> k1));
dimensionLowerToNameMap.put(TimeDimensionEnum.DAY.getName(),
TimeDimensionEnum.DAY.getName());
return allFields.stream()
.filter(entry -> dimensionLowerToNameMap.containsKey(entry.toLowerCase()))
.map(entry -> dimensionLowerToNameMap.get(entry.toLowerCase()))
.collect(Collectors.toSet());
}
private List<MetricSchemaResp> getMetrics(SemanticSchemaResp semanticSchemaResp,
List<String> allFields) {
Map<String, MetricSchemaResp> metricLowerToNameMap =
semanticSchemaResp.getMetrics().stream().collect(Collectors
.toMap(entry -> entry.getBizName().toLowerCase(), entry -> entry));
return allFields.stream()
.filter(entry -> metricLowerToNameMap.containsKey(entry.toLowerCase()))
.map(entry -> metricLowerToNameMap.get(entry.toLowerCase()))
.collect(Collectors.toList());
}
private void rewriteFunction(QueryStatement queryStatement) {
SemanticSchemaResp semanticSchemaResp = queryStatement.getSemanticSchemaResp();
DatabaseResp database = semanticSchemaResp.getDatabaseResp();
if (Objects.isNull(database) || Objects.isNull(database.getType())) {
return;
}
String type = database.getType();
DbAdaptor engineAdaptor = DbAdaptorFactory.getEngineAdaptor(type.toLowerCase());
if (Objects.nonNull(engineAdaptor)) {
String functionNameCorrector =
engineAdaptor.functionNameCorrector(queryStatement.getSql());
queryStatement.setSql(functionNameCorrector);
}
}
protected Map<String, String> getFieldNameToBizNameMap(SemanticSchemaResp semanticSchemaResp) {
// support fieldName and field alias to bizName
Map<String, String> dimensionResults = semanticSchemaResp.getDimensions().stream().flatMap(
entry -> getPairStream(entry.getAlias(), entry.getName(), entry.getBizName()))
.collect(Collectors.toMap(Pair::getLeft, Pair::getRight, (k1, k2) -> k1));
Map<String, String> metricResults = semanticSchemaResp.getMetrics().stream().flatMap(
entry -> getPairStream(entry.getAlias(), entry.getName(), entry.getBizName()))
.collect(Collectors.toMap(Pair::getLeft, Pair::getRight, (k1, k2) -> k1));
dimensionResults.putAll(TimeDimensionEnum.getChNameToNameMap());
dimensionResults.putAll(TimeDimensionEnum.getNameToNameMap());
dimensionResults.putAll(metricResults);
return dimensionResults;
}
private Stream<Pair<String, String>> getPairStream(String aliasStr, String name,
String bizName) {
Set<Pair<String, String>> elements = new HashSet<>();
elements.add(Pair.of(name, bizName));
if (StringUtils.isNotBlank(aliasStr)) {
List<String> aliasList = SchemaItem.getAliasList(aliasStr);
for (String alias : aliasList) {
elements.add(Pair.of(alias, bizName));
}
}
return elements.stream();
}
private QueryType getQueryType(AggOption aggOption) {
boolean isAgg = AggOption.isAgg(aggOption);
QueryType queryType = QueryType.DETAIL;
if (isAgg) {
queryType = QueryType.AGGREGATE;
}
return queryType;
}
private void generateDerivedMetric(SemanticSchemaResp semanticSchemaResp, AggOption aggOption,
DataSetQueryParam viewQueryParam) {
String sql = viewQueryParam.getSql();
for (MetricTable metricTable : viewQueryParam.getTables()) {
Set<String> measures = new HashSet<>();
Map<String, String> replaces = generateDerivedMetric(semanticSchemaResp, aggOption,
metricTable.getMetrics(), metricTable.getDimensions(), measures);
if (!CollectionUtils.isEmpty(replaces)) {
// metricTable sql use measures replace metric
sql = SqlReplaceHelper.replaceSqlByExpression(sql, replaces);
metricTable.setAggOption(AggOption.NATIVE);
// metricTable use measures replace metric
if (!CollectionUtils.isEmpty(measures)) {
metricTable.setMetrics(new ArrayList<>(measures));
} else {
// empty measure , fill default
metricTable.setMetrics(new ArrayList<>());
metricTable.getMetrics().add(sqlGenerateUtils.generateInternalMetricName(
getDefaultModel(semanticSchemaResp, metricTable.getDimensions())));
}
}
}
viewQueryParam.setSql(sql);
}
private Map<String, String> generateDerivedMetric(SemanticSchemaResp semanticSchemaResp,
AggOption aggOption, List<String> metrics, List<String> dimensions,
Set<String> measures) {
Map<String, String> result = new HashMap<>();
List<MetricSchemaResp> metricResps = semanticSchemaResp.getMetrics();
List<DimSchemaResp> dimensionResps = semanticSchemaResp.getDimensions();
// Check if any metric is derived
boolean hasDerivedMetrics =
metricResps.stream().anyMatch(m -> metrics.contains(m.getBizName()) && MetricType
.isDerived(m.getMetricDefineType(), m.getMetricDefineByMeasureParams()));
if (!hasDerivedMetrics) {
return result;
}
log.debug("begin to generateDerivedMetric {} [{}]", aggOption, metrics);
Set<String> allFields = new HashSet<>();
Map<String, Measure> allMeasures = new HashMap<>();
semanticSchemaResp.getModelResps().forEach(modelResp -> {
allFields.addAll(modelResp.getFieldList());
if (modelResp.getModelDetail().getMeasures() != null) {
modelResp.getModelDetail().getMeasures()
.forEach(measure -> allMeasures.put(measure.getBizName(), measure));
}
});
Set<String> derivedDimensions = new HashSet<>();
Set<String> derivedMetrics = new HashSet<>();
Map<String, String> visitedMetrics = new HashMap<>();
for (MetricResp metricResp : metricResps) {
if (metrics.contains(metricResp.getBizName())) {
boolean isDerived = MetricType.isDerived(metricResp.getMetricDefineType(),
metricResp.getMetricDefineByMeasureParams());
if (isDerived) {
String expr = sqlGenerateUtils.generateDerivedMetric(metricResps, allFields,
allMeasures, dimensionResps, sqlGenerateUtils.getExpr(metricResp),
metricResp.getMetricDefineType(), aggOption, visitedMetrics,
derivedMetrics, derivedDimensions);
result.put(metricResp.getBizName(), expr);
log.debug("derived metric {}->{}", metricResp.getBizName(), expr);
} else {
measures.add(metricResp.getBizName());
}
}
}
measures.addAll(derivedMetrics);
derivedDimensions.stream().filter(dimension -> !dimensions.contains(dimension))
.forEach(dimensions::add);
return result;
}
private String getDefaultModel(SemanticSchemaResp semanticSchemaResp, List<String> dimensions) {
if (!CollectionUtils.isEmpty(dimensions)) {
Map<String, Long> modelMatchCnt = new HashMap<>();
for (ModelResp modelResp : semanticSchemaResp.getModelResps()) {
modelMatchCnt.put(modelResp.getBizName(), modelResp.getModelDetail().getDimensions()
.stream().filter(d -> dimensions.contains(d.getBizName())).count());
}
return modelMatchCnt.entrySet().stream()
.sorted(Map.Entry.comparingByValue(Comparator.reverseOrder()))
.map(m -> m.getKey()).findFirst().orElse("");
}
return semanticSchemaResp.getModelResps().get(0).getBizName();
}
}

View File

@@ -1,22 +1,15 @@
package com.tencent.supersonic.headless.core.translator.calcite;
import com.tencent.supersonic.common.calcite.SqlMergeWithUtils;
import com.tencent.supersonic.common.pojo.enums.EngineType;
import com.tencent.supersonic.headless.api.pojo.enums.AggOption;
import com.tencent.supersonic.headless.core.pojo.MetricQueryParam;
import com.tencent.supersonic.headless.core.pojo.QueryStatement;
import com.tencent.supersonic.headless.core.translator.QueryParser;
import com.tencent.supersonic.headless.core.translator.calcite.planner.AggPlanner;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.SemanticModel;
import com.tencent.supersonic.headless.core.translator.calcite.schema.RuntimeOptions;
import com.tencent.supersonic.headless.core.translator.calcite.schema.SemanticSchema;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Ontology;
import com.tencent.supersonic.headless.core.translator.calcite.sql.RuntimeOptions;
import com.tencent.supersonic.headless.core.translator.calcite.sql.S2CalciteSchema;
import com.tencent.supersonic.headless.core.translator.calcite.sql.SqlBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.calcite.sql.parser.SqlParseException;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.Objects;
/** the calcite parse implements */
@Component("CalciteQueryParser")
@Slf4j
@@ -24,55 +17,19 @@ public class CalciteQueryParser implements QueryParser {
@Override
public void parse(QueryStatement queryStatement, AggOption isAgg) throws Exception {
MetricQueryParam metricReq = queryStatement.getMetricQueryParam();
SemanticModel semanticModel = queryStatement.getSemanticModel();
if (semanticModel == null) {
queryStatement.setErrMsg("semanticSchema not found");
Ontology ontology = queryStatement.getOntology();
if (ontology == null) {
queryStatement.setErrMsg("No ontology could be found");
return;
}
queryStatement.setMetricQueryParam(metricReq);
SemanticSchema semanticSchema = getSemanticSchema(semanticModel, queryStatement);
AggPlanner aggBuilder = new AggPlanner(semanticSchema);
aggBuilder.explain(queryStatement, isAgg);
EngineType engineType = EngineType.fromString(semanticModel.getDatabase().getType());
queryStatement.setSql(aggBuilder.getSql(engineType));
if (Objects.nonNull(queryStatement.getEnableOptimize())
&& queryStatement.getEnableOptimize()
&& Objects.nonNull(queryStatement.getDataSetAlias())
&& !queryStatement.getDataSetAlias().isEmpty()) {
// simplify model sql with query sql
String simplifySql = aggBuilder.simplify(
getSqlByDataSet(engineType, aggBuilder.getSql(engineType),
queryStatement.getDataSetSql(), queryStatement.getDataSetAlias()),
engineType);
if (Objects.nonNull(simplifySql) && !simplifySql.isEmpty()) {
log.debug("simplifySql [{}]", simplifySql);
queryStatement.setDataSetSimplifySql(simplifySql);
}
}
S2CalciteSchema semanticSchema = S2CalciteSchema.builder()
.schemaKey("DATASET_" + queryStatement.getDataSetId()).ontology(ontology)
.runtimeOptions(RuntimeOptions.builder().minMaxTime(queryStatement.getMinMaxTime())
.enableOptimize(queryStatement.getEnableOptimize()).build())
.build();
SqlBuilder sqlBuilder = new SqlBuilder(semanticSchema);
sqlBuilder.build(queryStatement, isAgg);
}
private SemanticSchema getSemanticSchema(SemanticModel semanticModel,
QueryStatement queryStatement) {
SemanticSchema semanticSchema =
SemanticSchema.newBuilder(semanticModel.getSchemaKey()).build();
semanticSchema.setSemanticModel(semanticModel);
semanticSchema.setDatasource(semanticModel.getDatasourceMap());
semanticSchema.setDimension(semanticModel.getDimensionMap());
semanticSchema.setMetric(semanticModel.getMetrics());
semanticSchema.setJoinRelations(semanticModel.getJoinRelations());
semanticSchema.setRuntimeOptions(
RuntimeOptions.builder().minMaxTime(queryStatement.getMinMaxTime())
.enableOptimize(queryStatement.getEnableOptimize()).build());
return semanticSchema;
}
private String getSqlByDataSet(EngineType engineType, String parentSql, String dataSetSql,
String parentAlias) throws SqlParseException {
if (!SqlMergeWithUtils.hasWith(engineType, dataSetSql)) {
return String.format("with %s as (%s) %s", parentAlias, parentSql, dataSetSql);
}
return SqlMergeWithUtils.mergeWith(engineType, dataSetSql,
Collections.singletonList(parentSql), Collections.singletonList(parentAlias));
}
}

View File

@@ -1,17 +0,0 @@
package com.tencent.supersonic.headless.core.translator.calcite.planner;
import com.tencent.supersonic.common.pojo.enums.EngineType;
import com.tencent.supersonic.headless.api.pojo.enums.AggOption;
import com.tencent.supersonic.headless.core.pojo.QueryStatement;
/** parse and generate SQL and other execute information */
public interface Planner {
public void explain(QueryStatement queryStatement, AggOption aggOption) throws Exception;
public String getSql(EngineType enginType);
public String getSourceId();
public String simplify(String sql, EngineType engineType);
}

View File

@@ -7,13 +7,13 @@ import java.util.List;
@Data
@Builder
public class DataSource {
public class DataModel {
private Long id;
private String name;
private Long sourceId;
private Long modelId;
private String type;

View File

@@ -1,6 +1,5 @@
package com.tencent.supersonic.headless.core.translator.calcite.s2sql;
import com.tencent.supersonic.headless.core.translator.calcite.schema.SemanticItem;
import lombok.Builder;
import lombok.Data;

View File

@@ -1,6 +1,5 @@
package com.tencent.supersonic.headless.core.translator.calcite.s2sql;
import com.tencent.supersonic.headless.core.translator.calcite.schema.SemanticItem;
import lombok.Data;
import java.util.List;

View File

@@ -11,11 +11,10 @@ import java.util.Map;
import java.util.stream.Collectors;
@Data
public class SemanticModel {
public class Ontology {
private String schemaKey;
private List<Metric> metrics = new ArrayList<>();
private Map<String, DataSource> datasourceMap = new HashMap<>();
private Map<String, DataModel> dataModelMap = new HashMap<>();
private Map<String, List<Dimension>> dimensionMap = new HashMap<>();
private List<Materialization> materializationList = new ArrayList<>();
private List<JoinRelation> joinRelations;
@@ -26,8 +25,8 @@ public class SemanticModel {
.collect(Collectors.toList());
}
public Map<Long, DataSource> getModelMap() {
return datasourceMap.values().stream()
.collect(Collectors.toMap(DataSource::getId, dataSource -> dataSource));
public Map<Long, DataModel> getModelMap() {
return dataModelMap.values().stream()
.collect(Collectors.toMap(DataModel::getId, dataSource -> dataSource));
}
}

View File

@@ -1,6 +1,7 @@
package com.tencent.supersonic.headless.core.translator.calcite.schema;
package com.tencent.supersonic.headless.core.translator.calcite.s2sql;
public interface SemanticItem {
String getName();
public String getName();
String getType();
}

View File

@@ -1,136 +0,0 @@
package com.tencent.supersonic.headless.core.translator.calcite.schema;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataSource;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Dimension;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.JoinRelation;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Materialization;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Metric;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.SemanticModel;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaVersion;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class SemanticSchema extends AbstractSchema {
private final String schemaKey;
private final Map<String, Table> tableMap;
private SemanticModel semanticModel = new SemanticModel();
private List<JoinRelation> joinRelations;
private RuntimeOptions runtimeOptions;
private SemanticSchema(String schemaKey, Map<String, Table> tableMap) {
this.schemaKey = schemaKey;
this.tableMap = tableMap;
}
public static Builder newBuilder(String schemaKey) {
return new Builder(schemaKey);
}
public String getSchemaKey() {
return schemaKey;
}
public void setSemanticModel(SemanticModel semanticModel) {
this.semanticModel = semanticModel;
}
public SemanticModel getSemanticModel() {
return semanticModel;
}
@Override
public Map<String, Table> getTableMap() {
return tableMap;
}
@Override
public Schema snapshot(SchemaVersion version) {
return this;
}
public Map<String, DataSource> getDatasource() {
return semanticModel.getDatasourceMap();
}
public void setDatasource(Map<String, DataSource> datasource) {
semanticModel.setDatasourceMap(datasource);
}
public Map<String, List<Dimension>> getDimension() {
return semanticModel.getDimensionMap();
}
public void setDimension(Map<String, List<Dimension>> dimensions) {
semanticModel.setDimensionMap(dimensions);
}
public List<Metric> getMetrics() {
return semanticModel.getMetrics();
}
public void setMetric(List<Metric> metric) {
semanticModel.setMetrics(metric);
}
public void setMaterializationList(List<Materialization> materializationList) {
semanticModel.setMaterializationList(materializationList);
}
public List<Materialization> getMaterializationList() {
return semanticModel.getMaterializationList();
}
public void setJoinRelations(List<JoinRelation> joinRelations) {
this.joinRelations = joinRelations;
}
public List<JoinRelation> getJoinRelations() {
return joinRelations;
}
public void setRuntimeOptions(RuntimeOptions runtimeOptions) {
this.runtimeOptions = runtimeOptions;
}
public RuntimeOptions getRuntimeOptions() {
return runtimeOptions;
}
public static final class Builder {
private final String schemaKey;
private final Map<String, Table> tableMap = new HashMap<>();
private Builder(String schemaKey) {
if (schemaKey == null) {
throw new IllegalArgumentException("Schema name cannot be null or empty");
}
this.schemaKey = schemaKey;
}
public Builder addTable(DataSourceTable table) {
if (tableMap.containsKey(table.getTableName())) {
throw new IllegalArgumentException(
"Table already defined: " + table.getTableName());
}
tableMap.put(table.getTableName(), table);
return this;
}
public SemanticSchema build() {
return new SemanticSchema(schemaKey, tableMap);
}
}
}

View File

@@ -1,6 +1,5 @@
package com.tencent.supersonic.headless.core.translator.calcite.sql.optimizer;
package com.tencent.supersonic.headless.core.translator.calcite.sql;
import com.tencent.supersonic.headless.core.translator.calcite.schema.SemanticSchema;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.rel.core.Aggregate;
@@ -40,23 +39,23 @@ public class FilterToGroupScanRule extends RelRule<Config> implements Transforma
});
}).as(FilterTableScanRule.Config.class);
private SemanticSchema semanticSchema;
private S2CalciteSchema schema;
public FilterToGroupScanRule(FilterTableScanRule.Config config, SemanticSchema semanticSchema) {
public FilterToGroupScanRule(FilterTableScanRule.Config config, S2CalciteSchema schema) {
super(config);
this.semanticSchema = semanticSchema;
this.schema = schema;
}
public void onMatch(RelOptRuleCall call) {
if (call.rels.length != 4) {
return;
}
if (Objects.isNull(semanticSchema.getRuntimeOptions())
|| Objects.isNull(semanticSchema.getRuntimeOptions().getMinMaxTime())
|| semanticSchema.getRuntimeOptions().getMinMaxTime().getLeft().isEmpty()) {
if (Objects.isNull(schema.getRuntimeOptions())
|| Objects.isNull(schema.getRuntimeOptions().getMinMaxTime())
|| schema.getRuntimeOptions().getMinMaxTime().getLeft().isEmpty()) {
return;
}
Triple<String, String, String> minMax = semanticSchema.getRuntimeOptions().getMinMaxTime();
Triple<String, String, String> minMax = schema.getRuntimeOptions().getMinMaxTime();
Filter filter = (Filter) call.rel(0);
Project project0 = (Project) call.rel(1);
Project project1 = (Project) call.rel(3);

View File

@@ -1,8 +0,0 @@
package com.tencent.supersonic.headless.core.translator.calcite.sql;
import com.tencent.supersonic.headless.core.translator.calcite.schema.SemanticSchema;
public interface Optimization {
public void visit(SemanticSchema semanticSchema);
}

View File

@@ -1,4 +1,4 @@
package com.tencent.supersonic.headless.core.translator.calcite.schema;
package com.tencent.supersonic.headless.core.translator.calcite.sql;
import lombok.Builder;
import lombok.Data;

View File

@@ -0,0 +1,48 @@
package com.tencent.supersonic.headless.core.translator.calcite.sql;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataModel;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Dimension;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.JoinRelation;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Metric;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Ontology;
import lombok.Builder;
import lombok.Data;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaVersion;
import org.apache.calcite.schema.impl.AbstractSchema;
import java.util.List;
import java.util.Map;
@Data
@Builder
public class S2CalciteSchema extends AbstractSchema {
private String schemaKey;
private Ontology ontology;
private RuntimeOptions runtimeOptions;
@Override
public Schema snapshot(SchemaVersion version) {
return this;
}
public Map<String, DataModel> getDataModels() {
return ontology.getDataModelMap();
}
public List<Metric> getMetrics() {
return ontology.getMetrics();
}
public Map<String, List<Dimension>> getDimensions() {
return ontology.getDimensionMap();
}
public List<JoinRelation> getJoinRelations() {
return ontology.getJoinRelations();
}
}

View File

@@ -1,4 +1,4 @@
package com.tencent.supersonic.headless.core.translator.calcite.schema;
package com.tencent.supersonic.headless.core.translator.calcite.sql;
import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.Enumerable;
@@ -23,7 +23,7 @@ import java.util.ArrayList;
import java.util.List;
/** customize the AbstractTable */
public class DataSourceTable extends AbstractTable implements ScannableTable, TranslatableTable {
public class S2CalciteTable extends AbstractTable implements ScannableTable, TranslatableTable {
private final String tableName;
private final List<String> fieldNames;
@@ -32,7 +32,7 @@ public class DataSourceTable extends AbstractTable implements ScannableTable, Tr
private RelDataType rowType;
private DataSourceTable(String tableName, List<String> fieldNames, List<SqlTypeName> fieldTypes,
private S2CalciteTable(String tableName, List<String> fieldNames, List<SqlTypeName> fieldTypes,
Statistic statistic) {
this.tableName = tableName;
this.fieldNames = fieldNames;
@@ -116,7 +116,7 @@ public class DataSourceTable extends AbstractTable implements ScannableTable, Tr
return this;
}
public DataSourceTable build() {
public S2CalciteTable build() {
if (fieldNames.isEmpty()) {
throw new IllegalStateException("Table must have at least one field");
}
@@ -125,7 +125,7 @@ public class DataSourceTable extends AbstractTable implements ScannableTable, Tr
throw new IllegalStateException("Table must have positive row count");
}
return new DataSourceTable(tableName, fieldNames, fieldTypes,
return new S2CalciteTable(tableName, fieldNames, fieldTypes,
Statistics.of(rowCount, null));
}
}

View File

@@ -1,8 +1,7 @@
package com.tencent.supersonic.headless.core.translator.calcite.schema;
package com.tencent.supersonic.headless.core.translator.calcite.sql;
import com.tencent.supersonic.common.calcite.Configuration;
import com.tencent.supersonic.common.pojo.enums.EngineType;
import com.tencent.supersonic.headless.core.translator.calcite.sql.S2SQLSqlValidatorImpl;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.prepare.CalciteCatalogReader;
import org.apache.calcite.prepare.Prepare;
@@ -27,15 +26,14 @@ public class SchemaBuilder {
public static final String MATERIALIZATION_SYS_FIELD_DATE = "C1";
public static final String MATERIALIZATION_SYS_FIELD_DATA = "C2";
public static SqlValidatorScope getScope(SemanticSchema schema) throws Exception {
public static SqlValidatorScope getScope(S2CalciteSchema schema) throws Exception {
Map<String, RelDataType> nameToTypeMap = new HashMap<>();
CalciteSchema rootSchema = CalciteSchema.createRootSchema(true, false);
rootSchema.add(schema.getSchemaKey(), schema);
Prepare.CatalogReader catalogReader = new CalciteCatalogReader(rootSchema,
Collections.singletonList(schema.getSchemaKey()), Configuration.typeFactory,
Configuration.config);
EngineType engineType =
EngineType.fromString(schema.getSemanticModel().getDatabase().getType());
EngineType engineType = EngineType.fromString(schema.getOntology().getDatabase().getType());
S2SQLSqlValidatorImpl s2SQLSqlValidator =
new S2SQLSqlValidatorImpl(Configuration.operatorTable, catalogReader,
Configuration.typeFactory, Configuration.getValidatorConfig(engineType));
@@ -45,12 +43,12 @@ public class SchemaBuilder {
public static CalciteSchema getMaterializationSchema() {
CalciteSchema rootSchema = CalciteSchema.createRootSchema(true, false);
SchemaPlus schema = rootSchema.plus().add(MATERIALIZATION_SYS_DB, new AbstractSchema());
DataSourceTable srcTable = DataSourceTable.newBuilder(MATERIALIZATION_SYS_SOURCE)
S2CalciteTable srcTable = S2CalciteTable.newBuilder(MATERIALIZATION_SYS_SOURCE)
.addField(MATERIALIZATION_SYS_FIELD_DATE, SqlTypeName.DATE)
.addField(MATERIALIZATION_SYS_FIELD_DATA, SqlTypeName.BIGINT).withRowCount(1)
.build();
schema.add(MATERIALIZATION_SYS_SOURCE, srcTable);
DataSourceTable dataSetTable = DataSourceTable.newBuilder(MATERIALIZATION_SYS_VIEW)
S2CalciteTable dataSetTable = S2CalciteTable.newBuilder(MATERIALIZATION_SYS_VIEW)
.addField(MATERIALIZATION_SYS_FIELD_DATE, SqlTypeName.DATE)
.addField(MATERIALIZATION_SYS_FIELD_DATA, SqlTypeName.BIGINT).withRowCount(1)
.build();
@@ -62,7 +60,7 @@ public class SchemaBuilder {
Set<String> dates, Set<String> dimensions, Set<String> metrics) {
String tb = tbSrc;
String db = dbSrc;
DataSourceTable.Builder builder = DataSourceTable.newBuilder(tb);
S2CalciteTable.Builder builder = S2CalciteTable.newBuilder(tb);
for (String date : dates) {
builder.addField(date, SqlTypeName.VARCHAR);
}
@@ -72,7 +70,7 @@ public class SchemaBuilder {
for (String metric : metrics) {
builder.addField(metric, SqlTypeName.ANY);
}
DataSourceTable srcTable = builder.withRowCount(1).build();
S2CalciteTable srcTable = builder.withRowCount(1).build();
if (Objects.nonNull(db) && !db.isEmpty()) {
SchemaPlus dbPs = dataSetSchema.plus();
for (String d : db.split("\\.")) {

View File

@@ -1,60 +1,91 @@
package com.tencent.supersonic.headless.core.translator.calcite.planner;
package com.tencent.supersonic.headless.core.translator.calcite.sql;
import com.tencent.supersonic.common.calcite.Configuration;
import com.tencent.supersonic.common.calcite.SqlMergeWithUtils;
import com.tencent.supersonic.common.pojo.enums.EngineType;
import com.tencent.supersonic.headless.api.pojo.enums.AggOption;
import com.tencent.supersonic.headless.core.pojo.Database;
import com.tencent.supersonic.headless.core.pojo.MetricQueryParam;
import com.tencent.supersonic.headless.core.pojo.QueryStatement;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Constants;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataSource;
import com.tencent.supersonic.headless.core.translator.calcite.schema.SchemaBuilder;
import com.tencent.supersonic.headless.core.translator.calcite.schema.SemanticSchema;
import com.tencent.supersonic.headless.core.translator.calcite.sql.Renderer;
import com.tencent.supersonic.headless.core.translator.calcite.sql.TableView;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.DataSourceNode;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataModel;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.DataModelNode;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.SemanticNode;
import com.tencent.supersonic.headless.core.translator.calcite.sql.render.FilterRender;
import com.tencent.supersonic.headless.core.translator.calcite.sql.render.OutputRender;
import com.tencent.supersonic.headless.core.translator.calcite.sql.render.Renderer;
import com.tencent.supersonic.headless.core.translator.calcite.sql.render.SourceRender;
import lombok.extern.slf4j.Slf4j;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.validate.SqlValidatorScope;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Objects;
import java.util.Stack;
/** parsing from query dimensions and metrics */
@Slf4j
public class AggPlanner implements Planner {
public class SqlBuilder {
private MetricQueryParam metricReq;
private SemanticSchema schema;
private final S2CalciteSchema schema;
private MetricQueryParam metricQueryParam;
private SqlValidatorScope scope;
private Stack<TableView> dataSets = new Stack<>();
private SqlNode parserNode;
private String sourceId;
private boolean isAgg = false;
private AggOption aggOption = AggOption.DEFAULT;
public AggPlanner(SemanticSchema schema) {
public SqlBuilder(S2CalciteSchema schema) {
this.schema = schema;
}
public void parse() throws Exception {
public void build(QueryStatement queryStatement, AggOption aggOption) throws Exception {
this.metricQueryParam = queryStatement.getMetricQueryParam();
if (metricQueryParam.getMetrics() == null) {
metricQueryParam.setMetrics(new ArrayList<>());
}
if (metricQueryParam.getDimensions() == null) {
metricQueryParam.setDimensions(new ArrayList<>());
}
if (metricQueryParam.getLimit() == null) {
metricQueryParam.setLimit(0L);
}
this.aggOption = aggOption;
buildParseNode();
Database database = queryStatement.getOntology().getDatabase();
EngineType engineType = EngineType.fromString(database.getType());
optimizeParseNode(engineType);
String sql = getSql(engineType);
queryStatement.setSql(sql);
if (Objects.nonNull(queryStatement.getEnableOptimize())
&& queryStatement.getEnableOptimize()
&& Objects.nonNull(queryStatement.getDataSetAlias())
&& !queryStatement.getDataSetAlias().isEmpty()) {
// simplify model sql with query sql
String simplifySql = rewrite(getSqlByDataSet(engineType, sql,
queryStatement.getDataSetSql(), queryStatement.getDataSetAlias()), engineType);
if (Objects.nonNull(simplifySql) && !simplifySql.isEmpty()) {
log.debug("simplifySql [{}]", simplifySql);
queryStatement.setDataSetSimplifySql(simplifySql);
}
}
}
private void buildParseNode() throws Exception {
// find the match Datasource
scope = SchemaBuilder.getScope(schema);
List<DataSource> datasource = getMatchDataSource(scope);
if (datasource == null || datasource.isEmpty()) {
throw new Exception("datasource not found");
List<DataModel> dataModels =
DataModelNode.getRelatedDataModels(scope, schema, metricQueryParam);
if (dataModels == null || dataModels.isEmpty()) {
throw new Exception("data model not found");
}
isAgg = getAgg(datasource.get(0));
sourceId = String.valueOf(datasource.get(0).getSourceId());
isAgg = getAgg(dataModels.get(0));
// build level by level
LinkedList<Renderer> builders = new LinkedList<>();
@@ -67,84 +98,36 @@ public class AggPlanner implements Planner {
while (it.hasNext()) {
Renderer renderer = it.next();
if (previous != null) {
previous.render(metricReq, datasource, scope, schema, !isAgg);
previous.render(metricQueryParam, dataModels, scope, schema, !isAgg);
renderer.setTable(previous
.builderAs(DataSourceNode.getNames(datasource) + "_" + String.valueOf(i)));
.builderAs(DataModelNode.getNames(dataModels) + "_" + String.valueOf(i)));
i++;
}
previous = renderer;
}
builders.getLast().render(metricReq, datasource, scope, schema, !isAgg);
builders.getLast().render(metricQueryParam, dataModels, scope, schema, !isAgg);
parserNode = builders.getLast().builder();
}
private List<DataSource> getMatchDataSource(SqlValidatorScope scope) throws Exception {
return DataSourceNode.getMatchDataSources(scope, schema, metricReq);
}
private boolean getAgg(DataSource dataSource) {
private boolean getAgg(DataModel dataModel) {
if (!AggOption.DEFAULT.equals(aggOption)) {
return AggOption.isAgg(aggOption);
}
// default by dataSource time aggregation
if (Objects.nonNull(dataSource.getAggTime()) && !dataSource.getAggTime()
// default by dataModel time aggregation
if (Objects.nonNull(dataModel.getAggTime()) && !dataModel.getAggTime()
.equalsIgnoreCase(Constants.DIMENSION_TYPE_TIME_GRANULARITY_NONE)) {
if (!metricReq.isNativeQuery()) {
if (!metricQueryParam.isNativeQuery()) {
return true;
}
}
return isAgg;
}
@Override
public void explain(QueryStatement queryStatement, AggOption aggOption) throws Exception {
this.metricReq = queryStatement.getMetricQueryParam();
if (metricReq.getMetrics() == null) {
metricReq.setMetrics(new ArrayList<>());
}
if (metricReq.getDimensions() == null) {
metricReq.setDimensions(new ArrayList<>());
}
if (metricReq.getLimit() == null) {
metricReq.setLimit(0L);
}
this.aggOption = aggOption;
// build a parse Node
parse();
// optimizer
Database database = queryStatement.getSemanticModel().getDatabase();
EngineType engineType = EngineType.fromString(database.getType());
optimize(engineType);
}
@Override
public String getSql(EngineType engineType) {
return SemanticNode.getSql(parserNode, engineType);
}
@Override
public String getSourceId() {
return sourceId;
}
@Override
public String simplify(String sql, EngineType engineType) {
return optimize(sql, engineType);
}
public void optimize(EngineType engineType) {
if (Objects.isNull(schema.getRuntimeOptions())
|| Objects.isNull(schema.getRuntimeOptions().getEnableOptimize())
|| !schema.getRuntimeOptions().getEnableOptimize()) {
return;
}
SqlNode optimizeNode = optimizeSql(SemanticNode.getSql(parserNode, engineType), engineType);
if (Objects.nonNull(optimizeNode)) {
parserNode = optimizeNode;
}
}
public String optimize(String sql, EngineType engineType) {
private String rewrite(String sql, EngineType engineType) {
try {
SqlNode sqlNode =
SqlParser.create(sql, Configuration.getParserConfig(engineType)).parseStmt();
@@ -153,21 +136,41 @@ public class AggPlanner implements Planner {
SemanticNode.optimize(scope, schema, sqlNode, engineType), engineType);
}
} catch (Exception e) {
log.error("optimize error {}", e);
log.error("optimize error {}", e.toString());
}
return "";
}
private SqlNode optimizeSql(String sql, EngineType engineType) {
private void optimizeParseNode(EngineType engineType) {
if (Objects.isNull(schema.getRuntimeOptions())
|| Objects.isNull(schema.getRuntimeOptions().getEnableOptimize())
|| !schema.getRuntimeOptions().getEnableOptimize()) {
return;
}
SqlNode optimizeNode = null;
try {
SqlNode sqlNode =
SqlParser.create(sql, Configuration.getParserConfig(engineType)).parseStmt();
SqlNode sqlNode = SqlParser.create(SemanticNode.getSql(parserNode, engineType),
Configuration.getParserConfig(engineType)).parseStmt();
if (Objects.nonNull(sqlNode)) {
return SemanticNode.optimize(scope, schema, sqlNode, engineType);
optimizeNode = SemanticNode.optimize(scope, schema, sqlNode, engineType);
}
} catch (Exception e) {
log.error("optimize error {}", e);
}
return null;
if (Objects.nonNull(optimizeNode)) {
parserNode = optimizeNode;
}
}
private String getSqlByDataSet(EngineType engineType, String parentSql, String dataSetSql,
String parentAlias) throws SqlParseException {
if (!SqlMergeWithUtils.hasWith(engineType, dataSetSql)) {
return String.format("with %s as (%s) %s", parentAlias, parentSql, dataSetSql);
}
return SqlMergeWithUtils.mergeWith(engineType, dataSetSql,
Collections.singletonList(parentSql), Collections.singletonList(parentAlias));
}
}

View File

@@ -1,6 +1,6 @@
package com.tencent.supersonic.headless.core.translator.calcite.sql;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataSource;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataModel;
import lombok.Data;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlKind;
@@ -27,7 +27,7 @@ public class TableView {
private String alias;
private List<String> primary;
private DataSource dataSource;
private DataModel dataModel;
public SqlNode build() {
measure.addAll(dimension);

View File

@@ -6,14 +6,13 @@ import com.tencent.supersonic.common.jsqlparser.SqlSelectHelper;
import com.tencent.supersonic.common.pojo.enums.EngineType;
import com.tencent.supersonic.headless.core.pojo.MetricQueryParam;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Constants;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataSource;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataModel;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Dimension;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Identify;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.JoinRelation;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Measure;
import com.tencent.supersonic.headless.core.translator.calcite.schema.SchemaBuilder;
import com.tencent.supersonic.headless.core.translator.calcite.schema.SemanticSchema;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.extend.LateralViewExplodeNode;
import com.tencent.supersonic.headless.core.translator.calcite.sql.S2CalciteSchema;
import com.tencent.supersonic.headless.core.translator.calcite.sql.SchemaBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlDataTypeSpec;
@@ -38,30 +37,30 @@ import java.util.Set;
import java.util.stream.Collectors;
@Slf4j
public class DataSourceNode extends SemanticNode {
public class DataModelNode extends SemanticNode {
public static SqlNode build(DataSource datasource, SqlValidatorScope scope) throws Exception {
public static SqlNode build(DataModel dataModel, SqlValidatorScope scope) throws Exception {
String sqlTable = "";
if (datasource.getSqlQuery() != null && !datasource.getSqlQuery().isEmpty()) {
sqlTable = datasource.getSqlQuery();
} else if (datasource.getTableQuery() != null && !datasource.getTableQuery().isEmpty()) {
if (datasource.getType().equalsIgnoreCase(EngineType.POSTGRESQL.getName())) {
String fullTableName = Arrays.stream(datasource.getTableQuery().split("\\."))
if (dataModel.getSqlQuery() != null && !dataModel.getSqlQuery().isEmpty()) {
sqlTable = dataModel.getSqlQuery();
} else if (dataModel.getTableQuery() != null && !dataModel.getTableQuery().isEmpty()) {
if (dataModel.getType().equalsIgnoreCase(EngineType.POSTGRESQL.getName())) {
String fullTableName = Arrays.stream(dataModel.getTableQuery().split("\\."))
.collect(Collectors.joining(".public."));
sqlTable = "select * from " + fullTableName;
} else {
sqlTable = "select * from " + datasource.getTableQuery();
sqlTable = "select * from " + dataModel.getTableQuery();
}
}
if (sqlTable.isEmpty()) {
throw new Exception("DatasourceNode build error [tableSqlNode not found]");
}
SqlNode source = getTable(sqlTable, scope, EngineType.fromString(datasource.getType()));
addSchema(scope, datasource, sqlTable);
return buildAs(datasource.getName(), source);
SqlNode source = getTable(sqlTable, scope, EngineType.fromString(dataModel.getType()));
addSchema(scope, dataModel, sqlTable);
return buildAs(dataModel.getName(), source);
}
private static void addSchema(SqlValidatorScope scope, DataSource datasource, String table)
private static void addSchema(SqlValidatorScope scope, DataModel datasource, String table)
throws Exception {
Map<String, Set<String>> sqlTable = SqlSelectHelper.getFieldsWithSubQuery(table);
for (Map.Entry<String, Set<String>> entry : sqlTable.entrySet()) {
@@ -75,7 +74,7 @@ public class DataSourceNode extends SemanticNode {
}
}
private static void addSchemaTable(SqlValidatorScope scope, DataSource datasource, String db,
private static void addSchemaTable(SqlValidatorScope scope, DataModel datasource, String db,
String tb, Set<String> fields) throws Exception {
Set<String> dateInfo = new HashSet<>();
Set<String> dimensions = new HashSet<>();
@@ -112,7 +111,7 @@ public class DataSourceNode extends SemanticNode {
dateInfo, dimensions, metrics);
}
public static SqlNode buildExtend(DataSource datasource, Map<String, String> exprList,
public static SqlNode buildExtend(DataModel datasource, Map<String, String> exprList,
SqlValidatorScope scope) throws Exception {
if (CollectionUtils.isEmpty(exprList)) {
return build(datasource, scope);
@@ -146,11 +145,11 @@ public class DataSourceNode extends SemanticNode {
return sqlNode;
}
public static String getNames(List<DataSource> dataSourceList) {
return dataSourceList.stream().map(d -> d.getName()).collect(Collectors.joining("_"));
public static String getNames(List<DataModel> dataModelList) {
return dataModelList.stream().map(d -> d.getName()).collect(Collectors.joining("_"));
}
public static void getQueryDimensionMeasure(SemanticSchema schema,
public static void getQueryDimensionMeasure(S2CalciteSchema schema,
MetricQueryParam metricCommand, Set<String> queryDimension, List<String> measures) {
queryDimension.addAll(metricCommand.getDimensions().stream()
.map(d -> d.contains(Constants.DIMENSION_IDENTIFY)
@@ -166,11 +165,10 @@ public class DataSourceNode extends SemanticNode {
.forEach(m -> measures.add(m));
}
public static void mergeQueryFilterDimensionMeasure(SemanticSchema schema,
public static void mergeQueryFilterDimensionMeasure(S2CalciteSchema schema,
MetricQueryParam metricCommand, Set<String> queryDimension, List<String> measures,
SqlValidatorScope scope) throws Exception {
EngineType engineType =
EngineType.fromString(schema.getSemanticModel().getDatabase().getType());
EngineType engineType = EngineType.fromString(schema.getOntology().getDatabase().getType());
if (Objects.nonNull(metricCommand.getWhere()) && !metricCommand.getWhere().isEmpty()) {
Set<String> filterConditions = new HashSet<>();
FilterNode.getFilterField(parse(metricCommand.getWhere(), scope, engineType),
@@ -193,18 +191,18 @@ public class DataSourceNode extends SemanticNode {
}
}
public static List<DataSource> getMatchDataSources(SqlValidatorScope scope,
SemanticSchema schema, MetricQueryParam metricCommand) throws Exception {
List<DataSource> dataSources = new ArrayList<>();
public static List<DataModel> getRelatedDataModels(SqlValidatorScope scope,
S2CalciteSchema schema, MetricQueryParam metricCommand) throws Exception {
List<DataModel> dataModels = new ArrayList<>();
// check by metric
List<String> measures = new ArrayList<>();
Set<String> queryDimension = new HashSet<>();
getQueryDimensionMeasure(schema, metricCommand, queryDimension, measures);
DataSource baseDataSource = null;
DataModel baseDataModel = null;
// one , match measure count
Map<String, Integer> dataSourceMeasures = new HashMap<>();
for (Map.Entry<String, DataSource> entry : schema.getDatasource().entrySet()) {
for (Map.Entry<String, DataModel> entry : schema.getDataModels().entrySet()) {
Set<String> sourceMeasure = entry.getValue().getMeasures().stream()
.map(mm -> mm.getName()).collect(Collectors.toSet());
sourceMeasure.retainAll(measures);
@@ -214,58 +212,58 @@ public class DataSourceNode extends SemanticNode {
Optional<Map.Entry<String, Integer>> base = dataSourceMeasures.entrySet().stream()
.sorted(Map.Entry.comparingByValue(Comparator.reverseOrder())).findFirst();
if (base.isPresent()) {
baseDataSource = schema.getDatasource().get(base.get().getKey());
dataSources.add(baseDataSource);
baseDataModel = schema.getDataModels().get(base.get().getKey());
dataModels.add(baseDataModel);
}
// second , check match all dimension and metric
if (baseDataSource != null) {
if (baseDataModel != null) {
Set<String> filterMeasure = new HashSet<>();
Set<String> sourceMeasure = baseDataSource.getMeasures().stream()
.map(mm -> mm.getName()).collect(Collectors.toSet());
Set<String> dimension = baseDataSource.getDimensions().stream().map(dd -> dd.getName())
Set<String> sourceMeasure = baseDataModel.getMeasures().stream().map(mm -> mm.getName())
.collect(Collectors.toSet());
baseDataSource.getIdentifiers().stream().forEach(i -> dimension.add(i.getName()));
if (schema.getDimension().containsKey(baseDataSource.getName())) {
schema.getDimension().get(baseDataSource.getName()).stream()
Set<String> dimension = baseDataModel.getDimensions().stream().map(dd -> dd.getName())
.collect(Collectors.toSet());
baseDataModel.getIdentifiers().stream().forEach(i -> dimension.add(i.getName()));
if (schema.getDimensions().containsKey(baseDataModel.getName())) {
schema.getDimensions().get(baseDataModel.getName()).stream()
.forEach(d -> dimension.add(d.getName()));
}
filterMeasure.addAll(sourceMeasure);
filterMeasure.addAll(dimension);
EngineType engineType =
EngineType.fromString(schema.getSemanticModel().getDatabase().getType());
EngineType.fromString(schema.getOntology().getDatabase().getType());
mergeQueryFilterDimensionMeasure(schema, metricCommand, queryDimension, measures,
scope);
boolean isAllMatch = checkMatch(sourceMeasure, queryDimension, measures, dimension,
metricCommand, scope, engineType);
if (isAllMatch) {
log.debug("baseDataSource match all ");
return dataSources;
log.debug("baseDataModel match all ");
return dataModels;
}
// find all dataSource has the same identifiers
List<DataSource> linkDataSources = getLinkDataSourcesByJoinRelation(queryDimension,
measures, baseDataSource, schema);
if (CollectionUtils.isEmpty(linkDataSources)) {
log.debug("baseDataSource get by identifiers ");
Set<String> baseIdentifiers = baseDataSource.getIdentifiers().stream()
List<DataModel> linkDataModels = getLinkDataSourcesByJoinRelation(queryDimension,
measures, baseDataModel, schema);
if (CollectionUtils.isEmpty(linkDataModels)) {
log.debug("baseDataModel get by identifiers ");
Set<String> baseIdentifiers = baseDataModel.getIdentifiers().stream()
.map(i -> i.getName()).collect(Collectors.toSet());
if (baseIdentifiers.isEmpty()) {
throw new Exception(
"datasource error : " + baseDataSource.getName() + " miss identifier");
"datasource error : " + baseDataModel.getName() + " miss identifier");
}
linkDataSources = getLinkDataSources(baseIdentifiers, queryDimension, measures,
baseDataSource, schema);
if (linkDataSources.isEmpty()) {
linkDataModels = getLinkDataSources(baseIdentifiers, queryDimension, measures,
baseDataModel, schema);
if (linkDataModels.isEmpty()) {
throw new Exception(String.format(
"not find the match datasource : dimension[%s],measure[%s]",
queryDimension, measures));
}
}
log.debug("linkDataSources {}", linkDataSources);
return linkDataSources;
// dataSources.addAll(linkDataSources);
log.debug("linkDataModels {}", linkDataModels);
return linkDataModels;
// dataModels.addAll(linkDataModels);
}
return dataSources;
return dataModels;
}
private static boolean checkMatch(Set<String> sourceMeasure, Set<String> queryDimension,
@@ -301,17 +299,17 @@ public class DataSourceNode extends SemanticNode {
return isAllMatch;
}
private static List<DataSource> getLinkDataSourcesByJoinRelation(Set<String> queryDimension,
List<String> measures, DataSource baseDataSource, SemanticSchema schema) {
private static List<DataModel> getLinkDataSourcesByJoinRelation(Set<String> queryDimension,
List<String> measures, DataModel baseDataModel, S2CalciteSchema schema) {
Set<String> linkDataSourceName = new HashSet<>();
List<DataSource> linkDataSources = new ArrayList<>();
List<DataModel> linkDataModels = new ArrayList<>();
Set<String> before = new HashSet<>();
before.add(baseDataSource.getName());
before.add(baseDataModel.getName());
if (!CollectionUtils.isEmpty(schema.getJoinRelations())) {
Set<Long> visitJoinRelations = new HashSet<>();
List<JoinRelation> sortedJoinRelation = new ArrayList<>();
sortJoinRelation(schema.getJoinRelations(), baseDataSource.getName(),
visitJoinRelations, sortedJoinRelation);
sortJoinRelation(schema.getJoinRelations(), baseDataModel.getName(), visitJoinRelations,
sortedJoinRelation);
schema.getJoinRelations().stream().filter(j -> !visitJoinRelations.contains(j.getId()))
.forEach(j -> sortedJoinRelation.add(j));
for (JoinRelation joinRelation : sortedJoinRelation) {
@@ -321,8 +319,8 @@ public class DataSourceNode extends SemanticNode {
}
boolean isMatch = false;
boolean isRight = before.contains(joinRelation.getLeft());
DataSource other = isRight ? schema.getDatasource().get(joinRelation.getRight())
: schema.getDatasource().get(joinRelation.getLeft());
DataModel other = isRight ? schema.getDataModels().get(joinRelation.getRight())
: schema.getDataModels().get(joinRelation.getLeft());
if (!queryDimension.isEmpty()) {
Set<String> linkDimension = other.getDimensions().stream()
.map(dd -> dd.getName()).collect(Collectors.toSet());
@@ -338,8 +336,8 @@ public class DataSourceNode extends SemanticNode {
if (!linkMeasure.isEmpty()) {
isMatch = true;
}
if (!isMatch && schema.getDimension().containsKey(other.getName())) {
Set<String> linkDimension = schema.getDimension().get(other.getName()).stream()
if (!isMatch && schema.getDimensions().containsKey(other.getName())) {
Set<String> linkDimension = schema.getDimensions().get(other.getName()).stream()
.map(dd -> dd.getName()).collect(Collectors.toSet());
linkDimension.retainAll(queryDimension);
if (!linkDimension.isEmpty()) {
@@ -354,8 +352,8 @@ public class DataSourceNode extends SemanticNode {
}
if (!CollectionUtils.isEmpty(linkDataSourceName)) {
Map<String, Long> orders = new HashMap<>();
linkDataSourceName.add(baseDataSource.getName());
orders.put(baseDataSource.getName(), 0L);
linkDataSourceName.add(baseDataModel.getName());
orders.put(baseDataModel.getName(), 0L);
for (JoinRelation joinRelation : schema.getJoinRelations()) {
if (linkDataSourceName.contains(joinRelation.getLeft())
&& linkDataSourceName.contains(joinRelation.getRight())) {
@@ -364,10 +362,10 @@ public class DataSourceNode extends SemanticNode {
}
}
orders.entrySet().stream().sorted(Map.Entry.comparingByValue()).forEach(d -> {
linkDataSources.add(schema.getDatasource().get(d.getKey()));
linkDataModels.add(schema.getDataModels().get(d.getKey()));
});
}
return linkDataSources;
return linkDataModels;
}
private static void sortJoinRelation(List<JoinRelation> joinRelations, String next,
@@ -385,13 +383,13 @@ public class DataSourceNode extends SemanticNode {
}
}
private static List<DataSource> getLinkDataSources(Set<String> baseIdentifiers,
Set<String> queryDimension, List<String> measures, DataSource baseDataSource,
SemanticSchema schema) {
private static List<DataModel> getLinkDataSources(Set<String> baseIdentifiers,
Set<String> queryDimension, List<String> measures, DataModel baseDataModel,
S2CalciteSchema schema) {
Set<String> linkDataSourceName = new HashSet<>();
List<DataSource> linkDataSources = new ArrayList<>();
for (Map.Entry<String, DataSource> entry : schema.getDatasource().entrySet()) {
if (entry.getKey().equalsIgnoreCase(baseDataSource.getName())) {
List<DataModel> linkDataModels = new ArrayList<>();
for (Map.Entry<String, DataModel> entry : schema.getDataModels().entrySet()) {
if (entry.getKey().equalsIgnoreCase(baseDataModel.getName())) {
continue;
}
Long identifierNum = entry.getValue().getIdentifiers().stream().map(i -> i.getName())
@@ -421,7 +419,7 @@ public class DataSourceNode extends SemanticNode {
}
}
}
for (Map.Entry<String, List<Dimension>> entry : schema.getDimension().entrySet()) {
for (Map.Entry<String, List<Dimension>> entry : schema.getDimensions().entrySet()) {
if (!queryDimension.isEmpty()) {
Set<String> linkDimension = entry.getValue().stream().map(dd -> dd.getName())
.collect(Collectors.toSet());
@@ -432,12 +430,12 @@ public class DataSourceNode extends SemanticNode {
}
}
for (String linkName : linkDataSourceName) {
linkDataSources.add(schema.getDatasource().get(linkName));
linkDataModels.add(schema.getDataModels().get(linkName));
}
if (!CollectionUtils.isEmpty(linkDataSources)) {
List<DataSource> all = new ArrayList<>();
all.add(baseDataSource);
all.addAll(linkDataSources);
if (!CollectionUtils.isEmpty(linkDataModels)) {
List<DataModel> all = new ArrayList<>();
all.add(baseDataModel);
all.addAll(linkDataModels);
return all;
}
return Lists.newArrayList();

View File

@@ -1,13 +0,0 @@
package com.tencent.supersonic.headless.core.translator.calcite.sql.node;
import lombok.Data;
import org.apache.calcite.sql.SqlNode;
@Data
public class JoinNode extends SemanticNode {
private SqlNode join;
private SqlNode on;
private SqlNode left;
private SqlNode right;
}

View File

@@ -1,6 +1,5 @@
package com.tencent.supersonic.headless.core.translator.calcite.sql.node.extend;
package com.tencent.supersonic.headless.core.translator.calcite.sql.node;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.ExtendNode;
import org.apache.calcite.linq4j.Ord;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlIdentifier;

View File

@@ -2,7 +2,7 @@ package com.tencent.supersonic.headless.core.translator.calcite.sql.node;
import com.tencent.supersonic.common.pojo.enums.EngineType;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Metric;
import com.tencent.supersonic.headless.core.translator.calcite.schema.SemanticSchema;
import com.tencent.supersonic.headless.core.translator.calcite.sql.S2CalciteSchema;
import lombok.Data;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.validate.SqlValidatorScope;
@@ -30,7 +30,7 @@ public class MetricNode extends SemanticNode {
return buildAs(metric.getName(), sqlNode);
}
public static Boolean isMetricField(String name, SemanticSchema schema) {
public static Boolean isMetricField(String name, S2CalciteSchema schema) {
Optional<Metric> metric = schema.getMetrics().stream()
.filter(m -> m.getName().equalsIgnoreCase(name)).findFirst();
return metric.isPresent() && metric.get().getMetricTypeParams().isFieldMetric();

View File

@@ -5,8 +5,8 @@ import com.tencent.supersonic.common.calcite.SemanticSqlDialect;
import com.tencent.supersonic.common.calcite.SqlDialectFactory;
import com.tencent.supersonic.common.pojo.enums.EngineType;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Constants;
import com.tencent.supersonic.headless.core.translator.calcite.schema.SemanticSchema;
import com.tencent.supersonic.headless.core.translator.calcite.sql.optimizer.FilterToGroupScanRule;
import com.tencent.supersonic.headless.core.translator.calcite.sql.FilterToGroupScanRule;
import com.tencent.supersonic.headless.core.translator.calcite.sql.S2CalciteSchema;
import lombok.extern.slf4j.Slf4j;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.hep.HepPlanner;
@@ -397,7 +397,7 @@ public abstract class SemanticNode {
return parseInfo;
}
public static SqlNode optimize(SqlValidatorScope scope, SemanticSchema schema, SqlNode sqlNode,
public static SqlNode optimize(SqlValidatorScope scope, S2CalciteSchema schema, SqlNode sqlNode,
EngineType engineType) {
try {
HepProgramBuilder hepProgramBuilder = new HepProgramBuilder();

View File

@@ -3,10 +3,9 @@ package com.tencent.supersonic.headless.core.translator.calcite.sql.render;
import com.tencent.supersonic.common.pojo.enums.EngineType;
import com.tencent.supersonic.headless.core.pojo.MetricQueryParam;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Constants;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataSource;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataModel;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Metric;
import com.tencent.supersonic.headless.core.translator.calcite.schema.SemanticSchema;
import com.tencent.supersonic.headless.core.translator.calcite.sql.Renderer;
import com.tencent.supersonic.headless.core.translator.calcite.sql.S2CalciteSchema;
import com.tencent.supersonic.headless.core.translator.calcite.sql.TableView;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.FilterNode;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.MetricNode;
@@ -27,14 +26,13 @@ import java.util.stream.Collectors;
public class FilterRender extends Renderer {
@Override
public void render(MetricQueryParam metricCommand, List<DataSource> dataSources,
SqlValidatorScope scope, SemanticSchema schema, boolean nonAgg) throws Exception {
public void render(MetricQueryParam metricCommand, List<DataModel> dataModels,
SqlValidatorScope scope, S2CalciteSchema schema, boolean nonAgg) throws Exception {
TableView tableView = super.tableView;
SqlNode filterNode = null;
List<String> queryMetrics = new ArrayList<>(metricCommand.getMetrics());
List<String> queryDimensions = new ArrayList<>(metricCommand.getDimensions());
EngineType engineType =
EngineType.fromString(schema.getSemanticModel().getDatabase().getType());
EngineType engineType = EngineType.fromString(schema.getOntology().getDatabase().getType());
if (metricCommand.getWhere() != null && !metricCommand.getWhere().isEmpty()) {
filterNode = SemanticNode.parse(metricCommand.getWhere(), scope, engineType);
@@ -43,9 +41,9 @@ public class FilterRender extends Renderer {
List<String> fieldWhere = whereFields.stream().collect(Collectors.toList());
Set<String> dimensions = new HashSet<>();
Set<String> metrics = new HashSet<>();
for (DataSource dataSource : dataSources) {
for (DataModel dataModel : dataModels) {
SourceRender.whereDimMetric(fieldWhere, metricCommand.getMetrics(),
metricCommand.getDimensions(), dataSource, schema, dimensions, metrics);
metricCommand.getDimensions(), dataModel, schema, dimensions, metrics);
}
queryMetrics.addAll(metrics);
queryDimensions.addAll(dimensions);

View File

@@ -3,17 +3,16 @@ package com.tencent.supersonic.headless.core.translator.calcite.sql.render;
import com.tencent.supersonic.common.pojo.enums.EngineType;
import com.tencent.supersonic.headless.core.pojo.MetricQueryParam;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Constants;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataSource;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataModel;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Dimension;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Identify;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.JoinRelation;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Materialization;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Metric;
import com.tencent.supersonic.headless.core.translator.calcite.schema.SemanticSchema;
import com.tencent.supersonic.headless.core.translator.calcite.sql.Renderer;
import com.tencent.supersonic.headless.core.translator.calcite.sql.S2CalciteSchema;
import com.tencent.supersonic.headless.core.translator.calcite.sql.TableView;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.AggFunctionNode;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.DataSourceNode;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.DataModelNode;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.FilterNode;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.IdentifyNode;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.MetricNode;
@@ -48,11 +47,10 @@ import java.util.stream.Collectors;
public class JoinRender extends Renderer {
@Override
public void render(MetricQueryParam metricCommand, List<DataSource> dataSources,
SqlValidatorScope scope, SemanticSchema schema, boolean nonAgg) throws Exception {
public void render(MetricQueryParam metricCommand, List<DataModel> dataModels,
SqlValidatorScope scope, S2CalciteSchema schema, boolean nonAgg) throws Exception {
String queryWhere = metricCommand.getWhere();
EngineType engineType =
EngineType.fromString(schema.getSemanticModel().getDatabase().getType());
EngineType engineType = EngineType.fromString(schema.getOntology().getDatabase().getType());
Set<String> whereFields = new HashSet<>();
List<String> fieldWhere = new ArrayList<>();
if (queryWhere != null && !queryWhere.isEmpty()) {
@@ -62,7 +60,7 @@ public class JoinRender extends Renderer {
}
Set<String> queryAllDimension = new HashSet<>();
List<String> measures = new ArrayList<>();
DataSourceNode.getQueryDimensionMeasure(schema, metricCommand, queryAllDimension, measures);
DataModelNode.getQueryDimensionMeasure(schema, metricCommand, queryAllDimension, measures);
SqlNode left = null;
TableView leftTable = null;
TableView innerView = new TableView();
@@ -71,14 +69,14 @@ public class JoinRender extends Renderer {
Set<String> filterDimension = new HashSet<>();
Map<String, String> beforeSources = new HashMap<>();
for (int i = 0; i < dataSources.size(); i++) {
final DataSource dataSource = dataSources.get(i);
for (int i = 0; i < dataModels.size(); i++) {
final DataModel dataModel = dataModels.get(i);
final Set<String> filterDimensions = new HashSet<>();
final Set<String> filterMetrics = new HashSet<>();
final List<String> queryDimension = new ArrayList<>();
final List<String> queryMetrics = new ArrayList<>();
SourceRender.whereDimMetric(fieldWhere, queryMetrics, queryDimension, dataSource,
schema, filterDimensions, filterMetrics);
SourceRender.whereDimMetric(fieldWhere, queryMetrics, queryDimension, dataModel, schema,
filterDimensions, filterMetrics);
List<String> reqMetric = new ArrayList<>(metricCommand.getMetrics());
reqMetric.addAll(filterMetrics);
reqMetric = uniqList(reqMetric);
@@ -87,40 +85,40 @@ public class JoinRender extends Renderer {
reqDimension.addAll(filterDimensions);
reqDimension = uniqList(reqDimension);
Set<String> sourceMeasure = dataSource.getMeasures().stream().map(mm -> mm.getName())
Set<String> sourceMeasure = dataModel.getMeasures().stream().map(mm -> mm.getName())
.collect(Collectors.toSet());
doMetric(innerSelect, filterView, queryMetrics, reqMetric, dataSource, sourceMeasure,
doMetric(innerSelect, filterView, queryMetrics, reqMetric, dataModel, sourceMeasure,
scope, schema, nonAgg);
Set<String> dimension = dataSource.getDimensions().stream().map(dd -> dd.getName())
Set<String> dimension = dataModel.getDimensions().stream().map(dd -> dd.getName())
.collect(Collectors.toSet());
doDimension(innerSelect, filterDimension, queryDimension, reqDimension, dataSource,
doDimension(innerSelect, filterDimension, queryDimension, reqDimension, dataModel,
dimension, scope, schema);
List<String> primary = new ArrayList<>();
for (Identify identify : dataSource.getIdentifiers()) {
for (Identify identify : dataModel.getIdentifiers()) {
primary.add(identify.getName());
if (!fieldWhere.contains(identify.getName())) {
fieldWhere.add(identify.getName());
}
}
List<String> dataSourceWhere = new ArrayList<>(fieldWhere);
addZipperField(dataSource, dataSourceWhere);
addZipperField(dataModel, dataSourceWhere);
TableView tableView =
SourceRender.renderOne("", dataSourceWhere, queryMetrics, queryDimension,
metricCommand.getWhere(), dataSources.get(i), scope, schema, true);
metricCommand.getWhere(), dataModels.get(i), scope, schema, true);
log.info("tableView {}", StringUtils.normalizeSpace(tableView.getTable().toString()));
String alias = Constants.JOIN_TABLE_PREFIX + dataSource.getName();
String alias = Constants.JOIN_TABLE_PREFIX + dataModel.getName();
tableView.setAlias(alias);
tableView.setPrimary(primary);
tableView.setDataSource(dataSource);
tableView.setDataModel(dataModel);
if (left == null) {
leftTable = tableView;
left = SemanticNode.buildAs(tableView.getAlias(), getTable(tableView, scope));
beforeSources.put(dataSource.getName(), leftTable.getAlias());
beforeSources.put(dataModel.getName(), leftTable.getAlias());
continue;
}
left = buildJoin(left, leftTable, tableView, beforeSources, dataSource, schema, scope);
left = buildJoin(left, leftTable, tableView, beforeSources, dataModel, schema, scope);
leftTable = tableView;
beforeSources.put(dataSource.getName(), tableView.getAlias());
beforeSources.put(dataModel.getName(), tableView.getAlias());
}
for (Map.Entry<String, SqlNode> entry : innerSelect.entrySet()) {
@@ -144,16 +142,14 @@ public class JoinRender extends Renderer {
}
private void doMetric(Map<String, SqlNode> innerSelect, TableView filterView,
List<String> queryMetrics, List<String> reqMetrics, DataSource dataSource,
Set<String> sourceMeasure, SqlValidatorScope scope, SemanticSchema schema,
List<String> queryMetrics, List<String> reqMetrics, DataModel dataModel,
Set<String> sourceMeasure, SqlValidatorScope scope, S2CalciteSchema schema,
boolean nonAgg) throws Exception {
String alias = Constants.JOIN_TABLE_PREFIX + dataSource.getName();
EngineType engineType =
EngineType.fromString(schema.getSemanticModel().getDatabase().getType());
String alias = Constants.JOIN_TABLE_PREFIX + dataModel.getName();
EngineType engineType = EngineType.fromString(schema.getOntology().getDatabase().getType());
for (String m : reqMetrics) {
if (getMatchMetric(schema, sourceMeasure, m, queryMetrics)) {
MetricNode metricNode =
buildMetricNode(m, dataSource, scope, schema, nonAgg, alias);
MetricNode metricNode = buildMetricNode(m, dataModel, scope, schema, nonAgg, alias);
if (!metricNode.getNonAggNode().isEmpty()) {
for (String measure : metricNode.getNonAggNode().keySet()) {
@@ -181,14 +177,13 @@ public class JoinRender extends Renderer {
}
private void doDimension(Map<String, SqlNode> innerSelect, Set<String> filterDimension,
List<String> queryDimension, List<String> reqDimensions, DataSource dataSource,
Set<String> dimension, SqlValidatorScope scope, SemanticSchema schema)
List<String> queryDimension, List<String> reqDimensions, DataModel dataModel,
Set<String> dimension, SqlValidatorScope scope, S2CalciteSchema schema)
throws Exception {
String alias = Constants.JOIN_TABLE_PREFIX + dataSource.getName();
EngineType engineType =
EngineType.fromString(schema.getSemanticModel().getDatabase().getType());
String alias = Constants.JOIN_TABLE_PREFIX + dataModel.getName();
EngineType engineType = EngineType.fromString(schema.getOntology().getDatabase().getType());
for (String d : reqDimensions) {
if (getMatchDimension(schema, dimension, dataSource, d, queryDimension)) {
if (getMatchDimension(schema, dimension, dataModel, d, queryDimension)) {
if (d.contains(Constants.DIMENSION_IDENTIFY)) {
String[] identifyDimension = d.split(Constants.DIMENSION_IDENTIFY);
innerSelect.put(d, SemanticNode.buildAs(d, SemanticNode
@@ -209,7 +204,7 @@ public class JoinRender extends Renderer {
.collect(Collectors.toSet());
}
private boolean getMatchMetric(SemanticSchema schema, Set<String> sourceMeasure, String m,
private boolean getMatchMetric(S2CalciteSchema schema, Set<String> sourceMeasure, String m,
List<String> queryMetrics) {
Optional<Metric> metric = schema.getMetrics().stream()
.filter(mm -> mm.getName().equalsIgnoreCase(m)).findFirst();
@@ -230,8 +225,8 @@ public class JoinRender extends Renderer {
return isAdd;
}
private boolean getMatchDimension(SemanticSchema schema, Set<String> sourceDimension,
DataSource dataSource, String d, List<String> queryDimension) {
private boolean getMatchDimension(S2CalciteSchema schema, Set<String> sourceDimension,
DataModel dataModel, String d, List<String> queryDimension) {
String oriDimension = d;
boolean isAdd = false;
if (d.contains(Constants.DIMENSION_IDENTIFY)) {
@@ -240,14 +235,14 @@ public class JoinRender extends Renderer {
if (sourceDimension.contains(oriDimension)) {
isAdd = true;
}
for (Identify identify : dataSource.getIdentifiers()) {
for (Identify identify : dataModel.getIdentifiers()) {
if (identify.getName().equalsIgnoreCase(oriDimension)) {
isAdd = true;
break;
}
}
if (schema.getDimension().containsKey(dataSource.getName())) {
for (Dimension dim : schema.getDimension().get(dataSource.getName())) {
if (schema.getDimensions().containsKey(dataModel.getName())) {
for (Dimension dim : schema.getDimensions().get(dataModel.getName())) {
if (dim.getName().equalsIgnoreCase(oriDimension)) {
isAdd = true;
}
@@ -264,12 +259,11 @@ public class JoinRender extends Renderer {
}
private SqlNode buildJoin(SqlNode left, TableView leftTable, TableView tableView,
Map<String, String> before, DataSource dataSource, SemanticSchema schema,
Map<String, String> before, DataModel dataModel, S2CalciteSchema schema,
SqlValidatorScope scope) throws Exception {
EngineType engineType =
EngineType.fromString(schema.getSemanticModel().getDatabase().getType());
EngineType engineType = EngineType.fromString(schema.getOntology().getDatabase().getType());
SqlNode condition =
getCondition(leftTable, tableView, dataSource, schema, scope, engineType);
getCondition(leftTable, tableView, dataModel, schema, scope, engineType);
SqlLiteral sqlLiteral = SemanticNode.getJoinSqlLiteral("");
JoinRelation matchJoinRelation = getMatchJoinRelation(before, tableView, schema);
SqlNode joinRelationCondition = null;
@@ -278,11 +272,11 @@ public class JoinRender extends Renderer {
joinRelationCondition = getCondition(matchJoinRelation, scope, engineType);
condition = joinRelationCondition;
}
if (Materialization.TimePartType.ZIPPER.equals(leftTable.getDataSource().getTimePartType())
if (Materialization.TimePartType.ZIPPER.equals(leftTable.getDataModel().getTimePartType())
|| Materialization.TimePartType.ZIPPER
.equals(tableView.getDataSource().getTimePartType())) {
.equals(tableView.getDataModel().getTimePartType())) {
SqlNode zipperCondition =
getZipperCondition(leftTable, tableView, dataSource, schema, scope);
getZipperCondition(leftTable, tableView, dataModel, schema, scope);
if (Objects.nonNull(joinRelationCondition)) {
condition = new SqlBasicCall(SqlStdOperatorTable.AND,
new ArrayList<>(Arrays.asList(zipperCondition, joinRelationCondition)),
@@ -299,11 +293,11 @@ public class JoinRender extends Renderer {
}
private JoinRelation getMatchJoinRelation(Map<String, String> before, TableView tableView,
SemanticSchema schema) {
S2CalciteSchema schema) {
JoinRelation matchJoinRelation = JoinRelation.builder().build();
if (!CollectionUtils.isEmpty(schema.getJoinRelations())) {
for (JoinRelation joinRelation : schema.getJoinRelations()) {
if (joinRelation.getRight().equalsIgnoreCase(tableView.getDataSource().getName())
if (joinRelation.getRight().equalsIgnoreCase(tableView.getDataModel().getName())
&& before.containsKey(joinRelation.getLeft())) {
matchJoinRelation.setJoinCondition(joinRelation.getJoinCondition().stream()
.map(r -> Triple.of(
@@ -338,8 +332,8 @@ public class JoinRender extends Renderer {
return condition;
}
private SqlNode getCondition(TableView left, TableView right, DataSource dataSource,
SemanticSchema schema, SqlValidatorScope scope, EngineType engineType)
private SqlNode getCondition(TableView left, TableView right, DataModel dataModel,
S2CalciteSchema schema, SqlValidatorScope scope, EngineType engineType)
throws Exception {
Set<String> selectLeft = SemanticNode.getSelect(left.getTable());
@@ -347,16 +341,16 @@ public class JoinRender extends Renderer {
selectLeft.retainAll(selectRight);
SqlNode condition = null;
for (String on : selectLeft) {
if (!SourceRender.isDimension(on, dataSource, schema)) {
if (!SourceRender.isDimension(on, dataModel, schema)) {
continue;
}
if (IdentifyNode.isForeign(on, left.getDataSource().getIdentifiers())) {
if (!IdentifyNode.isPrimary(on, right.getDataSource().getIdentifiers())) {
if (IdentifyNode.isForeign(on, left.getDataModel().getIdentifiers())) {
if (!IdentifyNode.isPrimary(on, right.getDataModel().getIdentifiers())) {
continue;
}
}
if (IdentifyNode.isForeign(on, right.getDataSource().getIdentifiers())) {
if (!IdentifyNode.isPrimary(on, left.getDataSource().getIdentifiers())) {
if (IdentifyNode.isForeign(on, right.getDataModel().getIdentifiers())) {
if (!IdentifyNode.isPrimary(on, left.getDataModel().getIdentifiers())) {
continue;
}
}
@@ -396,9 +390,9 @@ public class JoinRender extends Renderer {
visited.put(id, false);
}
private void addZipperField(DataSource dataSource, List<String> fields) {
if (Materialization.TimePartType.ZIPPER.equals(dataSource.getTimePartType())) {
dataSource.getDimensions().stream()
private void addZipperField(DataModel dataModel, List<String> fields) {
if (Materialization.TimePartType.ZIPPER.equals(dataModel.getTimePartType())) {
dataModel.getDimensions().stream()
.filter(d -> Constants.DIMENSION_TYPE_TIME.equalsIgnoreCase(d.getType()))
.forEach(t -> {
if (t.getName().startsWith(Constants.MATERIALIZATION_ZIPPER_END)
@@ -413,18 +407,18 @@ public class JoinRender extends Renderer {
}
}
private SqlNode getZipperCondition(TableView left, TableView right, DataSource dataSource,
SemanticSchema schema, SqlValidatorScope scope) throws Exception {
if (Materialization.TimePartType.ZIPPER.equals(left.getDataSource().getTimePartType())
private SqlNode getZipperCondition(TableView left, TableView right, DataModel dataModel,
S2CalciteSchema schema, SqlValidatorScope scope) throws Exception {
if (Materialization.TimePartType.ZIPPER.equals(left.getDataModel().getTimePartType())
&& Materialization.TimePartType.ZIPPER
.equals(right.getDataSource().getTimePartType())) {
.equals(right.getDataModel().getTimePartType())) {
throw new Exception("not support two zipper table");
}
SqlNode condition = null;
Optional<Dimension> leftTime = left.getDataSource().getDimensions().stream()
Optional<Dimension> leftTime = left.getDataModel().getDimensions().stream()
.filter(d -> Constants.DIMENSION_TYPE_TIME.equalsIgnoreCase(d.getType()))
.findFirst();
Optional<Dimension> rightTime = right.getDataSource().getDimensions().stream()
Optional<Dimension> rightTime = right.getDataModel().getDimensions().stream()
.filter(d -> Constants.DIMENSION_TYPE_TIME.equalsIgnoreCase(d.getType()))
.findFirst();
if (leftTime.isPresent() && rightTime.isPresent()) {
@@ -434,7 +428,7 @@ public class JoinRender extends Renderer {
String dateTime = "";
Optional<Dimension> startTimeOp = (Materialization.TimePartType.ZIPPER
.equals(left.getDataSource().getTimePartType()) ? left : right).getDataSource()
.equals(left.getDataModel().getTimePartType()) ? left : right).getDataModel()
.getDimensions().stream()
.filter(d -> Constants.DIMENSION_TYPE_TIME
.equalsIgnoreCase(d.getType()))
@@ -442,7 +436,7 @@ public class JoinRender extends Renderer {
.startsWith(Constants.MATERIALIZATION_ZIPPER_START))
.findFirst();
Optional<Dimension> endTimeOp = (Materialization.TimePartType.ZIPPER
.equals(left.getDataSource().getTimePartType()) ? left : right).getDataSource()
.equals(left.getDataModel().getTimePartType()) ? left : right).getDataModel()
.getDimensions().stream()
.filter(d -> Constants.DIMENSION_TYPE_TIME
.equalsIgnoreCase(d.getType()))
@@ -451,17 +445,17 @@ public class JoinRender extends Renderer {
.findFirst();
if (startTimeOp.isPresent() && endTimeOp.isPresent()) {
TableView zipper = Materialization.TimePartType.ZIPPER
.equals(left.getDataSource().getTimePartType()) ? left : right;
.equals(left.getDataModel().getTimePartType()) ? left : right;
TableView partMetric = Materialization.TimePartType.ZIPPER
.equals(left.getDataSource().getTimePartType()) ? right : left;
.equals(left.getDataModel().getTimePartType()) ? right : left;
Optional<Dimension> partTime = Materialization.TimePartType.ZIPPER
.equals(left.getDataSource().getTimePartType()) ? rightTime : leftTime;
.equals(left.getDataModel().getTimePartType()) ? rightTime : leftTime;
startTime = zipper.getAlias() + "." + startTimeOp.get().getName();
endTime = zipper.getAlias() + "." + endTimeOp.get().getName();
dateTime = partMetric.getAlias() + "." + partTime.get().getName();
}
EngineType engineType =
EngineType.fromString(schema.getSemanticModel().getDatabase().getType());
EngineType.fromString(schema.getOntology().getDatabase().getType());
ArrayList<SqlNode> operandList =
new ArrayList<>(Arrays.asList(SemanticNode.parse(endTime, scope, engineType),
SemanticNode.parse(dateTime, scope, engineType)));

View File

@@ -3,9 +3,8 @@ package com.tencent.supersonic.headless.core.translator.calcite.sql.render;
import com.tencent.supersonic.common.pojo.ColumnOrder;
import com.tencent.supersonic.common.pojo.enums.EngineType;
import com.tencent.supersonic.headless.core.pojo.MetricQueryParam;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataSource;
import com.tencent.supersonic.headless.core.translator.calcite.schema.SemanticSchema;
import com.tencent.supersonic.headless.core.translator.calcite.sql.Renderer;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataModel;
import com.tencent.supersonic.headless.core.translator.calcite.sql.S2CalciteSchema;
import com.tencent.supersonic.headless.core.translator.calcite.sql.TableView;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.MetricNode;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.SemanticNode;
@@ -23,11 +22,10 @@ import java.util.List;
public class OutputRender extends Renderer {
@Override
public void render(MetricQueryParam metricCommand, List<DataSource> dataSources,
SqlValidatorScope scope, SemanticSchema schema, boolean nonAgg) throws Exception {
public void render(MetricQueryParam metricCommand, List<DataModel> dataModels,
SqlValidatorScope scope, S2CalciteSchema schema, boolean nonAgg) throws Exception {
TableView selectDataSet = super.tableView;
EngineType engineType =
EngineType.fromString(schema.getSemanticModel().getDatabase().getType());
EngineType engineType = EngineType.fromString(schema.getOntology().getDatabase().getType());
for (String dimension : metricCommand.getDimensions()) {
selectDataSet.getMeasure().add(SemanticNode.parse(dimension, scope, engineType));
}

View File

@@ -1,13 +1,14 @@
package com.tencent.supersonic.headless.core.translator.calcite.sql;
package com.tencent.supersonic.headless.core.translator.calcite.sql.render;
import com.tencent.supersonic.common.pojo.enums.EngineType;
import com.tencent.supersonic.headless.core.pojo.MetricQueryParam;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataSource;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataModel;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Dimension;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Identify;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Measure;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Metric;
import com.tencent.supersonic.headless.core.translator.calcite.schema.SemanticSchema;
import com.tencent.supersonic.headless.core.translator.calcite.sql.S2CalciteSchema;
import com.tencent.supersonic.headless.core.translator.calcite.sql.TableView;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.MeasureNode;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.MetricNode;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.SemanticNode;
@@ -27,29 +28,29 @@ public abstract class Renderer {
protected TableView tableView = new TableView();
public static Optional<Dimension> getDimensionByName(String name, DataSource datasource) {
public static Optional<Dimension> getDimensionByName(String name, DataModel datasource) {
return datasource.getDimensions().stream().filter(d -> d.getName().equalsIgnoreCase(name))
.findFirst();
}
public static Optional<Measure> getMeasureByName(String name, DataSource datasource) {
public static Optional<Measure> getMeasureByName(String name, DataModel datasource) {
return datasource.getMeasures().stream().filter(mm -> mm.getName().equalsIgnoreCase(name))
.findFirst();
}
public static Optional<Metric> getMetricByName(String name, SemanticSchema schema) {
public static Optional<Metric> getMetricByName(String name, S2CalciteSchema schema) {
Optional<Metric> metric = schema.getMetrics().stream()
.filter(m -> m.getName().equalsIgnoreCase(name)).findFirst();
return metric;
}
public static Optional<Identify> getIdentifyByName(String name, DataSource datasource) {
public static Optional<Identify> getIdentifyByName(String name, DataModel datasource) {
return datasource.getIdentifiers().stream().filter(i -> i.getName().equalsIgnoreCase(name))
.findFirst();
}
public static MetricNode buildMetricNode(String metric, DataSource datasource,
SqlValidatorScope scope, SemanticSchema schema, boolean nonAgg, String alias)
public static MetricNode buildMetricNode(String metric, DataModel datasource,
SqlValidatorScope scope, S2CalciteSchema schema, boolean nonAgg, String alias)
throws Exception {
Optional<Metric> metricOpt = getMetricByName(metric, schema);
MetricNode metricNode = new MetricNode();
@@ -113,6 +114,6 @@ public abstract class Renderer {
return SemanticNode.buildAs(alias, tableView.build());
}
public abstract void render(MetricQueryParam metricCommand, List<DataSource> dataSources,
SqlValidatorScope scope, SemanticSchema schema, boolean nonAgg) throws Exception;
public abstract void render(MetricQueryParam metricCommand, List<DataModel> dataModels,
SqlValidatorScope scope, S2CalciteSchema schema, boolean nonAgg) throws Exception;
}

View File

@@ -3,16 +3,15 @@ package com.tencent.supersonic.headless.core.translator.calcite.sql.render;
import com.tencent.supersonic.common.pojo.enums.EngineType;
import com.tencent.supersonic.headless.core.pojo.MetricQueryParam;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Constants;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataSource;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataModel;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Dimension;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Identify;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Materialization;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Measure;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Metric;
import com.tencent.supersonic.headless.core.translator.calcite.schema.SemanticSchema;
import com.tencent.supersonic.headless.core.translator.calcite.sql.Renderer;
import com.tencent.supersonic.headless.core.translator.calcite.sql.S2CalciteSchema;
import com.tencent.supersonic.headless.core.translator.calcite.sql.TableView;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.DataSourceNode;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.DataModelNode;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.DimensionNode;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.FilterNode;
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.IdentifyNode;
@@ -21,7 +20,6 @@ import com.tencent.supersonic.headless.core.translator.calcite.sql.node.Semantic
import lombok.extern.slf4j.Slf4j;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.validate.SqlValidatorScope;
import org.apache.calcite.util.Litmus;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
@@ -44,7 +42,7 @@ public class SourceRender extends Renderer {
public static TableView renderOne(String alias, List<String> fieldWheres,
List<String> reqMetrics, List<String> reqDimensions, String queryWhere,
DataSource datasource, SqlValidatorScope scope, SemanticSchema schema, boolean nonAgg)
DataModel datasource, SqlValidatorScope scope, S2CalciteSchema schema, boolean nonAgg)
throws Exception {
TableView dataSet = new TableView();
@@ -97,7 +95,7 @@ public class SourceRender extends Renderer {
output.setMeasure(SemanticNode.deduplicateNode(output.getMeasure()));
dataSet.setMeasure(SemanticNode.deduplicateNode(dataSet.getMeasure()));
SqlNode tableNode = DataSourceNode.buildExtend(datasource, extendFields, scope);
SqlNode tableNode = DataModelNode.buildExtend(datasource, extendFields, scope);
dataSet.setTable(tableNode);
output.setTable(
SemanticNode.buildAs(Constants.DATASOURCE_TABLE_OUT_PREFIX + datasource.getName()
@@ -107,12 +105,11 @@ public class SourceRender extends Renderer {
private static void buildDimension(String alias, String dimension, DataSource datasource,
SemanticSchema schema, boolean nonAgg, Map<String, String> extendFields,
private static void buildDimension(String alias, String dimension, DataModel datasource,
S2CalciteSchema schema, boolean nonAgg, Map<String, String> extendFields,
TableView dataSet, TableView output, SqlValidatorScope scope) throws Exception {
List<Dimension> dimensionList = schema.getDimension().get(datasource.getName());
EngineType engineType =
EngineType.fromString(schema.getSemanticModel().getDatabase().getType());
List<Dimension> dimensionList = schema.getDimensions().get(datasource.getName());
EngineType engineType = EngineType.fromString(schema.getOntology().getDatabase().getType());
boolean isAdd = false;
if (!CollectionUtils.isEmpty(dimensionList)) {
for (Dimension dim : dimensionList) {
@@ -186,12 +183,11 @@ public class SourceRender extends Renderer {
}
private static List<SqlNode> getWhereMeasure(List<String> fields, List<String> queryMetrics,
List<String> queryDimensions, Map<String, String> extendFields, DataSource datasource,
SqlValidatorScope scope, SemanticSchema schema, boolean nonAgg) throws Exception {
List<String> queryDimensions, Map<String, String> extendFields, DataModel datasource,
SqlValidatorScope scope, S2CalciteSchema schema, boolean nonAgg) throws Exception {
Iterator<String> iterator = fields.iterator();
List<SqlNode> whereNode = new ArrayList<>();
EngineType engineType =
EngineType.fromString(schema.getSemanticModel().getDatabase().getType());
EngineType engineType = EngineType.fromString(schema.getOntology().getDatabase().getType());
while (iterator.hasNext()) {
String cur = iterator.next();
if (queryDimensions.contains(cur) || queryMetrics.contains(cur)) {
@@ -199,7 +195,7 @@ public class SourceRender extends Renderer {
}
}
for (String where : fields) {
List<Dimension> dimensionList = schema.getDimension().get(datasource.getName());
List<Dimension> dimensionList = schema.getDimensions().get(datasource.getName());
boolean isAdd = false;
if (!CollectionUtils.isEmpty(dimensionList)) {
for (Dimension dim : dimensionList) {
@@ -229,8 +225,8 @@ public class SourceRender extends Renderer {
private static void mergeWhere(List<String> fields, TableView dataSet, TableView outputSet,
List<String> queryMetrics, List<String> queryDimensions,
Map<String, String> extendFields, DataSource datasource, SqlValidatorScope scope,
SemanticSchema schema, boolean nonAgg) throws Exception {
Map<String, String> extendFields, DataModel datasource, SqlValidatorScope scope,
S2CalciteSchema schema, boolean nonAgg) throws Exception {
List<SqlNode> whereNode = getWhereMeasure(fields, queryMetrics, queryDimensions,
extendFields, datasource, scope, schema, nonAgg);
dataSet.getMeasure().addAll(whereNode);
@@ -238,7 +234,7 @@ public class SourceRender extends Renderer {
}
public static void whereDimMetric(List<String> fields, List<String> queryMetrics,
List<String> queryDimensions, DataSource datasource, SemanticSchema schema,
List<String> queryDimensions, DataModel datasource, S2CalciteSchema schema,
Set<String> dimensions, Set<String> metrics) {
for (String field : fields) {
if (queryDimensions.contains(field) || queryMetrics.contains(field)) {
@@ -252,8 +248,8 @@ public class SourceRender extends Renderer {
}
}
private static void addField(String field, String oriField, DataSource datasource,
SemanticSchema schema, Set<String> dimensions, Set<String> metrics) {
private static void addField(String field, String oriField, DataModel datasource,
S2CalciteSchema schema, Set<String> dimensions, Set<String> metrics) {
Optional<Dimension> dimension = datasource.getDimensions().stream()
.filter(d -> d.getName().equalsIgnoreCase(field)).findFirst();
if (dimension.isPresent()) {
@@ -266,8 +262,8 @@ public class SourceRender extends Renderer {
dimensions.add(oriField);
return;
}
if (schema.getDimension().containsKey(datasource.getName())) {
Optional<Dimension> dataSourceDim = schema.getDimension().get(datasource.getName())
if (schema.getDimensions().containsKey(datasource.getName())) {
Optional<Dimension> dataSourceDim = schema.getDimensions().get(datasource.getName())
.stream().filter(d -> d.getName().equalsIgnoreCase(field)).findFirst();
if (dataSourceDim.isPresent()) {
dimensions.add(oriField);
@@ -293,7 +289,7 @@ public class SourceRender extends Renderer {
}
}
public static boolean isDimension(String name, DataSource datasource, SemanticSchema schema) {
public static boolean isDimension(String name, DataModel datasource, S2CalciteSchema schema) {
Optional<Dimension> dimension = datasource.getDimensions().stream()
.filter(d -> d.getName().equalsIgnoreCase(name)).findFirst();
if (dimension.isPresent()) {
@@ -304,8 +300,8 @@ public class SourceRender extends Renderer {
if (identify.isPresent()) {
return true;
}
if (schema.getDimension().containsKey(datasource.getName())) {
Optional<Dimension> dataSourceDim = schema.getDimension().get(datasource.getName())
if (schema.getDimensions().containsKey(datasource.getName())) {
Optional<Dimension> dataSourceDim = schema.getDimensions().get(datasource.getName())
.stream().filter(d -> d.getName().equalsIgnoreCase(name)).findFirst();
if (dataSourceDim.isPresent()) {
return true;
@@ -314,13 +310,13 @@ public class SourceRender extends Renderer {
return false;
}
private static void addTimeDimension(DataSource dataSource, List<String> queryDimension) {
if (Materialization.TimePartType.ZIPPER.equals(dataSource.getTimePartType())) {
Optional<Dimension> startTimeOp = dataSource.getDimensions().stream()
private static void addTimeDimension(DataModel dataModel, List<String> queryDimension) {
if (Materialization.TimePartType.ZIPPER.equals(dataModel.getTimePartType())) {
Optional<Dimension> startTimeOp = dataModel.getDimensions().stream()
.filter(d -> Constants.DIMENSION_TYPE_TIME.equalsIgnoreCase(d.getType()))
.filter(d -> d.getName().startsWith(Constants.MATERIALIZATION_ZIPPER_START))
.findFirst();
Optional<Dimension> endTimeOp = dataSource.getDimensions().stream()
Optional<Dimension> endTimeOp = dataModel.getDimensions().stream()
.filter(d -> Constants.DIMENSION_TYPE_TIME.equalsIgnoreCase(d.getType()))
.filter(d -> d.getName().startsWith(Constants.MATERIALIZATION_ZIPPER_END))
.findFirst();
@@ -331,7 +327,7 @@ public class SourceRender extends Renderer {
queryDimension.add(endTimeOp.get().getName());
}
} else {
Optional<Dimension> timeOp = dataSource.getDimensions().stream()
Optional<Dimension> timeOp = dataModel.getDimensions().stream()
.filter(d -> Constants.DIMENSION_TYPE_TIME.equalsIgnoreCase(d.getType()))
.findFirst();
if (timeOp.isPresent() && !queryDimension.contains(timeOp.get().getName())) {
@@ -340,27 +336,26 @@ public class SourceRender extends Renderer {
}
}
public void render(MetricQueryParam metricQueryParam, List<DataSource> dataSources,
SqlValidatorScope scope, SemanticSchema schema, boolean nonAgg) throws Exception {
public void render(MetricQueryParam metricQueryParam, List<DataModel> dataModels,
SqlValidatorScope scope, S2CalciteSchema schema, boolean nonAgg) throws Exception {
String queryWhere = metricQueryParam.getWhere();
Set<String> whereFields = new HashSet<>();
List<String> fieldWhere = new ArrayList<>();
EngineType engineType =
EngineType.fromString(schema.getSemanticModel().getDatabase().getType());
EngineType engineType = EngineType.fromString(schema.getOntology().getDatabase().getType());
if (queryWhere != null && !queryWhere.isEmpty()) {
SqlNode sqlNode = SemanticNode.parse(queryWhere, scope, engineType);
FilterNode.getFilterField(sqlNode, whereFields);
fieldWhere = whereFields.stream().collect(Collectors.toList());
}
if (dataSources.size() == 1) {
DataSource dataSource = dataSources.get(0);
if (dataModels.size() == 1) {
DataModel dataModel = dataModels.get(0);
super.tableView = renderOne("", fieldWhere, metricQueryParam.getMetrics(),
metricQueryParam.getDimensions(), metricQueryParam.getWhere(), dataSource,
scope, schema, nonAgg);
metricQueryParam.getDimensions(), metricQueryParam.getWhere(), dataModel, scope,
schema, nonAgg);
return;
}
JoinRender joinRender = new JoinRender();
joinRender.render(metricQueryParam, dataSources, scope, schema, nonAgg);
joinRender.render(metricQueryParam, dataModels, scope, schema, nonAgg);
super.tableView = joinRender.getTableView();
}
}

View File

@@ -97,7 +97,7 @@ public class CalculateAggConverter implements QueryConverter {
@Override
public void convert(QueryStatement queryStatement) throws Exception {
Database database = queryStatement.getSemanticModel().getDatabase();
Database database = queryStatement.getOntology().getDatabase();
DataSetQueryParam dataSetQueryParam = generateSqlCommend(queryStatement,
EngineType.fromString(database.getType().toUpperCase()), database.getVersion());
queryStatement.setDataSetQueryParam(dataSetQueryParam);

View File

@@ -34,7 +34,7 @@ public class DefaultDimValueConverter implements QueryConverter {
@Override
public void convert(QueryStatement queryStatement) {
List<Dimension> dimensions = queryStatement.getSemanticModel().getDimensions().stream()
List<Dimension> dimensions = queryStatement.getOntology().getDimensions().stream()
.filter(dimension -> !CollectionUtils.isEmpty(dimension.getDefaultValues()))
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(dimensions)) {

View File

@@ -5,7 +5,7 @@ import com.tencent.supersonic.common.util.ContextUtils;
import com.tencent.supersonic.headless.api.pojo.QueryParam;
import com.tencent.supersonic.headless.core.pojo.MetricQueryParam;
import com.tencent.supersonic.headless.core.pojo.QueryStatement;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataSource;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataModel;
import com.tencent.supersonic.headless.core.utils.SqlGenerateUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
@@ -60,7 +60,7 @@ public class ParserDefaultConverter implements QueryConverter {
// support detail query
if (queryParam.getQueryType().isNativeAggQuery()
&& CollectionUtils.isEmpty(metricQueryParam.getMetrics())) {
Map<Long, DataSource> modelMap = queryStatement.getSemanticModel().getModelMap();
Map<Long, DataModel> modelMap = queryStatement.getOntology().getModelMap();
for (Long modelId : modelMap.keySet()) {
String modelBizName = modelMap.get(modelId).getName();
String internalMetricName =

View File

@@ -4,7 +4,7 @@ import com.tencent.supersonic.headless.api.pojo.enums.ModelDefineType;
import com.tencent.supersonic.headless.api.pojo.response.ModelResp;
import com.tencent.supersonic.headless.api.pojo.response.SemanticSchemaResp;
import com.tencent.supersonic.headless.core.pojo.QueryStatement;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataSource;
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataModel;
import com.tencent.supersonic.headless.core.utils.SqlVariableParseUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -39,9 +39,9 @@ public class SqlVariableParseConverter implements QueryConverter {
SqlVariableParseUtils.parse(modelResp.getModelDetail().getSqlQuery(),
modelResp.getModelDetail().getSqlVariables(),
queryStatement.getQueryParam().getParams());
DataSource dataSource = queryStatement.getSemanticModel().getDatasourceMap()
.get(modelResp.getBizName());
dataSource.setSqlQuery(sqlParsed);
DataModel dataModel =
queryStatement.getOntology().getDataModelMap().get(modelResp.getBizName());
dataModel.setSqlQuery(sqlParsed);
}
}
}