mirror of
https://github.com/tencentmusic/supersonic.git
synced 2025-12-10 11:00:23 +00:00
(improvement)(headless) refator query code (#658)
This commit is contained in:
@@ -12,6 +12,10 @@ import com.tencent.supersonic.common.util.ContextUtils;
|
||||
import com.tencent.supersonic.common.util.DateModeUtils;
|
||||
import com.tencent.supersonic.common.util.SqlFilterUtils;
|
||||
import com.tencent.supersonic.common.util.jsqlparser.SqlParserAddHelper;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
import lombok.Data;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import net.sf.jsqlparser.JSQLParserException;
|
||||
@@ -35,11 +39,6 @@ import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.logging.log4j.util.Strings;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
||||
@Data
|
||||
@Slf4j
|
||||
@@ -268,10 +267,7 @@ public class QueryStructReq extends SemanticQueryReq {
|
||||
}
|
||||
|
||||
public String getModelName() {
|
||||
if (StringUtils.isNotBlank(modelName)) {
|
||||
return modelName;
|
||||
}
|
||||
return "table";
|
||||
return Objects.nonNull(modelName) ? modelName : "m_" + String.valueOf(StringUtils.join(modelIds, "_"));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ import com.tencent.supersonic.headless.api.pojo.MetricTable;
|
||||
import com.tencent.supersonic.headless.api.request.MetricQueryReq;
|
||||
import com.tencent.supersonic.headless.api.request.ParseSqlReq;
|
||||
import com.tencent.supersonic.headless.api.request.QueryStructReq;
|
||||
import com.tencent.supersonic.headless.api.request.SqlExecuteReq;
|
||||
import com.tencent.supersonic.headless.core.pojo.QueryStatement;
|
||||
import com.tencent.supersonic.headless.core.utils.ComponentFactory;
|
||||
import java.util.ArrayList;
|
||||
@@ -54,6 +55,12 @@ public class QueryParser {
|
||||
|| Strings.isNullOrEmpty(queryStatement.getSourceId())) {
|
||||
throw new RuntimeException("parse Exception: " + queryStatement.getErrMsg());
|
||||
}
|
||||
String querySql =
|
||||
Objects.nonNull(queryStatement.getEnableLimitWrapper()) && queryStatement.getEnableLimitWrapper()
|
||||
? String.format(SqlExecuteReq.LIMIT_WRAPPER,
|
||||
queryStatement.getSql())
|
||||
: queryStatement.getSql();
|
||||
queryStatement.setSql(querySql);
|
||||
return queryStatement;
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
package com.tencent.supersonic.headless.core.parser.calcite.sql.node;
|
||||
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.tencent.supersonic.headless.api.enums.EngineType;
|
||||
import com.tencent.supersonic.headless.api.request.MetricQueryReq;
|
||||
@@ -265,8 +264,15 @@ public class DataSourceNode extends SemanticNode {
|
||||
boolean isAllMatch = true;
|
||||
sourceMeasure.retainAll(measures);
|
||||
if (sourceMeasure.size() < measures.size()) {
|
||||
log.info("baseDataSource not match all measure");
|
||||
isAllMatch = false;
|
||||
log.info("baseDataSource measures not match all measure");
|
||||
// check dimension again
|
||||
Set<String> dimensionMeasures = new HashSet<>();
|
||||
dimensionMeasures.addAll(dimension);
|
||||
dimensionMeasures.retainAll(measures);
|
||||
if (sourceMeasure.size() + dimensionMeasures.size() < measures.size()) {
|
||||
log.info("baseDataSource not match all measure");
|
||||
isAllMatch = false;
|
||||
}
|
||||
}
|
||||
measures.removeAll(sourceMeasure);
|
||||
|
||||
|
||||
@@ -28,6 +28,7 @@ public class QueryStatement {
|
||||
private String viewSql = "";
|
||||
private String viewAlias = "";
|
||||
private String viewSimplifySql = "";
|
||||
private Boolean enableLimitWrapper = false;
|
||||
|
||||
|
||||
private SemanticModel semanticModel;
|
||||
|
||||
@@ -31,16 +31,6 @@ import com.tencent.supersonic.headless.core.pojo.yaml.MetricTypeParamsYamlTpl;
|
||||
import com.tencent.supersonic.headless.core.pojo.yaml.MetricYamlTpl;
|
||||
import com.tencent.supersonic.headless.server.service.Catalog;
|
||||
import com.tencent.supersonic.headless.server.utils.DatabaseConverter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.tuple.Triple;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.cache.annotation.EnableCaching;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
@@ -53,6 +43,15 @@ import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.tuple.Triple;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.cache.annotation.EnableCaching;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
|
||||
@Slf4j
|
||||
@@ -143,8 +142,8 @@ public class SemanticSchemaManager {
|
||||
|| identifiers.contains(f.getFieldName())) {
|
||||
continue;
|
||||
}
|
||||
datasource.getMeasures().add(Measure.builder().name(f.getFieldName())
|
||||
.expr(f.getFieldName()).agg("").build());
|
||||
datasource.getMeasures()
|
||||
.add(Measure.builder().expr(f.getFieldName()).name(f.getFieldName()).agg("").build());
|
||||
}
|
||||
}
|
||||
return datasource;
|
||||
@@ -180,9 +179,11 @@ public class SemanticSchemaManager {
|
||||
}
|
||||
if (!CollectionUtils.isEmpty(metricTypeParamsYamlTpl.getMetrics())) {
|
||||
metricTypeParams.setMeasures(getMetricParams(metricTypeParamsYamlTpl.getMetrics()));
|
||||
metricTypeParams.setExpr(metricTypeParams.getMeasures().get(0).getExpr());
|
||||
}
|
||||
if (!CollectionUtils.isEmpty(metricTypeParamsYamlTpl.getFields())) {
|
||||
metricTypeParams.setMeasures(getFieldParams(metricTypeParamsYamlTpl.getFields()));
|
||||
metricTypeParams.setExpr(metricTypeParams.getMeasures().get(0).getExpr());
|
||||
}
|
||||
|
||||
return metricTypeParams;
|
||||
|
||||
@@ -21,11 +21,8 @@ import com.tencent.supersonic.headless.api.response.SemanticQueryResp;
|
||||
import com.tencent.supersonic.headless.api.response.SqlParserResp;
|
||||
import com.tencent.supersonic.headless.core.pojo.QueryStatement;
|
||||
import com.tencent.supersonic.headless.server.service.DownloadService;
|
||||
import com.tencent.supersonic.headless.server.service.SemantciQueryEngine;
|
||||
import com.tencent.supersonic.headless.server.service.QueryService;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
import javax.validation.Valid;
|
||||
@@ -44,8 +41,6 @@ public class QueryController {
|
||||
|
||||
@Autowired
|
||||
private QueryService queryService;
|
||||
@Autowired
|
||||
private SemantciQueryEngine semantciQueryEngine;
|
||||
|
||||
@Autowired
|
||||
private DownloadService downloadService;
|
||||
@@ -63,7 +58,9 @@ public class QueryController {
|
||||
HttpServletRequest request,
|
||||
HttpServletResponse response) throws Exception {
|
||||
User user = UserHolder.findUser(request, response);
|
||||
return queryService.queryByStructWithAuth(queryStructReq, user);
|
||||
QuerySqlReq querySqlReq = queryStructReq.convert(queryStructReq);
|
||||
return queryService.queryBySql(querySqlReq, user);
|
||||
//return queryService.queryByStructWithAuth(queryStructReq, user);
|
||||
}
|
||||
|
||||
@PostMapping("/queryMetricDataById")
|
||||
@@ -95,11 +92,7 @@ public class QueryController {
|
||||
|
||||
@PostMapping("/struct/parse")
|
||||
public SqlParserResp parseByStruct(@RequestBody ParseSqlReq parseSqlReq) throws Exception {
|
||||
QueryStructReq queryStructCmd = new QueryStructReq();
|
||||
Set<Long> models = new HashSet<>();
|
||||
models.add(Long.valueOf(parseSqlReq.getRootPath()));
|
||||
queryStructCmd.setModelIds(models);
|
||||
QueryStatement queryStatement = semantciQueryEngine.physicalSql(queryStructCmd, parseSqlReq);
|
||||
QueryStatement queryStatement = queryService.explain(parseSqlReq);
|
||||
SqlParserResp sqlParserResp = new SqlParserResp();
|
||||
BeanUtils.copyProperties(queryStatement, sqlParserResp);
|
||||
return sqlParserResp;
|
||||
|
||||
@@ -3,6 +3,7 @@ package com.tencent.supersonic.headless.server.service;
|
||||
import com.tencent.supersonic.auth.api.authentication.pojo.User;
|
||||
import com.tencent.supersonic.headless.api.request.ExplainSqlReq;
|
||||
import com.tencent.supersonic.headless.api.request.ItemUseReq;
|
||||
import com.tencent.supersonic.headless.api.request.ParseSqlReq;
|
||||
import com.tencent.supersonic.headless.api.request.QueryDimValueReq;
|
||||
import com.tencent.supersonic.headless.api.request.QueryItemReq;
|
||||
import com.tencent.supersonic.headless.api.request.QueryMultiStructReq;
|
||||
@@ -36,6 +37,8 @@ public interface QueryService {
|
||||
|
||||
<T> ExplainResp explain(ExplainSqlReq<T> explainSqlReq, User user) throws Exception;
|
||||
|
||||
QueryStatement explain(ParseSqlReq parseSqlReq) throws Exception;
|
||||
|
||||
@ApiHeaderCheck
|
||||
ItemQueryResultResp queryMetricDataById(QueryItemReq queryApiReq,
|
||||
HttpServletRequest request) throws Exception;
|
||||
|
||||
@@ -1,19 +0,0 @@
|
||||
package com.tencent.supersonic.headless.server.service;
|
||||
|
||||
import com.tencent.supersonic.headless.api.request.ParseSqlReq;
|
||||
import com.tencent.supersonic.headless.api.request.QueryStructReq;
|
||||
import com.tencent.supersonic.headless.api.response.SemanticQueryResp;
|
||||
import com.tencent.supersonic.headless.core.executor.QueryExecutor;
|
||||
import com.tencent.supersonic.headless.core.pojo.QueryStatement;
|
||||
|
||||
public interface SemantciQueryEngine {
|
||||
|
||||
QueryStatement plan(QueryStatement queryStatement) throws Exception;
|
||||
|
||||
QueryExecutor route(QueryStatement queryStatement);
|
||||
|
||||
SemanticQueryResp execute(QueryStatement queryStatement);
|
||||
|
||||
QueryStatement physicalSql(QueryStructReq queryStructCmd, ParseSqlReq sqlCommend) throws Exception;
|
||||
|
||||
}
|
||||
@@ -21,6 +21,7 @@ import com.tencent.supersonic.headless.api.pojo.SingleItemQueryResult;
|
||||
import com.tencent.supersonic.headless.api.request.ExplainSqlReq;
|
||||
import com.tencent.supersonic.headless.api.request.ItemUseReq;
|
||||
import com.tencent.supersonic.headless.api.request.ModelSchemaFilterReq;
|
||||
import com.tencent.supersonic.headless.api.request.ParseSqlReq;
|
||||
import com.tencent.supersonic.headless.api.request.QueryDimValueReq;
|
||||
import com.tencent.supersonic.headless.api.request.QueryItemReq;
|
||||
import com.tencent.supersonic.headless.api.request.QueryMultiStructReq;
|
||||
@@ -51,11 +52,11 @@ import com.tencent.supersonic.headless.server.service.AppService;
|
||||
import com.tencent.supersonic.headless.server.service.Catalog;
|
||||
import com.tencent.supersonic.headless.server.service.QueryService;
|
||||
import com.tencent.supersonic.headless.server.service.SchemaService;
|
||||
import com.tencent.supersonic.headless.server.service.SemantciQueryEngine;
|
||||
import com.tencent.supersonic.headless.server.utils.QueryReqConverter;
|
||||
import com.tencent.supersonic.headless.server.utils.QueryUtils;
|
||||
import com.tencent.supersonic.headless.server.utils.StatUtils;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
@@ -67,7 +68,6 @@ import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
|
||||
@@ -90,7 +90,6 @@ public class QueryServiceImpl implements QueryService {
|
||||
|
||||
private final QueryCache queryCache;
|
||||
|
||||
private final SemantciQueryEngine semantciQueryEngine;
|
||||
private final SemanticSchemaManager semanticSchemaManager;
|
||||
|
||||
private final QueryParser queryParser;
|
||||
@@ -102,7 +101,6 @@ public class QueryServiceImpl implements QueryService {
|
||||
CacheManager cacheManager,
|
||||
QueryUtils queryUtils,
|
||||
QueryReqConverter queryReqConverter,
|
||||
@Lazy SemantciQueryEngine semantciQueryEngine,
|
||||
Catalog catalog,
|
||||
AppService appService,
|
||||
QueryCache queryCache,
|
||||
@@ -113,7 +111,6 @@ public class QueryServiceImpl implements QueryService {
|
||||
this.cacheManager = cacheManager;
|
||||
this.queryUtils = queryUtils;
|
||||
this.queryReqConverter = queryReqConverter;
|
||||
this.semantciQueryEngine = semantciQueryEngine;
|
||||
this.catalog = catalog;
|
||||
this.appService = appService;
|
||||
this.queryCache = queryCache;
|
||||
@@ -128,19 +125,50 @@ public class QueryServiceImpl implements QueryService {
|
||||
public Object queryBySql(QuerySqlReq querySQLReq, User user) {
|
||||
statUtils.initStatInfo(querySQLReq, user);
|
||||
QueryStatement queryStatement = new QueryStatement();
|
||||
SemanticQueryResp results = null;
|
||||
TaskStatusEnum state = TaskStatusEnum.SUCCESS;
|
||||
try {
|
||||
//1.initStatInfo
|
||||
statUtils.initStatInfo(querySQLReq, user);
|
||||
//2.query from cache
|
||||
Object query = queryCache.query(querySQLReq);
|
||||
if (Objects.nonNull(query)) {
|
||||
return (SemanticQueryResp) query;
|
||||
}
|
||||
StatUtils.get().setUseResultCache(false);
|
||||
//3 query from db
|
||||
queryStatement = convertToQueryStatement(querySQLReq, user);
|
||||
log.info("queryStatement:{}", queryStatement);
|
||||
results = query(queryStatement);
|
||||
//4 reset cache and set stateInfo
|
||||
Boolean setCacheSuccess = queryCache.put(querySQLReq, results);
|
||||
if (setCacheSuccess) {
|
||||
// if semanticQueryResp is not null, update cache data
|
||||
statUtils.updateResultCacheKey(queryCache.getCacheKey(querySQLReq));
|
||||
}
|
||||
if (Objects.isNull(results)) {
|
||||
state = TaskStatusEnum.ERROR;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.info("convertToQueryStatement has a exception:", e);
|
||||
}
|
||||
log.info("queryStatement:{}", queryStatement);
|
||||
SemanticQueryResp results = semantciQueryEngine.execute(queryStatement);
|
||||
statUtils.statInfo2DbAsync(TaskStatusEnum.SUCCESS);
|
||||
statUtils.statInfo2DbAsync(state);
|
||||
return results;
|
||||
}
|
||||
|
||||
public SemanticQueryResp queryByQueryStatement(QueryStatement queryStatement) {
|
||||
return semantciQueryEngine.execute(queryStatement);
|
||||
|
||||
SemanticQueryResp queryResultWithColumns = null;
|
||||
QueryExecutor queryExecutor = queryPlanner.route(queryStatement);
|
||||
if (queryExecutor != null) {
|
||||
queryResultWithColumns = queryExecutor.execute(queryStatement);
|
||||
queryResultWithColumns.setSql(queryStatement.getSql());
|
||||
if (!CollectionUtils.isEmpty(queryStatement.getModelIds())) {
|
||||
queryUtils.fillItemNameInfo(queryResultWithColumns, queryStatement.getModelIds());
|
||||
}
|
||||
}
|
||||
return queryResultWithColumns;
|
||||
|
||||
}
|
||||
|
||||
private QueryStatement convertToQueryStatement(QuerySqlReq querySQLReq, User user) throws Exception {
|
||||
@@ -150,12 +178,46 @@ public class QueryServiceImpl implements QueryService {
|
||||
List<ModelSchemaResp> modelSchemaResps = schemaService.fetchModelSchema(filter, user);
|
||||
QueryStatement queryStatement = queryReqConverter.convert(querySQLReq, modelSchemaResps);
|
||||
queryStatement.setModelIds(querySQLReq.getModelIds());
|
||||
queryStatement.setEnableOptimize(queryUtils.enableOptimize());
|
||||
SemanticModel semanticModel = semanticSchemaManager.get(querySQLReq.getModelIdStr());
|
||||
queryStatement.setSemanticModel(semanticModel);
|
||||
return queryStatement;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SemanticQueryResp queryByStruct(QueryStructReq queryStructReq, User user) throws Exception {
|
||||
return (SemanticQueryResp) queryBySql(queryStructReq.convert(queryStructReq), user);
|
||||
SemanticQueryResp semanticQueryResp = null;
|
||||
TaskStatusEnum state = TaskStatusEnum.SUCCESS;
|
||||
log.info("[queryStructReq:{}]", queryStructReq);
|
||||
try {
|
||||
//1.initStatInfo
|
||||
statUtils.initStatInfo(queryStructReq, user);
|
||||
//2.query from cache
|
||||
Object query = queryCache.query(queryStructReq);
|
||||
if (Objects.nonNull(query)) {
|
||||
return (SemanticQueryResp) query;
|
||||
}
|
||||
StatUtils.get().setUseResultCache(false);
|
||||
//3 query
|
||||
QueryStatement queryStatement = buildQueryStatement(queryStructReq);
|
||||
semanticQueryResp = query(queryStatement);
|
||||
//4 reset cache and set stateInfo
|
||||
Boolean setCacheSuccess = queryCache.put(queryStructReq, semanticQueryResp);
|
||||
if (setCacheSuccess) {
|
||||
// if semanticQueryResp is not null, update cache data
|
||||
statUtils.updateResultCacheKey(queryCache.getCacheKey(queryStructReq));
|
||||
}
|
||||
if (Objects.isNull(semanticQueryResp)) {
|
||||
state = TaskStatusEnum.ERROR;
|
||||
}
|
||||
return semanticQueryResp;
|
||||
} catch (Exception e) {
|
||||
log.error("exception in queryByStruct, e: ", e);
|
||||
state = TaskStatusEnum.ERROR;
|
||||
throw e;
|
||||
} finally {
|
||||
statUtils.statInfo2DbAsync(state);
|
||||
}
|
||||
}
|
||||
|
||||
private QueryStatement buildQueryStatement(QueryStructReq queryStructReq) throws Exception {
|
||||
@@ -173,8 +235,11 @@ public class QueryServiceImpl implements QueryService {
|
||||
List<QueryStatement> sqlParsers = new ArrayList<>();
|
||||
for (QueryStructReq queryStructReq : queryMultiStructReq.getQueryStructReqs()) {
|
||||
QueryStatement queryStatement = buildQueryStatement(queryStructReq);
|
||||
queryStatement = semantciQueryEngine.plan(queryStatement);
|
||||
queryUtils.checkSqlParse(queryStatement);
|
||||
SemanticModel semanticModel = semanticSchemaManager.get(queryStructReq.getModelIdStr());
|
||||
queryStatement.setModelIds(queryStatement.getQueryStructReq().getModelIds());
|
||||
queryStatement.setSemanticModel(semanticModel);
|
||||
queryStatement.setEnableOptimize(queryUtils.enableOptimize());
|
||||
queryStatement = plan(queryStatement);
|
||||
sqlParsers.add(queryStatement);
|
||||
}
|
||||
log.info("multi sqlParser:{}", sqlParsers);
|
||||
@@ -267,7 +332,7 @@ public class QueryServiceImpl implements QueryService {
|
||||
}
|
||||
if (QueryType.STRUCT.equals(queryTypeEnum) && queryReq instanceof QueryStructReq) {
|
||||
QueryStatement queryStatement = buildQueryStatement((QueryStructReq) queryReq);
|
||||
queryStatement = semantciQueryEngine.plan(queryStatement);
|
||||
queryStatement = plan(queryStatement);
|
||||
return getExplainResp(queryStatement);
|
||||
}
|
||||
if (QueryType.STRUCT.equals(queryTypeEnum) && queryReq instanceof QueryMultiStructReq) {
|
||||
@@ -279,6 +344,22 @@ public class QueryServiceImpl implements QueryService {
|
||||
throw new IllegalArgumentException("Parameters are invalid, explainSqlReq: " + explainSqlReq);
|
||||
}
|
||||
|
||||
@Override
|
||||
public QueryStatement explain(ParseSqlReq parseSqlReq) throws Exception {
|
||||
QueryStructReq queryStructCmd = new QueryStructReq();
|
||||
Set<Long> models = new HashSet<>();
|
||||
models.add(Long.valueOf(parseSqlReq.getRootPath()));
|
||||
queryStructCmd.setModelIds(models);
|
||||
QueryStatement queryStatement = new QueryStatement();
|
||||
queryStatement.setQueryStructReq(queryStructCmd);
|
||||
queryStatement.setParseSqlReq(parseSqlReq);
|
||||
queryStatement.setSql(parseSqlReq.getSql());
|
||||
queryStatement.setIsS2SQL(true);
|
||||
SemanticModel semanticModel = semanticSchemaManager.get(parseSqlReq.getRootPath());
|
||||
queryStatement.setSemanticModel(semanticModel);
|
||||
return plan(queryStatement);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ItemQueryResultResp queryMetricDataById(QueryItemReq queryItemReq,
|
||||
HttpServletRequest request) throws Exception {
|
||||
@@ -411,4 +492,32 @@ public class QueryServiceImpl implements QueryService {
|
||||
.map(Object::toString).collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
private QueryStatement plan(QueryStatement queryStatement) throws Exception {
|
||||
queryStatement = queryParser.parse(queryStatement);
|
||||
log.info("queryStatement:{}", queryStatement);
|
||||
queryPlanner.optimizer(queryStatement);
|
||||
return queryStatement;
|
||||
}
|
||||
|
||||
private SemanticQueryResp query(QueryStatement queryStatement) throws Exception {
|
||||
SemanticQueryResp semanticQueryResp = null;
|
||||
log.info("[QueryStatement:{}]", queryStatement);
|
||||
try {
|
||||
//1 parse
|
||||
queryStatement = queryParser.parse(queryStatement);
|
||||
//2 plan
|
||||
QueryExecutor queryExecutor = queryPlanner.plan(queryStatement);
|
||||
//3 execute
|
||||
if (queryExecutor != null) {
|
||||
semanticQueryResp = queryExecutor.execute(queryStatement);
|
||||
if (!CollectionUtils.isEmpty(queryStatement.getModelIds())) {
|
||||
queryUtils.fillItemNameInfo(semanticQueryResp, queryStatement.getModelIds());
|
||||
}
|
||||
}
|
||||
return semanticQueryResp;
|
||||
} catch (Exception e) {
|
||||
log.error("exception in query, e: ", e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,89 +0,0 @@
|
||||
package com.tencent.supersonic.headless.server.service.impl;
|
||||
|
||||
import com.tencent.supersonic.headless.api.request.ParseSqlReq;
|
||||
import com.tencent.supersonic.headless.api.request.QueryStructReq;
|
||||
import com.tencent.supersonic.headless.api.response.SemanticQueryResp;
|
||||
import com.tencent.supersonic.headless.core.executor.QueryExecutor;
|
||||
import com.tencent.supersonic.headless.core.parser.QueryParser;
|
||||
import com.tencent.supersonic.headless.core.parser.calcite.s2sql.SemanticModel;
|
||||
import com.tencent.supersonic.headless.core.planner.QueryOptimizer;
|
||||
import com.tencent.supersonic.headless.core.pojo.QueryStatement;
|
||||
import com.tencent.supersonic.headless.core.utils.ComponentFactory;
|
||||
import com.tencent.supersonic.headless.server.manager.SemanticSchemaManager;
|
||||
import com.tencent.supersonic.headless.server.service.SemantciQueryEngine;
|
||||
import com.tencent.supersonic.headless.server.utils.QueryUtils;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class SemantciQueryEngineImpl implements SemantciQueryEngine {
|
||||
|
||||
private final QueryParser queryParser;
|
||||
private final QueryUtils queryUtils;
|
||||
private final SemanticSchemaManager semanticSchemaManager;
|
||||
|
||||
public SemantciQueryEngineImpl(QueryParser queryParser,
|
||||
QueryUtils queryUtils, SemanticSchemaManager semanticSchemaManager) {
|
||||
this.queryParser = queryParser;
|
||||
this.queryUtils = queryUtils;
|
||||
this.semanticSchemaManager = semanticSchemaManager;
|
||||
}
|
||||
|
||||
public SemanticQueryResp execute(QueryStatement queryStatement) {
|
||||
SemanticQueryResp queryResultWithColumns = null;
|
||||
QueryExecutor queryExecutor = route(queryStatement);
|
||||
if (queryExecutor != null) {
|
||||
queryResultWithColumns = queryExecutor.execute(queryStatement);
|
||||
queryResultWithColumns.setSql(queryStatement.getSql());
|
||||
if (!CollectionUtils.isEmpty(queryStatement.getModelIds())) {
|
||||
queryUtils.fillItemNameInfo(queryResultWithColumns, queryStatement.getModelIds());
|
||||
}
|
||||
}
|
||||
return queryResultWithColumns;
|
||||
}
|
||||
|
||||
public QueryStatement plan(QueryStatement queryStatement) throws Exception {
|
||||
queryStatement.setEnableOptimize(queryUtils.enableOptimize());
|
||||
queryStatement.setSemanticModel(getSemanticModel(queryStatement));
|
||||
queryStatement = queryParser.parse(queryStatement);
|
||||
queryUtils.checkSqlParse(queryStatement);
|
||||
queryStatement.setModelIds(queryStatement.getQueryStructReq().getModelIds());
|
||||
log.info("queryStatement:{}", queryStatement);
|
||||
return optimize(queryStatement.getQueryStructReq(), queryStatement);
|
||||
}
|
||||
|
||||
public QueryStatement optimize(QueryStructReq queryStructCmd, QueryStatement queryStatement) {
|
||||
for (QueryOptimizer queryOptimizer : ComponentFactory.getQueryOptimizers()) {
|
||||
queryOptimizer.rewrite(queryStructCmd, queryStatement);
|
||||
}
|
||||
return queryStatement;
|
||||
}
|
||||
|
||||
public QueryExecutor route(QueryStatement queryStatement) {
|
||||
for (QueryExecutor queryExecutor : ComponentFactory.getQueryExecutors()) {
|
||||
if (queryExecutor.accept(queryStatement)) {
|
||||
return queryExecutor;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public QueryStatement physicalSql(QueryStructReq queryStructCmd, ParseSqlReq sqlCommend) throws Exception {
|
||||
QueryStatement queryStatement = new QueryStatement();
|
||||
queryStatement.setQueryStructReq(queryStructCmd);
|
||||
queryStatement.setParseSqlReq(sqlCommend);
|
||||
queryStatement.setSql(sqlCommend.getSql());
|
||||
queryStatement.setIsS2SQL(true);
|
||||
queryStatement.setSemanticModel(getSemanticModel(queryStatement));
|
||||
return optimize(queryStructCmd, queryParser.parser(sqlCommend, queryStatement));
|
||||
}
|
||||
|
||||
private SemanticModel getSemanticModel(QueryStatement queryStatement) throws Exception {
|
||||
QueryStructReq queryStructReq = queryStatement.getQueryStructReq();
|
||||
return semanticSchemaManager.get(queryStructReq.getModelIdStr());
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,6 +1,5 @@
|
||||
package com.tencent.supersonic.headless.server.utils;
|
||||
|
||||
|
||||
import com.tencent.supersonic.common.pojo.Aggregator;
|
||||
import com.tencent.supersonic.common.pojo.Constants;
|
||||
import com.tencent.supersonic.common.pojo.enums.AggOperatorEnum;
|
||||
@@ -18,7 +17,6 @@ import com.tencent.supersonic.headless.api.pojo.SchemaItem;
|
||||
import com.tencent.supersonic.headless.api.request.ParseSqlReq;
|
||||
import com.tencent.supersonic.headless.api.request.QuerySqlReq;
|
||||
import com.tencent.supersonic.headless.api.request.QueryStructReq;
|
||||
import com.tencent.supersonic.headless.api.request.SqlExecuteReq;
|
||||
import com.tencent.supersonic.headless.api.response.DatabaseResp;
|
||||
import com.tencent.supersonic.headless.api.response.DimensionResp;
|
||||
import com.tencent.supersonic.headless.api.response.MetricResp;
|
||||
@@ -29,7 +27,6 @@ import com.tencent.supersonic.headless.core.pojo.QueryStatement;
|
||||
import com.tencent.supersonic.headless.core.utils.SqlGenerateUtils;
|
||||
import com.tencent.supersonic.headless.server.pojo.MetaFilter;
|
||||
import com.tencent.supersonic.headless.server.service.Catalog;
|
||||
import com.tencent.supersonic.headless.server.service.SemantciQueryEngine;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
@@ -55,8 +52,6 @@ public class QueryReqConverter {
|
||||
@Value("${query.sql.limitWrapper:true}")
|
||||
private Boolean limitWrapper;
|
||||
|
||||
@Autowired
|
||||
private SemantciQueryEngine semantciQueryEngine;
|
||||
@Autowired
|
||||
private QueryStructUtils queryStructUtils;
|
||||
|
||||
@@ -138,9 +133,8 @@ public class QueryReqConverter {
|
||||
queryStatement.setIsS2SQL(true);
|
||||
queryStatement.setMinMaxTime(queryStructUtils.getBeginEndTime(queryStructReq));
|
||||
queryStatement.setModelIds(querySQLReq.getModelIds());
|
||||
queryStatement = semantciQueryEngine.plan(queryStatement);
|
||||
queryStatement.setSql(limitWrapper ? String.format(SqlExecuteReq.LIMIT_WRAPPER, queryStatement.getSql())
|
||||
: queryStatement.getSql());
|
||||
queryStatement.setEnableLimitWrapper(limitWrapper);
|
||||
|
||||
return queryStatement;
|
||||
}
|
||||
|
||||
@@ -260,10 +254,10 @@ public class QueryReqConverter {
|
||||
// metricTable sql use measures replace metric
|
||||
sql = SqlParserReplaceHelper.replaceSqlByExpression(sql, replaces);
|
||||
metricTable.setAggOption(AggOption.NATIVE);
|
||||
}
|
||||
// metricTable use measures replace metric
|
||||
if (!CollectionUtils.isEmpty(measures)) {
|
||||
metricTable.setMetrics(measures);
|
||||
// metricTable use measures replace metric
|
||||
if (!CollectionUtils.isEmpty(measures)) {
|
||||
metricTable.setMetrics(measures);
|
||||
}
|
||||
}
|
||||
}
|
||||
parseSqlReq.setSql(sql);
|
||||
@@ -313,7 +307,7 @@ public class QueryReqConverter {
|
||||
}
|
||||
}
|
||||
measures.addAll(deriveMetric);
|
||||
dimensions.addAll(deriveDimension);
|
||||
deriveDimension.stream().filter(d -> !dimensions.contains(d)).forEach(d -> dimensions.add(d));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user