import logging
import os
import time
from datetime import datetime
from enum import Enum
from types import SimpleNamespace
from biothings.utils.common import merge, timesofar
[docs]
class Stage(Enum):
    """Lifecycle stage of a single job registration.

    The members are listed in the order a registrar is expected to pass
    through them: READY, then STARTED, then DONE.
    """

    READY = 0
    STARTED = 1
    DONE = 2

    def at(self, stage):
        """Verify that this stage is exactly *stage*.

        Args:
            stage: The ``Stage`` member the caller expects to be current.

        Raises:
            AssertionError: If this stage differs from *stage*.
        """
        # Raised explicitly instead of through an ``assert`` statement so the
        # guard is still enforced when Python runs with -O, and so the error
        # says which transition was attempted.
        if self != stage:
            raise AssertionError(
                "Expected stage %s, but current stage is %s." % (stage, self)
            )
# IndexJobStateRegistrar CAN be further generalized
# to replace utils.manager.BaseStatusRegisterer
[docs]
class IndexJobStateRegistrar:
    """Record the progress of one job on a build document.

    ``started`` pushes a job entry onto the build document's ``"jobs"`` list;
    ``failed`` / ``succeed`` then finalize the last entry of that list.
    An instance is single-use: it moves READY -> STARTED -> DONE and each
    transition is checked with ``Stage.at``.
    """

    def __init__(self, collection, build_name, index_name, **context):
        """Bind the registrar to one build document.

        Args:
            collection: Collection holding the build documents. Must provide
                ``find_one``, ``update_one`` and ``replace_one``
                (PyMongo-style CRUD methods).
            build_name: ``_id`` of the build document to update.
            index_name: Key under which ``succeed`` stores its result inside
                the build document's ``"index"`` mapping.
            **context: Extra key/value pairs copied into the job entry
                created by ``started``.
        """
        self.collection = collection
        self.build_id = build_name
        self.index_name = index_name
        self.context = context
        self.stage = Stage.READY
        self.t0 = 0  # wall-clock start time, set by started()

    @staticmethod
    def prune(collection):
        """Mark every job still "in progress" in *collection* as "cancelled".

        Each affected job also loses its ``"pid"`` field, and a warning is
        logged per stale job. A build document is written back only if at
        least one of its jobs was changed.

        Args:
            collection: Collection providing ``find`` and ``replace_one``.
        """
        for build in collection.find():
            dirty = False
            for job in build.get("jobs", []):
                if job.get("status") == "in progress":
                    logging.warning(
                        "Found stale build '%s', marking index status as 'cancelled'",
                        build["_id"],
                    )
                    job["status"] = "cancelled"
                    job.pop("pid", None)
                    dirty = True
            if dirty:
                collection.replace_one({"_id": build["_id"]}, build)

    def started(self, step="index"):
        """Register a new "in progress" job entry on the build document.

        Args:
            step: Name stored in the job entry's ``"step"`` field.

        Raises:
            AssertionError: If the registrar is not in the READY stage.
        """
        self.stage.at(Stage.READY)
        self.stage = Stage.STARTED

        self.t0 = time.time()

        job = {
            "step": step,
            "status": "in progress",
            "step_started_at": datetime.now().astimezone(),
            "pid": os.getpid(),
            **self.context,
        }
        # update_one instead of the legacy Collection.update (removed in
        # PyMongo 4). The filter is on _id, so at most one document matches
        # and the effect is the same.
        self.collection.update_one(
            {"_id": self.build_id},
            {"$push": {"jobs": job}},
        )

    def failed(self, error):
        """Finalize the current job as "failed".

        Args:
            error: The failure; ``str(error)`` is stored in the job's
                ``"err"`` field.

        Raises:
            AssertionError: If the registrar is not in the STARTED stage.
        """

        def func(job, delta_build):
            job["status"] = "failed"
            job["err"] = str(error)

        self._done(func)

    def succeed(self, result):
        """Finalize the current job as "success".

        Args:
            result: Outcome of the job. When truthy it is registered on the
                build document as ``{"index": {<index_name>: result}}``;
                when falsy nothing is added besides the job status.

        Raises:
            AssertionError: If the registrar is not in the STARTED stage.
        """

        def func(job, delta_build):
            job["status"] = "success"
            if result:
                delta_build["index"] = {self.index_name: result}

        self._done(func)

    def _done(self, func):
        """Close the last job entry and persist the build document.

        Args:
            func: Callable ``func(job, delta_build)`` that sets the final
                status on *job* and may add build-level changes to
                *delta_build*.

        Raises:
            AssertionError: If the registrar is not in the STARTED stage or
                the build document cannot be found.
        """
        self.stage.at(Stage.STARTED)
        self.stage = Stage.DONE

        build = self.collection.find_one({"_id": self.build_id})
        # Explicit raise (same exception type and message as before) so the
        # check is not stripped under -O.
        if not build:
            raise AssertionError("Can't find build document '%s'" % self.build_id)

        # NOTE(review): this assumes the last entry in "jobs" is the one this
        # registrar pushed in started() - confirm no other writer appends
        # in between.
        job = build["jobs"][-1]
        # timesofar comes from biothings.utils.common; presumably a
        # human-readable elapsed-time string - confirm.
        job["time"] = timesofar(self.t0)
        job["time_in_s"] = round(time.time() - self.t0, 0)
        # prune() may already have removed "pid" from this entry, so a
        # missing key must not raise here.
        job.pop("pid", None)

        delta_build = {}
        func(job, delta_build)
        # merge comes from biothings.utils.common; it is used here to fold
        # delta_build into build before the document is written back.
        merge(build, delta_build)
        self.collection.replace_one({"_id": build["_id"]}, build)
