Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
SOURCE ?= file go_bindata github github_ee bitbucket aws_s3 google_cloud_storage godoc_vfs gitlab
DATABASE ?= postgres mysql redshift cassandra spanner cockroachdb yugabytedb clickhouse mongodb sqlserver firebird neo4j pgx pgx5 rqlite
DATABASE ?= postgres mysql redshift cassandra spanner cockroachdb yugabytedb clickhouse mongodb sqlserver firebird neo4j pgx pgx5 rqlite trino
DATABASE_TEST ?= $(DATABASE) sqlite sqlite3 sqlcipher
VERSION ?= $(shell git describe --tags 2>/dev/null | cut -c 2-)
TEST_FLAGS ?=
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ Database drivers run migrations. [Add a new database?](database/driver.go)
* [Firebird](database/firebird)
* [MS SQL Server](database/sqlserver)
* [rqlite](database/rqlite)
* [Trino](database/trino)

### Database URLs

Expand Down
27 changes: 27 additions & 0 deletions database/trino/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Trino

The Trino driver supports schema migrations for synchronizing databases connected via Trino, including data sources like Iceberg, Parquet, and S3. It is designed to handle schema changes, but its capabilities depend on the Trino configuration.

## Connection String

The connection string for Trino follows the format:

`trino://{user}@{host}:{port}?catalog={catalog}&schema={schema}&ssl=true`

### URL Query Parameters

| Parameter | Description |
|---|---|
| `catalog` | The name of the catalog to connect to. This catalog must already exist. |
| `schema` | The name of the schema to use. This schema must already exist within the specified catalog. |
| `ssl` | Selects the transport scheme. `true` (also the default when the parameter is omitted) uses HTTPS; any other value, such as `false`, uses plain HTTP. |
| `x-migrations-table` | The name of the migrations table. Defaults to `schema_migrations`. |
| `x-migrations-catalog` | The catalog where the migrations table is located. If not specified, the current catalog is used. |
| `x-migrations-schema` | The schema where the migrations table is located. If not specified, the current schema is used. |
| `x-statement-timeout` | The statement timeout in milliseconds. |

### Notes

- **Pre-existing Catalog and Schema**: The catalog and schema specified in the connection string must be created in Trino beforehand. The driver does not create them automatically.
- **Schema Synchronization**: The primary purpose of this driver is to synchronize schemas across different databases connected through Trino. It is particularly useful for managing schema evolution in data lakes where data is stored in formats like Iceberg, Parquet, or on S3.
- **Schema Changes**: Support for schema changes (e.g., `ALTER TABLE`) is dependent on the underlying connector and data source configuration in Trino.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Removes the users table; IF EXISTS makes this a no-op when the table is absent.
DROP TABLE IF EXISTS users
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- Creates the users table; IF NOT EXISTS makes this a no-op when the table is already present.
-- No constraints (keys, NOT NULL) are declared on any column.
CREATE TABLE IF NOT EXISTS users (
id BIGINT,
name VARCHAR,
email VARCHAR,
created_at TIMESTAMP
)
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Removes the user_by_email view; IF EXISTS makes this a no-op when the view is absent.
DROP VIEW IF EXISTS user_by_email
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Trino doesn't support traditional indexes, but we can create a view for common queries.
-- user_by_email exposes every users column, restricted to rows that have an email value.
-- OR REPLACE overwrites any existing view with the same name.
CREATE OR REPLACE VIEW user_by_email AS
SELECT id, name, email, created_at
FROM users
WHERE email IS NOT NULL
294 changes: 294 additions & 0 deletions database/trino/trino.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,294 @@
package trino

import (
	"context"
	"database/sql"
	"fmt"
	"io"
	"net/url"
	"strconv"
	"strings"
	"sync/atomic"
	"time"

	"github.com/golang-migrate/migrate/v4"
	"github.com/golang-migrate/migrate/v4/database"
	"github.com/hashicorp/go-multierror"
	_ "github.com/trinodb/trino-go-client/trino"
)

