mirror of
https://github.com/tencentmusic/supersonic.git
synced 2025-12-10 11:07:06 +00:00
[improvement][common] The thread pool adopts a generic thread pool configuration. (#1966)
This commit is contained in:
@@ -12,6 +12,7 @@ import com.tencent.supersonic.chat.server.service.AgentService;
|
|||||||
import com.tencent.supersonic.chat.server.service.ChatQueryService;
|
import com.tencent.supersonic.chat.server.service.ChatQueryService;
|
||||||
import com.tencent.supersonic.chat.server.service.MemoryService;
|
import com.tencent.supersonic.chat.server.service.MemoryService;
|
||||||
import com.tencent.supersonic.common.config.ChatModel;
|
import com.tencent.supersonic.common.config.ChatModel;
|
||||||
|
import com.tencent.supersonic.common.config.ThreadPoolConfig;
|
||||||
import com.tencent.supersonic.common.pojo.ChatApp;
|
import com.tencent.supersonic.common.pojo.ChatApp;
|
||||||
import com.tencent.supersonic.common.pojo.User;
|
import com.tencent.supersonic.common.pojo.User;
|
||||||
import com.tencent.supersonic.common.pojo.enums.AuthType;
|
import com.tencent.supersonic.common.pojo.enums.AuthType;
|
||||||
@@ -25,8 +26,6 @@ import org.springframework.util.CollectionUtils;
|
|||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@@ -42,7 +41,8 @@ public class AgentServiceImpl extends ServiceImpl<AgentDOMapper, AgentDO> implem
|
|||||||
@Autowired
|
@Autowired
|
||||||
private ChatModelService chatModelService;
|
private ChatModelService chatModelService;
|
||||||
|
|
||||||
private ExecutorService executorService = Executors.newFixedThreadPool(1);
|
@Autowired
|
||||||
|
private ThreadPoolConfig threadPoolConfig;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<Agent> getAgents(User user, AuthType authType) {
|
public List<Agent> getAgents(User user, AuthType authType) {
|
||||||
@@ -108,7 +108,7 @@ public class AgentServiceImpl extends ServiceImpl<AgentDOMapper, AgentDO> implem
|
|||||||
* @param agent
|
* @param agent
|
||||||
*/
|
*/
|
||||||
private void executeAgentExamplesAsync(Agent agent) {
|
private void executeAgentExamplesAsync(Agent agent) {
|
||||||
executorService.execute(() -> doExecuteAgentExamples(agent));
|
threadPoolConfig.getChatExecutor().execute(() -> doExecuteAgentExamples(agent));
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void doExecuteAgentExamples(Agent agent) {
|
private synchronized void doExecuteAgentExamples(Agent agent) {
|
||||||
|
|||||||
@@ -26,4 +26,12 @@ public class ThreadPoolConfig {
|
|||||||
new ThreadFactoryBuilder().setNameFormat("supersonic-map-pool-").build(),
|
new ThreadFactoryBuilder().setNameFormat("supersonic-map-pool-").build(),
|
||||||
new ThreadPoolExecutor.CallerRunsPolicy());
|
new ThreadPoolExecutor.CallerRunsPolicy());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Bean("chatExecutor")
|
||||||
|
public ThreadPoolExecutor getChatExecutor() {
|
||||||
|
return new ThreadPoolExecutor(8, 16, 60 * 3, TimeUnit.SECONDS,
|
||||||
|
new LinkedBlockingQueue<Runnable>(1024),
|
||||||
|
new ThreadFactoryBuilder().setNameFormat("supersonic-chat-pool-").build(),
|
||||||
|
new ThreadPoolExecutor.CallerRunsPolicy());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,11 +1,5 @@
|
|||||||
package com.tencent.supersonic.headless.server.service.impl;
|
package com.tencent.supersonic.headless.server.service.impl;
|
||||||
|
|
||||||
import javax.sql.RowSetMetaData;
|
|
||||||
import javax.sql.rowset.CachedRowSet;
|
|
||||||
import javax.sql.rowset.RowSetFactory;
|
|
||||||
import javax.sql.rowset.RowSetMetaDataImpl;
|
|
||||||
import javax.sql.rowset.RowSetProvider;
|
|
||||||
|
|
||||||
import com.google.common.cache.Cache;
|
import com.google.common.cache.Cache;
|
||||||
import com.google.common.cache.CacheBuilder;
|
import com.google.common.cache.CacheBuilder;
|
||||||
import com.google.protobuf.ByteString;
|
import com.google.protobuf.ByteString;
|
||||||
@@ -50,6 +44,11 @@ import org.apache.arrow.vector.types.pojo.Schema;
|
|||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import javax.sql.RowSetMetaData;
|
||||||
|
import javax.sql.rowset.CachedRowSet;
|
||||||
|
import javax.sql.rowset.RowSetFactory;
|
||||||
|
import javax.sql.rowset.RowSetMetaDataImpl;
|
||||||
|
import javax.sql.rowset.RowSetProvider;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.sql.ResultSet;
|
import java.sql.ResultSet;
|
||||||
import java.sql.ResultSetMetaData;
|
import java.sql.ResultSetMetaData;
|
||||||
@@ -78,9 +77,9 @@ public class FlightServiceImpl extends BasicFlightSqlProducer implements FlightS
|
|||||||
|
|
||||||
private String host;
|
private String host;
|
||||||
private Integer port;
|
private Integer port;
|
||||||
private ExecutorService executorService;
|
|
||||||
private Cache<ByteString, SemanticQueryReq> preparedStatementCache;
|
private Cache<ByteString, SemanticQueryReq> preparedStatementCache;
|
||||||
private final String dataSetIdHeaderKey = "dataSetId";
|
private final String dataSetIdHeaderKey = "dataSetId";
|
||||||
|
private ExecutorService executorService;
|
||||||
private final String nameHeaderKey = "name";
|
private final String nameHeaderKey = "name";
|
||||||
private final String passwordHeaderKey = "password";
|
private final String passwordHeaderKey = "password";
|
||||||
private final Calendar defaultCalendar = JdbcToArrowUtils.getUtcCalendar();
|
private final Calendar defaultCalendar = JdbcToArrowUtils.getUtcCalendar();
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package com.tencent.supersonic.headless.server.service.impl;
|
|||||||
|
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.tencent.supersonic.auth.api.authentication.service.UserService;
|
import com.tencent.supersonic.auth.api.authentication.service.UserService;
|
||||||
|
import com.tencent.supersonic.common.config.ThreadPoolConfig;
|
||||||
import com.tencent.supersonic.common.pojo.ItemDateResp;
|
import com.tencent.supersonic.common.pojo.ItemDateResp;
|
||||||
import com.tencent.supersonic.common.pojo.ModelRela;
|
import com.tencent.supersonic.common.pojo.ModelRela;
|
||||||
import com.tencent.supersonic.common.pojo.User;
|
import com.tencent.supersonic.common.pojo.User;
|
||||||
@@ -68,10 +69,6 @@ import java.util.Map;
|
|||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
@@ -96,13 +93,12 @@ public class ModelServiceImpl implements ModelService {
|
|||||||
|
|
||||||
private final ModelRelaService modelRelaService;
|
private final ModelRelaService modelRelaService;
|
||||||
|
|
||||||
ExecutorService executor =
|
private final ThreadPoolConfig threadPoolConfig;
|
||||||
new ThreadPoolExecutor(0, 5, 5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
|
|
||||||
|
|
||||||
public ModelServiceImpl(ModelRepository modelRepository, DatabaseService databaseService,
|
public ModelServiceImpl(ModelRepository modelRepository, DatabaseService databaseService,
|
||||||
@Lazy DimensionService dimensionService, @Lazy MetricService metricService,
|
@Lazy DimensionService dimensionService, @Lazy MetricService metricService,
|
||||||
DomainService domainService, UserService userService, DataSetService dataSetService,
|
DomainService domainService, UserService userService, DataSetService dataSetService,
|
||||||
DateInfoRepository dateInfoRepository, ModelRelaService modelRelaService) {
|
DateInfoRepository dateInfoRepository, ModelRelaService modelRelaService, ThreadPoolConfig threadPoolConfig) {
|
||||||
this.modelRepository = modelRepository;
|
this.modelRepository = modelRepository;
|
||||||
this.databaseService = databaseService;
|
this.databaseService = databaseService;
|
||||||
this.dimensionService = dimensionService;
|
this.dimensionService = dimensionService;
|
||||||
@@ -112,6 +108,7 @@ public class ModelServiceImpl implements ModelService {
|
|||||||
this.dataSetService = dataSetService;
|
this.dataSetService = dataSetService;
|
||||||
this.dateInfoRepository = dateInfoRepository;
|
this.dateInfoRepository = dateInfoRepository;
|
||||||
this.modelRelaService = modelRelaService;
|
this.modelRelaService = modelRelaService;
|
||||||
|
this.threadPoolConfig = threadPoolConfig;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -228,7 +225,7 @@ public class ModelServiceImpl implements ModelService {
|
|||||||
CompletableFuture.allOf(dbSchemas.stream()
|
CompletableFuture.allOf(dbSchemas.stream()
|
||||||
.map(dbSchema -> CompletableFuture.runAsync(
|
.map(dbSchema -> CompletableFuture.runAsync(
|
||||||
() -> doBuild(modelBuildReq, dbSchema, dbSchemas, modelSchemaMap),
|
() -> doBuild(modelBuildReq, dbSchema, dbSchemas, modelSchemaMap),
|
||||||
executor))
|
threadPoolConfig.getCommonExecutor()))
|
||||||
.toArray(CompletableFuture[]::new)).join();
|
.toArray(CompletableFuture[]::new)).join();
|
||||||
return modelSchemaMap;
|
return modelSchemaMap;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package com.tencent.supersonic.headless.server.service;
|
|||||||
|
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.tencent.supersonic.auth.api.authentication.service.UserService;
|
import com.tencent.supersonic.auth.api.authentication.service.UserService;
|
||||||
|
import com.tencent.supersonic.common.config.ThreadPoolConfig;
|
||||||
import com.tencent.supersonic.common.pojo.User;
|
import com.tencent.supersonic.common.pojo.User;
|
||||||
import com.tencent.supersonic.common.pojo.enums.AggOperatorEnum;
|
import com.tencent.supersonic.common.pojo.enums.AggOperatorEnum;
|
||||||
import com.tencent.supersonic.common.pojo.enums.StatusEnum;
|
import com.tencent.supersonic.common.pojo.enums.StatusEnum;
|
||||||
@@ -76,9 +77,10 @@ class ModelServiceImplTest {
|
|||||||
DateInfoRepository dateInfoRepository = Mockito.mock(DateInfoRepository.class);
|
DateInfoRepository dateInfoRepository = Mockito.mock(DateInfoRepository.class);
|
||||||
DataSetService viewService = Mockito.mock(DataSetService.class);
|
DataSetService viewService = Mockito.mock(DataSetService.class);
|
||||||
ModelRelaService modelRelaService = Mockito.mock(ModelRelaService.class);
|
ModelRelaService modelRelaService = Mockito.mock(ModelRelaService.class);
|
||||||
|
ThreadPoolConfig threadPoolConfig = Mockito.mock(ThreadPoolConfig.class);
|
||||||
return new ModelServiceImpl(modelRepository, databaseService, dimensionService,
|
return new ModelServiceImpl(modelRepository, databaseService, dimensionService,
|
||||||
metricService, domainService, userService, viewService, dateInfoRepository,
|
metricService, domainService, userService, viewService, dateInfoRepository,
|
||||||
modelRelaService);
|
modelRelaService, threadPoolConfig);
|
||||||
}
|
}
|
||||||
|
|
||||||
private ModelReq mockModelReq() {
|
private ModelReq mockModelReq() {
|
||||||
|
|||||||
Reference in New Issue
Block a user