Skip to content

Commit ebaf291

Browse files
authored
feat(push): watch push stats real time (#275)
1 parent 93ba495 commit ebaf291

File tree

10 files changed

+114
-3
lines changed

10 files changed

+114
-3
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ Types of changes
1414
- `Fixed` for any bug fixes.
1515
- `Security` in case of vulnerabilities.
1616

17+
## [2.8.0]
18+
19+
- `Added` flag `--watch` (short `-w`) to `lino push` command
20+
1721
## [2.7.1]
1822

1923
- `Fixed` panic during push on Oracle database with a `null` column value

cmd/lino/dep_push.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,3 +48,7 @@ func pushRowExporterFactory() func(io.Writer) domain.RowWriter {
4848
func pushTranslator() domain.Translator {
4949
return infra.NewFileTranslator()
5050
}
51+
52+
func pushObserver() domain.Observer {
53+
return infra.NewObserver()
54+
}

cmd/lino/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ func initConfig() {
211211
sequence.Inject(dataconnectorStorage(), tableStorage(), sequenceStorage(), sequenceUpdatorFactory())
212212
id.Inject(idStorageFile, relationStorage(), idExporter(), idJSONStorage(*os.Stdout))
213213
pull.Inject(dataconnectorStorage(), relationStorage(), tableStorage(), idStorageFactory(), pullDataSourceFactory(), pullRowExporterFactory(), pullRowReaderFactory(), pullKeyStoreFactory(), traceListner(os.Stderr))
214-
push.Inject(dataconnectorStorage(), relationStorage(), tableStorage(), idStorageFactory(), pushDataDestinationFactory(), pushRowIteratorFactory(), pushRowExporterFactory(), pushTranslator())
214+
push.Inject(dataconnectorStorage(), relationStorage(), tableStorage(), idStorageFactory(), pushDataDestinationFactory(), pushRowIteratorFactory(), pushRowExporterFactory(), pushTranslator(), pushObserver())
215215
}
216216

217217
func writeMetricsToFile(statsFile string, statsByte []byte) {

go.mod

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ require (
1919
github.com/microsoft/go-mssqldb v1.7.0
2020
github.com/mitchellh/go-homedir v1.1.0
2121
github.com/rs/zerolog v1.32.0
22+
github.com/schollz/progressbar/v3 v3.14.2
2223
github.com/sijms/go-ora/v2 v2.8.10
2324
github.com/spf13/cobra v1.8.0
2425
github.com/stretchr/testify v1.9.0
@@ -41,7 +42,9 @@ require (
4142
github.com/kr/text v0.2.0 // indirect
4243
github.com/mattn/go-colorable v0.1.13 // indirect
4344
github.com/mattn/go-sqlite3 v2.0.3+incompatible // indirect
45+
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect
4446
github.com/pmezard/go-difflib v1.0.0 // indirect
47+
github.com/rivo/uniseg v0.4.7 // indirect
4548
github.com/spf13/pflag v1.0.5 // indirect
4649
github.com/stretchr/objx v0.5.2 // indirect
4750
golang.org/x/crypto v0.18.0 // indirect

go.sum

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,7 @@ github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/
245245
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
246246
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
247247
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
248+
github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213/go.mod h1:vNUNkEQ1e29fT/6vq2aBdFsgNPmy8qMdSay1npru+Sw=
248249
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
249250
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
250251
github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE=
@@ -286,6 +287,8 @@ github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3N
286287
github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso=
287288
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
288289
github.com/mitchellh/cli v1.1.0/go.mod h1:xcISNoH86gajksDmfB23e/pu+B+GeFRMYmoHXxx3xhI=
290+
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db h1:62I3jR2EmQ4l5rM/4FEfDWcRD+abF5XlKShorW5LRoQ=
291+
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db/go.mod h1:l0dey0ia/Uv7NcFFVbCLtqEBQbrT4OCwCSKTEv6enCw=
289292
github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
290293
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
291294
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
@@ -311,6 +314,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
311314
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
312315
github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s=
313316
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
317+
github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
318+
github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
314319
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
315320
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
316321
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
@@ -325,6 +330,8 @@ github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD
325330
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
326331
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
327332
github.com/sagikazarmark/crypt v0.1.0/go.mod h1:B/mN0msZuINBtQ1zZLEQcegFJJf9vnYIR88KRMEuODE=
333+
github.com/schollz/progressbar/v3 v3.14.2 h1:EducH6uNLIWsr560zSV1KrTeUb/wZGAHqyMFIEa99ks=
334+
github.com/schollz/progressbar/v3 v3.14.2/go.mod h1:aQAZQnhF4JGFtRJiw/eobaXpsqpVQAftEQ+hLGXaRc4=
328335
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
329336
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
330337
github.com/sijms/go-ora/v2 v2.8.10 h1:Ekhx0I+A9qVBy1eOLa2eIhHWWYwVTa0MM78KS6h+5fg=
@@ -551,9 +558,11 @@ golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBc
551558
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
552559
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
553560
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
561+
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
554562
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
555563
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
556564
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
565+
golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk=
557566
golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8=
558567
golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58=
559568
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

internal/app/push/cli.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ var (
4545
rowIteratorFactory func(io.ReadCloser) push.RowIterator
4646
rowExporterFactory func(io.Writer) push.RowWriter
4747
translator push.Translator
48+
observer push.Observer
4849
)
4950

5051
// Inject dependencies
@@ -57,6 +58,7 @@ func Inject(
5758
rif func(io.ReadCloser) push.RowIterator,
5859
ref func(io.Writer) push.RowWriter,
5960
trnsltor push.Translator,
61+
obs push.Observer,
6062
) {
6163
dataconnectorStorage = dbas
6264
relStorage = rs
@@ -66,6 +68,7 @@ func Inject(
6668
rowIteratorFactory = rif
6769
rowExporterFactory = ref
6870
translator = trnsltor
71+
observer = obs
6972
}
7073

7174
// NewCommand implements the cli pull command
@@ -81,6 +84,7 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra
8184
whereField string
8285
savepoint string
8386
autoTruncate bool
87+
watch bool
8488
)
8589

8690
cmd := &cobra.Command{
@@ -152,7 +156,12 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra
152156
os.Exit(1)
153157
}
154158

155-
e3 := push.Push(rowIteratorFactory(in), datadestination, plan, mode, commitSize, disableConstraints, rowExporter, translator, whereField, savepoint, autoTruncate)
159+
observers := []push.Observer{}
160+
if watch {
161+
observers = append(observers, observer)
162+
}
163+
164+
e3 := push.Push(rowIteratorFactory(in), datadestination, plan, mode, commitSize, disableConstraints, rowExporter, translator, whereField, savepoint, autoTruncate, observers...)
156165
if e3 != nil {
157166
log.Fatal().AnErr("error", e3).Msg("Fatal error stop the push command")
158167
os.Exit(1)
@@ -174,6 +183,7 @@ func NewCommand(fullName string, err *os.File, out *os.File, in *os.File) *cobra
174183
cmd.Flags().StringVar(&whereField, "using-pk-field", "__usingpk__", "Name of the data field that can be used as pk for update queries")
175184
cmd.Flags().StringVar(&savepoint, "savepoint", "", "Name of a file to write primary keys of effectively processed lines (commit to database)")
176185
cmd.Flags().BoolVarP(&autoTruncate, "autotruncate", "a", false, "Automatically truncate values to the maximum length defined in table.yaml")
186+
cmd.Flags().BoolVarP(&watch, "watch", "w", false, "watch statistics about pushed lines")
177187
cmd.SetOut(out)
178188
cmd.SetErr(err)
179189
cmd.SetIn(in)

internal/app/push/cli_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ func Test_getDataDestination(t *testing.T) {
4343
func(io.ReadCloser) push.RowIterator { return &push.MockRowIterator{} },
4444
func(io.Writer) push.RowWriter { return &push.MockRowWriter{} },
4545
push.NewMockTranslator(),
46+
nil,
4647
)
4748

4849
type args struct {

internal/infra/push/observer.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Copyright (C) 2021 CGI France
2+
//
3+
// This file is part of LINO.
4+
//
5+
// LINO is free software: you can redistribute it and/or modify
6+
// it under the terms of the GNU General Public License as published by
7+
// the Free Software Foundation, either version 3 of the License, or
8+
// (at your option) any later version.
9+
//
10+
// LINO is distributed in the hope that it will be useful,
11+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
// GNU General Public License for more details.
14+
//
15+
// You should have received a copy of the GNU General Public License
16+
// along with LINO. If not, see <http://www.gnu.org/licenses/>.
17+
18+
package push
19+
20+
import (
21+
"fmt"
22+
"os"
23+
"time"
24+
25+
"github.com/schollz/progressbar/v3"
26+
)
27+
28+
type Observer struct {
29+
count int
30+
bar *progressbar.ProgressBar
31+
}
32+
33+
func NewObserver() *Observer {
34+
//nolint:gomnd
35+
pgb := progressbar.NewOptions(-1,
36+
progressbar.OptionSetDescription("Pushing ... "),
37+
progressbar.OptionSetItsString("entity"),
38+
progressbar.OptionSetWriter(os.Stderr),
39+
progressbar.OptionShowIts(),
40+
progressbar.OptionSpinnerType(11),
41+
progressbar.OptionThrottle(time.Millisecond*10),
42+
progressbar.OptionOnCompletion(func() { fmt.Fprintln(os.Stderr) }),
43+
// progressbar.OptionShowDescriptionAtLineEnd(),
44+
)
45+
46+
return &Observer{
47+
count: 0,
48+
bar: pgb,
49+
}
50+
}
51+
52+
func (o *Observer) Pushed() {
53+
_ = o.bar.Add(1)
54+
55+
o.count++
56+
57+
o.bar.Describe(fmt.Sprintf("Pushed %d entities", o.count))
58+
}
59+
60+
func (o *Observer) Close() {
61+
_ = o.bar.Close()
62+
}

pkg/push/driven.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,3 +59,8 @@ type Translator interface {
5959
FindValue(key Key, value Value) Value
6060
Load(keys []Key, rows RowIterator) *Error
6161
}
62+
63+
type Observer interface {
64+
Pushed()
65+
Close()
66+
}

pkg/push/driver.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,15 @@ import (
2626
)
2727

2828
// Push write rows to target table
29-
func Push(ri RowIterator, destination DataDestination, plan Plan, mode Mode, commitSize uint, disableConstraints bool, catchError RowWriter, translator Translator, whereField string, savepointPath string, autotruncate bool) (err *Error) {
29+
func Push(ri RowIterator, destination DataDestination, plan Plan, mode Mode, commitSize uint, disableConstraints bool, catchError RowWriter, translator Translator, whereField string, savepointPath string, autotruncate bool, observers ...Observer) (err *Error) {
30+
defer func() {
31+
for _, observer := range observers {
32+
if observer != nil {
33+
observer.Close()
34+
}
35+
}
36+
}()
37+
3038
err1 := destination.Open(plan, mode, disableConstraints)
3139
if err1 != nil {
3240
return err1
@@ -94,6 +102,11 @@ func Push(ri RowIterator, destination DataDestination, plan Plan, mode Mode, com
94102
IncCommitsCount()
95103
}
96104
IncInputLinesCount()
105+
for _, observer := range observers {
106+
if observer != nil {
107+
observer.Pushed()
108+
}
109+
}
97110
}
98111

99112
if ri.Error() != nil {

0 commit comments

Comments
 (0)