import abc
import asyncio
import copy
import os
from collections import UserDict
from copy import deepcopy
from datetime import datetime
from functools import partial
from typing import NamedTuple, Optional
import elasticsearch
from elasticsearch import AsyncElasticsearch
from biothings import config as btconfig
from biothings.hub import INDEXER_CATEGORY, INDEXMANAGER_CATEGORY
from biothings.hub.databuild.backend import merge_src_build_metadata
from biothings.utils.common import get_class_from_classpath, get_random_string, iter_n, merge, traverse
from biothings.utils.es import ESIndexer
from biothings.utils.hub_db import get_src_build
from biothings.utils.loggers import get_logger
from biothings.utils.manager import BaseManager
from biothings.utils.mongo import DatabaseClient, id_feeder
from .indexer_cleanup import Cleaner
from .indexer_payload import DEFAULT_INDEX_MAPPINGS, DEFAULT_INDEX_SETTINGS, IndexMappings, IndexSettings
from .indexer_registrar import IndexJobStateRegistrar, MainIndexJSR, PostIndexJSR, PreIndexJSR
from .indexer_schedule import Schedule
from .indexer_task import dispatch
# Summary
# -------
# IndexManager: a hub feature, providing top-level commands and config environments (env).
# Indexer/ColdHotIndexer: the "index" command; handles jobs, db state and errors.
# .indexer_task.IndexingTask: index a set of ids, running independently of the hub.
# TODO
# Clarify returned result
# Distinguish creates/updates/deletes
# So that hot/cold indexer doc count can be accurate
# TODO
# Multi-layer logging
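# A rough sketch of how one "index" command flows through these layers
# (names are illustrative, not taken from a real deployment):
#   IndexManager.index("prod", "mynews_202105261855_5ffxvchx")
#       -> Indexer.index(job_manager, ...)   # runs the pre/index/post steps
#           -> .indexer_task.dispatch(...)   # one worker process per batch of _ids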
class IndexerException(Exception):
...
class ProcessInfo:
def __init__(self, indexer, concurrency):
self.indexer = indexer
self.concurrency = concurrency
def get_predicates(self):
def limit_indexer_concurrency(job_manager):
def by_indexer_environment(job):
return all(
(
job["category"] == INDEXER_CATEGORY,
job["source"] == self.indexer.env_name,
)
)
return len(list(filter(by_indexer_environment, job_manager.jobs.values()))) < self.concurrency
return [limit_indexer_concurrency]
def get_pinfo(self, step="", description=""):
"""
Return dict containing information about the current process
(used to report in the hub)
"""
pinfo = {
"__predicates__": self.get_predicates(),
"category": INDEXER_CATEGORY,
"source": self.indexer.env_name,
"description": description,
"step": step,
}
return pinfo
class _BuildBackend(NamedTuple): # mongo
args: dict = {}
dbs: Optional[str] = None
col: Optional[str] = None
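# e.g. (illustrative values only):
# _BuildBackend({"host": "localhost", "port": 27017}, "target_db", "mynews_202105261855_5ffxvchx")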
class _BuildDoc(UserDict):
"""Represent A Build Under "src_build" Collection.
Example:
{
"_id":"mynews_202105261855_5ffxvchx",
"target_backend": "mongo",
"target_name": "mynews_202105261855_5ffxvchx", # UNUSED
"backend_url": "mynews_202105261855_5ffxvchx",
"build_config": {
"_id": "mynews",
"name": "mynews",
"doc_type": "news",
...
"cold_collection": "mynews_202012280220_vsdevjdk"
},
"mapping": {
"author": {"type": "text" },
"title": {"type": "text" },
"description": {"type": "text" },
...
},
"_meta": {
"biothing_type": "news",
"build_version": "202105261855",
"build_date": "2021-05-26T18:55:00.054622+00:00",
...
},
...
}
"""
@property
def build_name(self):
return self.get("_id")
@property
def build_config(self):
return self.setdefault("build_config", {})
def enrich_mappings(self, mappings):
mappings["__hub_doc_type"] = self.build_config.get("doc_type")
mappings["properties"].update(self.get("mapping", {}))
mappings["_meta"] = self.get("_meta", {})
def enrich_settings(self, settings):
settings["number_of_shards"] = self.build_config.get("num_shards", 1)
settings["number_of_replicas"] = self.build_config.get("num_replicas", 0)
# this feature may be removed at any time
settings.update(self.build_config.get("extra_index_settings", {}))
def parse_backend(self):
# Support Sebastian's hub-style backend URI
# (see biothings.hub.databuild.backend.create_backend)
backend = self.get("target_backend")
backend_url = self.get("backend_url")
# Case 1:
# As a dummy indexer
# Used in validate_mapping, ...
if backend is None:
return _BuildBackend()
# Case 2:
# Most common setup
# Index a merged collection
elif backend == "mongo":
from biothings.hub.databuild import backend
db = backend.mongo.get_target_db()
if backend_url in db.list_collection_names():
return _BuildBackend(
dict(zip(("host", "port"), db.client.address)),
db.name,
backend_url,
)
# Case 3:
# For single source build_config(s)
# Index the source collection directly
elif backend == "link":
from biothings.hub.databuild import backend
if backend_url[0] == "src":
db = backend.mongo.get_src_db()
else: # backend_url[0] == "target"
db = backend.mongo.get_target_db()
if backend_url[1] in db.list_collection_names():
return _BuildBackend(
dict(zip(("host", "port"), db.client.address)),
db.name,
backend_url[1],
)
raise ValueError(backend, backend_url)
def extract_coldbuild(self):
cold_target = self.build_config["cold_collection"]
cold_build_doc = get_src_build().find_one({"_id": cold_target})
cold_build_doc = _BuildDoc(cold_build_doc)
cold_build_doc["_id"] = self.build_name # *
cold_build_doc["mapping"].update(self["mapping"]) # combine mapping
merge_src_build_metadata([cold_build_doc, self]) # combine _meta
# * About State Updates
# All updates are diverted to the hot collection.
# Indices & snapshots are only registered there.
if self.build_config.get("num_shards"):
cold_build_doc.build_config["num_shards"] = self.build_config["num_shards"]
if self.build_config.get("num_replicas"):
cold_build_doc.build_config["num_replicas"] = self.build_config["num_replicas"]
return cold_build_doc
class Step(abc.ABC):
name: property(abc.abstractmethod(lambda _: ...))
state: property(abc.abstractmethod(lambda _: ...))
method: property(abc.abstractmethod(lambda _: ...))
catelog = dict()
@staticmethod
def order(steps):
if isinstance(steps, str):
return (yield from Step.order([steps]))
for _step in ("pre", "index", "post"):
if _step in steps:
yield _step
def __init__(self, indexer):
self.indexer = indexer
self.state = self.state(
get_src_build(),
indexer.build_name,
indexer.es_index_name,
logfile=indexer.logfile,
)
@classmethod
def __init_subclass__(cls):
cls.catelog[cls.name] = cls
@classmethod
def dispatch(cls, name):
return cls.catelog[name]
async def execute(self, *args, **kwargs):
coro = getattr(self.indexer, self.method)
coro = coro(*args, **kwargs)
return await coro
def __str__(self):
return f"<Step name='{self.name}' indexer={self.indexer}>"
class PreIndexStep(Step):
name = "pre"
state = PreIndexJSR
method = "pre_index"
class MainIndexStep(Step):
name = "index"
state = MainIndexJSR
method = "do_index"
class PostIndexStep(Step):
name = "post"
state = PostIndexJSR
method = "post_index"
class _IndexerResult(UserDict):
def __str__(self):
return f"{type(self).__name__}({str(self.data)})"
class IndexerCumulativeResult(_IndexerResult):
...
class IndexerStepResult(_IndexerResult):
...
class Indexer:
"""
MongoDB -> Elasticsearch Indexer.
"""
def __init__(self, build_doc, indexer_env, index_name):
# build_doc primarily describes the source.
# indexer_env primarily describes the destination.
_build_doc = _BuildDoc(build_doc)
_build_backend = _build_doc.parse_backend()
# ----------source----------
self.mongo_client_args = _build_backend.args
self.mongo_database_name = _build_backend.dbs
self.mongo_collection_name = _build_backend.col
# -----------dest-----------
# [1] https://elasticsearch-py.readthedocs.io/en/v7.12.0/api.html#elasticsearch.Elasticsearch
# [2] https://elasticsearch-py.readthedocs.io/en/v7.12.0/helpers.html#elasticsearch.helpers.bulk
self.es_client_args = indexer_env.get("args", {}) # See [1] for available args
self.es_blkidx_args = indexer_env.get("bulk", {}) # See [2] for available args
self.es_index_name = index_name or _build_doc.build_name
self.es_index_settings = IndexSettings(deepcopy(DEFAULT_INDEX_SETTINGS))
self.es_index_mappings = IndexMappings(deepcopy(DEFAULT_INDEX_MAPPINGS))
_build_doc.enrich_settings(self.es_index_settings)
_build_doc.enrich_mappings(self.es_index_mappings)
# -----------info-----------
self.env_name = indexer_env.get("name")
self.conf_name = _build_doc.build_config.get("name")
self.build_name = _build_doc.build_name
self.setup_log()
self.pinfo = ProcessInfo(self, indexer_env.get("concurrency", 10))
def setup_log(self):
log_folder = os.path.join(btconfig.LOG_FOLDER, "build", self.build_name or "", "index") if btconfig.LOG_FOLDER else None
log_name = f"index_{self.es_index_name}"
self.logger, self.logfile = get_logger(log_name, log_folder=log_folder, force=True)
def __str__(self):
showx = self.mongo_collection_name != self.es_index_name
lines = [
f"<{type(self).__name__}",
f" source='{self.mongo_collection_name}'" if showx else "",
f" dest='{self.es_index_name}'>",
]
return "".join(lines)
# --------------
# Entry Point
# --------------
async def index(self, job_manager, **kwargs):
"""
Build an Elasticsearch index (self.es_index_name)
with data from MongoDB collection (self.mongo_collection_name).
"ids" can be passed to selectively index documents.
"mode" can have the following values:
- 'purge': will delete an index if it exists.
- 'resume': will use an existing index and add missing documents.
- 'merge': will merge data to an existing index.
- 'index' (default): will create a new index.
"""
steps = kwargs.pop("steps", ("pre", "index", "post"))
batch_size = kwargs.setdefault("batch_size", 10000)
# mode = kwargs.setdefault("mode", "index")
kwargs.setdefault("mode", "index")
ids = kwargs.setdefault("ids", None)
assert job_manager
assert all(isinstance(_id, str) for _id in ids) if ids else True
assert 500 <= batch_size <= 10000, '"batch_size" out-of-range'
# the batch size here controls only the task partitioning
# it does not affect how the elasticsearch python client
# makes batch requests. a number larger than 10000 may exceed
# es result window size and doc_feeder maximum fetch size.
# a number smaller than the bulk chunk_size is too small: each batch
# fits in a single elasticsearch request, making the partitioning
# inefficient and amplifying the scheduling overhead.
x = IndexerCumulativeResult()
for step in Step.order(steps):
step = Step.dispatch(step)(self)
self.logger.info(step)
step.state.started()
try:
dx = await step.execute(job_manager, **kwargs)
dx = IndexerStepResult(dx)
except Exception as exc:
_exc = str(exc)[:500]
self.logger.exception(_exc)
step.state.failed(_exc)
raise exc
else:
merge(x.data, dx.data)
self.logger.info(dx)
self.logger.info(x)
step.state.succeed(x.data)
return x
# ---------
# Steps
# ---------
async def pre_index(self, *args, mode, **kwargs):
client = AsyncElasticsearch(**self.es_client_args)
try:
if mode in ("index", None):
# index MUST NOT exist
# ----------------------
if await client.indices.exists(self.es_index_name):
msg = (
"Index '%s' already exists, (use mode='purge' to "
"auto-delete it or mode='resume' to add more documents)"
)
raise IndexerException(msg % self.es_index_name)
elif mode in ("resume", "merge"):
# index MUST exist
# ------------------
if not (await client.indices.exists(self.es_index_name)):
raise IndexerException("'%s' does not exist." % self.es_index_name)
self.logger.info(("Exists", self.es_index_name))
return # skip index creation
elif mode == "purge":
# index MAY exist
# -----------------
response = await client.indices.delete(self.es_index_name, ignore_unavailable=True)
self.logger.info(("Deleted", self.es_index_name, response))
else:
raise ValueError("Invalid mode: %s" % mode)
response = await client.indices.create(
self.es_index_name,
body={
"settings": (await self.es_index_settings.finalize(client)),
"mappings": (await self.es_index_mappings.finalize(client)),
},
)
self.logger.info(("Created", self.es_index_name, response))
return {
"__REPLACE__": True,
"host": self.es_client_args.get("hosts"), # for frontend display
"environment": self.env_name, # used in snapshot module.
}
finally:
await client.close()
async def do_index(self, job_manager, batch_size, ids, mode, **kwargs):
client = DatabaseClient(**self.mongo_client_args)
database = client[self.mongo_database_name]
collection = database[self.mongo_collection_name]
if ids:
self.logger.info(
("Indexing from '%s' with specific list of _ids, " "create indexer job with batch_size=%d."),
self.mongo_collection_name,
batch_size,
)
# use user provided ids in batch
id_provider = iter_n(ids, batch_size)
else:
self.logger.info(
("Fetch _ids from '%s', and create " "indexer job with batch_size=%d."),
self.mongo_collection_name,
batch_size,
)
# use ids from the target mongodb collection in batch
id_provider = id_feeder(collection, batch_size, logger=self.logger)
jobs = [] # asyncio.Future(s)
error = None # the first Exception
total = len(ids) if ids else collection.count()
schedule = Schedule(total, batch_size)
def batch_finished(future):
nonlocal error
try:
schedule.finished += future.result()
except Exception as exc:
self.logger.warning(exc)
error = exc
for batch_num, ids in zip(schedule, id_provider):
await asyncio.sleep(0.0)
# when one batch failed, and job scheduling has not completed,
# stop scheduling and cancel all on-going jobs, to fail quickly.
if error:
for job in jobs:
if not job.done():
job.cancel()
raise error
self.logger.info(schedule)
pinfo = self.pinfo.get_pinfo(schedule.suffix(self.mongo_collection_name))
job = await job_manager.defer_to_process(
pinfo,
dispatch,
self.mongo_client_args,
self.mongo_database_name,
self.mongo_collection_name,
self.es_client_args,
self.es_blkidx_args,
self.es_index_name,
ids,
mode,
batch_num,
)
job.add_done_callback(batch_finished)
jobs.append(job)
self.logger.info(schedule)
await asyncio.gather(*jobs)
schedule.completed()
self.logger.notify(schedule)
return {"count": total, "created_at": datetime.now().astimezone()}
async def post_index(self, *args, **kwargs):
...
class ColdHotIndexer:
"""MongoDB to Elasticsearch 2-pass Indexer.
(
1st pass: <MongoDB Cold Collection>, # static data
2nd pass: <MongoDB Hot Collection> # changing data
) =>
<Elasticsearch Index>
"""
# "ColdHotIndexer" is not a subclass of the "Indexer".
# Step-level customization requires a subclass of "Indexer"
# and assigning it to the "INDEXER" class attribute below.
INDEXER = Indexer
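# A hypothetical customization could look like:
#   class MyIndexer(Indexer):
#       async def post_index(self, *args, **kwargs):
#           ...  # e.g. alias management after indexing
#   class MyColdHotIndexer(ColdHotIndexer):
#       INDEXER = MyIndexer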
def __init__(self, build_doc, indexer_env, index_name):
hot_build_doc = _BuildDoc(build_doc)
cold_build_doc = hot_build_doc.extract_coldbuild()
self.hot = self.INDEXER(hot_build_doc, indexer_env, index_name)
self.cold = self.INDEXER(cold_build_doc, indexer_env, self.hot.es_index_name)
async def index(
self,
job_manager,
batch_size=10000,
steps=("pre", "index", "post"),
ids=None,
mode=None,
**kwargs,
):
result = []
cold_task = self.cold.index(
job_manager,
steps=set(Step.order(steps)) & {"pre", "index"},
batch_size=batch_size,
ids=ids,
mode=mode,
)
result.append((await cold_task))
hot_task = self.hot.index(
job_manager,
steps=set(Step.order(steps)) & {"index", "post"},
batch_size=batch_size,
ids=ids,
mode="merge",
)
result.append((await hot_task))
return result
class IndexManager(BaseManager):
# An index config is considered a "source" for the manager
# Each call returns a different instance from a factory call
DEFAULT_INDEXER = Indexer
def __init__(self, *args, **kwargs):
"""
An example of config dict for this module.
{
"indexer_select": {
None: "hub.dataindex.indexer.DrugIndexer", # default
"build_config.cold_collection" : "mv.ColdHotVariantIndexer",
},
"env": {
"prod": {
"host": "localhost:9200",
"indexer": {
"args": {
"timeout": 300,
"retry_on_timeout": True,
"max_retries": 10,
},
"bulk": {
"chunk_size": 50
"raise_on_exception": False
},
"concurrency": 3
},
"index": [
# for information only, only used in index_info
{"index": "mydrugs_current", "doc_type": "drug"},
{"index": "mygene_current", "doc_type": "gene"}
],
},
"dev": { ... }
}
}
"""
super().__init__(*args, **kwargs)
self._srcbuild = get_src_build()
self._config = {}
self.logger, self.logfile = get_logger("indexmanager")
# Object Lifecycle Calls
# --------------------------
# manager = IndexManager(job_manager)
# manager.clean_stale_status() # in __init__
# manager.configure(config)
def clean_stale_status(self):
IndexJobStateRegistrar.prune(get_src_build())
# Job Manager Hooks
# ----------------------
def get_predicates(self):
def no_other_indexmanager_step_running(job_manager):
"""IndexManager deals with snapshot, publishing,
none of them should run more than one at a time"""
return len([j for j in job_manager.jobs.values() if j["category"] == INDEXMANAGER_CATEGORY]) == 0
return [no_other_indexmanager_step_running]
def get_pinfo(self):
"""
Return dict containing information about the current process
(used to report in the hub)
"""
pinfo = {
"category": INDEXMANAGER_CATEGORY,
"source": "",
"step": "",
"description": "",
}
preds = self.get_predicates()
if preds:
pinfo["__predicates__"] = preds
return pinfo
# Hub Features
# --------------
def _select_indexer(self, build_name=None):
"""Find the indexer class required to index build_name."""
rules = self._config.get("indexer_select")
if not rules or not build_name:
self.logger.debug(self.DEFAULT_INDEXER)
return self.DEFAULT_INDEXER
# the presence of a path in the build doc
# can determine the indexer class to use.
path = None
doc = self._srcbuild.find_one({"_id": build_name})
for path_in_doc, _ in traverse(doc or dict(), True):
if path_in_doc in rules:
if not path:
path = path_in_doc
else:
_ERR = "Multiple indexers matched."
raise RuntimeError(_ERR)
kls = get_class_from_classpath(rules[path])
self.logger.info(kls)
return kls
def index(self, indexer_env, build_name, index_name=None, ids=None, **kwargs):
"""
Trigger an index creation to index the collection build_name and create an
index named index_name (or build_name if None). Optional list of IDs can be
passed to index specific documents.
"""
indexer_env_ = dict(self[indexer_env]) # describes a destination
build_doc = self._srcbuild.find_one({"_id": build_name}) # describes a source
if not build_doc:
raise ValueError("Cannot find build %s." % build_name)
if not build_doc.get("build_config"):
raise ValueError("Cannot find build config for '%s'." % build_name)
idx = self._select_indexer(build_name)
idx = idx(build_doc, indexer_env_, index_name)
job = idx.index(self.job_manager, ids=ids, **kwargs)
job = asyncio.ensure_future(job)
job.add_done_callback(self.logger.debug)
return job
def index_info(self, remote=False):
"""Show index manager config with enhanced index information."""
# http://localhost:7080/index_manager
async def _enhance(conf):
conf = copy.deepcopy(conf)
for name, env in self.register.items():
async with AsyncElasticsearch(**env["args"]) as client:
try:
indices = await client.indices.get("*")
except elasticsearch.exceptions.ConnectionError:
... # keep the hard-coded place-holders info
else: # replace the index key with remote info
conf["env"][name]["index"] = [
{
"index": k,
"aliases": list(v["aliases"].keys()),
"doc_type": v["mappings"]["_meta"]["biothing_type"],
}
for k, v in indices.items()
]
return conf
if remote:
job = asyncio.ensure_future(_enhance(self._config))
job.add_done_callback(self.logger.debug)
return job
return self._config
def get_indexes_by_name(self, index_name=None, limit=10):
"""Accept an index_name and return a list of indexes get from all elasticsearch environments
If index_name is blank, it will be return all indexes.
limit can be used to specify how many indexes should be return.
The list of indexes will be like this:
[
{
"index_name": "...",
"build_version": "...",
"count": 1000,
"creation_date": 1653468868933,
"environment": {
"name": "env name",
"host": "localhost:9200",
}
},
]
"""
if not index_name:
index_name = "*"
limit = int(limit)
async def fetch(index_name, limit=None):
indexes = []
for env_name, env in self.register.items():
async with AsyncElasticsearch(**env["args"]) as client:
try:
indices = await client.indices.get(index_name)
except Exception:
continue
for index_name, index_data in indices.items():
mapping_meta = index_data["mappings"]["_meta"]
indexes.append(
{
"index_name": index_name,
"build_version": mapping_meta["build_version"],
"count": mapping_meta["stats"]["total"],
"creation_date": index_data["settings"]["index"]["creation_date"],
"environment": {
"name": env_name,
"host": env["args"]["hosts"],
},
}
)
indexes.sort(key=lambda index: index["creation_date"], reverse=True)
if limit:
indexes = indexes[:limit]
return indexes
job = asyncio.ensure_future(fetch(index_name, limit=limit))
job.add_done_callback(self.logger.debug)
return job
def validate_mapping(self, mapping, env):
indexer = self._select_indexer() # default indexer
indexer = indexer(dict(mapping=mapping), self[env], None)
self.logger.debug(indexer.es_client_args)
self.logger.debug(indexer.es_index_settings)
self.logger.debug(indexer.es_index_mappings)
async def _validate_mapping():
client = AsyncElasticsearch(**indexer.es_client_args)
index_name = ("hub_tmp_%s" % get_random_string()).lower()
try:
return await client.indices.create(
index_name,
body={
"settings": (await indexer.es_index_settings.finalize(client)),
"mappings": (await indexer.es_index_mappings.finalize(client)),
},
)
finally:
await client.indices.delete(index_name, ignore_unavailable=True)
await client.close()
job = asyncio.ensure_future(_validate_mapping())
job.add_done_callback(self.logger.info)
return job
def cleanup(self, env=None, keep=3, dryrun=True, **filters):
"""Delete old indices except for the most recent ones.
Examples:
>>> index_cleanup()
>>> index_cleanup("production")
>>> index_cleanup("local", build_config="demo")
>>> index_cleanup("local", keep=0)
>>> index_cleanup(_id="<elasticsearch_index>")
"""
if not env and not dryrun: # low specificity, unsafe.
raise ValueError('Missing argument "env".')
cleaner = Cleaner(get_src_build(), self, self.logger)
cleanups = cleaner.find(env, keep, **filters)
if dryrun:
return "\n".join(
(
"-" * 75,
cleaner.plain_text(cleanups),
"-" * 75,
"DRYRUN ONLY - APPLY THE ACTIONS WITH:",
" > index_cleanup(..., dryrun=False)",
)
)
job = asyncio.ensure_future(cleaner.clean(cleanups))
job.add_done_callback(self.logger.info)
return job
class DynamicIndexerFactory:
"""
In the context of autohub/standalone instances, create indexers
with parameters taken from a versions.json URL.
A list of URLs is provided so the factory knows how to create an
indexer for each URL. There's no way to "guess" an ES host from a URL,
so it must be specified as well, and it is common to all URLs.
The "suffix" param is appended to the index names.
"""
def __init__(self, urls, es_host, suffix="_current"):
self.urls = urls
self.es_host = es_host
self.bynames = {}
for url in urls:
if isinstance(url, dict):
name = url["name"]
# actual_url = url["url"]
else:
name = os.path.basename(os.path.dirname(url))
# actual_url = url
self.bynames[name] = {
"es_host": self.es_host,
"index": name + suffix,
}
def create(self, name):
conf = self.bynames[name]
pidxr = partial(
ESIndexer,
index=conf["index"],
doc_type=None,
es_host=conf["es_host"],
)
conf = {"es_host": conf["es_host"], "index": conf["index"]}
return pidxr, conf