feat(pull): filter by scanning all rows #351

Open · wants to merge 3 commits into main
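This pull request adds a `--scan` (`-s`) flag to the `pull` command. Without the flag, `--filter-from-file` is read through a row reader and each filter row is pushed down to the datasource, as before. With the flag, the file is loaded into an in-memory key store (`pull.KeyStore`) built on the start table's keys, all rows are read from the start table, and only rows whose keys are present in the store are exported, e.g. `lino pull source --filter-from-file keys.jsonl --scan` (the datasource and file names here are illustrative).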
57 changes: 44 additions & 13 deletions internal/app/pull/cli.go
@@ -84,6 +84,7 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra
var initialFilters map[string]string
var diagnostic bool
var filters pull.RowReader
var scan bool
var parallel uint

cmd := &cobra.Command{
@@ -99,6 +100,7 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra
Bool("diagnostic", diagnostic).
Bool("distinct", distinct).
Str("filter-from-file", filefilter).
Bool("scan", scan).
Str("exclude-from-file", fileexclude).
Str("table", table).
Str("where", where).
@@ -133,19 +135,47 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra
tracer = traceListener
}

switch filefilter {
case "":
filters = pull.NewOneEmptyRowReader()
case "-":
filters = rowReaderFactory(in)
default:
filterReader, e3 := os.Open(filefilter)
if e3 != nil {
fmt.Fprintln(err, e3.Error())
os.Exit(1)
var filtersIn pull.KeyStore
if scan {
switch filefilter {
case "":
filtersIn = nil
case "-":
var eks error
filtersIn, eks = keyStoreFactory(in, start.Keys)
if eks != nil {
fmt.Fprintln(err, eks.Error())
os.Exit(1)
}
default:
filterReader, e3 := os.Open(filefilter)
if e3 != nil {
fmt.Fprintln(err, e3.Error())
os.Exit(1)
}
var eks error
filtersIn, eks = keyStoreFactory(filterReader, start.Keys)
if eks != nil {
fmt.Fprintln(err, eks.Error())
os.Exit(1)
}
log.Trace().Str("file", filefilter).Msg("reading file")
}
} else {
switch filefilter {
case "":
filters = pull.NewOneEmptyRowReader()
case "-":
filters = rowReaderFactory(in)
default:
filterReader, e3 := os.Open(filefilter)
if e3 != nil {
fmt.Fprintln(err, e3.Error())
os.Exit(1)
}
filters = rowReaderFactory(filterReader)
log.Trace().Str("file", filefilter).Msg("reading file")
}
filters = rowReaderFactory(filterReader)
log.Trace().Str("file", filefilter).Msg("reading file")
}

var filtersEx pull.KeyStore
@@ -175,7 +205,7 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra
}

puller := pull.NewPullerParallel(plan, datasource, pullExporterFactory(out), tracer, parallel)
if e3 := puller.Pull(start, filter, startSelect, filters, filtersEx); e3 != nil {
if e3 := puller.Pull(start, filter, startSelect, filters, filtersEx, filtersIn); e3 != nil {
log.Fatal().AnErr("error", e3).Msg("Fatal error stop the pull command")
os.Exit(1)
}
@@ -192,6 +222,7 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra
cmd.Flags().BoolVarP(&diagnostic, "diagnostic", "d", false, "Set diagnostic debug on")
cmd.Flags().BoolVarP(&distinct, "distinct", "D", false, "select distinct values from start table")
cmd.Flags().StringVarP(&filefilter, "filter-from-file", "F", "", "Use file to filter start table")
cmd.Flags().BoolVarP(&scan, "scan", "s", false, "read all rows from start table and apply filter `filter-from-file` in memory")
cmd.Flags().StringVarP(&fileexclude, "exclude-from-file", "X", "", "Use file to filter out start table")
cmd.Flags().StringVarP(&table, "table", "t", "", "pull content of table without relations instead of ingress descriptor definition")
cmd.Flags().StringVarP(&where, "where", "w", "", "Advanced SQL where clause to filter")
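`keyStoreFactory` is used above but not defined in this diff. A minimal wiring consistent with its call sites (`keyStoreFactory(in, start.Keys)` returning a `pull.KeyStore` and an error) and with the `NewJSONKeyStore` constructor exercised in `keystore_json_test.go` below might look like the sketch here; the import path, the package-level variable, and the injection style are assumptions, not code from this PR.

```go
// Hypothetical wiring of keyStoreFactory in the app layer (not part of this diff).
package pull // internal/app/pull

import (
	"io"

	infra "github.com/cgi-fr/lino/internal/infra/pull"
	"github.com/cgi-fr/lino/pkg/pull"
)

// keyStoreFactory builds an in-memory key store from a JSON stream,
// indexing rows by the key columns of the start table.
var keyStoreFactory = func(r io.Reader, keys []string) (pull.KeyStore, error) {
	return infra.NewJSONKeyStore(r, keys)
}
```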
2 changes: 1 addition & 1 deletion internal/app/pull/http.go
@@ -141,7 +141,7 @@ func HandlerFactory(ingressDescriptor string) func(w http.ResponseWriter, r *htt
pullExporter := pullExporterFactory(w)
puller := pull.NewPuller(plan, datasource, pullExporter, pull.NoTraceListener{})

e3 := puller.Pull(start, pull.Filter{Limit: limit, Values: pull.Row{}, Where: where, Distinct: distinct}, startSelect, nil, nil)
e3 := puller.Pull(start, pull.Filter{Limit: limit, Values: pull.Row{}, Where: where, Distinct: distinct}, startSelect, nil, nil, nil)
if e3 != nil {
log.Error().Err(e3).Msg("")
w.WriteHeader(http.StatusInternalServerError)
58 changes: 58 additions & 0 deletions internal/infra/pull/keystore_json_test.go
@@ -0,0 +1,58 @@
// Copyright (C) 2023 CGI France
//
// This file is part of LINO.
//
// LINO is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// LINO is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with LINO. If not, see <http://www.gnu.org/licenses/>.

package pull

import (
"bytes"
"strings"
"testing"

"github.com/cgi-fr/lino/pkg/pull"
"github.com/stretchr/testify/assert"
)

func TestJSONKeyStore(t *testing.T) {
var buffer bytes.Buffer
buffer.WriteString(`{"ID_INDIVIDU":1000006456,"ID_LANGUE":472355}`)

ks, err := NewJSONKeyStore(&buffer, []string{"ID_INDIVIDU", "ID_LANGUE"})

assert.Nil(t, err)
assert.True(t, ks.Has(pull.Row{"ID_INDIVIDU": 1000006456, "ID_LANGUE": 472355}))

assert.True(t, ks.Has(pull.Row{"ID_INDIVIDU": 1000006456, "ID_LANGUE": 472355, "OTHER": 42}))

assert.False(t, ks.Has(pull.Row{"ID_INDIVIDU": 1000008957, "ID_LANGUE": 472355}))
assert.False(t, ks.Has(pull.Row{"ID_INDIVIDU": 1000006456, "ID_LANGUE": 472354, "OTHER": 42}))
assert.False(t, ks.Has(pull.Row{"ID_INDIVIDU": 1000006456, "OTHER": 42}))
}

func TestJSONKeyStoreFromJSON(t *testing.T) {
ks, err := NewJSONKeyStore(strings.NewReader(
`{"ID_INDIVIDU":1000006456,"ID_LANGUE":472355}`),
[]string{"ID_INDIVIDU", "ID_LANGUE"})

assert.Nil(t, err)

jrr := NewJSONRowReader(strings.NewReader(`{"ID_INDIVIDU":1000006456,"ID_LANGUE":472355,"CODE_LANGUE":"AN","NIVEAU":"2","COMPLEMENT":null,"JSON_LIAISON":null,"ORIGINE_DONNEE":"MIGAUDEPP","VISIBILITE_DONNEE":"C"}`))

assert.True(t, jrr.Next())

row := jrr.Value()
assert.True(t, ks.Has(row))
}
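The implementation under test, `keystore_json.go`, is not included in this diff. The sketch below is one way to obtain the behaviour these tests pin down (matching on the listed key columns only, ignoring extra columns, and rejecting rows that lack a key column); it is an illustration, not the shipped code, and the normalization through `json.Marshal` is an assumption made so that a key compares equal whether it was decoded as `int`, `float64`, or `json.Number`.

```go
package pull // internal/infra/pull: illustrative sketch, not the shipped code

import (
	"encoding/json"
	"io"
	"strings"

	"github.com/cgi-fr/lino/pkg/pull"
)

// jsonKeyStore indexes filter rows by the values of a fixed list of key columns.
type jsonKeyStore struct {
	keys []string
	set  map[string]struct{}
}

// newJSONKeyStoreSketch reads a stream of JSON objects and stores the
// signature of their key columns; it mirrors the role of NewJSONKeyStore.
func newJSONKeyStoreSketch(r io.Reader, keys []string) (pull.KeyStore, error) {
	ks := &jsonKeyStore{keys: keys, set: map[string]struct{}{}}
	dec := json.NewDecoder(r)
	for dec.More() {
		row := pull.Row{}
		if err := dec.Decode(&row); err != nil {
			return nil, err
		}
		if sig, ok := ks.signature(row); ok {
			ks.set[sig] = struct{}{}
		}
	}
	return ks, nil
}

// Has reports whether the key columns of row match one of the stored rows.
func (ks *jsonKeyStore) Has(row pull.Row) bool {
	sig, ok := ks.signature(row)
	if !ok {
		return false // a key column is missing, the row cannot match
	}
	_, found := ks.set[sig]
	return found
}

// signature builds a comparable string from the key columns only, so extra
// columns are ignored; values are re-marshalled to JSON so that 1000006456
// compares equal whether it was decoded as int, float64 or json.Number.
func (ks *jsonKeyStore) signature(row pull.Row) (string, bool) {
	parts := make([]string, 0, len(ks.keys))
	for _, k := range ks.keys {
		v, present := row[k]
		if !present {
			return "", false
		}
		b, err := json.Marshal(v)
		if err != nil {
			return "", false
		}
		parts = append(parts, k+"="+string(b))
	}
	return strings.Join(parts, ";"), true
}
```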
19 changes: 16 additions & 3 deletions pkg/pull/driver.go
@@ -46,7 +46,7 @@ func NewStep(puller *puller, out ExportedRow, entry Relation) *Step {
}

type Puller interface {
Pull(start Table, filter Filter, selectColumns []string, filterCohort RowReader, excluded KeyStore) error
Pull(start Table, filter Filter, selectColumns []string, filterCohort RowReader, excluded KeyStore, included KeyStore) error
}

type puller struct {
@@ -65,7 +65,7 @@ func NewPuller(plan Plan, datasource DataSource, exporter RowExporter, diagnosti
}
}

func (p *puller) Pull(start Table, filter Filter, selectColumns []string, filterCohort RowReader, excluded KeyStore) error {
func (p *puller) Pull(start Table, filter Filter, selectColumns []string, filterCohort RowReader, excluded KeyStore, included KeyStore) error {
start.selectColumns(selectColumns...)
start = p.graph.addMissingColumns(start)

@@ -81,6 +81,7 @@ func (p *puller) Pull(start Table, filter Filter, selectColumns []string, filter
if filterCohort != nil {
for filterCohort.Next() {
fc := filterCohort.Value()

values := Row{}
for key, val := range fc {
values[key] = val
@@ -103,6 +104,7 @@ func (p *puller) Pull(start Table, filter Filter, selectColumns []string, filter
Distinct: filter.Distinct,
})
}
log.Trace().Interface("included", included).Msg("filtrer")

for _, f := range filters {
IncFiltersCount()
@@ -114,8 +116,19 @@ func (p *puller) Pull(start Table, filter Filter, selectColumns []string, filter
for reader.Next() {
IncLinesPerStepCount(string(start.Name))
row := start.export(reader.Value())

log.Trace().Interface("row", row).Msg("read from DB")
row_keys := extract(row, start.Keys)
log.Trace().Interface("row_keys", row_keys).Msg("read from DB extract keys")
log.Trace().Interface("included", included).Msg("incl")
log.Trace().Interface("excluded", excluded).Msg("excluded")
if excluded != nil && excluded.Has(extract(row, start.Keys)) {
log.Trace().Interface("row", row).Msg("in excluded")
continue
}

if included != nil && !included.Has(extract(row, start.Keys)) {
log.Trace().Interface("row", row).Msg("not in included")

continue
}

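Condensed, the per-row rule added above (and mirrored in `driver_parallel.go` below) is: skip a row when its keys are in the excluded store, or when an included store is set and does not contain them. The helper below only restates that rule for readability; `keep` is an illustrative name, not a function introduced by this PR.

```go
package pull // pkg/pull: Row, KeyStore and extract are the names used in this diff

// keep restates the filtering rule applied to every row read from the start
// table; rowKeys holds the values extracted for the table's key columns.
func keep(rowKeys Row, excluded, included KeyStore) bool {
	if excluded != nil && excluded.Has(rowKeys) {
		return false // row is listed in --exclude-from-file
	}
	if included != nil && !included.Has(rowKeys) {
		return false // --scan mode: only rows listed in --filter-from-file pass
	}
	return true
}
```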
9 changes: 8 additions & 1 deletion pkg/pull/driver_parallel.go
@@ -36,6 +36,7 @@ type pullerParallel struct {
outChan chan ExportedRow
errors []error
excluded KeyStore
included KeyStore
}

func NewPullerParallel(plan Plan, datasource DataSource, exporter RowExporter, diagnostic TraceListener, nbworkers uint) Puller { //nolint:lll
@@ -55,13 +56,14 @@ func NewPullerParallel(plan Plan, datasource DataSource, exporter RowExporter, d
outChan: nil,
errors: nil,
excluded: nil,
included: nil,
}
}

return puller
}

func (p *pullerParallel) Pull(start Table, filter Filter, selectColumns []string, filterCohort RowReader, excluded KeyStore) error {
func (p *pullerParallel) Pull(start Table, filter Filter, selectColumns []string, filterCohort RowReader, excluded KeyStore, included KeyStore) error {
start.selectColumns(selectColumns...)
start = p.graph.addMissingColumns(start)

@@ -103,6 +105,7 @@ func (p *pullerParallel) Pull(start Table, filter Filter, selectColumns []string
p.outChan = make(chan ExportedRow)
p.errors = []error{}
p.excluded = excluded
p.included = included

wg := &sync.WaitGroup{}

@@ -172,6 +175,10 @@ LOOP:
continue
}

if p.included != nil && !p.included.Has(extract(out, start.Keys)) {
continue
}

err := p.pull(start, out)
if err != nil {
p.errChan <- err
4 changes: 2 additions & 2 deletions pkg/pull/driver_test.go
@@ -74,7 +74,7 @@ func RunTest(t *testing.T, test *Test) {

for _, execution := range test.Executions {
collector.Reset()
assert.NoError(t, puller.Pull(execution.Start, execution.Filter, execution.Select, nil, nil))
assert.NoError(t, puller.Pull(execution.Start, execution.Filter, execution.Select, nil, nil, nil))
assert.Len(t, collector.Result, len(execution.Result))

for i := 0; i < len(execution.Result); i++ {
@@ -94,7 +94,7 @@ func RunBench(b *testing.B, test *Test) {

for _, execution := range test.Executions {
collector.Reset()
assert.NoError(b, puller.Pull(execution.Start, execution.Filter, execution.Select, nil, nil))
assert.NoError(b, puller.Pull(execution.Start, execution.Filter, execution.Select, nil, nil, nil))
assert.Len(b, collector.Result, len(execution.Result))
}
}