# from __future__ import annotations  # for cyclic type hints
# Use forward-references for compatible with python3.6
# ref https://peps.python.org/pep-0484/#forward-references
import locale
from datetime import datetime
from dateutil.parser import parse as dtparse
from biothings.utils.dataload import update_dict_recur
from biothings.utils.hub_db import get_source_fullname
from biothings.utils.jsondiff import make as make_json_diff
locale.setlocale(locale.LC_ALL, "")
[docs]
class ReleaseNoteSrcBuildReader:
    # TODO shall we use biothings.hub.dataindex.indexer._Build_doc here?
    def __init__(self, src_build_doc: dict):
        self.src_build_doc = src_build_doc
        # If `self` is a "hot" src_build doc reader, it can refer to a "cold" reader to access the cold build info.
        # This works like a two-node linked list.
        self.cold_src_build_reader: "ReleaseNoteSrcBuildReader" = None
    @property
    def build_id(self) -> str:
        return self.src_build_doc["_id"]
    @property
    def build_version(self) -> str:
        return self.src_build_doc.get("_meta", {}).get("build_version")
    @property
    def cold_collection_name(self) -> str:
        return self.src_build_doc.get("build_config", {}).get("cold_collection", None)
[docs]
    def has_cold_collection(self) -> bool:
        return self.cold_collection_name is not None 
[docs]
    def attach_cold_src_build_reader(self, other: "ReleaseNoteSrcBuildReader"):
        """
        Attach a cold src_build reader.
        It's required that `self` is a hot src_builder reader and `other` is cold.
        """
        if not self.has_cold_collection():
            raise ValueError(
                f"{self.build_id} is not a hot src_build doc, "
                f"thus not able to attach a cold reader of {other.build_id}."
            )
        if other.has_cold_collection():
            raise ValueError(
                f"{other.build_id} is a hot src_build doc, "
                f"thus not able to be attached to the reader of {self.build_id}"
            )
        # src_build `_id`s and collection names are interchangeable
        # See https://github.com/biothings/biothings.api/blob/master/biothings/hub/databuild/builder.py#L311
        if self.cold_collection_name != other.build_id:
            raise ValueError(
                f"{self.build_id} has cold collection {self.cold_collection_name}, "
                f"while the reader to be attached is for {other.build_id}"
            )
        self.cold_src_build_reader = other 
    @property
    def build_stats(self) -> dict:
        meta = self.src_build_doc.get("_meta", {})
        return meta.get("stats", {})
    def _get_datasource_stats(self) -> dict:
        return self.src_build_doc.get("merge_stats", {})
    def _get_datasource_versions(self) -> dict:
        meta = self.src_build_doc.get("_meta", {})
        # previous version format
        if "src_version" in meta:
            return meta["src_version"]
        # current version format
        src = meta.get("src", {})
        src_version = {src_name: src_info["version"] for src_name, src_info in src.items() if "version" in src_info}
        return src_version
    def _get_datasource_mapping(self) -> dict:
        return self.src_build_doc.get("mapping", {})
    @property
    def datasource_stats(self) -> dict:
        if not self.has_cold_collection():
            return self._get_datasource_stats()
        combined_stats = {
            **self._get_datasource_stats(),
            **self.cold_src_build_reader._get_datasource_stats(),
        }
        return combined_stats
    @property
    def datasource_versions(self) -> dict:
        if not self.has_cold_collection():
            return self._get_datasource_versions()
        combined_versions = {
            **self._get_datasource_versions(),
            **self.cold_src_build_reader._get_datasource_versions(),
        }
        return combined_versions
    @property
    def datasource_mapping(self) -> dict:
        if not self.has_cold_collection():
            return self._get_datasource_mapping()
        combined_mapping = {
            **self._get_datasource_mapping(),
            **self.cold_src_build_reader._get_datasource_mapping(),
        }
        return combined_mapping 
