mirror of
https://github.com/tencentmusic/supersonic.git
synced 2025-12-13 21:17:08 +00:00
[improvement][chat]Move processor related logic from headless to chat.
This commit is contained in:
@@ -1,20 +1,34 @@
|
|||||||
package com.tencent.supersonic.chat.server.processor.parse;
|
package com.tencent.supersonic.chat.server.processor.parse;
|
||||||
|
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
import com.tencent.supersonic.chat.server.plugin.PluginQueryManager;
|
import com.tencent.supersonic.chat.server.plugin.PluginQueryManager;
|
||||||
import com.tencent.supersonic.chat.server.pojo.ParseContext;
|
import com.tencent.supersonic.chat.server.pojo.ParseContext;
|
||||||
|
import com.tencent.supersonic.common.jsqlparser.FieldExpression;
|
||||||
|
import com.tencent.supersonic.common.jsqlparser.SqlSelectFunctionHelper;
|
||||||
|
import com.tencent.supersonic.common.jsqlparser.SqlSelectHelper;
|
||||||
|
import com.tencent.supersonic.common.pojo.DateConf;
|
||||||
|
import com.tencent.supersonic.common.pojo.enums.FilterOperatorEnum;
|
||||||
|
import com.tencent.supersonic.common.pojo.enums.QueryType;
|
||||||
|
import com.tencent.supersonic.common.pojo.enums.TimeDimensionEnum;
|
||||||
|
import com.tencent.supersonic.common.util.ContextUtils;
|
||||||
|
import com.tencent.supersonic.headless.api.pojo.DataSetSchema;
|
||||||
import com.tencent.supersonic.headless.api.pojo.SchemaElement;
|
import com.tencent.supersonic.headless.api.pojo.SchemaElement;
|
||||||
import com.tencent.supersonic.headless.api.pojo.SemanticParseInfo;
|
import com.tencent.supersonic.headless.api.pojo.SemanticParseInfo;
|
||||||
|
import com.tencent.supersonic.headless.api.pojo.SqlInfo;
|
||||||
import com.tencent.supersonic.headless.api.pojo.request.QueryFilter;
|
import com.tencent.supersonic.headless.api.pojo.request.QueryFilter;
|
||||||
|
import com.tencent.supersonic.headless.server.facade.service.SemanticLayerService;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.commons.lang3.tuple.Pair;
|
||||||
import org.springframework.util.CollectionUtils;
|
import org.springframework.util.CollectionUtils;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.*;
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* ParseInfoFormatProcessor formats parse info to make it more readable to the users.
|
* ParseInfoFormatProcessor formats parse info to make it more readable to the users.
|
||||||
**/
|
**/
|
||||||
|
@Slf4j
|
||||||
public class ParseInfoFormatProcessor implements ParseResultProcessor {
|
public class ParseInfoFormatProcessor implements ParseResultProcessor {
|
||||||
@Override
|
@Override
|
||||||
public void process(ParseContext parseContext) {
|
public void process(ParseContext parseContext) {
|
||||||
@@ -24,11 +38,12 @@ public class ParseInfoFormatProcessor implements ParseResultProcessor {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
formatNL2SQLParseInfo(p);
|
buildParseInfoFromSQL(p);
|
||||||
|
buildTextInfo(p);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void formatNL2SQLParseInfo(SemanticParseInfo parseInfo) {
|
private void buildTextInfo(SemanticParseInfo parseInfo) {
|
||||||
StringBuilder textBuilder = new StringBuilder();
|
StringBuilder textBuilder = new StringBuilder();
|
||||||
textBuilder.append("**数据集:** ").append(parseInfo.getDataSet().getName()).append(" ");
|
textBuilder.append("**数据集:** ").append(parseInfo.getDataSet().getName()).append(" ");
|
||||||
List<String> metricNames = parseInfo.getMetrics().stream().map(SchemaElement::getName)
|
List<String> metricNames = parseInfo.getMetrics().stream().map(SchemaElement::getName)
|
||||||
@@ -60,4 +75,198 @@ public class ParseInfoFormatProcessor implements ParseResultProcessor {
|
|||||||
}
|
}
|
||||||
parseInfo.setTextInfo(textBuilder.toString());
|
parseInfo.setTextInfo(textBuilder.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private void buildParseInfoFromSQL(SemanticParseInfo parseInfo) {
|
||||||
|
SqlInfo sqlInfo = parseInfo.getSqlInfo();
|
||||||
|
String s2SQL = sqlInfo.getCorrectedS2SQL();
|
||||||
|
if (StringUtils.isBlank(s2SQL)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
parseQueryType(parseInfo);
|
||||||
|
List<FieldExpression> expressions = SqlSelectHelper.getFilterExpression(s2SQL);
|
||||||
|
Long dataSetId = parseInfo.getDataSetId();
|
||||||
|
SemanticLayerService semanticLayerService =
|
||||||
|
ContextUtils.getBean(SemanticLayerService.class);
|
||||||
|
DataSetSchema dsSchema = semanticLayerService.getDataSetSchema(dataSetId);
|
||||||
|
|
||||||
|
// extract date filter from S2SQL
|
||||||
|
try {
|
||||||
|
if (parseInfo.getDateInfo() == null && !CollectionUtils.isEmpty(expressions)) {
|
||||||
|
parseInfo.setDateInfo(extractDateFilter(expressions, dsSchema));
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("failed to extract date range:", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
// extract dimension filters from S2SQL
|
||||||
|
try {
|
||||||
|
List<QueryFilter> queryFilters = extractDimensionFilter(dsSchema, expressions);
|
||||||
|
parseInfo.getDimensionFilters().addAll(queryFilters);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("failed to extract dimension filters:", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
// extract metrics from S2SQL
|
||||||
|
List<String> allFields =
|
||||||
|
filterDateField(dsSchema, SqlSelectHelper.getAllSelectFields(s2SQL));
|
||||||
|
Set<SchemaElement> metrics = matchSchemaElements(allFields, dsSchema.getMetrics());
|
||||||
|
parseInfo.setMetrics(metrics);
|
||||||
|
|
||||||
|
// extract dimensions from S2SQL
|
||||||
|
if (QueryType.AGGREGATE.equals(parseInfo.getQueryType())) {
|
||||||
|
List<String> groupByFields = SqlSelectHelper.getGroupByFields(s2SQL);
|
||||||
|
List<String> groupByDimensions = filterDateField(dsSchema, groupByFields);
|
||||||
|
parseInfo.setDimensions(
|
||||||
|
matchSchemaElements(groupByDimensions, dsSchema.getDimensions()));
|
||||||
|
} else if (QueryType.DETAIL.equals(parseInfo.getQueryType())) {
|
||||||
|
List<String> selectFields = SqlSelectHelper.getSelectFields(s2SQL);
|
||||||
|
List<String> selectDimensions = filterDateField(dsSchema, selectFields);
|
||||||
|
parseInfo
|
||||||
|
.setDimensions(matchSchemaElements(selectDimensions, dsSchema.getDimensions()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Set<SchemaElement> matchSchemaElements(List<String> allFields,
|
||||||
|
Set<SchemaElement> elements) {
|
||||||
|
return elements.stream().filter(schemaElement -> {
|
||||||
|
if (CollectionUtils.isEmpty(schemaElement.getAlias())) {
|
||||||
|
return allFields.contains(schemaElement.getName());
|
||||||
|
}
|
||||||
|
Set<String> allFieldsSet = new HashSet<>(allFields);
|
||||||
|
Set<String> aliasSet = new HashSet<>(schemaElement.getAlias());
|
||||||
|
List<String> intersection =
|
||||||
|
allFieldsSet.stream().filter(aliasSet::contains).collect(Collectors.toList());
|
||||||
|
return allFields.contains(schemaElement.getName())
|
||||||
|
|| !CollectionUtils.isEmpty(intersection);
|
||||||
|
}).collect(Collectors.toSet());
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<String> filterDateField(DataSetSchema dataSetSchema, List<String> allFields) {
|
||||||
|
return allFields.stream().filter(entry -> !isPartitionDimension(dataSetSchema, entry))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<QueryFilter> extractDimensionFilter(DataSetSchema dsSchema,
|
||||||
|
List<FieldExpression> fieldExpressions) {
|
||||||
|
|
||||||
|
Map<String, SchemaElement> fieldNameToElement = getNameToElement(dsSchema);
|
||||||
|
List<QueryFilter> result = Lists.newArrayList();
|
||||||
|
for (FieldExpression expression : fieldExpressions) {
|
||||||
|
QueryFilter dimensionFilter = new QueryFilter();
|
||||||
|
dimensionFilter.setValue(expression.getFieldValue());
|
||||||
|
SchemaElement schemaElement = fieldNameToElement.get(expression.getFieldName());
|
||||||
|
if (Objects.isNull(schemaElement)
|
||||||
|
|| isPartitionDimension(dsSchema, schemaElement.getName())) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
dimensionFilter.setName(schemaElement.getName());
|
||||||
|
dimensionFilter.setBizName(schemaElement.getBizName());
|
||||||
|
dimensionFilter.setElementID(schemaElement.getId());
|
||||||
|
|
||||||
|
FilterOperatorEnum operatorEnum =
|
||||||
|
FilterOperatorEnum.getSqlOperator(expression.getOperator());
|
||||||
|
dimensionFilter.setOperator(operatorEnum);
|
||||||
|
dimensionFilter.setFunction(expression.getFunction());
|
||||||
|
result.add(dimensionFilter);
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
private DateConf extractDateFilter(List<FieldExpression> fieldExpressions,
|
||||||
|
DataSetSchema dataSetSchema) {
|
||||||
|
List<FieldExpression> dateExpressions = fieldExpressions.stream().filter(
|
||||||
|
expression -> isPartitionDimension(dataSetSchema, expression.getFieldName()))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
if (CollectionUtils.isEmpty(dateExpressions)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
DateConf dateInfo = new DateConf();
|
||||||
|
dateInfo.setDateMode(DateConf.DateMode.BETWEEN);
|
||||||
|
FieldExpression firstExpression = dateExpressions.get(0);
|
||||||
|
|
||||||
|
FilterOperatorEnum firstOperator =
|
||||||
|
FilterOperatorEnum.getSqlOperator(firstExpression.getOperator());
|
||||||
|
if (FilterOperatorEnum.EQUALS.equals(firstOperator)
|
||||||
|
&& Objects.nonNull(firstExpression.getFieldValue())) {
|
||||||
|
dateInfo.setStartDate(firstExpression.getFieldValue().toString());
|
||||||
|
dateInfo.setEndDate(firstExpression.getFieldValue().toString());
|
||||||
|
dateInfo.setDateMode(DateConf.DateMode.BETWEEN);
|
||||||
|
return dateInfo;
|
||||||
|
}
|
||||||
|
if (containOperators(firstExpression, firstOperator, FilterOperatorEnum.GREATER_THAN,
|
||||||
|
FilterOperatorEnum.GREATER_THAN_EQUALS)) {
|
||||||
|
dateInfo.setStartDate(firstExpression.getFieldValue().toString());
|
||||||
|
if (hasSecondDate(dateExpressions)) {
|
||||||
|
dateInfo.setEndDate(dateExpressions.get(1).getFieldValue().toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (containOperators(firstExpression, firstOperator, FilterOperatorEnum.MINOR_THAN,
|
||||||
|
FilterOperatorEnum.MINOR_THAN_EQUALS)) {
|
||||||
|
dateInfo.setEndDate(firstExpression.getFieldValue().toString());
|
||||||
|
if (hasSecondDate(dateExpressions)) {
|
||||||
|
dateInfo.setStartDate(dateExpressions.get(1).getFieldValue().toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return dateInfo;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static boolean isPartitionDimension(DataSetSchema dataSetSchema, String sqlFieldName) {
|
||||||
|
if (TimeDimensionEnum.containsTimeDimension(sqlFieldName)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (Objects.isNull(dataSetSchema) || Objects.isNull(dataSetSchema.getPartitionDimension())
|
||||||
|
|| Objects.isNull(dataSetSchema.getPartitionDimension().getName())) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return sqlFieldName.equalsIgnoreCase(dataSetSchema.getPartitionDimension().getName());
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean containOperators(FieldExpression expression, FilterOperatorEnum firstOperator,
|
||||||
|
FilterOperatorEnum... operatorEnums) {
|
||||||
|
return (Arrays.asList(operatorEnums).contains(firstOperator)
|
||||||
|
&& Objects.nonNull(expression.getFieldValue()));
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean hasSecondDate(List<FieldExpression> dateExpressions) {
|
||||||
|
return dateExpressions.size() > 1
|
||||||
|
&& Objects.nonNull(dateExpressions.get(1).getFieldValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String, SchemaElement> getNameToElement(DataSetSchema dsSchema) {
|
||||||
|
Set<SchemaElement> dimensions = dsSchema.getDimensions();
|
||||||
|
Set<SchemaElement> metrics = dsSchema.getMetrics();
|
||||||
|
|
||||||
|
List<SchemaElement> allElements = Lists.newArrayList();
|
||||||
|
allElements.addAll(dimensions);
|
||||||
|
allElements.addAll(metrics);
|
||||||
|
// support alias
|
||||||
|
return allElements.stream().flatMap(schemaElement -> {
|
||||||
|
Set<Pair<String, SchemaElement>> result = new HashSet<>();
|
||||||
|
result.add(Pair.of(schemaElement.getName(), schemaElement));
|
||||||
|
List<String> aliasList = schemaElement.getAlias();
|
||||||
|
if (!org.springframework.util.CollectionUtils.isEmpty(aliasList)) {
|
||||||
|
for (String alias : aliasList) {
|
||||||
|
result.add(Pair.of(alias, schemaElement));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result.stream();
|
||||||
|
}).collect(Collectors.toMap(Pair::getLeft, Pair::getRight, (value1, value2) -> value2));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void parseQueryType(SemanticParseInfo parseInfo) {
|
||||||
|
parseInfo.setQueryType(QueryType.DETAIL);
|
||||||
|
SqlInfo sqlInfo = parseInfo.getSqlInfo();
|
||||||
|
if (Objects.isNull(sqlInfo) || StringUtils.isBlank(sqlInfo.getCorrectedS2SQL())) {
|
||||||
|
parseInfo.setQueryType(QueryType.DETAIL);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. AGG queryType
|
||||||
|
if (Objects.nonNull(sqlInfo) && StringUtils.isNotBlank(sqlInfo.getParsedS2SQL())
|
||||||
|
&& SqlSelectFunctionHelper.hasAggregateFunction(sqlInfo.getCorrectedS2SQL())) {
|
||||||
|
parseInfo.setQueryType(QueryType.AGGREGATE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -31,19 +31,18 @@ public class QueryTypeParser implements SemanticParser {
|
|||||||
chatQueryContext.getSemanticSchema().getDataSetSchemaMap().get(dataSetId);
|
chatQueryContext.getSemanticSchema().getDataSetSchemaMap().get(dataSetId);
|
||||||
semanticQuery.initS2Sql(dataSetSchema, user);
|
semanticQuery.initS2Sql(dataSetSchema, user);
|
||||||
// 2.set queryType
|
// 2.set queryType
|
||||||
QueryType queryType = getQueryType(chatQueryContext, semanticQuery);
|
QueryType queryType = getQueryType(semanticQuery);
|
||||||
semanticQuery.getParseInfo().setQueryType(queryType);
|
semanticQuery.getParseInfo().setQueryType(queryType);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private QueryType getQueryType(ChatQueryContext chatQueryContext, SemanticQuery semanticQuery) {
|
private QueryType getQueryType(SemanticQuery semanticQuery) {
|
||||||
SemanticParseInfo parseInfo = semanticQuery.getParseInfo();
|
SemanticParseInfo parseInfo = semanticQuery.getParseInfo();
|
||||||
SqlInfo sqlInfo = parseInfo.getSqlInfo();
|
SqlInfo sqlInfo = parseInfo.getSqlInfo();
|
||||||
if (Objects.isNull(sqlInfo) || StringUtils.isBlank(sqlInfo.getParsedS2SQL())) {
|
if (Objects.isNull(sqlInfo) || StringUtils.isBlank(sqlInfo.getParsedS2SQL())) {
|
||||||
return QueryType.DETAIL;
|
return QueryType.DETAIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. AGG queryType
|
|
||||||
if (SqlSelectFunctionHelper.hasAggregateFunction(sqlInfo.getParsedS2SQL())) {
|
if (SqlSelectFunctionHelper.hasAggregateFunction(sqlInfo.getParsedS2SQL())) {
|
||||||
return QueryType.AGGREGATE;
|
return QueryType.AGGREGATE;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,216 +0,0 @@
|
|||||||
package com.tencent.supersonic.headless.server.processor;
|
|
||||||
|
|
||||||
import com.google.common.collect.Lists;
|
|
||||||
import com.tencent.supersonic.common.jsqlparser.FieldExpression;
|
|
||||||
import com.tencent.supersonic.common.jsqlparser.SqlSelectHelper;
|
|
||||||
import com.tencent.supersonic.common.pojo.DateConf;
|
|
||||||
import com.tencent.supersonic.common.pojo.enums.FilterOperatorEnum;
|
|
||||||
import com.tencent.supersonic.common.pojo.enums.QueryType;
|
|
||||||
import com.tencent.supersonic.common.pojo.enums.TimeDimensionEnum;
|
|
||||||
import com.tencent.supersonic.common.util.ContextUtils;
|
|
||||||
import com.tencent.supersonic.headless.api.pojo.DataSetSchema;
|
|
||||||
import com.tencent.supersonic.headless.api.pojo.SchemaElement;
|
|
||||||
import com.tencent.supersonic.headless.api.pojo.SemanticParseInfo;
|
|
||||||
import com.tencent.supersonic.headless.api.pojo.SqlInfo;
|
|
||||||
import com.tencent.supersonic.headless.api.pojo.request.QueryFilter;
|
|
||||||
import com.tencent.supersonic.headless.api.pojo.response.ParseResp;
|
|
||||||
import com.tencent.supersonic.headless.chat.ChatQueryContext;
|
|
||||||
import com.tencent.supersonic.headless.server.facade.service.SemanticLayerService;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
|
||||||
import org.apache.commons.lang3.tuple.Pair;
|
|
||||||
import org.springframework.util.CollectionUtils;
|
|
||||||
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
/** ParseInfoProcessor extracts structured info from S2SQL so that users get to know the details. */
|
|
||||||
@Slf4j
|
|
||||||
public class ParseInfoProcessor implements ResultProcessor {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void process(ParseResp parseResp, ChatQueryContext chatQueryContext) {
|
|
||||||
parseResp.getSelectedParses().forEach(this::updateParseInfo);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void updateParseInfo(SemanticParseInfo parseInfo) {
|
|
||||||
SqlInfo sqlInfo = parseInfo.getSqlInfo();
|
|
||||||
String s2SQL = sqlInfo.getCorrectedS2SQL();
|
|
||||||
if (StringUtils.isBlank(s2SQL)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
List<FieldExpression> expressions = SqlSelectHelper.getFilterExpression(s2SQL);
|
|
||||||
Long dataSetId = parseInfo.getDataSetId();
|
|
||||||
SemanticLayerService semanticLayerService =
|
|
||||||
ContextUtils.getBean(SemanticLayerService.class);
|
|
||||||
DataSetSchema dsSchema = semanticLayerService.getDataSetSchema(dataSetId);
|
|
||||||
|
|
||||||
// extract date filter from S2SQL
|
|
||||||
try {
|
|
||||||
if (parseInfo.getDateInfo() == null && !CollectionUtils.isEmpty(expressions)) {
|
|
||||||
parseInfo.setDateInfo(extractDateFilter(expressions, dsSchema));
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("failed to extract date range:", e);
|
|
||||||
}
|
|
||||||
|
|
||||||
// extract dimension filters from S2SQL
|
|
||||||
try {
|
|
||||||
List<QueryFilter> queryFilters = extractDimensionFilter(dsSchema, expressions);
|
|
||||||
parseInfo.getDimensionFilters().addAll(queryFilters);
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("failed to extract dimension filters:", e);
|
|
||||||
}
|
|
||||||
|
|
||||||
// extract metrics from S2SQL
|
|
||||||
List<String> allFields =
|
|
||||||
filterDateField(dsSchema, SqlSelectHelper.getAllSelectFields(s2SQL));
|
|
||||||
Set<SchemaElement> metrics = matchSchemaElements(allFields, dsSchema.getMetrics());
|
|
||||||
parseInfo.setMetrics(metrics);
|
|
||||||
|
|
||||||
// extract dimensions from S2SQL
|
|
||||||
if (QueryType.AGGREGATE.equals(parseInfo.getQueryType())) {
|
|
||||||
List<String> groupByFields = SqlSelectHelper.getGroupByFields(s2SQL);
|
|
||||||
List<String> groupByDimensions = filterDateField(dsSchema, groupByFields);
|
|
||||||
parseInfo.setDimensions(
|
|
||||||
matchSchemaElements(groupByDimensions, dsSchema.getDimensions()));
|
|
||||||
} else if (QueryType.DETAIL.equals(parseInfo.getQueryType())) {
|
|
||||||
List<String> selectFields = SqlSelectHelper.getSelectFields(s2SQL);
|
|
||||||
List<String> selectDimensions = filterDateField(dsSchema, selectFields);
|
|
||||||
parseInfo
|
|
||||||
.setDimensions(matchSchemaElements(selectDimensions, dsSchema.getDimensions()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private Set<SchemaElement> matchSchemaElements(List<String> allFields,
|
|
||||||
Set<SchemaElement> elements) {
|
|
||||||
return elements.stream().filter(schemaElement -> {
|
|
||||||
if (CollectionUtils.isEmpty(schemaElement.getAlias())) {
|
|
||||||
return allFields.contains(schemaElement.getName());
|
|
||||||
}
|
|
||||||
Set<String> allFieldsSet = new HashSet<>(allFields);
|
|
||||||
Set<String> aliasSet = new HashSet<>(schemaElement.getAlias());
|
|
||||||
List<String> intersection =
|
|
||||||
allFieldsSet.stream().filter(aliasSet::contains).collect(Collectors.toList());
|
|
||||||
return allFields.contains(schemaElement.getName())
|
|
||||||
|| !CollectionUtils.isEmpty(intersection);
|
|
||||||
}).collect(Collectors.toSet());
|
|
||||||
}
|
|
||||||
|
|
||||||
private List<String> filterDateField(DataSetSchema dataSetSchema, List<String> allFields) {
|
|
||||||
return allFields.stream().filter(entry -> !isPartitionDimension(dataSetSchema, entry))
|
|
||||||
.collect(Collectors.toList());
|
|
||||||
}
|
|
||||||
|
|
||||||
private List<QueryFilter> extractDimensionFilter(DataSetSchema dsSchema,
|
|
||||||
List<FieldExpression> fieldExpressions) {
|
|
||||||
|
|
||||||
Map<String, SchemaElement> fieldNameToElement = getNameToElement(dsSchema);
|
|
||||||
List<QueryFilter> result = Lists.newArrayList();
|
|
||||||
for (FieldExpression expression : fieldExpressions) {
|
|
||||||
QueryFilter dimensionFilter = new QueryFilter();
|
|
||||||
dimensionFilter.setValue(expression.getFieldValue());
|
|
||||||
SchemaElement schemaElement = fieldNameToElement.get(expression.getFieldName());
|
|
||||||
if (Objects.isNull(schemaElement)
|
|
||||||
|| isPartitionDimension(dsSchema, schemaElement.getName())) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
dimensionFilter.setName(schemaElement.getName());
|
|
||||||
dimensionFilter.setBizName(schemaElement.getBizName());
|
|
||||||
dimensionFilter.setElementID(schemaElement.getId());
|
|
||||||
|
|
||||||
FilterOperatorEnum operatorEnum =
|
|
||||||
FilterOperatorEnum.getSqlOperator(expression.getOperator());
|
|
||||||
dimensionFilter.setOperator(operatorEnum);
|
|
||||||
dimensionFilter.setFunction(expression.getFunction());
|
|
||||||
result.add(dimensionFilter);
|
|
||||||
}
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
private DateConf extractDateFilter(List<FieldExpression> fieldExpressions,
|
|
||||||
DataSetSchema dataSetSchema) {
|
|
||||||
List<FieldExpression> dateExpressions = fieldExpressions.stream().filter(
|
|
||||||
expression -> isPartitionDimension(dataSetSchema, expression.getFieldName()))
|
|
||||||
.collect(Collectors.toList());
|
|
||||||
if (CollectionUtils.isEmpty(dateExpressions)) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
DateConf dateInfo = new DateConf();
|
|
||||||
dateInfo.setDateMode(DateConf.DateMode.BETWEEN);
|
|
||||||
FieldExpression firstExpression = dateExpressions.get(0);
|
|
||||||
|
|
||||||
FilterOperatorEnum firstOperator =
|
|
||||||
FilterOperatorEnum.getSqlOperator(firstExpression.getOperator());
|
|
||||||
if (FilterOperatorEnum.EQUALS.equals(firstOperator)
|
|
||||||
&& Objects.nonNull(firstExpression.getFieldValue())) {
|
|
||||||
dateInfo.setStartDate(firstExpression.getFieldValue().toString());
|
|
||||||
dateInfo.setEndDate(firstExpression.getFieldValue().toString());
|
|
||||||
dateInfo.setDateMode(DateConf.DateMode.BETWEEN);
|
|
||||||
return dateInfo;
|
|
||||||
}
|
|
||||||
if (containOperators(firstExpression, firstOperator, FilterOperatorEnum.GREATER_THAN,
|
|
||||||
FilterOperatorEnum.GREATER_THAN_EQUALS)) {
|
|
||||||
dateInfo.setStartDate(firstExpression.getFieldValue().toString());
|
|
||||||
if (hasSecondDate(dateExpressions)) {
|
|
||||||
dateInfo.setEndDate(dateExpressions.get(1).getFieldValue().toString());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (containOperators(firstExpression, firstOperator, FilterOperatorEnum.MINOR_THAN,
|
|
||||||
FilterOperatorEnum.MINOR_THAN_EQUALS)) {
|
|
||||||
dateInfo.setEndDate(firstExpression.getFieldValue().toString());
|
|
||||||
if (hasSecondDate(dateExpressions)) {
|
|
||||||
dateInfo.setStartDate(dateExpressions.get(1).getFieldValue().toString());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return dateInfo;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static boolean isPartitionDimension(DataSetSchema dataSetSchema, String sqlFieldName) {
|
|
||||||
if (TimeDimensionEnum.containsTimeDimension(sqlFieldName)) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
if (Objects.isNull(dataSetSchema) || Objects.isNull(dataSetSchema.getPartitionDimension())
|
|
||||||
|| Objects.isNull(dataSetSchema.getPartitionDimension().getName())) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return sqlFieldName.equalsIgnoreCase(dataSetSchema.getPartitionDimension().getName());
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean containOperators(FieldExpression expression, FilterOperatorEnum firstOperator,
|
|
||||||
FilterOperatorEnum... operatorEnums) {
|
|
||||||
return (Arrays.asList(operatorEnums).contains(firstOperator)
|
|
||||||
&& Objects.nonNull(expression.getFieldValue()));
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean hasSecondDate(List<FieldExpression> dateExpressions) {
|
|
||||||
return dateExpressions.size() > 1
|
|
||||||
&& Objects.nonNull(dateExpressions.get(1).getFieldValue());
|
|
||||||
}
|
|
||||||
|
|
||||||
protected Map<String, SchemaElement> getNameToElement(DataSetSchema dsSchema) {
|
|
||||||
Set<SchemaElement> dimensions = dsSchema.getDimensions();
|
|
||||||
Set<SchemaElement> metrics = dsSchema.getMetrics();
|
|
||||||
|
|
||||||
List<SchemaElement> allElements = Lists.newArrayList();
|
|
||||||
allElements.addAll(dimensions);
|
|
||||||
allElements.addAll(metrics);
|
|
||||||
// support alias
|
|
||||||
return allElements.stream().flatMap(schemaElement -> {
|
|
||||||
Set<Pair<String, SchemaElement>> result = new HashSet<>();
|
|
||||||
result.add(Pair.of(schemaElement.getName(), schemaElement));
|
|
||||||
List<String> aliasList = schemaElement.getAlias();
|
|
||||||
if (!org.springframework.util.CollectionUtils.isEmpty(aliasList)) {
|
|
||||||
for (String alias : aliasList) {
|
|
||||||
result.add(Pair.of(alias, schemaElement));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return result.stream();
|
|
||||||
}).collect(Collectors.toMap(Pair::getLeft, Pair::getRight, (value1, value2) -> value2));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,10 +0,0 @@
|
|||||||
package com.tencent.supersonic.headless.server.processor;
|
|
||||||
|
|
||||||
import com.tencent.supersonic.headless.api.pojo.response.ParseResp;
|
|
||||||
import com.tencent.supersonic.headless.chat.ChatQueryContext;
|
|
||||||
|
|
||||||
/** A ParseResultProcessor wraps things up before returning results to users in parse stage. */
|
|
||||||
public interface ResultProcessor {
|
|
||||||
|
|
||||||
void process(ParseResp parseResp, ChatQueryContext chatQueryContext);
|
|
||||||
}
|
|
||||||
@@ -14,7 +14,6 @@ import com.tencent.supersonic.headless.chat.parser.SemanticParser;
|
|||||||
import com.tencent.supersonic.headless.chat.query.QueryManager;
|
import com.tencent.supersonic.headless.chat.query.QueryManager;
|
||||||
import com.tencent.supersonic.headless.chat.query.SemanticQuery;
|
import com.tencent.supersonic.headless.chat.query.SemanticQuery;
|
||||||
import com.tencent.supersonic.headless.server.facade.service.SemanticLayerService;
|
import com.tencent.supersonic.headless.server.facade.service.SemanticLayerService;
|
||||||
import com.tencent.supersonic.headless.server.processor.ResultProcessor;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.commons.collections.CollectionUtils;
|
import org.apache.commons.collections.CollectionUtils;
|
||||||
import org.apache.commons.collections.MapUtils;
|
import org.apache.commons.collections.MapUtils;
|
||||||
@@ -34,8 +33,6 @@ public class ChatWorkflowEngine {
|
|||||||
private final List<SemanticParser> semanticParsers = CoreComponentFactory.getSemanticParsers();
|
private final List<SemanticParser> semanticParsers = CoreComponentFactory.getSemanticParsers();
|
||||||
private final List<SemanticCorrector> semanticCorrectors =
|
private final List<SemanticCorrector> semanticCorrectors =
|
||||||
CoreComponentFactory.getSemanticCorrectors();
|
CoreComponentFactory.getSemanticCorrectors();
|
||||||
private final List<ResultProcessor> resultProcessors =
|
|
||||||
CoreComponentFactory.getResultProcessors();
|
|
||||||
|
|
||||||
public void start(ChatWorkflowState initialState, ChatQueryContext queryCtx,
|
public void start(ChatWorkflowState initialState, ChatQueryContext queryCtx,
|
||||||
ParseResp parseResult) {
|
ParseResp parseResult) {
|
||||||
@@ -74,11 +71,10 @@ public class ChatWorkflowEngine {
|
|||||||
long start = System.currentTimeMillis();
|
long start = System.currentTimeMillis();
|
||||||
performTranslating(queryCtx, parseResult);
|
performTranslating(queryCtx, parseResult);
|
||||||
parseResult.getParseTimeCost().setSqlTime(System.currentTimeMillis() - start);
|
parseResult.getParseTimeCost().setSqlTime(System.currentTimeMillis() - start);
|
||||||
queryCtx.setChatWorkflowState(ChatWorkflowState.PROCESSING);
|
parseResult.setState(ParseResp.ParseState.COMPLETED);
|
||||||
|
queryCtx.setChatWorkflowState(ChatWorkflowState.FINISHED);
|
||||||
break;
|
break;
|
||||||
case PROCESSING:
|
|
||||||
default:
|
default:
|
||||||
performProcessing(queryCtx, parseResult);
|
|
||||||
if (parseResult.getState().equals(ParseResp.ParseState.PENDING)) {
|
if (parseResult.getState().equals(ParseResp.ParseState.PENDING)) {
|
||||||
parseResult.setState(ParseResp.ParseState.COMPLETED);
|
parseResult.setState(ParseResp.ParseState.COMPLETED);
|
||||||
}
|
}
|
||||||
@@ -117,10 +113,6 @@ public class ChatWorkflowEngine {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void performProcessing(ChatQueryContext queryCtx, ParseResp parseResult) {
|
|
||||||
resultProcessors.forEach(processor -> processor.process(parseResult, queryCtx));
|
|
||||||
}
|
|
||||||
|
|
||||||
private void performTranslating(ChatQueryContext queryCtx, ParseResp parseResult) {
|
private void performTranslating(ChatQueryContext queryCtx, ParseResp parseResult) {
|
||||||
List<SemanticParseInfo> semanticParseInfos = queryCtx.getCandidateQueries().stream()
|
List<SemanticParseInfo> semanticParseInfos = queryCtx.getCandidateQueries().stream()
|
||||||
.map(SemanticQuery::getParseInfo).collect(Collectors.toList());
|
.map(SemanticQuery::getParseInfo).collect(Collectors.toList());
|
||||||
|
|||||||
@@ -2,12 +2,7 @@ package com.tencent.supersonic.headless.server.utils;
|
|||||||
|
|
||||||
import com.tencent.supersonic.headless.chat.utils.ComponentFactory;
|
import com.tencent.supersonic.headless.chat.utils.ComponentFactory;
|
||||||
import com.tencent.supersonic.headless.server.modeller.SemanticModeller;
|
import com.tencent.supersonic.headless.server.modeller.SemanticModeller;
|
||||||
import com.tencent.supersonic.headless.server.processor.ResultProcessor;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.commons.collections.CollectionUtils;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* QueryConverter QueryOptimizer QueryExecutor object factory
|
* QueryConverter QueryOptimizer QueryExecutor object factory
|
||||||
@@ -15,16 +10,8 @@ import java.util.List;
|
|||||||
@Slf4j
|
@Slf4j
|
||||||
public class CoreComponentFactory extends ComponentFactory {
|
public class CoreComponentFactory extends ComponentFactory {
|
||||||
|
|
||||||
private static List<ResultProcessor> resultProcessors = new ArrayList<>();
|
|
||||||
|
|
||||||
private static SemanticModeller semanticModeller;
|
private static SemanticModeller semanticModeller;
|
||||||
|
|
||||||
public static List<ResultProcessor> getResultProcessors() {
|
|
||||||
return CollectionUtils.isEmpty(resultProcessors)
|
|
||||||
? init(ResultProcessor.class, resultProcessors)
|
|
||||||
: resultProcessors;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static SemanticModeller getSemanticModeller() {
|
public static SemanticModeller getSemanticModeller() {
|
||||||
return semanticModeller == null ? init(SemanticModeller.class) : semanticModeller;
|
return semanticModeller == null ? init(SemanticModeller.class) : semanticModeller;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -45,8 +45,5 @@ com.tencent.supersonic.headless.core.cache.QueryCache=\
|
|||||||
|
|
||||||
### headless-server SPIs
|
### headless-server SPIs
|
||||||
|
|
||||||
com.tencent.supersonic.headless.server.processor.ResultProcessor=\
|
|
||||||
com.tencent.supersonic.headless.server.processor.ParseInfoProcessor
|
|
||||||
|
|
||||||
com.tencent.supersonic.headless.server.modeller.SemanticModeller=\
|
com.tencent.supersonic.headless.server.modeller.SemanticModeller=\
|
||||||
com.tencent.supersonic.headless.server.modeller.RuleSemanticModeller
|
com.tencent.supersonic.headless.server.modeller.RuleSemanticModeller
|
||||||
@@ -45,9 +45,6 @@ com.tencent.supersonic.headless.core.cache.QueryCache=\
|
|||||||
|
|
||||||
### headless-server SPIs
|
### headless-server SPIs
|
||||||
|
|
||||||
com.tencent.supersonic.headless.server.processor.ResultProcessor=\
|
|
||||||
com.tencent.supersonic.headless.server.processor.ParseInfoProcessor
|
|
||||||
|
|
||||||
com.tencent.supersonic.headless.server.modeller.SemanticModeller=\
|
com.tencent.supersonic.headless.server.modeller.SemanticModeller=\
|
||||||
com.tencent.supersonic.headless.server.modeller.RuleSemanticModeller
|
com.tencent.supersonic.headless.server.modeller.RuleSemanticModeller
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user