From 83cb6967e73fce50a1060dd5afd1b5a7d587c924 Mon Sep 17 00:00:00 2001 From: jerryjzhang Date: Thu, 2 Jan 2025 15:27:24 +0800 Subject: [PATCH] [improvement][headless]Setup thread pool for data event listeners. --- .../supersonic/common/config/ThreadPoolConfig.java | 8 ++++++++ .../headless/server/listener/MetaEmbeddingListener.java | 8 ++++---- .../server/listener/SchemaDictUpdateListener.java | 8 ++++---- 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/common/src/main/java/com/tencent/supersonic/common/config/ThreadPoolConfig.java b/common/src/main/java/com/tencent/supersonic/common/config/ThreadPoolConfig.java index 8e6a730e0..69a04161f 100644 --- a/common/src/main/java/com/tencent/supersonic/common/config/ThreadPoolConfig.java +++ b/common/src/main/java/com/tencent/supersonic/common/config/ThreadPoolConfig.java @@ -11,6 +11,14 @@ import java.util.concurrent.TimeUnit; @Component public class ThreadPoolConfig { + @Bean("eventExecutor") + public ThreadPoolExecutor getTaskEventExecutor() { + return new ThreadPoolExecutor(4, 8, 60 * 3, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(1024), + new ThreadFactoryBuilder().setNameFormat("supersonic-event-pool-").build(), + new ThreadPoolExecutor.CallerRunsPolicy()); + } + @Bean("commonExecutor") public ThreadPoolExecutor getCommonExecutor() { return new ThreadPoolExecutor(8, 16, 60 * 3, TimeUnit.SECONDS, diff --git a/headless/server/src/main/java/com/tencent/supersonic/headless/server/listener/MetaEmbeddingListener.java b/headless/server/src/main/java/com/tencent/supersonic/headless/server/listener/MetaEmbeddingListener.java index 0133f8df9..e991351ed 100644 --- a/headless/server/src/main/java/com/tencent/supersonic/headless/server/listener/MetaEmbeddingListener.java +++ b/headless/server/src/main/java/com/tencent/supersonic/headless/server/listener/MetaEmbeddingListener.java @@ -10,7 +10,7 @@ import dev.langchain4j.store.embedding.TextSegmentConvert; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.ApplicationListener; +import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; @@ -19,7 +19,7 @@ import java.util.List; @Component @Slf4j -public class MetaEmbeddingListener implements ApplicationListener { +public class MetaEmbeddingListener { @Autowired private EmbeddingConfig embeddingConfig; @@ -30,8 +30,8 @@ public class MetaEmbeddingListener implements ApplicationListener { @Value("${s2.embedding.operation.sleep.time:3000}") private Integer embeddingOperationSleepTime; - @Async - @Override + @Async("eventExecutor") + @EventListener public void onApplicationEvent(DataEvent event) { List dataItems = event.getDataItems(); if (CollectionUtils.isEmpty(dataItems)) { diff --git a/headless/server/src/main/java/com/tencent/supersonic/headless/server/listener/SchemaDictUpdateListener.java b/headless/server/src/main/java/com/tencent/supersonic/headless/server/listener/SchemaDictUpdateListener.java index dc56f1700..d0a0bfe8a 100644 --- a/headless/server/src/main/java/com/tencent/supersonic/headless/server/listener/SchemaDictUpdateListener.java +++ b/headless/server/src/main/java/com/tencent/supersonic/headless/server/listener/SchemaDictUpdateListener.java @@ -7,17 +7,17 @@ import com.tencent.supersonic.common.pojo.enums.EventType; import com.tencent.supersonic.headless.chat.knowledge.DictWord; import com.tencent.supersonic.headless.chat.knowledge.helper.HanlpHelper; import lombok.extern.slf4j.Slf4j; -import org.springframework.context.ApplicationListener; +import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; @Component @Slf4j -public class SchemaDictUpdateListener implements ApplicationListener { +public class SchemaDictUpdateListener { - @Async - @Override + @Async("eventExecutor") + @EventListener public void onApplicationEvent(DataEvent dataEvent) { if (CollectionUtils.isEmpty(dataEvent.getDataItems())) { return;