Skip to content

feat: add maxLifetime maxOpen and maxIdle options parameters #271

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 13 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ Types of changes
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## [2.8.0]

- `Added` flags `--conn-max-lifetime`, `--conn-max-open` and `--conn-max-idle` to `lino pull` and `lino push` commands

## [2.7.1]

- `Fixed` panic during push on Oracle database with a `null` column value
Expand Down
1 change: 1 addition & 0 deletions cmd/lino/dep_analyse.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func analyseDataSourceFactory() map[string]infra.SQLExtractorFactory {
"godror": infra.NewOracleExtractorFactory(),
"godror-raw": infra.NewOracleExtractorFactory(),
"mysql": infra.NewMariaDBExtractorFactory(),
"mymysql": infra.NewMariaDBExtractorFactory(),
"db2": infra.NewDB2ExtractorFactory(),
"sqlserver": infra.NewSQLServerExtractorFactory(),
}
Expand Down
1 change: 1 addition & 0 deletions cmd/lino/dep_dataconnector.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func dataPingerFactory() map[string]domain.DataPingerFactory {
"godror": infra.NewSQLDataPingerFactory(),
"godror-raw": infra.NewSQLDataPingerFactory(),
"mysql": infra.NewSQLDataPingerFactory(),
"mymysql": infra.NewSQLDataPingerFactory(),
"db2": infra.NewSQLDataPingerFactory(),
"http": infra.NewHTTPDataPingerFactory(),
"ws": infra.NewWSDataPingerFactory(),
Expand Down
14 changes: 14 additions & 0 deletions cmd/lino/dep_pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package main
import (
"io"
"os"
"time"

infra "github.com/cgi-fr/lino/internal/infra/pull"
domain "github.com/cgi-fr/lino/pkg/pull"
Expand All @@ -31,6 +32,7 @@ func pullDataSourceFactory() map[string]domain.DataSourceFactory {
"godror": infra.NewOracleDataSourceFactory(),
"godror-raw": infra.NewOracleDataSourceFactory(),
"mysql": infra.NewMariadbDataSourceFactory(),
"mymysql": infra.NewMariadbDataSourceFactory(),
"db2": infra.NewDb2DataSourceFactory(),
"http": infra.NewHTTPDataSourceFactory(),
"ws": infra.NewWSDataSourceFactory(),
Expand Down Expand Up @@ -59,3 +61,15 @@ func pullKeyStoreFactory() func(file io.ReadCloser, keys []string) (domain.KeySt
func traceListner(file *os.File) domain.TraceListener {
return infra.NewJSONTraceListener(file)
}

func pullMaxLifeTime(maxLifetimeInSeconds int64) domain.DataSourceOption {
return infra.WithMaxLifetime(time.Duration(maxLifetimeInSeconds) * time.Second)
}

func pullMaxOpenConns(maxOpenConns int) domain.DataSourceOption {
return infra.WithMaxOpenConns(maxOpenConns)
}

func pullMaxIdleConns(maxIdleConns int) domain.DataSourceOption {
return infra.WithMaxIdleConns(maxIdleConns)
}
14 changes: 14 additions & 0 deletions cmd/lino/dep_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package main

import (
"io"
"time"

infra "github.com/cgi-fr/lino/internal/infra/push"
domain "github.com/cgi-fr/lino/pkg/push"
Expand All @@ -30,6 +31,7 @@ func pushDataDestinationFactory() map[string]domain.DataDestinationFactory {
"godror": infra.NewOracleDataDestinationFactory(),
"godror-raw": infra.NewOracleDataDestinationFactory(),
"mysql": infra.NewMariadbDataDestinationFactory(),
"mymysql": infra.NewMariadbDataDestinationFactory(),
"db2": infra.NewDb2DataDestinationFactory(),
"http": infra.NewHTTPDataDestinationFactory(),
"ws": infra.NewWebSocketDataDestinationFactory(),
Expand All @@ -48,3 +50,15 @@ func pushRowExporterFactory() func(io.Writer) domain.RowWriter {
func pushTranslator() domain.Translator {
return infra.NewFileTranslator()
}

func pushMaxLifeTime(maxLifetimeInSeconds int64) domain.DataDestinationOption {
return infra.WithMaxLifetime(time.Duration(maxLifetimeInSeconds) * time.Second)
}

func pushMaxOpenConns(maxOpenConns int) domain.DataDestinationOption {
return infra.WithMaxOpenConns(maxOpenConns)
}

func pushMaxIdleConns(maxIdleConns int) domain.DataDestinationOption {
return infra.WithMaxIdleConns(maxIdleConns)
}
4 changes: 2 additions & 2 deletions cmd/lino/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,8 @@ func initConfig() {
table.Inject(dataconnectorStorage(), tableStorage(), tableExtractorFactory())
sequence.Inject(dataconnectorStorage(), tableStorage(), sequenceStorage(), sequenceUpdatorFactory())
id.Inject(idStorageFile, relationStorage(), idExporter(), idJSONStorage(*os.Stdout))
pull.Inject(dataconnectorStorage(), relationStorage(), tableStorage(), idStorageFactory(), pullDataSourceFactory(), pullRowExporterFactory(), pullRowReaderFactory(), pullKeyStoreFactory(), traceListner(os.Stderr))
push.Inject(dataconnectorStorage(), relationStorage(), tableStorage(), idStorageFactory(), pushDataDestinationFactory(), pushRowIteratorFactory(), pushRowExporterFactory(), pushTranslator())
pull.Inject(dataconnectorStorage(), relationStorage(), tableStorage(), idStorageFactory(), pullDataSourceFactory(), pullRowExporterFactory(), pullRowReaderFactory(), pullKeyStoreFactory(), traceListner(os.Stderr), pullMaxLifeTime, pullMaxOpenConns, pullMaxIdleConns)
push.Inject(dataconnectorStorage(), relationStorage(), tableStorage(), idStorageFactory(), pushDataDestinationFactory(), pushRowIteratorFactory(), pushRowExporterFactory(), pushTranslator(), pushMaxLifeTime, pushMaxOpenConns, pushMaxIdleConns)
}

func writeMetricsToFile(statsFile string, statsByte []byte) {
Expand Down
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ require (
github.com/cgi-fr/jsonline v0.5.0
github.com/cgi-fr/rimo v0.4.0
github.com/docker/docker-credential-helpers v0.8.1
github.com/go-sql-driver/mysql v1.8.0
github.com/gorilla/mux v1.8.1
github.com/hashicorp/go-multierror v1.1.1
github.com/ibmdb/go_ibm_db v0.4.5
Expand All @@ -23,14 +22,14 @@ require (
github.com/spf13/cobra v1.8.0
github.com/stretchr/testify v1.9.0
github.com/xo/dburl v0.21.1
github.com/ziutek/mymysql v1.5.4
golang.org/x/exp v0.0.0-20231006140011-7918f672742d
golang.org/x/term v0.18.0
gopkg.in/yaml.v3 v3.0.1
nhooyr.io/websocket v1.8.10
)

require (
filippo.io/edwards25519 v1.1.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 // indirect
github.com/golang-sql/sqlexp v0.1.0 // indirect
Expand Down
7 changes: 3 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.1 h1:lGlwhPtrX6EVml1hO0ivjkUxsSyl4dsiw9qcA1k/3IQ=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1 h1:sO0/P7g68FrryJzljemN+6GTssUXdANk6aJ7T1ZxnsQ=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.1 h1:6oNBlSdi1QqM1PNW7FPA6xOGA5UNsXnkaYZz9vdPGhA=
Expand Down Expand Up @@ -110,9 +108,8 @@ github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeME
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE=
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-sql-driver/mysql v1.8.0 h1:UtktXaU2Nb64z/pLiGIxY4431SJ4/dR5cjMmlVHgnT4=
github.com/go-sql-driver/mysql v1.8.0/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
Expand Down Expand Up @@ -364,6 +361,8 @@ github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q=
github.com/ziutek/mymysql v1.5.4 h1:GB0qdRGsTwQSBVYuVShFBKaXSnSnYYC2d9knnE1LHFs=
github.com/ziutek/mymysql v1.5.4/go.mod h1:LMSpPZ6DbqWFxNCHW77HeMg9I646SAhApZ/wKdgO/C0=
go.etcd.io/etcd/api/v3 v3.5.0/go.mod h1:cbVKeC6lCfl7j/8jBhAK6aIYO9XOjdptoxU/nLQcPvs=
go.etcd.io/etcd/client/pkg/v3 v3.5.0/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g=
go.etcd.io/etcd/client/v2 v2.305.0/go.mod h1:h9puh54ZTgAKtEbut2oe9P4L/oqKCVB6xsXlzd7alYQ=
Expand Down
37 changes: 34 additions & 3 deletions internal/app/pull/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ var (
pullExporterFactory func(io.Writer) pull.RowExporter
rowReaderFactory func(io.ReadCloser) pull.RowReader
keyStoreFactory func(io.ReadCloser, []string) (pull.KeyStore, error)
maxLifeTimeOption func(int64) pull.DataSourceOption
maxOpenConnsOption func(int) pull.DataSourceOption
maxIdleConnsOption func(int) pull.DataSourceOption
)

var traceListener pull.TraceListener
Expand All @@ -59,6 +62,9 @@ func Inject(
rrf func(io.ReadCloser) pull.RowReader,
ksf func(io.ReadCloser, []string) (pull.KeyStore, error),
tl pull.TraceListener,
mltOpt func(int64) pull.DataSourceOption,
mocOpt func(int) pull.DataSourceOption,
micOpt func(int) pull.DataSourceOption,
) {
dataconnectorStorage = dbas
relStorage = rs
Expand All @@ -69,6 +75,9 @@ func Inject(
rowReaderFactory = rrf
keyStoreFactory = ksf
traceListener = tl
maxLifeTimeOption = mltOpt
maxOpenConnsOption = mocOpt
maxIdleConnsOption = micOpt
}

// NewCommand implements the cli pull command
Expand All @@ -85,6 +94,8 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra
var diagnostic bool
var filters pull.RowReader
var parallel uint
var maxLifeTimeInSeconds int64
var maxOpenConns, maxIdleConns int

cmd := &cobra.Command{
Use: "pull [DB Alias Name]",
Expand All @@ -103,6 +114,9 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra
Str("table", table).
Str("where", where).
Uint("parallel", parallel).
Int64("maxLifeTimeInSeconds", maxLifeTimeInSeconds).
Int("maxOpenConns", maxOpenConns).
Int("maxIdleConns", maxIdleConns).
Msg("Pull mode")
},
Run: func(cmd *cobra.Command, args []string) {
Expand All @@ -111,7 +125,7 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra

startTime := time.Now()

datasource, e1 := getDataSource(args[0], out)
datasource, e1 := getDataSource(args[0], out, maxLifeTimeInSeconds, maxOpenConns, maxIdleConns)
if e1 != nil {
fmt.Fprintln(err, e1.Error())
os.Exit(1)
Expand Down Expand Up @@ -197,13 +211,16 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra
cmd.Flags().StringVarP(&where, "where", "w", "", "Advanced SQL where clause to filter")
cmd.Flags().StringVarP(&ingressDescriptor, "ingress-descriptor", "i", "ingress-descriptor.yaml", "pull content using ingress descriptor definition")
cmd.Flags().UintVarP(&parallel, "parallel", "p", 1, "number of parallel workers")
cmd.Flags().Int64Var(&maxLifeTimeInSeconds, "conn-max-lifetime", -1, "sets the maximum amount of time (in seconds) a connection may be reused")
cmd.Flags().IntVar(&maxOpenConns, "conn-max-open", -1, "sets the maximum number of open connections to the database")
cmd.Flags().IntVar(&maxIdleConns, "conn-max-idle", -1, "sets the maximum number of connections in the idle connection pool")
cmd.SetOut(out)
cmd.SetErr(err)
cmd.SetIn(in)
return cmd
}

func getDataSource(dataconnectorName string, out io.Writer) (pull.DataSource, error) {
func getDataSource(dataconnectorName string, out io.Writer, maxLifeTimeInSeconds int64, maxOpenConns, maxIdleConns int) (pull.DataSource, error) {
alias, e1 := dataconnector.Get(dataconnectorStorage, dataconnectorName)
if e1 != nil {
return nil, e1
Expand All @@ -219,7 +236,21 @@ func getDataSource(dataconnectorName string, out io.Writer) (pull.DataSource, er
return nil, fmt.Errorf("no datasource found for database type")
}

return datasourceFactory.New(u.URL.String(), alias.Schema), nil
options := []pull.DataSourceOption{}

if maxLifeTimeInSeconds >= 0 {
options = append(options, maxLifeTimeOption(maxLifeTimeInSeconds))
}

if maxOpenConns >= 0 {
options = append(options, maxOpenConnsOption(maxOpenConns))
}

if maxIdleConns >= 0 {
options = append(options, maxIdleConnsOption(maxIdleConns))
}

return datasourceFactory.New(u.URL.String(), alias.Schema, options...), nil
}

func getPullerPlan(idStorage id.Storage) (pull.Plan, pull.Table, error) {
Expand Down
2 changes: 1 addition & 1 deletion internal/app/pull/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func HandlerFactory(ingressDescriptor string) func(w http.ResponseWriter, r *htt
return
}

datasource, err = getDataSource(datasourceName, w)
datasource, err = getDataSource(datasourceName, w, -1, -1, -1)
if err != nil {
log.Error().Err(err).Msg("")
w.WriteHeader(http.StatusNotFound)
Expand Down
57 changes: 44 additions & 13 deletions internal/app/push/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ var (
rowIteratorFactory func(io.ReadCloser) push.RowIterator
rowExporterFactory func(io.Writer) push.RowWriter
translator push.Translator
maxLifeTimeOption func(int64) push.DataDestinationOption
maxOpenConnsOption func(int) push.DataDestinationOption
maxIdleConnsOption func(int) push.DataDestinationOption
)

// Inject dependencies
Expand All @@ -57,6 +60,9 @@ func Inject(
rif func(io.ReadCloser) push.RowIterator,
ref func(io.Writer) push.RowWriter,
trnsltor push.Translator,
mltOpt func(int64) push.DataDestinationOption,
mocOpt func(int) push.DataDestinationOption,
micOpt func(int) push.DataDestinationOption,
) {
dataconnectorStorage = dbas
relStorage = rs
Expand All @@ -66,21 +72,26 @@ func Inject(
rowIteratorFactory = rif
rowExporterFactory = ref
translator = trnsltor
maxLifeTimeOption = mltOpt
maxOpenConnsOption = mocOpt
maxIdleConnsOption = micOpt
}

// NewCommand implements the cli pull command
func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra.Command {
var (
commitSize uint
disableConstraints bool
catchErrors string
table string
ingressDescriptor string
rowExporter push.RowWriter
pkTranslations map[string]string
whereField string
savepoint string
autoTruncate bool
commitSize uint
disableConstraints bool
catchErrors string
table string
ingressDescriptor string
rowExporter push.RowWriter
pkTranslations map[string]string
whereField string
savepoint string
autoTruncate bool
maxLifeTimeInSeconds int64
maxOpenConns, maxIdleConns int
)

cmd := &cobra.Command{
Expand All @@ -106,6 +117,9 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra
Bool("disable-constraints", disableConstraints).
Str("catch-errors", catchErrors).
Str("table", table).
Int64("maxLifeTimeInSeconds", maxLifeTimeInSeconds).
Int("maxOpenConns", maxOpenConns).
Int("maxIdleConns", maxIdleConns).
Msg("Push mode")
},
Run: func(cmd *cobra.Command, args []string) {
Expand All @@ -122,7 +136,7 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra
mode, _ = push.ParseMode(args[0])
}

datadestination, e1 := getDataDestination(dcDestination)
datadestination, e1 := getDataDestination(dcDestination, maxLifeTimeInSeconds, maxOpenConns, maxIdleConns)
if e1 != nil {
fmt.Fprintln(err, e1.Error())
os.Exit(1)
Expand Down Expand Up @@ -174,6 +188,9 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra
cmd.Flags().StringVar(&whereField, "using-pk-field", "__usingpk__", "Name of the data field that can be used as pk for update queries")
cmd.Flags().StringVar(&savepoint, "savepoint", "", "Name of a file to write primary keys of effectively processed lines (commit to database)")
cmd.Flags().BoolVarP(&autoTruncate, "autotruncate", "a", false, "Automatically truncate values to the maximum length defined in table.yaml")
cmd.Flags().Int64Var(&maxLifeTimeInSeconds, "conn-max-lifetime", -1, "sets the maximum amount of time (in seconds) a connection may be reused")
cmd.Flags().IntVar(&maxOpenConns, "conn-max-open", -1, "sets the maximum number of open connections to the database")
cmd.Flags().IntVar(&maxIdleConns, "conn-max-idle", -1, "sets the maximum number of connections in the idle connection pool")
cmd.SetOut(out)
cmd.SetErr(err)
cmd.SetIn(in)
Expand Down Expand Up @@ -221,7 +238,7 @@ func loadTranslator(pkTranslations map[string]string) error {
return nil
}

func getDataDestination(dataconnectorName string) (push.DataDestination, *push.Error) {
func getDataDestination(dataconnectorName string, maxLifeTimeInSeconds int64, maxOpenConns, maxIdleConns int) (push.DataDestination, *push.Error) {
alias, e1 := dataconnector.Get(dataconnectorStorage, dataconnectorName)
if e1 != nil {
return nil, &push.Error{Description: e1.Error()}
Expand All @@ -240,7 +257,21 @@ func getDataDestination(dataconnectorName string) (push.DataDestination, *push.E
return nil, &push.Error{Description: "no datadestination found for database type " + u.UnaliasedDriver}
}

return datadestinationFactory.New(u.URL.String(), alias.Schema), nil
options := []push.DataDestinationOption{}

if maxLifeTimeInSeconds >= 0 {
options = append(options, maxLifeTimeOption(maxLifeTimeInSeconds))
}

if maxOpenConns >= 0 {
options = append(options, maxOpenConnsOption(maxOpenConns))
}

if maxIdleConns >= 0 {
options = append(options, maxIdleConnsOption(maxIdleConns))
}

return datadestinationFactory.New(u.URL.String(), alias.Schema, options...), nil
}

func getPlan(idStorage id.Storage, autoTruncate bool) (push.Plan, *push.Error) {
Expand Down
5 changes: 4 additions & 1 deletion internal/app/push/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ func Test_getDataDestination(t *testing.T) {
func(io.ReadCloser) push.RowIterator { return &push.MockRowIterator{} },
func(io.Writer) push.RowWriter { return &push.MockRowWriter{} },
push.NewMockTranslator(),
maxLifeTimeOption,
maxOpenConnsOption,
maxIdleConnsOption,
)

type args struct {
Expand All @@ -63,7 +66,7 @@ func Test_getDataDestination(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, got1 := getDataDestination(tt.args.dataconnectorName)
got, got1 := getDataDestination(tt.args.dataconnectorName, -1, -1, -1)
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("getDataDestination() got = %v, want %v", got, tt.want)
}
Expand Down
Loading