Classes:

  • IndexTask – A typed dictionary describing a single indexing action for the indexer queue.

  • Indexer – Takes actions from a queue and executes them on the elasticsearch cluster.

  • IndexManager – Manages the creation/destruction of indices. The indices it creates have an internal name and an external alias.

  • ORMLanguageDetector – Detects languages with the help of langdetect.

  • ORMEventTranslator – Handles the onegov.core orm events, translates them into indexing actions.

Functions:

  • parse_index_name(index_name) → IndexParts – Takes the given index name and returns the hostname, schema, language and type_name.
Module Contents

class search.indexer.IndexTask[source]

Bases: TypedDict

A typed dictionary describing a single 'index' action placed on the indexer queue.

action: Literal['index'][source]
id: uuid.UUID | str | int[source]
id_key: str[source]
schema: str[source]
type_name: str[source]
tablename: str[source]
language: str[source]
properties: dict[str, Any][source]
class search.indexer.IndexParts[source]

Bases: NamedTuple

hostname: str | None[source]
schema: str | None[source]
language: str | None[source]
type_name: str | None[source]
version: str | None[source]
search.indexer.parse_index_name(index_name: str) → IndexParts[source]

Takes the given index name and returns the hostname, schema, language and type_name as an IndexParts named tuple.

  • If the index_name doesn’t match the pattern, all values are None.

  • If the index_name has no version, the version is None.

class search.indexer.IndexerBase[source]
queue: Queue[Any][source]
failed_task: Task | None = None[source]
process(block: bool = False, timeout: float | None = None) → int[source]

Processes the queue until it is empty or until there’s an error.

If there’s an error, the next call to this function will try to execute the failed task again. This is mainly meant for elasticsearch outages.


Parameters:

  • block – If True, the process waits for the queue to be available. Useful if you run this in a separate thread.

  • timeout – How long the blocking call should block. Has no effect if block is False.

Returns:

  The number of successfully processed items.

process_task(task: Task) → bool[source]
class search.indexer.Indexer(mappings: TypeMappingRegistry, queue: Queue[Task], es_client: elasticsearch.Elasticsearch, hostname: str | None = None)[source]

Bases: IndexerBase

Takes actions from a queue and executes them on the elasticsearch cluster. Depends on IndexManager for index management and expects to have the same TypeRegistry as ORMEventTranslator.

The idea is that this class does the indexing/deindexing, the index manager sets up the indices and the orm event translator listens for changes in the ORM.

A queue is used so the indexer can be run in a separate thread.
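Because of the queue, a typical setup runs the consumer in a background thread. A minimal sketch using only the standard library (the task strings are placeholders for real Task objects):

```python
import queue
import threading

# A queue decouples producers (ORM events) from the consumer (indexer)
task_queue: queue.Queue = queue.Queue()
processed: list[str] = []


def worker() -> None:
    # Stand-in for indexer.process(); drains the queue, then exits once
    # it has been idle briefly (a real worker would loop until shutdown)
    while True:
        try:
            task = task_queue.get(block=True, timeout=0.1)
        except queue.Empty:
            return
        processed.append(task)


# Enqueue some illustrative tasks, then let the worker drain them
for name in ('index:page', 'index:event'):
    task_queue.put(name)

thread = threading.Thread(target=worker, daemon=True)
thread.start()
thread.join()
```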

queue: Queue[Task][source]
bulk_process() → None[source]

Processes the queue in bulk. This offers better performance but it is less safe at the moment and should only be used as part of reindexing.

ensure_index(task: IndexTask) → str[source]
index(task: IndexTask) → None[source]
delete(task: DeleteTask) → None[source]
class search.indexer.PostgresIndexer(queue: Queue[IndexTask], engine: sqlalchemy.engine.Engine)[source]

Bases: IndexerBase

TEXT_SEARCH_COLUMN_NAME = 'fts_idx'[source]
queue: Queue[IndexTask][source]
index(tasks: list[IndexTask] | IndexTask) → bool[source]

Update the ‘fts_idx’ column (full text search index) of the given object(s)/task(s).

When given a batch of tasks, we assume they all come from the same schema and table, in order to optimize the indexing process.


Parameters:

  • tasks – A list of tasks to index.

Returns:

  True if the indexing was successful, False otherwise.
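The grouping assumption can be sketched with plain dictionaries standing in for IndexTask (the keys mirror the TypedDict above; the tasks themselves are made up):

```python
from itertools import groupby
from operator import itemgetter

# Hypothetical tasks mirroring IndexTask's schema/tablename keys
tasks = [
    {'schema': 'public', 'tablename': 'pages', 'id': 1},
    {'schema': 'public', 'tablename': 'pages', 'id': 2},
    {'schema': 'public', 'tablename': 'events', 'id': 3},
]

# Group by (schema, tablename) so each batch can be indexed with a
# single UPDATE instead of one statement per task
key = itemgetter('schema', 'tablename')
batches = {
    group_key: list(group)
    for group_key, group in groupby(sorted(tasks, key=key), key=key)
}
```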

bulk_process() → None[source]

Processes the queue in bulk. This offers better performance but it is less safe at the moment and should only be used as part of reindexing.

Gathers all index tasks, groups them by model and indexes them batch-wise.

class search.indexer.TypeMapping(name: str, mapping: dict[str, Any], model: type[Searchable] | None = None)[source]
__slots__ = ('name', 'mapping', 'version', 'model')[source]
add_defaults(mapping: dict[str, Any]) → dict[str, Any][source]
for_language(language: str) → dict[str, Any][source]

Returns the mapping for the given language. Mappings can be slightly different for each language. That is, the analyzer changes.

Because the IndexManager puts each language into its own index we do not have to worry about creating different versions of the same mapping here.

supplement_analyzer(dictionary: dict[str, Any], language: str) → dict[str, Any][source]

Iterate through the dictionary found in the type mapping and replace the ‘localized’ type with a ‘text’ type that includes a language specific analyzer.
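A sketch of such a replacement, assuming a hypothetical language-to-analyzer table (the actual analyzer names used by the module may differ):

```python
from typing import Any

# Assumed mapping of ISO 639-1 codes to elasticsearch analyzers
ANALYZERS = {'de': 'german', 'fr': 'french', 'en': 'english'}


def supplement_analyzer_sketch(
    dictionary: dict[str, Any], language: str
) -> dict[str, Any]:
    """Recursively replace the 'localized' type with a 'text' type
    that carries a language-specific analyzer (illustrative only)."""
    result: dict[str, Any] = {}
    for key, value in dictionary.items():
        if isinstance(value, dict):
            result[key] = supplement_analyzer_sketch(value, language)
        elif key == 'type' and value == 'localized':
            result[key] = 'text'
            result['analyzer'] = ANALYZERS.get(language, 'standard')
        else:
            result[key] = value
    return result
```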

class search.indexer.TypeMappingRegistry[source]
mappings: dict[str, TypeMapping][source]
__getitem__(key: str) → TypeMapping[source]
__iter__() → Iterator[TypeMapping][source]
register_orm_base(base: type[object]) → None[source]

Takes the given SQLAlchemy base and registers all Searchable objects.

register_type(type_name: str, mapping: dict[str, Any], model: type[Searchable] | None = None) → None[source]

Registers the given type with the given mapping. The mapping is a dictionary representing the part below mappings/type_name.


When the mapping changes, a new index is created internally and the alias to this index (the external name of the index) is pointed to this new index.

As a consequence, a change in the mapping requires a reindex.
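The versioning scheme can be illustrated as follows. Deriving the version from a hash of the mapping is an assumption; the real scheme is internal to IndexManager:

```python
import hashlib
import json


def mapping_version(mapping: dict) -> str:
    # Derive a stable version from the mapping's content (assumption:
    # the actual versioning is internal to IndexManager)
    blob = json.dumps(mapping, sort_keys=True).encode('utf-8')
    return hashlib.sha1(blob).hexdigest()[:8]


aliases: dict[str, str] = {}  # external alias -> internal index name


def ensure_index_sketch(external: str, mapping: dict) -> str:
    internal = f'{external}-{mapping_version(mapping)}'
    if aliases.get(external) != internal:
        # Mapping changed: a new internal index is created and the
        # alias is pointed at it, so a reindex is required
        aliases[external] = internal
    return external
```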

property registered_fields: set[str][source]

Goes through all the registered types and returns a set of all fields used by the mappings.

class search.indexer.IndexManager(hostname: str, es_client: elasticsearch.Elasticsearch)[source]

Manages the creation/destruction of indices. The indices it creates have an internal name and an external alias. To facilitate that, versions are used.

created_indices: set[str][source]
property normalized_hostname: str[source]
query_indices() → set[str][source]

Queries the elasticsearch cluster for indices belonging to this hostname.

query_aliases() → set[str][source]

Queries the elasticsearch cluster for aliases belonging to this hostname.

ensure_index(schema: str, language: str, mapping: TypeMapping, return_index: Literal['external', 'internal'] = 'external') → str[source]

Takes the given database schema, language and type name and creates an internal index with a version number and an external alias without the version number.


Parameters:

  • schema – The database schema this index is based on.

  • language – The language in ISO 639-1 format.

  • mapping – The TypeMapping mapping used in this index.

  • return_index – The index name to return. Either 'external' or 'internal'.

Returns:

  The (external/aliased) name of the created index.

remove_expired_indices(current_mappings: Iterable[TypeMapping]) → int[source]

Removes all expired indices. An index is expired if its version number is no longer known in the current mappings.

Returns:

  The number of indices that were deleted.
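The expiry check can be sketched with plain set logic (the index names and the version-suffix convention here are illustrative):

```python
def remove_expired_indices_sketch(
    existing_indices: set[str],
    current_versions: set[str],
) -> int:
    """Drop indices whose version suffix is no longer among the
    versions of the current mappings (illustrative set logic only)."""
    expired = {
        index for index in existing_indices
        if index.rsplit('-', 1)[-1] not in current_versions
    }
    existing_indices -= expired
    return len(expired)
```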

get_managed_indices_wildcard(schema: str) → str[source]

Returns a wildcard index name for all indices managed.

get_external_index_names(schema: str, languages: Iterable[str] = '*', types: Iterable[str] = '*') → str[source]

Returns a comma-separated string of external index names that match the given arguments. Useful to pass on to elasticsearch when targeting multiple indices.

get_external_index_name(schema: str, language: str, type_name: str) → str[source]

Generates the external index name from the given parameters.

get_internal_index_name(schema: str, language: str, type_name: str, version: str) → str[source]

Generates the internal index name from the given parameters.

class search.indexer.ORMLanguageDetector(supported_languages: Sequence[str])[source]


Detects languages with the help of langdetect.

Unlike langdetect this detector may be limited to a subset of all supported languages, which may improve accuracy if the subset is known and saves some memory.

localized_properties(obj: Searchable) → Iterator[str][source]
localized_texts(obj: Searchable, max_chars: int | None = None) → Iterator[str][source]
detect_object_language(obj: Searchable) → str[source]
class search.indexer.ORMEventTranslator(mappings: TypeMappingRegistry, max_queue_size: int = 0, languages: Sequence[str] = ('de', 'fr', 'en'))[source]

Handles the onegov.core orm events, translates them into indexing actions and puts the result into a queue for the indexer to consume.

The queue may be limited. Once the limit is reached, new events are no longer processed and an error is logged.
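The bounded-queue behaviour can be sketched with the standard library (`put_sketch` is a hypothetical helper, not the module's actual put method):

```python
import logging
import queue

log = logging.getLogger('sketch.indexer')


def put_sketch(q: queue.Queue, translation: dict) -> None:
    """Enqueue without blocking; log an error and drop the event
    once the queue's size limit is reached."""
    try:
        q.put_nowait(translation)
    except queue.Full:
        log.error('The indexer queue is full, dropping task')
```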

es_queue: Queue[Task][source]
psql_queue: Queue[IndexTask][source]
on_insert(schema: str, obj: object) → None[source]
on_update(schema: str, obj: object) → None[source]
on_delete(schema: str, obj: object) → None[source]
put(translation: Task) → None[source]
index(schema: str, obj: Searchable) → None[source]
delete(schema: str, obj: object) → None[source]