|
31 | 31 | from cachetools import Cache, LRUCache, TTLCache |
32 | 32 | from httpx import Response |
33 | 33 |
|
| 34 | +from confluent_kafka import version |
34 | 35 | from confluent_kafka.schema_registry.common.schema_registry_client import ( |
35 | 36 | RegisteredSchema, |
36 | 37 | Schema, |
37 | 38 | SchemaVersion, |
38 | 39 | ServerConfig, |
39 | 40 | _BearerFieldProvider, |
40 | | - _SchemaCache, |
41 | 41 | _StaticFieldProvider, |
| 42 | + _SchemaCache, |
42 | 43 | full_jitter, |
43 | 44 | is_retriable, |
44 | 45 | is_success, |
@@ -441,7 +442,9 @@ def delete(self, url: str) -> Any: |
441 | 442 | def put(self, url: str, body: Optional[dict] = None) -> Any: |
442 | 443 | return self.send_request(url, method='PUT', body=body) |
443 | 444 |
|
444 | | - def send_request(self, url: str, method: str, body: Optional[dict] = None, query: Optional[dict] = None) -> Any: |
| 445 | + def send_request( |
| 446 | + self, url: str, method: str, body: Optional[dict] = None, query: Optional[dict] = None |
| 447 | + ) -> Any: |
445 | 448 | """ |
446 | 449 | Sends HTTP request to the SchemaRegistry, trying each base URL in turn. |
447 | 450 |
|
@@ -477,6 +480,7 @@ def send_request(self, url: str, method: str, body: Optional[dict] = None, query |
477 | 480 | 'Content-Length': str(len(body_str)), |
478 | 481 | 'Content-Type': "application/vnd.schemaregistry.v1+json", |
479 | 482 | 'Confluent-Accept-Unknown-Properties': "true", |
| 483 | + 'Confluent-Client-Version': f"python/{version()}" |
480 | 484 | } |
481 | 485 |
|
482 | 486 | if self.bearer_auth_credentials_source: |
@@ -942,7 +946,9 @@ def lookup_schema( |
942 | 946 |
|
943 | 947 | query_string = '&'.join(f"{key}={value}" for key, value in query_params.items()) |
944 | 948 |
|
945 | | - response = self._rest_client.post('subjects/{}?{}'.format(_urlencode(subject_name), query_string), body=request) |
| 949 | + response = self._rest_client.post( |
| 950 | + 'subjects/{}?{}'.format(_urlencode(subject_name), query_string), body=request |
| 951 | + ) |
946 | 952 |
|
947 | 953 | result = RegisteredSchema.from_dict(response) |
948 | 954 |
|
@@ -1043,7 +1049,9 @@ def get_latest_version(self, subject_name: str, fmt: Optional[str] = None) -> 'R |
1043 | 1049 | return registered_schema |
1044 | 1050 |
|
1045 | 1051 | query = {'format': fmt} if fmt is not None else None |
1046 | | - response = self._rest_client.get('subjects/{}/versions/{}'.format(_urlencode(subject_name), 'latest'), query) |
| 1052 | + response = self._rest_client.get( |
| 1053 | + 'subjects/{}/versions/{}'.format(_urlencode(subject_name), 'latest'), query |
| 1054 | + ) |
1047 | 1055 |
|
1048 | 1056 | registered_schema = RegisteredSchema.from_dict(response) |
1049 | 1057 |
|
@@ -1123,7 +1131,9 @@ def get_version( |
1123 | 1131 | return registered_schema |
1124 | 1132 |
|
1125 | 1133 | query: dict[str, Any] = {'deleted': deleted, 'format': fmt} if fmt is not None else {'deleted': deleted} |
1126 | | - response = self._rest_client.get('subjects/{}/versions/{}'.format(_urlencode(subject_name), version), query) |
| 1134 | + response = self._rest_client.get( |
| 1135 | + 'subjects/{}/versions/{}'.format(_urlencode(subject_name), version), query |
| 1136 | + ) |
1127 | 1137 |
|
1128 | 1138 | registered_schema = RegisteredSchema.from_dict(response) |
1129 | 1139 |
|
@@ -1210,7 +1220,9 @@ def delete_version(self, subject_name: str, version: int, permanent: bool = Fals |
1210 | 1220 | 'subjects/{}/versions/{}?permanent=true'.format(_urlencode(subject_name), version) |
1211 | 1221 | ) |
1212 | 1222 | else: |
1213 | | - response = self._rest_client.delete('subjects/{}/versions/{}'.format(_urlencode(subject_name), version)) |
| 1223 | + response = self._rest_client.delete( |
| 1224 | + 'subjects/{}/versions/{}'.format(_urlencode(subject_name), version) |
| 1225 | + ) |
1214 | 1226 |
|
1215 | 1227 | # Clear cache for both soft and hard deletes to maintain consistency |
1216 | 1228 | self._cache.remove_by_subject_version(subject_name, version) |
@@ -1338,7 +1350,9 @@ def test_compatibility_all_versions( |
1338 | 1350 | ) |
1339 | 1351 | return response['is_compatible'] |
1340 | 1352 |
|
1341 | | - def set_config(self, subject_name: Optional[str] = None, config: Optional['ServerConfig'] = None) -> 'ServerConfig': |
| 1353 | + def set_config( |
| 1354 | + self, subject_name: Optional[str] = None, config: Optional['ServerConfig'] = None |
| 1355 | + ) -> 'ServerConfig': |
1342 | 1356 | """ |
1343 | 1357 | Update global or subject config. |
1344 | 1358 |
|
|
0 commit comments