mirror of
https://github.com/tencentmusic/supersonic.git
synced 2025-12-10 02:46:56 +00:00
(improvement)(Headless) explode split dimension (#956)
This commit is contained in:
@@ -21,8 +21,12 @@ import net.sf.jsqlparser.expression.WhenClause;
|
||||
import net.sf.jsqlparser.expression.operators.conditional.AndExpression;
|
||||
import net.sf.jsqlparser.expression.operators.conditional.OrExpression;
|
||||
import net.sf.jsqlparser.expression.operators.conditional.XorExpression;
|
||||
import net.sf.jsqlparser.expression.operators.relational.Between;
|
||||
import net.sf.jsqlparser.expression.operators.relational.ComparisonOperator;
|
||||
import net.sf.jsqlparser.expression.operators.relational.ExpressionList;
|
||||
import net.sf.jsqlparser.expression.operators.relational.InExpression;
|
||||
import net.sf.jsqlparser.expression.operators.relational.IsBooleanExpression;
|
||||
import net.sf.jsqlparser.expression.operators.relational.IsNullExpression;
|
||||
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
|
||||
import net.sf.jsqlparser.schema.Column;
|
||||
import net.sf.jsqlparser.schema.Table;
|
||||
@@ -564,6 +568,22 @@ public class SqlSelectHelper {
|
||||
getColumnFromExpr(expr.getLeftExpression(), columns);
|
||||
getColumnFromExpr(expr.getRightExpression(), columns);
|
||||
}
|
||||
if (expression instanceof InExpression) {
|
||||
InExpression inExpression = (InExpression) expression;
|
||||
getColumnFromExpr(inExpression.getLeftExpression(), columns);
|
||||
}
|
||||
if (expression instanceof Between) {
|
||||
Between between = (Between) expression;
|
||||
getColumnFromExpr(between.getLeftExpression(), columns);
|
||||
}
|
||||
if (expression instanceof IsBooleanExpression) {
|
||||
IsBooleanExpression isBooleanExpression = (IsBooleanExpression) expression;
|
||||
getColumnFromExpr(isBooleanExpression.getLeftExpression(), columns);
|
||||
}
|
||||
if (expression instanceof IsNullExpression) {
|
||||
IsNullExpression isNullExpression = (IsNullExpression) expression;
|
||||
getColumnFromExpr(isNullExpression.getLeftExpression(), columns);
|
||||
}
|
||||
if (expression instanceof Parenthesis) {
|
||||
Parenthesis expr = (Parenthesis) expression;
|
||||
getColumnFromExpr(expr.getExpression(), columns);
|
||||
|
||||
@@ -17,5 +17,6 @@ public class Constants {
|
||||
public static final String SQL_PARSER_TABLE = "parsed_tb";
|
||||
public static final String SQL_PARSER_DB = "parsed_db";
|
||||
public static final String SQL_PARSER_FIELD = "parsed_field";
|
||||
public static final String DIMENSION_DELIMITER = "dimension_delimiter";
|
||||
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package com.tencent.supersonic.headless.core.parser.calcite.s2sql;
|
||||
|
||||
import com.tencent.supersonic.headless.core.parser.calcite.schema.SemanticItem;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
|
||||
@@ -19,6 +20,7 @@ public class Dimension implements SemanticItem {
|
||||
private DataType dataType = DataType.UNKNOWN;
|
||||
private String bizName;
|
||||
private List<String> defaultValues;
|
||||
private Map<String, Object> ext;
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
|
||||
@@ -13,17 +13,6 @@ import com.tencent.supersonic.headless.core.parser.calcite.schema.SchemaBuilder;
|
||||
import com.tencent.supersonic.headless.core.parser.calcite.schema.SemanticSchema;
|
||||
import com.tencent.supersonic.headless.core.parser.calcite.sql.node.extend.LateralViewExplodeNode;
|
||||
import com.tencent.supersonic.headless.core.pojo.MetricQueryParam;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.calcite.sql.SqlBasicCall;
|
||||
import org.apache.calcite.sql.SqlDataTypeSpec;
|
||||
import org.apache.calcite.sql.SqlNode;
|
||||
import org.apache.calcite.sql.SqlNodeList;
|
||||
import org.apache.calcite.sql.SqlUserDefinedTypeNameSpec;
|
||||
import org.apache.calcite.sql.parser.SqlParser;
|
||||
import org.apache.calcite.sql.parser.SqlParserPos;
|
||||
import org.apache.calcite.sql.validate.SqlValidatorScope;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Comparator;
|
||||
@@ -35,6 +24,16 @@ import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.calcite.sql.SqlBasicCall;
|
||||
import org.apache.calcite.sql.SqlDataTypeSpec;
|
||||
import org.apache.calcite.sql.SqlNode;
|
||||
import org.apache.calcite.sql.SqlNodeList;
|
||||
import org.apache.calcite.sql.SqlUserDefinedTypeNameSpec;
|
||||
import org.apache.calcite.sql.parser.SqlParser;
|
||||
import org.apache.calcite.sql.parser.SqlParserPos;
|
||||
import org.apache.calcite.sql.validate.SqlValidatorScope;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
@Slf4j
|
||||
public class DataSourceNode extends SemanticNode {
|
||||
@@ -120,22 +119,23 @@ public class DataSourceNode extends SemanticNode {
|
||||
dimensions, metrics);
|
||||
}
|
||||
|
||||
public static SqlNode buildExtend(DataSource datasource, Set<String> exprList,
|
||||
public static SqlNode buildExtend(DataSource datasource, Map<String, String> exprList,
|
||||
SqlValidatorScope scope)
|
||||
throws Exception {
|
||||
if (CollectionUtils.isEmpty(exprList)) {
|
||||
return build(datasource, scope);
|
||||
}
|
||||
EngineType engineType = EngineType.fromString(datasource.getType());
|
||||
SqlNode dataSet = new SqlBasicCall(new LateralViewExplodeNode(), Arrays.asList(build(datasource, scope),
|
||||
SqlNode dataSet = new SqlBasicCall(new LateralViewExplodeNode(exprList), Arrays.asList(build(datasource, scope),
|
||||
new SqlNodeList(getExtendField(exprList, scope, engineType), SqlParserPos.ZERO)), SqlParserPos.ZERO);
|
||||
return buildAs(datasource.getName() + Constants.DIMENSION_ARRAY_SINGLE_SUFFIX, dataSet);
|
||||
}
|
||||
|
||||
public static List<SqlNode> getExtendField(Set<String> exprList, SqlValidatorScope scope, EngineType engineType)
|
||||
public static List<SqlNode> getExtendField(Map<String, String> exprList, SqlValidatorScope scope,
|
||||
EngineType engineType)
|
||||
throws Exception {
|
||||
List<SqlNode> sqlNodeList = new ArrayList<>();
|
||||
for (String expr : exprList) {
|
||||
for (String expr : exprList.keySet()) {
|
||||
sqlNodeList.add(parse(expr, scope, engineType));
|
||||
sqlNodeList.add(new SqlDataTypeSpec(
|
||||
new SqlUserDefinedTypeNameSpec(expr + Constants.DIMENSION_ARRAY_SINGLE_SUFFIX, SqlParserPos.ZERO),
|
||||
|
||||
@@ -2,6 +2,8 @@ package com.tencent.supersonic.headless.core.parser.calcite.sql.node.extend;
|
||||
|
||||
import com.tencent.supersonic.headless.core.parser.calcite.sql.node.ExtendNode;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import org.apache.calcite.linq4j.Ord;
|
||||
import org.apache.calcite.sql.SqlCall;
|
||||
import org.apache.calcite.sql.SqlIdentifier;
|
||||
@@ -16,11 +18,14 @@ import org.apache.calcite.sql.SqlWriter;
|
||||
*/
|
||||
public class LateralViewExplodeNode extends ExtendNode {
|
||||
|
||||
public final String sqlNameView = "dataSet";
|
||||
public final String sqlNameView = "view";
|
||||
public final String sqlNameExplode = "explode";
|
||||
public final String sqlNameExplodeSplit = "explode_split";
|
||||
private Map<String, String> delimiterMap;
|
||||
|
||||
public LateralViewExplodeNode() {
|
||||
public LateralViewExplodeNode(Map<String, String> delimiterMap) {
|
||||
super();
|
||||
this.delimiterMap = delimiterMap;
|
||||
}
|
||||
|
||||
public void unparse(SqlWriter writer, SqlCall call, int leftPrec, int rightPrec) {
|
||||
@@ -55,9 +60,20 @@ public class LateralViewExplodeNode extends ExtendNode {
|
||||
}
|
||||
|
||||
public void explode(SqlWriter writer, SqlNode sqlNode) {
|
||||
writer.sep(sqlNameExplode);
|
||||
String delimiter =
|
||||
Objects.nonNull(delimiterMap) && delimiterMap.containsKey(sqlNode.toString()) ? delimiterMap.get(
|
||||
sqlNode.toString()) : "";
|
||||
if (delimiter.isEmpty()) {
|
||||
writer.sep(sqlNameExplode);
|
||||
} else {
|
||||
writer.sep(sqlNameExplodeSplit);
|
||||
}
|
||||
SqlWriter.Frame frame = writer.startList("(", ")");
|
||||
sqlNode.unparse(writer, 0, 0);
|
||||
if (!delimiter.isEmpty()) {
|
||||
writer.sep(",");
|
||||
writer.sep(String.format("'%s'", delimiter));
|
||||
}
|
||||
writer.endList(frame);
|
||||
writer.sep("tmp_sgl_" + sqlNode.toString());
|
||||
}
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
package com.tencent.supersonic.headless.core.parser.calcite.sql.render;
|
||||
|
||||
|
||||
import static com.tencent.supersonic.headless.core.parser.calcite.s2sql.Constants.DIMENSION_DELIMITER;
|
||||
|
||||
import com.tencent.supersonic.headless.api.pojo.enums.EngineType;
|
||||
import com.tencent.supersonic.headless.core.pojo.MetricQueryParam;
|
||||
import com.tencent.supersonic.headless.core.parser.calcite.s2sql.Constants;
|
||||
import com.tencent.supersonic.headless.core.parser.calcite.s2sql.DataSource;
|
||||
import com.tencent.supersonic.headless.core.parser.calcite.s2sql.Dimension;
|
||||
@@ -19,19 +20,22 @@ import com.tencent.supersonic.headless.core.parser.calcite.sql.node.FilterNode;
|
||||
import com.tencent.supersonic.headless.core.parser.calcite.sql.node.IdentifyNode;
|
||||
import com.tencent.supersonic.headless.core.parser.calcite.sql.node.MetricNode;
|
||||
import com.tencent.supersonic.headless.core.parser.calcite.sql.node.SemanticNode;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.calcite.sql.SqlNode;
|
||||
import org.apache.calcite.sql.validate.SqlValidatorScope;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import com.tencent.supersonic.headless.core.pojo.MetricQueryParam;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.stream.Collectors;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.calcite.sql.SqlNode;
|
||||
import org.apache.calcite.sql.validate.SqlValidatorScope;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
/**
|
||||
* process the table dataSet from the defined data model schema
|
||||
@@ -49,7 +53,7 @@ public class SourceRender extends Renderer {
|
||||
List<String> queryMetrics = new ArrayList<>(reqMetrics);
|
||||
List<String> queryDimensions = new ArrayList<>(reqDimensions);
|
||||
List<String> fieldWhere = new ArrayList<>(fieldWheres);
|
||||
Set<String> extendFields = new HashSet<>();
|
||||
Map<String, String> extendFields = new HashMap<>();
|
||||
if (!fieldWhere.isEmpty()) {
|
||||
Set<String> dimensions = new HashSet<>();
|
||||
Set<String> metrics = new HashSet<>();
|
||||
@@ -95,7 +99,8 @@ public class SourceRender extends Renderer {
|
||||
}
|
||||
|
||||
private static void buildDimension(String alias, String dimension, DataSource datasource, SemanticSchema schema,
|
||||
boolean nonAgg, Set<String> extendFields, TableView dataSet, TableView output, SqlValidatorScope scope)
|
||||
boolean nonAgg, Map<String, String> extendFields, TableView dataSet, TableView output,
|
||||
SqlValidatorScope scope)
|
||||
throws Exception {
|
||||
List<Dimension> dimensionList = schema.getDimension().get(datasource.getName());
|
||||
EngineType engineType = EngineType.fromString(schema.getSemanticModel().getDatabase().getType());
|
||||
@@ -106,6 +111,7 @@ public class SourceRender extends Renderer {
|
||||
continue;
|
||||
}
|
||||
dataSet.getMeasure().add(DimensionNode.build(dim, scope, engineType));
|
||||
addExtendFields(dim, extendFields);
|
||||
if (nonAgg) {
|
||||
output.getMeasure().add(DimensionNode.buildName(dim, scope, engineType));
|
||||
isAdd = true;
|
||||
@@ -141,9 +147,7 @@ public class SourceRender extends Renderer {
|
||||
Optional<Dimension> dimensionOptional = getDimensionByName(dimension, datasource);
|
||||
if (dimensionOptional.isPresent()) {
|
||||
dataSet.getMeasure().add(DimensionNode.buildArray(dimensionOptional.get(), scope, engineType));
|
||||
if (dimensionOptional.get().getDataType().isArray()) {
|
||||
extendFields.add(dimensionOptional.get().getExpr());
|
||||
}
|
||||
addExtendFields(dimensionOptional.get(), extendFields);
|
||||
if (nonAgg) {
|
||||
output.getMeasure().add(DimensionNode.buildName(dimensionOptional.get(), scope, engineType));
|
||||
return;
|
||||
@@ -152,16 +156,21 @@ public class SourceRender extends Renderer {
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean isWhereHasMetric(List<String> fields, DataSource datasource) {
|
||||
Long metricNum = datasource.getMeasures().stream().filter(m -> fields.contains(m.getName().toLowerCase()))
|
||||
.count();
|
||||
Long measureNum = datasource.getMeasures().stream().filter(m -> fields.contains(m.getName().toLowerCase()))
|
||||
.count();
|
||||
return metricNum > 0 || measureNum > 0;
|
||||
private static void addExtendFields(Dimension dimension, Map<String, String> extendFields) {
|
||||
if (dimension.getDataType().isArray()) {
|
||||
if (Objects.nonNull(dimension.getExt()) && dimension.getExt()
|
||||
.containsKey(DIMENSION_DELIMITER)) {
|
||||
extendFields.put(dimension.getExpr(),
|
||||
(String) dimension.getExt().get(DIMENSION_DELIMITER));
|
||||
} else {
|
||||
extendFields.put(dimension.getExpr(), "");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static List<SqlNode> getWhereMeasure(List<String> fields, List<String> queryMetrics,
|
||||
List<String> queryDimensions, Set<String> extendFields, DataSource datasource, SqlValidatorScope scope,
|
||||
List<String> queryDimensions, Map<String, String> extendFields, DataSource datasource,
|
||||
SqlValidatorScope scope,
|
||||
SemanticSchema schema, boolean nonAgg) throws Exception {
|
||||
Iterator<String> iterator = fields.iterator();
|
||||
List<SqlNode> whereNode = new ArrayList<>();
|
||||
@@ -195,9 +204,7 @@ public class SourceRender extends Renderer {
|
||||
Optional<Dimension> dimensionOptional = getDimensionByName(where, datasource);
|
||||
if (dimensionOptional.isPresent()) {
|
||||
whereNode.add(DimensionNode.buildArray(dimensionOptional.get(), scope, engineType));
|
||||
if (dimensionOptional.get().getDataType().isArray()) {
|
||||
extendFields.add(dimensionOptional.get().getExpr());
|
||||
}
|
||||
addExtendFields(dimensionOptional.get(), extendFields);
|
||||
}
|
||||
}
|
||||
return whereNode;
|
||||
@@ -205,7 +212,8 @@ public class SourceRender extends Renderer {
|
||||
|
||||
private static void mergeWhere(List<String> fields, TableView dataSet, TableView outputSet,
|
||||
List<String> queryMetrics,
|
||||
List<String> queryDimensions, Set<String> extendFields, DataSource datasource, SqlValidatorScope scope,
|
||||
List<String> queryDimensions, Map<String, String> extendFields, DataSource datasource,
|
||||
SqlValidatorScope scope,
|
||||
SemanticSchema schema,
|
||||
boolean nonAgg) throws Exception {
|
||||
List<SqlNode> whereNode = getWhereMeasure(fields, queryMetrics, queryDimensions, extendFields, datasource,
|
||||
|
||||
@@ -31,7 +31,6 @@ import com.tencent.supersonic.headless.server.pojo.yaml.MetricTypeParamsYamlTpl;
|
||||
import com.tencent.supersonic.headless.server.pojo.yaml.MetricYamlTpl;
|
||||
import com.tencent.supersonic.headless.server.service.Catalog;
|
||||
import com.tencent.supersonic.headless.server.utils.DatabaseConverter;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
@@ -42,7 +41,6 @@ import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.tuple.Triple;
|
||||
import org.springframework.stereotype.Service;
|
||||
@@ -292,6 +290,9 @@ public class SemanticSchemaManager {
|
||||
if (Objects.isNull(dimension.getDataType())) {
|
||||
dimension.setDataType(DataType.UNKNOWN);
|
||||
}
|
||||
if (Objects.nonNull(dimensionYamlTpl.getExt())) {
|
||||
dimension.setExt(dimensionYamlTpl.getExt());
|
||||
}
|
||||
dimension.setDimensionTimeTypeParams(getDimensionTimeTypeParams(dimensionYamlTpl.getTypeParams()));
|
||||
dimensions.add(dimension);
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package com.tencent.supersonic.headless.server.pojo.yaml;
|
||||
|
||||
import com.tencent.supersonic.common.pojo.enums.DataTypeEnums;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import lombok.Data;
|
||||
|
||||
|
||||
@@ -24,4 +25,6 @@ public class DimensionYamlTpl {
|
||||
private DataTypeEnums dataType;
|
||||
|
||||
private List<String> defaultValues;
|
||||
|
||||
private Map<String, Object> ext;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user