77 "context"
88 "math"
99 "math/rand/v2"
10+ "sync"
1011 "sync/atomic"
1112 "time"
1213
@@ -26,6 +27,9 @@ func FixedSizeReservoirProvider(k int) ReservoirProvider {
2627// sample each one. If there are more than k, the Reservoir will then randomly
2728// sample all additional measurement with a decreasing probability.
2829func NewFixedSizeReservoir (k int ) * FixedSizeReservoir {
30+ if k < 0 {
31+ k = 0
32+ }
2933 return & FixedSizeReservoir {
3034 storage : newStorage (k ),
3135 nextTracker : newNextTracker (k ),
@@ -97,15 +101,16 @@ func (r *FixedSizeReservoir) Offer(ctx context.Context, t time.Time, n Value, a
97101 // https://github.com/MrAlias/reservoir-sampling for a performance
98102 // comparison of reservoir sampling algorithms.
99103
100- r .mu .Lock ()
101- defer r .mu .Unlock ()
102104 count , next := r .incrementCount ()
103- if int (count ) < r .measurementsCap {
104- r .store (int (count ), newMeasurement (ctx , t , n , a ))
105+ intCount := int (count ) // nolint:gosec // count is at most 32 bits in length
106+ if intCount < r .k {
107+ r .store (intCount , newMeasurement (ctx , t , n , a ))
105108 } else if count == next {
106109 // Overwrite a random existing measurement with the one offered.
107- idx := int ( rand .Int64N ( int64 ( cap ( r . measurements ))) )
110+ idx := rand .IntN ( r . k )
108111 r .store (idx , newMeasurement (ctx , t , n , a ))
112+ r .wMu .Lock ()
113+ defer r .wMu .Unlock ()
109114 r .advance ()
110115 }
111116}
@@ -123,7 +128,7 @@ func (r *FixedSizeReservoir) Collect(dest *[]Exemplar) {
123128}
124129
125130func newNextTracker (k int ) * nextTracker {
126- nt := & nextTracker {measurementsCap : k }
131+ nt := & nextTracker {k : k }
127132 nt .reset ()
128133 return nt
129134}
@@ -135,16 +140,19 @@ type nextTracker struct {
135140 // w is the largest random number in a distribution that is used to compute
136141 // the next next.
137142 w float64
138- // measurementsCap is the number of measurements that can be stored in the
139- // reservoir.
140- measurementsCap int
143+ // wMu ensures w is kept consistent with next during advance and reset.
144+ wMu sync.Mutex
145+ // k is the number of measurements that can be stored in the reservoir.
146+ k int
141147}
142148
143149// reset resets r to the initial state.
144150func (r * nextTracker ) reset () {
151+ r .wMu .Lock ()
152+ defer r .wMu .Unlock ()
145153 // This resets the number of exemplars known.
146154 // Random index inserts should only happen after the storage is full.
147- r .setCountAndNext (0 , uint64 (r .measurementsCap ))
155+ r .setCountAndNext (0 , uint64 (r .k )) // nolint:gosec // we ensure k is 1 or greater.
148156
149157 // Initial random number in the series used to generate r.next.
150158 //
@@ -155,7 +163,7 @@ func (r *nextTracker) reset() {
155163 // This maps the uniform random number in (0,1) to a geometric distribution
156164 // over the same interval. The mean of the distribution is inversely
157165 // proportional to the storage capacity.
158- r .w = math .Exp (math .Log (randomFloat64 ()) / float64 (r .measurementsCap ))
166+ r .w = math .Exp (math .Log (randomFloat64 ()) / float64 (r .k ))
159167
160168 r .advance ()
161169}
@@ -172,7 +180,7 @@ func (r *nextTracker) incrementNext(inc uint64) {
172180}
173181
174182// returns the count before the increment and next value.
175- func (r * nextTracker ) setCountAndNext (count uint64 , next uint64 ) {
183+ func (r * nextTracker ) setCountAndNext (count , next uint64 ) {
176184 r .countAndNext .Store (next << 32 + count )
177185}
178186
@@ -191,7 +199,7 @@ func (r *nextTracker) advance() {
191199 // therefore the next r.w will be based on the same distribution (i.e.
192200 // `max(u_1,u_2,...,u_k)`). Therefore, we can sample the next r.w by
193201 // computing the next random number `u` and take r.w as `w * u^(1/k)`.
194- r .w *= math .Exp (math .Log (randomFloat64 ()) / float64 (r .measurementsCap ))
202+ r .w *= math .Exp (math .Log (randomFloat64 ()) / float64 (r .k ))
195203 // Use the new random number in the series to calculate the count of the
196204 // next measurement that will be stored.
197205 //
0 commit comments