Commit 1cf7588

PMM-13830 Changes.
1 parent 915aceb commit 1cf7588

8 files changed: +187 additions, -186 deletions

basic/scraper.go

26 additions, 38 deletions

@@ -1,11 +1,13 @@
 package basic
 
 import (
+    "context"
     "sync"
     "time"
 
-    "github.com/aws/aws-sdk-go/aws"
-    "github.com/aws/aws-sdk-go/service/cloudwatch"
+    awsV2 "github.com/aws/aws-sdk-go-v2/aws"
+    cloudwatchV2 "github.com/aws/aws-sdk-go-v2/service/cloudwatch"
+    cloudwatchV2types "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
     "github.com/go-kit/log/level"
     "github.com/prometheus/client_golang/prometheus"
 
@@ -25,17 +27,16 @@ type Scraper struct {
     ch chan<- prometheus.Metric
 
     // internal
-    svc         *cloudwatch.CloudWatch
+    svc         *cloudwatchV2.Client
     constLabels prometheus.Labels
 }
 
 func NewScraper(instance *config.Instance, collector *Collector, ch chan<- prometheus.Metric) *Scraper {
-    // Create CloudWatch client
-    sess, _ := collector.sessions.GetSession(instance.Region, instance.Instance)
-    if sess == nil {
+    cfg, _ := collector.sessions.GetSession(instance.Region, instance.Instance)
+    if cfg == nil {
         return nil
     }
-    svc := cloudwatch.New(sess)
+    svc := cloudwatchV2.NewFromConfig(*cfg)
 
     constLabels := prometheus.Labels{
         "region": instance.Region,
@@ -61,15 +62,13 @@ func NewScraper(instance *config.Instance, collector *Collector, ch chan<- prome
     }
 }
 
-func getLatestDatapoint(datapoints []*cloudwatch.Datapoint) *cloudwatch.Datapoint {
-    var latest *cloudwatch.Datapoint = nil
-
-    for dp := range datapoints {
-        if latest == nil || latest.Timestamp.Before(*datapoints[dp].Timestamp) {
-            latest = datapoints[dp]
+func getLatestDatapoint(datapoints []cloudwatchV2types.Datapoint) *cloudwatchV2types.Datapoint {
+    var latest *cloudwatchV2types.Datapoint = nil
+    for i := range datapoints {
+        if latest == nil || latest.Timestamp.Before(*datapoints[i].Timestamp) {
+            latest = &datapoints[i]
         }
     }
-
     return latest
 }
 
@@ -96,46 +95,35 @@ func (s *Scraper) scrapeMetric(metric Metric) error {
     now := time.Now()
     end := now.Add(-Delay)
 
-    params := &cloudwatch.GetMetricStatisticsInput{
-        EndTime:   aws.Time(end),
-        StartTime: aws.Time(end.Add(-Range)),
-
-        Period:     aws.Int64(int64(Period.Seconds())),
-        MetricName: aws.String(metric.cwName),
-        Namespace:  aws.String("AWS/RDS"),
-        Dimensions: []*cloudwatch.Dimension{},
-        Statistics: aws.StringSlice([]string{"Average"}),
-        Unit:       nil,
+    params := &cloudwatchV2.GetMetricStatisticsInput{
+        EndTime:    awsV2.Time(end),
+        StartTime:  awsV2.Time(end.Add(-Range)),
+        Period:     awsV2.Int32(int32(Period.Seconds())),
+        MetricName: awsV2.String(metric.cwName),
+        Namespace:  awsV2.String("AWS/RDS"),
+        Dimensions: []cloudwatchV2types.Dimension{{
+            Name:  awsV2.String("DBInstanceIdentifier"),
+            Value: awsV2.String(s.instance.Instance),
+        }},
+        Statistics: []cloudwatchV2types.Statistic{cloudwatchV2types.StatisticAverage},
     }
 
-    params.Dimensions = append(params.Dimensions, &cloudwatch.Dimension{
-        Name:  aws.String("DBInstanceIdentifier"),
-        Value: aws.String(s.instance.Instance),
-    })
-
-    // Call CloudWatch to gather the datapoints
-    resp, err := s.svc.GetMetricStatistics(params)
+    resp, err := s.svc.GetMetricStatistics(context.Background(), params)
     if err != nil {
         return err
     }
 
-    // There's nothing in there, don't publish the metric
     if len(resp.Datapoints) == 0 {
         return nil
     }
 
-    // Pick the latest datapoint
     dp := getLatestDatapoint(resp.Datapoints)
-
-    // Get the metric.
-    v := aws.Float64Value(dp.Average)
+    v := awsV2.ToFloat64(dp.Average)
    switch metric.cwName {
    case "EngineUptime":
-        // "Fake EngineUptime -> node_boot_time with time.Now().Unix() - EngineUptime."
        v = float64(time.Now().Unix() - int64(v))
    }
 
-    // Send metric.
     s.ch <- prometheus.MustNewConstMetric(
         prometheus.NewDesc(metric.prometheusName, metric.prometheusHelp, nil, s.constLabels),
         prometheus.GaugeValue,
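For reference, the v2 call pattern used above can be exercised standalone roughly as follows. This sketch is not part of the commit: it assumes default credentials and region via config.LoadDefaultConfig, and the region, metric, and DB instance identifier are placeholders.

```go
package main

import (
	"context"
	"fmt"
	"time"

	awsV2 "github.com/aws/aws-sdk-go-v2/aws"
	awsconfig "github.com/aws/aws-sdk-go-v2/config"
	cloudwatchV2 "github.com/aws/aws-sdk-go-v2/service/cloudwatch"
	cloudwatchV2types "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
)

func main() {
	ctx := context.Background()

	// Load shared AWS config (credentials, region); stands in for the exporter's sessions handling.
	cfg, err := awsconfig.LoadDefaultConfig(ctx, awsconfig.WithRegion("us-east-1")) // placeholder region
	if err != nil {
		panic(err)
	}
	svc := cloudwatchV2.NewFromConfig(cfg)

	end := time.Now().Add(-time.Minute)
	out, err := svc.GetMetricStatistics(ctx, &cloudwatchV2.GetMetricStatisticsInput{
		EndTime:    awsV2.Time(end),
		StartTime:  awsV2.Time(end.Add(-10 * time.Minute)),
		Period:     awsV2.Int32(60),
		MetricName: awsV2.String("CPUUtilization"), // example AWS/RDS metric
		Namespace:  awsV2.String("AWS/RDS"),
		Dimensions: []cloudwatchV2types.Dimension{{
			Name:  awsV2.String("DBInstanceIdentifier"),
			Value: awsV2.String("my-db-instance"), // placeholder instance
		}},
		Statistics: []cloudwatchV2types.Statistic{cloudwatchV2types.StatisticAverage},
	})
	if err != nil {
		panic(err)
	}
	for _, dp := range out.Datapoints {
		fmt.Println(dp.Timestamp.UTC(), awsV2.ToFloat64(dp.Average))
	}
}
```

Note that getLatestDatapoint now returns &datapoints[i] because the v2 response carries []types.Datapoint by value instead of the v1 []*cloudwatch.Datapoint.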

enhanced/collector.go

2 additions, 1 deletion

@@ -38,7 +38,8 @@ func NewCollector(sessions *sessions.Sessions, logger log.Logger) *Collector {
 
     for session, instances := range sessions.AllSessions() {
         enabledInstances := getEnabledInstances(instances)
-        s := newScraper(session, enabledInstances, logger)
+        cfg := sessions.Configs[session]
+        s := newScraper(cfg, enabledInstances, logger)
 
         interval := maxInterval
         for _, instance := range enabledInstances {
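The cfg passed to newScraper now comes from sessions.Configs, a per-session map of v2 aws.Config values; the sessions package itself is among the files not shown in this excerpt. A hypothetical sketch of how such a config might be assembled with SDK v2, replacing the v1 *session.Session (helper name and placement are illustrative only):

```go
package sessions // hypothetical placement; the real sessions package is not part of this excerpt

import (
	"context"

	awsV2 "github.com/aws/aws-sdk-go-v2/aws"
	awsconfig "github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
)

// buildConfig is a hypothetical helper showing how an awsV2.Config could be
// created per region/credentials pair, stored in a Configs map keyed by
// session, and handed to newScraper in place of the old *session.Session.
func buildConfig(ctx context.Context, region, accessKey, secretKey string) (awsV2.Config, error) {
	return awsconfig.LoadDefaultConfig(ctx,
		awsconfig.WithRegion(region),
		awsconfig.WithCredentialsProvider(
			credentials.NewStaticCredentialsProvider(accessKey, secretKey, ""),
		),
	)
}
```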

enhanced/scraper.go

24 additions, 28 deletions

@@ -5,9 +5,8 @@ import (
     "fmt"
     "time"
 
-    "github.com/aws/aws-sdk-go/aws"
-    "github.com/aws/aws-sdk-go/aws/session"
-    "github.com/aws/aws-sdk-go/service/cloudwatchlogs"
+    awsV2 "github.com/aws/aws-sdk-go-v2/aws"
+    cloudwatchlogsV2 "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
     "github.com/go-kit/log"
     "github.com/go-kit/log/level"
     "github.com/prometheus/client_golang/prometheus"
@@ -19,14 +18,14 @@ import (
 type scraper struct {
     instances      []sessions.Instance
     logStreamNames []string
-    svc            *cloudwatchlogs.CloudWatchLogs
+    svc            *cloudwatchlogsV2.Client
     nextStartTime  time.Time
     logger         log.Logger
 
     testDisallowUnknownFields bool // for tests only
 }
 
-func newScraper(session *session.Session, instances []sessions.Instance, logger log.Logger) *scraper {
+func newScraper(cfg awsV2.Config, instances []sessions.Instance, logger log.Logger) *scraper {
     logStreamNames := make([]string, 0, len(instances))
     for _, instance := range instances {
         logStreamNames = append(logStreamNames, instance.ResourceID)
@@ -35,7 +34,7 @@ func newScraper(session *session.Session, instances []sessions.Instance, logger
     return &scraper{
         instances:      instances,
         logStreamNames: logStreamNames,
-        svc:            cloudwatchlogs.New(session),
+        svc:            cloudwatchlogsV2.NewFromConfig(cfg),
         nextStartTime:  time.Now().Add(-3 * time.Minute).Round(0), // strip monotonic clock reading
         logger:         log.With(logger, "component", "enhanced"),
     }
@@ -63,7 +62,6 @@ func (s *scraper) start(ctx context.Context, interval time.Duration, ch chan<- m
 
 // scrape performs a single scrape.
 func (s *scraper) scrape(ctx context.Context) (map[string][]prometheus.Metric, map[string]string) {
-
     allMetrics := make(map[string]map[time.Time][]prometheus.Metric) // ResourceID -> event timestamp -> metrics
     allMessages := make(map[string]map[time.Time]string)             // ResourceID -> event timestamp -> message
 
@@ -77,26 +75,31 @@ func (s *scraper) scrape(ctx context.Context) (map[string][]prometheus.Metric, m
             sliceEnd = streamCount
         }
 
-        input := &cloudwatchlogs.FilterLogEventsInput{
-            LogGroupName:   aws.String("RDSOSMetrics"),
-            LogStreamNames: aws.StringSlice(s.logStreamNames[sliceStart:sliceEnd]),
-            StartTime:      aws.Int64(aws.TimeUnixMilli(s.nextStartTime)),
+        input := &cloudwatchlogsV2.FilterLogEventsInput{
+            LogGroupName:   awsV2.String("RDSOSMetrics"),
+            LogStreamNames: s.logStreamNames[sliceStart:sliceEnd],
+            StartTime:      awsV2.Int64(s.nextStartTime.UnixMilli()),
         }
 
         level.Debug(log.With(s.logger, "next_start", s.nextStartTime.UTC(), "since_last", time.Since(s.nextStartTime))).Log("msg", "Requesting metrics")
 
-        // collect all returned events and metrics/messages
-        collectAllMetrics := func(output *cloudwatchlogs.FilterLogEventsOutput, lastPage bool) bool {
+        paginator := cloudwatchlogsV2.NewFilterLogEventsPaginator(s.svc, input)
+        for paginator.HasMorePages() {
+            output, err := paginator.NextPage(ctx)
+            if err != nil {
+                level.Error(s.logger).Log("msg", "Failed to filter log events.", "error", err)
+                break
+            }
             for _, event := range output.Events {
                 l := log.With(s.logger,
-                    "EventId", *event.EventId,
-                    "LogStreamName", *event.LogStreamName,
-                    "Timestamp", aws.MillisecondsTimeValue(event.Timestamp).UTC(),
-                    "IngestionTime", aws.MillisecondsTimeValue(event.IngestionTime).UTC())
+                    "EventId", awsV2.ToString(event.EventId),
+                    "LogStreamName", awsV2.ToString(event.LogStreamName),
+                    "Timestamp", time.UnixMilli(awsV2.ToInt64(event.Timestamp)).UTC(),
+                    "IngestionTime", time.UnixMilli(awsV2.ToInt64(event.IngestionTime)).UTC())
 
                 var instance *sessions.Instance
                 for _, i := range s.instances {
-                    if i.ResourceID == *event.LogStreamName {
+                    if i.ResourceID == awsV2.ToString(event.LogStreamName) {
                         instance = &i
                         break
                     }
@@ -112,8 +115,7 @@ func (s *scraper) scrape(ctx context.Context) (map[string][]prometheus.Metric, m
                 }
                 l = log.With(l, "region", instance.Region, "instance", instance.Instance)
 
-                // l.Debugf("Message:\n%s", *event.Message)
-                osMetrics, err := parseOSMetrics([]byte(*event.Message), s.testDisallowUnknownFields)
+                osMetrics, err := parseOSMetrics([]byte(awsV2.ToString(event.Message)), s.testDisallowUnknownFields)
                 if err != nil {
                     // only for tests
                     if s.testDisallowUnknownFields {
@@ -123,9 +125,8 @@ func (s *scraper) scrape(ctx context.Context) (map[string][]prometheus.Metric, m
                     level.Error(l).Log("msg", "Failed to parse metrics.", "error", err)
                     continue
                 }
-                // l.Debugf("OS Metrics:\n%#v", osMetrics)
 
-                timestamp := aws.MillisecondsTimeValue(event.Timestamp).UTC()
+                timestamp := time.UnixMilli(awsV2.ToInt64(event.Timestamp)).UTC()
                 level.Debug(l).Log("msg", fmt.Sprintf("Timestamp from message: %s; from event: %s.", osMetrics.Timestamp.UTC(), timestamp))
 
                 if allMetrics[instance.ResourceID] == nil {
@@ -136,13 +137,8 @@ func (s *scraper) scrape(ctx context.Context) (map[string][]prometheus.Metric, m
                 if allMessages[instance.ResourceID] == nil {
                     allMessages[instance.ResourceID] = make(map[time.Time]string)
                 }
-                allMessages[instance.ResourceID][timestamp] = *event.Message
+                allMessages[instance.ResourceID][timestamp] = awsV2.ToString(event.Message)
             }
-
-            return true // continue pagination
-        }
-        if err := s.svc.FilterLogEventsPagesWithContext(ctx, input, collectAllMetrics); err != nil {
-            level.Error(s.logger).Log("msg", "Failed to filter log events.", "error", err)
         }
     }
     // get better times
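SDK v2 drops the callback-style FilterLogEventsPagesWithContext helper, so the commit switches to an explicit paginator. A minimal standalone sketch of that pattern, not part of the commit, assuming default credentials and a placeholder log stream name:

```go
package main

import (
	"context"
	"fmt"

	awsV2 "github.com/aws/aws-sdk-go-v2/aws"
	awsconfig "github.com/aws/aws-sdk-go-v2/config"
	cloudwatchlogsV2 "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
)

func main() {
	ctx := context.Background()

	// Assumes default credentials/region from the environment or shared config.
	cfg, err := awsconfig.LoadDefaultConfig(ctx)
	if err != nil {
		panic(err)
	}
	svc := cloudwatchlogsV2.NewFromConfig(cfg)

	input := &cloudwatchlogsV2.FilterLogEventsInput{
		LogGroupName:   awsV2.String("RDSOSMetrics"),
		LogStreamNames: []string{"db-EXAMPLERESOURCEID"}, // placeholder resource ID
	}

	// v2 pagination: HasMorePages/NextPage instead of a v1 pagination callback.
	paginator := cloudwatchlogsV2.NewFilterLogEventsPaginator(svc, input)
	for paginator.HasMorePages() {
		page, err := paginator.NextPage(ctx)
		if err != nil {
			fmt.Println("filter log events:", err)
			break
		}
		for _, event := range page.Events {
			fmt.Println(awsV2.ToString(event.LogStreamName), awsV2.ToString(event.Message))
		}
	}
}
```

As in the diff above, an error from NextPage is logged and the loop exits, keeping whatever events were already collected.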

enhanced/scraper_test.go

3 additions, 3 deletions

@@ -49,8 +49,8 @@ func TestScraper(t *testing.T) {
     for session, instances := range sess.AllSessions() {
         session, instances := session, instances
         t.Run(fmt.Sprint(instances), func(t *testing.T) {
-            // test that there are no new metrics
-            s := newScraper(session, instances, logger)
+            cfg := sess.Configs[session]
+            s := newScraper(cfg, instances, logger)
             s.testDisallowUnknownFields = true
             metrics, messages := s.scrape(context.Background())
             require.Len(t, metrics, len(instances))
@@ -165,7 +165,7 @@ func TestScraperDisableEnhancedMetrics(t *testing.T) {
     for session, instances := range sess.AllSessions() {
         session, instances := session, instances
         t.Run(fmt.Sprint(instances), func(t *testing.T) {
-            s := newScraper(session, instances, logger)
+            s := newScraper(sess.Configs[session], instances, logger)
             s.testDisallowUnknownFields = true
             metrics, _ := s.scrape(context.Background())
 

go.mod

17 additions, 2 deletions

@@ -3,7 +3,13 @@ module github.com/percona/rds_exporter
 go 1.23
 
 require (
-    github.com/aws/aws-sdk-go v1.36.30
+    github.com/aws/aws-sdk-go-v2 v1.36.5
+    github.com/aws/aws-sdk-go-v2/config v1.29.17
+    github.com/aws/aws-sdk-go-v2/credentials v1.17.70
+    github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.45.3
+    github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs v1.51.0
+    github.com/aws/aws-sdk-go-v2/service/rds v1.99.0
+    github.com/aws/aws-sdk-go-v2/service/sts v1.34.0
     github.com/go-kit/log v0.2.0
     github.com/percona/exporter_shared v0.7.4
     github.com/prometheus/client_golang v1.14.0
@@ -16,12 +22,21 @@ require (
 require (
     github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
     github.com/alecthomas/units v0.0.0-20210208195552-ff826a37aa15 // indirect
+    github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.11 // indirect
+    github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.32 // indirect
+    github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.36 // indirect
+    github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.36 // indirect
+    github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 // indirect
+    github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.4 // indirect
+    github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.17 // indirect
+    github.com/aws/aws-sdk-go-v2/service/sso v1.25.5 // indirect
+    github.com/aws/aws-sdk-go-v2/service/ssooidc v1.30.3 // indirect
+    github.com/aws/smithy-go v1.22.4 // indirect
     github.com/beorn7/perks v1.0.1 // indirect
     github.com/cespare/xxhash/v2 v2.1.2 // indirect
     github.com/davecgh/go-spew v1.1.1 // indirect
     github.com/go-logfmt/logfmt v0.5.1 // indirect
     github.com/golang/protobuf v1.5.2 // indirect
-    github.com/jmespath/go-jmespath v0.4.0 // indirect
     github.com/kr/text v0.2.0 // indirect
     github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
     github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
