Skip to content

Commit c98c9f7

Browse files
authored
fix(repeat): memory leak (#413)
* test: add repeat non regression test * fix(profiler): recognize mem keyword * refactor(process): wip! rewrite processor with go iterators * refactor(process): wip! impact pipeline api * refactor(process): wip! dry queue * refactor(process): wip! deprecate repeater process * refactor(process): wip! fix failed unit tests * refactor(process): wip! fix wasm build * refactor(process): wip! fix caches * refactor(process): wip! fix conversion with non dict selector * refactor(process): wip! fix wasm build * refactor(process): wip! repeat until and while * refactor(process): wip! refactor until and while * refactor(process): wip! rollback repeatCondition * refactor(process): wip! rollback repeater * refactor(process): wip! fix TempSource * refactor(process): wip! fix repeat while * refactor(process): changelog * refactor(repeat): cleanup unused code
1 parent c778c3d commit c98c9f7

File tree

19 files changed

+494
-203
lines changed

19 files changed

+494
-203
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ Types of changes
1414
- `Fixed` for any bug fixes.
1515
- `Security` in case of vulnerabilities.
1616

17+
## [1.31.1]
18+
19+
- `Fixed` memory leak with `--repeat` option
20+
1721
## [1.31.0]
1822

1923
- `Added` insecure, proxy, proxy-user and user flags to mock command

cmd/pimo/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ func run(cmd *cobra.Command) {
328328

329329
if profiling == "cpu" {
330330
profiler = profile.Start(profile.CPUProfile, profile.ProfilePath("."), profile.Quiet)
331-
} else if profiling == "memory" {
331+
} else if profiling == "memory" || profiling == "mem" {
332332
profiler = profile.Start(profile.MemProfile, profile.ProfilePath("."), profile.Quiet)
333333
}
334334

@@ -355,7 +355,7 @@ func run(cmd *cobra.Command) {
355355
dumpStats(stats)
356356
}
357357

358-
if profiling == "cpu" || profiling == "memory" {
358+
if profiling == "cpu" || profiling == "memory" || profiling == "mem" {
359359
profiler.Stop()
360360
}
361361

internal/app/pimo/mock/process.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ func (s BlackHoleSink) ProcessDictionary(value model.Entry) error {
2222

2323
type Processor struct {
2424
mutex *sync.Mutex
25-
source *model.CallableMapSource
25+
source *model.MutableSource
2626
pipeline model.SinkedPipeline
2727
}
2828

@@ -44,7 +44,7 @@ func NewProcessor(maskingFile string, globalSeed *int64, caches map[string]model
4444
}
4545
}
4646

47-
source := model.NewCallableMapSource()
47+
source := model.NewMutableSource()
4848

4949
pipeline, caches, err := model.BuildPipeline(model.NewPipeline(source), pdef, caches, nil, "", "")
5050
if err != nil {
@@ -62,7 +62,7 @@ func (p *Processor) Process(dict model.Dictionary) error {
6262
p.mutex.Lock()
6363
defer p.mutex.Unlock()
6464

65-
p.source.SetValue(dict)
65+
p.source.SetValues(dict)
6666

6767
err := p.pipeline.Run()
6868

internal/app/pimo/pimo.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -122,13 +122,13 @@ func (ctx *Context) Configure(cfg Config) error {
122122
switch {
123123
case cfg.XMLCallback:
124124
over.MDC().Set("context", "callback-input")
125-
ctx.source = model.NewCallableMapSource()
125+
ctx.source = model.NewMutableSource()
126126
case cfg.EmptyInput:
127127
over.MDC().Set("context", "empty-input")
128-
ctx.source = model.NewSourceFromSlice([]model.Dictionary{model.NewPackedDictionary()})
128+
ctx.source = model.NewSliceSource([]model.Dictionary{model.NewPackedDictionary()})
129129
case cfg.SingleInput != nil:
130130
over.MDC().Set("context", "single-input")
131-
ctx.source = model.NewSourceFromSlice([]model.Dictionary{cfg.SingleInput.Pack()})
131+
ctx.source = model.NewSliceSource([]model.Dictionary{cfg.SingleInput.Pack()})
132132
case cfg.ParquetInput != "":
133133
over.MDC().Set("context", "parquet")
134134
source, err := parquet.NewPackedSource(cfg.ParquetInput)
@@ -145,6 +145,10 @@ func (ctx *Context) Configure(cfg Config) error {
145145
return fmt.Errorf("Cannot use repeatUntil and repeatWhile options together")
146146
}
147147

148+
if cfg.Iteration > 1 {
149+
ctx.source = model.NewCountRepeater(ctx.source, cfg.Iteration)
150+
}
151+
148152
ctx.repeatCondition = cfg.RepeatWhile
149153
ctx.repeatConditionMode = "while"
150154
if cfg.RepeatUntil != "" {
@@ -153,8 +157,8 @@ func (ctx *Context) Configure(cfg Config) error {
153157
}
154158

155159
ctx.pipeline = model.NewPipeline(ctx.source).
156-
Process(model.NewCounterProcessWithCallback("input-line", 0, updateContext)).
157-
Process(model.NewRepeaterProcess(cfg.Iteration))
160+
Process(model.NewCounterProcessWithCallback("input-line", 0, updateContext))
161+
158162
over.AddGlobalFields("input-line")
159163

160164
injectTemplateFuncs()
@@ -399,11 +403,11 @@ func (ctx *Context) ExecuteMap(data map[string]any) (map[string]any, error) {
399403
for k, v := range data {
400404
input = input.With(k, v)
401405
}
402-
source, ok := ctx.source.(*model.CallableMapSource)
406+
source, ok := ctx.source.(*model.MutableSource)
403407
if !ok {
404-
return nil, fmt.Errorf("Source is not CallableMapSource")
408+
return nil, fmt.Errorf("Source is not MutableSource")
405409
}
406-
source.SetValue(input)
410+
source.SetValues(input)
407411
result := []model.Entry{}
408412
err := ctx.pipeline.AddSink(model.NewSinkToSlice(&result)).Run()
409413
if err != nil {

pkg/jsonline/jsonline.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package jsonline
2020
import (
2121
"bufio"
2222
"io"
23+
"iter"
2324

2425
over "github.com/adrienaury/zeromdc"
2526
"github.com/cgi-fr/pimo/pkg/model"
@@ -54,6 +55,40 @@ func (s *Source) Open() error {
5455
return nil
5556
}
5657

58+
func (s *Source) Close() error {
59+
return nil
60+
}
61+
62+
func (s *Source) Values() iter.Seq2[model.Entry, error] {
63+
return func(yield func(model.Entry, error) bool) {
64+
for s.fscanner.Scan() {
65+
line := s.fscanner.Bytes()
66+
67+
var dict model.Dictionary
68+
var err error
69+
70+
if s.packed {
71+
dict, err = JSONToPackedDictionary(line)
72+
} else {
73+
dict, err = JSONToDictionary(line)
74+
}
75+
76+
if err != nil {
77+
yield(dict, err)
78+
return
79+
}
80+
81+
if !yield(dict, nil) {
82+
return
83+
}
84+
}
85+
86+
if s.fscanner.Err() != nil {
87+
yield(model.NewDictionary(), s.fscanner.Err())
88+
}
89+
}
90+
}
91+
5792
// Next convert next line to model.Dictionary
5893
func (s *Source) Next() bool {
5994
if !s.fscanner.Scan() {

pkg/model/common.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package model
2+
3+
type Resource interface {
4+
Open() error
5+
Close() error
6+
}

pkg/model/iterable.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package model
2+
3+
import "iter"
4+
5+
// Iterable is a resource that can be iterated over to produce values of type T. It extends the Resource interface.
6+
type Iterable[T any] interface {
7+
Resource
8+
Values() iter.Seq2[T, error]
9+
}

0 commit comments

Comments
 (0)