/**
 * Utility functions for inspecting files and directories.
 */
public class FileUtils {

    /**
     * Tells whether a file or directory exists at {@code path}.
     *
     * @param path file-system path to probe
     * @return {@code true} when something exists at the path
     */
    public static Boolean exit(String path) {
        return new File(path).exists();
    }

    /**
     * Returns the newest modification time among the regular files directly inside a directory.
     *
     * @param path path of the directory to inspect
     * @return the largest {@link File#lastModified()} of the contained regular files, or {@code -1}
     *         when the path is missing, is not a listable directory, or contains no regular file
     */
    public static long getLastModified(String path) {
        // listFiles() yields null for a missing path, a regular file or an unreadable directory.
        File[] children = new File(path).listFiles();
        if (children == null) {
            return -1;
        }
        long latest = -1;
        for (File child : children) {
            if (child.isFile()) {
                latest = Math.max(latest, child.lastModified());
            }
        }
        return latest;
    }

    /**
     * Lists the entries of a directory.
     *
     * @param file directory to list
     * @return the directory entries, or {@code null} when {@code file} is not a directory
     */
    public static File[] getDirFiles(File file) {
        if (file.isDirectory()) {
            return file.listFiles();
        }
        return null;
    }

    /**
     * Recursively collects sub-directories, grouped by the remaining depth at which they were found.
     *
     * <p>Directories directly below {@code file} are stored under key {@code maxLevel}, their
     * children under {@code maxLevel - 1}, and so on down to key {@code 0}.
     *
     * @param file directory to start from; anything that is not a directory is ignored
     * @param maxLevel remaining depth; a negative value stops the scan
     * @param directories accumulator that receives the directories found per level
     */
    public static void scanDirectory(File file, int maxLevel, Map<Integer, List<File>> directories) {
        if (maxLevel < 0 || !file.isDirectory()) {
            return;
        }
        List<File> found = directories.computeIfAbsent(maxLevel, level -> new ArrayList<>());
        File[] children = file.listFiles();
        if (children == null) {
            // The directory could not be read; there is nothing to descend into.
            return;
        }
        for (File child : children) {
            if (child.isDirectory()) {
                found.add(child);
                scanDirectory(child, maxLevel - 1, directories);
            }
        }
    }

