Skip to content

Commit c667895

Browse files
fix: buckets attribute (#252)
* fix: bucket attribute * fix: pyrate-limiter * fix * Update deepset_cloud_sdk/_s3/upload.py Co-authored-by: Abraham Yusuf <abrahamy@users.noreply.github.com> --------- Co-authored-by: Abraham Yusuf <abrahamy@users.noreply.github.com>
1 parent f47934b commit c667895

File tree

2 files changed

+41
-10
lines changed

2 files changed

+41
-10
lines changed

deepset_cloud_sdk/_s3/upload.py

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,12 @@ class RetryableHttpError(Exception):
3030
"""An error that indicates a function should be retried."""
3131

3232
def __init__(
33-
self, error: Union[aiohttp.ClientResponseError, aiohttp.ServerDisconnectedError, aiohttp.ClientConnectionError]
33+
self,
34+
error: Union[
35+
aiohttp.ClientResponseError,
36+
aiohttp.ServerDisconnectedError,
37+
aiohttp.ClientConnectionError,
38+
],
3439
) -> None:
3540
"""Store the original exception."""
3641
self.error = error
@@ -76,7 +81,12 @@ def make_safe_file_name(file_name: str) -> str:
7681
class S3:
7782
"""Client for S3 operations related to deepset Cloud uploads."""
7883

79-
def __init__(self, concurrency: int = 120, rate_limit: Rate = Rate(3000, Duration.SECOND), max_attempts: int = 5):
84+
def __init__(
85+
self,
86+
concurrency: int = 120,
87+
rate_limit: Rate = Rate(3000, Duration.SECOND),
88+
max_attempts: int = 5,
89+
):
8090
"""
8191
Initialize the client.
8292
@@ -99,8 +109,15 @@ async def __aexit__(
99109
) -> None:
100110
"""Exit the context manager."""
101111
await self.connector.close()
102-
for bucket in self.limiter.buckets():
103-
self.limiter.dispose(bucket)
112+
113+
# Handle limiter cleanup based on available methods
114+
# Support both older and newer versions of pyrate_limiter
115+
# In version 3.7.0, the dispose method was added to the Limiter class
116+
# See diff here: https://github.com/vutran1710/PyrateLimiter/compare/v3.6.2...master
117+
try:
118+
list(map(self.limiter.dispose, self.limiter.buckets()))
119+
except AttributeError:
120+
pass
104121

105122
async def _upload_file_with_retries(
106123
self,
@@ -257,7 +274,9 @@ async def upload_from_memory(
257274
return S3UploadResult(file_name=file_name, success=False, exception=exception)
258275

259276
async def _process_results(
260-
self, tasks: List[Coroutine[Any, Any, S3UploadResult]], show_progress: bool = True
277+
self,
278+
tasks: List[Coroutine[Any, Any, S3UploadResult]],
279+
show_progress: bool = True,
261280
) -> S3UploadSummary:
262281
"""Summarize the results of the uploads to S3.
263282
@@ -297,7 +316,10 @@ async def _process_results(
297316
return result_summary
298317

299318
async def upload_files_from_paths(
300-
self, upload_session: UploadSession, file_paths: List[Path], show_progress: bool = True
319+
self,
320+
upload_session: UploadSession,
321+
file_paths: List[Path],
322+
show_progress: bool = True,
301323
) -> S3UploadSummary:
302324
"""Upload a set of files to the prefixed S3 namespace given a list of paths.
303325
@@ -316,7 +338,10 @@ async def upload_files_from_paths(
316338
return result_summary
317339

318340
async def upload_in_memory(
319-
self, upload_session: UploadSession, files: Sequence[DeepsetCloudFileBase], show_progress: bool = True
341+
self,
342+
upload_session: UploadSession,
343+
files: Sequence[DeepsetCloudFileBase],
344+
show_progress: bool = True,
320345
) -> S3UploadSummary:
321346
"""Upload a set of files to the prefixed S3 namespace given a list of paths.
322347
@@ -326,7 +351,8 @@ async def upload_in_memory(
326351
:return: S3UploadSummary object.
327352
"""
328353
async with aiohttp.ClientSession(
329-
connector=self.connector, timeout=aiohttp.ClientTimeout(total=ASYNC_CLIENT_TIMEOUT)
354+
connector=self.connector,
355+
timeout=aiohttp.ClientTimeout(total=ASYNC_CLIENT_TIMEOUT),
330356
) as client_session:
331357
tasks = []
332358

@@ -339,7 +365,12 @@ async def upload_in_memory(
339365
if file.meta is not None:
340366
meta_name = f"{file_name}.meta.json"
341367
tasks.append(
342-
self.upload_from_memory(meta_name, upload_session, file.meta_as_string(), client_session)
368+
self.upload_from_memory(
369+
meta_name,
370+
upload_session,
371+
file.meta_as_string(),
372+
client_session,
373+
)
343374
)
344375

345376
result_summary = await self._process_results(tasks, show_progress=show_progress)

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ dependencies = [
3232
"tabulate>=0.9.0",
3333
"tqdm>=4.66.4",
3434
"yaspin>=3.0.0",
35-
"pyrate-limiter>=3.6.0",
35+
"pyrate-limiter>=3.7.0",
3636
]
3737

3838
[project.urls]

0 commit comments

Comments
 (0)