-
Notifications
You must be signed in to change notification settings - Fork 1.5k
feat(database): Add support for Trino as a new database driver #1313
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 7 commits
7953245
792556b
c9d87b0
8b7e02f
8639c4e
9ad3947
7ddf953
e7f7574
0118f5b
388ada0
eace67d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
# Trino | ||
|
||
The Trino driver supports schema migrations for synchronizing databases connected via Trino, including data sources like Iceberg, Parquet, and S3. It is designed to handle schema changes, but its capabilities depend on the Trino configuration. | ||
|
||
## Connection String | ||
|
||
The connection string for Trino follows the format: | ||
|
||
`trino://{user}@{host}:{port}?catalog={catalog}&schema={schema}&ssl=true` | ||
|
||
### URL Query Parameters | ||
|
||
| Parameter | Description | | ||
|---|---| | ||
| `catalog` | The name of the catalog to connect to. This catalog must already exist. | | ||
| `schema` | The name of the schema to use. This schema must already exist within the specified catalog. | | ||
| `ssl` | A boolean value (`true` or `false`) to enable or disable SSL. If not specified, it defaults to `true` (HTTPS). | | ||
| `x-migrations-table` | The name of the migrations table. Defaults to `schema_migrations`. | | ||
| `x-migrations-catalog`| The catalog where the migrations table is located. If not specified, the current catalog is used. | | ||
| `x-migrations-schema` | The schema where the migrations table is located. If not specified, the current schema is used. | | ||
| `x-statement-timeout` | The statement timeout in milliseconds. | | ||
|
||
### Notes | ||
|
||
- **Pre-existing Catalog and Schema**: The catalog and schema specified in the connection string must be created in Trino beforehand. The driver does not create them automatically. | ||
- **Schema Synchronization**: The primary purpose of this driver is to synchronize schemas across different databases connected through Trino. It is particularly useful for managing schema evolution in data lakes where data is stored in formats like Iceberg, Parquet, or on S3. | ||
- **Schema Changes**: Support for schema changes (e.g., `ALTER TABLE`) is dependent on the underlying connector and data source configuration in Trino. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
DROP TABLE IF EXISTS users |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
CREATE TABLE IF NOT EXISTS users ( | ||
id BIGINT, | ||
name VARCHAR, | ||
email VARCHAR, | ||
created_at TIMESTAMP | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
DROP VIEW IF EXISTS user_by_email |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
-- Trino doesn't support traditional indexes, but we can create a view for common queries | ||
CREATE OR REPLACE VIEW user_by_email AS | ||
SELECT id, name, email, created_at | ||
FROM users | ||
WHERE email IS NOT NULL |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,294 @@ | ||
package trino | ||
|
||
import ( | ||
"database/sql" | ||
"fmt" | ||
"io" | ||
"net/url" | ||
"strconv" | ||
"strings" | ||
"sync/atomic" | ||
"time" | ||
|
||
"github.com/golang-migrate/migrate/v4" | ||
"github.com/golang-migrate/migrate/v4/database" | ||
"github.com/hashicorp/go-multierror" | ||
_ "github.com/trinodb/trino-go-client/trino" | ||
) | ||
|
||
var ( | ||
DefaultMigrationsTable = "schema_migrations" | ||
ErrNilConfig = fmt.Errorf("no config") | ||
) | ||
|
||
type Config struct { | ||
MigrationsTable string | ||
MigrationsSchema string | ||
MigrationsCatalog string | ||
StatementTimeout time.Duration | ||
} | ||
|
||
func init() { | ||
database.Register("trino", &Trino{}) | ||
} | ||
|
||
func WithInstance(conn *sql.DB, config *Config) (database.Driver, error) { | ||
if config == nil { | ||
return nil, ErrNilConfig | ||
} | ||
|
||
if err := conn.Ping(); err != nil { | ||
return nil, err | ||
} | ||
|
||
t := &Trino{ | ||
conn: conn, | ||
config: config, | ||
} | ||
|
||
if err := t.init(); err != nil { | ||
return nil, err | ||
} | ||
|
||
return t, nil | ||
} | ||
|
||
type Trino struct { | ||
conn *sql.DB | ||
config *Config | ||
isLocked atomic.Bool | ||
} | ||
|
||
func (t *Trino) Open(dsn string) (database.Driver, error) { | ||
purl, err := url.Parse(dsn) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
// Filter custom parameters and handle scheme conversion | ||
q := migrate.FilterCustomQuery(purl) | ||
|
||
// Convert trino:// scheme to http:// or https:// based on ssl parameter | ||
if q.Scheme == "trino" { | ||
// Check ssl parameter (default is true) | ||
ssl := purl.Query().Get("ssl") | ||
if ssl == "" || ssl == "true" { | ||
q.Scheme = "https" | ||
} else { | ||
q.Scheme = "http" | ||
} | ||
} | ||
|
||
// Set source if not provided | ||
query := q.Query() | ||
if query.Get("source") == "" { | ||
query.Set("source", "golang-migrate") | ||
} | ||
q.RawQuery = query.Encode() | ||
|
||
conn, err := sql.Open("trino", q.String()) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
// Parse statement timeout | ||
var statementTimeout time.Duration | ||
if timeoutStr := purl.Query().Get("x-statement-timeout"); timeoutStr != "" { | ||
if timeoutMs, err := strconv.Atoi(timeoutStr); err == nil { | ||
statementTimeout = time.Duration(timeoutMs) * time.Millisecond | ||
} | ||
} | ||
|
||
t = &Trino{ | ||
conn: conn, | ||
config: &Config{ | ||
MigrationsTable: purl.Query().Get("x-migrations-table"), | ||
MigrationsSchema: purl.Query().Get("x-migrations-schema"), | ||
MigrationsCatalog: purl.Query().Get("x-migrations-catalog"), | ||
StatementTimeout: statementTimeout, | ||
}, | ||
} | ||
|
||
if err := t.init(); err != nil { | ||
return nil, err | ||
} | ||
|
||
return t, nil | ||
} | ||
|
||
func (t *Trino) init() error { | ||
// Test basic connectivity first | ||
if err := t.conn.Ping(); err != nil { | ||
return fmt.Errorf("ping failed: %w", err) | ||
} | ||
|
||
// Get current catalog if not specified | ||
if t.config.MigrationsCatalog == "" { | ||
if err := t.conn.QueryRow("SELECT current_catalog").Scan(&t.config.MigrationsCatalog); err != nil { | ||
return fmt.Errorf("failed to get current catalog: %w", err) | ||
} | ||
} | ||
|
||
// Get current schema if not specified | ||
if t.config.MigrationsSchema == "" { | ||
if err := t.conn.QueryRow("SELECT current_schema").Scan(&t.config.MigrationsSchema); err != nil { | ||
return fmt.Errorf("failed to get current schema: %w", err) | ||
} | ||
} | ||
|
||
if t.config.MigrationsTable == "" { | ||
t.config.MigrationsTable = DefaultMigrationsTable | ||
} | ||
|
||
return t.ensureVersionTable() | ||
} | ||
|
||
func (t *Trino) Run(r io.Reader) error { | ||
migration, err := io.ReadAll(r) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
query := string(migration) | ||
if strings.TrimSpace(query) == "" { | ||
return nil | ||
} | ||
|
||
if _, err := t.conn.Exec(query); err != nil { | ||
return database.Error{OrigErr: err, Err: "migration failed", Query: migration} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (t *Trino) Version() (int, bool, error) { | ||
var ( | ||
version int | ||
dirty bool | ||
query = fmt.Sprintf("SELECT version, dirty FROM %s.%s.%s ORDER BY sequence DESC LIMIT 1", | ||
t.config.MigrationsCatalog, t.config.MigrationsSchema, t.config.MigrationsTable) | ||
) | ||
|
||
err := t.conn.QueryRow(query).Scan(&version, &dirty) | ||
if err != nil { | ||
if err == sql.ErrNoRows { | ||
return database.NilVersion, false, nil | ||
} | ||
// Check if table doesn't exist | ||
if strings.Contains(strings.ToLower(err.Error()), "not exist") || | ||
strings.Contains(strings.ToLower(err.Error()), "not found") { | ||
return database.NilVersion, false, nil | ||
} | ||
return 0, false, &database.Error{OrigErr: err, Query: []byte(query)} | ||
} | ||
return version, dirty, nil | ||
} | ||
|
||
func (t *Trino) SetVersion(version int, dirty bool) error { | ||
migrationsTable := fmt.Sprintf("%s.%s.%s", | ||
t.config.MigrationsCatalog, t.config.MigrationsSchema, t.config.MigrationsTable) | ||
|
||
insertQuery := fmt.Sprintf("INSERT INTO %s (version, dirty, sequence) VALUES (?, ?, ?)", migrationsTable) | ||
Comment on lines
+190
to
+193
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. SQL injection vulnerability: the table name is constructed using string concatenation without proper escaping. While the values themselves use parameterized queries, the table name should also be properly escaped. Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback |
||
if _, err := t.conn.Exec(insertQuery, version, dirty, time.Now().UnixNano()); err != nil { | ||
return &database.Error{OrigErr: err, Query: []byte(insertQuery)} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// ensureVersionTable creates the migrations table if it doesn't exist | ||
func (t *Trino) ensureVersionTable() (err error) { | ||
if err = t.Lock(); err != nil { | ||
return err | ||
} | ||
|
||
defer func() { | ||
if e := t.Unlock(); e != nil { | ||
if err == nil { | ||
err = e | ||
} else { | ||
err = multierror.Append(err, e) | ||
} | ||
} | ||
}() | ||
|
||
migrationsTable := fmt.Sprintf("%s.%s.%s", | ||
t.config.MigrationsCatalog, t.config.MigrationsSchema, t.config.MigrationsTable) | ||
|
||
// Use CREATE TABLE IF NOT EXISTS for safe concurrent table creation | ||
createQuery := fmt.Sprintf(` | ||
CREATE TABLE IF NOT EXISTS %s ( | ||
version BIGINT NOT NULL, | ||
dirty BOOLEAN NOT NULL, | ||
sequence BIGINT NOT NULL | ||
)`, migrationsTable) | ||
Comment on lines
+217
to
+226
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. SQL injection vulnerability: the table name is directly interpolated into the CREATE TABLE statement without proper escaping. This could allow SQL injection if the catalog, schema, or table names contain malicious content. Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback |
||
|
||
if _, err := t.conn.Exec(createQuery); err != nil { | ||
// Check if it's a "table already exists" error, which is safe to ignore | ||
if strings.Contains(strings.ToLower(err.Error()), "already exists") || | ||
strings.Contains(strings.ToLower(err.Error()), "table exists") { | ||
return nil | ||
} | ||
return &database.Error{OrigErr: err, Query: []byte(createQuery)} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (t *Trino) Drop() (err error) { | ||
// Get all tables in the current schema | ||
query := fmt.Sprintf(` | ||
SELECT table_name | ||
FROM information_schema.tables | ||
WHERE table_catalog = '%s' | ||
AND table_schema = '%s' | ||
AND table_type = 'BASE TABLE'`, | ||
Comment on lines
+243
to
+247
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. SQL injection vulnerability: the catalog and schema names are directly interpolated into the SQL string without proper escaping or parameterization. Use parameterized queries or properly escape the values. Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback |
||
t.config.MigrationsCatalog, t.config.MigrationsSchema) | ||
|
||
tables, err := t.conn.Query(query) | ||
if err != nil { | ||
return &database.Error{OrigErr: err, Query: []byte(query)} | ||
} | ||
defer func() { | ||
if errClose := tables.Close(); errClose != nil { | ||
err = multierror.Append(err, errClose) | ||
} | ||
}() | ||
|
||
// Drop tables one by one | ||
for tables.Next() { | ||
var tableName string | ||
if err := tables.Scan(&tableName); err != nil { | ||
return err | ||
} | ||
|
||
dropQuery := fmt.Sprintf("DROP TABLE IF EXISTS %s.%s.%s", | ||
t.config.MigrationsCatalog, t.config.MigrationsSchema, tableName) | ||
Comment on lines
+267
to
+268
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. SQL injection vulnerability: catalog, schema, and table names are directly interpolated into the DROP TABLE statement without proper escaping. This is particularly dangerous as it's used in a loop to drop multiple tables. Copilot uses AI. Check for mistakes. Positive FeedbackNegative Feedback |
||
if _, err := t.conn.Exec(dropQuery); err != nil { | ||
return &database.Error{OrigErr: err, Query: []byte(dropQuery)} | ||
} | ||
} | ||
if err := tables.Err(); err != nil { | ||
return &database.Error{OrigErr: err, Query: []byte(query)} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (t *Trino) Lock() error { | ||
if !t.isLocked.CompareAndSwap(false, true) { | ||
return database.ErrLocked | ||
} | ||
return nil | ||
} | ||
|
||
func (t *Trino) Unlock() error { | ||
if !t.isLocked.CompareAndSwap(true, false) { | ||
return database.ErrNotLocked | ||
} | ||
return nil | ||
} | ||
|
||
func (t *Trino) Close() error { | ||
return t.conn.Close() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SQL injection vulnerability: catalog, schema, and table names are directly interpolated into the SQL string. These values should be properly escaped or validated to prevent SQL injection attacks.
Copilot uses AI. Check for mistakes.