Bartosz Płóciennik

Bartosz Płóciennik

> hi there! _

24 May 2021

Django-elasticsearch-dsl rebuilding indexes with Celery task help

In most of the projects where I was using Elasticsearch, performance was an important thing so every possibility to reduce response time was a great opportunity to consider.

django-elasticsearch-dsl by default is not supporting rebuilding indexes for Elasticsearch in the background.

Lucky us, there is possible to change the default signal from RealTimeSignalProcessor to a custom one. So we can easily move the rebuilding calculation to our Celery worker and return something for the user much faster.

How to use it?

Code below is based on Abdelhadi92 solution. I assume that you have django-elasticsearch-dsl working already.

signals.py

from django.db import models

from django_elasticsearch_dsl.signals import RealTimeSignalProcessor

from .tasks import ElasticsearchRebuildIndexesTask


class CelerySignalProcessor(RealTimeSignalProcessor):
    def setup(self):
        # Listen only for model saves
        models.signals.post_save.connect(self.handle_save)

    def handle_save(self, sender, instance, **kwargs):
        """Handle save.
        Given an individual model instance, update the object in the index.
        Update the related objects either.
        """
        ElasticsearchRebuildIndexesTask.apply_async(
            (instance.pk, instance._meta.app_label, instance._meta.model_name),
        )

Our signal class CelerySignalProcessor will be trigger based on what do we put inside setup() method.

Inheriting from RealTimeSignalProcessor class will listen for post_save, post_delete, m2m_changed and pre_delete by default, but in this case, I’m going to use post_save only.

Every time that model instance is changed handle_save method will delay the celery task for us.

tasks.py

from typing import List

from django.apps import apps

from celery import Task
from django_elasticsearch_dsl.registries import registry

from config.celery import app  # import your Celery instance


class ElasticsearchRebuildIndexesTask(Task):
    def __init__(self):
        super().__init__()
        # Define custom methods for your models to run
        self.models = {
            ("reviews", "productreview"): self._handle_review,
            ("comments", "comment"): self._handle_comment,
        }

    def run(self, obj_id: int, app_label: str, model_name: str, *args, **kwargs):
        sender = apps.get_model(app_label, model_name)
        instance = sender._default_manager.filter(pk=obj_id).first()

        try:
            func = self.models[(instance._meta.app_label, instance._meta.model_name)]
            for obj in set(func(instance)):
                # registry function from django_elasticsearch_dsl will figure out which 
                # document should update based on a model instance defined in Document
                registry.update(obj)
                registry.update_related(obj)
        except (KeyError, AttributeError):
            pass

    @staticmethod
    def _handle_comment(instance: Comment) -> List:
        # If comment model will be changed we want to rebuild Comment document
        return [instance]

    def _handle_review(self, instance: ProductReview) -> List:
        # If review will be changed we want to rebuild more documents
        return [
            instance,
            instance.product,
            instance.product.user,
        ]


ElasticsearchRebuildIndexesTask = app.register_task(  # type: ignore
    ElasticsearchRebuildIndexesTask()
)

This task based on passed data is getting proper instance from the database. Next step is using self.models to obtain which method to run. The method returns us a list of instances on which we should update on Elasticsearch.

settings.py

The last step is to override the default signal and set what we created.

ELASTICSEARCH_DSL_SIGNAL_PROCESSOR = "search_indexes.signals.CelerySignalProcessor"

When should you use it?

Moving rebuilding indexes from real-time to calculate on the background is not expensive so if you start experiencing issues with response time then think about it.

Imagine that we have a lot of documents on Elasticsearch like Users, Videos, Comments, and in every document, we need to store a username. So when our user decides to change his username then we have a lot of documents to update:

  • user object
  • all user’s movies
  • all user’s comments

It can take more than a few seconds and there is no need for the user to wait after a change to complete the whole process.

Categories