1
1
import logging
2
2
import os
3
3
import sys
4
+ from dataclasses import dataclass , field
4
5
from pathlib import Path
5
6
6
7
import requests
@@ -54,7 +55,7 @@ def collect_missing_imports(advices: list[LocatedAdvice]):
54
55
return missing_imports
55
56
56
57
57
- def collect_not_computed (advices : list [LocatedAdvice ]):
58
+ def collect_uninferrable_count (advices : list [LocatedAdvice ]):
58
59
not_computed = 0
59
60
for located_advice in advices :
60
61
if "computed" in located_advice .advice .message :
@@ -68,93 +69,121 @@ def print_advices(advices: list[LocatedAdvice], file: Path):
68
69
sys .stdout .write (f"{ message } \n " )
69
70
70
71
71
- def lint_one (file : Path , ctx : LocalCheckoutContext , unparsed : Path | None ) -> tuple [set [str ], int , int ]:
72
+ @dataclass
73
+ class SolaccContext :
74
+ unparsed_path : Path | None = None
75
+ files_to_skip : set [str ] | None = None
76
+ total_count = 0
77
+ parseable_count = 0
78
+ uninferrable_count = 0
79
+ missing_imports : dict [str , dict [str , int ]] = field (default_factory = dict )
80
+
81
+ @classmethod
82
+ def create (cls , lint_all : bool ):
83
+ unparsed_path : Path | None = None
84
+ # if lint_all, recreate "solacc-unparsed.txt"
85
+ if lint_all is None :
86
+ unparsed_path = Path (Path (__file__ ).parent , "solacc-unparsed.txt" )
87
+ if unparsed_path .exists ():
88
+ os .remove (unparsed_path )
89
+ files_to_skip : set [str ] | None = None
90
+ malformed = Path (__file__ ).parent / "solacc-malformed.txt"
91
+ if lint_all and malformed .exists ():
92
+ lines = malformed .read_text (encoding = "utf-8" ).split ("\n " )
93
+ files_to_skip = set (line for line in lines if len (line ) > 0 and not line .startswith ("#" ))
94
+ return SolaccContext (unparsed_path = unparsed_path , files_to_skip = files_to_skip )
95
+
96
+ def register_missing_import (self , missing_import : str ):
97
+ prefix = missing_import .split ("." )[0 ]
98
+ details = self .missing_imports .get (prefix , None )
99
+ if details is None :
100
+ details = {}
101
+ self .missing_imports [prefix ] = details
102
+ count = details .get (missing_import , 0 )
103
+ details [missing_import ] = count + 1
104
+
105
+ def log_missing_imports (self ):
106
+ missing_imports = dict (sorted (self .missing_imports .items (), key = lambda item : sum (item [1 ].values ()), reverse = True ))
107
+ for prefix , details in missing_imports .items ():
108
+ logger .info (f"Missing import '{ prefix } '" )
109
+ for item , count in details .items ():
110
+ logger .info (f" { item } : { count } occurrences" )
111
+
112
+
113
+
114
+ def lint_one (solacc : SolaccContext , file : Path , ctx : LocalCheckoutContext ) -> None :
72
115
try :
73
116
advices = list (ctx .local_code_linter .lint_path (file , set ()))
117
+ solacc .parseable_count += 1
74
118
missing_imports = collect_missing_imports (advices )
75
- not_computed = collect_not_computed (advices )
119
+ for missing_import in missing_imports :
120
+ solacc .register_missing_import (missing_import )
121
+ uninferrable_count = collect_uninferrable_count (advices )
122
+ solacc .uninferrable_count += uninferrable_count
76
123
print_advices (advices , file )
77
- return missing_imports , 1 , not_computed
78
124
except Exception as e : # pylint: disable=broad-except
79
125
# here we're most likely catching astroid & sqlglot errors
80
- if unparsed is None : # linting single file, log exception details
81
- logger .error (f"Error during parsing of { file } : { e } " .replace ("\n " , " " ), exc_info = e )
82
- else :
126
+ # when linting single file, log exception details
127
+ logger .error (f"Error during parsing of { file } : { e } " .replace ("\n " , " " ), exc_info = e if solacc . unparsed_path is None else None )
128
+ if solacc . unparsed_path :
83
129
logger .error (f"Error during parsing of { file } : { e } " .replace ("\n " , " " ))
84
130
# populate solacc-unparsed.txt
85
- with unparsed .open (mode = "a" , encoding = "utf-8" ) as f :
131
+ with solacc . unparsed_path .open (mode = "a" , encoding = "utf-8" ) as f :
86
132
f .write (file .relative_to (dist ).as_posix ())
87
133
f .write ("\n " )
88
- return set (), 0 , 0
89
134
90
135
91
- def lint_all ( file_to_lint : str | None ):
136
+ def lint_dir ( solacc : SolaccContext , dir : Path , file_to_lint : str | None = None ):
92
137
ws = WorkspaceClient (host = '...' , token = '...' )
93
138
ctx = LocalCheckoutContext (ws ).replace (
94
139
linter_context_factory = lambda session_state : LinterContext (TableMigrationIndex ([]), session_state )
95
140
)
96
- parseable = 0
97
- not_computed = 0
98
- missing_imports : dict [str , dict [str , int ]] = {}
99
- all_files = list (dist .glob ('**/*.py' )) if file_to_lint is None else [Path (dist , file_to_lint )]
100
- unparsed : Path | None = None
101
- if file_to_lint is None :
102
- unparsed = Path (Path (__file__ ).parent , "solacc-unparsed.txt" )
103
- if unparsed .exists ():
104
- os .remove (unparsed )
105
- skipped : set [str ] | None = None
106
- malformed = Path (__file__ ).parent / "solacc-malformed.txt"
107
- if file_to_lint is None and malformed .exists ():
108
- lines = malformed .read_text (encoding = "utf-8" ).split ("\n " )
109
- skipped = set (line for line in lines if len (line ) > 0 and not line .startswith ("#" ))
141
+ all_files = list (dir .glob ('**/*.py' )) if file_to_lint is None else [Path (dir , file_to_lint )]
110
142
for file in all_files :
111
- if skipped and file .relative_to (dist ).as_posix () in skipped :
143
+ solacc .total_count += 1
144
+ if solacc .files_to_skip and file .relative_to (dist ).as_posix () in solacc .files_to_skip :
112
145
continue
113
- _missing_imports , _parseable , _not_computed = lint_one (file , ctx , unparsed )
114
- for _import in _missing_imports :
115
- register_missing_import (missing_imports , _import )
116
- parseable += _parseable
117
- not_computed += _not_computed
118
- all_files_len = len (all_files ) - (len (skipped ) if skipped else 0 )
119
- parseable_pct = int (parseable / all_files_len * 100 )
120
- missing_imports_count = sum (sum (details .values ()) for details in missing_imports .values ())
146
+ lint_one (solacc , file , ctx )
147
+
148
+
149
+ def lint_file (file_to_lint : str ):
150
+ solacc = SolaccContext .create (False )
151
+ file_path = Path (file_to_lint )
152
+ lint_dir (solacc , file_path .parent , file_path .name )
153
+
154
+
155
+ def lint_all ():
156
+ solacc = SolaccContext .create (True )
157
+ for dir in os .listdir (dist ):
158
+ lint_dir (solacc , dist / dir )
159
+ all_files_len = solacc .total_count - (len (solacc .files_to_skip ) if solacc .files_to_skip else 0 )
160
+ parseable_pct = int (solacc .parseable_count / all_files_len * 100 )
161
+ missing_imports_count = sum (sum (details .values ()) for details in solacc .missing_imports .values ())
121
162
logger .info (
122
- f"Skipped: { len (skipped or [])} , parseable: { parseable_pct } % ({ parseable } /{ all_files_len } ), missing imports: { missing_imports_count } , not computed: { not_computed } "
163
+ f"Skipped: { len (solacc .files_to_skip or [])} , "
164
+ f"parseable: { parseable_pct } % ({ solacc .parseable_count } /{ all_files_len } ), "
165
+ f"missing imports: { missing_imports_count } , "
166
+ f"not computed: { solacc .uninferrable_count } "
123
167
)
124
- log_missing_imports (missing_imports )
168
+ solacc . log_missing_imports ()
125
169
# fail the job if files are unparseable
126
170
if parseable_pct < 100 :
127
171
sys .exit (1 )
128
172
129
173
130
- def register_missing_import (missing_imports : dict [str , dict [str , int ]], missing_import : str ):
131
- prefix = missing_import .split ("." )[0 ]
132
- details = missing_imports .get (prefix , None )
133
- if details is None :
134
- details = {}
135
- missing_imports [prefix ] = details
136
- count = details .get (missing_import , 0 )
137
- details [missing_import ] = count + 1
138
-
139
-
140
- def log_missing_imports (missing_imports : dict [str , dict [str , int ]]):
141
- missing_imports = dict (sorted (missing_imports .items (), key = lambda item : sum (item [1 ].values ()), reverse = True ))
142
- for prefix , details in missing_imports .items ():
143
- logger .info (f"Missing import '{ prefix } '" )
144
- for item , count in details .items ():
145
- logger .info (f" { item } : { count } occurrences" )
146
-
147
-
148
174
def main (args : list [str ]):
149
175
install_logger ()
150
176
logging .root .setLevel (logging .INFO )
151
177
file_to_lint = args [1 ] if len (args ) > 1 else None
152
- if not file_to_lint :
178
+ if file_to_lint :
153
179
# don't clone if linting just one file, assumption is we're troubleshooting
154
- logger .info ("Cloning..." )
155
- clone_all ()
180
+ logger .info ("Linting..." )
181
+ lint_file (file_to_lint )
182
+ return
183
+ logger .info ("Cloning..." )
184
+ clone_all ()
156
185
logger .info ("Linting..." )
157
- lint_all (file_to_lint )
186
+ lint_all ()
158
187
159
188
160
189
if __name__ == "__main__" :
0 commit comments