[docs]
class ReleaseNoteSrcBuildReaderAdapter:
    def __init__(self, src_build_reader: ReleaseNoteSrcBuildReader):
        self.src_build_reader = src_build_reader
    @classmethod
    def _get_datasource_fullname_stats(cls, datasource_stats) -> dict:
        """
        Receive a stat dictionary of <datasource_name>:<doc_count>, fetch the full datasource name, and
        return a new stat dictionary.
        If a full datasource name is two-tier (e.g. "gnomad.gnomad_exomes_hg19", as the full datasource name of
        "gnomad_exomes_hg19"), the returned dictionary is formed as:
            { <main_datasource_name> : { <sub_datasource_name> : { "_count" : <doc_count> } } }
        e.g.
            { "gnomad" : { "gnomad_exomes_hg19" : { "_count" : 12345678 } } }
        If a full datasource name is one-tier:
            CASE 1: the full datasource name is identical to the input datasource name,
                e.g. "cosmic" is the full name of "cosmic";
            CASE 2: the full datasource is None,
                e.g. when the input datasource name is "observed" or "total" in MyVariant, or "total_*" in MyGene
                In this case, the input datasource name is not a merge stat from a source but a custom field stat.
        the returned stats dictionary has the following structure:
            { <datasource_name> : { "_count" : <doc_count> } }
        e.g.
            { "cosmic" : { "_count" : 12345678 } }
            { "total" : { "_count" : 12345678 } }
        """
        result = {}
        for datasource_name, doc_count in datasource_stats.items():
            datasource_fullname = get_source_fullname(datasource_name)
            if (datasource_fullname is None) or (datasource_fullname == datasource_name):
                # one-tier fullname
                result[datasource_name] = {"_count": doc_count}
            else:
                # two-tier fullname
                main_name, sub_name = datasource_fullname.split(".")
                result.setdefault(main_name, {})
                result[main_name][sub_name] = {"_count": doc_count}
        return result
    @classmethod
    def _expand_datasource_versions(cls, datasource_versions) -> dict:
        """
        Receive a version dictionary of <datasource_name>:<version> (e.g. {"dbsnp" : "155"}), rewrite it to
            { <datasource_name> : {"_version" : <version>} }
        """
        return dict((k, {"_version": v}) for k, v in datasource_versions.items())
    @property
    def datasource_info(self):
        datasource_versions = self._expand_datasource_versions(self.src_build_reader.datasource_versions)
        datasource_stats = self._get_datasource_fullname_stats(self.src_build_reader.datasource_stats)
        datasource_info = update_dict_recur(datasource_versions, datasource_stats)
        return datasource_info
    @property
    def build_stats(self):
        # TODO this is the original logic, however I don't think it's necessary to apply get_source_fullname() to
        #  `build_stats.keys()`. E.g. in MyVariant, the `build_stats` keys are "total", "vcf", "hg19", and "observed",
        #  none of which has a two-tier full name. So the only effect of _get_datasource_fullname_stats() is to add a
        #  "_count" key to each of them.
        return self._get_datasource_fullname_stats(self.src_build_reader.build_stats) 
[docs]
class ReleaseNoteSource:
    def __init__(
        self,
        old_src_build_reader: ReleaseNoteSrcBuildReader,
        new_src_build_reader: ReleaseNoteSrcBuildReader,
        diff_stats_from_metadata_file: dict,
        addon_note: str,
    ):
        self.old_src_build_reader = old_src_build_reader
        self.new_src_build_reader = new_src_build_reader
        self.old_src_build_reader_adapter = ReleaseNoteSrcBuildReaderAdapter(self.old_src_build_reader)
        self.new_src_build_reader_adapter = ReleaseNoteSrcBuildReaderAdapter(self.new_src_build_reader)
        self.diff_stats_from_metadata_file = diff_stats_from_metadata_file
        self.addon_note = addon_note
    @classmethod
    def _make_stats_diff(cls, old: dict, new: dict):
        result = {
            "added": {},
            "deleted": {},
            "updated": {},
        }
        diff = make_json_diff(old, new)
        for item in diff:
            # get main source / main field
            key = item["path"].strip("/").split("/")[0]
            if item["op"] == "add":
                result["added"][key] = new[key]
            elif item["op"] == "remove":
                result["deleted"][key] = old[key]
            elif item["op"] == "replace":
                result["updated"][key] = {"new": new[key], "old": old[key]}
            else:
                raise ValueError("Unknown operation '%s' while computing changes" % item["op"])
        return result
    @classmethod
    def _make_mapping_diff(cls, old: dict, new: dict):
        def mapping_path_to_field_name(path: str) -> str:
            """
            Convert a JSON-Pointer path in a mapping json to a field name.
                E.g. path "/dbnsfp/properties/altai_neandertal" => field name "dbnsfp.altai_neandertal".
            Note that "properties" should not be included as part of a field name.
            The strategy here is iterate over the path components and remove any "properties" found at odd
            indices (1, 3, 5...).
            """
            path_components = path.strip("/").split("/")
            path_components = [
                path_components[i]
                for i in range(len(path_components))
                if (i % 2 == 0) or (i % 2 == 1 and path_components[i] != "properties")
            ]
            return ".".join(path_components)
        fields = {}
        diff = make_json_diff(old, new)
        for item in diff:
            if item["op"] in ("add", "remove", "replace"):
                field_name = mapping_path_to_field_name(item["path"])
                fields.setdefault(item["op"], []).append(field_name)
            elif item["op"] == "move":
                add_field_name = mapping_path_to_field_name(item["path"])
                remove_field_name = mapping_path_to_field_name(item["from"])
                fields.setdefault("add", []).append(add_field_name)
                fields.setdefault("remove", []).append(remove_field_name)
            else:
                raise ValueError("Unknown operation '%s' while computing changes" % item["op"])
        return fields
