2 Commits

Author SHA1 Message Date
Jun Zhang
be59b051fc [fix][headless]Fix logic bug in s2sql parsing. (#1996)
Some checks are pending
supersonic CentOS CI / build (21) (push) Waiting to run
supersonic mac CI / build (21) (push) Waiting to run
supersonic ubuntu CI / build (21) (push) Waiting to run
supersonic windows CI / build (21) (push) Waiting to run
2025-01-04 14:58:48 +08:00
jerryjzhang
83cb6967e7 [improvement][headless]Setup thread pool for data event listeners. 2025-01-04 14:37:48 +08:00
10 changed files with 71 additions and 49 deletions

View File

@@ -11,6 +11,14 @@ import java.util.concurrent.TimeUnit;
@Component @Component
public class ThreadPoolConfig { public class ThreadPoolConfig {
@Bean("eventExecutor")
public ThreadPoolExecutor getTaskEventExecutor() {
return new ThreadPoolExecutor(4, 8, 60 * 3, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1024),
new ThreadFactoryBuilder().setNameFormat("supersonic-event-pool-").build(),
new ThreadPoolExecutor.CallerRunsPolicy());
}
@Bean("commonExecutor") @Bean("commonExecutor")
public ThreadPoolExecutor getCommonExecutor() { public ThreadPoolExecutor getCommonExecutor() {
return new ThreadPoolExecutor(8, 16, 60 * 3, TimeUnit.SECONDS, return new ThreadPoolExecutor(8, 16, 60 * 3, TimeUnit.SECONDS,

View File

@@ -1,5 +1,7 @@
package com.tencent.supersonic.common.jsqlparser; package com.tencent.supersonic.common.jsqlparser;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.tencent.supersonic.common.util.StringUtil; import com.tencent.supersonic.common.util.StringUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import net.sf.jsqlparser.JSQLParserException; import net.sf.jsqlparser.JSQLParserException;
@@ -292,19 +294,37 @@ public class SqlSelectHelper {
} }
List<PlainSelect> plainSelectList = new ArrayList<>(); List<PlainSelect> plainSelectList = new ArrayList<>();
plainSelectList.add(plainSelect); plainSelectList.add(plainSelect);
Set<String> result = getSelectFields(plainSelectList); Set<String> selectFields = getSelectFields(plainSelectList);
Set<String> aliases = getAliasFields(plainSelect);
getGroupByFields(plainSelect, result); Set<String> groupByFields = Sets.newHashSet();
getGroupByFields(plainSelect, groupByFields);
groupByFields.removeAll(aliases);
getOrderByFields(plainSelect, result); Set<String> orderByFields = Sets.newHashSet();
getOrderByFields(plainSelect, orderByFields);
orderByFields.removeAll(aliases);
getWhereFields(plainSelectList, result); Set<String> whereFields = Sets.newHashSet();
getWhereFields(plainSelectList, whereFields);
whereFields.removeAll(aliases);
getHavingFields(plainSelect, result); Set<String> havingFields = Sets.newHashSet();
getHavingFields(plainSelect, havingFields);
havingFields.removeAll(aliases);
getLateralViewsFields(plainSelect, result); Set<String> lateralFields = Sets.newHashSet();
getLateralViewsFields(plainSelect, lateralFields);
lateralFields.removeAll(aliases);
return new ArrayList<>(result); List<String> results = Lists.newArrayList();
results.addAll(selectFields);
results.addAll(groupByFields);
results.addAll(orderByFields);
results.addAll(whereFields);
results.addAll(havingFields);
results.addAll(lateralFields);
return new ArrayList<>(results);
} }
private static void getHavingFields(PlainSelect plainSelect, Set<String> result) { private static void getHavingFields(PlainSelect plainSelect, Set<String> result) {

View File

@@ -31,8 +31,8 @@ public class AllFieldMapper extends BaseMapper {
List<SchemaElementMatch> allMatches = Lists.newArrayList(); List<SchemaElementMatch> allMatches = Lists.newArrayList();
for (SchemaElement schemaElement : schemaElements) { for (SchemaElement schemaElement : schemaElements) {
allMatches.add(SchemaElementMatch.builder().word(schemaElement.getName()) allMatches.add(SchemaElementMatch.builder().word(schemaElement.getName())
.element(schemaElement).detectWord(schemaElement.getName()) .element(schemaElement).detectWord(schemaElement.getName()).similarity(0.1)
.similarity(0.1).build()); .build());
} }
chatQueryContext.getMapInfo().setMatchedElements(entry.getKey(), allMatches); chatQueryContext.getMapInfo().setMatchedElements(entry.getKey(), allMatches);
} }

View File

@@ -1,5 +1,6 @@
package com.tencent.supersonic.headless.core.translator.parser; package com.tencent.supersonic.headless.core.translator.parser;
import com.google.common.collect.Lists;
import com.tencent.supersonic.common.jsqlparser.SqlAsHelper; import com.tencent.supersonic.common.jsqlparser.SqlAsHelper;
import com.tencent.supersonic.common.jsqlparser.SqlReplaceHelper; import com.tencent.supersonic.common.jsqlparser.SqlReplaceHelper;
import com.tencent.supersonic.common.jsqlparser.SqlSelectFunctionHelper; import com.tencent.supersonic.common.jsqlparser.SqlSelectFunctionHelper;
@@ -57,15 +58,22 @@ public class SqlQueryParser implements QueryParser {
} }
// build ontologyQuery // build ontologyQuery
List<String> allFields = SqlSelectHelper.getAllSelectFields(sqlQuery.getSql()); List<String> queryFields = SqlSelectHelper.getAllSelectFields(sqlQuery.getSql());
List<MetricSchemaResp> metricSchemas = getMetrics(semanticSchemaResp, allFields); List<MetricSchemaResp> metricSchemas = getMetrics(semanticSchemaResp, queryFields);
List<String> metrics = List<String> metrics =
metricSchemas.stream().map(SchemaItem::getBizName).collect(Collectors.toList()); metricSchemas.stream().map(SchemaItem::getBizName).collect(Collectors.toList());
Set<String> dimensions = getDimensions(semanticSchemaResp, allFields); List<DimSchemaResp> dimensionSchemas = getDimensions(semanticSchemaResp, queryFields);
List<String> dimensions =
dimensionSchemas.stream().map(SchemaItem::getBizName).collect(Collectors.toList());
// check if there are fields not matched with any metric or dimension // check if there are fields not matched with any metric or dimension
if (allFields.size() > metricSchemas.size() + dimensions.size()) { if (queryFields.size() > metricSchemas.size() + dimensions.size()) {
queryStatement List<String> semanticFields = Lists.newArrayList();
.setErrMsg("There are querying columns in the SQL not matched with any semantic field."); metricSchemas.forEach(m -> semanticFields.add(m.getBizName()));
dimensionSchemas.forEach(d -> semanticFields.add(d.getBizName()));
String errMsg =
String.format("Querying columns[%s] not matched with semantic fields[%s].",
queryFields, semanticFields);
queryStatement.setErrMsg(errMsg);
queryStatement.setStatus(1); queryStatement.setStatus(1);
return; return;
} }
@@ -125,15 +133,15 @@ public class SqlQueryParser implements QueryParser {
return AggOption.DEFAULT; return AggOption.DEFAULT;
} }
private Set<String> getDimensions(SemanticSchemaResp semanticSchemaResp, private List<DimSchemaResp> getDimensions(SemanticSchemaResp semanticSchemaResp,
List<String> allFields) { List<String> allFields) {
Map<String, String> dimensionLowerToNameMap = semanticSchemaResp.getDimensions().stream() Map<String, DimSchemaResp> dimensionLowerToNameMap =
.collect(Collectors.toMap(entry -> entry.getBizName().toLowerCase(), semanticSchemaResp.getDimensions().stream().collect(Collectors
SchemaItem::getBizName, (k1, k2) -> k1)); .toMap(entry -> entry.getBizName().toLowerCase(), entry -> entry));
return allFields.stream() return allFields.stream()
.filter(entry -> dimensionLowerToNameMap.containsKey(entry.toLowerCase())) .filter(entry -> dimensionLowerToNameMap.containsKey(entry.toLowerCase()))
.map(entry -> dimensionLowerToNameMap.get(entry.toLowerCase())) .map(entry -> dimensionLowerToNameMap.get(entry.toLowerCase()))
.collect(Collectors.toSet()); .collect(Collectors.toList());
} }
private List<MetricSchemaResp> getMetrics(SemanticSchemaResp semanticSchemaResp, private List<MetricSchemaResp> getMetrics(SemanticSchemaResp semanticSchemaResp,

View File

@@ -30,7 +30,6 @@ import com.tencent.supersonic.headless.server.service.DataSetService;
import com.tencent.supersonic.headless.server.service.DimensionService; import com.tencent.supersonic.headless.server.service.DimensionService;
import com.tencent.supersonic.headless.server.service.MetricService; import com.tencent.supersonic.headless.server.service.MetricService;
import com.tencent.supersonic.headless.server.service.SchemaService; import com.tencent.supersonic.headless.server.service.SchemaService;
import com.tencent.supersonic.headless.server.utils.MetricDrillDownChecker;
import com.tencent.supersonic.headless.server.utils.QueryUtils; import com.tencent.supersonic.headless.server.utils.QueryUtils;
import com.tencent.supersonic.headless.server.utils.StatUtils; import com.tencent.supersonic.headless.server.utils.StatUtils;
import lombok.SneakyThrows; import lombok.SneakyThrows;
@@ -53,7 +52,6 @@ public class S2SemanticLayerService implements SemanticLayerService {
private final DataSetService dataSetService; private final DataSetService dataSetService;
private final SchemaService schemaService; private final SchemaService schemaService;
private final SemanticTranslator semanticTranslator; private final SemanticTranslator semanticTranslator;
private final MetricDrillDownChecker metricDrillDownChecker;
private final KnowledgeBaseService knowledgeBaseService; private final KnowledgeBaseService knowledgeBaseService;
private final MetricService metricService; private final MetricService metricService;
private final DimensionService dimensionService; private final DimensionService dimensionService;
@@ -63,7 +61,6 @@ public class S2SemanticLayerService implements SemanticLayerService {
public S2SemanticLayerService(StatUtils statUtils, QueryUtils queryUtils, public S2SemanticLayerService(StatUtils statUtils, QueryUtils queryUtils,
SemanticSchemaManager semanticSchemaManager, DataSetService dataSetService, SemanticSchemaManager semanticSchemaManager, DataSetService dataSetService,
SchemaService schemaService, SemanticTranslator semanticTranslator, SchemaService schemaService, SemanticTranslator semanticTranslator,
MetricDrillDownChecker metricDrillDownChecker,
KnowledgeBaseService knowledgeBaseService, MetricService metricService, KnowledgeBaseService knowledgeBaseService, MetricService metricService,
DimensionService dimensionService) { DimensionService dimensionService) {
this.statUtils = statUtils; this.statUtils = statUtils;
@@ -72,7 +69,6 @@ public class S2SemanticLayerService implements SemanticLayerService {
this.dataSetService = dataSetService; this.dataSetService = dataSetService;
this.schemaService = schemaService; this.schemaService = schemaService;
this.semanticTranslator = semanticTranslator; this.semanticTranslator = semanticTranslator;
this.metricDrillDownChecker = metricDrillDownChecker;
this.knowledgeBaseService = knowledgeBaseService; this.knowledgeBaseService = knowledgeBaseService;
this.metricService = metricService; this.metricService = metricService;
this.dimensionService = dimensionService; this.dimensionService = dimensionService;
@@ -119,10 +115,6 @@ public class S2SemanticLayerService implements SemanticLayerService {
QueryStatement queryStatement = buildQueryStatement(queryReq, user); QueryStatement queryStatement = buildQueryStatement(queryReq, user);
semanticTranslator.translate(queryStatement); semanticTranslator.translate(queryStatement);
// Check whether the dimensions of the metric drill-down are correct temporarily,
// add the abstraction of a validator later.
metricDrillDownChecker.checkQuery(queryStatement);
// 4.execute query // 4.execute query
SemanticQueryResp queryResp = null; SemanticQueryResp queryResp = null;
for (QueryExecutor queryExecutor : queryExecutors) { for (QueryExecutor queryExecutor : queryExecutors) {

View File

@@ -10,7 +10,7 @@ import dev.langchain4j.store.embedding.TextSegmentConvert;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationListener; import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
@@ -19,7 +19,7 @@ import java.util.List;
@Component @Component
@Slf4j @Slf4j
public class MetaEmbeddingListener implements ApplicationListener<DataEvent> { public class MetaEmbeddingListener {
@Autowired @Autowired
private EmbeddingConfig embeddingConfig; private EmbeddingConfig embeddingConfig;
@@ -30,8 +30,8 @@ public class MetaEmbeddingListener implements ApplicationListener<DataEvent> {
@Value("${s2.embedding.operation.sleep.time:3000}") @Value("${s2.embedding.operation.sleep.time:3000}")
private Integer embeddingOperationSleepTime; private Integer embeddingOperationSleepTime;
@Async @Async("eventExecutor")
@Override @EventListener
public void onApplicationEvent(DataEvent event) { public void onApplicationEvent(DataEvent event) {
List<DataItem> dataItems = event.getDataItems(); List<DataItem> dataItems = event.getDataItems();
if (CollectionUtils.isEmpty(dataItems)) { if (CollectionUtils.isEmpty(dataItems)) {

View File

@@ -7,17 +7,17 @@ import com.tencent.supersonic.common.pojo.enums.EventType;
import com.tencent.supersonic.headless.chat.knowledge.DictWord; import com.tencent.supersonic.headless.chat.knowledge.DictWord;
import com.tencent.supersonic.headless.chat.knowledge.helper.HanlpHelper; import com.tencent.supersonic.headless.chat.knowledge.helper.HanlpHelper;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener; import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
@Component @Component
@Slf4j @Slf4j
public class SchemaDictUpdateListener implements ApplicationListener<DataEvent> { public class SchemaDictUpdateListener {
@Async @Async("eventExecutor")
@Override @EventListener
public void onApplicationEvent(DataEvent dataEvent) { public void onApplicationEvent(DataEvent dataEvent) {
if (CollectionUtils.isEmpty(dataEvent.getDataItems())) { if (CollectionUtils.isEmpty(dataEvent.getDataItems())) {
return; return;

View File

@@ -116,7 +116,7 @@ public class ModelServiceImpl implements ModelService {
@Override @Override
@Transactional @Transactional
public ModelResp createModel(ModelReq modelReq, User user) throws Exception { public ModelResp createModel(ModelReq modelReq, User user) throws Exception {
checkParams(modelReq); // checkParams(modelReq);
ModelDO modelDO = ModelConverter.convert(modelReq, user); ModelDO modelDO = ModelConverter.convert(modelReq, user);
modelRepository.createModel(modelDO); modelRepository.createModel(modelDO);
batchCreateDimension(modelDO, user); batchCreateDimension(modelDO, user);
@@ -140,7 +140,7 @@ public class ModelServiceImpl implements ModelService {
@Override @Override
@Transactional @Transactional
public ModelResp updateModel(ModelReq modelReq, User user) throws Exception { public ModelResp updateModel(ModelReq modelReq, User user) throws Exception {
checkParams(modelReq); // checkParams(modelReq);
checkRelations(modelReq); checkRelations(modelReq);
ModelDO modelDO = modelRepository.getModelById(modelReq.getId()); ModelDO modelDO = modelRepository.getModelById(modelReq.getId());
ModelConverter.convert(modelDO, modelReq, user); ModelConverter.convert(modelDO, modelReq, user);

View File

@@ -15,15 +15,12 @@ public class CoreComponentFactory extends ComponentFactory {
private static List<SemanticModeller> semanticModellers = new ArrayList<>(); private static List<SemanticModeller> semanticModellers = new ArrayList<>();
public static List<SemanticModeller> getSemanticModellers() { static {
if (semanticModellers.isEmpty()) {
initSemanticModellers();
}
return semanticModellers;
}
private static void initSemanticModellers() {
init(SemanticModeller.class, semanticModellers); init(SemanticModeller.class, semanticModellers);
} }
public static List<SemanticModeller> getSemanticModellers() {
return semanticModellers;
}
} }

View File

@@ -33,9 +33,6 @@ public class MetricCheckUtils {
throw new InvalidArgumentException("指标定义参数不可为空"); throw new InvalidArgumentException("指标定义参数不可为空");
} }
expr = typeParams.getExpr(); expr = typeParams.getExpr();
if (CollectionUtils.isEmpty(typeParams.getMeasures())) {
throw new InvalidArgumentException("定义指标的度量列表参数不可为空");
}
if (hasAggregateFunction(expr)) { if (hasAggregateFunction(expr)) {
throw new InvalidArgumentException("基于度量来创建指标,表达式中不可再包含聚合函数"); throw new InvalidArgumentException("基于度量来创建指标,表达式中不可再包含聚合函数");
} }