Mirror of https://github.com/tencentmusic/supersonic.git, synced 2025-12-10 11:07:06 +00:00
(improvement)(headless)Optimize relationship probe in translation of multi-table join scenarios.
@@ -133,6 +133,10 @@
         <groupId>io.trino</groupId>
         <artifactId>trino-jdbc</artifactId>
     </dependency>
+    <dependency>
+        <groupId>org.jgrapht</groupId>
+        <artifactId>jgrapht-core</artifactId>
+    </dependency>
 </dependencies>
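To make the new dependency concrete, here is a minimal, self-contained sketch of the JGraphT pieces this commit starts using (DefaultUndirectedGraph, DijkstraShortestPath, GraphPath). The model names are made up; the shape mirrors the join graph that SqlBuilder builds from join relations later in this diff:

import org.jgrapht.Graph;
import org.jgrapht.GraphPath;
import org.jgrapht.alg.shortestpath.DijkstraShortestPath;
import org.jgrapht.graph.DefaultEdge;
import org.jgrapht.graph.DefaultUndirectedGraph;

public class JoinGraphDemo {
    public static void main(String[] args) {
        // Hypothetical model names: vertices are models, edges are join relations.
        Graph<String, DefaultEdge> graph = new DefaultUndirectedGraph<>(DefaultEdge.class);
        graph.addVertex("user");
        graph.addVertex("order");
        graph.addVertex("product");
        graph.addEdge("user", "order");
        graph.addEdge("order", "product");

        // Shortest join path between two queried models; "order" appears as the bridge model.
        GraphPath<String, DefaultEdge> path =
                new DijkstraShortestPath<>(graph).getPath("user", "product");
        System.out.println(path.getVertexList()); // [user, order, product]
    }
}

Using an undirected graph means the probe can find a bridge model regardless of which side of a join relation a model appears on.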
@@ -1,5 +1,6 @@
 package com.tencent.supersonic.headless.core.translator.parser.calcite;

+import com.google.common.collect.Sets;
 import com.tencent.supersonic.common.calcite.Configuration;
 import com.tencent.supersonic.common.pojo.enums.EngineType;
 import com.tencent.supersonic.headless.api.pojo.Dimension;
@@ -9,17 +10,26 @@ import com.tencent.supersonic.headless.api.pojo.response.DatabaseResp;
 import com.tencent.supersonic.headless.api.pojo.response.DimSchemaResp;
 import com.tencent.supersonic.headless.api.pojo.response.MetricSchemaResp;
 import com.tencent.supersonic.headless.api.pojo.response.ModelResp;
-import com.tencent.supersonic.headless.core.pojo.*;
+import com.tencent.supersonic.headless.core.pojo.JoinRelation;
+import com.tencent.supersonic.headless.core.pojo.Ontology;
+import com.tencent.supersonic.headless.core.pojo.OntologyQuery;
+import com.tencent.supersonic.headless.core.pojo.QueryStatement;
 import com.tencent.supersonic.headless.core.translator.parser.Constants;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.calcite.sql.*;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.sql.parser.SqlParser;
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.sql.validate.SqlValidatorScope;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Triple;
 import org.springframework.util.CollectionUtils;
+import org.jgrapht.Graph;
+import org.jgrapht.GraphPath;
+import org.jgrapht.alg.shortestpath.DijkstraShortestPath;
+import org.jgrapht.graph.DefaultEdge;
+import org.jgrapht.graph.DefaultUndirectedGraph;

 import java.util.*;
 import java.util.stream.Collectors;
@@ -37,6 +47,8 @@ public class SqlBuilder {

     public String buildOntologySql(QueryStatement queryStatement) throws Exception {
         OntologyQuery ontologyQuery = queryStatement.getOntologyQuery();
+        Ontology ontology = queryStatement.getOntology();
+
         if (ontologyQuery.getLimit() == null) {
             ontologyQuery.setLimit(0L);
         }
@@ -46,7 +58,14 @@ public class SqlBuilder {
             throw new Exception("data model not found");
         }

-        TableView tableView = render(ontologyQuery, new ArrayList<>(dataModels), scope, schema);
+        TableView tableView;
+        if (!CollectionUtils.isEmpty(ontology.getJoinRelations()) && dataModels.size() > 1) {
+            Set<ModelResp> models = probeRelatedModels(dataModels, queryStatement.getOntology());
+            tableView = render(ontologyQuery, models, scope, schema);
+        } else {
+            tableView = render(ontologyQuery, dataModels, scope, schema);
+        }
+
         SqlNode parserNode = tableView.build();
         DatabaseResp database = queryStatement.getOntology().getDatabase();
         EngineType engineType = EngineType.fromString(database.getType());
@@ -54,7 +73,58 @@ public class SqlBuilder {
         return SemanticNode.getSql(parserNode, engineType);
     }

-    private SqlNode optimizeParseNode(SqlNode parserNode, EngineType engineType) {
+    private Set<ModelResp> probeRelatedModels(Set<ModelResp> dataModels, Ontology ontology) {
+        List<JoinRelation> joinRelations = ontology.getJoinRelations();
+        Graph<String, DefaultEdge> graph = buildGraph(joinRelations);
+        DijkstraShortestPath<String, DefaultEdge> dijkstraAlg = new DijkstraShortestPath<>(graph);
+        Set<String> queryModels =
+                dataModels.stream().map(ModelResp::getName).collect(Collectors.toSet());
+        GraphPath<String, DefaultEdge> selectedGraphPath = null;
+        for (String fromModel : queryModels) {
+            for (String toModel : queryModels) {
+                if (fromModel != toModel) {
+                    GraphPath<String, DefaultEdge> path = dijkstraAlg.getPath(fromModel, toModel);
+                    if (isGraphPathContainsAll(path, queryModels)) {
+                        selectedGraphPath = path;
+                        break;
+                    }
+                }
+            }
+        }
+        Set<String> modelNames = Sets.newHashSet();
+        for (DefaultEdge edge : selectedGraphPath.getEdgeList()) {
+            modelNames.add(selectedGraphPath.getGraph().getEdgeSource(edge));
+            modelNames.add(selectedGraphPath.getGraph().getEdgeTarget(edge));
+        }
+        return modelNames.stream().map(m -> ontology.getModelMap().get(m))
+                .collect(Collectors.toSet());
+    }
+
+    private boolean isGraphPathContainsAll(GraphPath<String, DefaultEdge> graphPath,
+            Set<String> vertex) {
+        Set<String> allVertex = Sets.newHashSet();
+        for (DefaultEdge edge : graphPath.getEdgeList()) {
+            allVertex.add(graphPath.getGraph().getEdgeSource(edge));
+            allVertex.add(graphPath.getGraph().getEdgeTarget(edge));
+        }
+        Collection<String> intersect =
+                org.apache.commons.collections.CollectionUtils.intersection(vertex, allVertex);
+
+        return intersect.size() == vertex.size() ? true : false;
+    }
+
+    private Graph<String, DefaultEdge> buildGraph(List<JoinRelation> joinRelations) {
+        Graph<String, DefaultEdge> directedGraph = new DefaultUndirectedGraph<>(DefaultEdge.class);
+        for (JoinRelation joinRelation : joinRelations) {
+            directedGraph.addVertex(joinRelation.getLeft());
+            directedGraph.addVertex(joinRelation.getRight());
+            directedGraph.addEdge(joinRelation.getLeft(), joinRelation.getRight());
+        }
+        return directedGraph;
+    }
+
+    private SqlNode optimizeParseNode(SqlNode parserNode, EngineType engineType)
+            throws SqlParseException {
         if (Objects.isNull(schema.getRuntimeOptions())
                 || Objects.isNull(schema.getRuntimeOptions().getEnableOptimize())
                 || !schema.getRuntimeOptions().getEnableOptimize()) {
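As a standalone illustration of what probeRelatedModels does, the sketch below reimplements the idea over plain model-name strings instead of ModelResp/Ontology (all names are hypothetical): build the undirected join graph, run Dijkstra between each pair of queried models, and keep the first path whose vertices cover every queried model, so bridge models on that path are pulled into the query. It simplifies the original by checking the path's vertex list directly and by falling back to the input set when no covering path exists:

import org.jgrapht.Graph;
import org.jgrapht.GraphPath;
import org.jgrapht.alg.shortestpath.DijkstraShortestPath;
import org.jgrapht.graph.DefaultEdge;
import org.jgrapht.graph.DefaultUndirectedGraph;

import java.util.LinkedHashSet;
import java.util.Set;

public class ProbeSketch {
    // Same idea as probeRelatedModels, but over plain model-name strings.
    static Set<String> probe(Graph<String, DefaultEdge> joinGraph, Set<String> queryModels) {
        DijkstraShortestPath<String, DefaultEdge> dijkstra = new DijkstraShortestPath<>(joinGraph);
        for (String from : queryModels) {
            for (String to : queryModels) {
                if (from.equals(to)) {
                    continue;
                }
                GraphPath<String, DefaultEdge> path = dijkstra.getPath(from, to);
                // Keep the first path whose vertices cover every queried model.
                if (path != null && path.getVertexList().containsAll(queryModels)) {
                    return new LinkedHashSet<>(path.getVertexList());
                }
            }
        }
        // No single path covers all queried models; fall back to the original set.
        return queryModels;
    }

    public static void main(String[] args) {
        Graph<String, DefaultEdge> joinGraph = new DefaultUndirectedGraph<>(DefaultEdge.class);
        for (String model : new String[] {"user", "order", "product"}) {
            joinGraph.addVertex(model);
        }
        joinGraph.addEdge("user", "order");
        joinGraph.addEdge("order", "product");

        // Querying user and product pulls in the bridge model "order".
        System.out.println(probe(joinGraph, Set.of("user", "product")));
    }
}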
@@ -62,14 +132,10 @@ public class SqlBuilder {
         }

         SqlNode optimizeNode = null;
-        try {
-            SqlNode sqlNode = SqlParser.create(SemanticNode.getSql(parserNode, engineType),
-                    Configuration.getParserConfig(engineType)).parseStmt();
-            if (Objects.nonNull(sqlNode)) {
-                optimizeNode = SemanticNode.optimize(scope, schema, sqlNode, engineType);
-            }
-        } catch (Exception e) {
-            log.error("optimize error {}", e);
-        }
+        SqlNode sqlNode = SqlParser.create(SemanticNode.getSql(parserNode, engineType),
+                Configuration.getParserConfig(engineType)).parseStmt();
+        if (Objects.nonNull(sqlNode)) {
+            optimizeNode = SemanticNode.optimize(scope, schema, sqlNode, engineType);
+        }

         if (Objects.nonNull(optimizeNode)) {
@@ -79,7 +145,7 @@ public class SqlBuilder {
         return parserNode;
     }

-    private TableView render(OntologyQuery ontologyQuery, List<ModelResp> dataModels,
+    private TableView render(OntologyQuery ontologyQuery, Set<ModelResp> dataModels,
             SqlValidatorScope scope, S2CalciteSchema schema) throws Exception {
         SqlNode left = null;
         TableView leftTable = null;
@@ -88,8 +154,7 @@ public class SqlBuilder {
         Map<String, String> beforeModels = new HashMap<>();
         EngineType engineType = EngineType.fromString(schema.getOntology().getDatabase().getType());

-        for (int i = 0; i < dataModels.size(); i++) {
-            final ModelResp dataModel = dataModels.get(i);
+        for (ModelResp dataModel : dataModels) {
             final Set<DimSchemaResp> queryDimensions =
                     ontologyQuery.getDimensionsByModel(dataModel.getName());
             final Set<MetricSchemaResp> queryMetrics =
@@ -141,7 +206,8 @@ public class SqlBuilder {
             SqlLiteral sqlLiteral = SemanticNode.getJoinSqlLiteral("");
             JoinRelation matchJoinRelation = getMatchJoinRelation(before, rightTable, schema);
             SqlNode joinRelationCondition;
-            if (!CollectionUtils.isEmpty(matchJoinRelation.getJoinCondition())) {
+            if (!org.apache.commons.collections.CollectionUtils
+                    .isEmpty(matchJoinRelation.getJoinCondition())) {
                 sqlLiteral = SemanticNode.getJoinSqlLiteral(matchJoinRelation.getJoinType());
                 joinRelationCondition = getCondition(matchJoinRelation, scope, engineType);
                 condition = joinRelationCondition;
@@ -170,12 +236,19 @@ public class SqlBuilder {
                 } else if (joinRelation.getLeft()
                         .equalsIgnoreCase(tableView.getDataModel().getName())
                         && before.containsKey(joinRelation.getRight())) {
-                    matchJoinRelation.setJoinCondition(joinRelation.getJoinCondition().stream()
+                    List<Triple<String, String, String>> candidateJoinCon = joinRelation
+                            .getJoinCondition().stream()
                             .map(r -> Triple.of(
                                     before.get(joinRelation.getRight()) + "." + r.getRight(),
                                     r.getMiddle(), tableView.getAlias() + "." + r.getLeft()))
-                            .collect(Collectors.toList()));
-                    matchJoinRelation.setJoinType(joinRelation.getJoinType());
+                            .collect(Collectors.toList());
+                    // added by jerryjzhang on 20250214
+                    // use the one with the most conditions to join left and right tables
+                    if (matchJoinRelation.getJoinCondition() == null || candidateJoinCon
+                            .size() > matchJoinRelation.getJoinCondition().size()) {
+                        matchJoinRelation.setJoinCondition(candidateJoinCon);
+                        matchJoinRelation.setJoinType(joinRelation.getJoinType());
+                    }
                 }
             }
         }
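The selection rule added at the end of this hunk (prefer the candidate join relation that carries the most conditions) can be shown in isolation. The Triple layout (left field, operator, right field) follows the diff; the field names below are invented for illustration:

import org.apache.commons.lang3.tuple.Triple;

import java.util.List;

public class JoinConditionPick {
    public static void main(String[] args) {
        // Two candidate join relations between the same pair of models (made-up fields).
        List<Triple<String, String, String>> byId =
                List.of(Triple.of("t1.user_id", "=", "t2.user_id"));
        List<Triple<String, String, String>> byIdAndDate = List.of(
                Triple.of("t1.user_id", "=", "t2.user_id"),
                Triple.of("t1.stat_date", "=", "t2.stat_date"));

        // Mirror of the new rule: take a candidate only if nothing is set yet
        // or it carries more conditions than the current match.
        List<Triple<String, String, String>> match = null;
        for (List<Triple<String, String, String>> candidate : List.of(byId, byIdAndDate)) {
            if (match == null || candidate.size() > match.size()) {
                match = candidate;
            }
        }
        System.out.println(match.size()); // 2 -> the two-condition relation wins
    }
}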