mirror of
https://github.com/tencentmusic/supersonic.git
synced 2025-12-11 03:58:14 +00:00
[improvement][headless]Clean code logic of headless core.
This commit is contained in:
@@ -42,7 +42,7 @@ public class DefaultSemanticTranslator implements SemanticTranslator {
|
||||
}
|
||||
}
|
||||
|
||||
public void parse(QueryStatement queryStatement) throws Exception {
|
||||
private void parse(QueryStatement queryStatement) throws Exception {
|
||||
QueryParam queryParam = queryStatement.getQueryParam();
|
||||
if (Objects.isNull(queryStatement.getDataSetQueryParam())) {
|
||||
queryStatement.setDataSetQueryParam(new DataSetQueryParam());
|
||||
@@ -64,7 +64,8 @@ public class DefaultSemanticTranslator implements SemanticTranslator {
|
||||
} else {
|
||||
queryStatement.getMetricQueryParam()
|
||||
.setNativeQuery(queryParam.getQueryType().isNativeAggQuery());
|
||||
doParse(queryStatement);
|
||||
doParse(queryStatement,
|
||||
AggOption.getAggregation(queryStatement.getMetricQueryParam().isNativeQuery()));
|
||||
}
|
||||
if (StringUtils.isEmpty(queryStatement.getSql())) {
|
||||
throw new RuntimeException("parse Exception: " + queryStatement.getErrMsg());
|
||||
@@ -77,7 +78,7 @@ public class DefaultSemanticTranslator implements SemanticTranslator {
|
||||
}
|
||||
}
|
||||
|
||||
public QueryStatement doParse(DataSetQueryParam dataSetQueryParam,
|
||||
private QueryStatement doParse(DataSetQueryParam dataSetQueryParam,
|
||||
QueryStatement queryStatement) {
|
||||
log.info("parse dataSetQuery [{}] ", dataSetQueryParam);
|
||||
SemanticModel semanticModel = queryStatement.getSemanticModel();
|
||||
@@ -132,12 +133,7 @@ public class DefaultSemanticTranslator implements SemanticTranslator {
|
||||
return queryStatement;
|
||||
}
|
||||
|
||||
public QueryStatement doParse(QueryStatement queryStatement) {
|
||||
return doParse(queryStatement,
|
||||
AggOption.getAggregation(queryStatement.getMetricQueryParam().isNativeQuery()));
|
||||
}
|
||||
|
||||
public QueryStatement doParse(QueryStatement queryStatement, AggOption isAgg) {
|
||||
private QueryStatement doParse(QueryStatement queryStatement, AggOption isAgg) {
|
||||
MetricQueryParam metricQueryParam = queryStatement.getMetricQueryParam();
|
||||
log.info("parse metricQuery [{}] isAgg [{}]", metricQueryParam, isAgg);
|
||||
try {
|
||||
|
||||
@@ -9,7 +9,7 @@ import com.tencent.supersonic.headless.core.translator.QueryParser;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.planner.AggPlanner;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.SemanticModel;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.schema.RuntimeOptions;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.schema.SemanticSchema;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.schema.S2SemanticSchema;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.calcite.sql.parser.SqlParseException;
|
||||
import org.springframework.stereotype.Component;
|
||||
@@ -31,18 +31,18 @@ public class CalciteQueryParser implements QueryParser {
|
||||
return;
|
||||
}
|
||||
queryStatement.setMetricQueryParam(metricReq);
|
||||
SemanticSchema semanticSchema = getSemanticSchema(semanticModel, queryStatement);
|
||||
AggPlanner aggBuilder = new AggPlanner(semanticSchema);
|
||||
aggBuilder.explain(queryStatement, isAgg);
|
||||
S2SemanticSchema semanticSchema = getSemanticSchema(semanticModel, queryStatement);
|
||||
AggPlanner aggPlanner = new AggPlanner(semanticSchema);
|
||||
aggPlanner.plan(queryStatement, isAgg);
|
||||
EngineType engineType = EngineType.fromString(semanticModel.getDatabase().getType());
|
||||
queryStatement.setSql(aggBuilder.getSql(engineType));
|
||||
queryStatement.setSql(aggPlanner.getSql(engineType));
|
||||
if (Objects.nonNull(queryStatement.getEnableOptimize())
|
||||
&& queryStatement.getEnableOptimize()
|
||||
&& Objects.nonNull(queryStatement.getDataSetAlias())
|
||||
&& !queryStatement.getDataSetAlias().isEmpty()) {
|
||||
// simplify model sql with query sql
|
||||
String simplifySql = aggBuilder.simplify(
|
||||
getSqlByDataSet(engineType, aggBuilder.getSql(engineType),
|
||||
String simplifySql = aggPlanner.simplify(
|
||||
getSqlByDataSet(engineType, aggPlanner.getSql(engineType),
|
||||
queryStatement.getDataSetSql(), queryStatement.getDataSetAlias()),
|
||||
engineType);
|
||||
if (Objects.nonNull(simplifySql) && !simplifySql.isEmpty()) {
|
||||
@@ -52,10 +52,10 @@ public class CalciteQueryParser implements QueryParser {
|
||||
}
|
||||
}
|
||||
|
||||
private SemanticSchema getSemanticSchema(SemanticModel semanticModel,
|
||||
private S2SemanticSchema getSemanticSchema(SemanticModel semanticModel,
|
||||
QueryStatement queryStatement) {
|
||||
SemanticSchema semanticSchema =
|
||||
SemanticSchema.newBuilder(semanticModel.getSchemaKey()).build();
|
||||
S2SemanticSchema semanticSchema =
|
||||
S2SemanticSchema.newBuilder(semanticModel.getSchemaKey()).build();
|
||||
semanticSchema.setSemanticModel(semanticModel);
|
||||
semanticSchema.setDatasource(semanticModel.getDatasourceMap());
|
||||
semanticSchema.setDimension(semanticModel.getDimensionMap());
|
||||
|
||||
@@ -7,11 +7,10 @@ import com.tencent.supersonic.headless.core.pojo.Database;
|
||||
import com.tencent.supersonic.headless.core.pojo.MetricQueryParam;
|
||||
import com.tencent.supersonic.headless.core.pojo.QueryStatement;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Constants;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataSource;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataModel;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.schema.S2SemanticSchema;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.schema.SchemaBuilder;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.schema.SemanticSchema;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.sql.Renderer;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.sql.TableView;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.DataSourceNode;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.SemanticNode;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.sql.render.FilterRender;
|
||||
@@ -27,29 +26,27 @@ import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.ListIterator;
|
||||
import java.util.Objects;
|
||||
import java.util.Stack;
|
||||
|
||||
/** parsing from query dimensions and metrics */
|
||||
@Slf4j
|
||||
public class AggPlanner implements Planner {
|
||||
|
||||
private MetricQueryParam metricReq;
|
||||
private SemanticSchema schema;
|
||||
private final S2SemanticSchema schema;
|
||||
private SqlValidatorScope scope;
|
||||
private Stack<TableView> dataSets = new Stack<>();
|
||||
private SqlNode parserNode;
|
||||
private String sourceId;
|
||||
private boolean isAgg = false;
|
||||
private AggOption aggOption = AggOption.DEFAULT;
|
||||
|
||||
public AggPlanner(SemanticSchema schema) {
|
||||
public AggPlanner(S2SemanticSchema schema) {
|
||||
this.schema = schema;
|
||||
}
|
||||
|
||||
public void parse() throws Exception {
|
||||
private void parse() throws Exception {
|
||||
// find the match Datasource
|
||||
scope = SchemaBuilder.getScope(schema);
|
||||
List<DataSource> datasource = getMatchDataSource(scope);
|
||||
List<DataModel> datasource = getMatchDataSource(scope);
|
||||
if (datasource == null || datasource.isEmpty()) {
|
||||
throw new Exception("datasource not found");
|
||||
}
|
||||
@@ -78,16 +75,16 @@ public class AggPlanner implements Planner {
|
||||
parserNode = builders.getLast().builder();
|
||||
}
|
||||
|
||||
private List<DataSource> getMatchDataSource(SqlValidatorScope scope) throws Exception {
|
||||
private List<DataModel> getMatchDataSource(SqlValidatorScope scope) throws Exception {
|
||||
return DataSourceNode.getMatchDataSources(scope, schema, metricReq);
|
||||
}
|
||||
|
||||
private boolean getAgg(DataSource dataSource) {
|
||||
private boolean getAgg(DataModel dataModel) {
|
||||
if (!AggOption.DEFAULT.equals(aggOption)) {
|
||||
return AggOption.isAgg(aggOption);
|
||||
}
|
||||
// default by dataSource time aggregation
|
||||
if (Objects.nonNull(dataSource.getAggTime()) && !dataSource.getAggTime()
|
||||
// default by dataModel time aggregation
|
||||
if (Objects.nonNull(dataModel.getAggTime()) && !dataModel.getAggTime()
|
||||
.equalsIgnoreCase(Constants.DIMENSION_TYPE_TIME_GRANULARITY_NONE)) {
|
||||
if (!metricReq.isNativeQuery()) {
|
||||
return true;
|
||||
@@ -97,7 +94,7 @@ public class AggPlanner implements Planner {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void explain(QueryStatement queryStatement, AggOption aggOption) throws Exception {
|
||||
public void plan(QueryStatement queryStatement, AggOption aggOption) throws Exception {
|
||||
this.metricReq = queryStatement.getMetricQueryParam();
|
||||
if (metricReq.getMetrics() == null) {
|
||||
metricReq.setMetrics(new ArrayList<>());
|
||||
@@ -129,22 +126,6 @@ public class AggPlanner implements Planner {
|
||||
|
||||
@Override
|
||||
public String simplify(String sql, EngineType engineType) {
|
||||
return optimize(sql, engineType);
|
||||
}
|
||||
|
||||
public void optimize(EngineType engineType) {
|
||||
if (Objects.isNull(schema.getRuntimeOptions())
|
||||
|| Objects.isNull(schema.getRuntimeOptions().getEnableOptimize())
|
||||
|| !schema.getRuntimeOptions().getEnableOptimize()) {
|
||||
return;
|
||||
}
|
||||
SqlNode optimizeNode = optimizeSql(SemanticNode.getSql(parserNode, engineType), engineType);
|
||||
if (Objects.nonNull(optimizeNode)) {
|
||||
parserNode = optimizeNode;
|
||||
}
|
||||
}
|
||||
|
||||
public String optimize(String sql, EngineType engineType) {
|
||||
try {
|
||||
SqlNode sqlNode =
|
||||
SqlParser.create(sql, Configuration.getParserConfig(engineType)).parseStmt();
|
||||
@@ -153,21 +134,32 @@ public class AggPlanner implements Planner {
|
||||
SemanticNode.optimize(scope, schema, sqlNode, engineType), engineType);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("optimize error {}", e);
|
||||
log.error("optimize error {}", e.toString());
|
||||
}
|
||||
return "";
|
||||
}
|
||||
|
||||
private SqlNode optimizeSql(String sql, EngineType engineType) {
|
||||
private void optimize(EngineType engineType) {
|
||||
if (Objects.isNull(schema.getRuntimeOptions())
|
||||
|| Objects.isNull(schema.getRuntimeOptions().getEnableOptimize())
|
||||
|| !schema.getRuntimeOptions().getEnableOptimize()) {
|
||||
return;
|
||||
}
|
||||
|
||||
SqlNode optimizeNode = null;
|
||||
try {
|
||||
SqlNode sqlNode =
|
||||
SqlParser.create(sql, Configuration.getParserConfig(engineType)).parseStmt();
|
||||
SqlNode sqlNode = SqlParser.create(SemanticNode.getSql(parserNode, engineType),
|
||||
Configuration.getParserConfig(engineType)).parseStmt();
|
||||
if (Objects.nonNull(sqlNode)) {
|
||||
return SemanticNode.optimize(scope, schema, sqlNode, engineType);
|
||||
optimizeNode = SemanticNode.optimize(scope, schema, sqlNode, engineType);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("optimize error {}", e);
|
||||
}
|
||||
return null;
|
||||
|
||||
if (Objects.nonNull(optimizeNode)) {
|
||||
parserNode = optimizeNode;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -7,11 +7,11 @@ import com.tencent.supersonic.headless.core.pojo.QueryStatement;
|
||||
/** parse and generate SQL and other execute information */
|
||||
public interface Planner {
|
||||
|
||||
public void explain(QueryStatement queryStatement, AggOption aggOption) throws Exception;
|
||||
void plan(QueryStatement queryStatement, AggOption aggOption) throws Exception;
|
||||
|
||||
public String getSql(EngineType enginType);
|
||||
String getSql(EngineType enginType);
|
||||
|
||||
public String getSourceId();
|
||||
String getSourceId();
|
||||
|
||||
public String simplify(String sql, EngineType engineType);
|
||||
String simplify(String sql, EngineType engineType);
|
||||
}
|
||||
|
||||
@@ -7,7 +7,7 @@ import java.util.List;
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
public class DataSource {
|
||||
public class DataModel {
|
||||
|
||||
private Long id;
|
||||
|
||||
@@ -15,7 +15,7 @@ public class SemanticModel {
|
||||
|
||||
private String schemaKey;
|
||||
private List<Metric> metrics = new ArrayList<>();
|
||||
private Map<String, DataSource> datasourceMap = new HashMap<>();
|
||||
private Map<String, DataModel> datasourceMap = new HashMap<>();
|
||||
private Map<String, List<Dimension>> dimensionMap = new HashMap<>();
|
||||
private List<Materialization> materializationList = new ArrayList<>();
|
||||
private List<JoinRelation> joinRelations;
|
||||
@@ -26,8 +26,8 @@ public class SemanticModel {
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public Map<Long, DataSource> getModelMap() {
|
||||
public Map<Long, DataModel> getModelMap() {
|
||||
return datasourceMap.values().stream()
|
||||
.collect(Collectors.toMap(DataSource::getId, dataSource -> dataSource));
|
||||
.collect(Collectors.toMap(DataModel::getId, dataSource -> dataSource));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
package com.tencent.supersonic.headless.core.translator.calcite.schema;
|
||||
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataSource;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataModel;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Dimension;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.JoinRelation;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Materialization;
|
||||
@@ -15,9 +15,10 @@ import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class SemanticSchema extends AbstractSchema {
|
||||
public class S2SemanticSchema extends AbstractSchema {
|
||||
|
||||
private final String schemaKey;
|
||||
|
||||
private final Map<String, Table> tableMap;
|
||||
|
||||
private SemanticModel semanticModel = new SemanticModel();
|
||||
@@ -26,7 +27,7 @@ public class SemanticSchema extends AbstractSchema {
|
||||
|
||||
private RuntimeOptions runtimeOptions;
|
||||
|
||||
private SemanticSchema(String schemaKey, Map<String, Table> tableMap) {
|
||||
private S2SemanticSchema(String schemaKey, Map<String, Table> tableMap) {
|
||||
this.schemaKey = schemaKey;
|
||||
this.tableMap = tableMap;
|
||||
}
|
||||
@@ -57,11 +58,11 @@ public class SemanticSchema extends AbstractSchema {
|
||||
return this;
|
||||
}
|
||||
|
||||
public Map<String, DataSource> getDatasource() {
|
||||
public Map<String, DataModel> getDatasource() {
|
||||
return semanticModel.getDatasourceMap();
|
||||
}
|
||||
|
||||
public void setDatasource(Map<String, DataSource> datasource) {
|
||||
public void setDatasource(Map<String, DataModel> datasource) {
|
||||
semanticModel.setDatasourceMap(datasource);
|
||||
}
|
||||
|
||||
@@ -129,8 +130,8 @@ public class SemanticSchema extends AbstractSchema {
|
||||
return this;
|
||||
}
|
||||
|
||||
public SemanticSchema build() {
|
||||
return new SemanticSchema(schemaKey, tableMap);
|
||||
public S2SemanticSchema build() {
|
||||
return new S2SemanticSchema(schemaKey, tableMap);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -27,7 +27,7 @@ public class SchemaBuilder {
|
||||
public static final String MATERIALIZATION_SYS_FIELD_DATE = "C1";
|
||||
public static final String MATERIALIZATION_SYS_FIELD_DATA = "C2";
|
||||
|
||||
public static SqlValidatorScope getScope(SemanticSchema schema) throws Exception {
|
||||
public static SqlValidatorScope getScope(S2SemanticSchema schema) throws Exception {
|
||||
Map<String, RelDataType> nameToTypeMap = new HashMap<>();
|
||||
CalciteSchema rootSchema = CalciteSchema.createRootSchema(true, false);
|
||||
rootSchema.add(schema.getSchemaKey(), schema);
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
package com.tencent.supersonic.headless.core.translator.calcite.sql;
|
||||
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.schema.SemanticSchema;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.schema.S2SemanticSchema;
|
||||
|
||||
public interface Optimization {
|
||||
|
||||
public void visit(SemanticSchema semanticSchema);
|
||||
public void visit(S2SemanticSchema semanticSchema);
|
||||
}
|
||||
|
||||
@@ -2,12 +2,12 @@ package com.tencent.supersonic.headless.core.translator.calcite.sql;
|
||||
|
||||
import com.tencent.supersonic.common.pojo.enums.EngineType;
|
||||
import com.tencent.supersonic.headless.core.pojo.MetricQueryParam;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataSource;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataModel;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Dimension;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Identify;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Measure;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Metric;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.schema.SemanticSchema;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.schema.S2SemanticSchema;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.MeasureNode;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.MetricNode;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.SemanticNode;
|
||||
@@ -27,29 +27,29 @@ public abstract class Renderer {
|
||||
|
||||
protected TableView tableView = new TableView();
|
||||
|
||||
public static Optional<Dimension> getDimensionByName(String name, DataSource datasource) {
|
||||
public static Optional<Dimension> getDimensionByName(String name, DataModel datasource) {
|
||||
return datasource.getDimensions().stream().filter(d -> d.getName().equalsIgnoreCase(name))
|
||||
.findFirst();
|
||||
}
|
||||
|
||||
public static Optional<Measure> getMeasureByName(String name, DataSource datasource) {
|
||||
public static Optional<Measure> getMeasureByName(String name, DataModel datasource) {
|
||||
return datasource.getMeasures().stream().filter(mm -> mm.getName().equalsIgnoreCase(name))
|
||||
.findFirst();
|
||||
}
|
||||
|
||||
public static Optional<Metric> getMetricByName(String name, SemanticSchema schema) {
|
||||
public static Optional<Metric> getMetricByName(String name, S2SemanticSchema schema) {
|
||||
Optional<Metric> metric = schema.getMetrics().stream()
|
||||
.filter(m -> m.getName().equalsIgnoreCase(name)).findFirst();
|
||||
return metric;
|
||||
}
|
||||
|
||||
public static Optional<Identify> getIdentifyByName(String name, DataSource datasource) {
|
||||
public static Optional<Identify> getIdentifyByName(String name, DataModel datasource) {
|
||||
return datasource.getIdentifiers().stream().filter(i -> i.getName().equalsIgnoreCase(name))
|
||||
.findFirst();
|
||||
}
|
||||
|
||||
public static MetricNode buildMetricNode(String metric, DataSource datasource,
|
||||
SqlValidatorScope scope, SemanticSchema schema, boolean nonAgg, String alias)
|
||||
public static MetricNode buildMetricNode(String metric, DataModel datasource,
|
||||
SqlValidatorScope scope, S2SemanticSchema schema, boolean nonAgg, String alias)
|
||||
throws Exception {
|
||||
Optional<Metric> metricOpt = getMetricByName(metric, schema);
|
||||
MetricNode metricNode = new MetricNode();
|
||||
@@ -113,6 +113,6 @@ public abstract class Renderer {
|
||||
return SemanticNode.buildAs(alias, tableView.build());
|
||||
}
|
||||
|
||||
public abstract void render(MetricQueryParam metricCommand, List<DataSource> dataSources,
|
||||
SqlValidatorScope scope, SemanticSchema schema, boolean nonAgg) throws Exception;
|
||||
public abstract void render(MetricQueryParam metricCommand, List<DataModel> dataModels,
|
||||
SqlValidatorScope scope, S2SemanticSchema schema, boolean nonAgg) throws Exception;
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
package com.tencent.supersonic.headless.core.translator.calcite.sql;
|
||||
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataSource;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataModel;
|
||||
import lombok.Data;
|
||||
import org.apache.calcite.sql.SqlBasicCall;
|
||||
import org.apache.calcite.sql.SqlKind;
|
||||
@@ -27,7 +27,7 @@ public class TableView {
|
||||
|
||||
private String alias;
|
||||
private List<String> primary;
|
||||
private DataSource dataSource;
|
||||
private DataModel dataModel;
|
||||
|
||||
public SqlNode build() {
|
||||
measure.addAll(dimension);
|
||||
|
||||
@@ -6,13 +6,13 @@ import com.tencent.supersonic.common.jsqlparser.SqlSelectHelper;
|
||||
import com.tencent.supersonic.common.pojo.enums.EngineType;
|
||||
import com.tencent.supersonic.headless.core.pojo.MetricQueryParam;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Constants;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataSource;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataModel;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Dimension;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Identify;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.JoinRelation;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Measure;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.schema.S2SemanticSchema;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.schema.SchemaBuilder;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.schema.SemanticSchema;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.extend.LateralViewExplodeNode;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.calcite.sql.SqlBasicCall;
|
||||
@@ -40,7 +40,7 @@ import java.util.stream.Collectors;
|
||||
@Slf4j
|
||||
public class DataSourceNode extends SemanticNode {
|
||||
|
||||
public static SqlNode build(DataSource datasource, SqlValidatorScope scope) throws Exception {
|
||||
public static SqlNode build(DataModel datasource, SqlValidatorScope scope) throws Exception {
|
||||
String sqlTable = "";
|
||||
if (datasource.getSqlQuery() != null && !datasource.getSqlQuery().isEmpty()) {
|
||||
sqlTable = datasource.getSqlQuery();
|
||||
@@ -61,7 +61,7 @@ public class DataSourceNode extends SemanticNode {
|
||||
return buildAs(datasource.getName(), source);
|
||||
}
|
||||
|
||||
private static void addSchema(SqlValidatorScope scope, DataSource datasource, String table)
|
||||
private static void addSchema(SqlValidatorScope scope, DataModel datasource, String table)
|
||||
throws Exception {
|
||||
Map<String, Set<String>> sqlTable = SqlSelectHelper.getFieldsWithSubQuery(table);
|
||||
for (Map.Entry<String, Set<String>> entry : sqlTable.entrySet()) {
|
||||
@@ -75,7 +75,7 @@ public class DataSourceNode extends SemanticNode {
|
||||
}
|
||||
}
|
||||
|
||||
private static void addSchemaTable(SqlValidatorScope scope, DataSource datasource, String db,
|
||||
private static void addSchemaTable(SqlValidatorScope scope, DataModel datasource, String db,
|
||||
String tb, Set<String> fields) throws Exception {
|
||||
Set<String> dateInfo = new HashSet<>();
|
||||
Set<String> dimensions = new HashSet<>();
|
||||
@@ -112,7 +112,7 @@ public class DataSourceNode extends SemanticNode {
|
||||
dateInfo, dimensions, metrics);
|
||||
}
|
||||
|
||||
public static SqlNode buildExtend(DataSource datasource, Map<String, String> exprList,
|
||||
public static SqlNode buildExtend(DataModel datasource, Map<String, String> exprList,
|
||||
SqlValidatorScope scope) throws Exception {
|
||||
if (CollectionUtils.isEmpty(exprList)) {
|
||||
return build(datasource, scope);
|
||||
@@ -146,11 +146,11 @@ public class DataSourceNode extends SemanticNode {
|
||||
return sqlNode;
|
||||
}
|
||||
|
||||
public static String getNames(List<DataSource> dataSourceList) {
|
||||
return dataSourceList.stream().map(d -> d.getName()).collect(Collectors.joining("_"));
|
||||
public static String getNames(List<DataModel> dataModelList) {
|
||||
return dataModelList.stream().map(d -> d.getName()).collect(Collectors.joining("_"));
|
||||
}
|
||||
|
||||
public static void getQueryDimensionMeasure(SemanticSchema schema,
|
||||
public static void getQueryDimensionMeasure(S2SemanticSchema schema,
|
||||
MetricQueryParam metricCommand, Set<String> queryDimension, List<String> measures) {
|
||||
queryDimension.addAll(metricCommand.getDimensions().stream()
|
||||
.map(d -> d.contains(Constants.DIMENSION_IDENTIFY)
|
||||
@@ -166,7 +166,7 @@ public class DataSourceNode extends SemanticNode {
|
||||
.forEach(m -> measures.add(m));
|
||||
}
|
||||
|
||||
public static void mergeQueryFilterDimensionMeasure(SemanticSchema schema,
|
||||
public static void mergeQueryFilterDimensionMeasure(S2SemanticSchema schema,
|
||||
MetricQueryParam metricCommand, Set<String> queryDimension, List<String> measures,
|
||||
SqlValidatorScope scope) throws Exception {
|
||||
EngineType engineType =
|
||||
@@ -193,18 +193,18 @@ public class DataSourceNode extends SemanticNode {
|
||||
}
|
||||
}
|
||||
|
||||
public static List<DataSource> getMatchDataSources(SqlValidatorScope scope,
|
||||
SemanticSchema schema, MetricQueryParam metricCommand) throws Exception {
|
||||
List<DataSource> dataSources = new ArrayList<>();
|
||||
public static List<DataModel> getMatchDataSources(SqlValidatorScope scope,
|
||||
S2SemanticSchema schema, MetricQueryParam metricCommand) throws Exception {
|
||||
List<DataModel> dataModels = new ArrayList<>();
|
||||
|
||||
// check by metric
|
||||
List<String> measures = new ArrayList<>();
|
||||
Set<String> queryDimension = new HashSet<>();
|
||||
getQueryDimensionMeasure(schema, metricCommand, queryDimension, measures);
|
||||
DataSource baseDataSource = null;
|
||||
DataModel baseDataModel = null;
|
||||
// one , match measure count
|
||||
Map<String, Integer> dataSourceMeasures = new HashMap<>();
|
||||
for (Map.Entry<String, DataSource> entry : schema.getDatasource().entrySet()) {
|
||||
for (Map.Entry<String, DataModel> entry : schema.getDatasource().entrySet()) {
|
||||
Set<String> sourceMeasure = entry.getValue().getMeasures().stream()
|
||||
.map(mm -> mm.getName()).collect(Collectors.toSet());
|
||||
sourceMeasure.retainAll(measures);
|
||||
@@ -214,19 +214,19 @@ public class DataSourceNode extends SemanticNode {
|
||||
Optional<Map.Entry<String, Integer>> base = dataSourceMeasures.entrySet().stream()
|
||||
.sorted(Map.Entry.comparingByValue(Comparator.reverseOrder())).findFirst();
|
||||
if (base.isPresent()) {
|
||||
baseDataSource = schema.getDatasource().get(base.get().getKey());
|
||||
dataSources.add(baseDataSource);
|
||||
baseDataModel = schema.getDatasource().get(base.get().getKey());
|
||||
dataModels.add(baseDataModel);
|
||||
}
|
||||
// second , check match all dimension and metric
|
||||
if (baseDataSource != null) {
|
||||
if (baseDataModel != null) {
|
||||
Set<String> filterMeasure = new HashSet<>();
|
||||
Set<String> sourceMeasure = baseDataSource.getMeasures().stream()
|
||||
.map(mm -> mm.getName()).collect(Collectors.toSet());
|
||||
Set<String> dimension = baseDataSource.getDimensions().stream().map(dd -> dd.getName())
|
||||
Set<String> sourceMeasure = baseDataModel.getMeasures().stream().map(mm -> mm.getName())
|
||||
.collect(Collectors.toSet());
|
||||
baseDataSource.getIdentifiers().stream().forEach(i -> dimension.add(i.getName()));
|
||||
if (schema.getDimension().containsKey(baseDataSource.getName())) {
|
||||
schema.getDimension().get(baseDataSource.getName()).stream()
|
||||
Set<String> dimension = baseDataModel.getDimensions().stream().map(dd -> dd.getName())
|
||||
.collect(Collectors.toSet());
|
||||
baseDataModel.getIdentifiers().stream().forEach(i -> dimension.add(i.getName()));
|
||||
if (schema.getDimension().containsKey(baseDataModel.getName())) {
|
||||
schema.getDimension().get(baseDataModel.getName()).stream()
|
||||
.forEach(d -> dimension.add(d.getName()));
|
||||
}
|
||||
filterMeasure.addAll(sourceMeasure);
|
||||
@@ -238,34 +238,34 @@ public class DataSourceNode extends SemanticNode {
|
||||
boolean isAllMatch = checkMatch(sourceMeasure, queryDimension, measures, dimension,
|
||||
metricCommand, scope, engineType);
|
||||
if (isAllMatch) {
|
||||
log.debug("baseDataSource match all ");
|
||||
return dataSources;
|
||||
log.debug("baseDataModel match all ");
|
||||
return dataModels;
|
||||
}
|
||||
// find all dataSource has the same identifiers
|
||||
List<DataSource> linkDataSources = getLinkDataSourcesByJoinRelation(queryDimension,
|
||||
measures, baseDataSource, schema);
|
||||
if (CollectionUtils.isEmpty(linkDataSources)) {
|
||||
log.debug("baseDataSource get by identifiers ");
|
||||
Set<String> baseIdentifiers = baseDataSource.getIdentifiers().stream()
|
||||
List<DataModel> linkDataModels = getLinkDataSourcesByJoinRelation(queryDimension,
|
||||
measures, baseDataModel, schema);
|
||||
if (CollectionUtils.isEmpty(linkDataModels)) {
|
||||
log.debug("baseDataModel get by identifiers ");
|
||||
Set<String> baseIdentifiers = baseDataModel.getIdentifiers().stream()
|
||||
.map(i -> i.getName()).collect(Collectors.toSet());
|
||||
if (baseIdentifiers.isEmpty()) {
|
||||
throw new Exception(
|
||||
"datasource error : " + baseDataSource.getName() + " miss identifier");
|
||||
"datasource error : " + baseDataModel.getName() + " miss identifier");
|
||||
}
|
||||
linkDataSources = getLinkDataSources(baseIdentifiers, queryDimension, measures,
|
||||
baseDataSource, schema);
|
||||
if (linkDataSources.isEmpty()) {
|
||||
linkDataModels = getLinkDataSources(baseIdentifiers, queryDimension, measures,
|
||||
baseDataModel, schema);
|
||||
if (linkDataModels.isEmpty()) {
|
||||
throw new Exception(String.format(
|
||||
"not find the match datasource : dimension[%s],measure[%s]",
|
||||
queryDimension, measures));
|
||||
}
|
||||
}
|
||||
log.debug("linkDataSources {}", linkDataSources);
|
||||
return linkDataSources;
|
||||
// dataSources.addAll(linkDataSources);
|
||||
log.debug("linkDataModels {}", linkDataModels);
|
||||
return linkDataModels;
|
||||
// dataModels.addAll(linkDataModels);
|
||||
}
|
||||
|
||||
return dataSources;
|
||||
return dataModels;
|
||||
}
|
||||
|
||||
private static boolean checkMatch(Set<String> sourceMeasure, Set<String> queryDimension,
|
||||
@@ -301,17 +301,17 @@ public class DataSourceNode extends SemanticNode {
|
||||
return isAllMatch;
|
||||
}
|
||||
|
||||
private static List<DataSource> getLinkDataSourcesByJoinRelation(Set<String> queryDimension,
|
||||
List<String> measures, DataSource baseDataSource, SemanticSchema schema) {
|
||||
private static List<DataModel> getLinkDataSourcesByJoinRelation(Set<String> queryDimension,
|
||||
List<String> measures, DataModel baseDataModel, S2SemanticSchema schema) {
|
||||
Set<String> linkDataSourceName = new HashSet<>();
|
||||
List<DataSource> linkDataSources = new ArrayList<>();
|
||||
List<DataModel> linkDataModels = new ArrayList<>();
|
||||
Set<String> before = new HashSet<>();
|
||||
before.add(baseDataSource.getName());
|
||||
before.add(baseDataModel.getName());
|
||||
if (!CollectionUtils.isEmpty(schema.getJoinRelations())) {
|
||||
Set<Long> visitJoinRelations = new HashSet<>();
|
||||
List<JoinRelation> sortedJoinRelation = new ArrayList<>();
|
||||
sortJoinRelation(schema.getJoinRelations(), baseDataSource.getName(),
|
||||
visitJoinRelations, sortedJoinRelation);
|
||||
sortJoinRelation(schema.getJoinRelations(), baseDataModel.getName(), visitJoinRelations,
|
||||
sortedJoinRelation);
|
||||
schema.getJoinRelations().stream().filter(j -> !visitJoinRelations.contains(j.getId()))
|
||||
.forEach(j -> sortedJoinRelation.add(j));
|
||||
for (JoinRelation joinRelation : sortedJoinRelation) {
|
||||
@@ -321,7 +321,7 @@ public class DataSourceNode extends SemanticNode {
|
||||
}
|
||||
boolean isMatch = false;
|
||||
boolean isRight = before.contains(joinRelation.getLeft());
|
||||
DataSource other = isRight ? schema.getDatasource().get(joinRelation.getRight())
|
||||
DataModel other = isRight ? schema.getDatasource().get(joinRelation.getRight())
|
||||
: schema.getDatasource().get(joinRelation.getLeft());
|
||||
if (!queryDimension.isEmpty()) {
|
||||
Set<String> linkDimension = other.getDimensions().stream()
|
||||
@@ -354,8 +354,8 @@ public class DataSourceNode extends SemanticNode {
|
||||
}
|
||||
if (!CollectionUtils.isEmpty(linkDataSourceName)) {
|
||||
Map<String, Long> orders = new HashMap<>();
|
||||
linkDataSourceName.add(baseDataSource.getName());
|
||||
orders.put(baseDataSource.getName(), 0L);
|
||||
linkDataSourceName.add(baseDataModel.getName());
|
||||
orders.put(baseDataModel.getName(), 0L);
|
||||
for (JoinRelation joinRelation : schema.getJoinRelations()) {
|
||||
if (linkDataSourceName.contains(joinRelation.getLeft())
|
||||
&& linkDataSourceName.contains(joinRelation.getRight())) {
|
||||
@@ -364,10 +364,10 @@ public class DataSourceNode extends SemanticNode {
|
||||
}
|
||||
}
|
||||
orders.entrySet().stream().sorted(Map.Entry.comparingByValue()).forEach(d -> {
|
||||
linkDataSources.add(schema.getDatasource().get(d.getKey()));
|
||||
linkDataModels.add(schema.getDatasource().get(d.getKey()));
|
||||
});
|
||||
}
|
||||
return linkDataSources;
|
||||
return linkDataModels;
|
||||
}
|
||||
|
||||
private static void sortJoinRelation(List<JoinRelation> joinRelations, String next,
|
||||
@@ -385,13 +385,13 @@ public class DataSourceNode extends SemanticNode {
|
||||
}
|
||||
}
|
||||
|
||||
private static List<DataSource> getLinkDataSources(Set<String> baseIdentifiers,
|
||||
Set<String> queryDimension, List<String> measures, DataSource baseDataSource,
|
||||
SemanticSchema schema) {
|
||||
private static List<DataModel> getLinkDataSources(Set<String> baseIdentifiers,
|
||||
Set<String> queryDimension, List<String> measures, DataModel baseDataModel,
|
||||
S2SemanticSchema schema) {
|
||||
Set<String> linkDataSourceName = new HashSet<>();
|
||||
List<DataSource> linkDataSources = new ArrayList<>();
|
||||
for (Map.Entry<String, DataSource> entry : schema.getDatasource().entrySet()) {
|
||||
if (entry.getKey().equalsIgnoreCase(baseDataSource.getName())) {
|
||||
List<DataModel> linkDataModels = new ArrayList<>();
|
||||
for (Map.Entry<String, DataModel> entry : schema.getDatasource().entrySet()) {
|
||||
if (entry.getKey().equalsIgnoreCase(baseDataModel.getName())) {
|
||||
continue;
|
||||
}
|
||||
Long identifierNum = entry.getValue().getIdentifiers().stream().map(i -> i.getName())
|
||||
@@ -432,12 +432,12 @@ public class DataSourceNode extends SemanticNode {
|
||||
}
|
||||
}
|
||||
for (String linkName : linkDataSourceName) {
|
||||
linkDataSources.add(schema.getDatasource().get(linkName));
|
||||
linkDataModels.add(schema.getDatasource().get(linkName));
|
||||
}
|
||||
if (!CollectionUtils.isEmpty(linkDataSources)) {
|
||||
List<DataSource> all = new ArrayList<>();
|
||||
all.add(baseDataSource);
|
||||
all.addAll(linkDataSources);
|
||||
if (!CollectionUtils.isEmpty(linkDataModels)) {
|
||||
List<DataModel> all = new ArrayList<>();
|
||||
all.add(baseDataModel);
|
||||
all.addAll(linkDataModels);
|
||||
return all;
|
||||
}
|
||||
return Lists.newArrayList();
|
||||
|
||||
@@ -2,7 +2,7 @@ package com.tencent.supersonic.headless.core.translator.calcite.sql.node;
|
||||
|
||||
import com.tencent.supersonic.common.pojo.enums.EngineType;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Metric;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.schema.SemanticSchema;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.schema.S2SemanticSchema;
|
||||
import lombok.Data;
|
||||
import org.apache.calcite.sql.SqlNode;
|
||||
import org.apache.calcite.sql.validate.SqlValidatorScope;
|
||||
@@ -30,7 +30,7 @@ public class MetricNode extends SemanticNode {
|
||||
return buildAs(metric.getName(), sqlNode);
|
||||
}
|
||||
|
||||
public static Boolean isMetricField(String name, SemanticSchema schema) {
|
||||
public static Boolean isMetricField(String name, S2SemanticSchema schema) {
|
||||
Optional<Metric> metric = schema.getMetrics().stream()
|
||||
.filter(m -> m.getName().equalsIgnoreCase(name)).findFirst();
|
||||
return metric.isPresent() && metric.get().getMetricTypeParams().isFieldMetric();
|
||||
|
||||
@@ -5,7 +5,7 @@ import com.tencent.supersonic.common.calcite.SemanticSqlDialect;
|
||||
import com.tencent.supersonic.common.calcite.SqlDialectFactory;
|
||||
import com.tencent.supersonic.common.pojo.enums.EngineType;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Constants;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.schema.SemanticSchema;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.schema.S2SemanticSchema;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.sql.optimizer.FilterToGroupScanRule;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.calcite.plan.RelOptPlanner;
|
||||
@@ -397,8 +397,8 @@ public abstract class SemanticNode {
|
||||
return parseInfo;
|
||||
}
|
||||
|
||||
public static SqlNode optimize(SqlValidatorScope scope, SemanticSchema schema, SqlNode sqlNode,
|
||||
EngineType engineType) {
|
||||
public static SqlNode optimize(SqlValidatorScope scope, S2SemanticSchema schema,
|
||||
SqlNode sqlNode, EngineType engineType) {
|
||||
try {
|
||||
HepProgramBuilder hepProgramBuilder = new HepProgramBuilder();
|
||||
SemanticSqlDialect sqlDialect = SqlDialectFactory.getSqlDialect(engineType);
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
package com.tencent.supersonic.headless.core.translator.calcite.sql.optimizer;
|
||||
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.schema.SemanticSchema;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.schema.S2SemanticSchema;
|
||||
import org.apache.calcite.plan.RelOptRuleCall;
|
||||
import org.apache.calcite.plan.RelRule;
|
||||
import org.apache.calcite.rel.core.Aggregate;
|
||||
@@ -40,9 +40,10 @@ public class FilterToGroupScanRule extends RelRule<Config> implements Transforma
|
||||
});
|
||||
}).as(FilterTableScanRule.Config.class);
|
||||
|
||||
private SemanticSchema semanticSchema;
|
||||
private S2SemanticSchema semanticSchema;
|
||||
|
||||
public FilterToGroupScanRule(FilterTableScanRule.Config config, SemanticSchema semanticSchema) {
|
||||
public FilterToGroupScanRule(FilterTableScanRule.Config config,
|
||||
S2SemanticSchema semanticSchema) {
|
||||
super(config);
|
||||
this.semanticSchema = semanticSchema;
|
||||
}
|
||||
|
||||
@@ -3,9 +3,9 @@ package com.tencent.supersonic.headless.core.translator.calcite.sql.render;
|
||||
import com.tencent.supersonic.common.pojo.enums.EngineType;
|
||||
import com.tencent.supersonic.headless.core.pojo.MetricQueryParam;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Constants;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataSource;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataModel;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Metric;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.schema.SemanticSchema;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.schema.S2SemanticSchema;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.sql.Renderer;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.sql.TableView;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.FilterNode;
|
||||
@@ -27,8 +27,8 @@ import java.util.stream.Collectors;
|
||||
public class FilterRender extends Renderer {
|
||||
|
||||
@Override
|
||||
public void render(MetricQueryParam metricCommand, List<DataSource> dataSources,
|
||||
SqlValidatorScope scope, SemanticSchema schema, boolean nonAgg) throws Exception {
|
||||
public void render(MetricQueryParam metricCommand, List<DataModel> dataModels,
|
||||
SqlValidatorScope scope, S2SemanticSchema schema, boolean nonAgg) throws Exception {
|
||||
TableView tableView = super.tableView;
|
||||
SqlNode filterNode = null;
|
||||
List<String> queryMetrics = new ArrayList<>(metricCommand.getMetrics());
|
||||
@@ -43,9 +43,9 @@ public class FilterRender extends Renderer {
|
||||
List<String> fieldWhere = whereFields.stream().collect(Collectors.toList());
|
||||
Set<String> dimensions = new HashSet<>();
|
||||
Set<String> metrics = new HashSet<>();
|
||||
for (DataSource dataSource : dataSources) {
|
||||
for (DataModel dataModel : dataModels) {
|
||||
SourceRender.whereDimMetric(fieldWhere, metricCommand.getMetrics(),
|
||||
metricCommand.getDimensions(), dataSource, schema, dimensions, metrics);
|
||||
metricCommand.getDimensions(), dataModel, schema, dimensions, metrics);
|
||||
}
|
||||
queryMetrics.addAll(metrics);
|
||||
queryDimensions.addAll(dimensions);
|
||||
|
||||
@@ -3,13 +3,13 @@ package com.tencent.supersonic.headless.core.translator.calcite.sql.render;
|
||||
import com.tencent.supersonic.common.pojo.enums.EngineType;
|
||||
import com.tencent.supersonic.headless.core.pojo.MetricQueryParam;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Constants;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataSource;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataModel;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Dimension;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Identify;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.JoinRelation;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Materialization;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Metric;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.schema.SemanticSchema;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.schema.S2SemanticSchema;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.sql.Renderer;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.sql.TableView;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.AggFunctionNode;
|
||||
@@ -48,8 +48,8 @@ import java.util.stream.Collectors;
|
||||
public class JoinRender extends Renderer {
|
||||
|
||||
@Override
|
||||
public void render(MetricQueryParam metricCommand, List<DataSource> dataSources,
|
||||
SqlValidatorScope scope, SemanticSchema schema, boolean nonAgg) throws Exception {
|
||||
public void render(MetricQueryParam metricCommand, List<DataModel> dataModels,
|
||||
SqlValidatorScope scope, S2SemanticSchema schema, boolean nonAgg) throws Exception {
|
||||
String queryWhere = metricCommand.getWhere();
|
||||
EngineType engineType =
|
||||
EngineType.fromString(schema.getSemanticModel().getDatabase().getType());
|
||||
@@ -71,14 +71,14 @@ public class JoinRender extends Renderer {
|
||||
Set<String> filterDimension = new HashSet<>();
|
||||
Map<String, String> beforeSources = new HashMap<>();
|
||||
|
||||
for (int i = 0; i < dataSources.size(); i++) {
|
||||
final DataSource dataSource = dataSources.get(i);
|
||||
for (int i = 0; i < dataModels.size(); i++) {
|
||||
final DataModel dataModel = dataModels.get(i);
|
||||
final Set<String> filterDimensions = new HashSet<>();
|
||||
final Set<String> filterMetrics = new HashSet<>();
|
||||
final List<String> queryDimension = new ArrayList<>();
|
||||
final List<String> queryMetrics = new ArrayList<>();
|
||||
SourceRender.whereDimMetric(fieldWhere, queryMetrics, queryDimension, dataSource,
|
||||
schema, filterDimensions, filterMetrics);
|
||||
SourceRender.whereDimMetric(fieldWhere, queryMetrics, queryDimension, dataModel, schema,
|
||||
filterDimensions, filterMetrics);
|
||||
List<String> reqMetric = new ArrayList<>(metricCommand.getMetrics());
|
||||
reqMetric.addAll(filterMetrics);
|
||||
reqMetric = uniqList(reqMetric);
|
||||
@@ -87,40 +87,40 @@ public class JoinRender extends Renderer {
|
||||
reqDimension.addAll(filterDimensions);
|
||||
reqDimension = uniqList(reqDimension);
|
||||
|
||||
Set<String> sourceMeasure = dataSource.getMeasures().stream().map(mm -> mm.getName())
|
||||
Set<String> sourceMeasure = dataModel.getMeasures().stream().map(mm -> mm.getName())
|
||||
.collect(Collectors.toSet());
|
||||
doMetric(innerSelect, filterView, queryMetrics, reqMetric, dataSource, sourceMeasure,
|
||||
doMetric(innerSelect, filterView, queryMetrics, reqMetric, dataModel, sourceMeasure,
|
||||
scope, schema, nonAgg);
|
||||
Set<String> dimension = dataSource.getDimensions().stream().map(dd -> dd.getName())
|
||||
Set<String> dimension = dataModel.getDimensions().stream().map(dd -> dd.getName())
|
||||
.collect(Collectors.toSet());
|
||||
doDimension(innerSelect, filterDimension, queryDimension, reqDimension, dataSource,
|
||||
doDimension(innerSelect, filterDimension, queryDimension, reqDimension, dataModel,
|
||||
dimension, scope, schema);
|
||||
List<String> primary = new ArrayList<>();
|
||||
for (Identify identify : dataSource.getIdentifiers()) {
|
||||
for (Identify identify : dataModel.getIdentifiers()) {
|
||||
primary.add(identify.getName());
|
||||
if (!fieldWhere.contains(identify.getName())) {
|
||||
fieldWhere.add(identify.getName());
|
||||
}
|
||||
}
|
||||
List<String> dataSourceWhere = new ArrayList<>(fieldWhere);
|
||||
addZipperField(dataSource, dataSourceWhere);
|
||||
addZipperField(dataModel, dataSourceWhere);
|
||||
TableView tableView =
|
||||
SourceRender.renderOne("", dataSourceWhere, queryMetrics, queryDimension,
|
||||
metricCommand.getWhere(), dataSources.get(i), scope, schema, true);
|
||||
metricCommand.getWhere(), dataModels.get(i), scope, schema, true);
|
||||
log.info("tableView {}", StringUtils.normalizeSpace(tableView.getTable().toString()));
|
||||
String alias = Constants.JOIN_TABLE_PREFIX + dataSource.getName();
|
||||
String alias = Constants.JOIN_TABLE_PREFIX + dataModel.getName();
|
||||
tableView.setAlias(alias);
|
||||
tableView.setPrimary(primary);
|
||||
tableView.setDataSource(dataSource);
|
||||
tableView.setDataModel(dataModel);
|
||||
if (left == null) {
|
||||
leftTable = tableView;
|
||||
left = SemanticNode.buildAs(tableView.getAlias(), getTable(tableView, scope));
|
||||
beforeSources.put(dataSource.getName(), leftTable.getAlias());
|
||||
beforeSources.put(dataModel.getName(), leftTable.getAlias());
|
||||
continue;
|
||||
}
|
||||
left = buildJoin(left, leftTable, tableView, beforeSources, dataSource, schema, scope);
|
||||
left = buildJoin(left, leftTable, tableView, beforeSources, dataModel, schema, scope);
|
||||
leftTable = tableView;
|
||||
beforeSources.put(dataSource.getName(), tableView.getAlias());
|
||||
beforeSources.put(dataModel.getName(), tableView.getAlias());
|
||||
}
|
||||
|
||||
for (Map.Entry<String, SqlNode> entry : innerSelect.entrySet()) {
|
||||
@@ -144,16 +144,15 @@ public class JoinRender extends Renderer {
|
||||
}
|
||||
|
||||
private void doMetric(Map<String, SqlNode> innerSelect, TableView filterView,
|
||||
List<String> queryMetrics, List<String> reqMetrics, DataSource dataSource,
|
||||
Set<String> sourceMeasure, SqlValidatorScope scope, SemanticSchema schema,
|
||||
List<String> queryMetrics, List<String> reqMetrics, DataModel dataModel,
|
||||
Set<String> sourceMeasure, SqlValidatorScope scope, S2SemanticSchema schema,
|
||||
boolean nonAgg) throws Exception {
|
||||
String alias = Constants.JOIN_TABLE_PREFIX + dataSource.getName();
|
||||
String alias = Constants.JOIN_TABLE_PREFIX + dataModel.getName();
|
||||
EngineType engineType =
|
||||
EngineType.fromString(schema.getSemanticModel().getDatabase().getType());
|
||||
for (String m : reqMetrics) {
|
||||
if (getMatchMetric(schema, sourceMeasure, m, queryMetrics)) {
|
||||
MetricNode metricNode =
|
||||
buildMetricNode(m, dataSource, scope, schema, nonAgg, alias);
|
||||
MetricNode metricNode = buildMetricNode(m, dataModel, scope, schema, nonAgg, alias);
|
||||
|
||||
if (!metricNode.getNonAggNode().isEmpty()) {
|
||||
for (String measure : metricNode.getNonAggNode().keySet()) {
|
||||
@@ -181,14 +180,14 @@ public class JoinRender extends Renderer {
|
||||
}
|
||||
|
||||
private void doDimension(Map<String, SqlNode> innerSelect, Set<String> filterDimension,
|
||||
List<String> queryDimension, List<String> reqDimensions, DataSource dataSource,
|
||||
Set<String> dimension, SqlValidatorScope scope, SemanticSchema schema)
|
||||
List<String> queryDimension, List<String> reqDimensions, DataModel dataModel,
|
||||
Set<String> dimension, SqlValidatorScope scope, S2SemanticSchema schema)
|
||||
throws Exception {
|
||||
String alias = Constants.JOIN_TABLE_PREFIX + dataSource.getName();
|
||||
String alias = Constants.JOIN_TABLE_PREFIX + dataModel.getName();
|
||||
EngineType engineType =
|
||||
EngineType.fromString(schema.getSemanticModel().getDatabase().getType());
|
||||
for (String d : reqDimensions) {
|
||||
if (getMatchDimension(schema, dimension, dataSource, d, queryDimension)) {
|
||||
if (getMatchDimension(schema, dimension, dataModel, d, queryDimension)) {
|
||||
if (d.contains(Constants.DIMENSION_IDENTIFY)) {
|
||||
String[] identifyDimension = d.split(Constants.DIMENSION_IDENTIFY);
|
||||
innerSelect.put(d, SemanticNode.buildAs(d, SemanticNode
|
||||
@@ -209,7 +208,7 @@ public class JoinRender extends Renderer {
|
||||
.collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
private boolean getMatchMetric(SemanticSchema schema, Set<String> sourceMeasure, String m,
|
||||
private boolean getMatchMetric(S2SemanticSchema schema, Set<String> sourceMeasure, String m,
|
||||
List<String> queryMetrics) {
|
||||
Optional<Metric> metric = schema.getMetrics().stream()
|
||||
.filter(mm -> mm.getName().equalsIgnoreCase(m)).findFirst();
|
||||
@@ -230,8 +229,8 @@ public class JoinRender extends Renderer {
|
||||
return isAdd;
|
||||
}
|
||||
|
||||
private boolean getMatchDimension(SemanticSchema schema, Set<String> sourceDimension,
|
||||
DataSource dataSource, String d, List<String> queryDimension) {
|
||||
private boolean getMatchDimension(S2SemanticSchema schema, Set<String> sourceDimension,
|
||||
DataModel dataModel, String d, List<String> queryDimension) {
|
||||
String oriDimension = d;
|
||||
boolean isAdd = false;
|
||||
if (d.contains(Constants.DIMENSION_IDENTIFY)) {
|
||||
@@ -240,14 +239,14 @@ public class JoinRender extends Renderer {
|
||||
if (sourceDimension.contains(oriDimension)) {
|
||||
isAdd = true;
|
||||
}
|
||||
for (Identify identify : dataSource.getIdentifiers()) {
|
||||
for (Identify identify : dataModel.getIdentifiers()) {
|
||||
if (identify.getName().equalsIgnoreCase(oriDimension)) {
|
||||
isAdd = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (schema.getDimension().containsKey(dataSource.getName())) {
|
||||
for (Dimension dim : schema.getDimension().get(dataSource.getName())) {
|
||||
if (schema.getDimension().containsKey(dataModel.getName())) {
|
||||
for (Dimension dim : schema.getDimension().get(dataModel.getName())) {
|
||||
if (dim.getName().equalsIgnoreCase(oriDimension)) {
|
||||
isAdd = true;
|
||||
}
|
||||
@@ -264,12 +263,12 @@ public class JoinRender extends Renderer {
|
||||
}
|
||||
|
||||
private SqlNode buildJoin(SqlNode left, TableView leftTable, TableView tableView,
|
||||
Map<String, String> before, DataSource dataSource, SemanticSchema schema,
|
||||
Map<String, String> before, DataModel dataModel, S2SemanticSchema schema,
|
||||
SqlValidatorScope scope) throws Exception {
|
||||
EngineType engineType =
|
||||
EngineType.fromString(schema.getSemanticModel().getDatabase().getType());
|
||||
SqlNode condition =
|
||||
getCondition(leftTable, tableView, dataSource, schema, scope, engineType);
|
||||
getCondition(leftTable, tableView, dataModel, schema, scope, engineType);
|
||||
SqlLiteral sqlLiteral = SemanticNode.getJoinSqlLiteral("");
|
||||
JoinRelation matchJoinRelation = getMatchJoinRelation(before, tableView, schema);
|
||||
SqlNode joinRelationCondition = null;
|
||||
@@ -278,11 +277,11 @@ public class JoinRender extends Renderer {
|
||||
joinRelationCondition = getCondition(matchJoinRelation, scope, engineType);
|
||||
condition = joinRelationCondition;
|
||||
}
|
||||
if (Materialization.TimePartType.ZIPPER.equals(leftTable.getDataSource().getTimePartType())
|
||||
if (Materialization.TimePartType.ZIPPER.equals(leftTable.getDataModel().getTimePartType())
|
||||
|| Materialization.TimePartType.ZIPPER
|
||||
.equals(tableView.getDataSource().getTimePartType())) {
|
||||
.equals(tableView.getDataModel().getTimePartType())) {
|
||||
SqlNode zipperCondition =
|
||||
getZipperCondition(leftTable, tableView, dataSource, schema, scope);
|
||||
getZipperCondition(leftTable, tableView, dataModel, schema, scope);
|
||||
if (Objects.nonNull(joinRelationCondition)) {
|
||||
condition = new SqlBasicCall(SqlStdOperatorTable.AND,
|
||||
new ArrayList<>(Arrays.asList(zipperCondition, joinRelationCondition)),
|
||||
@@ -299,11 +298,11 @@ public class JoinRender extends Renderer {
|
||||
}
|
||||
|
||||
private JoinRelation getMatchJoinRelation(Map<String, String> before, TableView tableView,
|
||||
SemanticSchema schema) {
|
||||
S2SemanticSchema schema) {
|
||||
JoinRelation matchJoinRelation = JoinRelation.builder().build();
|
||||
if (!CollectionUtils.isEmpty(schema.getJoinRelations())) {
|
||||
for (JoinRelation joinRelation : schema.getJoinRelations()) {
|
||||
if (joinRelation.getRight().equalsIgnoreCase(tableView.getDataSource().getName())
|
||||
if (joinRelation.getRight().equalsIgnoreCase(tableView.getDataModel().getName())
|
||||
&& before.containsKey(joinRelation.getLeft())) {
|
||||
matchJoinRelation.setJoinCondition(joinRelation.getJoinCondition().stream()
|
||||
.map(r -> Triple.of(
|
||||
@@ -338,8 +337,8 @@ public class JoinRender extends Renderer {
|
||||
return condition;
|
||||
}
|
||||
|
||||
private SqlNode getCondition(TableView left, TableView right, DataSource dataSource,
|
||||
SemanticSchema schema, SqlValidatorScope scope, EngineType engineType)
|
||||
private SqlNode getCondition(TableView left, TableView right, DataModel dataModel,
|
||||
S2SemanticSchema schema, SqlValidatorScope scope, EngineType engineType)
|
||||
throws Exception {
|
||||
|
||||
Set<String> selectLeft = SemanticNode.getSelect(left.getTable());
|
||||
@@ -347,16 +346,16 @@ public class JoinRender extends Renderer {
|
||||
selectLeft.retainAll(selectRight);
|
||||
SqlNode condition = null;
|
||||
for (String on : selectLeft) {
|
||||
if (!SourceRender.isDimension(on, dataSource, schema)) {
|
||||
if (!SourceRender.isDimension(on, dataModel, schema)) {
|
||||
continue;
|
||||
}
|
||||
if (IdentifyNode.isForeign(on, left.getDataSource().getIdentifiers())) {
|
||||
if (!IdentifyNode.isPrimary(on, right.getDataSource().getIdentifiers())) {
|
||||
if (IdentifyNode.isForeign(on, left.getDataModel().getIdentifiers())) {
|
||||
if (!IdentifyNode.isPrimary(on, right.getDataModel().getIdentifiers())) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
if (IdentifyNode.isForeign(on, right.getDataSource().getIdentifiers())) {
|
||||
if (!IdentifyNode.isPrimary(on, left.getDataSource().getIdentifiers())) {
|
||||
if (IdentifyNode.isForeign(on, right.getDataModel().getIdentifiers())) {
|
||||
if (!IdentifyNode.isPrimary(on, left.getDataModel().getIdentifiers())) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
@@ -396,9 +395,9 @@ public class JoinRender extends Renderer {
|
||||
visited.put(id, false);
|
||||
}
|
||||
|
||||
private void addZipperField(DataSource dataSource, List<String> fields) {
|
||||
if (Materialization.TimePartType.ZIPPER.equals(dataSource.getTimePartType())) {
|
||||
dataSource.getDimensions().stream()
|
||||
private void addZipperField(DataModel dataModel, List<String> fields) {
|
||||
if (Materialization.TimePartType.ZIPPER.equals(dataModel.getTimePartType())) {
|
||||
dataModel.getDimensions().stream()
|
||||
.filter(d -> Constants.DIMENSION_TYPE_TIME.equalsIgnoreCase(d.getType()))
|
||||
.forEach(t -> {
|
||||
if (t.getName().startsWith(Constants.MATERIALIZATION_ZIPPER_END)
|
||||
@@ -413,18 +412,18 @@ public class JoinRender extends Renderer {
|
||||
}
|
||||
}
|
||||
|
||||
private SqlNode getZipperCondition(TableView left, TableView right, DataSource dataSource,
|
||||
SemanticSchema schema, SqlValidatorScope scope) throws Exception {
|
||||
if (Materialization.TimePartType.ZIPPER.equals(left.getDataSource().getTimePartType())
|
||||
private SqlNode getZipperCondition(TableView left, TableView right, DataModel dataModel,
|
||||
S2SemanticSchema schema, SqlValidatorScope scope) throws Exception {
|
||||
if (Materialization.TimePartType.ZIPPER.equals(left.getDataModel().getTimePartType())
|
||||
&& Materialization.TimePartType.ZIPPER
|
||||
.equals(right.getDataSource().getTimePartType())) {
|
||||
.equals(right.getDataModel().getTimePartType())) {
|
||||
throw new Exception("not support two zipper table");
|
||||
}
|
||||
SqlNode condition = null;
|
||||
Optional<Dimension> leftTime = left.getDataSource().getDimensions().stream()
|
||||
Optional<Dimension> leftTime = left.getDataModel().getDimensions().stream()
|
||||
.filter(d -> Constants.DIMENSION_TYPE_TIME.equalsIgnoreCase(d.getType()))
|
||||
.findFirst();
|
||||
Optional<Dimension> rightTime = right.getDataSource().getDimensions().stream()
|
||||
Optional<Dimension> rightTime = right.getDataModel().getDimensions().stream()
|
||||
.filter(d -> Constants.DIMENSION_TYPE_TIME.equalsIgnoreCase(d.getType()))
|
||||
.findFirst();
|
||||
if (leftTime.isPresent() && rightTime.isPresent()) {
|
||||
@@ -434,7 +433,7 @@ public class JoinRender extends Renderer {
|
||||
String dateTime = "";
|
||||
|
||||
Optional<Dimension> startTimeOp = (Materialization.TimePartType.ZIPPER
|
||||
.equals(left.getDataSource().getTimePartType()) ? left : right).getDataSource()
|
||||
.equals(left.getDataModel().getTimePartType()) ? left : right).getDataModel()
|
||||
.getDimensions().stream()
|
||||
.filter(d -> Constants.DIMENSION_TYPE_TIME
|
||||
.equalsIgnoreCase(d.getType()))
|
||||
@@ -442,7 +441,7 @@ public class JoinRender extends Renderer {
|
||||
.startsWith(Constants.MATERIALIZATION_ZIPPER_START))
|
||||
.findFirst();
|
||||
Optional<Dimension> endTimeOp = (Materialization.TimePartType.ZIPPER
|
||||
.equals(left.getDataSource().getTimePartType()) ? left : right).getDataSource()
|
||||
.equals(left.getDataModel().getTimePartType()) ? left : right).getDataModel()
|
||||
.getDimensions().stream()
|
||||
.filter(d -> Constants.DIMENSION_TYPE_TIME
|
||||
.equalsIgnoreCase(d.getType()))
|
||||
@@ -451,11 +450,11 @@ public class JoinRender extends Renderer {
|
||||
.findFirst();
|
||||
if (startTimeOp.isPresent() && endTimeOp.isPresent()) {
|
||||
TableView zipper = Materialization.TimePartType.ZIPPER
|
||||
.equals(left.getDataSource().getTimePartType()) ? left : right;
|
||||
.equals(left.getDataModel().getTimePartType()) ? left : right;
|
||||
TableView partMetric = Materialization.TimePartType.ZIPPER
|
||||
.equals(left.getDataSource().getTimePartType()) ? right : left;
|
||||
.equals(left.getDataModel().getTimePartType()) ? right : left;
|
||||
Optional<Dimension> partTime = Materialization.TimePartType.ZIPPER
|
||||
.equals(left.getDataSource().getTimePartType()) ? rightTime : leftTime;
|
||||
.equals(left.getDataModel().getTimePartType()) ? rightTime : leftTime;
|
||||
startTime = zipper.getAlias() + "." + startTimeOp.get().getName();
|
||||
endTime = zipper.getAlias() + "." + endTimeOp.get().getName();
|
||||
dateTime = partMetric.getAlias() + "." + partTime.get().getName();
|
||||
|
||||
@@ -3,8 +3,8 @@ package com.tencent.supersonic.headless.core.translator.calcite.sql.render;
|
||||
import com.tencent.supersonic.common.pojo.ColumnOrder;
|
||||
import com.tencent.supersonic.common.pojo.enums.EngineType;
|
||||
import com.tencent.supersonic.headless.core.pojo.MetricQueryParam;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataSource;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.schema.SemanticSchema;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataModel;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.schema.S2SemanticSchema;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.sql.Renderer;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.sql.TableView;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.MetricNode;
|
||||
@@ -23,8 +23,8 @@ import java.util.List;
|
||||
public class OutputRender extends Renderer {
|
||||
|
||||
@Override
|
||||
public void render(MetricQueryParam metricCommand, List<DataSource> dataSources,
|
||||
SqlValidatorScope scope, SemanticSchema schema, boolean nonAgg) throws Exception {
|
||||
public void render(MetricQueryParam metricCommand, List<DataModel> dataModels,
|
||||
SqlValidatorScope scope, S2SemanticSchema schema, boolean nonAgg) throws Exception {
|
||||
TableView selectDataSet = super.tableView;
|
||||
EngineType engineType =
|
||||
EngineType.fromString(schema.getSemanticModel().getDatabase().getType());
|
||||
|
||||
@@ -3,13 +3,13 @@ package com.tencent.supersonic.headless.core.translator.calcite.sql.render;
|
||||
import com.tencent.supersonic.common.pojo.enums.EngineType;
|
||||
import com.tencent.supersonic.headless.core.pojo.MetricQueryParam;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Constants;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataSource;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataModel;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Dimension;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Identify;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Materialization;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Measure;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Metric;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.schema.SemanticSchema;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.schema.S2SemanticSchema;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.sql.Renderer;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.sql.TableView;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.sql.node.DataSourceNode;
|
||||
@@ -21,7 +21,6 @@ import com.tencent.supersonic.headless.core.translator.calcite.sql.node.Semantic
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.calcite.sql.SqlNode;
|
||||
import org.apache.calcite.sql.validate.SqlValidatorScope;
|
||||
import org.apache.calcite.util.Litmus;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
@@ -44,7 +43,7 @@ public class SourceRender extends Renderer {
|
||||
|
||||
public static TableView renderOne(String alias, List<String> fieldWheres,
|
||||
List<String> reqMetrics, List<String> reqDimensions, String queryWhere,
|
||||
DataSource datasource, SqlValidatorScope scope, SemanticSchema schema, boolean nonAgg)
|
||||
DataModel datasource, SqlValidatorScope scope, S2SemanticSchema schema, boolean nonAgg)
|
||||
throws Exception {
|
||||
|
||||
TableView dataSet = new TableView();
|
||||
@@ -107,8 +106,8 @@ public class SourceRender extends Renderer {
|
||||
|
||||
|
||||
|
||||
private static void buildDimension(String alias, String dimension, DataSource datasource,
|
||||
SemanticSchema schema, boolean nonAgg, Map<String, String> extendFields,
|
||||
private static void buildDimension(String alias, String dimension, DataModel datasource,
|
||||
S2SemanticSchema schema, boolean nonAgg, Map<String, String> extendFields,
|
||||
TableView dataSet, TableView output, SqlValidatorScope scope) throws Exception {
|
||||
List<Dimension> dimensionList = schema.getDimension().get(datasource.getName());
|
||||
EngineType engineType =
|
||||
@@ -186,8 +185,8 @@ public class SourceRender extends Renderer {
|
||||
}
|
||||
|
||||
private static List<SqlNode> getWhereMeasure(List<String> fields, List<String> queryMetrics,
|
||||
List<String> queryDimensions, Map<String, String> extendFields, DataSource datasource,
|
||||
SqlValidatorScope scope, SemanticSchema schema, boolean nonAgg) throws Exception {
|
||||
List<String> queryDimensions, Map<String, String> extendFields, DataModel datasource,
|
||||
SqlValidatorScope scope, S2SemanticSchema schema, boolean nonAgg) throws Exception {
|
||||
Iterator<String> iterator = fields.iterator();
|
||||
List<SqlNode> whereNode = new ArrayList<>();
|
||||
EngineType engineType =
|
||||
@@ -229,8 +228,8 @@ public class SourceRender extends Renderer {
|
||||
|
||||
private static void mergeWhere(List<String> fields, TableView dataSet, TableView outputSet,
|
||||
List<String> queryMetrics, List<String> queryDimensions,
|
||||
Map<String, String> extendFields, DataSource datasource, SqlValidatorScope scope,
|
||||
SemanticSchema schema, boolean nonAgg) throws Exception {
|
||||
Map<String, String> extendFields, DataModel datasource, SqlValidatorScope scope,
|
||||
S2SemanticSchema schema, boolean nonAgg) throws Exception {
|
||||
List<SqlNode> whereNode = getWhereMeasure(fields, queryMetrics, queryDimensions,
|
||||
extendFields, datasource, scope, schema, nonAgg);
|
||||
dataSet.getMeasure().addAll(whereNode);
|
||||
@@ -238,7 +237,7 @@ public class SourceRender extends Renderer {
|
||||
}
|
||||
|
||||
public static void whereDimMetric(List<String> fields, List<String> queryMetrics,
|
||||
List<String> queryDimensions, DataSource datasource, SemanticSchema schema,
|
||||
List<String> queryDimensions, DataModel datasource, S2SemanticSchema schema,
|
||||
Set<String> dimensions, Set<String> metrics) {
|
||||
for (String field : fields) {
|
||||
if (queryDimensions.contains(field) || queryMetrics.contains(field)) {
|
||||
@@ -252,8 +251,8 @@ public class SourceRender extends Renderer {
|
||||
}
|
||||
}
|
||||
|
||||
private static void addField(String field, String oriField, DataSource datasource,
|
||||
SemanticSchema schema, Set<String> dimensions, Set<String> metrics) {
|
||||
private static void addField(String field, String oriField, DataModel datasource,
|
||||
S2SemanticSchema schema, Set<String> dimensions, Set<String> metrics) {
|
||||
Optional<Dimension> dimension = datasource.getDimensions().stream()
|
||||
.filter(d -> d.getName().equalsIgnoreCase(field)).findFirst();
|
||||
if (dimension.isPresent()) {
|
||||
@@ -293,7 +292,7 @@ public class SourceRender extends Renderer {
|
||||
}
|
||||
}
|
||||
|
||||
public static boolean isDimension(String name, DataSource datasource, SemanticSchema schema) {
|
||||
public static boolean isDimension(String name, DataModel datasource, S2SemanticSchema schema) {
|
||||
Optional<Dimension> dimension = datasource.getDimensions().stream()
|
||||
.filter(d -> d.getName().equalsIgnoreCase(name)).findFirst();
|
||||
if (dimension.isPresent()) {
|
||||
@@ -314,13 +313,13 @@ public class SourceRender extends Renderer {
|
||||
return false;
|
||||
}
|
||||
|
||||
private static void addTimeDimension(DataSource dataSource, List<String> queryDimension) {
|
||||
if (Materialization.TimePartType.ZIPPER.equals(dataSource.getTimePartType())) {
|
||||
Optional<Dimension> startTimeOp = dataSource.getDimensions().stream()
|
||||
private static void addTimeDimension(DataModel dataModel, List<String> queryDimension) {
|
||||
if (Materialization.TimePartType.ZIPPER.equals(dataModel.getTimePartType())) {
|
||||
Optional<Dimension> startTimeOp = dataModel.getDimensions().stream()
|
||||
.filter(d -> Constants.DIMENSION_TYPE_TIME.equalsIgnoreCase(d.getType()))
|
||||
.filter(d -> d.getName().startsWith(Constants.MATERIALIZATION_ZIPPER_START))
|
||||
.findFirst();
|
||||
Optional<Dimension> endTimeOp = dataSource.getDimensions().stream()
|
||||
Optional<Dimension> endTimeOp = dataModel.getDimensions().stream()
|
||||
.filter(d -> Constants.DIMENSION_TYPE_TIME.equalsIgnoreCase(d.getType()))
|
||||
.filter(d -> d.getName().startsWith(Constants.MATERIALIZATION_ZIPPER_END))
|
||||
.findFirst();
|
||||
@@ -331,7 +330,7 @@ public class SourceRender extends Renderer {
|
||||
queryDimension.add(endTimeOp.get().getName());
|
||||
}
|
||||
} else {
|
||||
Optional<Dimension> timeOp = dataSource.getDimensions().stream()
|
||||
Optional<Dimension> timeOp = dataModel.getDimensions().stream()
|
||||
.filter(d -> Constants.DIMENSION_TYPE_TIME.equalsIgnoreCase(d.getType()))
|
||||
.findFirst();
|
||||
if (timeOp.isPresent() && !queryDimension.contains(timeOp.get().getName())) {
|
||||
@@ -340,8 +339,8 @@ public class SourceRender extends Renderer {
|
||||
}
|
||||
}
|
||||
|
||||
public void render(MetricQueryParam metricQueryParam, List<DataSource> dataSources,
|
||||
SqlValidatorScope scope, SemanticSchema schema, boolean nonAgg) throws Exception {
|
||||
public void render(MetricQueryParam metricQueryParam, List<DataModel> dataModels,
|
||||
SqlValidatorScope scope, S2SemanticSchema schema, boolean nonAgg) throws Exception {
|
||||
String queryWhere = metricQueryParam.getWhere();
|
||||
Set<String> whereFields = new HashSet<>();
|
||||
List<String> fieldWhere = new ArrayList<>();
|
||||
@@ -352,15 +351,15 @@ public class SourceRender extends Renderer {
|
||||
FilterNode.getFilterField(sqlNode, whereFields);
|
||||
fieldWhere = whereFields.stream().collect(Collectors.toList());
|
||||
}
|
||||
if (dataSources.size() == 1) {
|
||||
DataSource dataSource = dataSources.get(0);
|
||||
if (dataModels.size() == 1) {
|
||||
DataModel dataModel = dataModels.get(0);
|
||||
super.tableView = renderOne("", fieldWhere, metricQueryParam.getMetrics(),
|
||||
metricQueryParam.getDimensions(), metricQueryParam.getWhere(), dataSource,
|
||||
scope, schema, nonAgg);
|
||||
metricQueryParam.getDimensions(), metricQueryParam.getWhere(), dataModel, scope,
|
||||
schema, nonAgg);
|
||||
return;
|
||||
}
|
||||
JoinRender joinRender = new JoinRender();
|
||||
joinRender.render(metricQueryParam, dataSources, scope, schema, nonAgg);
|
||||
joinRender.render(metricQueryParam, dataModels, scope, schema, nonAgg);
|
||||
super.tableView = joinRender.getTableView();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ import com.tencent.supersonic.common.util.ContextUtils;
|
||||
import com.tencent.supersonic.headless.api.pojo.QueryParam;
|
||||
import com.tencent.supersonic.headless.core.pojo.MetricQueryParam;
|
||||
import com.tencent.supersonic.headless.core.pojo.QueryStatement;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataSource;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataModel;
|
||||
import com.tencent.supersonic.headless.core.utils.SqlGenerateUtils;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
@@ -60,7 +60,7 @@ public class ParserDefaultConverter implements QueryConverter {
|
||||
// support detail query
|
||||
if (queryParam.getQueryType().isNativeAggQuery()
|
||||
&& CollectionUtils.isEmpty(metricQueryParam.getMetrics())) {
|
||||
Map<Long, DataSource> modelMap = queryStatement.getSemanticModel().getModelMap();
|
||||
Map<Long, DataModel> modelMap = queryStatement.getSemanticModel().getModelMap();
|
||||
for (Long modelId : modelMap.keySet()) {
|
||||
String modelBizName = modelMap.get(modelId).getName();
|
||||
String internalMetricName =
|
||||
|
||||
@@ -4,7 +4,7 @@ import com.tencent.supersonic.headless.api.pojo.enums.ModelDefineType;
|
||||
import com.tencent.supersonic.headless.api.pojo.response.ModelResp;
|
||||
import com.tencent.supersonic.headless.api.pojo.response.SemanticSchemaResp;
|
||||
import com.tencent.supersonic.headless.core.pojo.QueryStatement;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataSource;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataModel;
|
||||
import com.tencent.supersonic.headless.core.utils.SqlVariableParseUtils;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
@@ -39,9 +39,9 @@ public class SqlVariableParseConverter implements QueryConverter {
|
||||
SqlVariableParseUtils.parse(modelResp.getModelDetail().getSqlQuery(),
|
||||
modelResp.getModelDetail().getSqlVariables(),
|
||||
queryStatement.getQueryParam().getParams());
|
||||
DataSource dataSource = queryStatement.getSemanticModel().getDatasourceMap()
|
||||
DataModel dataModel = queryStatement.getSemanticModel().getDatasourceMap()
|
||||
.get(modelResp.getBizName());
|
||||
dataSource.setSqlQuery(sqlParsed);
|
||||
dataModel.setSqlQuery(sqlParsed);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,7 +8,7 @@ import com.tencent.supersonic.headless.api.pojo.response.DatabaseResp;
|
||||
import com.tencent.supersonic.headless.api.pojo.response.SemanticSchemaResp;
|
||||
import com.tencent.supersonic.headless.api.pojo.response.TagResp;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Constants;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataSource;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataModel;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DataType;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Dimension;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.DimensionTimeTypeParams;
|
||||
@@ -19,7 +19,7 @@ import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Measure;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.Metric;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.MetricTypeParams;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.s2sql.SemanticModel;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.schema.SemanticSchema;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.schema.S2SemanticSchema;
|
||||
import com.tencent.supersonic.headless.server.pojo.yaml.DataModelYamlTpl;
|
||||
import com.tencent.supersonic.headless.server.pojo.yaml.DimensionTimeTypeParamsTpl;
|
||||
import com.tencent.supersonic.headless.server.pojo.yaml.DimensionYamlTpl;
|
||||
@@ -73,9 +73,9 @@ public class SemanticSchemaManager {
|
||||
getJoinRelation(semanticSchemaResp.getModelRelas(), modelIdName));
|
||||
}
|
||||
if (!dataModelYamlTpls.isEmpty()) {
|
||||
Map<String, DataSource> dataSourceMap =
|
||||
Map<String, DataModel> dataSourceMap =
|
||||
dataModelYamlTpls.stream().map(SemanticSchemaManager::getDatasource).collect(
|
||||
Collectors.toMap(DataSource::getName, item -> item, (k1, k2) -> k1));
|
||||
Collectors.toMap(DataModel::getName, item -> item, (k1, k2) -> k1));
|
||||
semanticModel.setDatasourceMap(dataSourceMap);
|
||||
}
|
||||
if (!dimensionYamlTpls.isEmpty()) {
|
||||
@@ -107,8 +107,7 @@ public class SemanticSchemaManager {
|
||||
}
|
||||
if (Objects.nonNull(semanticModel.getDatasourceMap())
|
||||
&& !semanticModel.getDatasourceMap().isEmpty()) {
|
||||
for (Map.Entry<String, DataSource> entry : semanticModel.getDatasourceMap()
|
||||
.entrySet()) {
|
||||
for (Map.Entry<String, DataModel> entry : semanticModel.getDatasourceMap().entrySet()) {
|
||||
List<Dimension> modelDimensions = new ArrayList<>();
|
||||
if (!semanticModel.getDimensionMap().containsKey(entry.getKey())) {
|
||||
semanticModel.getDimensionMap().put(entry.getKey(), modelDimensions);
|
||||
@@ -178,8 +177,8 @@ public class SemanticSchemaManager {
|
||||
return getDimension(t);
|
||||
}
|
||||
|
||||
public static DataSource getDatasource(final DataModelYamlTpl d) {
|
||||
DataSource datasource = DataSource.builder().id(d.getId()).sourceId(d.getSourceId())
|
||||
public static DataModel getDatasource(final DataModelYamlTpl d) {
|
||||
DataModel datasource = DataModel.builder().id(d.getId()).sourceId(d.getSourceId())
|
||||
.type(d.getType()).sqlQuery(d.getSqlQuery()).name(d.getName())
|
||||
.tableQuery(d.getTableQuery()).identifiers(getIdentify(d.getIdentifiers()))
|
||||
.measures(getMeasureParams(d.getMeasures()))
|
||||
@@ -356,17 +355,17 @@ public class SemanticSchemaManager {
|
||||
return joinRelations;
|
||||
}
|
||||
|
||||
public static void update(SemanticSchema schema, List<Metric> metric) throws Exception {
|
||||
public static void update(S2SemanticSchema schema, List<Metric> metric) throws Exception {
|
||||
if (schema != null) {
|
||||
updateMetric(metric, schema.getMetrics());
|
||||
}
|
||||
}
|
||||
|
||||
public static void update(SemanticSchema schema, DataSource datasourceYamlTpl)
|
||||
public static void update(S2SemanticSchema schema, DataModel datasourceYamlTpl)
|
||||
throws Exception {
|
||||
if (schema != null) {
|
||||
String dataSourceName = datasourceYamlTpl.getName();
|
||||
Optional<Entry<String, DataSource>> datasourceYamlTplMap =
|
||||
Optional<Entry<String, DataModel>> datasourceYamlTplMap =
|
||||
schema.getDatasource().entrySet().stream()
|
||||
.filter(t -> t.getKey().equalsIgnoreCase(dataSourceName)).findFirst();
|
||||
if (datasourceYamlTplMap.isPresent()) {
|
||||
@@ -377,7 +376,7 @@ public class SemanticSchemaManager {
|
||||
}
|
||||
}
|
||||
|
||||
public static void update(SemanticSchema schema, String datasourceBizName,
|
||||
public static void update(S2SemanticSchema schema, String datasourceBizName,
|
||||
List<Dimension> dimensionYamlTpls) throws Exception {
|
||||
if (schema != null) {
|
||||
Optional<Map.Entry<String, List<Dimension>>> datasourceYamlTplMap = schema
|
||||
|
||||
@@ -7,7 +7,7 @@ import com.tencent.supersonic.headless.api.pojo.response.SqlParserResp;
|
||||
import com.tencent.supersonic.headless.core.pojo.MetricQueryParam;
|
||||
import com.tencent.supersonic.headless.core.pojo.QueryStatement;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.planner.AggPlanner;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.schema.SemanticSchema;
|
||||
import com.tencent.supersonic.headless.core.translator.calcite.schema.S2SemanticSchema;
|
||||
import com.tencent.supersonic.headless.server.manager.SemanticSchemaManager;
|
||||
import com.tencent.supersonic.headless.server.pojo.yaml.DataModelYamlTpl;
|
||||
import com.tencent.supersonic.headless.server.pojo.yaml.DimensionTimeTypeParamsTpl;
|
||||
@@ -27,9 +27,9 @@ import java.util.Map;
|
||||
@Slf4j
|
||||
class HeadlessParserServiceTest {
|
||||
|
||||
private static Map<String, SemanticSchema> headlessSchemaMap = new HashMap<>();
|
||||
private static Map<String, S2SemanticSchema> headlessSchemaMap = new HashMap<>();
|
||||
|
||||
public static SqlParserResp parser(SemanticSchema semanticSchema,
|
||||
public static SqlParserResp parser(S2SemanticSchema semanticSchema,
|
||||
MetricQueryParam metricQueryParam, boolean isAgg) {
|
||||
SqlParserResp sqlParser = new SqlParserResp();
|
||||
try {
|
||||
@@ -40,7 +40,7 @@ class HeadlessParserServiceTest {
|
||||
AggPlanner aggBuilder = new AggPlanner(semanticSchema);
|
||||
QueryStatement queryStatement = new QueryStatement();
|
||||
queryStatement.setMetricQueryParam(metricQueryParam);
|
||||
aggBuilder.explain(queryStatement, AggOption.getAggregation(!isAgg));
|
||||
aggBuilder.plan(queryStatement, AggOption.getAggregation(!isAgg));
|
||||
EngineType engineType = EngineType
|
||||
.fromString(semanticSchema.getSemanticModel().getDatabase().getType());
|
||||
sqlParser.setSql(aggBuilder.getSql(engineType));
|
||||
@@ -122,7 +122,7 @@ class HeadlessParserServiceTest {
|
||||
identify.setType("primary");
|
||||
identifies.add(identify);
|
||||
datasource.setIdentifiers(identifies);
|
||||
SemanticSchema semanticSchema = SemanticSchema.newBuilder("1").build();
|
||||
S2SemanticSchema semanticSchema = S2SemanticSchema.newBuilder("1").build();
|
||||
|
||||
SemanticSchemaManager.update(semanticSchema,
|
||||
SemanticSchemaManager.getDatasource(datasource));
|
||||
@@ -192,7 +192,7 @@ class HeadlessParserServiceTest {
|
||||
System.out.println(parser(semanticSchema, metricCommand2, true));
|
||||
}
|
||||
|
||||
private static void addDepartment(SemanticSchema semanticSchema) {
|
||||
private static void addDepartment(S2SemanticSchema semanticSchema) {
|
||||
DataModelYamlTpl datasource = new DataModelYamlTpl();
|
||||
datasource.setName("user_department");
|
||||
datasource.setSourceId(1L);
|
||||
|
||||
@@ -50,8 +50,7 @@ public class SchemaAuthTest extends BaseTest {
|
||||
@Test
|
||||
public void test_getVisibleModelList_alice() {
|
||||
User user = DataUtils.getUserAlice();
|
||||
List<ModelResp> modelResps =
|
||||
modelService.getModelListWithAuth(user, null, AuthType.VIEWER);
|
||||
List<ModelResp> modelResps = modelService.getModelListWithAuth(user, null, AuthType.VIEWER);
|
||||
List<String> expectedModelBizNames = Lists.newArrayList("user_department", "singer");
|
||||
Assertions.assertEquals(expectedModelBizNames,
|
||||
modelResps.stream().map(ModelResp::getBizName).collect(Collectors.toList()));
|
||||
|
||||
Reference in New Issue
Block a user