    /**
     * Builds a name tree of the first three directory levels below {@code path}.
     *
     * @param path root directory to scan
     * @return first-level directory name mapped to second-level directory name mapped to the
     *         third-level directory names; empty when {@code path} is not a directory
     */
    public static Map<String, Map<String, List<String>>> getTop3Directory(String path) {
        Map<String, Map<String, List<String>>> result = new HashMap<>();
        File root = new File(path);
        if (!root.isDirectory()) {
            return result;
        }
        Map<Integer, List<File>> directories = new HashMap<>();
        scanDirectory(root, 2, directories);

        for (File first : directories.getOrDefault(2, Collections.emptyList())) {
            result.put(first.getName(), new HashMap<>());
        }
        for (File second : directories.getOrDefault(1, Collections.emptyList())) {
            Map<String, List<String>> siblings = result.get(second.getParentFile().getName());
            if (siblings != null) {
                siblings.put(second.getName(), new ArrayList<>());
            }
        }
        for (File third : directories.getOrDefault(0, Collections.emptyList())) {
            File parent = third.getParentFile();
            Map<String, List<String>> siblings = result.get(parent.getParentFile().getName());
            if (siblings != null && siblings.containsKey(parent.getName())) {
                siblings.get(parent.getName()).add(third.getName());
            }
        }
        return result;
    }

}
import net.sf.jsqlparser.statement.select.SelectVisitorAdapter; import net.sf.jsqlparser.statement.select.SetOperationList; +import net.sf.jsqlparser.statement.select.WithItem; import org.apache.commons.lang3.StringUtils; import org.springframework.util.CollectionUtils; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Objects; -import java.util.Set; -import java.util.stream.Collectors; - /** * Sql Parser Select Helper */ @@ -608,5 +611,52 @@ public class SqlSelectHelper { getColumnFromExpr(expr.getExpression(), columns); } } + + public static Map> getFieldsWithSubQuery(String sql) { + List plainSelects = getPlainSelects(getPlainSelect(sql)); + Map> results = new HashMap<>(); + for (PlainSelect plainSelect : plainSelects) { + getFieldsWithSubQuery(plainSelect, results); + } + return results; + } + + private static void getFieldsWithSubQuery(PlainSelect plainSelect, Map> fields) { + if (plainSelect.getFromItem() instanceof Table) { + boolean isWith = false; + if (!CollectionUtils.isEmpty(plainSelect.getWithItemsList())) { + for (WithItem withItem : plainSelect.getWithItemsList()) { + if (Objects.nonNull(withItem.getSelect())) { + getFieldsWithSubQuery(withItem.getSelect().getPlainSelect(), fields); + isWith = true; + } + } + } + if (!isWith) { + Table table = (Table) plainSelect.getFromItem(); + if (!fields.containsKey(table.getFullyQualifiedName())) { + fields.put(table.getFullyQualifiedName(), new HashSet<>()); + } + List sqlFields = getFieldsByPlainSelect(plainSelect).stream().map(f -> f.replaceAll("`", "")) + .collect( + Collectors.toList()); + fields.get(table.getFullyQualifiedName()).addAll(sqlFields); + } + } + if (plainSelect.getFromItem() instanceof ParenthesedSelect) { + ParenthesedSelect parenthesedSelect = (ParenthesedSelect) plainSelect.getFromItem(); + getFieldsWithSubQuery(parenthesedSelect.getPlainSelect(), fields); + if (!CollectionUtils.isEmpty(plainSelect.getJoins())) { + for (Join join : 
plainSelect.getJoins()) { + if (join.getRightItem() instanceof ParenthesedSelect) { + getFieldsWithSubQuery(((ParenthesedSelect) join.getRightItem()).getPlainSelect(), fields); + } + if (join.getFromItem() instanceof ParenthesedSelect) { + getFieldsWithSubQuery(((ParenthesedSelect) join.getFromItem()).getPlainSelect(), fields); + } + } + } + } + } } diff --git a/headless/core/pom.xml b/headless/core/pom.xml index 327998c32..2a0006bad 100644 --- a/headless/core/pom.xml +++ b/headless/core/pom.xml @@ -167,6 +167,11 @@ xk-time ${xk.time.version} + + org.duckdb + duckdb_jdbc + ${duckdb_jdbc.version} + diff --git a/headless/core/src/main/java/com/tencent/supersonic/headless/core/executor/accelerator/AbstractAccelerator.java b/headless/core/src/main/java/com/tencent/supersonic/headless/core/executor/accelerator/AbstractAccelerator.java new file mode 100644 index 000000000..0dc25dd83 --- /dev/null +++ b/headless/core/src/main/java/com/tencent/supersonic/headless/core/executor/accelerator/AbstractAccelerator.java @@ -0,0 +1,234 @@ +package com.tencent.supersonic.headless.core.executor.accelerator; + +import com.tencent.supersonic.common.util.jsqlparser.SqlSelectHelper; +import com.tencent.supersonic.headless.core.parser.calcite.Configuration; +import com.tencent.supersonic.headless.core.parser.calcite.s2sql.TimeRange; +import com.tencent.supersonic.headless.core.parser.calcite.schema.DataSourceTable; +import com.tencent.supersonic.headless.core.parser.calcite.schema.DataSourceTable.Builder; +import com.tencent.supersonic.headless.core.parser.calcite.schema.SchemaBuilder; +import com.tencent.supersonic.headless.core.pojo.Materialization; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +import org.apache.calcite.adapter.enumerable.EnumerableRules; +import 
org.apache.calcite.config.CalciteConnectionConfigImpl; +import org.apache.calcite.jdbc.CalciteSchema; +import org.apache.calcite.plan.ConventionTraitDef; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptMaterialization; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.hep.HepPlanner; +import org.apache.calcite.plan.hep.HepProgramBuilder; +import org.apache.calcite.prepare.CalciteCatalogReader; +import org.apache.calcite.rel.RelDistributionTraitDef; +import org.apache.calcite.rel.RelHomogeneousShuttle; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelShuttle; +import org.apache.calcite.rel.core.RelFactories; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.rules.materialize.MaterializedViewRules; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.impl.AbstractSchema; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.tools.RelBuilder; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.springframework.util.CollectionUtils; + +/** + * abstract of accelerator , provide Basic methods + */ +@Slf4j +public abstract class AbstractAccelerator implements QueryAccelerator { + + public static final String MATERIALIZATION_SYS_DB = "sys"; + public static final String MATERIALIZATION_SYS_SOURCE = "sys_src"; + public static final String MATERIALIZATION_SYS_VIEW = "sys_view"; + public static final String MATERIALIZATION_SYS_PARTITION = "sys_partition"; + + /** + * check if a materialization match the fields and partitions + */ + protected boolean check(RelOptPlanner relOptPlanner, RelBuilder relBuilder, + CalciteCatalogReader calciteCatalogReader, Materialization 
materialization, List fields, + List> partitions) { + if (!materialization.isPartitioned()) { + return fields.stream().allMatch(f -> materialization.getColumns().contains(f)); + } + Set queryFields = new HashSet<>(fields); + queryFields.add(MATERIALIZATION_SYS_PARTITION); + List queryFieldList = queryFields.stream().collect(Collectors.toList()); + + Set viewFields = new HashSet<>(materialization.getColumns()); + viewFields.add(MATERIALIZATION_SYS_PARTITION); + List viewFieldList = viewFields.stream().collect(Collectors.toList()); + + Set materializationFields = new HashSet<>(viewFields); + materializationFields.addAll(queryFields); + List materializationFieldList = materializationFields.stream().collect(Collectors.toList()); + + relBuilder.clear(); + if (!CollectionUtils.isEmpty(relOptPlanner.getMaterializations())) { + relOptPlanner.clear(); + } + + Materialization viewMaterialization = Materialization.builder().build(); + viewMaterialization.setName(String.format("%s.%s", MATERIALIZATION_SYS_DB, MATERIALIZATION_SYS_VIEW)); + viewMaterialization.setColumns(viewFieldList); + addMaterialization(calciteCatalogReader.getRootSchema(), viewMaterialization); + + Materialization queryMaterialization = Materialization.builder().build(); + queryMaterialization.setName(String.format("%s.%s", MATERIALIZATION_SYS_DB, MATERIALIZATION_SYS_SOURCE)); + + queryMaterialization.setColumns(materializationFieldList); + addMaterialization(calciteCatalogReader.getRootSchema(), queryMaterialization); + + RelNode replacement = relBuilder.scan(Arrays.asList(MATERIALIZATION_SYS_DB, MATERIALIZATION_SYS_VIEW)).build(); + RelBuilder viewBuilder = relBuilder.scan(Arrays.asList(MATERIALIZATION_SYS_DB, MATERIALIZATION_SYS_SOURCE)); + if (materialization.isPartitioned()) { + RexNode viewFilter = getRexNode(relBuilder, materialization, + MATERIALIZATION_SYS_PARTITION); + viewBuilder = viewBuilder.filter(viewFilter); + } + RelNode viewRel = project(viewBuilder, viewFieldList).build(); + List view = 
Arrays.asList(MATERIALIZATION_SYS_DB, MATERIALIZATION_SYS_VIEW); + RelOptMaterialization relOptMaterialization = new RelOptMaterialization(replacement, viewRel, null, + view); + relOptPlanner.addMaterialization(relOptMaterialization); + + RelBuilder checkBuilder = relBuilder.scan(Arrays.asList(MATERIALIZATION_SYS_DB, MATERIALIZATION_SYS_SOURCE)); + if (materialization.isPartitioned()) { + checkBuilder = checkBuilder.filter(getRexNode(checkBuilder, partitions, MATERIALIZATION_SYS_PARTITION)); + } + RelNode checkRel = project(checkBuilder, queryFieldList).build(); + relOptPlanner.setRoot(checkRel); + RelNode optRel = relOptPlanner.findBestExp(); + System.out.println(optRel.explain()); + return !extractTableNames(optRel).contains(MATERIALIZATION_SYS_SOURCE); + } + + protected Map> getFields(String sql) { + return SqlSelectHelper.getFieldsWithSubQuery(sql); + } + + protected CalciteCatalogReader getCalciteCatalogReader() { + CalciteCatalogReader calciteCatalogReader; + CalciteSchema viewSchema = SchemaBuilder.getMaterializationSchema(); + calciteCatalogReader = new CalciteCatalogReader( + CalciteSchema.from(viewSchema.plus()), + CalciteSchema.from(viewSchema.plus()).path(null), + Configuration.typeFactory, + new CalciteConnectionConfigImpl(new Properties())); + return calciteCatalogReader; + } + + protected RelOptPlanner getRelOptPlanner() { + HepProgramBuilder hepProgramBuilder = new HepProgramBuilder(); + hepProgramBuilder.addRuleInstance(MaterializedViewRules.PROJECT_FILTER); + RelOptPlanner relOptPlanner = new HepPlanner(hepProgramBuilder.build()); + return relOptPlanner; + } + + protected RelBuilder builderMaterializationPlan(CalciteCatalogReader calciteCatalogReader, + RelOptPlanner relOptPlanner) { + relOptPlanner.addRelTraitDef(ConventionTraitDef.INSTANCE); + relOptPlanner.addRelTraitDef(RelDistributionTraitDef.INSTANCE); + EnumerableRules.rules().forEach(relOptPlanner::addRule); + RexBuilder rexBuilder = new RexBuilder(Configuration.typeFactory); + 
RelOptCluster relOptCluster = RelOptCluster.create(relOptPlanner, rexBuilder); + return RelFactories.LOGICAL_BUILDER.create(relOptCluster, calciteCatalogReader); + } + + protected void addMaterialization(CalciteSchema dataSetSchema, Materialization materialization) { + String[] dbTable = materialization.getName().split("\\."); + String tb = dbTable[1].toLowerCase(); + String db = dbTable[0].toLowerCase(); + Builder builder = DataSourceTable.newBuilder(tb); + for (String f : materialization.getColumns()) { + builder.addField(f, SqlTypeName.VARCHAR); + } + if (StringUtils.isNotBlank(materialization.getPartitionName())) { + builder.addField(materialization.getPartitionName(), SqlTypeName.VARCHAR); + } + DataSourceTable srcTable = builder.withRowCount(1L).build(); + if (Objects.nonNull(db) && !db.isEmpty()) { + SchemaPlus schemaPlus = dataSetSchema.plus().getSubSchema(db); + if (Objects.isNull(schemaPlus)) { + dataSetSchema.plus().add(db, new AbstractSchema()); + schemaPlus = dataSetSchema.plus().getSubSchema(db); + } + schemaPlus.add(tb, srcTable); + } else { + dataSetSchema.add(tb, srcTable); + } + + } + + protected Set extractTableNames(RelNode relNode) { + Set tableNames = new HashSet<>(); + RelShuttle shuttle = new RelHomogeneousShuttle() { + public RelNode visit(TableScan scan) { + RelOptTable table = scan.getTable(); + tableNames.addAll(table.getQualifiedName()); + return scan; + } + }; + relNode.accept(shuttle); + return tableNames; + } + + protected RexNode getRexNodeByTimeRange(RelBuilder relBuilder, TimeRange timeRange, String field) { + return relBuilder.call(SqlStdOperatorTable.AND, + relBuilder.call(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, relBuilder.field(field), + relBuilder.literal(timeRange.getStart())), + relBuilder.call(SqlStdOperatorTable.LESS_THAN_OR_EQUAL, relBuilder.field(field), + relBuilder.literal(timeRange.getEnd()))); + } + + protected RexNode getRexNode(RelBuilder relBuilder, Materialization materialization, String viewField) { + RexNode 
rexNode = null; + for (String partition : materialization.getPartitions()) { + TimeRange timeRange = TimeRange.builder().start(partition).end(partition).build(); + if (rexNode == null) { + rexNode = getRexNodeByTimeRange(relBuilder, timeRange, viewField); + continue; + } + rexNode = relBuilder.call(SqlStdOperatorTable.OR, rexNode, + getRexNodeByTimeRange(relBuilder, timeRange, viewField)); + } + return rexNode; + } + + protected RexNode getRexNode(RelBuilder relBuilder, List> timeRanges, + String viewField) { + RexNode rexNode = null; + for (ImmutablePair timeRange : timeRanges) { + if (rexNode == null) { + rexNode = getRexNodeByTimeRange(relBuilder, + TimeRange.builder().start(timeRange.left).end(timeRange.right).build(), + viewField); + continue; + } + rexNode = relBuilder.call(SqlStdOperatorTable.OR, rexNode, + getRexNodeByTimeRange(relBuilder, + TimeRange.builder().start(timeRange.left).end(timeRange.right).build(), + viewField)); + } + return rexNode; + } + + private static RelBuilder project(RelBuilder relBuilder, List fields) { + List rexNodes = fields.stream().map(f -> relBuilder.field(f)).collect(Collectors.toList()); + return relBuilder.project(rexNodes); + } +} diff --git a/headless/core/src/main/java/com/tencent/supersonic/headless/core/executor/accelerator/QueryAccelerator.java b/headless/core/src/main/java/com/tencent/supersonic/headless/core/executor/accelerator/QueryAccelerator.java new file mode 100644 index 000000000..4ce357332 --- /dev/null +++ b/headless/core/src/main/java/com/tencent/supersonic/headless/core/executor/accelerator/QueryAccelerator.java @@ -0,0 +1,17 @@ +package com.tencent.supersonic.headless.core.executor.accelerator; + +import com.tencent.supersonic.headless.api.pojo.response.SemanticQueryResp; +import com.tencent.supersonic.headless.core.pojo.QueryStatement; + +/** + * customize various query media ( like duckDb redis) to improved query performance + * check ok and query successful , return SemanticQueryResp to interface 
immediately + */ +public interface QueryAccelerator { + + boolean reload(); + + boolean check(QueryStatement queryStatement); + + SemanticQueryResp query(QueryStatement queryStatement); +} diff --git a/headless/core/src/main/java/com/tencent/supersonic/headless/core/planner/DefaultQueryPlanner.java b/headless/core/src/main/java/com/tencent/supersonic/headless/core/planner/DefaultQueryPlanner.java index 0696ad086..1dafe6126 100644 --- a/headless/core/src/main/java/com/tencent/supersonic/headless/core/planner/DefaultQueryPlanner.java +++ b/headless/core/src/main/java/com/tencent/supersonic/headless/core/planner/DefaultQueryPlanner.java @@ -1,6 +1,7 @@ package com.tencent.supersonic.headless.core.planner; import com.tencent.supersonic.headless.core.executor.QueryExecutor; +import com.tencent.supersonic.headless.core.executor.accelerator.QueryAccelerator; import com.tencent.supersonic.headless.core.pojo.QueryStatement; import com.tencent.supersonic.headless.core.utils.ComponentFactory; import lombok.extern.slf4j.Slf4j; @@ -29,4 +30,14 @@ public class DefaultQueryPlanner implements QueryPlanner { } return null; } + + @Override + public QueryAccelerator accelerate(QueryStatement queryStatement) { + for (QueryAccelerator queryAccelerator : ComponentFactory.getQueryAccelerators()) { + if (queryAccelerator.check(queryStatement)) { + return queryAccelerator; + } + } + return null; + } } diff --git a/headless/core/src/main/java/com/tencent/supersonic/headless/core/planner/QueryPlanner.java b/headless/core/src/main/java/com/tencent/supersonic/headless/core/planner/QueryPlanner.java index 5400e4c0e..edb28077e 100644 --- a/headless/core/src/main/java/com/tencent/supersonic/headless/core/planner/QueryPlanner.java +++ b/headless/core/src/main/java/com/tencent/supersonic/headless/core/planner/QueryPlanner.java @@ -1,6 +1,7 @@ package com.tencent.supersonic.headless.core.planner; import com.tencent.supersonic.headless.core.executor.QueryExecutor; +import 
com.tencent.supersonic.headless.core.executor.accelerator.QueryAccelerator; import com.tencent.supersonic.headless.core.pojo.QueryStatement; /** @@ -11,4 +12,6 @@ public interface QueryPlanner { QueryExecutor plan(QueryStatement queryStatement); QueryExecutor route(QueryStatement queryStatement); + + QueryAccelerator accelerate(QueryStatement queryStatement); } diff --git a/headless/core/src/main/java/com/tencent/supersonic/headless/core/pojo/DuckDbSource.java b/headless/core/src/main/java/com/tencent/supersonic/headless/core/pojo/DuckDbSource.java new file mode 100644 index 000000000..22beac0c7 --- /dev/null +++ b/headless/core/src/main/java/com/tencent/supersonic/headless/core/pojo/DuckDbSource.java @@ -0,0 +1,178 @@ +package com.tencent.supersonic.headless.core.pojo; + + +import com.tencent.supersonic.common.pojo.QueryColumn; +import com.tencent.supersonic.headless.api.pojo.response.SemanticQueryResp; +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.sql.DataSource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.DependsOn; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Component; + +/** + * duckDb connection session object + */ +@Component +@Slf4j +public class DuckDbSource { + + @Value("${accelerator.duckDb.temp:/data1/duck/tmp/}") + protected String duckDbTemp; + + @Value("${accelerator.duckDb.maximumPoolSize:10}") + protected Integer duckDbMaximumPoolSize; + + @Value("${accelerator.duckDb.MaxLifetime:3}") + protected Integer 
duckDbMaxLifetime; + + @Value("${accelerator.duckDb.memoryLimit:31}") + protected Integer memoryLimit; + + @Value("${accelerator.duckDb.threads:32}") + protected Integer threads; + + + @Autowired + @Qualifier("duckDbDataSource") + protected DataSource duckDbDataSource; + + @Autowired + @Qualifier("duckDbJdbcTemplate") + protected JdbcTemplate duckDbJdbcTemplate; + + @Bean(name = "duckDbConfig") + public HikariConfig getHikariConfig() { + HikariConfig config = new HikariConfig(); + config.setDriverClassName("org.duckdb.DuckDBDriver"); + config.setMaximumPoolSize(duckDbMaximumPoolSize); + config.setMaxLifetime(duckDbMaxLifetime); + config.setJdbcUrl("jdbc:duckdb:"); + return config; + } + + @Bean(name = "duckDbDataSource") + @DependsOn("duckDbConfig") + public DataSource getDuckDbDataSource(@Qualifier("duckDbConfig") HikariConfig config) { + HikariDataSource ds = new HikariDataSource(config); + return ds; + } + + @Bean("duckDbJdbcTemplate") + @DependsOn("duckDbDataSource") + public JdbcTemplate getDuckDbTemplate(@Qualifier("duckDbDataSource") DataSource dataSource) { + JdbcTemplate jdbcTemplate = new JdbcTemplate(); + jdbcTemplate.setDataSource(dataSource); + init(jdbcTemplate); + return jdbcTemplate; + } + + protected void init(JdbcTemplate jdbcTemplate) { + jdbcTemplate.execute(String.format("SET memory_limit = '%sGB';", memoryLimit)); + jdbcTemplate.execute(String.format("SET temp_directory='%s';", duckDbTemp)); + jdbcTemplate.execute(String.format("SET threads TO %s;", threads)); + jdbcTemplate.execute("SET enable_object_cache = true;"); + } + + public JdbcTemplate getDuckDbJdbcTemplate() { + return duckDbJdbcTemplate; + } + + public void setDuckDbJdbcTemplate(JdbcTemplate jdbcTemplate) { + this.duckDbJdbcTemplate = jdbcTemplate; + } + + public void execute(String sql) { + duckDbJdbcTemplate.execute(sql); + } + + public void query(String sql, SemanticQueryResp queryResultWithColumns) { + duckDbJdbcTemplate.query(sql, rs -> { + if (null == rs) { + return 
queryResultWithColumns; + } + ResultSetMetaData metaData = rs.getMetaData(); + List queryColumns = new ArrayList<>(); + for (int i = 1; i <= metaData.getColumnCount(); i++) { + String key = metaData.getColumnLabel(i); + queryColumns.add(new QueryColumn(key, metaData.getColumnTypeName(i))); + } + queryResultWithColumns.setColumns(queryColumns); + List> resultList = buildResult(rs); + queryResultWithColumns.setResultList(resultList); + return queryResultWithColumns; + }); + } + + public static List> buildResult(ResultSet resultSet) { + List> list = new ArrayList<>(); + try { + ResultSetMetaData rsMeta = resultSet.getMetaData(); + int columnCount = rsMeta.getColumnCount(); + while (resultSet.next()) { + Map row = new HashMap<>(); + for (int i = 1; i <= columnCount; i++) { + String column = rsMeta.getColumnName(i); + switch (rsMeta.getColumnType(i)) { + case java.sql.Types.BOOLEAN: + row.put(column, resultSet.getBoolean(i)); + break; + case java.sql.Types.INTEGER: + row.put(column, resultSet.getInt(i)); + break; + case java.sql.Types.BIGINT: + row.put(column, resultSet.getLong(i)); + break; + case java.sql.Types.DOUBLE: + row.put(column, resultSet.getDouble(i)); + break; + case java.sql.Types.VARCHAR: + row.put(column, resultSet.getString(i)); + break; + case java.sql.Types.NUMERIC: + row.put(column, resultSet.getBigDecimal(i)); + break; + case java.sql.Types.TINYINT: + row.put(column, (int) resultSet.getByte(i)); + break; + case java.sql.Types.SMALLINT: + row.put(column, resultSet.getShort(i)); + break; + case java.sql.Types.REAL: + row.put(column, resultSet.getFloat(i)); + break; + case java.sql.Types.DATE: + row.put(column, resultSet.getDate(i)); + break; + case java.sql.Types.TIME: + row.put(column, resultSet.getTime(i)); + break; + case java.sql.Types.TIMESTAMP: + row.put(column, resultSet.getTimestamp(i)); + break; + case java.sql.Types.JAVA_OBJECT: + row.put(column, resultSet.getObject(i)); + break; + default: + throw new Exception("get result row type not found 
:" + rsMeta.getColumnType(i)); + } + } + list.add(row); + } + } catch (Exception e) { + log.error("buildResult error {}", e); + } + return list; + } +} diff --git a/headless/core/src/main/java/com/tencent/supersonic/headless/core/pojo/Materialization.java b/headless/core/src/main/java/com/tencent/supersonic/headless/core/pojo/Materialization.java new file mode 100644 index 000000000..43b95d773 --- /dev/null +++ b/headless/core/src/main/java/com/tencent/supersonic/headless/core/pojo/Materialization.java @@ -0,0 +1,18 @@ +package com.tencent.supersonic.headless.core.pojo; + +import java.util.List; +import lombok.Builder; +import lombok.Data; + +@Data +@Builder +public class Materialization { + + private String name; + private Long id; + private Long dataSetId; + private List columns; + private List partitions; + private boolean isPartitioned; + private String partitionName; +} diff --git a/headless/core/src/main/java/com/tencent/supersonic/headless/core/utils/ComponentFactory.java b/headless/core/src/main/java/com/tencent/supersonic/headless/core/utils/ComponentFactory.java index bbb4562a2..d49dcec7d 100644 --- a/headless/core/src/main/java/com/tencent/supersonic/headless/core/utils/ComponentFactory.java +++ b/headless/core/src/main/java/com/tencent/supersonic/headless/core/utils/ComponentFactory.java @@ -2,10 +2,11 @@ package com.tencent.supersonic.headless.core.utils; import com.tencent.supersonic.common.util.ContextUtils; import com.tencent.supersonic.headless.core.cache.QueryCache; +import com.tencent.supersonic.headless.core.chat.parser.llm.DataSetResolver; import com.tencent.supersonic.headless.core.chat.parser.llm.JavaLLMProxy; import com.tencent.supersonic.headless.core.chat.parser.llm.LLMProxy; -import com.tencent.supersonic.headless.core.chat.parser.llm.DataSetResolver; import com.tencent.supersonic.headless.core.executor.QueryExecutor; +import com.tencent.supersonic.headless.core.executor.accelerator.QueryAccelerator; import 
com.tencent.supersonic.headless.core.parser.SqlParser; import com.tencent.supersonic.headless.core.parser.converter.HeadlessConverter; import com.tencent.supersonic.headless.core.planner.QueryOptimizer; @@ -28,6 +29,7 @@ public class ComponentFactory { private static List headlessConverters = new ArrayList<>(); private static Map queryOptimizers = new HashMap<>(); private static List queryExecutors = new ArrayList<>(); + private static List queryAccelerators = new ArrayList<>(); private static SqlParser sqlParser; private static QueryCache queryCache; @@ -61,6 +63,13 @@ public class ComponentFactory { return queryExecutors; } + public static List getQueryAccelerators() { + if (queryAccelerators.isEmpty()) { + initQueryAccelerators(); + } + return queryAccelerators; + } + public static SqlParser getSqlParser() { if (sqlParser == null) { initQueryParser(); @@ -96,6 +105,11 @@ public class ComponentFactory { init(QueryExecutor.class, queryExecutors); } + private static void initQueryAccelerators() { + //queryExecutors.add(ContextUtils.getContext().getBean("JdbcExecutor", JdbcExecutor.class)); + init(QueryAccelerator.class, queryAccelerators); + } + private static void initSemanticConverter() { init(HeadlessConverter.class, headlessConverters); } diff --git a/headless/core/src/main/java/com/tencent/supersonic/headless/core/utils/JdbcDuckDbUtils.java b/headless/core/src/main/java/com/tencent/supersonic/headless/core/utils/JdbcDuckDbUtils.java new file mode 100644 index 000000000..b9fb394d0 --- /dev/null +++ b/headless/core/src/main/java/com/tencent/supersonic/headless/core/utils/JdbcDuckDbUtils.java @@ -0,0 +1,72 @@ +package com.tencent.supersonic.headless.core.utils; + + +import com.tencent.supersonic.headless.api.pojo.response.SemanticQueryResp; +import com.tencent.supersonic.headless.core.pojo.DuckDbSource; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * tools functions to duckDb query + 
*/ +public class JdbcDuckDbUtils { + + public static void attachMysql(DuckDbSource duckDbSource, String host, Integer port, String user, + String password, + String database) + throws Exception { + try { + duckDbSource.execute("INSTALL mysql"); + duckDbSource.execute("load mysql"); + String attachSql = "ATTACH 'host=%s port=%s user=%s password=%s database=%s' AS mysqldb (TYPE mysql);"; + duckDbSource.execute(String.format(attachSql, + host, + port, + user, + password, + database + )); + duckDbSource.execute("SET mysql_experimental_filter_pushdown = true;"); + } catch (Exception e) { + throw e; + } + } + + public static List getParquetColumns(DuckDbSource duckDbSource, String parquetPath) throws Exception { + SemanticQueryResp queryResultWithColumns = new SemanticQueryResp(); + duckDbSource.query(String.format("SELECT distinct name FROM parquet_schema('%s')", parquetPath), + queryResultWithColumns); + if (!queryResultWithColumns.getResultList().isEmpty()) { + return queryResultWithColumns.getResultList().stream() + .filter(l -> l.containsKey("name") && Objects.nonNull(l.get("name"))) + .map(l -> (String) l.get("name")).collect(Collectors.toList()); + } + return new ArrayList<>(); + } + + public static List getParquetPartition(DuckDbSource duckDbSource, String parquetPath, String partitionName) + throws Exception { + SemanticQueryResp queryResultWithColumns = new SemanticQueryResp(); + duckDbSource.query(String.format("SELECT distinct %s as partition FROM read_parquet('%s')", partitionName, + parquetPath), queryResultWithColumns); + if (!queryResultWithColumns.getResultList().isEmpty()) { + return queryResultWithColumns.getResultList().stream() + .filter(l -> l.containsKey("partition") && Objects.nonNull(l.get("partition"))) + .map(l -> (String) l.get("partition")).collect(Collectors.toList()); + } + return new ArrayList<>(); + } + + public static boolean createDatabase(DuckDbSource duckDbSource, String db) throws Exception { + duckDbSource.execute("CREATE SCHEMA IF 
NOT EXISTS " + db); + return true; + } + + public static boolean createView(DuckDbSource duckDbSource, String view, String sql) throws Exception { + duckDbSource.execute(String.format("CREATE OR REPLACE VIEW %s AS %s;", view, sql)); + return true; + } + +} diff --git a/headless/server/src/main/java/com/tencent/supersonic/headless/server/service/impl/QueryServiceImpl.java b/headless/server/src/main/java/com/tencent/supersonic/headless/server/service/impl/QueryServiceImpl.java index 3ef24cf8d..7865eef42 100644 --- a/headless/server/src/main/java/com/tencent/supersonic/headless/server/service/impl/QueryServiceImpl.java +++ b/headless/server/src/main/java/com/tencent/supersonic/headless/server/service/impl/QueryServiceImpl.java @@ -23,6 +23,7 @@ import com.tencent.supersonic.headless.api.pojo.response.SemanticQueryResp; import com.tencent.supersonic.headless.api.pojo.response.SemanticSchemaResp; import com.tencent.supersonic.headless.core.cache.QueryCache; import com.tencent.supersonic.headless.core.executor.QueryExecutor; +import com.tencent.supersonic.headless.core.executor.accelerator.QueryAccelerator; import com.tencent.supersonic.headless.core.parser.DefaultQueryParser; import com.tencent.supersonic.headless.core.parser.QueryParser; import com.tencent.supersonic.headless.core.parser.calcite.s2sql.SemanticModel; @@ -38,15 +39,14 @@ import com.tencent.supersonic.headless.server.service.QueryService; import com.tencent.supersonic.headless.server.utils.QueryReqConverter; import com.tencent.supersonic.headless.server.utils.QueryUtils; import com.tencent.supersonic.headless.server.utils.StatUtils; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.springframework.stereotype.Service; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; - @Service @Slf4j @@ -249,7 +249,16 @@ public class 
QueryServiceImpl implements QueryService { queryParser.parse(queryStatement); //2 plan QueryExecutor queryExecutor = queryPlanner.plan(queryStatement); - //3 execute + //3 accelerate + QueryAccelerator queryAccelerator = queryPlanner.accelerate(queryStatement); + if (queryAccelerator != null) { + semanticQueryResp = queryAccelerator.query(queryStatement); + if (Objects.nonNull(semanticQueryResp) && !semanticQueryResp.getResultList().isEmpty()) { + log.info("query by Accelerator {}", queryAccelerator.getClass().getSimpleName()); + return semanticQueryResp; + } + } + //4 execute if (queryExecutor != null) { semanticQueryResp = queryExecutor.execute(queryStatement); queryUtils.fillItemNameInfo(semanticQueryResp, queryStatement.getSemanticSchemaResp()); diff --git a/pom.xml b/pom.xml index 4a0dc0ba7..2650dfe89 100644 --- a/pom.xml +++ b/pom.xml @@ -76,6 +76,7 @@ 0.24.0 42.7.1 4.0.8 + 0.10.0