mirror of
https://github.com/tencentmusic/supersonic.git
synced 2025-12-15 06:27:21 +00:00
[improvement][headless]Clean code logic of headless translator.
This commit is contained in:
@@ -1,16 +1,17 @@
|
||||
package com.tencent.supersonic.headless.core.translator.calcite.s2sql;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.tencent.supersonic.common.pojo.ColumnOrder;
|
||||
import com.tencent.supersonic.headless.api.pojo.enums.AggOption;
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
@Data
|
||||
public class OntologyQueryParam {
|
||||
private List<String> metrics = Lists.newArrayList();
|
||||
private List<String> dimensions = Lists.newArrayList();
|
||||
private Set<String> metrics = Sets.newHashSet();
|
||||
private Set<String> dimensions = Sets.newHashSet();
|
||||
private String where;
|
||||
private Long limit;
|
||||
private List<ColumnOrder> order;
|
||||
|
||||
@@ -37,12 +37,6 @@ public class SqlBuilder {
|
||||
|
||||
public String buildOntologySql(QueryStatement queryStatement) throws Exception {
|
||||
this.ontologyQueryParam = queryStatement.getOntologyQueryParam();
|
||||
if (ontologyQueryParam.getMetrics() == null) {
|
||||
ontologyQueryParam.setMetrics(new ArrayList<>());
|
||||
}
|
||||
if (ontologyQueryParam.getDimensions() == null) {
|
||||
ontologyQueryParam.setDimensions(new ArrayList<>());
|
||||
}
|
||||
if (ontologyQueryParam.getLimit() == null) {
|
||||
ontologyQueryParam.setLimit(0L);
|
||||
}
|
||||
|
||||
@@ -150,23 +150,23 @@ public class DataModelNode extends SemanticNode {
|
||||
}
|
||||
|
||||
public static void getQueryDimensionMeasure(S2CalciteSchema schema,
|
||||
OntologyQueryParam metricCommand, Set<String> queryDimension, List<String> measures) {
|
||||
queryDimension.addAll(metricCommand.getDimensions().stream()
|
||||
OntologyQueryParam queryParam, Set<String> queryDimensions, Set<String> queryMeasures) {
|
||||
queryDimensions.addAll(queryParam.getDimensions().stream()
|
||||
.map(d -> d.contains(Constants.DIMENSION_IDENTIFY)
|
||||
? d.split(Constants.DIMENSION_IDENTIFY)[1]
|
||||
: d)
|
||||
.collect(Collectors.toSet()));
|
||||
Set<String> schemaMetricName =
|
||||
schema.getMetrics().stream().map(m -> m.getName()).collect(Collectors.toSet());
|
||||
schema.getMetrics().stream().filter(m -> metricCommand.getMetrics().contains(m.getName()))
|
||||
schema.getMetrics().stream().filter(m -> queryParam.getMetrics().contains(m.getName()))
|
||||
.forEach(m -> m.getMetricTypeParams().getMeasures().stream()
|
||||
.forEach(mm -> measures.add(mm.getName())));
|
||||
metricCommand.getMetrics().stream().filter(m -> !schemaMetricName.contains(m))
|
||||
.forEach(m -> measures.add(m));
|
||||
.forEach(mm -> queryMeasures.add(mm.getName())));
|
||||
queryParam.getMetrics().stream().filter(m -> !schemaMetricName.contains(m))
|
||||
.forEach(m -> queryMeasures.add(m));
|
||||
}
|
||||
|
||||
public static void mergeQueryFilterDimensionMeasure(S2CalciteSchema schema,
|
||||
OntologyQueryParam metricCommand, Set<String> queryDimension, List<String> measures,
|
||||
OntologyQueryParam metricCommand, Set<String> queryDimension, Set<String> measures,
|
||||
SqlValidatorScope scope) throws Exception {
|
||||
EngineType engineType = schema.getOntology().getDatabase().getType();
|
||||
if (Objects.nonNull(metricCommand.getWhere()) && !metricCommand.getWhere().isEmpty()) {
|
||||
@@ -192,20 +192,20 @@ public class DataModelNode extends SemanticNode {
|
||||
}
|
||||
|
||||
public static List<DataModel> getRelatedDataModels(SqlValidatorScope scope,
|
||||
S2CalciteSchema schema, OntologyQueryParam metricCommand) throws Exception {
|
||||
S2CalciteSchema schema, OntologyQueryParam queryParam) throws Exception {
|
||||
List<DataModel> dataModels = new ArrayList<>();
|
||||
|
||||
// check by metric
|
||||
List<String> measures = new ArrayList<>();
|
||||
Set<String> queryDimension = new HashSet<>();
|
||||
getQueryDimensionMeasure(schema, metricCommand, queryDimension, measures);
|
||||
Set<String> queryMeasures = new HashSet<>();
|
||||
Set<String> queryDimensions = new HashSet<>();
|
||||
getQueryDimensionMeasure(schema, queryParam, queryDimensions, queryMeasures);
|
||||
DataModel baseDataModel = null;
|
||||
// one , match measure count
|
||||
Map<String, Integer> dataSourceMeasures = new HashMap<>();
|
||||
for (Map.Entry<String, DataModel> entry : schema.getDataModels().entrySet()) {
|
||||
Set<String> sourceMeasure = entry.getValue().getMeasures().stream()
|
||||
.map(mm -> mm.getName()).collect(Collectors.toSet());
|
||||
sourceMeasure.retainAll(measures);
|
||||
sourceMeasure.retainAll(queryMeasures);
|
||||
dataSourceMeasures.put(entry.getKey(), sourceMeasure.size());
|
||||
}
|
||||
log.info("metrics: [{}]", dataSourceMeasures);
|
||||
@@ -230,17 +230,17 @@ public class DataModelNode extends SemanticNode {
|
||||
filterMeasure.addAll(sourceMeasure);
|
||||
filterMeasure.addAll(dimension);
|
||||
EngineType engineType = schema.getOntology().getDatabase().getType();
|
||||
mergeQueryFilterDimensionMeasure(schema, metricCommand, queryDimension, measures,
|
||||
mergeQueryFilterDimensionMeasure(schema, queryParam, queryDimensions, queryMeasures,
|
||||
scope);
|
||||
boolean isAllMatch = checkMatch(sourceMeasure, queryDimension, measures, dimension,
|
||||
metricCommand, scope, engineType);
|
||||
boolean isAllMatch = checkMatch(sourceMeasure, queryDimensions, queryMeasures,
|
||||
dimension, queryParam, scope, engineType);
|
||||
if (isAllMatch) {
|
||||
log.debug("baseDataModel match all ");
|
||||
return dataModels;
|
||||
}
|
||||
// find all dataSource has the same identifiers
|
||||
List<DataModel> linkDataModels = getLinkDataSourcesByJoinRelation(queryDimension,
|
||||
measures, baseDataModel, schema);
|
||||
List<DataModel> linkDataModels = getLinkDataSourcesByJoinRelation(queryDimensions,
|
||||
queryMeasures, baseDataModel, schema);
|
||||
if (CollectionUtils.isEmpty(linkDataModels)) {
|
||||
log.debug("baseDataModel get by identifiers ");
|
||||
Set<String> baseIdentifiers = baseDataModel.getIdentifiers().stream()
|
||||
@@ -249,24 +249,23 @@ public class DataModelNode extends SemanticNode {
|
||||
throw new Exception(
|
||||
"datasource error : " + baseDataModel.getName() + " miss identifier");
|
||||
}
|
||||
linkDataModels = getLinkDataSources(baseIdentifiers, queryDimension, measures,
|
||||
linkDataModels = getLinkDataSources(baseIdentifiers, queryDimensions, queryMeasures,
|
||||
baseDataModel, schema);
|
||||
if (linkDataModels.isEmpty()) {
|
||||
throw new Exception(String.format(
|
||||
"not find the match datasource : dimension[%s],measure[%s]",
|
||||
queryDimension, measures));
|
||||
queryDimensions, queryMeasures));
|
||||
}
|
||||
}
|
||||
log.debug("linkDataModels {}", linkDataModels);
|
||||
return linkDataModels;
|
||||
// dataModels.addAll(linkDataModels);
|
||||
}
|
||||
|
||||
return dataModels;
|
||||
}
|
||||
|
||||
private static boolean checkMatch(Set<String> sourceMeasure, Set<String> queryDimension,
|
||||
List<String> measures, Set<String> dimension, OntologyQueryParam metricCommand,
|
||||
Set<String> measures, Set<String> dimension, OntologyQueryParam metricCommand,
|
||||
SqlValidatorScope scope, EngineType engineType) throws Exception {
|
||||
boolean isAllMatch = true;
|
||||
sourceMeasure.retainAll(measures);
|
||||
@@ -299,7 +298,7 @@ public class DataModelNode extends SemanticNode {
|
||||
}
|
||||
|
||||
private static List<DataModel> getLinkDataSourcesByJoinRelation(Set<String> queryDimension,
|
||||
List<String> measures, DataModel baseDataModel, S2CalciteSchema schema) {
|
||||
Set<String> measures, DataModel baseDataModel, S2CalciteSchema schema) {
|
||||
Set<String> linkDataSourceName = new HashSet<>();
|
||||
List<DataModel> linkDataModels = new ArrayList<>();
|
||||
Set<String> before = new HashSet<>();
|
||||
@@ -383,7 +382,7 @@ public class DataModelNode extends SemanticNode {
|
||||
}
|
||||
|
||||
private static List<DataModel> getLinkDataSources(Set<String> baseIdentifiers,
|
||||
Set<String> queryDimension, List<String> measures, DataModel baseDataModel,
|
||||
Set<String> queryDimension, Set<String> measures, DataModel baseDataModel,
|
||||
S2CalciteSchema schema) {
|
||||
Set<String> linkDataSourceName = new HashSet<>();
|
||||
List<DataModel> linkDataModels = new ArrayList<>();
|
||||
|
||||
@@ -59,7 +59,7 @@ public class JoinRender extends Renderer {
|
||||
fieldWhere = whereFields.stream().collect(Collectors.toList());
|
||||
}
|
||||
Set<String> queryAllDimension = new HashSet<>();
|
||||
List<String> measures = new ArrayList<>();
|
||||
Set<String> measures = new HashSet<>();
|
||||
DataModelNode.getQueryDimensionMeasure(schema, metricCommand, queryAllDimension, measures);
|
||||
SqlNode left = null;
|
||||
TableView leftTable = null;
|
||||
@@ -73,8 +73,8 @@ public class JoinRender extends Renderer {
|
||||
final DataModel dataModel = dataModels.get(i);
|
||||
final Set<String> filterDimensions = new HashSet<>();
|
||||
final Set<String> filterMetrics = new HashSet<>();
|
||||
final List<String> queryDimension = new ArrayList<>();
|
||||
final List<String> queryMetrics = new ArrayList<>();
|
||||
final Set<String> queryDimension = new HashSet<>();
|
||||
final Set<String> queryMetrics = new HashSet<>();
|
||||
SourceRender.whereDimMetric(fieldWhere, queryMetrics, queryDimension, dataModel, schema,
|
||||
filterDimensions, filterMetrics);
|
||||
List<String> reqMetric = new ArrayList<>(metricCommand.getMetrics());
|
||||
@@ -142,7 +142,7 @@ public class JoinRender extends Renderer {
|
||||
}
|
||||
|
||||
private void doMetric(Map<String, SqlNode> innerSelect, TableView filterView,
|
||||
List<String> queryMetrics, List<String> reqMetrics, DataModel dataModel,
|
||||
Set<String> queryMetrics, List<String> reqMetrics, DataModel dataModel,
|
||||
Set<String> sourceMeasure, SqlValidatorScope scope, S2CalciteSchema schema,
|
||||
boolean nonAgg) throws Exception {
|
||||
String alias = Constants.JOIN_TABLE_PREFIX + dataModel.getName();
|
||||
@@ -177,7 +177,7 @@ public class JoinRender extends Renderer {
|
||||
}
|
||||
|
||||
private void doDimension(Map<String, SqlNode> innerSelect, Set<String> filterDimension,
|
||||
List<String> queryDimension, List<String> reqDimensions, DataModel dataModel,
|
||||
Set<String> queryDimension, List<String> reqDimensions, DataModel dataModel,
|
||||
Set<String> dimension, SqlValidatorScope scope, S2CalciteSchema schema)
|
||||
throws Exception {
|
||||
String alias = Constants.JOIN_TABLE_PREFIX + dataModel.getName();
|
||||
@@ -205,7 +205,7 @@ public class JoinRender extends Renderer {
|
||||
}
|
||||
|
||||
private boolean getMatchMetric(S2CalciteSchema schema, Set<String> sourceMeasure, String m,
|
||||
List<String> queryMetrics) {
|
||||
Set<String> queryMetrics) {
|
||||
Optional<Metric> metric = schema.getMetrics().stream()
|
||||
.filter(mm -> mm.getName().equalsIgnoreCase(m)).findFirst();
|
||||
boolean isAdd = false;
|
||||
@@ -226,7 +226,7 @@ public class JoinRender extends Renderer {
|
||||
}
|
||||
|
||||
private boolean getMatchDimension(S2CalciteSchema schema, Set<String> sourceDimension,
|
||||
DataModel dataModel, String d, List<String> queryDimension) {
|
||||
DataModel dataModel, String d, Set<String> queryDimension) {
|
||||
String oriDimension = d;
|
||||
boolean isAdd = false;
|
||||
if (d.contains(Constants.DIMENSION_IDENTIFY)) {
|
||||
|
||||
@@ -41,14 +41,14 @@ import static com.tencent.supersonic.headless.core.translator.calcite.s2sql.Cons
|
||||
public class SourceRender extends Renderer {
|
||||
|
||||
public static TableView renderOne(String alias, List<String> fieldWheres,
|
||||
List<String> reqMetrics, List<String> reqDimensions, String queryWhere,
|
||||
Set<String> reqMetrics, Set<String> reqDimensions, String queryWhere,
|
||||
DataModel datasource, SqlValidatorScope scope, S2CalciteSchema schema, boolean nonAgg)
|
||||
throws Exception {
|
||||
|
||||
TableView dataSet = new TableView();
|
||||
TableView output = new TableView();
|
||||
List<String> queryMetrics = new ArrayList<>(reqMetrics);
|
||||
List<String> queryDimensions = new ArrayList<>(reqDimensions);
|
||||
Set<String> queryMetrics = new HashSet<>(reqMetrics);
|
||||
Set<String> queryDimensions = new HashSet<>(reqDimensions);
|
||||
List<String> fieldWhere = new ArrayList<>(fieldWheres);
|
||||
Map<String, String> extendFields = new HashMap<>();
|
||||
if (!fieldWhere.isEmpty()) {
|
||||
@@ -57,9 +57,7 @@ public class SourceRender extends Renderer {
|
||||
whereDimMetric(fieldWhere, queryMetrics, queryDimensions, datasource, schema,
|
||||
dimensions, metrics);
|
||||
queryMetrics.addAll(metrics);
|
||||
queryMetrics = uniqList(queryMetrics);
|
||||
queryDimensions.addAll(dimensions);
|
||||
queryDimensions = uniqList(queryDimensions);
|
||||
mergeWhere(fieldWhere, dataSet, output, queryMetrics, queryDimensions, extendFields,
|
||||
datasource, scope, schema, nonAgg);
|
||||
}
|
||||
@@ -182,8 +180,8 @@ public class SourceRender extends Renderer {
|
||||
}
|
||||
}
|
||||
|
||||
private static List<SqlNode> getWhereMeasure(List<String> fields, List<String> queryMetrics,
|
||||
List<String> queryDimensions, Map<String, String> extendFields, DataModel datasource,
|
||||
private static List<SqlNode> getWhereMeasure(List<String> fields, Set<String> queryMetrics,
|
||||
Set<String> queryDimensions, Map<String, String> extendFields, DataModel datasource,
|
||||
SqlValidatorScope scope, S2CalciteSchema schema, boolean nonAgg) throws Exception {
|
||||
Iterator<String> iterator = fields.iterator();
|
||||
List<SqlNode> whereNode = new ArrayList<>();
|
||||
@@ -224,17 +222,17 @@ public class SourceRender extends Renderer {
|
||||
}
|
||||
|
||||
private static void mergeWhere(List<String> fields, TableView dataSet, TableView outputSet,
|
||||
List<String> queryMetrics, List<String> queryDimensions,
|
||||
Map<String, String> extendFields, DataModel datasource, SqlValidatorScope scope,
|
||||
S2CalciteSchema schema, boolean nonAgg) throws Exception {
|
||||
Set<String> queryMetrics, Set<String> queryDimensions, Map<String, String> extendFields,
|
||||
DataModel datasource, SqlValidatorScope scope, S2CalciteSchema schema, boolean nonAgg)
|
||||
throws Exception {
|
||||
List<SqlNode> whereNode = getWhereMeasure(fields, queryMetrics, queryDimensions,
|
||||
extendFields, datasource, scope, schema, nonAgg);
|
||||
dataSet.getMeasure().addAll(whereNode);
|
||||
// getWhere(outputSet,fields,queryMetrics,queryDimensions,datasource,scope,schema);
|
||||
}
|
||||
|
||||
public static void whereDimMetric(List<String> fields, List<String> queryMetrics,
|
||||
List<String> queryDimensions, DataModel datasource, S2CalciteSchema schema,
|
||||
public static void whereDimMetric(List<String> fields, Set<String> queryMetrics,
|
||||
Set<String> queryDimensions, DataModel datasource, S2CalciteSchema schema,
|
||||
Set<String> dimensions, Set<String> metrics) {
|
||||
for (String field : fields) {
|
||||
if (queryDimensions.contains(field) || queryMetrics.contains(field)) {
|
||||
@@ -310,7 +308,7 @@ public class SourceRender extends Renderer {
|
||||
return false;
|
||||
}
|
||||
|
||||
private static void addTimeDimension(DataModel dataModel, List<String> queryDimension) {
|
||||
private static void addTimeDimension(DataModel dataModel, Set<String> queryDimension) {
|
||||
if (Materialization.TimePartType.ZIPPER.equals(dataModel.getTimePartType())) {
|
||||
Optional<Dimension> startTimeOp = dataModel.getDimensions().stream()
|
||||
.filter(d -> Constants.DIMENSION_TYPE_TIME.equalsIgnoreCase(d.getType()))
|
||||
|
||||
@@ -158,7 +158,6 @@ public class SqlQueryConverter implements QueryConverter {
|
||||
ontologyParam.getMetrics().addAll(measures);
|
||||
} else {
|
||||
// empty measure , fill default
|
||||
ontologyParam.setMetrics(new ArrayList<>());
|
||||
ontologyParam.getMetrics().add(sqlGenerateUtils.generateInternalMetricName(
|
||||
getDefaultModel(semanticSchemaResp, ontologyParam.getDimensions())));
|
||||
}
|
||||
@@ -168,8 +167,8 @@ public class SqlQueryConverter implements QueryConverter {
|
||||
}
|
||||
|
||||
private Map<String, String> generateDerivedMetric(SqlGenerateUtils sqlGenerateUtils,
|
||||
SemanticSchemaResp semanticSchemaResp, AggOption aggOption, List<String> metrics,
|
||||
List<String> dimensions, Set<String> measures) {
|
||||
SemanticSchemaResp semanticSchemaResp, AggOption aggOption, Set<String> metrics,
|
||||
Set<String> dimensions, Set<String> measures) {
|
||||
Map<String, String> result = new HashMap<>();
|
||||
List<MetricSchemaResp> metricResps = semanticSchemaResp.getMetrics();
|
||||
List<DimSchemaResp> dimensionResps = semanticSchemaResp.getDimensions();
|
||||
@@ -291,7 +290,7 @@ public class SqlQueryConverter implements QueryConverter {
|
||||
return elements.stream();
|
||||
}
|
||||
|
||||
private String getDefaultModel(SemanticSchemaResp semanticSchemaResp, List<String> dimensions) {
|
||||
private String getDefaultModel(SemanticSchemaResp semanticSchemaResp, Set<String> dimensions) {
|
||||
if (!CollectionUtils.isEmpty(dimensions)) {
|
||||
Map<String, Long> modelMatchCnt = new HashMap<>();
|
||||
for (ModelResp modelResp : semanticSchemaResp.getModelResps()) {
|
||||
|
||||
Reference in New Issue
Block a user