[docs]
    def diff_build_stats(self) -> dict:
        # Read from the reader adapters, not the readers directly
        old_stats = self.old_src_build_reader_adapter.build_stats
        new_stats = self.new_src_build_reader_adapter.build_stats
        return self._make_stats_diff(old_stats, new_stats) 
[docs]
    def diff_datasource_info(self) -> dict:
        old_info = self.old_src_build_reader_adapter.datasource_info
        new_info = self.new_src_build_reader_adapter.datasource_info
        return self._make_stats_diff(old_info, new_info) 
[docs]
    def diff_datasource_mapping(self) -> dict:
        new_mapping = self.new_src_build_reader.datasource_mapping
        if not new_mapping:
            raise ValueError(f"New Mapping cannot be empty. Build id: {self.new_src_build_reader.build_id}")
        old_mapping = self.old_src_build_reader.datasource_mapping
        return self._make_mapping_diff(old_mapping, new_mapping) 
[docs]
    def to_dict(self) -> dict:
        result = {
            "old": {
                "_version": self.old_src_build_reader.build_version,
                "_count": self.old_src_build_reader.build_stats.get("total"),
            },
            "new": {
                "_version": self.new_src_build_reader.build_version,
                "_count": self.new_src_build_reader.build_stats.get("total"),
                "_fields": self.diff_datasource_mapping(),
                "_summary": self.diff_stats_from_metadata_file,
            },
            "stats": self.diff_build_stats(),
            "sources": self.diff_datasource_info(),
            "note": self.addon_note,
            "generated_on": str(datetime.now().astimezone()),
        }
        return result 
 
[docs]
class ReleaseNoteTxt(object):
    def __init__(self, source: ReleaseNoteSource):
        self.source = source  # member kept for debugging
        self.changes = source.to_dict()
    @classmethod
    def _format_number(cls, num, sign=None):
        try:
            sign_symbol = ""
            if sign:
                if num > 0:
                    sign_symbol = "+"
                elif num < 0:
                    sign_symbol = "-"
            num_str = locale.format_string("%d", abs(num), grouping=True)
            return "%s%s" % (sign_symbol, num_str)
        except TypeError:
            # something wrong with converting, maybe we don't even have a number to format...
            return "N.A"
