Skip to content

Commit a5abee8

Browse files
committed
fix index mapping updating
1 parent aba1d9b commit a5abee8

File tree

2 files changed

+20
-29
lines changed

2 files changed

+20
-29
lines changed

target_elasticsearch/sinks.py

Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,7 @@ def __init__(
3232
):
3333
super().__init__(target, stream_name, schema, key_properties)
3434
self.client = self._authenticated_client()
35-
self.index_schema_fields = self.config.get("index_schema_fields", {}).get(
36-
self.stream_name, {}
37-
)
35+
self.index_schema_fields = self.config.get("index_schema_fields", {}).get(self.stream_name, {})
3836
self.metadata_fields = self.config.get("metadata_fields", {}).get(self.stream_name, {})
3937
self.index_mappings = self.config.get("index_mappings", {}).get(self.stream_name, {})
4038
self.index_name = None
@@ -99,9 +97,7 @@ def _build_fields(
9997
for k, v in mapping.items():
10098
match = jsonpath_ng.parse(v).find(record)
10199
if len(match) == 0:
102-
self.logger.warning(
103-
f"schema key {k} with json path {v} could not be found in record: {record}"
104-
)
100+
self.logger.warning(f"schema key {k} with json path {v} could not be found in record: {record}")
105101
schemas[k] = v
106102
else:
107103
if len(match) > 1:
@@ -154,23 +150,24 @@ def create_index(self, index: str) -> None:
154150
mappings = {
155151
key: value["mapping"][key]["type"]
156152
for key, value in self.client.indices.get_field_mapping(
157-
index=index, fields=self.index_mappings.keys()
158-
)["mappings"].items()
153+
index=index, fields=list(self.index_mappings.keys())
154+
)[index]["mappings"].items()
159155
}
160-
if not all(self.index_mappings[key] == value for key, value in mappings):
161-
self.logger.warning(
162-
f"Index {index} already exists with different mappings. Recreate index with new mappings."
163-
)
164-
elif mappings.keys() != self.index_mappings.keys():
165-
self.logger.info(
166-
f"Index {index} exists but with different fields. Updating mapping for existing index."
167-
)
168-
self.client.indices.put_mapping(index=index, body=self.index_mappings)
156+
if not all(self.index_mappings[key]["type"] == value for key, value in mappings.items()):
157+
try:
158+
self.client.indices.put_mapping(index=index, body={"properties": self.index_mappings})
159+
except elasticsearch.exceptions.BadRequestError as e:
160+
if e.message == "illegal_argument_exception":
161+
self.logger.warning(
162+
f"Failed to update mapping for index {index}: {e}, recreate index to apply new mappings."
163+
)
164+
else:
165+
raise e
169166
else:
170167
self.logger.debug(f"Index {index} already exists, skipping creation.")
171168
else:
172169
self.logger.info(f"Creating index {index} with mappings: {self.index_mappings}")
173-
self.client.indices.create(index=index, mappings=self.index_mappings)
170+
self.client.indices.create(index=index, mappings={"properties": self.index_mappings})
174171

175172
def _authenticated_client(self) -> elasticsearch.Elasticsearch:
176173
"""Generate a newly authenticated Elasticsearch client.
@@ -212,9 +209,7 @@ def process_batch(self, context: dict[str, Any]) -> None:
212209
Args:
213210
context: Dictionary containing batch processing context including records.
214211
"""
215-
updated_records, distinct_indices = self.build_request_body_and_distinct_indices(
216-
context["records"]
217-
)
212+
updated_records, distinct_indices = self.build_request_body_and_distinct_indices(context["records"])
218213
for index in distinct_indices:
219214
self.create_index(index)
220215
try:

target_elasticsearch/target.py

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,8 @@ class TargetElasticsearch(Target):
120120
th.ObjectType(),
121121
description="""Index Mappings allows you to define field mappings for each stream/index.
122122
This creates or updates the Elasticsearch index mapping with the specified field types and properties.
123-
Format: {"stream_name": {"properties": {"field_name": {"type": "text", "analyzer": "standard"}}}}
124-
Example: {"users": {"properties": {"email": {"type": "keyword"}, "created_at": {"type": "date"}}}}
123+
Format: {"stream_name": {"field_name": {"type": "text", "analyzer": "standard"}}}
124+
Example: {"users": {"email": {"type": "keyword"}, "created_at": {"type": "date"}}}
125125
See: https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping.html""",
126126
default=None,
127127
),
@@ -154,12 +154,8 @@ def __init__(
154154
validate_config=validate_config,
155155
setup_mapper=setup_mapper,
156156
)
157-
assert bool(self.config.get("username") is None) == bool(
158-
self.config.get("password") is None
159-
)
160-
assert bool(self.config.get("api_key_id") is None) == bool(
161-
self.config.get("api_key") is None
162-
)
157+
assert bool(self.config.get("username") is None) == bool(self.config.get("password") is None)
158+
assert bool(self.config.get("api_key_id") is None) == bool(self.config.get("api_key") is None)
163159

164160
@property
165161
def state(self) -> Dict:

0 commit comments

Comments
 (0)