var (
// DefaultMigrationsTable is the migrations table name used when
// Config.MigrationsTable is left empty.
DefaultMigrationsTable = "schema_migrations"
// ErrNilConfig is returned by WithInstance when it is given a nil *Config.
ErrNilConfig = fmt.Errorf("no config")
)

// Config holds the settings of a Trino migration driver.
//
// Empty MigrationsTable, MigrationsSchema and MigrationsCatalog values are
// filled in during driver initialization, so a Config passed to WithInstance
// may be modified.
type Config struct {
// MigrationsTable is the name of the table that stores the migration
// version. Empty means DefaultMigrationsTable.
MigrationsTable string
// MigrationsSchema is the schema holding the migrations table. Empty means
// the connection's current schema.
MigrationsSchema string
// MigrationsCatalog is the catalog holding the migrations table. Empty
// means the connection's current catalog.
MigrationsCatalog string
// StatementTimeout is the statement timeout; Open derives it from the
// x-statement-timeout URL parameter, which is expressed in milliseconds.
StatementTimeout time.Duration
}

// init registers this driver with the database package under the name
// "trino".
func init() {
database.Register("trino", &Trino{})
}

// WithInstance builds a Trino driver on top of an already opened *sql.DB.
//
// It returns ErrNilConfig when config is nil and the ping error when the
// connection cannot be reached. Otherwise the driver is initialized, which
// fills in empty Config fields and makes sure the migrations table exists;
// any initialization error is returned unchanged.
func WithInstance(conn *sql.DB, config *Config) (database.Driver, error) {
	if config == nil {
		return nil, ErrNilConfig
	}

	pingErr := conn.Ping()
	if pingErr != nil {
		return nil, pingErr
	}

	driver := new(Trino)
	driver.conn = conn
	driver.config = config

	if initErr := driver.init(); initErr != nil {
		return nil, initErr
	}
	return driver, nil
}

// Trino is the migration driver for Trino. Instances are created by Open or
// WithInstance.
type Trino struct {
// conn is the database handle every statement is executed on.
conn *sql.DB
// config holds the resolved migrations table location and related settings.
config *Config
// isLocked is the flag flipped by Lock and Unlock.
isLocked atomic.Bool
}

// Open parses dsn, opens a Trino connection and returns an initialized driver.
//
// The dsn is a URL. A "trino" scheme is rewritten to "https" when the ssl
// query parameter is absent or "true", and to "http" for any other value.
// Query parameters prefixed with "x-" are consumed by this driver
// (x-migrations-table, x-migrations-schema, x-migrations-catalog,
// x-statement-timeout in milliseconds) and removed before the URL is handed to
// the underlying SQL driver. A "source" parameter of "golang-migrate" is added
// when the caller did not supply one.
//
// An error is returned when the dsn cannot be parsed, when
// x-statement-timeout is not an integer, when the connection cannot be
// opened, or when driver initialization fails. In the last case the freshly
// opened connection is closed before returning so it does not leak.
func (t *Trino) Open(dsn string) (database.Driver, error) {
	purl, err := url.Parse(dsn)
	if err != nil {
		return nil, err
	}
	params := purl.Query()

	// Validate the timeout before opening anything: a malformed value is a
	// configuration mistake the caller should hear about, not silently lose.
	var statementTimeout time.Duration
	if raw := params.Get("x-statement-timeout"); raw != "" {
		ms, err := strconv.Atoi(raw)
		if err != nil {
			return nil, fmt.Errorf("invalid x-statement-timeout %q: %w", raw, err)
		}
		statementTimeout = time.Duration(ms) * time.Millisecond
	}

	// Strip the x-* parameters; the underlying driver must not see them.
	q := migrate.FilterCustomQuery(purl)

	// Convert trino:// to http:// or https:// based on the ssl parameter
	// (HTTPS unless ssl is set to something other than "true").
	if q.Scheme == "trino" {
		switch params.Get("ssl") {
		case "", "true":
			q.Scheme = "https"
		default:
			q.Scheme = "http"
		}
	}

	// Set source if not provided.
	query := q.Query()
	if query.Get("source") == "" {
		query.Set("source", "golang-migrate")
	}
	q.RawQuery = query.Encode()

	conn, err := sql.Open("trino", q.String())
	if err != nil {
		return nil, err
	}

	// Build a fresh driver instead of overwriting the receiver, which is only
	// the registered prototype.
	driver := &Trino{
		conn: conn,
		config: &Config{
			MigrationsTable:   params.Get("x-migrations-table"),
			MigrationsSchema:  params.Get("x-migrations-schema"),
			MigrationsCatalog: params.Get("x-migrations-catalog"),
			StatementTimeout:  statementTimeout,
		},
	}

	if err := driver.init(); err != nil {
		// The caller never receives the driver, so nobody else can close conn.
		if cerr := conn.Close(); cerr != nil {
			return nil, fmt.Errorf("%w (closing connection also failed: %v)", err, cerr)
		}
		return nil, err
	}

	return driver, nil
}