[docs]
    def save(self, filepath):
        try:
            import prettytable
        except ImportError:
            raise ImportError("Please install prettytable to use this rendered")
        txt = ""
        title = "Build version: '%s'" % self.changes["new"]["_version"]
        txt += title + "\n"
        txt += "".join(["="] * len(title)) + "\n"
        dt = dtparse(self.changes["generated_on"])
        txt += "Previous build version: '%s'\n" % self.changes["old"]["_version"]
        txt += "Generated on: %s\n" % dt.strftime("%Y-%m-%d at %H:%M:%S")
        txt += "\n"
        table = prettytable.PrettyTable(
            ["Updated datasource", "prev. release", "new release", "prev. # of docs", "new # of docs"]
        )
        table.align["Updated datasource"] = "l"
        table.align["prev. release"] = "c"
        table.align["new release"] = "c"
        table.align["prev. # of docs"] = "r"
        table.align["new # of docs"] = "r"
        for src, info in sorted(self.changes["sources"]["added"].items(), key=lambda e: e[0]):
            main_info = dict([(k, v) for k, v in info.items() if k.startswith("_")])
            sub_infos = dict([(k, v) for k, v in info.items() if not k.startswith("_")])
            if sub_infos:
                for sub, sub_info in sub_infos.items():
                    table.add_row(
                        ["%s.%s" % (src, sub), "-", main_info["_version"], "-", self._format_number(sub_info["_count"])]
                    )  # only _count avail there
            else:
                main_count = main_info.get("_count") and self._format_number(main_info["_count"]) or ""
                table.add_row([src, "-", main_info.get("_version", ""), "-", main_count])
        for src, info in sorted(self.changes["sources"]["deleted"].items(), key=lambda e: e[0]):
            main_info = dict([(k, v) for k, v in info.items() if k.startswith("_")])
            sub_infos = dict([(k, v) for k, v in info.items() if not k.startswith("_")])
            if sub_infos:
                for sub, sub_info in sub_infos.items():
                    table.add_row(
                        [
                            "%s.%s" % (src, sub),
                            main_info.get("_version", ""),
                            "-",
                            self._format_number(sub_info["_count"]),
                            "-",
                        ]
                    )  # only _count avail there
            else:
                main_count = main_info.get("_count") and self._format_number(main_info["_count"]) or ""
                table.add_row([src, main_info.get("_version", ""), "-", main_count, "-"])
        for src, info in sorted(self.changes["sources"]["updated"].items(), key=lambda e: e[0]):
            # extract information from main-source
            old_main_info = dict([(k, v) for k, v in info["old"].items() if k.startswith("_")])
            new_main_info = dict([(k, v) for k, v in info["new"].items() if k.startswith("_")])
            old_main_count = old_main_info.get("_count") and self._format_number(old_main_info["_count"]) or None
            new_main_count = new_main_info.get("_count") and self._format_number(new_main_info["_count"]) or None
            if old_main_count is None:
                assert new_main_count is None, (
                    "Sub-sources found for '%s', old and new count should " % src + "both be None. Info was: %s" % info
                )
                old_sub_infos = dict([(k, v) for k, v in info["old"].items() if not k.startswith("_")])
                new_sub_infos = dict([(k, v) for k, v in info["new"].items() if not k.startswith("_")])
                # old & new sub_infos should have the same structure (same existing keys)
                # so we just use one of them to explore
                if old_sub_infos:
                    assert new_sub_infos
                    for sub, sub_info in old_sub_infos.items():
                        table.add_row(
                            [
                                "%s.%s" % (src, sub),
                                old_main_info.get("_version", ""),
                                new_main_info.get("_version", ""),
                                self._format_number(sub_info["_count"]),
                                self._format_number(new_sub_infos[sub]["_count"]),
                            ]
                        )
            else:
                assert new_main_count is not None, (
                    "No sub-sources found, old and new count should NOT " + "both be None. Info was: %s" % info
                )
                table.add_row(
                    [
                        src,
                        old_main_info.get("_version", ""),
                        new_main_info.get("_version", ""),
                        old_main_count,
                        new_main_count,
                    ]
                )
        if table._rows:
            txt += table.get_string()
            txt += "\n"
        else:
            txt += "No datasource changed.\n"
        total_count = self.changes["new"].get("_count")
        if self.changes["sources"]["added"]:
            txt += "New datasource(s): %s\n" % ", ".join(sorted(list(self.changes["sources"]["added"])))
        if self.changes["sources"]["deleted"]:
            txt += "Deleted datasource(s): %s\n" % ", ".join(sorted(list(self.changes["sources"]["deleted"])))
        if self.changes["sources"]:
            txt += "\n"
        table = prettytable.PrettyTable(["Updated stats.", "previous", "new"])
        table.align["Updated stats."] = "l"
        table.align["previous"] = "r"
        table.align["new"] = "r"
        for stat_name, stat in sorted(self.changes["stats"]["added"].items(), key=lambda e: e[0]):
            table.add_row([stat_name, "-", self._format_number(stat["_count"])])
        for stat_name, stat in sorted(self.changes["stats"]["deleted"].items(), key=lambda e: e[0]):
            table.add_row([stat_name, self._format_number(stat["_count"]), "-"])
        for stat_name, stat in sorted(self.changes["stats"]["updated"].items(), key=lambda e: e[0]):
            table.add_row(
                [
                    stat_name,
                    self._format_number(stat["old"]["_count"]),
                    self._format_number(stat["new"]["_count"]),
                ]
            )
        if table._rows:
            txt += table.get_string()
            txt += "\n\n"
        if self.changes["new"]["_fields"]:
            new_fields = sorted(self.changes["new"]["_fields"].get("add", []))
            deleted_fields = sorted(self.changes["new"]["_fields"].get("remove", []))
            updated_fields = sorted(self.changes["new"]["_fields"].get("replace", []))
            if new_fields:
                txt += "New field(s): %s\n" % ", ".join(new_fields)
            if deleted_fields:
                txt += "Deleted field(s): %s\n" % ", ".join(deleted_fields)
            if updated_fields:
                txt += "Updated field(s): %s\n" % ", ".join(updated_fields)
            txt += "\n"
        if total_count is not None:
            txt += "Overall, %s documents in this release\n" % (self._format_number(total_count))
        if self.changes["new"]["_summary"]:
            sumups = []
            sumups.append("%s document(s) added" % self._format_number(self.changes["new"]["_summary"].get("add", 0)))
            sumups.append(
                "%s document(s) deleted" % self._format_number(self.changes["new"]["_summary"].get("delete", 0))
            )
            sumups.append(
                "%s document(s) updated" % self._format_number(self.changes["new"]["_summary"].get("update", 0))
            )
            txt += ", ".join(sumups) + "\n"
        else:
            txt += "No information available for added/deleted/updated documents\n"
        if self.changes.get("note"):
            txt += "\n"
            txt += "Note: %s\n" % self.changes["note"]
        with open(filepath, "w") as fout:
            fout.write(txt)
        return txt