diff --git a/CHANGELOG.md b/CHANGELOG.md index 1ede9c45..c69be571 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,10 @@ Types of changes - `Fixed` for any bug fixes. - `Security` in case of vulnerabilities. +## [2.8.0] + +- `Added` flags `--conn-max-lifetime`, `--conn-max-open` and `--conn-max-idle` to `lino pull` and `lino push` commands + ## [2.7.1] - `Fixed` panic during push on Oracle database with a `null` column value diff --git a/cmd/lino/dep_analyse.go b/cmd/lino/dep_analyse.go index 92830c96..f88fba01 100644 --- a/cmd/lino/dep_analyse.go +++ b/cmd/lino/dep_analyse.go @@ -27,6 +27,7 @@ func analyseDataSourceFactory() map[string]infra.SQLExtractorFactory { "godror": infra.NewOracleExtractorFactory(), "godror-raw": infra.NewOracleExtractorFactory(), "mysql": infra.NewMariaDBExtractorFactory(), + "mymysql": infra.NewMariaDBExtractorFactory(), "db2": infra.NewDB2ExtractorFactory(), "sqlserver": infra.NewSQLServerExtractorFactory(), } diff --git a/cmd/lino/dep_dataconnector.go b/cmd/lino/dep_dataconnector.go index a06f7ba1..4194e248 100755 --- a/cmd/lino/dep_dataconnector.go +++ b/cmd/lino/dep_dataconnector.go @@ -32,6 +32,7 @@ func dataPingerFactory() map[string]domain.DataPingerFactory { "godror": infra.NewSQLDataPingerFactory(), "godror-raw": infra.NewSQLDataPingerFactory(), "mysql": infra.NewSQLDataPingerFactory(), + "mymysql": infra.NewSQLDataPingerFactory(), "db2": infra.NewSQLDataPingerFactory(), "http": infra.NewHTTPDataPingerFactory(), "ws": infra.NewWSDataPingerFactory(), diff --git a/cmd/lino/dep_pull.go b/cmd/lino/dep_pull.go index f982aa5a..8224c055 100755 --- a/cmd/lino/dep_pull.go +++ b/cmd/lino/dep_pull.go @@ -20,6 +20,7 @@ package main import ( "io" "os" + "time" infra "github.com/cgi-fr/lino/internal/infra/pull" domain "github.com/cgi-fr/lino/pkg/pull" @@ -31,6 +32,7 @@ func pullDataSourceFactory() map[string]domain.DataSourceFactory { "godror": infra.NewOracleDataSourceFactory(), "godror-raw": infra.NewOracleDataSourceFactory(), "mysql": infra.NewMariadbDataSourceFactory(), + "mymysql": infra.NewMariadbDataSourceFactory(), "db2": infra.NewDb2DataSourceFactory(), "http": infra.NewHTTPDataSourceFactory(), "ws": infra.NewWSDataSourceFactory(), @@ -59,3 +61,15 @@ func pullKeyStoreFactory() func(file io.ReadCloser, keys []string) (domain.KeySt func traceListner(file *os.File) domain.TraceListener { return infra.NewJSONTraceListener(file) } + +func pullMaxLifeTime(maxLifetimeInSeconds int64) domain.DataSourceOption { + return infra.WithMaxLifetime(time.Duration(maxLifetimeInSeconds) * time.Second) +} + +func pullMaxOpenConns(maxOpenConns int) domain.DataSourceOption { + return infra.WithMaxOpenConns(maxOpenConns) +} + +func pullMaxIdleConns(maxIdleConns int) domain.DataSourceOption { + return infra.WithMaxIdleConns(maxIdleConns) +} diff --git a/cmd/lino/dep_push.go b/cmd/lino/dep_push.go index b2b03b28..017c18fb 100755 --- a/cmd/lino/dep_push.go +++ b/cmd/lino/dep_push.go @@ -19,6 +19,7 @@ package main import ( "io" + "time" infra "github.com/cgi-fr/lino/internal/infra/push" domain "github.com/cgi-fr/lino/pkg/push" @@ -30,6 +31,7 @@ func pushDataDestinationFactory() map[string]domain.DataDestinationFactory { "godror": infra.NewOracleDataDestinationFactory(), "godror-raw": infra.NewOracleDataDestinationFactory(), "mysql": infra.NewMariadbDataDestinationFactory(), + "mymysql": infra.NewMariadbDataDestinationFactory(), "db2": infra.NewDb2DataDestinationFactory(), "http": infra.NewHTTPDataDestinationFactory(), "ws": infra.NewWebSocketDataDestinationFactory(), @@ -48,3 +50,15 @@ func pushRowExporterFactory() func(io.Writer) domain.RowWriter { func pushTranslator() domain.Translator { return infra.NewFileTranslator() } + +func pushMaxLifeTime(maxLifetimeInSeconds int64) domain.DataDestinationOption { + return infra.WithMaxLifetime(time.Duration(maxLifetimeInSeconds) * time.Second) +} + +func pushMaxOpenConns(maxOpenConns int) domain.DataDestinationOption { + return infra.WithMaxOpenConns(maxOpenConns) +} + +func pushMaxIdleConns(maxIdleConns int) domain.DataDestinationOption { + return infra.WithMaxIdleConns(maxIdleConns) +} diff --git a/cmd/lino/main.go b/cmd/lino/main.go index 23783d0f..8ee689b9 100755 --- a/cmd/lino/main.go +++ b/cmd/lino/main.go @@ -210,8 +210,8 @@ func initConfig() { table.Inject(dataconnectorStorage(), tableStorage(), tableExtractorFactory()) sequence.Inject(dataconnectorStorage(), tableStorage(), sequenceStorage(), sequenceUpdatorFactory()) id.Inject(idStorageFile, relationStorage(), idExporter(), idJSONStorage(*os.Stdout)) - pull.Inject(dataconnectorStorage(), relationStorage(), tableStorage(), idStorageFactory(), pullDataSourceFactory(), pullRowExporterFactory(), pullRowReaderFactory(), pullKeyStoreFactory(), traceListner(os.Stderr)) - push.Inject(dataconnectorStorage(), relationStorage(), tableStorage(), idStorageFactory(), pushDataDestinationFactory(), pushRowIteratorFactory(), pushRowExporterFactory(), pushTranslator()) + pull.Inject(dataconnectorStorage(), relationStorage(), tableStorage(), idStorageFactory(), pullDataSourceFactory(), pullRowExporterFactory(), pullRowReaderFactory(), pullKeyStoreFactory(), traceListner(os.Stderr), pullMaxLifeTime, pullMaxOpenConns, pullMaxIdleConns) + push.Inject(dataconnectorStorage(), relationStorage(), tableStorage(), idStorageFactory(), pushDataDestinationFactory(), pushRowIteratorFactory(), pushRowExporterFactory(), pushTranslator(), pushMaxLifeTime, pushMaxOpenConns, pushMaxIdleConns) } func writeMetricsToFile(statsFile string, statsByte []byte) { diff --git a/go.mod b/go.mod index 9d2c98b5..0517dff3 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,6 @@ require ( github.com/cgi-fr/jsonline v0.5.0 github.com/cgi-fr/rimo v0.4.0 github.com/docker/docker-credential-helpers v0.8.1 - github.com/go-sql-driver/mysql v1.8.0 github.com/gorilla/mux v1.8.1 github.com/hashicorp/go-multierror v1.1.1 github.com/ibmdb/go_ibm_db v0.4.5 @@ -23,6 +22,7 @@ require ( github.com/spf13/cobra v1.8.0 github.com/stretchr/testify v1.9.0 github.com/xo/dburl v0.21.1 + github.com/ziutek/mymysql v1.5.4 golang.org/x/exp v0.0.0-20231006140011-7918f672742d golang.org/x/term v0.18.0 gopkg.in/yaml.v3 v3.0.1 @@ -30,7 +30,6 @@ require ( ) require ( - filippo.io/edwards25519 v1.1.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect github.com/golang-sql/sqlexp v0.1.0 // indirect diff --git a/go.sum b/go.sum index 502a3d0e..9ea01f96 100644 --- a/go.sum +++ b/go.sum @@ -43,8 +43,6 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= -filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= -filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.1 h1:lGlwhPtrX6EVml1hO0ivjkUxsSyl4dsiw9qcA1k/3IQ= github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1 h1:sO0/P7g68FrryJzljemN+6GTssUXdANk6aJ7T1ZxnsQ= github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.1 h1:6oNBlSdi1QqM1PNW7FPA6xOGA5UNsXnkaYZz9vdPGhA= @@ -110,9 +108,8 @@ github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeME github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= +github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= -github.com/go-sql-driver/mysql v1.8.0 h1:UtktXaU2Nb64z/pLiGIxY4431SJ4/dR5cjMmlVHgnT4= -github.com/go-sql-driver/mysql v1.8.0/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= @@ -364,6 +361,8 @@ github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q= +github.com/ziutek/mymysql v1.5.4 h1:GB0qdRGsTwQSBVYuVShFBKaXSnSnYYC2d9knnE1LHFs= +github.com/ziutek/mymysql v1.5.4/go.mod h1:LMSpPZ6DbqWFxNCHW77HeMg9I646SAhApZ/wKdgO/C0= go.etcd.io/etcd/api/v3 v3.5.0/go.mod h1:cbVKeC6lCfl7j/8jBhAK6aIYO9XOjdptoxU/nLQcPvs= go.etcd.io/etcd/client/pkg/v3 v3.5.0/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g= go.etcd.io/etcd/client/v2 v2.305.0/go.mod h1:h9puh54ZTgAKtEbut2oe9P4L/oqKCVB6xsXlzd7alYQ= diff --git a/internal/app/pull/cli.go b/internal/app/pull/cli.go index dec11256..6e43dcce 100755 --- a/internal/app/pull/cli.go +++ b/internal/app/pull/cli.go @@ -44,6 +44,9 @@ var ( pullExporterFactory func(io.Writer) pull.RowExporter rowReaderFactory func(io.ReadCloser) pull.RowReader keyStoreFactory func(io.ReadCloser, []string) (pull.KeyStore, error) + maxLifeTimeOption func(int64) pull.DataSourceOption + maxOpenConnsOption func(int) pull.DataSourceOption + maxIdleConnsOption func(int) pull.DataSourceOption ) var traceListener pull.TraceListener @@ -59,6 +62,9 @@ func Inject( rrf func(io.ReadCloser) pull.RowReader, ksf func(io.ReadCloser, []string) (pull.KeyStore, error), tl pull.TraceListener, + mltOpt func(int64) pull.DataSourceOption, + mocOpt func(int) pull.DataSourceOption, + micOpt func(int) pull.DataSourceOption, ) { dataconnectorStorage = dbas relStorage = rs @@ -69,6 +75,9 @@ func Inject( rowReaderFactory = rrf keyStoreFactory = ksf traceListener = tl + maxLifeTimeOption = mltOpt + maxOpenConnsOption = mocOpt + maxIdleConnsOption = micOpt } // NewCommand implements the cli pull command @@ -85,6 +94,8 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra var diagnostic bool var filters pull.RowReader var parallel uint + var maxLifeTimeInSeconds int64 + var maxOpenConns, maxIdleConns int cmd := &cobra.Command{ Use: "pull [DB Alias Name]", @@ -103,6 +114,9 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra Str("table", table). Str("where", where). Uint("parallel", parallel). + Int64("maxLifeTimeInSeconds", maxLifeTimeInSeconds). + Int("maxOpenConns", maxOpenConns). + Int("maxIdleConns", maxIdleConns). Msg("Pull mode") }, Run: func(cmd *cobra.Command, args []string) { @@ -111,7 +125,7 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra startTime := time.Now() - datasource, e1 := getDataSource(args[0], out) + datasource, e1 := getDataSource(args[0], out, maxLifeTimeInSeconds, maxOpenConns, maxIdleConns) if e1 != nil { fmt.Fprintln(err, e1.Error()) os.Exit(1) @@ -197,13 +211,16 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra cmd.Flags().StringVarP(&where, "where", "w", "", "Advanced SQL where clause to filter") cmd.Flags().StringVarP(&ingressDescriptor, "ingress-descriptor", "i", "ingress-descriptor.yaml", "pull content using ingress descriptor definition") cmd.Flags().UintVarP(¶llel, "parallel", "p", 1, "number of parallel workers") + cmd.Flags().Int64Var(&maxLifeTimeInSeconds, "conn-max-lifetime", -1, "sets the maximum amount of time (in seconds) a connection may be reused") + cmd.Flags().IntVar(&maxOpenConns, "conn-max-open", -1, "sets the maximum number of open connections to the database") + cmd.Flags().IntVar(&maxIdleConns, "conn-max-idle", -1, "sets the maximum number of connections in the idle connection pool") cmd.SetOut(out) cmd.SetErr(err) cmd.SetIn(in) return cmd } -func getDataSource(dataconnectorName string, out io.Writer) (pull.DataSource, error) { +func getDataSource(dataconnectorName string, out io.Writer, maxLifeTimeInSeconds int64, maxOpenConns, maxIdleConns int) (pull.DataSource, error) { alias, e1 := dataconnector.Get(dataconnectorStorage, dataconnectorName) if e1 != nil { return nil, e1 @@ -219,7 +236,21 @@ func getDataSource(dataconnectorName string, out io.Writer) (pull.DataSource, er return nil, fmt.Errorf("no datasource found for database type") } - return datasourceFactory.New(u.URL.String(), alias.Schema), nil + options := []pull.DataSourceOption{} + + if maxLifeTimeInSeconds >= 0 { + options = append(options, maxLifeTimeOption(maxLifeTimeInSeconds)) + } + + if maxOpenConns >= 0 { + options = append(options, maxOpenConnsOption(maxOpenConns)) + } + + if maxIdleConns >= 0 { + options = append(options, maxIdleConnsOption(maxIdleConns)) + } + + return datasourceFactory.New(u.URL.String(), alias.Schema, options...), nil } func getPullerPlan(idStorage id.Storage) (pull.Plan, pull.Table, error) { diff --git a/internal/app/pull/http.go b/internal/app/pull/http.go index 165033db..3fb9d379 100644 --- a/internal/app/pull/http.go +++ b/internal/app/pull/http.go @@ -114,7 +114,7 @@ func HandlerFactory(ingressDescriptor string) func(w http.ResponseWriter, r *htt return } - datasource, err = getDataSource(datasourceName, w) + datasource, err = getDataSource(datasourceName, w, -1, -1, -1) if err != nil { log.Error().Err(err).Msg("") w.WriteHeader(http.StatusNotFound) diff --git a/internal/app/push/cli.go b/internal/app/push/cli.go index 552947b1..4bc4a66f 100755 --- a/internal/app/push/cli.go +++ b/internal/app/push/cli.go @@ -45,6 +45,9 @@ var ( rowIteratorFactory func(io.ReadCloser) push.RowIterator rowExporterFactory func(io.Writer) push.RowWriter translator push.Translator + maxLifeTimeOption func(int64) push.DataDestinationOption + maxOpenConnsOption func(int) push.DataDestinationOption + maxIdleConnsOption func(int) push.DataDestinationOption ) // Inject dependencies @@ -57,6 +60,9 @@ func Inject( rif func(io.ReadCloser) push.RowIterator, ref func(io.Writer) push.RowWriter, trnsltor push.Translator, + mltOpt func(int64) push.DataDestinationOption, + mocOpt func(int) push.DataDestinationOption, + micOpt func(int) push.DataDestinationOption, ) { dataconnectorStorage = dbas relStorage = rs @@ -66,21 +72,26 @@ func Inject( rowIteratorFactory = rif rowExporterFactory = ref translator = trnsltor + maxLifeTimeOption = mltOpt + maxOpenConnsOption = mocOpt + maxIdleConnsOption = micOpt } // NewCommand implements the cli pull command func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra.Command { var ( - commitSize uint - disableConstraints bool - catchErrors string - table string - ingressDescriptor string - rowExporter push.RowWriter - pkTranslations map[string]string - whereField string - savepoint string - autoTruncate bool + commitSize uint + disableConstraints bool + catchErrors string + table string + ingressDescriptor string + rowExporter push.RowWriter + pkTranslations map[string]string + whereField string + savepoint string + autoTruncate bool + maxLifeTimeInSeconds int64 + maxOpenConns, maxIdleConns int ) cmd := &cobra.Command{ @@ -106,6 +117,9 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra Bool("disable-constraints", disableConstraints). Str("catch-errors", catchErrors). Str("table", table). + Int64("maxLifeTimeInSeconds", maxLifeTimeInSeconds). + Int("maxOpenConns", maxOpenConns). + Int("maxIdleConns", maxIdleConns). Msg("Push mode") }, Run: func(cmd *cobra.Command, args []string) { @@ -122,7 +136,7 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra mode, _ = push.ParseMode(args[0]) } - datadestination, e1 := getDataDestination(dcDestination) + datadestination, e1 := getDataDestination(dcDestination, maxLifeTimeInSeconds, maxOpenConns, maxIdleConns) if e1 != nil { fmt.Fprintln(err, e1.Error()) os.Exit(1) @@ -174,6 +188,9 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra cmd.Flags().StringVar(&whereField, "using-pk-field", "__usingpk__", "Name of the data field that can be used as pk for update queries") cmd.Flags().StringVar(&savepoint, "savepoint", "", "Name of a file to write primary keys of effectively processed lines (commit to database)") cmd.Flags().BoolVarP(&autoTruncate, "autotruncate", "a", false, "Automatically truncate values to the maximum length defined in table.yaml") + cmd.Flags().Int64Var(&maxLifeTimeInSeconds, "conn-max-lifetime", -1, "sets the maximum amount of time (in seconds) a connection may be reused") + cmd.Flags().IntVar(&maxOpenConns, "conn-max-open", -1, "sets the maximum number of open connections to the database") + cmd.Flags().IntVar(&maxIdleConns, "conn-max-idle", -1, "sets the maximum number of connections in the idle connection pool") cmd.SetOut(out) cmd.SetErr(err) cmd.SetIn(in) @@ -221,7 +238,7 @@ func loadTranslator(pkTranslations map[string]string) error { return nil } -func getDataDestination(dataconnectorName string) (push.DataDestination, *push.Error) { +func getDataDestination(dataconnectorName string, maxLifeTimeInSeconds int64, maxOpenConns, maxIdleConns int) (push.DataDestination, *push.Error) { alias, e1 := dataconnector.Get(dataconnectorStorage, dataconnectorName) if e1 != nil { return nil, &push.Error{Description: e1.Error()} @@ -240,7 +257,21 @@ func getDataDestination(dataconnectorName string) (push.DataDestination, *push.E return nil, &push.Error{Description: "no datadestination found for database type " + u.UnaliasedDriver} } - return datadestinationFactory.New(u.URL.String(), alias.Schema), nil + options := []push.DataDestinationOption{} + + if maxLifeTimeInSeconds >= 0 { + options = append(options, maxLifeTimeOption(maxLifeTimeInSeconds)) + } + + if maxOpenConns >= 0 { + options = append(options, maxOpenConnsOption(maxOpenConns)) + } + + if maxIdleConns >= 0 { + options = append(options, maxIdleConnsOption(maxIdleConns)) + } + + return datadestinationFactory.New(u.URL.String(), alias.Schema, options...), nil } func getPlan(idStorage id.Storage, autoTruncate bool) (push.Plan, *push.Error) { diff --git a/internal/app/push/cli_test.go b/internal/app/push/cli_test.go index a7cdf07b..7b5cb700 100755 --- a/internal/app/push/cli_test.go +++ b/internal/app/push/cli_test.go @@ -43,6 +43,9 @@ func Test_getDataDestination(t *testing.T) { func(io.ReadCloser) push.RowIterator { return &push.MockRowIterator{} }, func(io.Writer) push.RowWriter { return &push.MockRowWriter{} }, push.NewMockTranslator(), + maxLifeTimeOption, + maxOpenConnsOption, + maxIdleConnsOption, ) type args struct { @@ -63,7 +66,7 @@ func Test_getDataDestination(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, got1 := getDataDestination(tt.args.dataconnectorName) + got, got1 := getDataDestination(tt.args.dataconnectorName, -1, -1, -1) if !reflect.DeepEqual(got, tt.want) { t.Errorf("getDataDestination() got = %v, want %v", got, tt.want) } diff --git a/internal/app/push/http.go b/internal/app/push/http.go index 92b732f3..2087af5c 100644 --- a/internal/app/push/http.go +++ b/internal/app/push/http.go @@ -69,7 +69,7 @@ func Handler(w http.ResponseWriter, r *http.Request, mode push.Mode, ingressDesc return } - datadestination, err := getDataDestination(dcDestination) + datadestination, err := getDataDestination(dcDestination, -1, -1, -1) if err != nil { log.Error().Err(err).Msg("") w.WriteHeader(http.StatusNotFound) diff --git a/internal/infra/pull/datasource_db2.go b/internal/infra/pull/datasource_db2.go index 42368513..bc6f5445 100644 --- a/internal/infra/pull/datasource_db2.go +++ b/internal/infra/pull/datasource_db2.go @@ -36,10 +36,16 @@ func NewDb2DataSourceFactory() *Db2DataSourceFactory { } // New return a Db2 puller -func (e *Db2DataSourceFactory) New(url string, schema string) pull.DataSource { - return &SQLDataSource{ +func (e *Db2DataSourceFactory) New(url string, schema string, options ...pull.DataSourceOption) pull.DataSource { + ds := &SQLDataSource{ url: url, schema: schema, dialect: commonsql.Db2Dialect{}, } + + for _, option := range options { + option(ds) + } + + return ds } diff --git a/internal/infra/pull/datasource_db2_dummy.go b/internal/infra/pull/datasource_db2_dummy.go index 1d4db4bf..f5d8c9b1 100644 --- a/internal/infra/pull/datasource_db2_dummy.go +++ b/internal/infra/pull/datasource_db2_dummy.go @@ -34,10 +34,16 @@ func NewDb2DataSourceFactory() *Db2DataSourceFactory { } // New return a Db2 puller -func (e *Db2DataSourceFactory) New(url string, schema string) pull.DataSource { - return &SQLDataSource{ +func (e *Db2DataSourceFactory) New(url string, schema string, options ...pull.DataSourceOption) pull.DataSource { + ds := &SQLDataSource{ url: url, schema: schema, dialect: commonsql.Db2Dialect{}, } + + for _, option := range options { + option(ds) + } + + return ds } diff --git a/internal/infra/pull/datasource_mariadb.go b/internal/infra/pull/datasource_mariadb.go index fffd59c8..d2debc69 100644 --- a/internal/infra/pull/datasource_mariadb.go +++ b/internal/infra/pull/datasource_mariadb.go @@ -21,7 +21,7 @@ import ( "github.com/cgi-fr/lino/internal/infra/commonsql" "github.com/cgi-fr/lino/pkg/pull" - _ "github.com/go-sql-driver/mysql" + _ "github.com/ziutek/mymysql/godrv" ) // MariadbDataSourceFactory exposes methods to create new Mariadb pullers. @@ -33,10 +33,16 @@ func NewMariadbDataSourceFactory() *MariadbDataSourceFactory { } // New return a Mariadb puller -func (e *MariadbDataSourceFactory) New(url string, schema string) pull.DataSource { - return &SQLDataSource{ +func (e *MariadbDataSourceFactory) New(url string, schema string, options ...pull.DataSourceOption) pull.DataSource { + ds := &SQLDataSource{ url: url, schema: schema, dialect: commonsql.MariadbDialect{}, } + + for _, option := range options { + option(ds) + } + + return ds } diff --git a/internal/infra/pull/datasource_oracle.go b/internal/infra/pull/datasource_oracle.go index 6804282c..62b6a377 100755 --- a/internal/infra/pull/datasource_oracle.go +++ b/internal/infra/pull/datasource_oracle.go @@ -14,10 +14,16 @@ func NewOracleDataSourceFactory() *OracleDataSourceFactory { } // New return a Oracle puller -func (e *OracleDataSourceFactory) New(url string, schema string) pull.DataSource { - return &SQLDataSource{ +func (e *OracleDataSourceFactory) New(url string, schema string, options ...pull.DataSourceOption) pull.DataSource { + ds := &SQLDataSource{ url: url, schema: schema, dialect: commonsql.OracleDialect{}, } + + for _, option := range options { + option(ds) + } + + return ds } diff --git a/internal/infra/pull/datasource_postgres.go b/internal/infra/pull/datasource_postgres.go index 74037f07..e965e1cd 100755 --- a/internal/infra/pull/datasource_postgres.go +++ b/internal/infra/pull/datasource_postgres.go @@ -33,10 +33,16 @@ func NewPostgresDataSourceFactory() *PostgresDataSourceFactory { } // New return a Postgres puller -func (e *PostgresDataSourceFactory) New(url string, schema string) pull.DataSource { - return &SQLDataSource{ +func (e *PostgresDataSourceFactory) New(url string, schema string, options ...pull.DataSourceOption) pull.DataSource { + ds := &SQLDataSource{ url: url, schema: schema, dialect: commonsql.PostgresDialect{}, } + + for _, option := range options { + option(ds) + } + + return ds } diff --git a/internal/infra/pull/datasource_sqlserver.go b/internal/infra/pull/datasource_sqlserver.go index e4a5053a..df5abb6f 100644 --- a/internal/infra/pull/datasource_sqlserver.go +++ b/internal/infra/pull/datasource_sqlserver.go @@ -33,10 +33,16 @@ func NewSQLServerDataSourceFactory() *SQLServerDataSourceFactory { } // New return a SQLServer puller -func (e *SQLServerDataSourceFactory) New(url string, schema string) pull.DataSource { - return &SQLDataSource{ +func (e *SQLServerDataSourceFactory) New(url string, schema string, options ...pull.DataSourceOption) pull.DataSource { + ds := &SQLDataSource{ url: url, schema: schema, dialect: commonsql.SQLServerDialect{}, } + + for _, option := range options { + option(ds) + } + + return ds } diff --git a/internal/infra/pull/datasource_ws.go b/internal/infra/pull/datasource_ws.go index f81d9ba0..9d681e0b 100644 --- a/internal/infra/pull/datasource_ws.go +++ b/internal/infra/pull/datasource_ws.go @@ -69,7 +69,7 @@ func NewWSDataSourceFactory() *WSDataSourceFactory { } // New return a WS puller -func (e *WSDataSourceFactory) New(url string, schema string) pull.DataSource { +func (e *WSDataSourceFactory) New(url string, schema string, options ...pull.DataSourceOption) pull.DataSource { return &WSDataSource{ url: url, schema: schema, @@ -152,6 +152,8 @@ func (ds *WSDataSource) Read(source pull.Table, filter pull.Filter) (pull.RowSet return nil, err } + defer reader.Close() + result := pull.RowSet{} for reader.Next() { result = append(result, reader.Value()) @@ -289,3 +291,7 @@ func (rs *ResultStream) Value() pull.Row { func (rs *ResultStream) Error() error { return rs.err } + +func (rs *ResultStream) Close() error { + return nil +} diff --git a/internal/infra/pull/http_datasource.go b/internal/infra/pull/http_datasource.go index 06cb20ac..a267cea9 100644 --- a/internal/infra/pull/http_datasource.go +++ b/internal/infra/pull/http_datasource.go @@ -38,7 +38,7 @@ func NewHTTPDataSourceFactory() *HTTPDataSourceFactory { } // New return a HTTP puller -func (e *HTTPDataSourceFactory) New(url string, schema string) pull.DataSource { +func (e *HTTPDataSourceFactory) New(url string, schema string, options ...pull.DataSourceOption) pull.DataSource { return &HTTPDataSource{ url: url, schema: schema, @@ -63,6 +63,8 @@ func (ds *HTTPDataSource) Read(source pull.Table, filter pull.Filter) (pull.RowS return nil, err } + defer reader.Close() + result := pull.RowSet{} for reader.Next() { result = append(result, reader.Value()) diff --git a/internal/infra/pull/rowreader_json.go b/internal/infra/pull/rowreader_json.go index 240c1343..b18e8899 100644 --- a/internal/infra/pull/rowreader_json.go +++ b/internal/infra/pull/rowreader_json.go @@ -68,3 +68,7 @@ func (jrr *JSONRowReader) Value() pull.Row { func (jrr *JSONRowReader) Error() error { return jrr.err } + +func (jrr *JSONRowReader) Close() error { + return nil +} diff --git a/internal/infra/pull/sql_datasource.go b/internal/infra/pull/sql_datasource.go index 0f94c6ba..dbb20c62 100644 --- a/internal/infra/pull/sql_datasource.go +++ b/internal/infra/pull/sql_datasource.go @@ -21,6 +21,7 @@ import ( "database/sql" "fmt" "strings" + "time" "github.com/cgi-fr/lino/internal/infra/commonsql" "github.com/cgi-fr/lino/pkg/pull" @@ -30,13 +31,37 @@ import ( "github.com/xo/dburl" ) +func WithMaxLifetime(maxLifeTime time.Duration) pull.DataSourceOption { + return func(ds pull.DataSource) { + log.Info().Int64("maxLifetime", int64(maxLifeTime.Seconds())).Msg("setting database connection parameter") + ds.(*SQLDataSource).maxLifetime = maxLifeTime + } +} + +func WithMaxOpenConns(maxOpenConns int) pull.DataSourceOption { + return func(ds pull.DataSource) { + log.Info().Int("maxOpenConns", maxOpenConns).Msg("setting database connection parameter") + ds.(*SQLDataSource).maxOpenConns = maxOpenConns + } +} + +func WithMaxIdleConns(maxIdleConns int) pull.DataSourceOption { + return func(ds pull.DataSource) { + log.Info().Int("maxIdleConns", maxIdleConns).Msg("setting database connection parameter") + ds.(*SQLDataSource).maxIdleConns = maxIdleConns + } +} + // SQLDataSource to read in the pull process. type SQLDataSource struct { - url string - schema string - dbx *sqlx.DB - db *sql.DB - dialect commonsql.Dialect + url string + schema string + dbx *sqlx.DB + db *sql.DB + dialect commonsql.Dialect + maxLifetime time.Duration + maxOpenConns int + maxIdleConns int } // Open a connection to the SQL DB @@ -46,6 +71,13 @@ func (ds *SQLDataSource) Open() error { return err } + log.Info().Msg("open database connection pool") + + // database handle settings + db.SetConnMaxLifetime(ds.maxLifetime) + db.SetMaxOpenConns(ds.maxOpenConns) + db.SetMaxIdleConns(ds.maxIdleConns) + ds.db = db u, err := dburl.Parse(ds.url) @@ -99,6 +131,8 @@ func (ds *SQLDataSource) Read(source pull.Table, filter pull.Filter) (pull.RowSe return nil, err } + defer reader.Close() + result := pull.RowSet{} for reader.Next() { result = append(result, reader.Value()) @@ -130,6 +164,8 @@ func (ds *SQLDataSource) RowReader(source pull.Table, filter pull.Filter) (pull. return nil, err } + log.Info().Msg("open database rows iterator") + return &SQLDataIterator{rows, nil, nil}, nil } @@ -191,6 +227,7 @@ func (ds *SQLDataSource) Close() error { if err != nil { return err } + log.Info().Msg("close database connection pool") return nil } @@ -242,6 +279,12 @@ func (di *SQLDataIterator) Error() error { return di.err } +// Close returns the iterator +func (di *SQLDataIterator) Close() error { + defer log.Info().Msg("close database rows iterator") + return di.rows.Close() +} + func NewSQLDataSource(url, schema string, dbx *sqlx.DB, db *sql.DB, dialect commonsql.Dialect) *SQLDataSource { return &SQLDataSource{ url: url, diff --git a/internal/infra/push/datadestination_db2.go b/internal/infra/push/datadestination_db2.go index 9b94fe38..c396e069 100644 --- a/internal/infra/push/datadestination_db2.go +++ b/internal/infra/push/datadestination_db2.go @@ -40,8 +40,8 @@ func NewDb2DataDestinationFactory() *Db2DataDestinationFactory { } // New return a Db2 pusher -func (e *Db2DataDestinationFactory) New(url string, schema string) push.DataDestination { - return NewSQLDataDestination(url, schema, Db2Dialect{}) +func (e *Db2DataDestinationFactory) New(url string, schema string, options ...push.DataDestinationOption) push.DataDestination { + return NewSQLDataDestination(url, schema, Db2Dialect{}, options...) } // Db2Dialect inject oracle variations diff --git a/internal/infra/push/datadestination_db2_dummy.go b/internal/infra/push/datadestination_db2_dummy.go index 73e4e0f3..0ea73054 100644 --- a/internal/infra/push/datadestination_db2_dummy.go +++ b/internal/infra/push/datadestination_db2_dummy.go @@ -35,8 +35,8 @@ func NewDb2DataDestinationFactory() *Db2DataDestinationFactory { } // New return a Db2 pusher -func (e *Db2DataDestinationFactory) New(url string, schema string) push.DataDestination { - return NewSQLDataDestination(url, schema, Db2Dialect{}) +func (e *Db2DataDestinationFactory) New(url string, schema string, options ...push.DataDestinationOption) push.DataDestination { + return NewSQLDataDestination(url, schema, Db2Dialect{}, options...) } // Db2Dialect inject oracle variations diff --git a/internal/infra/push/datadestination_http.go b/internal/infra/push/datadestination_http.go index dba1991c..5755c552 100644 --- a/internal/infra/push/datadestination_http.go +++ b/internal/infra/push/datadestination_http.go @@ -38,7 +38,7 @@ func NewHTTPDataDestinationFactory() *HTTPDataDestinationFactory { } // New return a HTTP pusher -func (e *HTTPDataDestinationFactory) New(url string, schema string) push.DataDestination { +func (e *HTTPDataDestinationFactory) New(url string, schema string, options ...push.DataDestinationOption) push.DataDestination { return NewHTTPDataDestination(url, schema) } diff --git a/internal/infra/push/datadestination_mariadb.go b/internal/infra/push/datadestination_mariadb.go index d088c425..330230f6 100644 --- a/internal/infra/push/datadestination_mariadb.go +++ b/internal/infra/push/datadestination_mariadb.go @@ -34,8 +34,8 @@ func NewMariadbDataDestinationFactory() *MariadbDataDestinationFactory { } // New return a Mariadb pusher -func (e *MariadbDataDestinationFactory) New(url string, schema string) push.DataDestination { - return NewSQLDataDestination(url, schema, MariadbDialect{}) +func (e *MariadbDataDestinationFactory) New(url string, schema string, options ...push.DataDestinationOption) push.DataDestination { + return NewSQLDataDestination(url, schema, MariadbDialect{}, options...) } // MariadbDialect inject mariadb variations diff --git a/internal/infra/push/datadestination_oracle.go b/internal/infra/push/datadestination_oracle.go index 59815727..f84c1328 100644 --- a/internal/infra/push/datadestination_oracle.go +++ b/internal/infra/push/datadestination_oracle.go @@ -21,8 +21,8 @@ func NewOracleDataDestinationFactory() *OracleDataDestinationFactory { } // New return a Oracle pusher -func (e *OracleDataDestinationFactory) New(url string, schema string) push.DataDestination { - return NewSQLDataDestination(url, schema, OracleDialect{}) +func (e *OracleDataDestinationFactory) New(url string, schema string, options ...push.DataDestinationOption) push.DataDestination { + return NewSQLDataDestination(url, schema, OracleDialect{}, options...) } // OracleDialect inject oracle variations diff --git a/internal/infra/push/datadestination_postgres.go b/internal/infra/push/datadestination_postgres.go index c8f2abb7..01dcf94f 100755 --- a/internal/infra/push/datadestination_postgres.go +++ b/internal/infra/push/datadestination_postgres.go @@ -34,8 +34,8 @@ func NewPostgresDataDestinationFactory() *PostgresDataDestinationFactory { } // New return a Postgres pusher -func (e *PostgresDataDestinationFactory) New(url string, schema string) push.DataDestination { - return NewSQLDataDestination(url, schema, PostgresDialect{}) +func (e *PostgresDataDestinationFactory) New(url string, schema string, options ...push.DataDestinationOption) push.DataDestination { + return NewSQLDataDestination(url, schema, PostgresDialect{}, options...) } // PostgresDialect inject postgres variations diff --git a/internal/infra/push/datadestination_sql.go b/internal/infra/push/datadestination_sql.go index 24234bf5..2396d18e 100644 --- a/internal/infra/push/datadestination_sql.go +++ b/internal/infra/push/datadestination_sql.go @@ -21,6 +21,7 @@ import ( "database/sql" "fmt" "strings" + "time" "github.com/cgi-fr/lino/pkg/push" "github.com/jmoiron/sqlx" @@ -28,6 +29,27 @@ import ( "github.com/xo/dburl" ) +func WithMaxLifetime(maxLifeTime time.Duration) push.DataDestinationOption { + return func(ds push.DataDestination) { + log.Info().Int64("maxLifetime", int64(maxLifeTime.Seconds())).Msg("setting database connection parameter") + ds.(*SQLDataDestination).maxLifetime = maxLifeTime + } +} + +func WithMaxOpenConns(maxOpenConns int) push.DataDestinationOption { + return func(ds push.DataDestination) { + log.Info().Int("maxOpenConns", maxOpenConns).Msg("setting database connection parameter") + ds.(*SQLDataDestination).maxOpenConns = maxOpenConns + } +} + +func WithMaxIdleConns(maxIdleConns int) push.DataDestinationOption { + return func(ds push.DataDestination) { + log.Info().Int("maxIdleConns", maxIdleConns).Msg("setting database connection parameter") + ds.(*SQLDataDestination).maxIdleConns = maxIdleConns + } +} + // SQLDataDestination read data from a SQL database. type SQLDataDestination struct { url string @@ -38,16 +60,25 @@ type SQLDataDestination struct { mode push.Mode disableConstraints bool dialect SQLDialect + maxLifetime time.Duration + maxOpenConns int + maxIdleConns int } // NewSQLDataDestination creates a new SQL datadestination. -func NewSQLDataDestination(url string, schema string, dialect SQLDialect) *SQLDataDestination { - return &SQLDataDestination{ +func NewSQLDataDestination(url string, schema string, dialect SQLDialect, options ...push.DataDestinationOption) *SQLDataDestination { + dd := &SQLDataDestination{ url: url, schema: schema, rowWriter: map[string]*SQLRowWriter{}, dialect: dialect, } + + for _, option := range options { + option(dd) + } + + return dd } // Close SQL connections @@ -84,6 +115,8 @@ func (dd *SQLDataDestination) Close() *push.Error { errors = append(errors, &push.Error{Description: err2.Error()}) } + log.Info().Msg("close database connection pool") + if len(errors) > 0 { allErrors := &push.Error{} for _, err := range errors { @@ -110,11 +143,15 @@ func (dd *SQLDataDestination) Commit() *push.Error { } log.Debug().Msg("transaction committed") + log.Info().Msg("close (commit) database transaction") + tx, err := dd.db.Begin() if err != nil { return &push.Error{Description: err.Error()} } + log.Info().Msg("open database transaction") + dd.tx = tx return nil @@ -130,6 +167,13 @@ func (dd *SQLDataDestination) Open(plan push.Plan, mode push.Mode, disableConstr return &push.Error{Description: err.Error()} } + log.Info().Msg("open database connection pool") + + // database handle settings + db.SetConnMaxLifetime(dd.maxLifetime) + db.SetMaxOpenConns(dd.maxOpenConns) + db.SetMaxIdleConns(dd.maxIdleConns) + u, err := dburl.Parse(dd.url) if err != nil { return &push.Error{Description: err.Error()} @@ -150,6 +194,8 @@ func (dd *SQLDataDestination) Open(plan push.Plan, mode push.Mode, disableConstr } dd.tx = tx + log.Info().Msg("open database transaction") + for _, table := range plan.Tables() { rw := NewSQLRowWriter(table, dd) err := rw.open() @@ -227,6 +273,7 @@ func (rw *SQLRowWriter) close() *push.Error { if err != nil { return &push.Error{Description: err.Error()} } + log.Info().Msg("close database statement") rw.statement = nil log.Debug().Msg(fmt.Sprintf("close statement %s", rw.dd.mode)) } @@ -284,6 +331,7 @@ func (rw *SQLRowWriter) createStatement(row push.Row, where push.Row) *push.Erro if err != nil { return &push.Error{Description: err.Error()} } + log.Info().Msg("open database statement") rw.statement = stmt return nil } @@ -350,12 +398,12 @@ func (rw *SQLRowWriter) Write(row push.Row, where push.Row) *push.Error { _, err2 := rw.statement.Exec(values...) if err2 != nil { // reset statement after error - if err := rw.close(); err != nil { - return &push.Error{Description: err.Error() + "\noriginal error :\n" + err2.Error()} - } if rw.dd.dialect.IsDuplicateError(err2) { log.Trace().Msg(fmt.Sprintf("duplicate key %v (%s) for %s", row, rw.table.PrimaryKey(), rw.table.Name())) } else { + if err := rw.close(); err != nil { + return &push.Error{Description: err.Error() + "\noriginal error :\n" + err2.Error()} + } return &push.Error{Description: err2.Error()} } } @@ -383,7 +431,9 @@ func (rw *SQLRowWriter) disableConstraints() *push.Error { return &push.Error{Description: err.Error()} } - defer result.Close() + log.Info().Msg("open database rows iterator") + + defer func() { result.Close(); log.Info().Msg("close database rows iterator") }() var tableName, constraintName string for result.Next() { diff --git a/internal/infra/push/datadestination_sqlserver.go b/internal/infra/push/datadestination_sqlserver.go index cb3a28e9..a9153002 100644 --- a/internal/infra/push/datadestination_sqlserver.go +++ b/internal/infra/push/datadestination_sqlserver.go @@ -36,8 +36,8 @@ func NewSQLServerDataDestinationFactory() *SQLServerDataDestinationFactory { } // New return a SQLServer pusher -func (e *SQLServerDataDestinationFactory) New(url string, schema string) push.DataDestination { - return NewSQLDataDestination(url, schema, SQLServerDialect{}) +func (e *SQLServerDataDestinationFactory) New(url string, schema string, options ...push.DataDestinationOption) push.DataDestination { + return NewSQLDataDestination(url, schema, SQLServerDialect{}, options...) } // SQLServerDialect inject SQLServer variations diff --git a/internal/infra/push/datadestination_ws.go b/internal/infra/push/datadestination_ws.go index 21e065f6..4d954d21 100644 --- a/internal/infra/push/datadestination_ws.go +++ b/internal/infra/push/datadestination_ws.go @@ -34,7 +34,7 @@ import ( type Action string -// "pull_open", "push_open", "push_data", "push_commit", "push_close" +// "pull_open", "push_open", "push_data", "push_commit", "push_close" const ( PushOpen Action = "push_open" PushData Action = "push_data" @@ -77,7 +77,7 @@ func NewWebSocketDataDestinationFactory() *WebSocketDataDestinationFactory { } // New return a web socket pusher -func (e *WebSocketDataDestinationFactory) New(url string, schema string) push.DataDestination { +func (e *WebSocketDataDestinationFactory) New(url string, schema string, options ...push.DataDestinationOption) push.DataDestination { return NewWebSocketDataDestination(url, schema) } diff --git a/internal/infra/relation/extractor_mariadb.go b/internal/infra/relation/extractor_mariadb.go index e3ec42d0..2d0beecc 100644 --- a/internal/infra/relation/extractor_mariadb.go +++ b/internal/infra/relation/extractor_mariadb.go @@ -22,7 +22,7 @@ import ( // import mariadb connector "fmt" - _ "github.com/go-sql-driver/mysql" + _ "github.com/ziutek/mymysql/godrv" "github.com/cgi-fr/lino/pkg/relation" ) diff --git a/internal/infra/table/extractor_mariadb.go b/internal/infra/table/extractor_mariadb.go index b3414c36..07c39431 100644 --- a/internal/infra/table/extractor_mariadb.go +++ b/internal/infra/table/extractor_mariadb.go @@ -21,7 +21,7 @@ import ( "fmt" // import mariadbsql connector - _ "github.com/go-sql-driver/mysql" + _ "github.com/ziutek/mymysql/godrv" "github.com/cgi-fr/lino/internal/infra/commonsql" "github.com/cgi-fr/lino/pkg/table" diff --git a/pkg/pull/driven.go b/pkg/pull/driven.go index 2a34757f..3fdc46eb 100755 --- a/pkg/pull/driven.go +++ b/pkg/pull/driven.go @@ -22,9 +22,11 @@ type RowExporter interface { Export(ExportedRow) error } +type DataSourceOption func(DataSource) + // DataSourceFactory exposes methods to create new datasources. type DataSourceFactory interface { - New(url string, schema string) DataSource + New(url string, schema string, options ...DataSourceOption) DataSource } // DataSource to read in the pull process. @@ -40,6 +42,7 @@ type RowReader interface { Next() bool Value() Row Error() error + Close() error } // TraceListener receives diagnostic trace. diff --git a/pkg/pull/driven_DatasourceInMemory.go b/pkg/pull/driven_DatasourceInMemory.go index 196c5f2e..a050439c 100644 --- a/pkg/pull/driven_DatasourceInMemory.go +++ b/pkg/pull/driven_DatasourceInMemory.go @@ -40,6 +40,8 @@ func (ds DataSourceInMemory) Read(source Table, filter Filter) (RowSet, error) { return nil, err } + defer reader.Close() + result := RowSet{} for reader.Next() { result = append(result, reader.Value()) @@ -104,3 +106,4 @@ func (rr *RowReaderInMemory) Value() Row { return row } func (rr *RowReaderInMemory) Error() error { return nil } +func (rr *RowReaderInMemory) Close() error { return nil } diff --git a/pkg/pull/driven_OneEmptyRowReader.go b/pkg/pull/driven_OneEmptyRowReader.go index 0ab9d88e..43291596 100644 --- a/pkg/pull/driven_OneEmptyRowReader.go +++ b/pkg/pull/driven_OneEmptyRowReader.go @@ -38,3 +38,6 @@ func (r OneEmptyRowReader) Value() Row { return Row{} } // Error return always nil func (r OneEmptyRowReader) Error() error { return nil } + +// Close return always nil +func (r OneEmptyRowReader) Close() error { return nil } diff --git a/pkg/pull/driver.go b/pkg/pull/driver.go index c35b582b..5957dc8d 100755 --- a/pkg/pull/driver.go +++ b/pkg/pull/driver.go @@ -110,6 +110,8 @@ func (p *puller) Pull(start Table, filter Filter, filterCohort RowReader, exclud return fmt.Errorf("%w", err) } + defer reader.Close() + for reader.Next() { IncLinesPerStepCount(string(start.Name)) row := start.export(reader.Value()) diff --git a/pkg/pull/driver_parallel.go b/pkg/pull/driver_parallel.go index ea55c821..9d1daf49 100644 --- a/pkg/pull/driver_parallel.go +++ b/pkg/pull/driver_parallel.go @@ -121,6 +121,8 @@ func (p *pullerParallel) Pull(start Table, filter Filter, filterCohort RowReader return fmt.Errorf("%w", err) } + defer reader.Close() + for reader.Next() { IncLinesPerStepCount(string(start.Name)) p.inChan <- reader.Value() diff --git a/pkg/push/driven.go b/pkg/push/driven.go index 57e5f7ef..dbd50757 100755 --- a/pkg/push/driven.go +++ b/pkg/push/driven.go @@ -17,9 +17,11 @@ package push +type DataDestinationOption func(DataDestination) + // DataDestinationFactory exposes methods to create new datadestinations. type DataDestinationFactory interface { - New(url string, schema string) DataDestination + New(url string, schema string, options ...DataDestinationOption) DataDestination } // DataDestination to write in the push process. diff --git a/tests/suites/pull/stats.yml b/tests/suites/pull/stats.yml index 75d54356..28d17383 100644 --- a/tests/suites/pull/stats.yml +++ b/tests/suites/pull/stats.yml @@ -38,7 +38,7 @@ testcases: - script: lino pull -v 5 source --where "address_id > 13" --log-json assertions: - result.code ShouldEqual 0 - - result.systemerr ShouldContainSubstring {"level":"info","limit":1,"filter":{},"diagnostic":false,"distinct":false,"filter-from-file":"","exclude-from-file":"","table":"","where":"address_id > 13","parallel":1,"message":"Pull mode"} + - result.systemerr ShouldContainSubstring {"level":"info","limit":1,"filter":{},"diagnostic":false,"distinct":false,"filter-from-file":"","exclude-from-file":"","table":"","where":"address_id > 13","parallel":1, - name: pull with filter from file and where clause steps: