Commit 0252c5b

Merge pull request #29 from HBS-HBX/#25_evaluate_queryset_inside_workers
closes #25 use pks for queryset inside workers
2 parents 8dedb2b + 20a222d commit 0252c5b

15 files changed: 2201 additions, 58 deletions

CHANGELOG.rst

Lines changed: 6 additions & 0 deletions
@@ -1,6 +1,12 @@
 Changelog
 ---------
 
+0.7.2 (2018-08-13)
+~~~~~~~~~~~~~~~~~~
+* fix #21 wrong batch update total using multiprocessing in 0.7.1
+* fix #23 KeyError _index_version_name in es_update --newer
+* address #25 use pks for queryset inside workers #29
+
 0.7.1 (2018-08-07)
 ~~~~~~~~~~~~~~~~~~
 * fixed gh #8 es_dangerous_reset --es-only to sync database to ES

README.md

Lines changed: 80 additions & 2 deletions
@@ -265,6 +265,9 @@ params = {
 call_command('dumpdata', **params)
 ```
 
+An example of this is included with the
+[moviegen management command](./management/commands/moviegen.py).
+
 ### Tuning
 By default, `/.manage.py es_update` will divide the result of
 `DEMDocType.get_queryset()` into batches of size `DocType.BATCH_SIZE`.
@@ -288,17 +291,92 @@ to see the available `make` targets.
 
 
 ### Requirements
+* run `make requirements`* to run the pip install.
+
 *`make upgrade`* upgrades the dependencies of the requirements to latest
 version. This process also excludes `django` and `elasticsearch-dsl`
 from the `requirements/test.txt` so they can be injected with different
 versions by tox during matrix testing.
 
-*`make requirements`* runs the pip install.
-
 This project also uses [`pip-tools`](https://github.com/jazzband/pip-tools).
 The `requirements.txt` files are generated and pinned to latest versions
 with `make upgrade`.
 
+### Populating Local `tests_movies` Database Table With Data
+
+It may be helpful for you to populate a local database with Movies test
+data to experiment with using `django-elastic-migrations`. First,
+migrate the database:
+
+`./manage.py migrate --run-syncdb --settings=test_settings`
+
+Next, load the basic fixtures:
+
+`./manage.py loaddata tests/100films.json`
+
+You may wish to add more movies to the database. A management command
+has been created for this purpose. Get a [Free OMDB API key here](https://www.omdbapi.com/apikey.aspx),
+then run a query like this (replace `MYAPIKEY` with yours):
+
+```
+$> ./manage.py moviegen --title="Inception" --api-key="MYAPIKEY"
+{'actors': 'Leonardo DiCaprio, Joseph Gordon-Levitt, Ellen Page, Tom Hardy',
+ 'awards': 'Won 4 Oscars. Another 152 wins & 204 nominations.',
+ 'boxoffice': '$292,568,851',
+ 'country': 'USA, UK',
+ 'director': 'Christopher Nolan',
+ 'dvd': '07 Dec 2010',
+ 'genre': 'Action, Adventure, Sci-Fi',
+ 'imdbid': 'tt1375666',
+ 'imdbrating': '8.8',
+ 'imdbvotes': '1,721,888',
+ 'language': 'English, Japanese, French',
+ 'metascore': '74',
+ 'plot': 'A thief, who steals corporate secrets through the use of '
+         'dream-sharing technology, is given the inverse task of planting an '
+         'idea into the mind of a CEO.',
+ 'poster': 'https://m.media-amazon.com/images/M/MV5BMjAxMzY3NjcxNF5BMl5BanBnXkFtZTcwNTI5OTM0Mw@@._V1_SX300.jpg',
+ 'production': 'Warner Bros. Pictures',
+ 'rated': 'PG-13',
+ 'ratings': [{'Source': 'Internet Movie Database', 'Value': '8.8/10'},
+             {'Source': 'Rotten Tomatoes', 'Value': '86%'},
+             {'Source': 'Metacritic', 'Value': '74/100'}],
+ 'released': '16 Jul 2010',
+ 'response': 'True',
+ 'runtime': 148,
+ 'title': 'Inception',
+ 'type': 'movie',
+ 'website': 'http://inceptionmovie.warnerbros.com/',
+ 'writer': 'Christopher Nolan',
+ 'year': '2010'}
+```
+
+To save the movie to the database, use the `--save` flag. Also useful is
+the `--noprint` option, to suppress json. Also, if you add
+`OMDB_API_KEY=MYAPIKEY` to your environment variables, you don't have
+to specify it each time:
+
+```
+$ ./manage.py moviegen --title "Closer" --noprint --save
+Saved 1 new movie(s) to the database: Closer
+```
+
+Now that it's been saved to the database, you may want to create a fixture,
+so you can get back to this state in the future.
+
+```
+$ ./manage.py moviegen --makefixture=tests/myfixture.json
+dumping fixture data to tests/myfixture.json ...
+[...........................................................................]
+```
+
+Later, you can restore this database with the regular `loaddata` command:
+
+```
+$ ./manage.py loaddata tests/myfixture.json
+Installed 101 object(s) from 1 fixture(s)
+```
+
 ### Running Tests Locally
 
 Run `make test`. To run all tests and quality checks locally,

codecov.yml

Lines changed: 3 additions & 1 deletion
@@ -4,7 +4,9 @@ coverage:
 
   status:
     project: yes
-    patch: yes
+    patch:
+      default:
+        enabled: no
     changes: no
 
 ignore:

django_elastic_migrations/__init__.py

Lines changed: 1 addition & 2 deletions
@@ -10,7 +10,7 @@
 from django_elastic_migrations.utils import loading
 from django_elastic_migrations.utils.django_elastic_migrations_log import get_logger
 
-__version__ = '0.7.1'
+__version__ = '0.7.2'
 
 default_app_config = 'django_elastic_migrations.apps.DjangoElasticMigrationsConfig'  # pylint: disable=invalid-name
 
@@ -25,7 +25,6 @@
         'This should be the python path to the elasticsearch client '
         'to use for indexing.')
 
-logger.debug("using DJANGO_ELASTIC_MIGRATIONS_ES_CLIENT = {}".format(settings.DJANGO_ELASTIC_MIGRATIONS_ES_CLIENT))
 
 try:
     es_client = loading.import_module_element(settings.DJANGO_ELASTIC_MIGRATIONS_ES_CLIENT)

django_elastic_migrations/indexes.py

Lines changed: 14 additions & 11 deletions
@@ -575,10 +575,20 @@ def generate_batches(cls, qs=None, batch_size=BATCH_SIZE, total_items=None, upda
         if qs is None:
             qs = cls.get_queryset()
 
+        update_index_action.add_log("START getting all ids to update")
+        try:
+            qs_ids = list(qs.values_list(cls.PK_ATTRIBUTE, flat=True))
+        except TypeError as e:
+            if "values_list() got an unexpected keyword argument 'flat'" in e:
+                qs_ids = [str(id) for id in list(qs.values_list(cls.PK_ATTRIBUTE))]
+            else:
+                raise
+        update_index_action.add_log("END getting all ids to update")
+
         if total_items is None:
-            total_items = cls.get_queryset_count(qs)
+            total_items = len(qs_ids)
 
-        total_docs = cls.get_total_docs(qs)
+        total_docs = cls.get_total_docs(cls.get_queryset().filter(id__in=qs_ids))
 
         batches = []
 
@@ -588,15 +598,8 @@ def generate_batches(cls, qs=None, batch_size=BATCH_SIZE, total_items=None, upda
         for start_index in range(0, total_items, batch_size):
             # See https://docs.djangoproject.com/en/1.9/ref/models/querysets/#when-querysets-are-evaluated:
             # "slicing an unevaluated QuerySet returns another unevaluated QuerySet"
-            end_index = min(start_index + batch_size, total_items - start_index)
-            batch_qs = qs[start_index:end_index]
-            try:
-                ids_in_batch = list(batch_qs.values_list(cls.PK_ATTRIBUTE, flat=True))
-            except TypeError as e:
-                if "values_list() got an unexpected keyword argument 'flat'" in e:
-                    ids_in_batch = [str(id) for id in list(batch_qs.values_list(cls.PK_ATTRIBUTE))]
-                else:
-                    raise
+            end_index = min(start_index + batch_size, total_items)
+            ids_in_batch = qs_ids[start_index:end_index]
 
             batch_index_action = PartialUpdateIndexAction(
                 index=update_index_action.index,
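
Review note (a sketch, not part of the commit): the second hunk changes the end index from `min(start_index + batch_size, total_items - start_index)` to `min(start_index + batch_size, total_items)`. The old upper bound shrinks as `start_index` grows, so later slices come out short or empty. The pure-Python illustration below uses hypothetical helper names (`batch_pks`, `old_end_index`) that do not exist in the library. Separately, in Python 3 the membership test `"..." in e` on an exception object would itself raise `TypeError`; `in str(e)` is the usual spelling.

```python
def batch_pks(pks, batch_size):
    """Split an already-evaluated list of primary keys into consecutive batches."""
    total = len(pks)
    batches = []
    for start in range(0, total, batch_size):
        # Clamp the end to the list length, as the new code in the hunk does.
        end = min(start + batch_size, total)
        batches.append(pks[start:end])
    return batches


def old_end_index(start, batch_size, total):
    """End index as computed before this commit, kept only for comparison."""
    return min(start + batch_size, total - start)


if __name__ == "__main__":
    pks = list(range(1, 11))
    print(batch_pks(pks, 4))
    # (start, old end) pairs for the same input
    print([(s, old_end_index(s, 4, len(pks))) for s in range(0, len(pks), 4)])
```

With ten ids and a batch size of four, the old formula yields the ranges (0, 4), (4, 6) and (8, 2): the second batch is short and the third is empty, which is consistent with the "wrong batch update total" entry in the changelog.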

django_elastic_migrations/models.py

Lines changed: 45 additions & 29 deletions
@@ -11,7 +11,7 @@
 from copy import deepcopy
 from multiprocessing import cpu_count
 
-from django.db import models, transaction
+from django.db import models, transaction, OperationalError
 from django.utils import timezone
 from django.utils.encoding import python_2_unicode_compatible
 from elasticsearch import TransportError
@@ -281,6 +281,7 @@ def dem_index(self):
     def add_log(self, msg, commit=True, use_self_dict_format=False, level=logger.INFO):
         if use_self_dict_format:
             msg = msg.format(**self.__dict__)
+        msg = "[{}]: {}".format(str(datetime.datetime.utcnow()), msg)
         logger.log(level, msg)
         self.log = "{old_log}\n{msg}".format(old_log=self.log, msg=msg)
         if commit and 'test' not in sys.argv:
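
Review note (a sketch, not part of the commit): the added line prefixes every log message with a UTC timestamp. `datetime.datetime.utcnow()` returns a naive datetime and is deprecated as of Python 3.12. Assuming a Python 3 only codebase (this project also targets Python 2, where `datetime.timezone` is unavailable), a timezone-aware variant could look like this. `stamp` is a hypothetical helper name, and the `now` parameter exists only to make the function testable.

```python
import datetime


def stamp(msg, now=None):
    """Prefix msg with a timestamp in the same "[time]: msg" shape as the hunk."""
    if now is None:
        # Timezone-aware replacement for datetime.datetime.utcnow().
        now = datetime.datetime.now(datetime.timezone.utc)
    return "[{}]: {}".format(now, msg)


if __name__ == "__main__":
    print(stamp("START getting all ids to update"))
```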
@@ -300,17 +301,15 @@ def perform_action(self, dem_index, *args, **kwargs):
         raise NotImplemented("override in subclasses")
 
     def to_in_progress(self):
-        if self.status == self.STATUS_QUEUED:
-            self.start = timezone.now()
-            self.status = self.STATUS_IN_PROGRESS
-            self.argv = " ".join(sys.argv)
-            self.save()
+        self.start = timezone.now()
+        self.status = self.STATUS_IN_PROGRESS
+        self.argv = " ".join(sys.argv)
+        self.save()
 
     def to_complete(self):
-        if self.status == self.STATUS_IN_PROGRESS:
-            self.status = self.STATUS_COMPLETE
-            self.end = timezone.now()
-            self.save()
+        self.status = self.STATUS_COMPLETE
+        self.end = timezone.now()
+        self.save()
 
     def to_aborted(self):
         self.status = self.STATUS_ABORTED
@@ -365,14 +364,32 @@ def add_to_parent_docs_affected(self, num_docs):
         """
         parent_docs_affected = self.parent.docs_affected
         if self.parent and num_docs:
-            with transaction.atomic():
-                parent = (
-                    IndexAction.objects.select_for_update()
-                    .get(id=self.parent.id)
-                )
-                parent.docs_affected += num_docs
-                parent_docs_affected = parent.docs_affected
-                parent.save()
+            max_retries = 5
+            try_num = 1
+            successful = False
+            while not successful and try_num < max_retries:
+                try:
+                    with transaction.atomic():
+                        parent = (
+                            IndexAction.objects.select_for_update()
+                            .get(id=self.parent.id)
+                        )
+                        parent.docs_affected += num_docs
+                        parent_docs_affected = parent.docs_affected
+                        parent.save()
+                        successful = True
+                except OperationalError as oe:
+                    if "database is locked" in str(oe):
+                        # specific to sql-lite in testing
+                        # https://docs.djangoproject.com/en/2.1/ref/databases/#database-is-locked-errors
+                        try_num += 1
+                        time.sleep(random.random())
+                        if try_num >= max_retries:
+                            msg = "Exceeded number of retries while updating parent docs affected for {}"
+                            msg.format(str(self))
+                            logger.warning(msg)
+                    else:
+                        raise
         return parent_docs_affected
 
     def check_child_statuses(self):
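
Review note (a sketch, not part of the commit): the new retry loop in `add_to_parent_docs_affected` starts `try_num` at 1 and loops while `try_num < max_retries`, so it makes at most four attempts rather than five. It also calls `msg.format(str(self))` without assigning the result, so the logged warning keeps its literal `{}` placeholder; `msg = msg.format(str(self))` would fill it in. The generic helper below shows one way to structure a bounded retry with jitter. `retry_on` and its parameters are hypothetical names, and unlike the commit it re-raises when attempts run out instead of logging a warning.

```python
import random
import time


def retry_on(exc_type, should_retry, operation, max_attempts=5, sleep=time.sleep):
    """Call operation() until it succeeds, retrying only matching errors.

    Returns (result, attempts). Re-raises the error when should_retry(error)
    is false or when the last allowed attempt fails.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return operation(), attempt
        except exc_type as error:
            if not should_retry(error) or attempt == max_attempts:
                raise
            # Random jitter so competing workers do not retry in lockstep.
            sleep(random.random())


if __name__ == "__main__":
    calls = []

    def flaky():
        calls.append(1)
        if len(calls) < 3:
            raise RuntimeError("database is locked")
        return "ok"

    print(retry_on(RuntimeError, lambda e: "database is locked" in str(e), flaky))
```

In the Django setting, `exc_type` would be `django.db.OperationalError` and `operation` the body of the `transaction.atomic()` block.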
@@ -390,6 +407,10 @@ def check_child_statuses(self):
         else:
             self.add_logs("No child tasks found! Please ensure there was work to be done.", level=logger.WARNING)
 
+    def get_task_kwargs(self):
+        if self.task_kwargs:
+            return json.loads(self.task_kwargs)
+        return {}
 
 """
 ↓ Action Mixins Below ↓
@@ -1052,8 +1073,9 @@ def perform_action(self, dem_index, *args, **kwargs):
         runtimes = []
         docs_per_batch = []
         for child in self.children.all():
-            delta = child.end - child.start
-            runtimes.append(delta.total_seconds())
+            if child.end and child.start:
+                delta = child.end - child.start
+                runtimes.append(delta.total_seconds())
             docs_per_batch.append(child.docs_affected)
 
         self._avg_batch_runtime = 'unknown'
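
Review note (a sketch, not part of the commit): the guard added above skips children that never recorded a start or end time, which avoids subtracting `None`. A stand-alone version of the same idea, using plain `(start, end)` tuples instead of `IndexAction` rows, might look like this. `batch_runtimes` and `average` are hypothetical helper names.

```python
import datetime


def batch_runtimes(children):
    """Return runtimes in seconds for (start, end) pairs that have both timestamps."""
    return [(end - start).total_seconds() for start, end in children if start and end]


def average(values):
    """Mean of values, or None when there is nothing to average."""
    return sum(values) / len(values) if values else None


if __name__ == "__main__":
    t0 = datetime.datetime(2018, 8, 13, 12, 0, 0)
    children = [
        (t0, t0 + datetime.timedelta(seconds=10)),
        (t0, None),  # a child that never finished
        (t0, t0 + datetime.timedelta(seconds=20)),
    ]
    print(average(batch_runtimes(children)))
```

Returning `None` for an empty list mirrors the `'unknown'` default that the surrounding code assigns before computing the average.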
@@ -1197,6 +1219,7 @@ def perform_action(self, dem_index, *args, **kwargs):
 
         start = kwargs["start_index"]
         end = kwargs["end_index"]
+        pks = kwargs["pks"]
         self._workers = kwargs["workers"]
         self._total_docs_expected = kwargs["total_docs_expected"]
         verbosity = kwargs["verbosity"]
@@ -1217,15 +1240,7 @@ def perform_action(self, dem_index, *args, **kwargs):
             use_self_dict_format=True
         )
 
-        if self._workers and self._total_docs_expected:
-            # ensure workers don't overload dbs by being sync'd up
-            # if we're less than 10% done, put a little randomness
-            # in between the workers to dither query load
-            if (self.parent.docs_affected / self._total_docs_expected) < 0.1:
-                time.sleep(random.random() * 2)
-
-        qs = doc_type.get_queryset()
-        current_qs = qs[start:end]
+        current_qs = doc_type.get_queryset().filter(id__in=pks)
 
         retries = 0
         success, failed = (0, 0)
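
Review note (a sketch, not part of the commit): each worker now selects its rows with `filter(id__in=pks)` instead of re-slicing the queryset with `qs[start:end]`. One plausible benefit, which is my reading and not something the commit message states, is that slice positions drift if the table changes while workers are running, whereas an explicit list of primary keys does not. The simulation below models the table as a plain list of ids; `slice_batch` and `pk_batch` are hypothetical names.

```python
def slice_batch(rows, start, end):
    """What a worker gets when it re-evaluates qs[start:end] against the current rows."""
    return rows[start:end]


def pk_batch(rows, pks):
    """What a worker gets when it filters the current rows by a fixed list of pks."""
    wanted = set(pks)
    return [row for row in rows if row in wanted]


if __name__ == "__main__":
    before = [1, 2, 3, 4, 5, 6]
    after = [1, 3, 4, 5, 6]  # row 2 is deleted between the first and second worker

    # Worker 1 runs against `before`, worker 2 against `after`.
    by_slice = slice_batch(before, 0, 3) + slice_batch(after, 3, 6)
    by_pk = pk_batch(before, [1, 2, 3]) + pk_batch(after, [4, 5, 6])
    print(by_slice)  # row 4 is never visited
    print(by_pk)     # every planned row that still exists is visited
```

The new filter hard-codes `id`, while `generate_batches` collects the ids through `cls.PK_ATTRIBUTE`. If a document type ever sets `PK_ATTRIBUTE` to something other than `id`, the two would diverge; I have not checked whether any do.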
@@ -1293,6 +1308,7 @@ def perform_action(self, dem_index, *args, **kwargs):
                 " # parent estimated runtime remaining: {_expected_parent_runtime}\n"
                 " # num workers {_workers}\n"
                 " # pid: {_pid}\n"
+                " # IndexAction id: {id}\n"
             ),
             use_self_dict_format=True
         )

test_settings.py

Lines changed: 18 additions & 9 deletions
@@ -1,18 +1,16 @@
+from __future__ import (absolute_import, division, print_function, unicode_literals)
 """
 These settings are here to use during tests, because django requires them.
 
 In a real-world use case, apps in this project are installed into other
 Django applications, so these settings will not be used.
 """
 
-from __future__ import print_function
-from __future__ import absolute_import, unicode_literals
-
 import logging
 import os
+import subprocess
 import sys
 from logging import config as logging_config
-import subprocess
 
 import django
 from os.path import abspath, dirname, join
@@ -30,14 +28,25 @@ def root(*args):
 DATABASES = {
     'default': {
         'ENGINE': 'django.db.backends.sqlite3',
-        'NAME': 'old.db',
-        'USER': '',
-        'PASSWORD': '',
-        'HOST': '',
-        'PORT': '',
+        'NAME': 'local.db',
+    },
+    'mp_testdb': {
+        'ENGINE': 'django.db.backends.sqlite3',
+        'NAME': 'test.db',
+        'TEST': {
+            'ENGINE': 'django.db.backends.sqlite3',
+            'NAME': 'test.db',
+        }
     }
 }
 
+if 'test' in sys.argv and '--tag=multiprocessing' in sys.argv:
+    print("detected multiprocessing in sys.argv: {}".format(sys.argv))
+    DATABASES['default'] = DATABASES['mp_testdb']
+    from pprint import pprint
+    pprint(DATABASES['default'])
+
+
 INSTALLED_APPS = (
     'django.contrib.auth',
     'django.contrib.contenttypes',
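
Review note (a sketch, not part of the commit): the settings above switch the default database to a file-backed SQLite test database when the multiprocessing-tagged tests run, presumably because an in-memory SQLite database is not visible to other worker processes. The check is an exact match on the single argument `--tag=multiprocessing`, so the space-separated spelling `--tag multiprocessing`, which arrives as two `sys.argv` entries, would not trigger it. The same selection logic as a small pure function, with `pick_default_database` as a hypothetical name:

```python
def pick_default_database(argv, databases):
    """Return the DATABASES entry to use as 'default' for this invocation."""
    if 'test' in argv and '--tag=multiprocessing' in argv:
        return databases['mp_testdb']
    return databases['default']


if __name__ == "__main__":
    databases = {
        'default': {'ENGINE': 'django.db.backends.sqlite3', 'NAME': 'local.db'},
        'mp_testdb': {'ENGINE': 'django.db.backends.sqlite3', 'NAME': 'test.db'},
    }
    print(pick_default_database(['manage.py', 'test', '--tag=multiprocessing'], databases))
    print(pick_default_database(['manage.py', 'test', '--tag', 'multiprocessing'], databases))
```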
