diff --git a/headless/core/src/main/java/com/tencent/supersonic/headless/core/config/ExecutorConfig.java b/headless/core/src/main/java/com/tencent/supersonic/headless/core/config/ExecutorConfig.java new file mode 100644 index 000000000..9a1bd013b --- /dev/null +++ b/headless/core/src/main/java/com/tencent/supersonic/headless/core/config/ExecutorConfig.java @@ -0,0 +1,35 @@ +package com.tencent.supersonic.headless.core.config; + +import lombok.Data; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; + +@Data +@Configuration +public class ExecutorConfig { + + @Value("${metricParser.agg.mysql.lowVersion:5.7}") + private String mysqlLowVersion; + @Value("${metricParser.agg.ck.lowVersion:20.4}") + private String ckLowVersion; + @Value("${internal.metric.cnt.suffix:internal_cnt}") + private String internalMetricNameSuffix; + + @Value("${accelerator.duckDb.enable:false}") + private Boolean duckEnable = false; + + @Value("${accelerator.duckDb.temp:/data1/duck/tmp/}") + private String duckDbTemp; + + @Value("${accelerator.duckDb.maximumPoolSize:10}") + private Integer duckDbMaximumPoolSize; + + @Value("${accelerator.duckDb.MaxLifetime:3}") + private Integer duckDbMaxLifetime; + + @Value("${accelerator.duckDb.memoryLimit:31}") + private Integer memoryLimit; + + @Value("${accelerator.duckDb.threads:32}") + private Integer threads; +} diff --git a/headless/core/src/main/java/com/tencent/supersonic/headless/core/pojo/DuckDbSource.java b/headless/core/src/main/java/com/tencent/supersonic/headless/core/pojo/DuckDbSource.java index 22beac0c7..dd01635ee 100644 --- a/headless/core/src/main/java/com/tencent/supersonic/headless/core/pojo/DuckDbSource.java +++ b/headless/core/src/main/java/com/tencent/supersonic/headless/core/pojo/DuckDbSource.java @@ -3,6 +3,7 @@ package com.tencent.supersonic.headless.core.pojo; import com.tencent.supersonic.common.pojo.QueryColumn; import com.tencent.supersonic.headless.api.pojo.response.SemanticQueryResp; +import com.tencent.supersonic.headless.core.config.ExecutorConfig; import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; import java.sql.ResultSet; @@ -13,11 +14,6 @@ import java.util.List; import java.util.Map; import javax.sql.DataSource; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.DependsOn; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Component; @@ -27,51 +23,38 @@ import org.springframework.stereotype.Component; @Component @Slf4j public class DuckDbSource { - - @Value("${accelerator.duckDb.temp:/data1/duck/tmp/}") - protected String duckDbTemp; - - @Value("${accelerator.duckDb.maximumPoolSize:10}") - protected Integer duckDbMaximumPoolSize; - - @Value("${accelerator.duckDb.MaxLifetime:3}") - protected Integer duckDbMaxLifetime; - - @Value("${accelerator.duckDb.memoryLimit:31}") - protected Integer memoryLimit; - - @Value("${accelerator.duckDb.threads:32}") - protected Integer threads; - - - @Autowired - @Qualifier("duckDbDataSource") protected DataSource duckDbDataSource; - @Autowired - @Qualifier("duckDbJdbcTemplate") protected JdbcTemplate duckDbJdbcTemplate; - @Bean(name = "duckDbConfig") + protected HikariConfig hikariConfig; + + private final ExecutorConfig executorConfig; + + public DuckDbSource(ExecutorConfig executorConfig) { + this.executorConfig = executorConfig; + if (executorConfig.getDuckEnable()) { + hikariConfig = getHikariConfig(); + duckDbDataSource = getDuckDbDataSource(hikariConfig); + duckDbJdbcTemplate = getDuckDbTemplate(duckDbDataSource); + } + } + public HikariConfig getHikariConfig() { HikariConfig config = new HikariConfig(); config.setDriverClassName("org.duckdb.DuckDBDriver"); - config.setMaximumPoolSize(duckDbMaximumPoolSize); - config.setMaxLifetime(duckDbMaxLifetime); + config.setMaximumPoolSize(executorConfig.getDuckDbMaximumPoolSize()); + config.setMaxLifetime(executorConfig.getDuckDbMaxLifetime()); config.setJdbcUrl("jdbc:duckdb:"); return config; } - @Bean(name = "duckDbDataSource") - @DependsOn("duckDbConfig") - public DataSource getDuckDbDataSource(@Qualifier("duckDbConfig") HikariConfig config) { + public DataSource getDuckDbDataSource(HikariConfig config) { HikariDataSource ds = new HikariDataSource(config); return ds; } - @Bean("duckDbJdbcTemplate") - @DependsOn("duckDbDataSource") - public JdbcTemplate getDuckDbTemplate(@Qualifier("duckDbDataSource") DataSource dataSource) { + public JdbcTemplate getDuckDbTemplate(DataSource dataSource) { JdbcTemplate jdbcTemplate = new JdbcTemplate(); jdbcTemplate.setDataSource(dataSource); init(jdbcTemplate); @@ -79,9 +62,9 @@ public class DuckDbSource { } protected void init(JdbcTemplate jdbcTemplate) { - jdbcTemplate.execute(String.format("SET memory_limit = '%sGB';", memoryLimit)); - jdbcTemplate.execute(String.format("SET temp_directory='%s';", duckDbTemp)); - jdbcTemplate.execute(String.format("SET threads TO %s;", threads)); + jdbcTemplate.execute(String.format("SET memory_limit = '%sGB';", executorConfig.getMemoryLimit())); + jdbcTemplate.execute(String.format("SET temp_directory='%s';", executorConfig.getDuckDbTemp())); + jdbcTemplate.execute(String.format("SET threads TO %s;", executorConfig.getThreads())); jdbcTemplate.execute("SET enable_object_cache = true;"); } diff --git a/headless/core/src/main/java/com/tencent/supersonic/headless/core/utils/SqlGenerateUtils.java b/headless/core/src/main/java/com/tencent/supersonic/headless/core/utils/SqlGenerateUtils.java index ae4d27d1c..52fd87fb2 100644 --- a/headless/core/src/main/java/com/tencent/supersonic/headless/core/utils/SqlGenerateUtils.java +++ b/headless/core/src/main/java/com/tencent/supersonic/headless/core/utils/SqlGenerateUtils.java @@ -26,6 +26,7 @@ import com.tencent.supersonic.headless.api.pojo.request.QueryStructReq; import com.tencent.supersonic.headless.api.pojo.response.DimSchemaResp; import com.tencent.supersonic.headless.api.pojo.response.MetricResp; import com.tencent.supersonic.headless.api.pojo.response.MetricSchemaResp; +import com.tencent.supersonic.headless.core.config.ExecutorConfig; import java.time.LocalDate; import java.time.format.DateTimeFormatter; import java.util.Collections; @@ -42,7 +43,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Triple; import org.apache.logging.log4j.util.Strings; -import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; @@ -57,17 +57,13 @@ public class SqlGenerateUtils { private final DateModeUtils dateModeUtils; - @Value("${metricParser.agg.mysql.lowVersion:5.7}") - private String mysqlLowVersion; - @Value("${metricParser.agg.ck.lowVersion:20.4}") - private String ckLowVersion; - @Value("${internal.metric.cnt.suffix:internal_cnt}") - private String internalMetricNameSuffix; + private final ExecutorConfig executorConfig; public SqlGenerateUtils(SqlFilterUtils sqlFilterUtils, - DateModeUtils dateModeUtils) { + DateModeUtils dateModeUtils, ExecutorConfig executorConfig) { this.sqlFilterUtils = sqlFilterUtils; this.dateModeUtils = dateModeUtils; + this.executorConfig = executorConfig; } public static String getUnionSelect(QueryStructReq queryStructCmd) { @@ -256,19 +252,19 @@ public class SqlGenerateUtils { public boolean isSupportWith(EngineType engineTypeEnum, String version) { if (engineTypeEnum.equals(EngineType.MYSQL) && Objects.nonNull(version) && version.startsWith( - mysqlLowVersion)) { + executorConfig.getMysqlLowVersion())) { return false; } if (engineTypeEnum.equals(EngineType.CLICKHOUSE) && Objects.nonNull(version) && StringUtil.compareVersion(version, - ckLowVersion) < 0) { + executorConfig.getCkLowVersion()) < 0) { return false; } return true; } public String generateInternalMetricName(String modelBizName) { - return modelBizName + UNDERLINE + internalMetricNameSuffix; + return modelBizName + UNDERLINE + executorConfig.getInternalMetricNameSuffix(); } public String generateDerivedMetric(final List metricResps, final Set allFields, diff --git a/headless/server/src/main/java/com/tencent/supersonic/headless/server/listener/FlightSqlListener.java b/headless/server/src/main/java/com/tencent/supersonic/headless/server/listener/FlightSqlListener.java index c4ff30ced..c61f87cf1 100644 --- a/headless/server/src/main/java/com/tencent/supersonic/headless/server/listener/FlightSqlListener.java +++ b/headless/server/src/main/java/com/tencent/supersonic/headless/server/listener/FlightSqlListener.java @@ -36,6 +36,7 @@ public class FlightSqlListener implements CommandLineRunner { private ExecutorService executorService; private FlightServer flightServer; private BufferAllocator allocator; + private Boolean isRunning = false; public FlightSqlListener(FlightService flightService) { this.allocator = new RootAllocator(); @@ -60,12 +61,27 @@ public class FlightSqlListener implements CommandLineRunner { try { log.info("Arrow Flight JDBC server started on {} {}", host, port); flightServer.start(); + isRunning = true; } catch (Exception e) { log.error("FlightSqlListener start error {}", e); } } + public Boolean isRunning() { + return isRunning; + } + + public void stop() { + try { + log.info("Arrow Flight JDBC server stop on {} {}", host, port); + flightServer.close(); + allocator.close(); + } catch (Exception e) { + log.error("FlightSqlListener start error {}", e); + } + } + @Override public void run(String... args) throws Exception { if (enable) { diff --git a/launchers/standalone/src/test/java/com/tencent/supersonic/headless/FlightSqlTest.java b/launchers/standalone/src/test/java/com/tencent/supersonic/headless/FlightSqlTest.java index 13f5bfdc4..616704e6e 100644 --- a/launchers/standalone/src/test/java/com/tencent/supersonic/headless/FlightSqlTest.java +++ b/launchers/standalone/src/test/java/com/tencent/supersonic/headless/FlightSqlTest.java @@ -30,10 +30,9 @@ public class FlightSqlTest extends BaseTest { @Test void test01() throws Exception { + startServer(); String host = flightSqlListener.getHost(); Integer port = flightSqlListener.getPort(); - UserHolder.setStrategy(fakeUserStrategy); - flightSqlListener.startServer(); FlightSqlClient sqlClient = new FlightSqlClient( FlightClient.builder(new RootAllocator(Integer.MAX_VALUE), Location.forGrpcInsecure(host, port)) .build()); @@ -67,10 +66,9 @@ public class FlightSqlTest extends BaseTest { @Test void test02() throws Exception { + startServer(); String host = flightSqlListener.getHost(); Integer port = flightSqlListener.getPort(); - UserHolder.setStrategy(fakeUserStrategy); - flightSqlListener.startServer(); FlightSqlClient sqlClient = new FlightSqlClient( FlightClient.builder(new RootAllocator(Integer.MAX_VALUE), Location.forGrpcInsecure(host, port)) .build()); @@ -81,7 +79,8 @@ public class FlightSqlTest extends BaseTest { headers.insert("password", "admin"); HeaderCallOption headerOption = new HeaderCallOption(headers); try { - FlightInfo flightInfo = sqlClient.execute("SELECT 部门, SUM(访问次数) AS 访问次数 FROM 超音数PVUV统计 GROUP BY 部门", + FlightInfo flightInfo = sqlClient.execute( + "SELECT 部门, SUM(访问次数) AS 访问次数 FROM 超音数PVUV统计 GROUP BY 部门", headerOption); FlightStream stream = sqlClient.getStream(flightInfo .getEndpoints() @@ -100,4 +99,11 @@ public class FlightSqlTest extends BaseTest { log.error("", e); } } + + private void startServer() { + if (!flightSqlListener.isRunning()) { + UserHolder.setStrategy(fakeUserStrategy); + flightSqlListener.startServer(); + } + } }