// init verifies the connection, resolves any empty Config field and then
// ensures the migrations table exists.
//
// An empty catalog or schema is replaced by the value the server reports for
// current_catalog / current_schema; an empty table name is replaced by
// DefaultMigrationsTable. The first failing step aborts initialization and
// its error is returned.
func (t *Trino) init() error {
	// Test basic connectivity before issuing any query.
	if pingErr := t.conn.Ping(); pingErr != nil {
		return fmt.Errorf("ping failed: %w", pingErr)
	}

	cfg := t.config

	// Fall back to the session's catalog.
	if cfg.MigrationsCatalog == "" {
		row := t.conn.QueryRow("SELECT current_catalog")
		if scanErr := row.Scan(&cfg.MigrationsCatalog); scanErr != nil {
			return fmt.Errorf("failed to get current catalog: %w", scanErr)
		}
	}

	// Fall back to the session's schema.
	if cfg.MigrationsSchema == "" {
		row := t.conn.QueryRow("SELECT current_schema")
		if scanErr := row.Scan(&cfg.MigrationsSchema); scanErr != nil {
			return fmt.Errorf("failed to get current schema: %w", scanErr)
		}
	}

	if cfg.MigrationsTable == "" {
		cfg.MigrationsTable = DefaultMigrationsTable
	}

	return t.ensureVersionTable()
}

// Run executes the migration read from r.
//
// The entire content of r is passed to the connection in a single Exec call.
// A migration that is empty or consists only of whitespace is skipped and
// reported as success. When Config.StatementTimeout is positive the statement
// runs under a context carrying that deadline, so the x-statement-timeout URL
// parameter takes effect; a zero or negative timeout means no deadline.
//
// A read failure is returned as is. An execution failure is returned as a
// database.Error that carries the original error and the migration text.
func (t *Trino) Run(r io.Reader) error {
	migration, err := io.ReadAll(r)
	if err != nil {
		return err
	}

	query := string(migration)
	if strings.TrimSpace(query) == "" {
		return nil
	}

	ctx := context.Background()
	if timeout := t.config.StatementTimeout; timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, timeout)
		defer cancel()
	}

	if _, err := t.conn.ExecContext(ctx, query); err != nil {
		return database.Error{OrigErr: err, Err: "migration failed", Query: migration}
	}

	return nil
}

