feat:Support kyuubi presto trino (#2109)

This commit is contained in:
zyclove
2025-02-26 17:33:14 +08:00
committed by GitHub
parent 11ff99cdbe
commit 5e3bafb953
31 changed files with 501 additions and 101 deletions

View File

@@ -121,6 +121,18 @@
<artifactId>DmJdbcDriver18</artifactId>
<version>8.1.2.192</version>
</dependency>
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-hive-jdbc</artifactId>
</dependency>
<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-jdbc</artifactId>
</dependency>
<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-jdbc</artifactId>
</dependency>
</dependencies>

View File

@@ -5,23 +5,27 @@ import com.tencent.supersonic.headless.api.pojo.DBColumn;
import com.tencent.supersonic.headless.api.pojo.enums.FieldType;
import com.tencent.supersonic.headless.core.pojo.ConnectInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
@Slf4j
public abstract class BaseDbAdaptor implements DbAdaptor {
@Override
public List<String> getCatalogs(ConnectInfo connectInfo) throws SQLException {
// Apart from supporting multiple catalog types of data sources, other types will return an
// empty set by default.
return List.of();
List<String> catalogs = Lists.newArrayList();
try (Connection con = getConnection(connectInfo);
Statement st = con.createStatement();
ResultSet rs = st.executeQuery("SHOW CATALOGS")) {
while (rs.next()) {
catalogs.add(rs.getString(1));
}
}
return catalogs;
}
public List<String> getDBs(ConnectInfo connectionInfo, String catalog) throws SQLException {
@@ -32,38 +36,49 @@ public abstract class BaseDbAdaptor implements DbAdaptor {
protected List<String> getDBs(ConnectInfo connectionInfo) throws SQLException {
List<String> dbs = Lists.newArrayList();
DatabaseMetaData metaData = getDatabaseMetaData(connectionInfo);
try {
ResultSet schemaSet = metaData.getSchemas();
while (schemaSet.next()) {
String db = schemaSet.getString("TABLE_SCHEM");
dbs.add(db);
try (ResultSet schemaSet = getDatabaseMetaData(connectionInfo).getSchemas()) {
while (schemaSet.next()) {
String db = schemaSet.getString("TABLE_SCHEM");
dbs.add(db);
}
}
} catch (Exception e) {
log.info("get meta schemas failed, try to get catalogs");
log.warn("get meta schemas failed", e);
log.warn("get meta schemas failed, try to get catalogs");
}
try {
ResultSet catalogSet = metaData.getCatalogs();
while (catalogSet.next()) {
String db = catalogSet.getString("TABLE_CAT");
dbs.add(db);
try (ResultSet catalogSet = getDatabaseMetaData(connectionInfo).getCatalogs()) {
while (catalogSet.next()) {
String db = catalogSet.getString("TABLE_CAT");
dbs.add(db);
}
}
} catch (Exception e) {
log.info("get meta catalogs failed, try to get schemas");
log.warn("get meta catalogs failed", e);
log.warn("get meta catalogs failed, try to get schemas");
}
return dbs;
}
public List<String> getTables(ConnectInfo connectionInfo, String schemaName)
@Override
public List<String> getTables(ConnectInfo connectInfo, String catalog, String schemaName)
throws SQLException {
// Except for special types implemented separately, the generic logic catalog does not take
// effect.
return getTables(connectInfo, schemaName);
}
protected List<String> getTables(ConnectInfo connectionInfo, String schemaName)
throws SQLException {
List<String> tablesAndViews = new ArrayList<>();
DatabaseMetaData metaData = getDatabaseMetaData(connectionInfo);
try {
ResultSet resultSet = getResultSet(schemaName, metaData);
while (resultSet.next()) {
String name = resultSet.getString("TABLE_NAME");
tablesAndViews.add(name);
try(ResultSet resultSet = getResultSet(schemaName, getDatabaseMetaData(connectionInfo))) {
while (resultSet.next()) {
String name = resultSet.getString("TABLE_NAME");
tablesAndViews.add(name);
}
}
} catch (SQLException e) {
log.error("Failed to get tables and views", e);
@@ -76,27 +91,34 @@ public abstract class BaseDbAdaptor implements DbAdaptor {
return metaData.getTables(schemaName, schemaName, null, new String[] {"TABLE", "VIEW"});
}
public List<DBColumn> getColumns(ConnectInfo connectInfo, String schemaName, String tableName)
public List<DBColumn> getColumns(ConnectInfo connectInfo, String catalog, String schemaName, String tableName)
throws SQLException {
List<DBColumn> dbColumns = Lists.newArrayList();
DatabaseMetaData metaData = getDatabaseMetaData(connectInfo);
ResultSet columns = metaData.getColumns(schemaName, schemaName, tableName, null);
while (columns.next()) {
String columnName = columns.getString("COLUMN_NAME");
String dataType = columns.getString("TYPE_NAME");
String remarks = columns.getString("REMARKS");
FieldType fieldType = classifyColumnType(dataType);
dbColumns.add(new DBColumn(columnName, dataType, remarks, fieldType));
List<DBColumn> dbColumns = new ArrayList<>();
// 确保连接会自动关闭
try (ResultSet columns = getDatabaseMetaData(connectInfo).getColumns(catalog, schemaName, tableName, null)) {
while (columns.next()) {
String columnName = columns.getString("COLUMN_NAME");
String dataType = columns.getString("TYPE_NAME");
String remarks = columns.getString("REMARKS");
FieldType fieldType = classifyColumnType(dataType);
dbColumns.add(new DBColumn(columnName, dataType, remarks, fieldType));
}
}
return dbColumns;
}
protected DatabaseMetaData getDatabaseMetaData(ConnectInfo connectionInfo) throws SQLException {
Connection connection = DriverManager.getConnection(connectionInfo.getUrl(),
connectionInfo.getUserName(), connectionInfo.getPassword());
Connection connection = getConnection(connectionInfo);
return connection.getMetaData();
}
public Connection getConnection(ConnectInfo connectionInfo) throws SQLException {
final Properties properties = getProperties(connectionInfo);
return DriverManager.getConnection(connectionInfo.getUrl(), properties);
}
public FieldType classifyColumnType(String typeName) {
switch (typeName.toUpperCase()) {
case "INT":
@@ -118,4 +140,24 @@ public abstract class BaseDbAdaptor implements DbAdaptor {
}
}
public Properties getProperties(ConnectInfo connectionInfo) {
final Properties properties = new Properties();
String url = connectionInfo.getUrl().toLowerCase();
// 设置通用属性
properties.setProperty("user", connectionInfo.getUserName());
// 针对 Presto 和 Trino ssl=false 的情况,不需要设置密码
if (url.startsWith("jdbc:presto") || url.startsWith("jdbc:trino")) {
// 检查是否需要处理 SSL
if (!url.contains("ssl=false")) {
properties.setProperty("password", connectionInfo.getPassword());
}
} else {
// 针对其他数据库类型
properties.setProperty("password", connectionInfo.getPassword());
}
return properties;
}
}

View File

@@ -18,9 +18,10 @@ public interface DbAdaptor {
List<String> getDBs(ConnectInfo connectInfo, String catalog) throws SQLException;
List<String> getTables(ConnectInfo connectInfo, String schemaName) throws SQLException;
List<String> getTables(ConnectInfo connectInfo, String catalog, String schemaName)
throws SQLException;
List<DBColumn> getColumns(ConnectInfo connectInfo, String schemaName, String tableName)
List<DBColumn> getColumns(ConnectInfo connectInfo, String catalog, String schemaName, String tableName)
throws SQLException;
FieldType classifyColumnType(String typeName);

View File

@@ -19,6 +19,9 @@ public class DbAdaptorFactory {
dbAdaptorMap.put(EngineType.DUCKDB.getName(), new DuckdbAdaptor());
dbAdaptorMap.put(EngineType.HANADB.getName(), new HanadbAdaptor());
dbAdaptorMap.put(EngineType.STARROCKS.getName(), new StarrocksAdaptor());
dbAdaptorMap.put(EngineType.KYUUBI.getName(), new KyuubiAdaptor());
dbAdaptorMap.put(EngineType.PRESTO.getName(), new PrestoAdaptor());
dbAdaptorMap.put(EngineType.TRINO.getName(), new TrinoAdaptor());
}
public static DbAdaptor getEngineAdaptor(String engineType) {

View File

@@ -19,7 +19,7 @@ public class DuckdbAdaptor extends DefaultDbAdaptor {
return metaData.getTables(schemaName, null, null, new String[] {"TABLE", "VIEW"});
}
public List<DBColumn> getColumns(ConnectInfo connectInfo, String schemaName, String tableName)
public List<DBColumn> getColumns(ConnectInfo connectInfo, String catalog, String schemaName, String tableName)
throws SQLException {
List<DBColumn> dbColumns = Lists.newArrayList();
DatabaseMetaData metaData = getDatabaseMetaData(connectInfo);

View File

@@ -46,7 +46,7 @@ public class H2Adaptor extends BaseDbAdaptor {
return metaData.getTables(schemaName, null, null, new String[] {"TABLE", "VIEW"});
}
public List<DBColumn> getColumns(ConnectInfo connectInfo, String schemaName, String tableName)
public List<DBColumn> getColumns(ConnectInfo connectInfo, String catalog, String schemaName, String tableName)
throws SQLException {
List<DBColumn> dbColumns = Lists.newArrayList();
DatabaseMetaData metaData = getDatabaseMetaData(connectInfo);

View File

@@ -0,0 +1,84 @@
package com.tencent.supersonic.headless.core.adaptor.db;
import com.google.common.collect.Lists;
import com.tencent.supersonic.common.pojo.Constants;
import com.tencent.supersonic.common.pojo.enums.TimeDimensionEnum;
import com.tencent.supersonic.headless.api.pojo.DBColumn;
import com.tencent.supersonic.headless.api.pojo.enums.FieldType;
import com.tencent.supersonic.headless.core.pojo.ConnectInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
@Slf4j
public class KyuubiAdaptor extends BaseDbAdaptor {
/** transform YYYYMMDD to YYYY-MM-DD YYYY-MM YYYY-MM-DD(MONDAY) */
@Override
public String getDateFormat(String dateType, String dateFormat, String column) {
if (dateFormat.equalsIgnoreCase(Constants.DAY_FORMAT_INT)) {
if (TimeDimensionEnum.MONTH.name().equalsIgnoreCase(dateType)) {
return String.format("date_format(%s, 'yyyy-MM')", column);
} else if (TimeDimensionEnum.WEEK.name().equalsIgnoreCase(dateType)) {
return String.format("date_format(date_sub(%s, (dayofweek(%s) - 2)), 'yyyy-MM-dd')",
column, column);
} else {
return String.format(
"date_format(to_date(cast(%s as string), 'yyyyMMdd'), 'yyyy-MM-dd')",
column);
}
} else if (dateFormat.equalsIgnoreCase(Constants.DAY_FORMAT)) {
if (TimeDimensionEnum.MONTH.name().equalsIgnoreCase(dateType)) {
return String.format("date_format(%s, 'yyyy-MM')", column);
} else if (TimeDimensionEnum.WEEK.name().equalsIgnoreCase(dateType)) {
return String.format("date_format(date_sub(%s, (dayofweek(%s) - 2)), 'yyyy-MM-dd')",
column, column);
} else {
return column;
}
}
return column;
}
@Override
public List<String> getDBs(ConnectInfo connectionInfo, String catalog) throws SQLException {
List<String> dbs = Lists.newArrayList();
final StringBuilder sql = new StringBuilder("SHOW DATABASES");
if (StringUtils.isNotBlank(catalog)) {
sql.append(" IN ").append(catalog);
}
try (Connection con = getConnection(connectionInfo);
Statement st = con.createStatement();
ResultSet rs = st.executeQuery(sql.toString())) {
while (rs.next()) {
dbs.add(rs.getString(1));
}
}
return dbs;
}
@Override
public List<String> getTables(ConnectInfo connectInfo, String catalog, String schemaName) throws SQLException {
List<String> tablesAndViews = new ArrayList<>();
try {
try (ResultSet resultSet = getDatabaseMetaData(connectInfo).getTables(catalog, schemaName, null, new String[] {"TABLE", "VIEW"})) {
while (resultSet.next()) {
String name = resultSet.getString("TABLE_NAME");
tablesAndViews.add(name);
}
}
} catch (SQLException e) {
log.error("Failed to get tables and views", e);
}
return tablesAndViews;
}
@Override
public String rewriteSql(String sql) {
return sql;
}
}

View File

@@ -99,7 +99,7 @@ public class PostgresqlAdaptor extends BaseDbAdaptor {
return tablesAndViews;
}
public List<DBColumn> getColumns(ConnectInfo connectInfo, String schemaName, String tableName)
public List<DBColumn> getColumns(ConnectInfo connectInfo, String catalog, String schemaName, String tableName)
throws SQLException {
List<DBColumn> dbColumns = Lists.newArrayList();
DatabaseMetaData metaData = getDatabaseMetaData(connectInfo);

View File

@@ -0,0 +1,88 @@
package com.tencent.supersonic.headless.core.adaptor.db;
import com.google.common.collect.Lists;
import com.tencent.supersonic.common.pojo.Constants;
import com.tencent.supersonic.common.pojo.enums.TimeDimensionEnum;
import com.tencent.supersonic.headless.api.pojo.DBColumn;
import com.tencent.supersonic.headless.api.pojo.enums.FieldType;
import com.tencent.supersonic.headless.core.pojo.ConnectInfo;
import org.apache.commons.lang3.StringUtils;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class PrestoAdaptor extends BaseDbAdaptor {
/** transform YYYYMMDD to YYYY-MM-DD YYYY-MM YYYY-MM-DD(MONDAY) */
@Override
public String getDateFormat(String dateType, String dateFormat, String column) {
if (dateFormat.equalsIgnoreCase(Constants.DAY_FORMAT_INT)) {
if (TimeDimensionEnum.MONTH.name().equalsIgnoreCase(dateType)) {
return String.format("date_format(%s, '%%Y-%%m')", column);
} else if (TimeDimensionEnum.WEEK.name().equalsIgnoreCase(dateType)) {
return String.format(
"date_format(date_add('day', - (day_of_week(%s) - 2), %s), '%%Y-%%m-%%d')",
column, column);
} else {
return String.format("date_format(date_parse(%s, '%%Y%%m%%d'), '%%Y-%%m-%%d')",
column);
}
} else if (dateFormat.equalsIgnoreCase(Constants.DAY_FORMAT)) {
if (TimeDimensionEnum.MONTH.name().equalsIgnoreCase(dateType)) {
return String.format("date_format(%s, '%%Y-%%m')", column);
} else if (TimeDimensionEnum.WEEK.name().equalsIgnoreCase(dateType)) {
return String.format(
"date_format(date_add('day', - (day_of_week(%s) - 2), %s), '%%Y-%%m-%%d')",
column, column);
} else {
return column;
}
}
return column;
}
@Override
public List<String> getDBs(ConnectInfo connectionInfo, String catalog) throws SQLException {
List<String> dbs = Lists.newArrayList();
final StringBuilder sql = new StringBuilder("SHOW SCHEMAS");
if (StringUtils.isNotBlank(catalog)) {
sql.append(" IN ").append(catalog);
}
try (Connection con = getConnection(connectionInfo);
Statement st = con.createStatement();
ResultSet rs = st.executeQuery(sql.toString())) {
while (rs.next()) {
dbs.add(rs.getString(1));
}
}
return dbs;
}
@Override
public List<String> getTables(ConnectInfo connectInfo, String catalog, String schemaName)
throws SQLException {
List<String> tablesAndViews = new ArrayList<>();
final StringBuilder sql = new StringBuilder("SHOW TABLES");
if (StringUtils.isNotBlank(catalog)) {
sql.append(" IN ").append(catalog).append(".").append(schemaName);
}else {
sql.append(" IN ").append(schemaName);
}
try (Connection con = getConnection(connectInfo);
Statement st = con.createStatement();
ResultSet rs = st.executeQuery(sql.toString())) {
while (rs.next()) {
tablesAndViews.add(rs.getString(1));
}
}
return tablesAndViews;
}
@Override
public String rewriteSql(String sql) {
return sql;
}
}

View File

@@ -1,42 +1,87 @@
package com.tencent.supersonic.headless.core.adaptor.db;
import com.google.common.collect.Lists;
import com.tencent.supersonic.headless.api.pojo.DBColumn;
import com.tencent.supersonic.headless.api.pojo.enums.FieldType;
import com.tencent.supersonic.headless.core.pojo.ConnectInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.util.Assert;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
@Slf4j
public class StarrocksAdaptor extends MysqlAdaptor {
@Override
public List<String> getCatalogs(ConnectInfo connectInfo) throws SQLException {
List<String> catalogs = Lists.newArrayList();
try (Connection con = DriverManager.getConnection(connectInfo.getUrl(),
connectInfo.getUserName(), connectInfo.getPassword());
Statement st = con.createStatement();
ResultSet rs = st.executeQuery("SHOW CATALOGS")) {
while (rs.next()) {
catalogs.add(rs.getString(1));
}
}
return catalogs;
}
@Override
public List<String> getDBs(ConnectInfo connectionInfo, String catalog) throws SQLException {
Assert.hasText(catalog, "StarRocks type catalog can not be null or empty");
List<String> dbs = Lists.newArrayList();
try (Connection con = DriverManager.getConnection(connectionInfo.getUrl(),
connectionInfo.getUserName(), connectionInfo.getPassword());
Statement st = con.createStatement();
ResultSet rs = st.executeQuery("SHOW DATABASES IN " + catalog)) {
final StringBuilder sql = new StringBuilder("SHOW DATABASES");
if (StringUtils.isNotBlank(catalog)) {
sql.append(" IN ").append(catalog);
}
try (Connection con = getConnection(connectionInfo);
Statement st = con.createStatement();
ResultSet rs = st.executeQuery(sql.toString())) {
while (rs.next()) {
dbs.add(rs.getString(1));
}
}
return dbs;
}
@Override
public List<String> getTables(ConnectInfo connectInfo, String catalog, String schemaName)
throws SQLException {
List<String> tablesAndViews = new ArrayList<>();
final StringBuilder sql = new StringBuilder("SHOW TABLES");
if (StringUtils.isNotBlank(catalog)) {
sql.append(" IN ").append(catalog).append(".").append(schemaName);
}else {
sql.append(" IN ").append(schemaName);
}
try (Connection con = getConnection(connectInfo);
Statement st = con.createStatement();
ResultSet rs = st.executeQuery(sql.toString())) {
while (rs.next()) {
tablesAndViews.add(rs.getString(1));
}
}
return tablesAndViews;
}
@Override
public List<DBColumn> getColumns(ConnectInfo connectInfo, String catalog, String schemaName, String tableName)
throws SQLException {
List<DBColumn> dbColumns = new ArrayList<>();
try (Connection con = getConnection(connectInfo);
Statement st = con.createStatement()) {
// 切换到指定的 catalog或 database/schema这在某些 SQL 方言中很重要
if (StringUtils.isNotBlank(catalog)) {
st.execute("SET CATALOG " + catalog);
}
// 获取 DatabaseMetaData; 需要注意调用此方法的位置(在 USE 之后)
DatabaseMetaData metaData = con.getMetaData();
// 获取特定表的列信息
try (ResultSet columns = metaData.getColumns(schemaName, schemaName, tableName, null)) {
while (columns.next()) {
String columnName = columns.getString("COLUMN_NAME");
String dataType = columns.getString("TYPE_NAME");
String remarks = columns.getString("REMARKS");
FieldType fieldType = classifyColumnType(dataType);
dbColumns.add(new DBColumn(columnName, dataType, remarks, fieldType));
}
}
}
return dbColumns;
}
}

View File

@@ -0,0 +1,8 @@
package com.tencent.supersonic.headless.core.adaptor.db;
import com.tencent.supersonic.common.pojo.Constants;
import com.tencent.supersonic.common.pojo.enums.TimeDimensionEnum;
public class TrinoAdaptor extends PrestoAdaptor {
}

View File

@@ -40,11 +40,22 @@ public class JdbcDataSourceUtils {
log.error(e.toString(), e);
return false;
}
try (Connection con = DriverManager.getConnection(database.getUrl(), database.getUsername(),
database.passwordDecrypt())) {
return con != null;
} catch (SQLException e) {
log.error(e.toString(), e);
// presto/trino ssl=false connection need password
if (database.getUrl().startsWith("jdbc:presto") || database.getUrl().startsWith("jdbc:trino")) {
if (database.getUrl().toLowerCase().contains("ssl=false")) {
try (Connection con = DriverManager.getConnection(database.getUrl(), database.getUsername(), null)) {
return con != null;
} catch (SQLException e) {
log.error(e.toString(), e);
}
}
}else {
try (Connection con = DriverManager.getConnection(database.getUrl(), database.getUsername(),
database.passwordDecrypt())) {
return con != null;
} catch (SQLException e) {
log.error(e.toString(), e);
}
}
return false;