diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 354549f..b87c200 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -2,12 +2,12 @@ # See https://pre-commit.com/hooks.html for more hooks %YAML 1.2 --- +exclude: "site/.*" default_install_hook_types: [pre-commit, commit-msg] default_stages: [pre-commit] -exclude: 'site/.*' repos: - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v3.2.0 + rev: v6.0.0 hooks: - id: trailing-whitespace - id: end-of-file-fixer @@ -18,12 +18,12 @@ repos: - id: check-toml - id: debug-statements - repo: https://github.com/asottile/add-trailing-comma - rev: v3.1.0 + rev: v4.0.0 hooks: - id: add-trailing-comma - repo: https://github.com/astral-sh/ruff-pre-commit # Ruff version. - rev: v0.11.6 + rev: v0.14.4 hooks: # Run the linter. - id: ruff @@ -33,7 +33,7 @@ repos: types_or: [python, pyi] # options: ignore one line things [E701] - repo: https://github.com/adrienverge/yamllint - rev: v1.35.1 + rev: v1.37.1 hooks: - id: yamllint name: yamllint @@ -41,14 +41,12 @@ repos: entry: yamllint language: python types: [file, yaml] - args: ['-d', "{\ - extends: default,\ - rules: {\ - colons: { max-spaces-after: -1 }\ - }\ - }"] + args: [ + '-d', + "{ extends: default, rules: { colons: { max-spaces-after: -1 } } }", + ] - repo: https://github.com/rhysd/actionlint - rev: v1.7.4 + rev: v1.7.8 hooks: - id: actionlint name: Lint GitHub Actions workflow files @@ -69,18 +67,9 @@ repos: args: [--staged, -c, "general.ignore=B6,T3", --msg-filename] stages: [commit-msg] - repo: https://github.com/crate-ci/typos - rev: v1.28.2 + rev: v1 hooks: - id: typos - - repo: https://github.com/markdownlint/markdownlint - rev: v0.13.0 - hooks: - - id: markdownlint - name: Markdownlint - description: Run markdownlint on your Markdown files - entry: mdl --style .markdown.rb - language: ruby - files: \.(md|mdown|markdown)$ - repo: https://github.com/Yelp/detect-secrets rev: v1.5.0 hooks: @@ -89,3 +78,15 @@ repos: language: python entry: detect-secrets-hook args: [''] + - repo: https://github.com/rvben/rumdl-pre-commit + rev: v0.0.173 # Use the latest release tag + hooks: + - id: rumdl + # To only check (default): + # args: [] + # To automatically fix issues: + # args: [--fix] + - repo: https://github.com/RobertCraigie/pyright-python + rev: v1.1.407 # pin a tag; latest as of 2025-10-01 + hooks: + - id: pyright diff --git a/ROADMAP.md b/ROADMAP.md new file mode 100644 index 0000000..23c5224 --- /dev/null +++ b/ROADMAP.md @@ -0,0 +1,37 @@ +# Roadmap + +- [ ] BLOCKING: we need to make decision about versions/diff and merge + +- [ ] We need to manually close the agent. (new branch)!!! + +- [ ] Diff arbol: + - [ ] No imprimir pretty, solo indicar diferencias + - [ ] Manera de "ignorar rutas" + - [ ] Descargar Plantillas + +- [ ] Errores: + - [ ] `get_project_config` esconde errores? + +- [ ] Refactor to V1.1 + - [ ] Auditoria de version: rango de tiempo + - [ ] Reorgnaizer Internos + - service gh + - gh-name + - name + - audit name + - algo FACÍL usar + - [ ] Typer +- [ ] Imprimir: + - [ ] sitio web + - [ ] es espejo? Cómo está "upstream" +- [ ] Imprimir *un* repo o grupo, no solo -a +- [ ] Mostrar info de ramas + +- [ ] Esconder archivo/privado +- [ ] Mejorar filtrar +- [ ] Imprimir enlaces/activación para funciones (discussión) +- [ ] Issues/PRs stats + +- [ ] revisar locales + +- [ ] Seguridad, codeowners, permisos acciones diff --git a/src/integration_test/test.sh b/integration_test/test.es.sh similarity index 78% rename from src/integration_test/test.sh rename to integration_test/test.es.sh index a571973..0f5a32a 100644 --- a/src/integration_test/test.sh +++ b/integration_test/test.es.sh @@ -10,6 +10,7 @@ REPO="$1" total=0 success=0 fail=0 +failed_cmds=() print_and_run() { echo -e "\n==============================" @@ -19,7 +20,8 @@ print_and_run() { ((success++)) else ((fail++)) - echo -e "\033[0;31m❌ Falló el commando:\033[0m" + failed_cmds+=("$*") + echo -e "\033[0;31m❌ Falló el comando:\033[0m" echo -e "\033[0;31m$*\033[0m" fi } @@ -33,10 +35,8 @@ run_basic_commands() { local repo_cmds=( "tags -r $REPO" - "project-configs -r $REPO" "releases -r $REPO" "pypi -r $REPO" - "audit-releases -r $REPO" "audit-repo -r $REPO" "audit-versions -r $REPO" ) @@ -55,10 +55,8 @@ run_pretty_commands() { local repo_cmds=( "tags -r $REPO" - "project-configs -r $REPO" "releases -r $REPO" "pypi -r $REPO" - "audit-releases -r $REPO" "audit-repo -r $REPO" "audit-versions -r $REPO" ) @@ -66,6 +64,8 @@ run_pretty_commands() { for cmd in "${repo_cmds[@]}"; do print_and_run "$base_cmd $cmd" done + + print_and_run "$base_cmd audit-versions -r $REPO -c 3" } run_pretty_json_commands() { @@ -77,10 +77,8 @@ run_pretty_json_commands() { local repo_cmds=( "tags -r $REPO" - "project-configs -r $REPO" "releases -r $REPO" "pypi -r $REPO" - "audit-releases -r $REPO" "audit-repo -r $REPO" "audit-versions -r $REPO" ) @@ -96,6 +94,13 @@ run_pretty_json_commands echo -e "\n==============================" echo -e "Resumen de ejecución:" -printf "Total de commandos : %d\n" "$total" -printf "Commandos exitosos : \033[0;32m%d\033[0m\n" "$success" -printf "Commandos fallidos : \033[0;31m%d\033[0m\n" "$fail" +printf "Total de comandos : %d\n" "$total" +printf "Comandos exitosos : \033[0;32m%d\033[0m\n" "$success" +printf "Comandos fallidos : \033[0;31m%d\033[0m\n" "$fail" + +if (( fail > 0 )); then + echo -e "\n\033[0;31m❌ Lista de comandos fallidos:\033[0m" + for cmd in "${failed_cmds[@]}"; do + echo -e "\033[0;31m$cmd\033[0m" + done +fi diff --git a/pyproject.toml b/pyproject.toml index 8292b15..9b16c20 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -101,3 +101,6 @@ help = "Run test by test, slowly, quitting after first error" [tool.poe.tasks.filter-test] cmd = "pytest -W error -vvvx -rA" help = "Run any/all tests one by one with basic settings: can include filename and -k filters" + +[tool.typos] +files.extend-exclude = ["*.es.*"] diff --git a/src/github_helper/_adapters/api_to_cli.py b/src/github_helper/_adapters/api_to_cli.py index 4fc4db9..9ef2f29 100644 --- a/src/github_helper/_adapters/api_to_cli.py +++ b/src/github_helper/_adapters/api_to_cli.py @@ -14,10 +14,8 @@ class GHAdapter: def _check_options(self, formatters): formats = {k for k, v in formatters.items() if v} invalid_formats = {"html", "url"} & formats - if ( - (self._command == "auth-status" and formats) - or (self._command != "repos" and invalid_formats) - or (self._command == "audit-releases" and self._json) + if (self._command == "auth-status" and formats) or ( + self._command != "repos" and invalid_formats ): raise NotImplementedError( f"{', '.join(invalid_formats)} not valid flags for {self._command}", @@ -123,8 +121,8 @@ async def transform_releases_data(self, releases_data): return to_table.format_table( [ [ - release["tag"], - delim.join(release["files"]), + release.tag, + delim.join(release.files), ] for release in releases_data ], @@ -139,11 +137,10 @@ async def transform_pypi_data(self, releases_data): return to_table.format_table( [ [ - f"{name}-{release['tag']}", - delim.join(release["files"]), + f"{release.tag}", + delim.join(release.files), ] - for name, subobject in releases_data.items() - for release in subobject + for release in releases_data ], pretty=self._pretty, headers=("version", "files"), diff --git a/src/github_helper/_adapters/to_html/_components.py b/src/github_helper/_adapters/to_html/_components.py index 468e518..0c2b676 100644 --- a/src/github_helper/_adapters/to_html/_components.py +++ b/src/github_helper/_adapters/to_html/_components.py @@ -1,3 +1,5 @@ +from collections.abc import Iterable + import logistro from htmy import Component, Renderer, core, html @@ -31,8 +33,8 @@ def modal( async def render_page( - heads: core.BaseTag, - content: core.BaseTag, + heads: Iterable[core.BaseTag], + content: Iterable[core.BaseTag], ): tailwindcss_cdn = "https://cdn.tailwindcss.com" _logger.debug("Rendering.") diff --git a/src/github_helper/_cli.py b/src/github_helper/_cli.py index 1834d22..babda17 100644 --- a/src/github_helper/_cli.py +++ b/src/github_helper/_cli.py @@ -119,6 +119,12 @@ def _get_cli_args(): help="Name of repository required.", required=True, ) + configs_parser.add_argument( + "-f", + "--filename", + help="Name of the file required.", + required=True, + ) releases_parser = subparsers.add_parser( "releases", @@ -150,36 +156,6 @@ def _get_cli_args(): help="Use testing pypi, not regular.", ) - pypi_audit_parser = subparsers.add_parser( - "audit-pypi", - description="Show all pypi packages from a repo with comments.", - help="Return all pypi releases of a repo.", - ) - pypi_audit_parser.add_argument( - "-r", - "--repo", - help="Name of repository required.", - required=True, - ) - pypi_audit_parser.add_argument( - "-t", - "--testing", - action="store_true", - help="Use testing pypi, not regular.", - ) - - releases_audit_parser = subparsers.add_parser( - "audit-releases", - description="Show all releases from a repo with comments.", - help="Return all releases of a repo with comments.", - ) - releases_audit_parser.add_argument( - "-r", - "--repo", - help="Name of repository required.", - required=True, - ) - audit_repo = subparsers.add_parser( "audit-repo", description="", @@ -203,6 +179,25 @@ def _get_cli_args(): required=True, ) + group = audit_versions.add_mutually_exclusive_group() + + group.add_argument( + "-c", + "--count", + action="store", + type=int, + default=15, + help="How many versions to audit", + ) + group.add_argument( + "-v", + "--version", + action="store", + type=str, + default=None, + help="Audit a specific version in more detail", + ) + basic_args = parser.parse_args() return parser, vars(basic_args) @@ -227,8 +222,11 @@ def run_cli(): async def _run_cli_async(): # noqa: C901, PLR0912, PLR0915 complex parser, cli_args = _get_cli_args() repo = cli_args.pop("repo", None) + filename = cli_args.pop("filename", None) paginate = cli_args.pop("all", False) # Internamente en gh api es un --paginate command = cli_args.pop("command", None) + count = cli_args.pop("count", None) + version = cli_args.pop("version", None) cli_args.pop("log") cli_args.pop("human") testing = cli_args.pop("testing", False) @@ -255,7 +253,7 @@ async def _run_cli_async(): # noqa: C901, PLR0912, PLR0915 complex data, sadness = await gh.get_remote_tags(repo) data = await adpt.transform_tags_data(data) case "project-configs": - data, sadness = await gh.get_project_configs(repo) + data, sadness = await gh.get_project_configs(repo, filename) data = await adpt.transform_project_configs_data(data) case "releases": data, sadness = await gh.get_releases(repo) @@ -263,17 +261,15 @@ async def _run_cli_async(): # noqa: C901, PLR0912, PLR0915 complex case "pypi": data, sadness = await gh.get_pypi(repo, testing=testing) data = await adpt.transform_pypi_data(data) - case "audit-pypi": - data, sadness = await gh.audit_pypi(repo, testing=testing) - data = await adpt.transform_audit_releases_data(data) - case "audit-releases": - data, sadness = await gh.audit_releases(repo) - data = await adpt.transform_audit_releases_data(data) case "audit-repo": data, sadness = await gh.audit_rulesets(repo) data = await adpt.transform_audit_rulesets_data(data) case "audit-versions": - data, sadness = await gh.audit_versions(repo) + data, sadness = await gh.audit_versions( + repo, + count=count, + only_version=version, + ) data = await adpt.transform_audit_versions_data(data) case _: print("No command supplied.", file=sys.stderr) diff --git a/src/github_helper/api/__init__.py b/src/github_helper/api/__init__.py index 8b521be..6ea66e3 100644 --- a/src/github_helper/api/__init__.py +++ b/src/github_helper/api/__init__.py @@ -1,27 +1,53 @@ """A CLI dashboard for github status.""" import asyncio +import itertools import re +import sys import tomllib import warnings +from dataclasses import dataclass from pathlib import Path +from typing import Any, TypedDict, TypeVar import aiohttp import jq # type: ignore [import-not-found] import logistro import orjson +from colored import Fore, Style from github_helper._services import gh as srv from github_helper._services import repos as repo_srv from github_helper._services import ssh_srv from github_helper._services.gh import GHError, ScopesError, ScopesWarning from github_helper._utils import load_json -from github_helper.api import _audit, _compare_versions +from github_helper.api import _audit, versions + +from ._pkg_audit import ReleaseAudit _logger = logistro.getLogger(__name__) _SCRIPT_DIR = Path(__file__).resolve().parent _TEMPLATE_PATH = _SCRIPT_DIR / "templates" +# could just put this in CLI and be done with it +# could also force +if not sys.stdout.isatty(): + + class _NoColor: + def __getattr__(self, name): + return "" + + # Override colored's foreground, background, and style + Fore = Style = _NoColor() # type: ignore[misc, assignment] + + +async def _noop(tup=0, ret=None): + ### Allows us to fake async noops + if not tup: + return ret + else: + return (ret,) * tup + _check_ran = False @@ -50,6 +76,11 @@ def _log_one_json(obj): _logger.debug2(f"gh result:\n {raw!s}") +_T = TypeVar("_T") +RetVal = tuple[_T, int] +"""Return type for api's that can exit the program.""" + + class GHApi: """Provides access to status functions ontop of gh program.""" @@ -285,24 +316,27 @@ async def query_version(repo): sadness = int(not repos) return repos, sadness - async def get_remote_tags(self, repo): - """Return tags for a repo.""" - _ = await self.get_user() - tags_jq = jq.compile("map({tag: .name})") - owner, repo = self._split_full_name(full_name=repo) - endpoint = f"repos/{owner}/{repo}/tags" - _logger.debug(f"Calling API: {endpoint}") - retval, out, err = await srv.gh_api(endpoint) - srv.check_retval(retval, err, endpoint=endpoint) - tags = tags_jq.input_value(orjson.loads(out)).first() - sadness = int(not tags) - return tags, sadness + # this should take a name TODO (or several) + class ConfigDescription(TypedDict): + """Description of a config.""" + + object: Any + _original: str + + ConfigSet = dict[str, ConfigDescription] - async def get_project_configs(self, repo): + async def get_project_configs( + self, + repo: str, + *filenames: str, + ref: str | None = None, + ) -> RetVal[ConfigSet]: """Find all projects in a repo.""" + if not filenames: + raise ValueError("A least one filename must be supplied.") owner, repo = self._split_full_name(full_name=repo) folder_repos = repo_srv.RepoFolder() - projects = {} + projects: dict = {} _logger.debug(f"Downloading repo {owner}/{repo}") r = await folder_repos.add_repo( @@ -310,91 +344,129 @@ async def get_project_configs(self, repo): repo, url="ssh://git@github.com", ) - ref = "main" if "main" in await r.list_branches() else "master" - py_files = await r.get_files_by_name("pyproject.toml", ref=ref) - js_files = await r.get_files_by_name("package.json", ref=ref) - if py_files: - projects["py"] = {} - for f in py_files: - _logger.debug2(f"Found py: {f['path']}") - projects["py"][f["path"]] = { - "object": tomllib.loads(f["content"].decode()), - "_original": f["content"].decode(), - } - if js_files: - projects["js"] = {} - for f in js_files: - _logger.debug2(f"Found js: {f['path']}") + if not ref: + ref = "main" if "main" in await r.list_branches() else "master" + files: list[dict] = [] + for name in filenames: + files.extend(await r.get_files_by_name(name, ref=ref) or []) + configs: GHApi.ConfigSet = {} + for f in files: + obj = None + if f["path"].endswith(".json"): obj = orjson.loads(f["content"]) - projects["js"][f["path"]] = { - "object": obj, - "_original": f["content"].decode(), - } + elif f["path"].endswith(".toml"): + obj = tomllib.loads(f["content"].decode()) + configs[f["path"]] = { + "object": obj, + "_original": f["content"].decode(), + } sadness = int(not projects) - return projects, sadness - - async def get_pypi(self, repo, *, testing=False): - """Get all pypi releases for a particular project.""" - project_configs, sadness = await self.get_project_configs(repo) - project_names = set() - if "py" in project_configs: - for config in project_configs["py"].values(): - name = config.get("object", {}).get("project", {}).get("name", {}) - if name: - project_names.add(name) + return configs, sadness + + @dataclass(slots=True, kw_only=True) + class Tag: + """Return type for get_remote_tags.""" + + tag: str + """Tag""" + + version: versions.Version | None = None + """Calculated version.""" + + def __post_init__(self): + """Initialize derivative values.""" + self.version = versions.Version(self.tag) + + def tag_diff(self) -> bool: + """Check if our version tag correctly formatted.""" + # NOTE: this is biased towards python, semver is different! + return self.tag.removeprefix("v") != str(self.version).removeprefix("v") + + def __json__(self): + """Convert to json.""" + return { + "tag": self.tag, + "version": self.version.__json__() if self.version else None, + } + + async def get_remote_tags(self, repo, count=None) -> RetVal[list[Tag]]: + """Return tags ("tag":"name") for a repo.""" + _ = await self.get_user() + tags_jq = jq.compile("map({tag: .name})") + owner, repo = self._split_full_name(full_name=repo) + endpoint = f"repos/{owner}/{repo}/tags" + _logger.debug(f"Calling API: {endpoint}") + retval, out, err = await srv.gh_api(endpoint) + srv.check_retval(retval, err, endpoint=endpoint) + tags_dict = tags_jq.input_value(orjson.loads(out)).first() + tags = [GHApi.Tag(**tag) for tag in tags_dict] + sadness = int(not tags) + return tags[:count], sadness + + @dataclass(slots=True, kw_only=True) + class Release(Tag): + """Release object containing version and files.""" + + prerelease: bool + """Does the source mark it as prerelease?""" + files: list[str] + """Files that came with it.""" + audit: ReleaseAudit | None = None + + def __json__(self): + """Convert to json.""" + old = GHApi.Tag.__json__(self) + old.update( + { + "prerelease": self.prerelease, + "files": self.files, + "audit": self.audit.__json__() if self.audit else None, + }, + ) + return old + + async def get_pypi( + self, + project_name: str, + *, + testing: bool = False, + ) -> RetVal[list[Release]]: + """Get all pypi releases for ALL projects in a repo.""" prefix = "test." if testing else "" - async def fetch_json(name): - url = f"https://{prefix}pypi.org/pypi/{name}/json" - _logger.debug(url) - try: - # TODO: probably need to check that project exists first - jq_dir = ( - r".releases // {} | " - r"to_entries | map(" - r"{tag: .key, files:" - r"[ .value[] | select(.yank != true) | .filename ]" - r"})" - ) - pypi_jq = jq.compile(jq_dir) - session = aiohttp.ClientSession() - response = await session.get(url) - pypi_json = await response.json() - data = pypi_jq.input_value(pypi_json).first() - return _compare_versions.order_versions(data, "tag") - finally: - await response.release() - await session.close() - - releases = {} - for name in project_names: - releases[name] = await fetch_json(name) - sadness = int(not releases) - _logger.debug2(releases) - return releases, sadness - - async def audit_pypi(self, repo, count=7, *, testing=False): - """Get all pypi releases for a project and audit it.""" - releases, sadness = await self.get_pypi(repo, testing=testing) - if sadness: - return None, sadness - - releases = [release for project in releases.values() for release in project] - for release in releases: - release["audit"] = _compare_versions.ReleaseAudit( - release, - prerelease_respect=True, + url = f"https://{prefix}pypi.org/pypi/{project_name}/json" + _logger.debug(url) + try: + jq_dir = ( + r".releases // {} | " + r"to_entries | map(" + r"{tag: .key, files:" + r"[ .value[] | select(.yanked != true) | .filename ]" + r"})" ) + pypi_jq = jq.compile(jq_dir) + session = aiohttp.ClientSession() + response = await session.get(url) + pypi_json = await response.json() + if pypi_json.get("message", None) == "Not Found": + return [], 1 + releases = pypi_jq.input_value(pypi_json).first() + finally: + await response.release() + await session.close() - # I want count to be the API call or something - # but it has to be ordered first. - return ( - _compare_versions.order_versions(releases, "tag")[:count], - sadness, - ) + sadness = int(not releases) + coerced_releases: list[GHApi.Release] = [ + GHApi.Release( + **r, + prerelease=versions.Version(r["tag"]).is_prerelease, + ) + for r in releases + ] + return coerced_releases, sadness - async def get_releases(self, repo): + async def get_releases(self, repo: str) -> RetVal[list[Release]]: """Return releases for a repo.""" _ = await self.get_user() releases_jq = jq.compile( @@ -413,57 +485,115 @@ async def get_releases(self, repo): retval, out, err = await srv.gh_api(endpoint) srv.check_retval(retval, err, endpoint=endpoint) obj = orjson.loads(out) - _log_one_json(obj) releases = releases_jq.input_value(obj).first() sadness = int(not releases) - _logger.debug2(releases) - return releases, sadness - - async def audit_releases(self, repo, count=7): - """Run get_releases and process information.""" - releases, sadness = await self.get_releases(repo) - if sadness: - return None, sadness - - for release in releases: - release["audit"] = _compare_versions.ReleaseAudit(release) - - # I want count to be the API call or something - # but it has to be ordered first. - return ( - _compare_versions.order_versions(releases, "tag")[:count], - sadness, - ) - - async def audit_versions(self, repo): + coerced_releases: list[GHApi.Release] = [ + GHApi.Release( + **r, + ) + for r in releases + ] + return coerced_releases, sadness + + async def audit_versions( # noqa: C901 + self, + repo: str, + count: int = 20, + only_version: str | None = None, + ) -> RetVal[list[dict]]: """ Verify that version of a repository have differences. Args: repo: the name of the repo to verify. Can be "owner/repo" or just "repo" and owner is assumed to be the current user. + count: the number of versions to look at + only_version: deep dive on one version """ - tags, sadness = await self.get_remote_tags(repo) - releases, sadness = await self.get_releases(repo) + project_configs, sadness = await self.get_project_configs( + repo, + "pyproject.toml", + ) - filtered_tags = _compare_versions.filter_versions(tags, "tag") - filtered_releases = _compare_versions.filter_versions(releases, "tag") + project_names = [] + for path, config in project_configs.items(): + if not path.endswith("pyproject.toml"): + continue + name = config.get("object", {}).get("project", {}).get("name", {}) + if name: + project_names.append(name) + if len(project_names) > 1: + raise NotImplementedError("Repo has more than one project :-(") + + ( + (tags, _), + (release, _), + (pypi, pypi_sadness), + (test_pypi, test_pypi_sadness), + ) = await asyncio.gather( + self.get_remote_tags(repo), + self.get_releases(repo), + (self.get_pypi(project_names[0]) if project_names else _noop(2, [])), + ( + self.get_pypi(project_names[0], testing=True) + if project_names + else _noop(2, []) + ), + ) - versions = filtered_tags | filtered_releases + @dataclass(slots=True) + class VersionSet: + gh_tags: GHApi.Tag | None = None + gh_releases: GHApi.Release | None = None + pypi: GHApi.Release | None = None + test_pypi: GHApi.Release | None = None + + all_versions: dict[ + versions.Version, + VersionSet, + ] = {} + + for attrname, o in itertools.chain( + (("gh_tags", t) for t in tags), + (("gh_releases", r) for r in release), + (("pypi", r) for r in pypi), + (("test_pypi", r) for r in test_pypi), + [], + ): + v = getattr(o, "version", None) or versions.Version(o.tag) + if not v.valid: + continue + vset = all_versions.setdefault(v, VersionSet()) + if getattr(vset, attrname, None): + warnings.warn( + "Looks like conflicting poorly-written versions caused overwrite.", + stacklevel=2, + ) + if isinstance(o, GHApi.Release): + o.audit = ReleaseAudit(o) + setattr(vset, attrname, o) + + if only_version: + v = versions.Version(only_version) + r = all_versions.get(v) + if not r: + return [], 1 + all_versions = {v: r} + all_versions = dict(sorted(all_versions.items(), reverse=True)) + result = [ + { + "version": v.__json__(), + "gh_tags": r.gh_tags.__json__() if r.gh_tags else None, + "gh_releases": (r.gh_releases.__json__() if r.gh_releases else None), + "pypi": r.pypi.__json__() if r.pypi else None, + "test.pypi": (r.test_pypi.__json__() if r.test_pypi else None), + "validity": str(v.kind), + } + for v, r in list(all_versions.items())[:count] # count + ] - result = _compare_versions.order_versions( - [ - { - "version": v, - "tags": v in filtered_tags, - "releases": v in filtered_releases, - } - for v in versions - ], - "version", - ) - return result, sadness + return result, 0 # maybe no sadness for audit? async def _get_ruleset(self, owner, repo, ruleset_id): """Return releset for a user by Id.""" diff --git a/src/github_helper/api/_audit.py b/src/github_helper/api/_audit.py index 1392f10..d69a408 100644 --- a/src/github_helper/api/_audit.py +++ b/src/github_helper/api/_audit.py @@ -1,7 +1,7 @@ import fnmatch from pathlib import Path -import jsondiff as jd +import jsondiff as jd # type: ignore[import-untyped] from github_helper._services.gh import GHError from github_helper._utils import load_json diff --git a/src/github_helper/api/_compare_versions.py b/src/github_helper/api/_compare_versions.py deleted file mode 100644 index 5ce6643..0000000 --- a/src/github_helper/api/_compare_versions.py +++ /dev/null @@ -1,397 +0,0 @@ -""" -Github and Pypi versions can contain multiple files: here are tools. - -How do I improve parsing? - -1. If you want to process a new type of file: - a. Modify `_get_file_notes()` - b. Return a dictionary with a known action + needed key-value pairs - I. Possibly modify the existing action in `add_file()` to deal - with new information - II. Make sure `__str__` knows how to print that information. - c. Create a new action: - I. Add the action to `add_file()` - II. If it's going to add to `self.projects`, you either need to - - Create a new entry in `__str__()` to print it - - Modify the existing code (but you'd probably be doing b.) -2. If you want to update the OS Compatibility tree, to include new tags, - look at the bdist branch conditional in _build_python_summary - - -""" - -import copy -import re -import sys -from dataclasses import field - -import colored -import logistro -import semver -from packaging import utils, version - -_logger = logistro.getLogger(__name__) - -if not sys.stdout.isatty(): - - class NoColor: - def __getattr__(self, name): - return "" - - # Override colored's foreground, background, and style - colored.Fore = colored.Back = colored.Style = NoColor() - - -def order_versions(versions: list[dict], key: str): - if not versions: - return versions - return sorted( - versions, - key=lambda x: version.parse(x[key]), - reverse=True, - ) - - -def filter_versions(versions: list[dict], key: str): - _regex = re.compile( - r"^" + version.VERSION_PATTERN + r"$", - re.VERBOSE, - ) - return {v[key] for v in versions if _regex.match(v[key])} - - -bdist_template = { - "Windows": { - "x86_64": [], - "arm64": [], - "win32": [], - }, - "Mac": { - "x86_64": {}, - "arm64": {}, - "universal2": {}, - }, - "Linux": { - "x86_64": { - "Glibc": { - "2.17": [], - }, - "musl": {}, - }, - "aarch64": { - "Glibc": { - "2.17": [], - }, - "musl": {}, - }, - "armv7l": { - "Glibc": { - "2.17": [], - }, - "musl": {}, - }, - }, -} - - -class ReleaseAudit: - prerelease_agree: bool - projects: field(default_factory=dict) - file_notes: field(default_factory=dict[str, dict]) - unknown_files: field(default_factory=set) - ignore_counter: field(default_factory=dict[str, int]) - - def __init__(self, release, *, prerelease_respect=False): - self.tag = release["tag"] - self.version = self.explode_versions() - if not self.version: - return - self.prerelease_agree = ( - (self.version["is_prerelease"] == release["prerelease"]) - if "prerelease" in release - else prerelease_respect - ) - - self.file_notes = {} - self.unknown_files = set() - self.ignore_counter = {} - self.projects = {} - [self.summarize_file(file) for file in release["files"]] - - def explode_versions(self): - v = None - try: - v = version.Version(self.tag) - except version.InvalidVersion: - pass - try: - v = semver.Version.parse(self.tag) - except ValueError: - pass - if not v: - return None - ret = { - "major": v.major, - "minor": v.minor, - "patch": v.patch if hasattr(v, "patch") else v.micro, - "pre": v.prerelease if hasattr(v, "prerelease") else v.pre, - "dev": v.dev if hasattr(v, "dev") else None, - "post": v.post if hasattr(v, "post") else None, - } - ret["is_prerelease"] = ( - v.is_prerelease if hasattr(v, "is_prerelease") else bool(ret["pre"]) - ) - return ret - - def __str__(self): # noqa: C901, PLR0912 - ret = "" - if not self.prerelease_agree: - ret += f"{colored.Fore.red}PRERELEASE DISAGREEMENT{colored.Style.reset}\n" - if not self.projects: - ret += f"{colored.Fore.red}No Valid Projects Found.{colored.Style.reset}\n" - else: - for k, v in self.projects.items(): - if k.startswith("python/"): - ## look at bdist/sdist - warn = "" - if not v["bdist"]: - warn = "bdist" - if not v["sdist"]: - warn = ", sdist" if warn else "sdist" - warn = ( - f", {colored.Fore.red}missing {warn}{colored.Style.reset}" - if warn - else "" - ) - - pure = "" - if v["pure"]: - pure = ( - f", {colored.Fore.green}pure: {', '.join(v['pure'])}" - f"{colored.Style.reset}" - ) - - ret += f"{colored.Style.bold}{k}{colored.Style.reset}{warn}{pure}\n" - - ## look at tags - ret += ( - "\n".join(self._build_tree_str(v["bdist-tree"])) + "\n" - if v["bdist-tree"] - else "" - ) - - if v["unknown-tags"]: - ret += " Unknown Tags:\n " - ret += "\n ".join(v["unknown-tags"]) + "\n" - if self.unknown_files: - ret += ( - f"{colored.Fore.yellow}{colored.Style.bold}" - f"{len(self.unknown_files)} Unknown Files:" - f"{colored.Style.reset}\n" - ) - for i, f in enumerate(self.unknown_files): - if i > 3: # noqa: PLR2004 - ret += f" {colored.Fore.yellow}...{colored.Style.reset}\n" - break - ret += f" {colored.Fore.yellow}{f}{colored.Style.reset}\n" - if self.ignore_counter: - ret += "ignored: " - for k, v in self.ignore_counter.items(): - ret += f"{k} {v} time(s)" - ret += "\n" - return ret - - def summarize_file(self, filename: str): - notes = self._get_file_notes(filename) - match notes.get("action"): - case "ignore": - why = notes.get("type", "other") - self.ignore_counter[why] = self.ignore_counter.get(why, 0) + 1 - case "python-compat": - self._build_python_project_summary(notes) - case _: - self.unknown_files.add(filename) - return notes - - # the action you return will trigger behavior above - def _get_file_notes(self, filename: str): - if filename in (f"{self.tag}.zip", f"{self.tag}.tar.gz"): - return {"type": "gh-archive", "action": "ignore"} - elif filename.endswith("tar.gz"): - try: - name, version = utils.parse_sdist_filename(filename) - except utils.InvalidSdistFilename: - _logger.debug(f"Invalid Sdist Filename: {filename}") - else: - return { - "name": name, - "type": "sdist", - "language": "python", - "action": "python-compat", - } - elif filename.endswith(".whl"): - try: - name, version, build, compat_tags = utils.parse_wheel_filename(filename) - except utils.InvalidWheelFilename: - _logger.debug(f"Invalid Wheel Filename: {filename}") - return {"error": "invalid name", "value": filename} - else: - return { - "name": name, - "type": "bdist", - "language": "python", - "tags": compat_tags, - "action": "python-compat", - } - elif filename.endswith(("sigstore.json", ".sha256")): - return {"type": "metadata", "action": "ignore"} - return {"error": "unrecognized name", "value": filename} - - # So, refactor, python project should be it's own class - # And maybe should have its own adapters? - def _build_python_project_summary(self, notes): # noqa: PLR0912, C901 - name = f"python/{notes['name']}" - if name not in self.projects: - self.projects[name] = { - "sdist": False, - "bdist": False, - "pure": [], - "bdist-tree": None, - "unknown-tags": [], - } - ref = self.projects[name] - if notes.get("type") == "sdist": - ref["sdist"] = True - elif notes.get("type") == "bdist": - ref["bdist"] = True - for t in notes.get("tags"): - pair = f"{t.interpreter}-{t.abi}" - # t.abi, t.interpreter, t.platform - if t.platform == "any" and t.abi == "none": - ref["pure"].append(t.interpreter) - continue - - if not ref["bdist-tree"]: - ref["bdist-tree"] = copy.deepcopy(bdist_template) - - if t.platform == "any": - if "All OS" not in ref["bdist-tree"]: - ref["bdist-tree"]["All OS"] = [pair] - else: - ref["bdist-tree"]["All OS"].append(pair) - elif r := self._parse_mac_platform(t.platform): - arch_dict = ref["bdist-tree"]["Mac"][r["arch"]] - if r["version"] not in arch_dict: - arch_dict[r["version"]] = [pair] - else: - arch_dict[r["version"]].append(pair) - elif r := self._parse_win_platform(t.platform): - ref["bdist-tree"]["Windows"][r].append(pair) - elif (r := self._parse_manylinux_platform(t.platform)) or ( - r := self._parse_musllinux_platform(t.platform) - ): - if r["arch"] not in ref["bdist-tree"]["Linux"]: - ref["bdist-tree"]["Linux"][r["arch"]] = { - "Glibc": {"2.17": []}, - "musl": {}, - } - libc = r["libc"] - libc_dict = ref["bdist-tree"]["Linux"][r["arch"]][libc] - if r["version"] not in libc_dict: - libc_dict[r["version"]] = [pair] - else: - libc_dict[r["version"]].append(pair) - else: - ref["unknown-tags"].append(str(t)) - - def _parse_mac_platform(self, tag): - pattern = r"^macosx_(\d+)(?:_(\d+))?_(.+)$" - m = re.match(pattern, tag) - if not m: - return {} - - major = int(m.group(1)) - minor = int(m.group(2) or 0) # Default minor version to 0 if omitted - arch = m.group(3) - return {"version": f"{major!s}.{minor!s}", "arch": arch} - - def _parse_win_platform(self, tag: str): - tag = tag.lower() - if tag == "win32": - return "win32" - if tag.startswith("win_"): - arch = tag.split("_", 1)[1] - if arch == "amd64": - return "x86_64" - elif arch == "arm64": - return "arm64" - return False - - def _parse_manylinux_platform(self, tag: str): - if tag.startswith("manylinux_"): - # PEP 600 perennial tag: manylinux_X_Y_arch - m = re.match(r"^manylinux_([0-9]+)_([0-9]+)_(.+)$", tag) - if not m: - return {} - glibc_major = int(m.group(1)) - glibc_minor = int(m.group(2)) - arch = m.group(3) - return { - "version": f"{glibc_major}.{glibc_minor}", - "arch": arch, - "libc": "Glibc", - } - elif tag.startswith("manylinux"): - # Legacies: *1_x86_64, *2010_i686, *2014_x86_64, etc. - m = re.match(r"^manylinux(\d+)_(.+)$", tag) - if not m: - return {} - identifier = m.group(1) # e.g. "1", "2010", "2014" - arch = m.group(2) - # Map known legacy identifiers to glibc versions (optional) - version = None - if identifier == "1": - version = "2.5" - elif identifier == "2010": - version = "2.12" - elif identifier == "2014": - version = "2.17" - else: - return {} - return {"version": version, "arch": arch, "libc": "Glibc"} - else: - return {} - - def _parse_musllinux_platform(self, tag: str): - m = re.match(r"^musllinux_([0-9]+)_([0-9]+)_(.+)$", tag) - if not m: - return False - musl_major = int(m.group(1)) - musl_minor = int(m.group(2)) - arch = m.group(3) - return { - "version": f"{musl_major}.{musl_minor}", - "arch": arch, - "libc": "musl", - } - - def _build_tree_str(self, obj, indent="", *, is_last=True): - lines = [] - - if isinstance(obj, dict): - items = list(obj.items()) - for i, (key, value) in enumerate(items): - is_last = i == len(items) - 1 - branch = "`-- " if is_last else "|-- " - next_indent = indent + (" " if is_last else "| ") - lines.append(f"{indent}{branch}{key}") - if not value: - lines[-1] += f" {colored.Fore.red}missing{colored.Style.reset}" - elif isinstance(value, dict): - lines.extend(self._build_tree_str(value, next_indent)) - elif isinstance(value, (list, tuple)): - lines[-1] += ( - f" {colored.Fore.green}>> " - f"{', '.join(value)}{colored.Style.reset}" - ) - return lines diff --git a/src/github_helper/api/_pkg_audit/__init__.py b/src/github_helper/api/_pkg_audit/__init__.py new file mode 100644 index 0000000..d91c811 --- /dev/null +++ b/src/github_helper/api/_pkg_audit/__init__.py @@ -0,0 +1,81 @@ +"""Tools for auditing packages.""" + +import logistro + +from github_helper.api.versions import Version + +from ._py_audit import PythonAudit +from ._types import ReturnMessages + +_logger = logistro.getLogger(__name__) + + +class ReleaseAudit: + """ + Release audit turns a release object into a summary. + + The release audit goes through an audit, file by file, + and tries to sort them as well as looking to see if sub-processors + understand them. + """ + + tag: str + """The original tag.""" + unknown_files: set + """Files that couldn't be understood trying to calculate notes.""" + ignored_files: dict[str, int] + """Files that we don't care about .""" + prerelease_agree: bool + """Does the version agree with the mark about prerelease.""" + version: Version + """The parsed Version.""" + python_audit: PythonAudit + """A python audit.""" + + def __init__(self, release): + """Audit a release object.""" + self.tag = release.tag + self.unknown_files = set() + self.ignored_files = {} + self.version = Version(self.tag) + self.python_audit = PythonAudit(self.version) + + if not self.version.valid: # we should not audit not valid + self.prerelease_agree = None + return + + self.prerelease_agree = self.version.is_prerelease == release.prerelease + + for filename in release.files: + notes = self.process_file(filename) + + match notes.get("action"): + case "ignore": + why = notes.get("type", "other") + self.ignored_files[why] = self.ignored_files.get(why, 0) + 1 + case "resolved": + pass + case _: + self.unknown_files.add(filename) + + # the action you return will trigger behavior above + def process_file(self, filename: str) -> ReturnMessages: + if filename in (f"{self.tag}.zip", f"{self.tag}.tar.gz"): + return {"type": "gh-archive", "action": "ignore"} + elif filename.endswith(("sigstore.json", ".sha256")): + return {"type": "metadata", "action": "ignore"} + elif note := self.python_audit.check_file(filename): + return note + return {"error": "unrecognized name", "value": filename} + + def __json__(self): + return { + "tag": self.tag, + "version": self.version.__json__(), + "prerelease_agree": self.prerelease_agree, + "ignored_files": self.ignored_files, + "unknown_files": list(self.unknown_files), + "audits": { + "python": self.python_audit.__json__(), + }, + } diff --git a/src/github_helper/api/_pkg_audit/_py_audit.py b/src/github_helper/api/_pkg_audit/_py_audit.py new file mode 100644 index 0000000..302960c --- /dev/null +++ b/src/github_helper/api/_pkg_audit/_py_audit.py @@ -0,0 +1,232 @@ +import copy +import re +from dataclasses import dataclass, field +from typing import Literal + +import logistro +from packaging import tags, utils + +from github_helper.api.versions import Version + +from ._types import Audit, Error + +_logger = logistro.getLogger(__name__) + +bdist_template = { + "Windows": { + "x86_64": [], + "arm64": [], + "win32": [], + }, + "Mac": { + "x86_64": {}, + "arm64": {}, + "universal2": {}, + }, + "Linux": { + "x86_64": { + "Glibc": { + "2.17": [], + }, + "musl": {}, + }, + "aarch64": { + "Glibc": { + "2.17": [], + }, + "musl": {}, + }, + "armv7l": { + "Glibc": { + "2.17": [], + }, + "musl": {}, + }, + }, +} + + +class PythonAudit: + projects: dict[str, "Project"] + version: Version + + def __init__(self, v: Version) -> None: + self.projects: dict[str, Project] = {} + self.version = v + + def check_file(self, filename) -> Audit | Error | None: + if filename.endswith("tar.gz"): + try: + name, version = utils.parse_sdist_filename(filename) + except utils.InvalidSdistFilename: + _logger.debug(f"Invalid Sdist Filename: {filename}") + return None + else: + self.projects.setdefault(name, Project()) + self.projects[name].add_sdist(filename) + self.projects[name].version_weird = ( + self.version != version or self.projects[name].version_weird + ) + return {"name": name, "action": "resolved"} + elif filename.endswith(".whl"): + try: + ( + name, + version, + build, + tags, + ) = utils.parse_wheel_filename(filename) + except utils.InvalidWheelFilename: + _logger.debug(f"Invalid Wheel Filename: {filename}") + return {"error": "invalid name", "value": filename} + else: + self.projects.setdefault(name, Project()) + self.projects[name].add_bdist(filename, tags) + self.projects[name].version_weird = ( + self.version != version or self.projects[name].version_weird + ) + return {"name": name, "action": "resolved"} + return None + + def __json__(self): + return {k: v.__json__() for k, v in self.projects.items()} + + +@dataclass(slots=True, kw_only=True) +class Project: + sdist: bool = False + bdist: bool = False + version_weird: bool = False + pure_tags: set[tags.Tag] = field(default_factory=set) + all_tags: set[tags.Tag] = field(default_factory=set) + unknown_tags: set[tags.Tag] = field(default_factory=set) + files: set[str] = field(default_factory=set) + bdist_tree: dict | None = None + + def _tags_to_strlist(self, tags): + return [str(t) for t in tags] + + def __json__(self): + return { + "sdist": self.sdist, + "bdist": self.bdist, + "versions_match": not (self.version_weird), + "pure_tags": self._tags_to_strlist(self.pure_tags), + "unknown_tags": self._tags_to_strlist(self.unknown_tags), + "other_tags": self._tags_to_strlist( + self.all_tags - self.pure_tags - self.unknown_tags, + ), + "files": list(self.files), + "compat_tree": self.bdist_tree, + } + + def add_sdist(self, file: str): + self.sdist = True + self.files.add(file) + + def add_bdist(self, file: str, tags: set[tags.Tag] | frozenset[tags.Tag]) -> None: + self.bdist = True + self.files.add(file) + self.all_tags.update(tags) + for t in tags: + # t.interpreter, t.abi, t.platform + pair = f"{t.interpreter}-{t.abi}" + if t.abi == "none" and t.platform == "any": + self.pure_tags.add(t) + continue + if not self.bdist_tree: + self.bdist_tree = copy.deepcopy(bdist_template) + if t.platform == "any": + x = self.bdist_tree.setdefault("All OS", []) + x.append(pair) + elif mactag := self._parse_mac_platform(t.platform): + x = self.bdist_tree["Mac"][mactag.arch].setdefault("version", []) + x.append(pair) + elif arch := self._parse_win_platform(t.platform): + self.bdist_tree["Windows"][arch].append(pair) + elif linuxtag := self._parse_linux_platform(t.platform): + arch_default: dict = {"Glibc": {"2.17": []}, "musl": {}} + x = self.bdist_tree["Linux"].setdefault(linuxtag.arch, arch_default) + x.setdefault(linuxtag.version, []).append(pair) + else: + self.unknown_tags.add(t) + + @dataclass(slots=True, frozen=True) + class MacTag: + version: str + arch: str + + def _parse_mac_platform(self, tag) -> MacTag | None: + pattern = r"^macosx_(\d+)(?:_(\d+))?_(.+)$" + m = re.match(pattern, tag) + if not m: + return None + + major = int(m.group(1)) + minor = int(m.group(2) or 0) # Default minor version to 0 if omitted + arch = m.group(3) + return Project.MacTag(version=f"{major!s}.{minor!s}", arch=arch) + + def _parse_win_platform( + self, + tag: str, + ) -> Literal["win32", "x86_64", "arm64"] | None: + tag = tag.lower() + if tag == "win32": + return "win32" + if tag.startswith("win_"): + arch = tag.split("_", 1)[1] + if arch == "amd64": + return "x86_64" + elif arch == "arm64": + return "arm64" + return None + + @dataclass(frozen=True, slots=True) + class LinuxTag: + version: str + arch: str + libc: Literal["Glibc", "musl"] + + def _parse_linux_platform(self, tag: str) -> LinuxTag | None: + libc: Literal["Glibc", "musl"] + if tag.startswith("manylinux_"): + libc = "Glibc" + # PEP 600 perennial tag: manylinux_X_Y_arch + m = re.match(r"^manylinux_([0-9]+)_([0-9]+)_(.+)$", tag) + if not m: + return None + glibc_major = int(m.group(1)) + glibc_minor = int(m.group(2)) + arch = m.group(3) + version = f"{glibc_major}.{glibc_minor}" + elif tag.startswith("manylinux"): + libc = "Glibc" + # Legacies: *1_x86_64, *2010_i686, *2014_x86_64, etc. + m = re.match(r"^manylinux(\d+)_(.+)$", tag) + if not m: + return None + identifier = m.group(1) # e.g. "1", "2010", "2014" + arch = m.group(2) + # Map known legacy identifiers to glibc versions (optional) + if identifier == "1": + version = "2.5" + elif identifier == "2010": + version = "2.12" + elif identifier == "2014": + version = "2.17" + else: + return None + elif m := re.match(r"^musllinux_([0-9]+)_([0-9]+)_(.+)$", tag): + libc = "musl" + musl_major = int(m.group(1)) + musl_minor = int(m.group(2)) + arch = m.group(3) + version = f"{musl_major}.{musl_minor}" + else: + return None + return Project.LinuxTag( + version=version, + arch=arch, + libc=libc, + ) diff --git a/src/github_helper/api/_pkg_audit/_types.py b/src/github_helper/api/_pkg_audit/_types.py new file mode 100644 index 0000000..0381082 --- /dev/null +++ b/src/github_helper/api/_pkg_audit/_types.py @@ -0,0 +1,19 @@ +from typing import Literal, TypedDict + + +class Ignore(TypedDict): + type: str + action: Literal["ignore"] + + +class Error(TypedDict): + error: str + value: str + + +class Audit(TypedDict): + name: str + action: Literal["resolved"] + + +ReturnMessages = Ignore | Error | Audit diff --git a/src/github_helper/api/versions.py b/src/github_helper/api/versions.py new file mode 100644 index 0000000..3b3b3d0 --- /dev/null +++ b/src/github_helper/api/versions.py @@ -0,0 +1,335 @@ +"""For a unified version framework.""" + +import re +import sys +import warnings +from dataclasses import dataclass +from enum import StrEnum +from functools import total_ordering + +import logistro +import semver +from colored import Fore, Style +from packaging import version as pyversion + +_logger = logistro.getLogger(__name__) + +if not sys.stdout.isatty(): + + class _NoColor: + def __getattr__(self, name): + return "" + + # Override colored's foreground, background, and style + Fore = Style = _NoColor() # type: ignore[misc, assignment] + + +@dataclass(frozen=True, slots=True) +class BadVersion: + """A weak parser that looks for instances where someone tried to tag.""" + + _tag_part_re = re.compile(r"^(\d*)(?:\.?(.*))?$") + tag: str + major: int + minor: int + micro: int + pre: None = None + local: str | None = None + is_prerelease: bool = False + + def __init__(self, tag: str): + """Look for tag-like structures that parsers won't return.""" + object.__setattr__(self, "tag", tag) + object.__setattr__(self, "major", 0) + object.__setattr__(self, "minor", 0) + object.__setattr__(self, "micro", 0) + object.__setattr__(self, "pre", None) + object.__setattr__(self, "local", None) + object.__setattr__(self, "is_prerelease", False) + tag = tag.removeprefix("v") + major_match = self._tag_part_re.search(tag) + if not major_match or not major_match.group(1): + raise ValueError + # else + object.__setattr__(self, "major", int(major_match.group(1))) + if not major_match.group(2): + return + # else + minor_match = self._tag_part_re.search(major_match.group(2)) + if not minor_match or not minor_match.group(1): + object.__setattr__(self, "local", major_match.group(2)) + return + # else + object.__setattr__(self, "minor", int(minor_match.group(1))) + if not minor_match.group(2): + return + # else + micro_match = self._tag_part_re.search(minor_match.group(2)) + if not micro_match or not micro_match.group(1): + object.__setattr__(self, "local", minor_match.group(2)) + return + # else + object.__setattr__(self, "micro", int(micro_match.group(1))) + object.__setattr__(self, "local", micro_match.group(2) or None) + + +_VersionTypes = semver.Version | pyversion.Version | BadVersion + + +@total_ordering +@dataclass(frozen=True, slots=True) +class Version: + """A unified version class. Most similar to Python, not SemVer.""" + + class Type(StrEnum): + """The possible types of version formats.""" + + PYTHON = "Python" + SEMVER = "SemVer" + MALFORMED_PY = "Malformed Python" + MALFORMED = f"{Fore.red}Malformed{Style.reset}" + UNPARSABLE = f"{Fore.red}Unparsable{Style.reset}" + + tag: str + valid: bool + kind: Type + major: int + minor: int + micro: int + pre: tuple[str, int] | None + dev: int | None + post: int | None + local: str | None + is_prerelease: bool + _parsed: _VersionTypes + + def __init__(self, tag: str, *, raise_parse_exc: bool = False): + """Convert a tag into a Version object, collecting info.""" + if not isinstance(tag, str): + raise TypeError("tag argument must be string.") + object.__setattr__(self, "tag", tag) + parsed_v, kind = self._test_parsers(rpe=raise_parse_exc) + object.__setattr__(self, "kind", kind) + if not parsed_v: + object.__setattr__(self, "valid", False) + return + object.__setattr__(self, "valid", True) + self._enumerate_version(parsed_v) + + def _test_parsers( # noqa: C901 + self, + *, + rpe: bool = False, + ) -> tuple[_VersionTypes | None, Type | None]: + """See which parsers handle the tag.""" + tag = self.tag + try: + parsed: _VersionTypes = pyversion.Version(tag) + + if str(parsed) != tag.removeprefix("v"): + old_parsed = parsed + try: + parsed = semver.Version.parse(tag) + except ValueError: + return old_parsed, Version.Type.MALFORMED_PY + else: + return parsed, Version.Type.PYTHON + except Exception: + if rpe: + raise + else: + return parsed, Version.Type.PYTHON + try: + parsed = semver.Version.parse(tag) + except Exception: + if rpe: + raise + else: + return parsed, Version.Type.SEMVER + try: + parsed = BadVersion(tag) + except ValueError: + pass + else: + return parsed, Version.Type.MALFORMED + return None, Version.Type.UNPARSABLE + + def _enumerate_version(self, v: _VersionTypes) -> None: + """Break tag attributes into unified attributes.""" + if hasattr(v, "epoch") and v.epoch: + raise NotImplementedError( + "Not working with versions with epochs, but would be easy tbh.", + ) + object.__setattr__(self, "_parsed", v) + object.__setattr__(self, "major", v.major) + object.__setattr__(self, "minor", v.minor) + object.__setattr__( + self, + "micro", + v.micro if hasattr(v, "micro") else v.patch, + ) + if hasattr(v, "prerelease"): + if not v.prerelease: + pre = None + else: + match = re.match(r"([a-zA-Z]+)(?:[.\-]?(\d+))?", v.prerelease) + if not match: + warnings.warn( + "Prerelease string parsed but is not valid", + stacklevel=1, + ) + pre = (v.prerelease, 0) + else: + label = match.group(1).lower() + number = int(match.group(2)) if match.group(2) else 0 + pre = (label, number) + else: + pre = v.pre + object.__setattr__( + self, + "pre", + pre, + ) + object.__setattr__( + self, + "dev", + v.dev if hasattr(v, "dev") else None, + ) + object.__setattr__( + self, + "post", + v.post if hasattr(v, "post") else None, + ) + object.__setattr__( + self, + "local", + v.local if hasattr(v, "local") else v.build, + ) + object.__setattr__( + self, + "is_prerelease", + (v.is_prerelease if hasattr(v, "is_prerelease") else bool(v.prerelease)), + ) + + def __json__(self): + """Convert to json.""" + return self.__str__() + + def __str__(self): + """Print Version as string.""" + return self.__repr__() + + def __repr__(self): + """Print Version as python-compatible string if possible.""" + if not self.valid: + return "" + post = f".post{self.post}" if self.post is not None else "" + dev = f".dev{self.dev}" if self.dev is not None else "" + pre = f"{self.pre[0]}{self.pre[1]}" if self.pre is not None else "" + local = f"+{self.local}" if self.local is not None else "" + return f"{self.major}.{self.minor}.{self.micro}{pre}{post}{dev}{local}" + + def _cmp_tuple(self) -> tuple: + if not self.valid: + return (float("NaN"),) + return ( + self.major, + self.minor, + self.micro, + self.pre, + self.post, + self.dev, + self.local, + ) + + # this could def be simplified to tuple comparison + # but some values would need to be normalized first + # thx ai + def _compare( # noqa: C901, PLR0911, PLR0912 + self, + other, + ) -> int: # intrinsically special NotImplemented accepted as return + """ + Compare two versions with major, minor, micro, pre, post, dev. + + Returns: + -1 if self < other + 0 if self == other + 1 if self > other + NotImplemented if wrong type + + """ + if not self.valid: + return NotImplemented + if isinstance(other, _VersionTypes): + other = Version(str(other)) + elif not isinstance(other, Version): + return NotImplemented + # Compare major, minor, micro + if (c := (self.major - other.major)) != 0: + return (c > 0) - (c < 0) + if (c := (self.minor - other.minor)) != 0: + return (c > 0) - (c < 0) + if (c := (self.micro - other.micro)) != 0: + return (c > 0) - (c < 0) + + # Compare pre-releases + if self.pre is None and other.pre is not None: + return 1 # final > pre + if self.pre is not None and other.pre is None: + return -1 # pre < final + if self.pre is not None and other.pre is not None: + if self.pre[0] != other.pre[0]: + return (self.pre[0] > other.pre[0]) - (self.pre[0] < other.pre[0]) + if self.pre[1] != other.pre[1]: + return (self.pre[1] > other.pre[1]) - (self.pre[1] < other.pre[1]) + + # Compare post-releases (before dev!) + if self.post is None and other.post is not None: + return -1 # no post < post + if self.post is not None and other.post is None: + return 1 # post > no post + if self.post is not None and other.post is not None: # noqa: SIM102 clarity + if self.post != other.post: + return (self.post > other.post) - (self.post < other.post) + + # Compare dev-releases (only if same pre/post/final) + if self.dev is None and other.dev is not None: + return 1 # no dev > dev + if self.dev is not None and other.dev is None: + return -1 # dev < no dev + if self.dev is not None and other.dev is not None: # noqa: SIM102 clarity + if self.dev != other.dev: + return (self.dev > other.dev) - (self.dev < other.dev) + + # local is not used in official sorting + # but if we do not provide some arbitrary tie breaking + # python will sometimes think two versions are equal + # in some instances (like when physical sorting) + # and disequal in others (like when looking for keys) + # so functions like sorted() which do both will break + # so just sort arbitrarily unless they _really_ _are_ _equal_ + if self.local == other.local: + return 0 + elif self.local is not None and other.local is None: + return 1 + else: + return -1 + + def __eq__(self, other) -> bool: + """Check if equal.""" + c = self._compare(other) + if c is NotImplemented: + return NotImplemented + return c == 0 + + def __lt__(self, other) -> bool: + """Check if less than.""" + c = self._compare(other) + if c is NotImplemented: + return NotImplemented + return c < 0 + + def __hash__(self): + """Return hash based on normalized value.""" + return self._cmp_tuple().__hash__()