// Version reports the most recently recorded migration version and its dirty
// flag.
//
// It reads the row with the highest sequence value from the migrations table.
// database.NilVersion (with dirty=false and a nil error) is returned when the
// table has no rows, and also when the query error text contains "not exist"
// or "not found", which is treated as the table being absent. Any other
// failure is returned as a *database.Error carrying the query.
//
// NOTE(review): catalog, schema and table names are inserted into the query
// text with %s and are not quoted or validated here.
func (t *Trino) Version() (int, bool, error) {
var (
version int
dirty bool
query = fmt.Sprintf("SELECT version, dirty FROM %s.%s.%s ORDER BY sequence DESC LIMIT 1",
t.config.MigrationsCatalog, t.config.MigrationsSchema, t.config.MigrationsTable)
Comment on lines +170 to +171
Copy link
Preview

Copilot AI Aug 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SQL injection vulnerability: catalog, schema, and table names are directly interpolated into the SQL string. These values should be properly escaped or validated to prevent SQL injection attacks.

Copilot uses AI. Check for mistakes.

)

err := t.conn.QueryRow(query).Scan(&version, &dirty)
if err != nil {
// An empty migrations table means no migration has been recorded yet.
if err == sql.ErrNoRows {
return database.NilVersion, false, nil
}
// Check if table doesn't exist; detection relies on the error message text.
if strings.Contains(strings.ToLower(err.Error()), "not exist") ||
strings.Contains(strings.ToLower(err.Error()), "not found") {
return database.NilVersion, false, nil
}
return 0, false, &database.Error{OrigErr: err, Query: []byte(query)}
}
return version, dirty, nil
}

// SetVersion records version and dirty as the current migration state.
//
// The state is stored by inserting a new row rather than changing an existing
// one; the row's sequence column is set to the current time in nanoseconds
// (time.Now().UnixNano()). Existing rows are left in place. An insert failure
// is returned as a *database.Error carrying the statement.
//
// NOTE(review): the fully qualified table name is built with %s and is not
// quoted or validated here; only the column values are bound as parameters.
func (t *Trino) SetVersion(version int, dirty bool) error {
migrationsTable := fmt.Sprintf("%s.%s.%s",
t.config.MigrationsCatalog, t.config.MigrationsSchema, t.config.MigrationsTable)

insertQuery := fmt.Sprintf("INSERT INTO %s (version, dirty, sequence) VALUES (?, ?, ?)", migrationsTable)
Comment on lines +190 to +193
Copy link
Preview

Copilot AI Aug 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SQL injection vulnerability: the table name is constructed using string concatenation without proper escaping. While the values themselves use parameterized queries, the table name should also be properly escaped.

Copilot uses AI. Check for mistakes.

if _, err := t.conn.Exec(insertQuery, version, dirty, time.Now().UnixNano()); err != nil {
return &database.Error{OrigErr: err, Query: []byte(insertQuery)}
}

return nil
}