[docs]
class PreIndexJSR(IndexJobStateRegistrar):
    """Registrar for the "pre-index" step."""

    def started(self):
        """Register the job as started under the step name "pre-index"."""
        super().started(step="pre-index")

    def succeed(self, result):
        """Mark the job successful without registering any index result.

        Args:
            result: Accepted for signature compatibility; it is ignored and
                an empty result is always passed to the base class.
        """
        # Nothing is registered for the pre-indexing step.
        # Registering a result signals that the index exists on the
        # Elasticsearch server. A failure at the post-index stage therefore
        # still leaves the index state registered up to the indexing step,
        # whereas a successful pre-index step means no index has been
        # created yet, so there is nothing to register.
        nothing_to_register = {}
        super().succeed(nothing_to_register)
[docs]
class MainIndexJSR(IndexJobStateRegistrar):
    """Registrar for the main "index" step."""

    def started(self):
        """Register the job as started under the step name "index"."""
        super().started(step="index")
[docs]
class PostIndexJSR(IndexJobStateRegistrar):
    """Registrar for the "post-index" step."""

    def started(self):
        """Register the job as started under the step name "post-index"."""
        super().started(step="post-index")
# TESTS OUTDATED
[docs]
def test_registrar():
    """Manually walk the registrar classes through their state transitions.

    This is an interactive check, not an automated test: every ``input()``
    pauses so the build document can be inspected between steps. It needs a
    MongoDB reachable with the ``MongoClient()`` defaults whose
    ``biothings.src_build`` collection contains the build document named
    below.
    """
    from pymongo import MongoClient

    indexer = SimpleNamespace(
        mongo_collection_name="mynews_202012280220_vsdevjdk",  # must exists in DB
        es_client_args=dict(hosts="localhost:9200"),
        es_index_name="__index_name__",
        logfile="/log/file",
        conf_name="bc_news",
        env_name="dev",
    )
    collection = MongoClient().biothings.src_build
    IndexJobStateRegistrar.prune(collection)

    def new_job(registrar_cls):
        # Registrars take (collection, build_name, index_name, **context).
        # NOTE(review): assumes the build document's _id equals the target
        # collection name - confirm against the data in src_build.
        # The keyword arguments are arbitrary context copied onto the job.
        return registrar_cls(
            collection,
            indexer.mongo_collection_name,
            indexer.es_index_name,
            logfile=indexer.logfile,
            conf_name=indexer.conf_name,
            env_name=indexer.env_name,
        )

    # ----------
    #  round 1
    # ----------

    job = new_job(MainIndexJSR)

    input()
    job.started()
    input()
    job.failed("MockErrorA")
    input()
    try:
        # Already DONE after failed(), so the stage check must reject this.
        job.succeed({})
    except AssertionError as exc:
        print(exc)

    # ----------
    #  round 2
    # ----------

    job = new_job(MainIndexJSR)

    input()
    job.started()
    input()
    # succeed() nests the result under the index name itself.
    job.succeed({"count": "99"})

    # ----------
    #  round 3
    # ----------

    job = new_job(PostIndexJSR)

    input()
    try:
        # Not started yet, so the stage check must reject this.
        job.succeed({})
    except AssertionError as exc:
        print(exc)
    input()
    job.started()
    input()
    job.succeed({"additionally": "done"})
# Run the interactive registrar walkthrough when executed as a script.
if __name__ == "__main__":
    test_registrar()