(feature)(headless) add accelerator base abstract and methods (#942) (#1037)

jipeli
2024-05-28 11:48:02 +08:00
committed by GitHub
parent a7d845f224
commit c51c278f33
13 changed files with 714 additions and 13 deletions

View File

@@ -0,0 +1,89 @@
package com.tencent.supersonic.common.util;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
* Utility functions for file operations
*/
public class FileUtils {
public static Boolean exists(String path) {
File file = new File(path);
return file.exists();
}
public static long getLastModified(String path) {
if (!exists(path)) {
return -1;
}
File[] files = new File(path).listFiles();
if (files == null) {
return -1;
}
Optional<Long> lastModified = Arrays.stream(files).filter(File::isFile)
.map(File::lastModified).sorted(Collections.reverseOrder()).findFirst();
return lastModified.orElse(-1L);
}
public static File[] getDirFiles(File file) {
if (file.isDirectory()) {
return file.listFiles();
}
return null;
}
public static void scanDirectory(File file, int maxLevel, Map<Integer, List<File>> directories) {
if (maxLevel < 0) {
return;
}
if (!file.exists() || !file.isDirectory()) {
return;
}
if (!directories.containsKey(maxLevel)) {
directories.put(maxLevel, new ArrayList<>());
}
for (File f : file.listFiles()) {
if (f.isDirectory()) {
directories.get(maxLevel).add(f);
scanDirectory(f, maxLevel - 1, directories);
}
}
}
public static Map<String, Map<String, List<String>>> getTop3Directory(String path) {
Map<String, Map<String, List<String>>> result = new HashMap<>();
File file = new File(path);
if (!file.exists() || !file.isDirectory()) {
return result;
}
Map<Integer, List<File>> directories = new HashMap<>();
scanDirectory(file, 2, directories);
for (int i = 2; i >= 0; i--) {
for (File f : directories.getOrDefault(i, new ArrayList<>())) {
if (i == 2) {
result.put(f.getName(), new HashMap<>());
continue;
}
if (i == 1 && result.containsKey(f.getParentFile().getName())) {
result.get(f.getParentFile().getName()).put(f.getName(), new ArrayList<>());
continue;
}
String parent = f.getParentFile().getParentFile().getName();
if (result.containsKey(parent) && result.get(parent).containsKey(f.getParentFile().getName())) {
result.get(parent).get(f.getParentFile().getName()).add(f.getName());
}
}
}
return result;
}
}
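
A hedged usage sketch of getTop3Directory (the directory layout below is a hypothetical example, not part of this commit): it scans exactly three directory levels under the given path and returns a nested map of level-1 name to level-2 name to the list of level-3 names.
// hypothetical layout: /data/mat/<db>/<table>/<partition>
Map<String, Map<String, List<String>>> tree = FileUtils.getTop3Directory("/data/mat");
// e.g. {sales_db={orders_mv=[2024-05-01, 2024-05-02]}}
tree.forEach((db, tables) -> tables.forEach(
(table, partitions) -> System.out.println(db + "." + table + " -> " + partitions)));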

View File

@@ -1,6 +1,14 @@
package com.tencent.supersonic.common.util.jsqlparser;
import com.tencent.supersonic.common.util.StringUtil;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.expression.Alias;
@@ -28,6 +36,7 @@ import net.sf.jsqlparser.schema.Table;
import net.sf.jsqlparser.statement.Statement;
import net.sf.jsqlparser.statement.select.Distinct;
import net.sf.jsqlparser.statement.select.GroupByElement;
import net.sf.jsqlparser.statement.select.Join;
import net.sf.jsqlparser.statement.select.LateralView;
import net.sf.jsqlparser.statement.select.OrderByElement;
import net.sf.jsqlparser.statement.select.ParenthesedSelect;
@@ -36,16 +45,10 @@ import net.sf.jsqlparser.statement.select.Select;
import net.sf.jsqlparser.statement.select.SelectItem;
import net.sf.jsqlparser.statement.select.SelectVisitorAdapter;
import net.sf.jsqlparser.statement.select.SetOperationList;
import net.sf.jsqlparser.statement.select.WithItem;
import org.apache.commons.lang3.StringUtils;
import org.springframework.util.CollectionUtils;
/**
* Sql Parser Select Helper
*/
@@ -608,5 +611,52 @@ public class SqlSelectHelper {
getColumnFromExpr(expr.getExpression(), columns);
}
}
public static Map<String, Set<String>> getFieldsWithSubQuery(String sql) {
List<PlainSelect> plainSelects = getPlainSelects(getPlainSelect(sql));
Map<String, Set<String>> results = new HashMap<>();
for (PlainSelect plainSelect : plainSelects) {
getFieldsWithSubQuery(plainSelect, results);
}
return results;
}
private static void getFieldsWithSubQuery(PlainSelect plainSelect, Map<String, Set<String>> fields) {
if (plainSelect.getFromItem() instanceof Table) {
boolean isWith = false;
if (!CollectionUtils.isEmpty(plainSelect.getWithItemsList())) {
for (WithItem withItem : plainSelect.getWithItemsList()) {
if (Objects.nonNull(withItem.getSelect())) {
getFieldsWithSubQuery(withItem.getSelect().getPlainSelect(), fields);
isWith = true;
}
}
}
if (!isWith) {
Table table = (Table) plainSelect.getFromItem();
if (!fields.containsKey(table.getFullyQualifiedName())) {
fields.put(table.getFullyQualifiedName(), new HashSet<>());
}
List<String> sqlFields = getFieldsByPlainSelect(plainSelect).stream()
.map(f -> f.replaceAll("`", "")).collect(Collectors.toList());
fields.get(table.getFullyQualifiedName()).addAll(sqlFields);
}
}
if (plainSelect.getFromItem() instanceof ParenthesedSelect) {
ParenthesedSelect parenthesedSelect = (ParenthesedSelect) plainSelect.getFromItem();
getFieldsWithSubQuery(parenthesedSelect.getPlainSelect(), fields);
if (!CollectionUtils.isEmpty(plainSelect.getJoins())) {
for (Join join : plainSelect.getJoins()) {
if (join.getRightItem() instanceof ParenthesedSelect) {
getFieldsWithSubQuery(((ParenthesedSelect) join.getRightItem()).getPlainSelect(), fields);
}
if (join.getFromItem() instanceof ParenthesedSelect) {
getFieldsWithSubQuery(((ParenthesedSelect) join.getFromItem()).getPlainSelect(), fields);
}
}
}
}
}
}
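
A hedged example of the new getFieldsWithSubQuery helper (table and column names are arbitrary): it returns the referenced columns per physical table, descending into WITH items, parenthesized sub-selects, and joins.
String sql = "SELECT dept, COUNT(1) FROM (SELECT dept, user_id FROM s2_user) t GROUP BY dept";
Map<String, Set<String>> fields = SqlSelectHelper.getFieldsWithSubQuery(sql);
// expected shape (assumption): {s2_user=[dept, user_id]}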

View File

@@ -167,6 +167,11 @@
<artifactId>xk-time</artifactId>
<version>${xk.time.version}</version>
</dependency>
<dependency>
<groupId>org.duckdb</groupId>
<artifactId>duckdb_jdbc</artifactId>
<version>${duckdb_jdbc.version}</version>
</dependency>
</dependencies>

View File

@@ -0,0 +1,234 @@
package com.tencent.supersonic.headless.core.executor.accelerator;
import com.tencent.supersonic.common.util.jsqlparser.SqlSelectHelper;
import com.tencent.supersonic.headless.core.parser.calcite.Configuration;
import com.tencent.supersonic.headless.core.parser.calcite.s2sql.TimeRange;
import com.tencent.supersonic.headless.core.parser.calcite.schema.DataSourceTable;
import com.tencent.supersonic.headless.core.parser.calcite.schema.DataSourceTable.Builder;
import com.tencent.supersonic.headless.core.parser.calcite.schema.SchemaBuilder;
import com.tencent.supersonic.headless.core.pojo.Materialization;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.calcite.adapter.enumerable.EnumerableRules;
import org.apache.calcite.config.CalciteConnectionConfigImpl;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.plan.ConventionTraitDef;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptMaterialization;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.prepare.CalciteCatalogReader;
import org.apache.calcite.rel.RelDistributionTraitDef;
import org.apache.calcite.rel.RelHomogeneousShuttle;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelShuttle;
import org.apache.calcite.rel.core.RelFactories;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rel.rules.materialize.MaterializedViewRules;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.tools.RelBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.springframework.util.CollectionUtils;
/**
* Abstract accelerator base class; provides common helper methods
*/
@Slf4j
public abstract class AbstractAccelerator implements QueryAccelerator {
public static final String MATERIALIZATION_SYS_DB = "sys";
public static final String MATERIALIZATION_SYS_SOURCE = "sys_src";
public static final String MATERIALIZATION_SYS_VIEW = "sys_view";
public static final String MATERIALIZATION_SYS_PARTITION = "sys_partition";
/**
* Checks whether a materialization covers the requested fields and partitions
*/
protected boolean check(RelOptPlanner relOptPlanner, RelBuilder relBuilder,
CalciteCatalogReader calciteCatalogReader, Materialization materialization, List<String> fields,
List<ImmutablePair<String, String>> partitions) {
if (!materialization.isPartitioned()) {
return fields.stream().allMatch(f -> materialization.getColumns().contains(f));
}
Set<String> queryFields = new HashSet<>(fields);
queryFields.add(MATERIALIZATION_SYS_PARTITION);
List<String> queryFieldList = queryFields.stream().collect(Collectors.toList());
Set<String> viewFields = new HashSet<>(materialization.getColumns());
viewFields.add(MATERIALIZATION_SYS_PARTITION);
List<String> viewFieldList = viewFields.stream().collect(Collectors.toList());
Set<String> materializationFields = new HashSet<>(viewFields);
materializationFields.addAll(queryFields);
List<String> materializationFieldList = materializationFields.stream().collect(Collectors.toList());
relBuilder.clear();
if (!CollectionUtils.isEmpty(relOptPlanner.getMaterializations())) {
relOptPlanner.clear();
}
Materialization viewMaterialization = Materialization.builder().build();
viewMaterialization.setName(String.format("%s.%s", MATERIALIZATION_SYS_DB, MATERIALIZATION_SYS_VIEW));
viewMaterialization.setColumns(viewFieldList);
addMaterialization(calciteCatalogReader.getRootSchema(), viewMaterialization);
Materialization queryMaterialization = Materialization.builder().build();
queryMaterialization.setName(String.format("%s.%s", MATERIALIZATION_SYS_DB, MATERIALIZATION_SYS_SOURCE));
queryMaterialization.setColumns(materializationFieldList);
addMaterialization(calciteCatalogReader.getRootSchema(), queryMaterialization);
RelNode replacement = relBuilder.scan(Arrays.asList(MATERIALIZATION_SYS_DB, MATERIALIZATION_SYS_VIEW)).build();
RelBuilder viewBuilder = relBuilder.scan(Arrays.asList(MATERIALIZATION_SYS_DB, MATERIALIZATION_SYS_SOURCE));
if (materialization.isPartitioned()) {
RexNode viewFilter = getRexNode(relBuilder, materialization,
MATERIALIZATION_SYS_PARTITION);
viewBuilder = viewBuilder.filter(viewFilter);
}
RelNode viewRel = project(viewBuilder, viewFieldList).build();
List<String> view = Arrays.asList(MATERIALIZATION_SYS_DB, MATERIALIZATION_SYS_VIEW);
RelOptMaterialization relOptMaterialization = new RelOptMaterialization(replacement, viewRel, null,
view);
relOptPlanner.addMaterialization(relOptMaterialization);
RelBuilder checkBuilder = relBuilder.scan(Arrays.asList(MATERIALIZATION_SYS_DB, MATERIALIZATION_SYS_SOURCE));
if (materialization.isPartitioned()) {
checkBuilder = checkBuilder.filter(getRexNode(checkBuilder, partitions, MATERIALIZATION_SYS_PARTITION));
}
RelNode checkRel = project(checkBuilder, queryFieldList).build();
relOptPlanner.setRoot(checkRel);
RelNode optRel = relOptPlanner.findBestExp();
log.debug("optimized plan: {}", optRel.explain());
return !extractTableNames(optRel).contains(MATERIALIZATION_SYS_SOURCE);
}
protected Map<String, Set<String>> getFields(String sql) {
return SqlSelectHelper.getFieldsWithSubQuery(sql);
}
protected CalciteCatalogReader getCalciteCatalogReader() {
CalciteSchema viewSchema = SchemaBuilder.getMaterializationSchema();
return new CalciteCatalogReader(
CalciteSchema.from(viewSchema.plus()),
CalciteSchema.from(viewSchema.plus()).path(null),
Configuration.typeFactory,
new CalciteConnectionConfigImpl(new Properties()));
}
protected RelOptPlanner getRelOptPlanner() {
HepProgramBuilder hepProgramBuilder = new HepProgramBuilder();
hepProgramBuilder.addRuleInstance(MaterializedViewRules.PROJECT_FILTER);
RelOptPlanner relOptPlanner = new HepPlanner(hepProgramBuilder.build());
return relOptPlanner;
}
protected RelBuilder builderMaterializationPlan(CalciteCatalogReader calciteCatalogReader,
RelOptPlanner relOptPlanner) {
relOptPlanner.addRelTraitDef(ConventionTraitDef.INSTANCE);
relOptPlanner.addRelTraitDef(RelDistributionTraitDef.INSTANCE);
EnumerableRules.rules().forEach(relOptPlanner::addRule);
RexBuilder rexBuilder = new RexBuilder(Configuration.typeFactory);
RelOptCluster relOptCluster = RelOptCluster.create(relOptPlanner, rexBuilder);
return RelFactories.LOGICAL_BUILDER.create(relOptCluster, calciteCatalogReader);
}
protected void addMaterialization(CalciteSchema dataSetSchema, Materialization materialization) {
String[] dbTable = materialization.getName().split("\\.");
String tb = dbTable[1].toLowerCase();
String db = dbTable[0].toLowerCase();
Builder builder = DataSourceTable.newBuilder(tb);
for (String f : materialization.getColumns()) {
builder.addField(f, SqlTypeName.VARCHAR);
}
if (StringUtils.isNotBlank(materialization.getPartitionName())) {
builder.addField(materialization.getPartitionName(), SqlTypeName.VARCHAR);
}
DataSourceTable srcTable = builder.withRowCount(1L).build();
if (StringUtils.isNotBlank(db)) {
SchemaPlus schemaPlus = dataSetSchema.plus().getSubSchema(db);
if (Objects.isNull(schemaPlus)) {
dataSetSchema.plus().add(db, new AbstractSchema());
schemaPlus = dataSetSchema.plus().getSubSchema(db);
}
schemaPlus.add(tb, srcTable);
} else {
dataSetSchema.add(tb, srcTable);
}
}
protected Set<String> extractTableNames(RelNode relNode) {
Set<String> tableNames = new HashSet<>();
RelShuttle shuttle = new RelHomogeneousShuttle() {
@Override
public RelNode visit(TableScan scan) {
RelOptTable table = scan.getTable();
tableNames.addAll(table.getQualifiedName());
return scan;
}
};
relNode.accept(shuttle);
return tableNames;
}
protected RexNode getRexNodeByTimeRange(RelBuilder relBuilder, TimeRange timeRange, String field) {
return relBuilder.call(SqlStdOperatorTable.AND,
relBuilder.call(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, relBuilder.field(field),
relBuilder.literal(timeRange.getStart())),
relBuilder.call(SqlStdOperatorTable.LESS_THAN_OR_EQUAL, relBuilder.field(field),
relBuilder.literal(timeRange.getEnd())));
}
protected RexNode getRexNode(RelBuilder relBuilder, Materialization materialization, String viewField) {
RexNode rexNode = null;
for (String partition : materialization.getPartitions()) {
TimeRange timeRange = TimeRange.builder().start(partition).end(partition).build();
if (rexNode == null) {
rexNode = getRexNodeByTimeRange(relBuilder, timeRange, viewField);
continue;
}
rexNode = relBuilder.call(SqlStdOperatorTable.OR, rexNode,
getRexNodeByTimeRange(relBuilder, timeRange, viewField));
}
return rexNode;
}
protected RexNode getRexNode(RelBuilder relBuilder, List<ImmutablePair<String, String>> timeRanges,
String viewField) {
RexNode rexNode = null;
for (ImmutablePair<String, String> timeRange : timeRanges) {
if (rexNode == null) {
rexNode = getRexNodeByTimeRange(relBuilder,
TimeRange.builder().start(timeRange.left).end(timeRange.right).build(),
viewField);
continue;
}
rexNode = relBuilder.call(SqlStdOperatorTable.OR, rexNode,
getRexNodeByTimeRange(relBuilder,
TimeRange.builder().start(timeRange.left).end(timeRange.right).build(),
viewField));
}
return rexNode;
}
private static RelBuilder project(RelBuilder relBuilder, List<String> fields) {
List<RexNode> rexNodes = fields.stream().map(f -> relBuilder.field(f)).collect(Collectors.toList());
return relBuilder.project(rexNodes);
}
}

View File

@@ -0,0 +1,17 @@
package com.tencent.supersonic.headless.core.executor.accelerator;
import com.tencent.supersonic.headless.api.pojo.response.SemanticQueryResp;
import com.tencent.supersonic.headless.core.pojo.QueryStatement;
/**
* Customizes alternative query backends (e.g. DuckDB, Redis) to improve query performance.
* If check passes and the query succeeds, the SemanticQueryResp is returned to the caller immediately.
*/
public interface QueryAccelerator {
boolean reload();
boolean check(QueryStatement queryStatement);
SemanticQueryResp query(QueryStatement queryStatement);
}
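
To make the contract concrete, here is a hedged skeleton of an accelerator built on the AbstractAccelerator above; the class name, the materialization lookup, and the partition extraction are hypothetical placeholders (and it assumes QueryStatement exposes getSql()), not part of this commit.
package com.tencent.supersonic.headless.core.executor.accelerator;
import com.tencent.supersonic.headless.api.pojo.response.SemanticQueryResp;
import com.tencent.supersonic.headless.core.pojo.Materialization;
import com.tencent.supersonic.headless.core.pojo.QueryStatement;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.prepare.CalciteCatalogReader;
import org.apache.calcite.tools.RelBuilder;
import org.apache.commons.lang3.tuple.ImmutablePair;
public class SketchAccelerator extends AbstractAccelerator {
@Override
public boolean reload() {
// refresh local state (e.g. re-register materialized data); sketch only
return true;
}
@Override
public boolean check(QueryStatement queryStatement) {
// fields referenced by the query, grouped per physical table
Map<String, Set<String>> tableFields = getFields(queryStatement.getSql());
CalciteCatalogReader catalogReader = getCalciteCatalogReader();
RelOptPlanner planner = getRelOptPlanner();
RelBuilder relBuilder = builderMaterializationPlan(catalogReader, planner);
Materialization materialization = findMaterialization(queryStatement); // hypothetical lookup
if (materialization == null) {
return false;
}
List<String> fields = new ArrayList<>(
tableFields.getOrDefault(materialization.getName(), new HashSet<>()));
List<ImmutablePair<String, String>> partitions = requestedPartitions(queryStatement); // hypothetical
return check(planner, relBuilder, catalogReader, materialization, fields, partitions);
}
@Override
public SemanticQueryResp query(QueryStatement queryStatement) {
// run the query against the accelerated store and map the result; sketch only
return new SemanticQueryResp();
}
private Materialization findMaterialization(QueryStatement queryStatement) {
return null; // placeholder
}
private List<ImmutablePair<String, String>> requestedPartitions(QueryStatement queryStatement) {
return new ArrayList<>(); // placeholder
}
}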

View File

@@ -1,6 +1,7 @@
package com.tencent.supersonic.headless.core.planner;
import com.tencent.supersonic.headless.core.executor.QueryExecutor;
import com.tencent.supersonic.headless.core.executor.accelerator.QueryAccelerator;
import com.tencent.supersonic.headless.core.pojo.QueryStatement;
import com.tencent.supersonic.headless.core.utils.ComponentFactory;
import lombok.extern.slf4j.Slf4j;
@@ -29,4 +30,14 @@ public class DefaultQueryPlanner implements QueryPlanner {
}
return null;
}
@Override
public QueryAccelerator accelerate(QueryStatement queryStatement) {
for (QueryAccelerator queryAccelerator : ComponentFactory.getQueryAccelerators()) {
if (queryAccelerator.check(queryStatement)) {
return queryAccelerator;
}
}
return null;
}
}

View File

@@ -1,6 +1,7 @@
package com.tencent.supersonic.headless.core.planner;
import com.tencent.supersonic.headless.core.executor.QueryExecutor;
import com.tencent.supersonic.headless.core.executor.accelerator.QueryAccelerator;
import com.tencent.supersonic.headless.core.pojo.QueryStatement;
/**
@@ -11,4 +12,6 @@ public interface QueryPlanner {
QueryExecutor plan(QueryStatement queryStatement);
QueryExecutor route(QueryStatement queryStatement);
QueryAccelerator accelerate(QueryStatement queryStatement);
}

View File

@@ -0,0 +1,178 @@
package com.tencent.supersonic.headless.core.pojo;
import com.tencent.supersonic.common.pojo.QueryColumn;
import com.tencent.supersonic.headless.api.pojo.response.SemanticQueryResp;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.DependsOn;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
/**
* DuckDB connection/session holder
*/
@Component
@Slf4j
public class DuckDbSource {
@Value("${accelerator.duckDb.temp:/data1/duck/tmp/}")
protected String duckDbTemp;
@Value("${accelerator.duckDb.maximumPoolSize:10}")
protected Integer duckDbMaximumPoolSize;
@Value("${accelerator.duckDb.MaxLifetime:3}")
protected Integer duckDbMaxLifetime;
@Value("${accelerator.duckDb.memoryLimit:31}")
protected Integer memoryLimit;
@Value("${accelerator.duckDb.threads:32}")
protected Integer threads;
@Autowired
@Qualifier("duckDbDataSource")
protected DataSource duckDbDataSource;
@Autowired
@Qualifier("duckDbJdbcTemplate")
protected JdbcTemplate duckDbJdbcTemplate;
@Bean(name = "duckDbConfig")
public HikariConfig getHikariConfig() {
HikariConfig config = new HikariConfig();
config.setDriverClassName("org.duckdb.DuckDBDriver");
config.setMaximumPoolSize(duckDbMaximumPoolSize);
// note: Hikari's maxLifetime is in milliseconds; nonzero values below 30000 ms fall back to Hikari's default
config.setMaxLifetime(duckDbMaxLifetime);
config.setJdbcUrl("jdbc:duckdb:");
return config;
}
@Bean(name = "duckDbDataSource")
@DependsOn("duckDbConfig")
public DataSource getDuckDbDataSource(@Qualifier("duckDbConfig") HikariConfig config) {
return new HikariDataSource(config);
}
@Bean("duckDbJdbcTemplate")
@DependsOn("duckDbDataSource")
public JdbcTemplate getDuckDbTemplate(@Qualifier("duckDbDataSource") DataSource dataSource) {
JdbcTemplate jdbcTemplate = new JdbcTemplate();
jdbcTemplate.setDataSource(dataSource);
init(jdbcTemplate);
return jdbcTemplate;
}
protected void init(JdbcTemplate jdbcTemplate) {
jdbcTemplate.execute(String.format("SET memory_limit = '%sGB';", memoryLimit));
jdbcTemplate.execute(String.format("SET temp_directory='%s';", duckDbTemp));
jdbcTemplate.execute(String.format("SET threads TO %s;", threads));
jdbcTemplate.execute("SET enable_object_cache = true;");
}
public JdbcTemplate getDuckDbJdbcTemplate() {
return duckDbJdbcTemplate;
}
public void setDuckDbJdbcTemplate(JdbcTemplate jdbcTemplate) {
this.duckDbJdbcTemplate = jdbcTemplate;
}
public void execute(String sql) {
duckDbJdbcTemplate.execute(sql);
}
public void query(String sql, SemanticQueryResp queryResultWithColumns) {
duckDbJdbcTemplate.query(sql, rs -> {
if (null == rs) {
return queryResultWithColumns;
}
ResultSetMetaData metaData = rs.getMetaData();
List<QueryColumn> queryColumns = new ArrayList<>();
for (int i = 1; i <= metaData.getColumnCount(); i++) {
String key = metaData.getColumnLabel(i);
queryColumns.add(new QueryColumn(key, metaData.getColumnTypeName(i)));
}
queryResultWithColumns.setColumns(queryColumns);
List<Map<String, Object>> resultList = buildResult(rs);
queryResultWithColumns.setResultList(resultList);
return queryResultWithColumns;
});
}
public static List<Map<String, Object>> buildResult(ResultSet resultSet) {
List<Map<String, Object>> list = new ArrayList<>();
try {
ResultSetMetaData rsMeta = resultSet.getMetaData();
int columnCount = rsMeta.getColumnCount();
while (resultSet.next()) {
Map<String, Object> row = new HashMap<>();
for (int i = 1; i <= columnCount; i++) {
String column = rsMeta.getColumnName(i);
switch (rsMeta.getColumnType(i)) {
case java.sql.Types.BOOLEAN:
row.put(column, resultSet.getBoolean(i));
break;
case java.sql.Types.INTEGER:
row.put(column, resultSet.getInt(i));
break;
case java.sql.Types.BIGINT:
row.put(column, resultSet.getLong(i));
break;
case java.sql.Types.DOUBLE:
row.put(column, resultSet.getDouble(i));
break;
case java.sql.Types.VARCHAR:
row.put(column, resultSet.getString(i));
break;
case java.sql.Types.NUMERIC:
row.put(column, resultSet.getBigDecimal(i));
break;
case java.sql.Types.TINYINT:
row.put(column, (int) resultSet.getByte(i));
break;
case java.sql.Types.SMALLINT:
row.put(column, resultSet.getShort(i));
break;
case java.sql.Types.REAL:
row.put(column, resultSet.getFloat(i));
break;
case java.sql.Types.DATE:
row.put(column, resultSet.getDate(i));
break;
case java.sql.Types.TIME:
row.put(column, resultSet.getTime(i));
break;
case java.sql.Types.TIMESTAMP:
row.put(column, resultSet.getTimestamp(i));
break;
case java.sql.Types.JAVA_OBJECT:
row.put(column, resultSet.getObject(i));
break;
default:
throw new Exception("unsupported result column type: " + rsMeta.getColumnType(i));
}
}
list.add(row);
}
} catch (Exception e) {
log.error("buildResult error", e);
}
return list;
}
}
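
A small, hedged usage sketch of the query helper (the SQL literal is a throwaway example, assuming an injected DuckDbSource bean): it fills a SemanticQueryResp with column metadata and typed rows via buildResult.
SemanticQueryResp resp = new SemanticQueryResp();
duckDbSource.query("SELECT 42 AS answer", resp);
// expected: one INTEGER column "answer" and one row {answer=42}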

View File

@@ -0,0 +1,18 @@
package com.tencent.supersonic.headless.core.pojo;
import java.util.List;
import lombok.Builder;
import lombok.Data;
@Data
@Builder
public class Materialization {
private String name;
private Long id;
private Long dataSetId;
private List<String> columns;
private List<String> partitions;
private boolean isPartitioned;
private String partitionName;
}
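
Since the pojo carries Lombok @Data and @Builder, construction looks like the following hedged sketch (all values are hypothetical; addMaterialization above expects name in db.table form):
Materialization materialization = Materialization.builder()
.name("sales_db.orders_mv")
.columns(Arrays.asList("user_id", "amount"))
.isPartitioned(true)
.partitionName("sys_imp_date")
.partitions(Arrays.asList("2024-05-01", "2024-05-02"))
.build();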

View File

@@ -2,10 +2,11 @@ package com.tencent.supersonic.headless.core.utils;
import com.tencent.supersonic.common.util.ContextUtils;
import com.tencent.supersonic.headless.core.cache.QueryCache;
import com.tencent.supersonic.headless.core.chat.parser.llm.DataSetResolver;
import com.tencent.supersonic.headless.core.chat.parser.llm.JavaLLMProxy;
import com.tencent.supersonic.headless.core.chat.parser.llm.LLMProxy;
import com.tencent.supersonic.headless.core.executor.QueryExecutor;
import com.tencent.supersonic.headless.core.executor.accelerator.QueryAccelerator;
import com.tencent.supersonic.headless.core.parser.SqlParser;
import com.tencent.supersonic.headless.core.parser.converter.HeadlessConverter;
import com.tencent.supersonic.headless.core.planner.QueryOptimizer;
@@ -28,6 +29,7 @@ public class ComponentFactory {
private static List<HeadlessConverter> headlessConverters = new ArrayList<>();
private static Map<String, QueryOptimizer> queryOptimizers = new HashMap<>();
private static List<QueryExecutor> queryExecutors = new ArrayList<>();
private static List<QueryAccelerator> queryAccelerators = new ArrayList<>();
private static SqlParser sqlParser;
private static QueryCache queryCache;
@@ -61,6 +63,13 @@ public class ComponentFactory {
return queryExecutors;
}
public static List<QueryAccelerator> getQueryAccelerators() {
if (queryAccelerators.isEmpty()) {
initQueryAccelerators();
}
return queryAccelerators;
}
public static SqlParser getSqlParser() {
if (sqlParser == null) {
initQueryParser();
@@ -96,6 +105,11 @@ public class ComponentFactory {
init(QueryExecutor.class, queryExecutors);
}
private static void initQueryAccelerators() {
init(QueryAccelerator.class, queryAccelerators);
}
private static void initSemanticConverter() {
init(HeadlessConverter.class, headlessConverters);
}

View File

@@ -0,0 +1,72 @@
package com.tencent.supersonic.headless.core.utils;
import com.tencent.supersonic.headless.api.pojo.response.SemanticQueryResp;
import com.tencent.supersonic.headless.core.pojo.DuckDbSource;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* Utility functions for DuckDB queries
*/
public class JdbcDuckDbUtils {
public static void attachMysql(DuckDbSource duckDbSource, String host, Integer port, String user,
String password, String database) throws Exception {
duckDbSource.execute("INSTALL mysql");
duckDbSource.execute("LOAD mysql");
String attachSql = "ATTACH 'host=%s port=%s user=%s password=%s database=%s' AS mysqldb (TYPE mysql);";
duckDbSource.execute(String.format(attachSql, host, port, user, password, database));
duckDbSource.execute("SET mysql_experimental_filter_pushdown = true;");
}
public static List<String> getParquetColumns(DuckDbSource duckDbSource, String parquetPath) throws Exception {
SemanticQueryResp queryResultWithColumns = new SemanticQueryResp();
duckDbSource.query(String.format("SELECT distinct name FROM parquet_schema('%s')", parquetPath),
queryResultWithColumns);
if (!queryResultWithColumns.getResultList().isEmpty()) {
return queryResultWithColumns.getResultList().stream()
.filter(l -> l.containsKey("name") && Objects.nonNull(l.get("name")))
.map(l -> (String) l.get("name")).collect(Collectors.toList());
}
return new ArrayList<>();
}
public static List<String> getParquetPartition(DuckDbSource duckDbSource, String parquetPath, String partitionName)
throws Exception {
SemanticQueryResp queryResultWithColumns = new SemanticQueryResp();
duckDbSource.query(String.format("SELECT distinct %s as partition FROM read_parquet('%s')", partitionName,
parquetPath), queryResultWithColumns);
if (!queryResultWithColumns.getResultList().isEmpty()) {
return queryResultWithColumns.getResultList().stream()
.filter(l -> l.containsKey("partition") && Objects.nonNull(l.get("partition")))
.map(l -> (String) l.get("partition")).collect(Collectors.toList());
}
return new ArrayList<>();
}
public static boolean createDatabase(DuckDbSource duckDbSource, String db) throws Exception {
duckDbSource.execute("CREATE SCHEMA IF NOT EXISTS " + db);
return true;
}
public static boolean createView(DuckDbSource duckDbSource, String view, String sql) throws Exception {
duckDbSource.execute(String.format("CREATE OR REPLACE VIEW %s AS %s;", view, sql));
return true;
}
}
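
A hedged end-to-end sketch (host, credentials, paths, and names are placeholders): attach a MySQL source, then publish it through a DuckDB schema and view.
JdbcDuckDbUtils.attachMysql(duckDbSource, "127.0.0.1", 3306, "root", "secret", "demo");
JdbcDuckDbUtils.createDatabase(duckDbSource, "acc");
JdbcDuckDbUtils.createView(duckDbSource, "acc.orders_view", "SELECT * FROM mysqldb.orders");
List<String> columns = JdbcDuckDbUtils.getParquetColumns(duckDbSource, "/data/parquet/orders.parquet");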

View File

@@ -23,6 +23,7 @@ import com.tencent.supersonic.headless.api.pojo.response.SemanticQueryResp;
import com.tencent.supersonic.headless.api.pojo.response.SemanticSchemaResp;
import com.tencent.supersonic.headless.core.cache.QueryCache;
import com.tencent.supersonic.headless.core.executor.QueryExecutor;
import com.tencent.supersonic.headless.core.executor.accelerator.QueryAccelerator;
import com.tencent.supersonic.headless.core.parser.DefaultQueryParser;
import com.tencent.supersonic.headless.core.parser.QueryParser;
import com.tencent.supersonic.headless.core.parser.calcite.s2sql.SemanticModel;
@@ -38,15 +39,14 @@ import com.tencent.supersonic.headless.server.service.QueryService;
import com.tencent.supersonic.headless.server.utils.QueryReqConverter;
import com.tencent.supersonic.headless.server.utils.QueryUtils;
import com.tencent.supersonic.headless.server.utils.StatUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.stereotype.Service;
@Service
@Slf4j
@@ -249,7 +249,16 @@ public class QueryServiceImpl implements QueryService {
queryParser.parse(queryStatement);
//2 plan
QueryExecutor queryExecutor = queryPlanner.plan(queryStatement);
//3 execute
//3 accelerate
QueryAccelerator queryAccelerator = queryPlanner.accelerate(queryStatement);
if (queryAccelerator != null) {
semanticQueryResp = queryAccelerator.query(queryStatement);
if (Objects.nonNull(semanticQueryResp) && !semanticQueryResp.getResultList().isEmpty()) {
log.info("query by Accelerator {}", queryAccelerator.getClass().getSimpleName());
return semanticQueryResp;
}
}
//4 execute
if (queryExecutor != null) {
semanticQueryResp = queryExecutor.execute(queryStatement);
queryUtils.fillItemNameInfo(semanticQueryResp, queryStatement.getSemanticSchemaResp());

View File

@@ -76,6 +76,7 @@
<langchain4j.version>0.24.0</langchain4j.version>
<postgresql.version>42.7.1</postgresql.version>
<st.version>4.0.8</st.version>
<duckdb_jdbc.version>0.10.0</duckdb_jdbc.version>
</properties>
<dependencyManagement>