(improvement)(headless) adapter for derived metrics (#646)

This commit is contained in:
jipeli
2024-01-18 11:11:56 +08:00
committed by GitHub
parent 71c491a80d
commit d4eecc1bf8
11 changed files with 537 additions and 58 deletions

View File

@@ -6,21 +6,21 @@ import com.tencent.supersonic.headless.api.request.ParseSqlReq;
import com.tencent.supersonic.headless.api.request.QueryStructReq;
import com.tencent.supersonic.headless.api.response.ModelSchemaResp;
import com.tencent.supersonic.headless.api.response.QueryResultWithSchemaResp;
import com.tencent.supersonic.headless.core.executor.QueryExecutor;
import com.tencent.supersonic.headless.core.optimizer.QueryOptimizer;
import com.tencent.supersonic.headless.core.parser.QueryParser;
import com.tencent.supersonic.headless.core.parser.calcite.s2sql.HeadlessModel;
import com.tencent.supersonic.headless.core.pojo.QueryStatement;
import com.tencent.supersonic.headless.core.utils.ComponentFactory;
import com.tencent.supersonic.headless.core.executor.QueryExecutor;
import com.tencent.supersonic.headless.server.manager.HeadlessSchemaManager;
import com.tencent.supersonic.headless.server.service.Catalog;
import com.tencent.supersonic.headless.server.service.HeadlessQueryEngine;
import com.tencent.supersonic.headless.server.utils.QueryStructUtils;
import com.tencent.supersonic.headless.server.utils.QueryUtils;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
@Slf4j
@Component
@@ -82,22 +82,20 @@ public class HeadlessQueryEngineImpl implements HeadlessQueryEngine {
}
@Override
public QueryStatement physicalSql(QueryStructReq queryStructCmd, ParseSqlReq sqlCommend) throws Exception {
public QueryStatement physicalSql(QueryStructReq queryStructCmd, ParseSqlReq sqlCommend) {
QueryStatement queryStatement = new QueryStatement();
queryStatement.setSql(sqlCommend.getSql());
queryStatement.setQueryStructReq(queryStructCmd);
queryStatement.setParseSqlReq(sqlCommend);
queryStatement.setSql(sqlCommend.getSql());
queryStatement.setIsS2SQL(true);
queryStatement.setHeadlessModel(getHeadLessModel(queryStatement));
return optimize(queryStructCmd, queryParser.parser(sqlCommend, queryStatement));
}
public QueryStatement physicalSql(QueryStructReq queryStructCmd, MetricQueryReq metricCommand) throws Exception {
public QueryStatement physicalSql(QueryStructReq queryStructCmd, MetricQueryReq metricCommand) {
QueryStatement queryStatement = new QueryStatement();
queryStatement.setQueryStructReq(queryStructCmd);
queryStatement.setMetricReq(metricCommand);
queryStatement.setIsS2SQL(false);
queryStatement.setHeadlessModel(getHeadLessModel(queryStatement));
return queryParser.parser(queryStatement);
}

View File

@@ -11,6 +11,8 @@ import com.tencent.supersonic.common.util.jsqlparser.SqlParserSelectFunctionHelp
import com.tencent.supersonic.common.util.jsqlparser.SqlParserSelectHelper;
import com.tencent.supersonic.headless.api.enums.AggOption;
import com.tencent.supersonic.headless.api.enums.EngineType;
import com.tencent.supersonic.headless.api.enums.MetricType;
import com.tencent.supersonic.headless.api.pojo.Measure;
import com.tencent.supersonic.headless.api.pojo.MetricTable;
import com.tencent.supersonic.headless.api.pojo.SchemaItem;
import com.tencent.supersonic.headless.api.request.ParseSqlReq;
@@ -18,14 +20,18 @@ import com.tencent.supersonic.headless.api.request.QueryS2SQLReq;
import com.tencent.supersonic.headless.api.request.QueryStructReq;
import com.tencent.supersonic.headless.api.request.SqlExecuteReq;
import com.tencent.supersonic.headless.api.response.DatabaseResp;
import com.tencent.supersonic.headless.api.response.DimensionResp;
import com.tencent.supersonic.headless.api.response.MetricResp;
import com.tencent.supersonic.headless.api.response.ModelSchemaResp;
import com.tencent.supersonic.headless.core.adaptor.db.DbAdaptor;
import com.tencent.supersonic.headless.core.adaptor.db.DbAdaptorFactory;
import com.tencent.supersonic.headless.core.pojo.QueryStatement;
import com.tencent.supersonic.headless.core.utils.SqlGenerateUtils;
import com.tencent.supersonic.headless.server.pojo.MetaFilter;
import com.tencent.supersonic.headless.server.service.Catalog;
import com.tencent.supersonic.headless.server.service.HeadlessQueryEngine;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -119,7 +125,9 @@ public class QueryReqConverter {
result.setSupportWith(false);
result.setWithAlias(false);
}
//5.physicalSql by ParseSqlReq
//5. do deriveMetric
generateDerivedMetric(queryS2SQLReq.getModelIds(), modelSchemaResps, result);
//6.physicalSql by ParseSqlReq
queryStructReq.setDateInfo(queryStructUtils.getDateConfBySql(queryS2SQLReq.getSql()));
queryStructReq.setModelIds(new HashSet<>(queryS2SQLReq.getModelIds()));
queryStructReq.setQueryType(getQueryType(aggOption));
@@ -240,4 +248,70 @@ public class QueryReqConverter {
return queryType;
}
private void generateDerivedMetric(List<Long> modelIds, List<ModelSchemaResp> modelSchemaResps,
ParseSqlReq parseSqlReq) {
String sql = parseSqlReq.getSql();
for (MetricTable metricTable : parseSqlReq.getTables()) {
List<String> measures = new ArrayList<>();
Map<String, String> replaces = new HashMap<>();
generateDerivedMetric(modelIds, modelSchemaResps, metricTable.getMetrics(), metricTable.getDimensions(),
measures, replaces);
if (!CollectionUtils.isEmpty(replaces)) {
// metricTable sql use measures replace metric
sql = SqlParserReplaceHelper.replaceSqlByExpression(sql, replaces);
metricTable.setAggOption(AggOption.NATIVE);
}
// metricTable use measures replace metric
if (!CollectionUtils.isEmpty(measures)) {
metricTable.setMetrics(measures);
}
}
parseSqlReq.setSql(sql);
}
private void generateDerivedMetric(List<Long> modelIds, List<ModelSchemaResp> modelSchemaResps,
List<String> metrics, List<String> dimensions,
List<String> measures, Map<String, String> replaces) {
MetaFilter metaFilter = new MetaFilter();
metaFilter.setModelIds(modelIds);
List<MetricResp> metricResps = catalog.getMetrics(metaFilter);
List<DimensionResp> dimensionResps = catalog.getDimensions(metaFilter);
// check metrics has derived
if (!metricResps.stream()
.anyMatch(m -> metrics.contains(m.getBizName()) && MetricType.isDerived(m.getMetricDefineType(),
m.getTypeParams()))) {
return;
}
Set<String> allFields = new HashSet<>();
Map<String, Measure> allMeasures = new HashMap<>();
modelSchemaResps.stream().forEach(modelSchemaResp -> {
allFields.addAll(modelSchemaResp.getFieldList());
if (Objects.nonNull(modelSchemaResp.getModelDetail().getMeasures())) {
modelSchemaResp.getModelDetail().getMeasures().stream()
.forEach(mm -> allMeasures.put(mm.getBizName(), mm));
}
});
Set<String> deriveDimension = new HashSet<>();
Set<String> deriveMetric = new HashSet<>();
Set<String> visitedMetric = new HashSet<>();
if (!CollectionUtils.isEmpty(metricResps)) {
for (MetricResp metricResp : metricResps) {
if (metrics.contains(metricResp.getBizName())) {
if (MetricType.isDerived(metricResp.getMetricDefineType(), metricResp.getTypeParams())) {
String expr = sqlGenerateUtils.generateDerivedMetric(metricResps, allFields, allMeasures,
dimensionResps,
sqlGenerateUtils.getExpr(metricResp), metricResp.getMetricDefineType(), visitedMetric,
deriveMetric, deriveDimension);
replaces.put(metricResp.getBizName(), expr);
} else {
measures.add(metricResp.getBizName());
}
}
}
}
measures.addAll(deriveMetric);
dimensions.addAll(deriveDimension);
}
}