diff --git a/internal/infra/pull/datasource_ws.go b/internal/infra/pull/datasource_ws.go index f81d9ba0..f99b4398 100644 --- a/internal/infra/pull/datasource_ws.go +++ b/internal/infra/pull/datasource_ws.go @@ -289,3 +289,7 @@ func (rs *ResultStream) Value() pull.Row { func (rs *ResultStream) Error() error { return rs.err } + +func (rs *ResultStream) Close() error { + return nil +} diff --git a/internal/infra/pull/http_datasource.go b/internal/infra/pull/http_datasource.go index 06cb20ac..1874390d 100644 --- a/internal/infra/pull/http_datasource.go +++ b/internal/infra/pull/http_datasource.go @@ -63,6 +63,8 @@ func (ds *HTTPDataSource) Read(source pull.Table, filter pull.Filter) (pull.RowS return nil, err } + defer reader.Close() + result := pull.RowSet{} for reader.Next() { result = append(result, reader.Value()) diff --git a/internal/infra/pull/rowreader_json.go b/internal/infra/pull/rowreader_json.go index 240c1343..b18e8899 100644 --- a/internal/infra/pull/rowreader_json.go +++ b/internal/infra/pull/rowreader_json.go @@ -68,3 +68,7 @@ func (jrr *JSONRowReader) Value() pull.Row { func (jrr *JSONRowReader) Error() error { return jrr.err } + +func (jrr *JSONRowReader) Close() error { + return nil +} diff --git a/internal/infra/pull/sql_datasource.go b/internal/infra/pull/sql_datasource.go index 2d9f2176..80974808 100644 --- a/internal/infra/pull/sql_datasource.go +++ b/internal/infra/pull/sql_datasource.go @@ -99,6 +99,8 @@ func (ds *SQLDataSource) Read(source pull.Table, filter pull.Filter) (pull.RowSe return nil, err } + defer reader.Close() + result := pull.RowSet{} for reader.Next() { result = append(result, reader.Value()) @@ -250,3 +252,8 @@ func (di *SQLDataIterator) Value() pull.Row { func (di *SQLDataIterator) Error() error { return di.err } + +// Close returns the iterator +func (di *SQLDataIterator) Close() error { + return di.rows.Close() +} diff --git a/pkg/pull/driven.go b/pkg/pull/driven.go index 2a34757f..4f01ad7e 100755 --- a/pkg/pull/driven.go +++ b/pkg/pull/driven.go @@ -40,6 +40,7 @@ type RowReader interface { Next() bool Value() Row Error() error + Close() error } // TraceListener receives diagnostic trace. diff --git a/pkg/pull/driven_DatasourceInMemory.go b/pkg/pull/driven_DatasourceInMemory.go index 196c5f2e..a050439c 100644 --- a/pkg/pull/driven_DatasourceInMemory.go +++ b/pkg/pull/driven_DatasourceInMemory.go @@ -40,6 +40,8 @@ func (ds DataSourceInMemory) Read(source Table, filter Filter) (RowSet, error) { return nil, err } + defer reader.Close() + result := RowSet{} for reader.Next() { result = append(result, reader.Value()) @@ -104,3 +106,4 @@ func (rr *RowReaderInMemory) Value() Row { return row } func (rr *RowReaderInMemory) Error() error { return nil } +func (rr *RowReaderInMemory) Close() error { return nil } diff --git a/pkg/pull/driven_OneEmptyRowReader.go b/pkg/pull/driven_OneEmptyRowReader.go index 0ab9d88e..43291596 100644 --- a/pkg/pull/driven_OneEmptyRowReader.go +++ b/pkg/pull/driven_OneEmptyRowReader.go @@ -38,3 +38,6 @@ func (r OneEmptyRowReader) Value() Row { return Row{} } // Error return always nil func (r OneEmptyRowReader) Error() error { return nil } + +// Close return always nil +func (r OneEmptyRowReader) Close() error { return nil } diff --git a/pkg/pull/driver.go b/pkg/pull/driver.go index c35b582b..5957dc8d 100755 --- a/pkg/pull/driver.go +++ b/pkg/pull/driver.go @@ -110,6 +110,8 @@ func (p *puller) Pull(start Table, filter Filter, filterCohort RowReader, exclud return fmt.Errorf("%w", err) } + defer reader.Close() + for reader.Next() { IncLinesPerStepCount(string(start.Name)) row := start.export(reader.Value()) diff --git a/pkg/pull/driver_parallel.go b/pkg/pull/driver_parallel.go index ea55c821..9d1daf49 100644 --- a/pkg/pull/driver_parallel.go +++ b/pkg/pull/driver_parallel.go @@ -121,6 +121,8 @@ func (p *pullerParallel) Pull(start Table, filter Filter, filterCohort RowReader return fmt.Errorf("%w", err) } + defer reader.Close() + for reader.Next() { IncLinesPerStepCount(string(start.Name)) p.inChan <- reader.Value()