// ensureVersionTable creates the migrations table if it doesn't exist.
//
// The table has three NOT NULL columns: version (BIGINT), dirty (BOOLEAN) and
// sequence (BIGINT). The driver lock is taken for the duration of the call
// and released on return. An Unlock failure becomes the returned error, or is
// appended to an earlier error via multierror. Errors whose text contains
// "already exists" or "table exists" are treated as success.
//
// NOTE(review): the fully qualified table name is built with %s and is not
// quoted or validated here.
func (t *Trino) ensureVersionTable() (err error) {
if err = t.Lock(); err != nil {
return err
}

// Release the lock on every exit path and fold an Unlock failure into the
// named return value.
defer func() {
if e := t.Unlock(); e != nil {
if err == nil {
err = e
} else {
err = multierror.Append(err, e)
}
}
}()

migrationsTable := fmt.Sprintf("%s.%s.%s",
t.config.MigrationsCatalog, t.config.MigrationsSchema, t.config.MigrationsTable)

// Use CREATE TABLE IF NOT EXISTS for safe concurrent table creation
createQuery := fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
version BIGINT NOT NULL,
dirty BOOLEAN NOT NULL,
sequence BIGINT NOT NULL
)`, migrationsTable)
Comment on lines +217 to +226
Copy link
Preview

Copilot AI Aug 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SQL injection vulnerability: the table name is directly interpolated into the CREATE TABLE statement without proper escaping. This could allow SQL injection if the catalog, schema, or table names contain malicious content.

Copilot uses AI. Check for mistakes.


if _, err := t.conn.Exec(createQuery); err != nil {
// Check if it's a "table already exists" error, which is safe to ignore
if strings.Contains(strings.ToLower(err.Error()), "already exists") ||
strings.Contains(strings.ToLower(err.Error()), "table exists") {
return nil
}
return &database.Error{OrigErr: err, Query: []byte(createQuery)}
}

return nil
}

// Drop removes every base table found in the configured migrations catalog
// and schema.
//
// Table names are read from information_schema.tables, filtered to
// table_type = 'BASE TABLE', so other object types such as views are not
// selected. Each table is dropped with its own DROP TABLE IF EXISTS
// statement, and the first failure stops the loop. Query, drop and iteration
// failures are returned as *database.Error. A failure to close the result set
// is appended to the returned error via multierror.
//
// NOTE(review): catalog, schema and table names are inserted into both
// statements with %s and are not quoted or validated here.
func (t *Trino) Drop() (err error) {
// Get all tables in the current schema
query := fmt.Sprintf(`
SELECT table_name
FROM information_schema.tables
WHERE table_catalog = '%s'
AND table_schema = '%s'
AND table_type = 'BASE TABLE'`,
Comment on lines +243 to +247
Copy link
Preview

Copilot AI Aug 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SQL injection vulnerability: the catalog and schema names are directly interpolated into the SQL string without proper escaping or parameterization. Use parameterized queries or properly escape the values.

Copilot uses AI. Check for mistakes.

t.config.MigrationsCatalog, t.config.MigrationsSchema)

tables, err := t.conn.Query(query)
if err != nil {
return &database.Error{OrigErr: err, Query: []byte(query)}
}
// Close the result set on every exit path; the named return value lets a
// Close failure be reported alongside any earlier error.
defer func() {
if errClose := tables.Close(); errClose != nil {
err = multierror.Append(err, errClose)
}
}()

// Drop tables one by one
for tables.Next() {
var tableName string
if err := tables.Scan(&tableName); err != nil {
return err
}

dropQuery := fmt.Sprintf("DROP TABLE IF EXISTS %s.%s.%s",
t.config.MigrationsCatalog, t.config.MigrationsSchema, tableName)
Comment on lines +267 to +268
Copy link
Preview

Copilot AI Aug 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SQL injection vulnerability: catalog, schema, and table names are directly interpolated into the DROP TABLE statement without proper escaping. This is particularly dangerous as it's used in a loop to drop multiple tables.

Copilot uses AI. Check for mistakes.

if _, err := t.conn.Exec(dropQuery); err != nil {
return &database.Error{OrigErr: err, Query: []byte(dropQuery)}
}
}
// Surface any error that ended the iteration early.
if err := tables.Err(); err != nil {
return &database.Error{OrigErr: err, Query: []byte(query)}
}

return nil
}

// Lock marks this driver instance as locked.
//
// The lock is the isLocked flag held on the Trino value itself; it is flipped
// from false to true with a compare-and-swap. database.ErrLocked is returned
// when the flag was already set.
func (t *Trino) Lock() error {
	if acquired := t.isLocked.CompareAndSwap(false, true); acquired {
		return nil
	}
	return database.ErrLocked
}

// Unlock clears the lock flag set by Lock.
//
// The isLocked flag is flipped from true to false with a compare-and-swap.
// database.ErrNotLocked is returned when the flag was not set.
func (t *Trino) Unlock() error {
	if released := t.isLocked.CompareAndSwap(true, false); released {
		return nil
	}
	return database.ErrNotLocked
}

// Close closes the underlying database handle and returns the error reported
// by it, if any.
func (t *Trino) Close() error {
return t.conn.Close()
}
Loading
Loading