@@ -13,6 +13,7 @@ class MetricBase:
13
13
collection_end = "METRICS OPERATOR COLLECTION END"
14
14
metadata_start = "METADATA START"
15
15
metadata_end = "METADATA END"
16
+ container_name = None
16
17
17
18
def __init__ (self , spec = None , ** kwargs ):
18
19
"""
@@ -23,6 +24,10 @@ def __init__(self, spec=None, **kwargs):
23
24
self .spec = spec
24
25
self ._core_v1 = kwargs .get ("core_v1_api" )
25
26
27
+ # If we don't have a default container name...
28
+ if not self .container_name :
29
+ self .container_name = kwargs .get ("container_name" ) or "launcher"
30
+
26
31
# Load kubeconfig on Metricbase init only
27
32
if self .spec is not None :
28
33
config .load_kube_config ()
@@ -55,6 +60,14 @@ def parse(self, pod, container):
55
60
)
56
61
return self .parse_log (lines )
57
62
63
+ def parse_log (self , lines ):
64
+ """
65
+ If the parser doesn't have anything, just return the lines
66
+ """
67
+ # Get the log metadata, split lines by newline so not so hefty a log!
68
+ metadata = self .get_log_metadata (lines )
69
+ return {"data" : lines .split ("\n " ), "metadata" : metadata , "spec" : self .spec }
70
+
58
71
@property
59
72
def core_v1 (self ):
60
73
"""
@@ -69,12 +82,14 @@ def core_v1(self):
69
82
self ._core_v1 = core_v1_api .CoreV1Api ()
70
83
return self ._core_v1
71
84
72
- def logging_containers (self , namespace = None , states = None , retry_seconds = 5 ):
85
+ def logging_containers (
86
+ self , namespace = None , states = None , retry_seconds = 5 , pod_prefix = None
87
+ ):
73
88
"""
74
89
Return list of containers intended to get logs from
75
90
"""
76
91
containers = []
77
- pods = self .wait (namespace , states , retry_seconds )
92
+ pods = self .wait (namespace , states , retry_seconds , pod_prefix = pod_prefix )
78
93
container_name = getattr (self , "container_name" , self .container )
79
94
print (f"Looking for container name { container_name } ..." )
80
95
for pod in pods .items :
@@ -90,17 +105,27 @@ def logging_containers(self, namespace=None, states=None, retry_seconds=5):
90
105
)
91
106
return containers
92
107
93
- def wait (self , namespace = None , states = None , retry_seconds = 5 ):
108
+ def get_pod_prefix (self , pod_prefix = None ):
109
+ """
110
+ Return the default or a custom pod prefix.
111
+ """
112
+ pod_prefix = pod_prefix or getattr (self , "pod_prefix" , None )
113
+ if not pod_prefix :
114
+ raise ValueError ("A pod prefix 'pod_prefix' is required to wait for pods." )
115
+ return pod_prefix
116
+
117
+ def wait (self , namespace = None , states = None , retry_seconds = 5 , pod_prefix = None ):
94
118
"""
95
119
Wait for one or more pods of interest to be done.
96
120
97
121
This assumes creation or a consistent size of pod getting to a
98
122
particular state. If looking for Termination -> gone, use
99
123
wait_for_delete.
100
124
"""
125
+ pod_prefix = self .get_pod_prefix (pod_prefix )
101
126
namespace = namespace or self .namespace
102
- print (f"Looking for prefix { self . pod_prefix } in namespace { namespace } " )
103
- pod_list = self .get_pods (namespace , self . pod_prefix )
127
+ print (f"Looking for prefix { pod_prefix } in namespace { namespace } " )
128
+ pod_list = self .get_pods (namespace , pod_prefix )
104
129
size = len (pod_list .items )
105
130
106
131
# We only want logs when they are completed
@@ -111,7 +136,7 @@ def wait(self, namespace=None, states=None, retry_seconds=5):
111
136
ready = set ()
112
137
while len (ready ) != size :
113
138
print (f"{ len (ready )} pods are ready, out of { size } " )
114
- pod_list = self .get_pods (name = self . pod_prefix , namespace = namespace )
139
+ pod_list = self .get_pods (name = pod_prefix , namespace = namespace )
115
140
116
141
for pod in pod_list .items :
117
142
print (f"{ pod .metadata .name } is in phase { pod .status .phase } " )
@@ -126,16 +151,17 @@ def wait(self, namespace=None, states=None, retry_seconds=5):
126
151
print (f'All pods are in states "{ states } "' )
127
152
return pod_list
128
153
129
- def wait_for_delete (self , namespace = None , retry_seconds = 5 ):
154
+ def wait_for_delete (self , namespace = None , retry_seconds = 5 , pod_prefix = None ):
130
155
"""
131
156
Wait for one or more pods of interest to be gone
132
157
"""
158
+ pod_prefix = self .get_pod_prefix (pod_prefix )
133
159
namespace = namespace or self .namespace
134
- print (f"Looking for prefix { self . pod_prefix } in namespace { namespace } " )
135
- pod_list = self .get_pods (namespace , name = self . pod_prefix )
160
+ print (f"Looking for prefix { pod_prefix } in namespace { namespace } " )
161
+ pod_list = self .get_pods (namespace , name = pod_prefix )
136
162
while len (pod_list .items ) != 0 :
137
163
print (f"{ len (pod_list .items )} pods exist, waiting for termination." )
138
- pod_list = self .get_pods (name = self . pod_prefix , namespace = namespace )
164
+ pod_list = self .get_pods (name = pod_prefix , namespace = namespace )
139
165
time .sleep (retry_seconds )
140
166
print ("All pods are terminated." )
141
167
0 commit comments