(improvement)(headless) refactor duck source configure (#1170)

This commit is contained in:
jipeli
2024-06-20 14:24:18 +08:00
committed by GitHub
parent 0d9afb5d87
commit 9e72d239f1
5 changed files with 90 additions and 54 deletions

View File

@@ -0,0 +1,35 @@
package com.tencent.supersonic.headless.core.config;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
@Data
@Configuration
public class ExecutorConfig {
@Value("${metricParser.agg.mysql.lowVersion:5.7}")
private String mysqlLowVersion;
@Value("${metricParser.agg.ck.lowVersion:20.4}")
private String ckLowVersion;
@Value("${internal.metric.cnt.suffix:internal_cnt}")
private String internalMetricNameSuffix;
@Value("${accelerator.duckDb.enable:false}")
private Boolean duckEnable = false;
@Value("${accelerator.duckDb.temp:/data1/duck/tmp/}")
private String duckDbTemp;
@Value("${accelerator.duckDb.maximumPoolSize:10}")
private Integer duckDbMaximumPoolSize;
@Value("${accelerator.duckDb.MaxLifetime:3}")
private Integer duckDbMaxLifetime;
@Value("${accelerator.duckDb.memoryLimit:31}")
private Integer memoryLimit;
@Value("${accelerator.duckDb.threads:32}")
private Integer threads;
}

View File

@@ -3,6 +3,7 @@ package com.tencent.supersonic.headless.core.pojo;
import com.tencent.supersonic.common.pojo.QueryColumn; import com.tencent.supersonic.common.pojo.QueryColumn;
import com.tencent.supersonic.headless.api.pojo.response.SemanticQueryResp; import com.tencent.supersonic.headless.api.pojo.response.SemanticQueryResp;
import com.tencent.supersonic.headless.core.config.ExecutorConfig;
import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource; import com.zaxxer.hikari.HikariDataSource;
import java.sql.ResultSet; import java.sql.ResultSet;
@@ -13,11 +14,6 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import javax.sql.DataSource; import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.DependsOn;
import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@@ -27,51 +23,38 @@ import org.springframework.stereotype.Component;
@Component @Component
@Slf4j @Slf4j
public class DuckDbSource { public class DuckDbSource {
@Value("${accelerator.duckDb.temp:/data1/duck/tmp/}")
protected String duckDbTemp;
@Value("${accelerator.duckDb.maximumPoolSize:10}")
protected Integer duckDbMaximumPoolSize;
@Value("${accelerator.duckDb.MaxLifetime:3}")
protected Integer duckDbMaxLifetime;
@Value("${accelerator.duckDb.memoryLimit:31}")
protected Integer memoryLimit;
@Value("${accelerator.duckDb.threads:32}")
protected Integer threads;
@Autowired
@Qualifier("duckDbDataSource")
protected DataSource duckDbDataSource; protected DataSource duckDbDataSource;
@Autowired
@Qualifier("duckDbJdbcTemplate")
protected JdbcTemplate duckDbJdbcTemplate; protected JdbcTemplate duckDbJdbcTemplate;
@Bean(name = "duckDbConfig") protected HikariConfig hikariConfig;
private final ExecutorConfig executorConfig;
public DuckDbSource(ExecutorConfig executorConfig) {
this.executorConfig = executorConfig;
if (executorConfig.getDuckEnable()) {
hikariConfig = getHikariConfig();
duckDbDataSource = getDuckDbDataSource(hikariConfig);
duckDbJdbcTemplate = getDuckDbTemplate(duckDbDataSource);
}
}
public HikariConfig getHikariConfig() { public HikariConfig getHikariConfig() {
HikariConfig config = new HikariConfig(); HikariConfig config = new HikariConfig();
config.setDriverClassName("org.duckdb.DuckDBDriver"); config.setDriverClassName("org.duckdb.DuckDBDriver");
config.setMaximumPoolSize(duckDbMaximumPoolSize); config.setMaximumPoolSize(executorConfig.getDuckDbMaximumPoolSize());
config.setMaxLifetime(duckDbMaxLifetime); config.setMaxLifetime(executorConfig.getDuckDbMaxLifetime());
config.setJdbcUrl("jdbc:duckdb:"); config.setJdbcUrl("jdbc:duckdb:");
return config; return config;
} }
@Bean(name = "duckDbDataSource") public DataSource getDuckDbDataSource(HikariConfig config) {
@DependsOn("duckDbConfig")
public DataSource getDuckDbDataSource(@Qualifier("duckDbConfig") HikariConfig config) {
HikariDataSource ds = new HikariDataSource(config); HikariDataSource ds = new HikariDataSource(config);
return ds; return ds;
} }
@Bean("duckDbJdbcTemplate") public JdbcTemplate getDuckDbTemplate(DataSource dataSource) {
@DependsOn("duckDbDataSource")
public JdbcTemplate getDuckDbTemplate(@Qualifier("duckDbDataSource") DataSource dataSource) {
JdbcTemplate jdbcTemplate = new JdbcTemplate(); JdbcTemplate jdbcTemplate = new JdbcTemplate();
jdbcTemplate.setDataSource(dataSource); jdbcTemplate.setDataSource(dataSource);
init(jdbcTemplate); init(jdbcTemplate);
@@ -79,9 +62,9 @@ public class DuckDbSource {
} }
protected void init(JdbcTemplate jdbcTemplate) { protected void init(JdbcTemplate jdbcTemplate) {
jdbcTemplate.execute(String.format("SET memory_limit = '%sGB';", memoryLimit)); jdbcTemplate.execute(String.format("SET memory_limit = '%sGB';", executorConfig.getMemoryLimit()));
jdbcTemplate.execute(String.format("SET temp_directory='%s';", duckDbTemp)); jdbcTemplate.execute(String.format("SET temp_directory='%s';", executorConfig.getDuckDbTemp()));
jdbcTemplate.execute(String.format("SET threads TO %s;", threads)); jdbcTemplate.execute(String.format("SET threads TO %s;", executorConfig.getThreads()));
jdbcTemplate.execute("SET enable_object_cache = true;"); jdbcTemplate.execute("SET enable_object_cache = true;");
} }

View File

@@ -26,6 +26,7 @@ import com.tencent.supersonic.headless.api.pojo.request.QueryStructReq;
import com.tencent.supersonic.headless.api.pojo.response.DimSchemaResp; import com.tencent.supersonic.headless.api.pojo.response.DimSchemaResp;
import com.tencent.supersonic.headless.api.pojo.response.MetricResp; import com.tencent.supersonic.headless.api.pojo.response.MetricResp;
import com.tencent.supersonic.headless.api.pojo.response.MetricSchemaResp; import com.tencent.supersonic.headless.api.pojo.response.MetricSchemaResp;
import com.tencent.supersonic.headless.core.config.ExecutorConfig;
import java.time.LocalDate; import java.time.LocalDate;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.Collections; import java.util.Collections;
@@ -42,7 +43,6 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Triple; import org.apache.commons.lang3.tuple.Triple;
import org.apache.logging.log4j.util.Strings; import org.apache.logging.log4j.util.Strings;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
@@ -57,17 +57,13 @@ public class SqlGenerateUtils {
private final DateModeUtils dateModeUtils; private final DateModeUtils dateModeUtils;
@Value("${metricParser.agg.mysql.lowVersion:5.7}") private final ExecutorConfig executorConfig;
private String mysqlLowVersion;
@Value("${metricParser.agg.ck.lowVersion:20.4}")
private String ckLowVersion;
@Value("${internal.metric.cnt.suffix:internal_cnt}")
private String internalMetricNameSuffix;
public SqlGenerateUtils(SqlFilterUtils sqlFilterUtils, public SqlGenerateUtils(SqlFilterUtils sqlFilterUtils,
DateModeUtils dateModeUtils) { DateModeUtils dateModeUtils, ExecutorConfig executorConfig) {
this.sqlFilterUtils = sqlFilterUtils; this.sqlFilterUtils = sqlFilterUtils;
this.dateModeUtils = dateModeUtils; this.dateModeUtils = dateModeUtils;
this.executorConfig = executorConfig;
} }
public static String getUnionSelect(QueryStructReq queryStructCmd) { public static String getUnionSelect(QueryStructReq queryStructCmd) {
@@ -256,19 +252,19 @@ public class SqlGenerateUtils {
public boolean isSupportWith(EngineType engineTypeEnum, String version) { public boolean isSupportWith(EngineType engineTypeEnum, String version) {
if (engineTypeEnum.equals(EngineType.MYSQL) && Objects.nonNull(version) && version.startsWith( if (engineTypeEnum.equals(EngineType.MYSQL) && Objects.nonNull(version) && version.startsWith(
mysqlLowVersion)) { executorConfig.getMysqlLowVersion())) {
return false; return false;
} }
if (engineTypeEnum.equals(EngineType.CLICKHOUSE) && Objects.nonNull(version) if (engineTypeEnum.equals(EngineType.CLICKHOUSE) && Objects.nonNull(version)
&& StringUtil.compareVersion(version, && StringUtil.compareVersion(version,
ckLowVersion) < 0) { executorConfig.getCkLowVersion()) < 0) {
return false; return false;
} }
return true; return true;
} }
public String generateInternalMetricName(String modelBizName) { public String generateInternalMetricName(String modelBizName) {
return modelBizName + UNDERLINE + internalMetricNameSuffix; return modelBizName + UNDERLINE + executorConfig.getInternalMetricNameSuffix();
} }
public String generateDerivedMetric(final List<MetricSchemaResp> metricResps, final Set<String> allFields, public String generateDerivedMetric(final List<MetricSchemaResp> metricResps, final Set<String> allFields,

View File

@@ -36,6 +36,7 @@ public class FlightSqlListener implements CommandLineRunner {
private ExecutorService executorService; private ExecutorService executorService;
private FlightServer flightServer; private FlightServer flightServer;
private BufferAllocator allocator; private BufferAllocator allocator;
private Boolean isRunning = false;
public FlightSqlListener(FlightService flightService) { public FlightSqlListener(FlightService flightService) {
this.allocator = new RootAllocator(); this.allocator = new RootAllocator();
@@ -60,12 +61,27 @@ public class FlightSqlListener implements CommandLineRunner {
try { try {
log.info("Arrow Flight JDBC server started on {} {}", host, port); log.info("Arrow Flight JDBC server started on {} {}", host, port);
flightServer.start(); flightServer.start();
isRunning = true;
} catch (Exception e) { } catch (Exception e) {
log.error("FlightSqlListener start error {}", e); log.error("FlightSqlListener start error {}", e);
} }
} }
public Boolean isRunning() {
return isRunning;
}
public void stop() {
try {
log.info("Arrow Flight JDBC server stop on {} {}", host, port);
flightServer.close();
allocator.close();
} catch (Exception e) {
log.error("FlightSqlListener start error {}", e);
}
}
@Override @Override
public void run(String... args) throws Exception { public void run(String... args) throws Exception {
if (enable) { if (enable) {

View File

@@ -30,10 +30,9 @@ public class FlightSqlTest extends BaseTest {
@Test @Test
void test01() throws Exception { void test01() throws Exception {
startServer();
String host = flightSqlListener.getHost(); String host = flightSqlListener.getHost();
Integer port = flightSqlListener.getPort(); Integer port = flightSqlListener.getPort();
UserHolder.setStrategy(fakeUserStrategy);
flightSqlListener.startServer();
FlightSqlClient sqlClient = new FlightSqlClient( FlightSqlClient sqlClient = new FlightSqlClient(
FlightClient.builder(new RootAllocator(Integer.MAX_VALUE), Location.forGrpcInsecure(host, port)) FlightClient.builder(new RootAllocator(Integer.MAX_VALUE), Location.forGrpcInsecure(host, port))
.build()); .build());
@@ -67,10 +66,9 @@ public class FlightSqlTest extends BaseTest {
@Test @Test
void test02() throws Exception { void test02() throws Exception {
startServer();
String host = flightSqlListener.getHost(); String host = flightSqlListener.getHost();
Integer port = flightSqlListener.getPort(); Integer port = flightSqlListener.getPort();
UserHolder.setStrategy(fakeUserStrategy);
flightSqlListener.startServer();
FlightSqlClient sqlClient = new FlightSqlClient( FlightSqlClient sqlClient = new FlightSqlClient(
FlightClient.builder(new RootAllocator(Integer.MAX_VALUE), Location.forGrpcInsecure(host, port)) FlightClient.builder(new RootAllocator(Integer.MAX_VALUE), Location.forGrpcInsecure(host, port))
.build()); .build());
@@ -81,7 +79,8 @@ public class FlightSqlTest extends BaseTest {
headers.insert("password", "admin"); headers.insert("password", "admin");
HeaderCallOption headerOption = new HeaderCallOption(headers); HeaderCallOption headerOption = new HeaderCallOption(headers);
try { try {
FlightInfo flightInfo = sqlClient.execute("SELECT 部门, SUM(访问次数) AS 访问次数 FROM 超音数PVUV统计 GROUP BY 部门", FlightInfo flightInfo = sqlClient.execute(
"SELECT 部门, SUM(访问次数) AS 访问次数 FROM 超音数PVUV统计 GROUP BY 部门",
headerOption); headerOption);
FlightStream stream = sqlClient.getStream(flightInfo FlightStream stream = sqlClient.getStream(flightInfo
.getEndpoints() .getEndpoints()
@@ -100,4 +99,11 @@ public class FlightSqlTest extends BaseTest {
log.error("", e); log.error("", e);
} }
} }
private void startServer() {
if (!flightSqlListener.isRunning()) {
UserHolder.setStrategy(fakeUserStrategy);
flightSqlListener.startServer();
}
}
} }