2 Commits

Author SHA1 Message Date
Jun Zhang
be59b051fc [fix][headless]Fix logic bug in s2sql parsing. (#1996)
Some checks are pending
supersonic CentOS CI / build (21) (push) Waiting to run
supersonic mac CI / build (21) (push) Waiting to run
supersonic ubuntu CI / build (21) (push) Waiting to run
supersonic windows CI / build (21) (push) Waiting to run
2025-01-04 14:58:48 +08:00
jerryjzhang
83cb6967e7 [improvement][headless]Setup thread pool for data event listeners. 2025-01-04 14:37:48 +08:00
10 changed files with 71 additions and 49 deletions

View File

@@ -11,6 +11,14 @@ import java.util.concurrent.TimeUnit;
@Component
public class ThreadPoolConfig {
@Bean("eventExecutor")
public ThreadPoolExecutor getTaskEventExecutor() {
return new ThreadPoolExecutor(4, 8, 60 * 3, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1024),
new ThreadFactoryBuilder().setNameFormat("supersonic-event-pool-").build(),
new ThreadPoolExecutor.CallerRunsPolicy());
}
@Bean("commonExecutor")
public ThreadPoolExecutor getCommonExecutor() {
return new ThreadPoolExecutor(8, 16, 60 * 3, TimeUnit.SECONDS,

View File

@@ -1,5 +1,7 @@
package com.tencent.supersonic.common.jsqlparser;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.tencent.supersonic.common.util.StringUtil;
import lombok.extern.slf4j.Slf4j;
import net.sf.jsqlparser.JSQLParserException;
@@ -292,19 +294,37 @@ public class SqlSelectHelper {
}
List<PlainSelect> plainSelectList = new ArrayList<>();
plainSelectList.add(plainSelect);
Set<String> result = getSelectFields(plainSelectList);
Set<String> selectFields = getSelectFields(plainSelectList);
Set<String> aliases = getAliasFields(plainSelect);
getGroupByFields(plainSelect, result);
Set<String> groupByFields = Sets.newHashSet();
getGroupByFields(plainSelect, groupByFields);
groupByFields.removeAll(aliases);
getOrderByFields(plainSelect, result);
Set<String> orderByFields = Sets.newHashSet();
getOrderByFields(plainSelect, orderByFields);
orderByFields.removeAll(aliases);
getWhereFields(plainSelectList, result);
Set<String> whereFields = Sets.newHashSet();
getWhereFields(plainSelectList, whereFields);
whereFields.removeAll(aliases);
getHavingFields(plainSelect, result);
Set<String> havingFields = Sets.newHashSet();
getHavingFields(plainSelect, havingFields);
havingFields.removeAll(aliases);
getLateralViewsFields(plainSelect, result);
Set<String> lateralFields = Sets.newHashSet();
getLateralViewsFields(plainSelect, lateralFields);
lateralFields.removeAll(aliases);
return new ArrayList<>(result);
List<String> results = Lists.newArrayList();
results.addAll(selectFields);
results.addAll(groupByFields);
results.addAll(orderByFields);
results.addAll(whereFields);
results.addAll(havingFields);
results.addAll(lateralFields);
return new ArrayList<>(results);
}
private static void getHavingFields(PlainSelect plainSelect, Set<String> result) {

View File

@@ -31,8 +31,8 @@ public class AllFieldMapper extends BaseMapper {
List<SchemaElementMatch> allMatches = Lists.newArrayList();
for (SchemaElement schemaElement : schemaElements) {
allMatches.add(SchemaElementMatch.builder().word(schemaElement.getName())
.element(schemaElement).detectWord(schemaElement.getName())
.similarity(0.1).build());
.element(schemaElement).detectWord(schemaElement.getName()).similarity(0.1)
.build());
}
chatQueryContext.getMapInfo().setMatchedElements(entry.getKey(), allMatches);
}

View File

@@ -1,5 +1,6 @@
package com.tencent.supersonic.headless.core.translator.parser;
import com.google.common.collect.Lists;
import com.tencent.supersonic.common.jsqlparser.SqlAsHelper;
import com.tencent.supersonic.common.jsqlparser.SqlReplaceHelper;
import com.tencent.supersonic.common.jsqlparser.SqlSelectFunctionHelper;
@@ -57,15 +58,22 @@ public class SqlQueryParser implements QueryParser {
}
// build ontologyQuery
List<String> allFields = SqlSelectHelper.getAllSelectFields(sqlQuery.getSql());
List<MetricSchemaResp> metricSchemas = getMetrics(semanticSchemaResp, allFields);
List<String> queryFields = SqlSelectHelper.getAllSelectFields(sqlQuery.getSql());
List<MetricSchemaResp> metricSchemas = getMetrics(semanticSchemaResp, queryFields);
List<String> metrics =
metricSchemas.stream().map(SchemaItem::getBizName).collect(Collectors.toList());
Set<String> dimensions = getDimensions(semanticSchemaResp, allFields);
List<DimSchemaResp> dimensionSchemas = getDimensions(semanticSchemaResp, queryFields);
List<String> dimensions =
dimensionSchemas.stream().map(SchemaItem::getBizName).collect(Collectors.toList());
// check if there are fields not matched with any metric or dimension
if (allFields.size() > metricSchemas.size() + dimensions.size()) {
queryStatement
.setErrMsg("There are querying columns in the SQL not matched with any semantic field.");
if (queryFields.size() > metricSchemas.size() + dimensions.size()) {
List<String> semanticFields = Lists.newArrayList();
metricSchemas.forEach(m -> semanticFields.add(m.getBizName()));
dimensionSchemas.forEach(d -> semanticFields.add(d.getBizName()));
String errMsg =
String.format("Querying columns[%s] not matched with semantic fields[%s].",
queryFields, semanticFields);
queryStatement.setErrMsg(errMsg);
queryStatement.setStatus(1);
return;
}
@@ -125,15 +133,15 @@ public class SqlQueryParser implements QueryParser {
return AggOption.DEFAULT;
}
private Set<String> getDimensions(SemanticSchemaResp semanticSchemaResp,
private List<DimSchemaResp> getDimensions(SemanticSchemaResp semanticSchemaResp,
List<String> allFields) {
Map<String, String> dimensionLowerToNameMap = semanticSchemaResp.getDimensions().stream()
.collect(Collectors.toMap(entry -> entry.getBizName().toLowerCase(),
SchemaItem::getBizName, (k1, k2) -> k1));
Map<String, DimSchemaResp> dimensionLowerToNameMap =
semanticSchemaResp.getDimensions().stream().collect(Collectors
.toMap(entry -> entry.getBizName().toLowerCase(), entry -> entry));
return allFields.stream()
.filter(entry -> dimensionLowerToNameMap.containsKey(entry.toLowerCase()))
.map(entry -> dimensionLowerToNameMap.get(entry.toLowerCase()))
.collect(Collectors.toSet());
.collect(Collectors.toList());
}
private List<MetricSchemaResp> getMetrics(SemanticSchemaResp semanticSchemaResp,

View File

@@ -30,7 +30,6 @@ import com.tencent.supersonic.headless.server.service.DataSetService;
import com.tencent.supersonic.headless.server.service.DimensionService;
import com.tencent.supersonic.headless.server.service.MetricService;
import com.tencent.supersonic.headless.server.service.SchemaService;
import com.tencent.supersonic.headless.server.utils.MetricDrillDownChecker;
import com.tencent.supersonic.headless.server.utils.QueryUtils;
import com.tencent.supersonic.headless.server.utils.StatUtils;
import lombok.SneakyThrows;
@@ -53,7 +52,6 @@ public class S2SemanticLayerService implements SemanticLayerService {
private final DataSetService dataSetService;
private final SchemaService schemaService;
private final SemanticTranslator semanticTranslator;
private final MetricDrillDownChecker metricDrillDownChecker;
private final KnowledgeBaseService knowledgeBaseService;
private final MetricService metricService;
private final DimensionService dimensionService;
@@ -63,7 +61,6 @@ public class S2SemanticLayerService implements SemanticLayerService {
public S2SemanticLayerService(StatUtils statUtils, QueryUtils queryUtils,
SemanticSchemaManager semanticSchemaManager, DataSetService dataSetService,
SchemaService schemaService, SemanticTranslator semanticTranslator,
MetricDrillDownChecker metricDrillDownChecker,
KnowledgeBaseService knowledgeBaseService, MetricService metricService,
DimensionService dimensionService) {
this.statUtils = statUtils;
@@ -72,7 +69,6 @@ public class S2SemanticLayerService implements SemanticLayerService {
this.dataSetService = dataSetService;
this.schemaService = schemaService;
this.semanticTranslator = semanticTranslator;
this.metricDrillDownChecker = metricDrillDownChecker;
this.knowledgeBaseService = knowledgeBaseService;
this.metricService = metricService;
this.dimensionService = dimensionService;
@@ -119,10 +115,6 @@ public class S2SemanticLayerService implements SemanticLayerService {
QueryStatement queryStatement = buildQueryStatement(queryReq, user);
semanticTranslator.translate(queryStatement);
// Check whether the dimensions of the metric drill-down are correct temporarily,
// add the abstraction of a validator later.
metricDrillDownChecker.checkQuery(queryStatement);
// 4.execute query
SemanticQueryResp queryResp = null;
for (QueryExecutor queryExecutor : queryExecutors) {

View File

@@ -10,7 +10,7 @@ import dev.langchain4j.store.embedding.TextSegmentConvert;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
@@ -19,7 +19,7 @@ import java.util.List;
@Component
@Slf4j
public class MetaEmbeddingListener implements ApplicationListener<DataEvent> {
public class MetaEmbeddingListener {
@Autowired
private EmbeddingConfig embeddingConfig;
@@ -30,8 +30,8 @@ public class MetaEmbeddingListener implements ApplicationListener<DataEvent> {
@Value("${s2.embedding.operation.sleep.time:3000}")
private Integer embeddingOperationSleepTime;
@Async
@Override
@Async("eventExecutor")
@EventListener
public void onApplicationEvent(DataEvent event) {
List<DataItem> dataItems = event.getDataItems();
if (CollectionUtils.isEmpty(dataItems)) {

View File

@@ -7,17 +7,17 @@ import com.tencent.supersonic.common.pojo.enums.EventType;
import com.tencent.supersonic.headless.chat.knowledge.DictWord;
import com.tencent.supersonic.headless.chat.knowledge.helper.HanlpHelper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
@Component
@Slf4j
public class SchemaDictUpdateListener implements ApplicationListener<DataEvent> {
public class SchemaDictUpdateListener {
@Async
@Override
@Async("eventExecutor")
@EventListener
public void onApplicationEvent(DataEvent dataEvent) {
if (CollectionUtils.isEmpty(dataEvent.getDataItems())) {
return;

View File

@@ -116,7 +116,7 @@ public class ModelServiceImpl implements ModelService {
@Override
@Transactional
public ModelResp createModel(ModelReq modelReq, User user) throws Exception {
checkParams(modelReq);
// checkParams(modelReq);
ModelDO modelDO = ModelConverter.convert(modelReq, user);
modelRepository.createModel(modelDO);
batchCreateDimension(modelDO, user);
@@ -140,7 +140,7 @@ public class ModelServiceImpl implements ModelService {
@Override
@Transactional
public ModelResp updateModel(ModelReq modelReq, User user) throws Exception {
checkParams(modelReq);
// checkParams(modelReq);
checkRelations(modelReq);
ModelDO modelDO = modelRepository.getModelById(modelReq.getId());
ModelConverter.convert(modelDO, modelReq, user);

View File

@@ -15,15 +15,12 @@ public class CoreComponentFactory extends ComponentFactory {
private static List<SemanticModeller> semanticModellers = new ArrayList<>();
public static List<SemanticModeller> getSemanticModellers() {
if (semanticModellers.isEmpty()) {
initSemanticModellers();
}
return semanticModellers;
}
private static void initSemanticModellers() {
static {
init(SemanticModeller.class, semanticModellers);
}
public static List<SemanticModeller> getSemanticModellers() {
return semanticModellers;
}
}

View File

@@ -33,9 +33,6 @@ public class MetricCheckUtils {
throw new InvalidArgumentException("指标定义参数不可为空");
}
expr = typeParams.getExpr();
if (CollectionUtils.isEmpty(typeParams.getMeasures())) {
throw new InvalidArgumentException("定义指标的度量列表参数不可为空");
}
if (hasAggregateFunction(expr)) {
throw new InvalidArgumentException("基于度量来创建指标,表达式中不可再包含聚合函数");
}