This is how MDN Web Docs uses Elasticsearch. Every day we build all the content and then upload it all with elasticsearch-dsl, using aliases. Because there are no good complete guides on how to do this, I thought I'd write it down for the next person who needs to do something similar. Let's jump straight into the code. The reader will need a healthy dose of imagination to fill in their own details.
Indexing
# models.py
from datetime import datetime

from elasticsearch_dsl import Document, Text

PREFIX = "myprefix"


class MyDocument(Document):
    title = Text()
    body = Text()
    # ...

    class Index:
        name = f'{PREFIX}_{datetime.utcnow().strftime("%Y%m%d%H%M%S")}'
What's important to note here is that MyDocument.Index.name is dynamically generated every single time the module is imported. It's not very important exactly what it's called, but it is important that it's unique each time.
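For example, in a Python shell (the timestamp is simply whatever the clock said at import time):

>>> from models import MyDocument
>>> MyDocument.Index.name
'myprefix_20210514141421'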
This means that when you start using MyDocument, it will automatically figure out which index to use. Now, it's time to create the index and bulk publish it.
# index.py
# Note! This example code skips over things like progress bars
# and verbose logging and misc sanity checks and stuff.
import json
from pathlib import Path

from elasticsearch.helpers import parallel_bulk
from elasticsearch_dsl import Index
from elasticsearch_dsl.connections import connections

from .models import MyDocument, PREFIX


class IndexAliasError(Exception):
    """Raised when no existing index matching the alias prefix can be found."""


def index(buildroot: Path, url: str, update=False):
    """
    * 'buildroot' is where the files are that we're going to read and index
    * 'url' is the host URL for the Elasticsearch server
    * 'update' is for when you just want to "cake on" a couple of documents
      instead of starting over and doing a complete indexing.
    """
    # Connect and stuff
    connections.create_connection(hosts=[url], retry_on_timeout=True)
    connection = connections.get_connection()
    health = connection.cluster.health()
    status = health["status"]
    if status not in ("green", "yellow"):
        raise Exception(f"status {status} not green or yellow")

    if update:
        for name in connection.indices.get_alias():
            if name.startswith(f"{PREFIX}_"):
                document_index = Index(name)
                break
        else:
            raise IndexAliasError(
                f"Unable to find an index called {PREFIX}_*"
            )
    else:
        # Confusingly, `._index` is actually not a private API.
        # It's the documented way you're supposed to reach it.
        document_index = MyDocument._index
        document_index.create()

    def generator():
        for doc in Path(buildroot).iterdir():
            # The reason for specifying the exact index name is that we might
            # be doing an update, and if you don't specify it, elasticsearch_dsl
            # will fall back to whatever Document._meta.Index automatically
            # becomes at this moment.
            yield to_search(doc, _index=document_index._name).to_dict(True)

    for success, info in parallel_bulk(connection, generator()):
        # 'success' is a boolean
        # 'info' has stuff like:
        #  - info["index"]["error"]
        #  - info["index"]["_shards"]["successful"]
        #  - info["index"]["_shards"]["failed"]
        pass

    if update:
        # When you do an update, Elasticsearch will internally mark the
        # previous docs (based on the _id primary key we set) as deleted.
        # Normally, Elasticsearch purges these in the background as segments
        # merge, but a forcemerge does it eagerly.
        # See https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-forcemerge.html
        document_index.forcemerge()
    else:
        # Now we're going to bundle the change to set the alias to point
        # to the new index and delete all old indexes.
        # The reason for doing this together in one update is to make it atomic.
        alias_updates = [
            {"add": {"index": document_index._name, "alias": PREFIX}}
        ]
        for index_name in connection.indices.get_alias():
            if index_name.startswith(f"{PREFIX}_"):
                if index_name != document_index._name:
                    alias_updates.append({"remove_index": {"index": index_name}})
        connection.indices.update_aliases({"actions": alias_updates})

    print("All done!")


def to_search(file: Path, _index=None):
    with open(file) as f:
        data = json.load(f)
    return MyDocument(
        _index=_index,
        _id=data["identifier"],
        title=data["title"],
        body=data["body"],
    )
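The loop over parallel_bulk above just passes. If you want it to actually do something, here's a minimal sketch of my own (not from the MDN code) that tallies results using the info shape noted in the comments:

# Hypothetical replacement for the `pass` loop inside index():
# tally successes and collect any per-document bulk errors.
successes = 0
errors = []
for success, info in parallel_bulk(connection, generator()):
    if success:
        successes += 1
    else:
        errors.append(info["index"].get("error"))
if errors:
    raise Exception(f"{len(errors)} documents failed to index: {errors[:3]}")
print(f"Successfully indexed {successes:,} documents")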
A lot is left as an exercise for the reader, but these are the most important operations. It demonstrates how to:
- Correctly create indexes
- Atomically point an alias at a new index and clean up old indexes (and aliases)
- Add documents to an existing index
After you've run this you'll see something like this:
$ curl http://localhost:9200/_cat/indices?v
...
health status index                   uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   myprefix_20210514141421 vulVt5EKRW2MNV47j403Mw   1   1      11629            0     28.7mb         28.7mb

$ curl http://localhost:9200/_cat/aliases?v
...
alias    index                   filter routing.index routing.search is_write_index
myprefix myprefix_20210514141421 -      -             -              -
Searching
When it comes to using the index, well, it depends on where your code for that is. For example, on MDN Web Docs, the code that searches the index is in an entirely different codebase. It's incidentally Python (and elasticsearch-dsl) in both places, but other than that they have nothing in common. So for the searching, you need to make sure the name of the index (or the name of the alias, if you prefer) is written down in the code that searches. For example:
from elasticsearch_dsl import Search


def search(params):
    # settings.SEARCH_INDEX_NAME is wherever you keep configuration;
    # it should be the alias name (e.g. "myprefix") or a specific index name.
    search_query = Search(index=settings.SEARCH_INDEX_NAME)
    # Do stuff to 'search_query' based on 'params'
    response = search_query.execute()
    for hit in response:
        ...
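What "do stuff based on params" means is up to your application. As a sketch, assuming a hypothetical free-text params["q"] key, it might look something like this:

# Hypothetical sketch of the body of search(); params["q"] is an
# assumed free-text query parameter.
search_query = search_query.query("match", title=params["q"])
# Optionally ask Elasticsearch to highlight matches in the body field.
search_query = search_query.highlight("body")
response = search_query.execute()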
If you're within the same codebase that has the models.MyDocument from the first example above, you can simply do things like this:
from elasticsearch_dsl import Index
from elasticsearch_dsl.connections import connections

from .models import PREFIX


def analyze(
    url: str,
    text: str,
    analyzer: str,
):
    connections.create_connection(hosts=[url])
    index = Index(PREFIX)
    analysis = index.analyze(body={"text": text, "analyzer": analyzer})
    # ...
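What you do with the analysis response is up to you. It follows the standard _analyze API shape, so a minimal continuation that just prints the resulting tokens might be:

# Hypothetical continuation inside analyze(): the _analyze response
# contains a "tokens" list with offsets and positions.
for token in analysis["tokens"]:
    print(token["token"], token["position"], token["start_offset"])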