Refactoring into CondaPackageHelper for reuse

This commit is contained in:
romainx
2020-03-01 08:19:28 +01:00
parent acdfecf76e
commit b46fa3f9e6
2 changed files with 188 additions and 117 deletions

179
test/helpers.py Normal file
View File

@@ -0,0 +1,179 @@
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
# CondaPackageHelper is partially based on the work https://oerpli.github.io/post/2019/06/conda-outdated/.
# See copyright below.
#
# MIT License
# Copyright (c) 2019 Abraham Hinteregger
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import re
from collections import defaultdict
from itertools import chain
import logging
import ast
from tabulate import tabulate
LOGGER = logging.getLogger(__name__)
class CondaPackageHelper:
"""Conda package helper permitting to get information about packages
"""
def __init__(self, container):
# if isinstance(container, TrackedContainer):
self.running_container = CondaPackageHelper.start_container(container)
self.specs = None
self.installed = None
self.available = None
self.comparison = None
@staticmethod
def start_container(container):
"""Start the TrackedContainer and return an instance of a running container"""
LOGGER.info(f"Starting container {container.image_name} ...")
return container.run(
tty=True, command=["start.sh", "bash", "-c", "sleep infinity"]
)
def installed_packages(self):
"""Return the installed packages"""
if self.installed is None:
LOGGER.info(f"Grabing the list of installed packages ...")
self.installed = self._packages(["conda", "list"])
return self.installed
def available_packages(self):
"""Return the available packages"""
if self.available is None:
LOGGER.info(
f"Grabing the list of available packages (can take a while) ..."
)
self.available = self._packages(["conda", "search", "--outdated"])
return self.available
def specified_packages(self):
"""Return the specifications (i.e. packages installation requested)"""
if self.specs is None:
LOGGER.info(f"Grabing the list of specifications ...")
self.specs = self._specifications(
["grep", "^# update specs:", "/opt/conda/conda-meta/history"]
)
return self.specs
def check_updatable_packages(self, specifications_only=True):
"""Check the updatables packages including or not dependencies"""
specs = self.specified_packages()
installed = self.installed_packages()
available = self.available_packages()
self.comparison = list()
for pkg, inst_vs in self.installed.items():
if not specifications_only or pkg in specs:
avail_vs = sorted(
list(available[pkg]), key=CondaPackageHelper.semantic_cmp
)
if not avail_vs:
continue
current = min(inst_vs, key=CondaPackageHelper.semantic_cmp)
newest = avail_vs[-1]
if avail_vs and current != newest:
if CondaPackageHelper.semantic_cmp(
current
) < CondaPackageHelper.semantic_cmp(newest):
self.comparison.append(
{"Package": pkg, "Current": current, "Newest": newest}
)
return self.comparison
def get_outdated_summary(self, specifications_only=True):
"""Return a summary of outdated packages"""
if specifications_only:
nb_packages = len(self.specs)
else:
nb_packages = len(self.installed)
nb_updatable = len(self.comparison)
updatable_ratio = nb_updatable / nb_packages
return f"{nb_updatable}/{nb_packages} ({updatable_ratio:.0%}) packages could be updated"
def get_outdated_table(self):
"""Return a table of outdated packages"""
return tabulate(self.comparison, headers="keys")
def _execute_command(self, command):
"""Execute a command on a running container"""
rc = self.running_container.exec_run(command)
return rc.output.decode("utf-8")
def _specifications(self, command):
"""Return the list of specifications from a command"""
specifications = CondaPackageHelper._extract_specifications(
self._execute_command(command)
)
LOGGER.debug(f"List of specifications and versions {specifications}")
return specifications
def _packages(self, command):
"""Return the list of packages from a command"""
packages = CondaPackageHelper._extract_packages(self._execute_command(command))
LOGGER.debug(f"List of packages and versions {packages}")
return packages
@staticmethod
def _extract_packages(lines):
"""Extract packages and versions from the lines returned by the list of packages"""
ddict = defaultdict(set)
for line in lines.splitlines()[2:]:
pkg, version = re.match(r"^(\S+)\s+(\S+)", line, re.MULTILINE).groups()
ddict[pkg].add(version)
return ddict
@staticmethod
def _extract_specifications(lines):
"""Extract packages and versions from the lines returned by the list of specifications"""
packages_list = list()
for spec in (spec for spec in lines.split("\n") if spec.strip() != ""):
spec_list = ast.literal_eval(spec.split(": ")[1])
packages_list += map(lambda x: x.split("=", 1), spec_list)
# TODO: could be improved
return {package[0]: set(package[1:]) for package in packages_list}
@staticmethod
def semantic_cmp(version_string):
"""Manage semantic versioning for comparison"""
def mysplit(string):
version_substrs = lambda x: re.findall(r"([A-z]+|\d+)", x)
return list(chain(map(version_substrs, string.split("."))))
def str_ord(string):
num = 0
for char in string:
num *= 255
num += ord(char)
return num
def try_int(version_str):
try:
return int(version_str)
except ValueError:
return str_ord(version_str)
mss = list(chain(*mysplit(version_string)))
return tuple(map(try_int, mss))

View File

@@ -1,128 +1,20 @@
"""
Based on the work https://oerpli.github.io/post/2019/06/conda-outdated/.
See copyright below.
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
MIT License
Copyright (c) 2019 Abraham Hinteregger
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import re
import subprocess
from collections import defaultdict
from itertools import chain
import logging
import ast
import pytest
from tabulate import tabulate
from helpers import CondaPackageHelper
LOGGER = logging.getLogger(__name__)
def get_versions(lines):
"""Extract versions from the lines"""
ddict = defaultdict(set)
for line in lines.splitlines()[2:]:
pkg, version = re.match(r"^(\S+)\s+(\S+)", line, re.MULTILINE).groups()
ddict[pkg].add(version)
return ddict
def semantic_cmp(version_string):
"""Manage semantic versioning for comparison"""
def mysplit(string):
version_substrs = lambda x: re.findall(r"([A-z]+|\d+)", x)
return list(chain(map(version_substrs, string.split("."))))
def str_ord(string):
num = 0
for char in string:
num *= 255
num += ord(char)
return num
def try_int(version_str):
try:
return int(version_str)
except ValueError:
return str_ord(version_str)
mss = list(chain(*mysplit(version_string)))
return tuple(map(try_int, mss))
def print_result(installed, specs, result):
"""Print the result of the outdated check"""
if specs is None:
nb_packages = len(installed)
else:
nb_packages = len(specs)
nb_updatable = len(result)
updatable_ratio = nb_updatable / nb_packages
LOGGER.info(
f"{nb_updatable}/{nb_packages} ({updatable_ratio:.0%}) packages could be updated"
)
LOGGER.info(f"\n{tabulate(result, headers='keys')}\n")
@pytest.mark.info
def test_outdated_packages(container, remove_dependencies=True):
"""Getting the list of outdated packages"""
def test_outdated_packages(container, specifications_only=True):
"""Getting the list of updatable packages"""
LOGGER.info(f"Checking outdated packages in {container.image_name} ...")
c = container.run(tty=True, command=["start.sh", "bash", "-c", "sleep infinity"])
sp_i = c.exec_run(["conda", "list"])
sp_v = c.exec_run(["conda", "search", "--outdated"])
specs = None
if remove_dependencies:
raw_specs = c.exec_run(
["grep", "^# update specs:", "/opt/conda/conda-meta/history"]
)
specs = parse_specs(raw_specs.output.decode("utf-8"))
installed = get_versions(sp_i.output.decode("utf-8"))
available = get_versions(sp_v.output.decode("utf-8"))
result = list()
for pkg, inst_vs in installed.items():
if not remove_dependencies or pkg in specs:
avail_vs = sorted(list(available[pkg]), key=semantic_cmp)
if not avail_vs:
continue
current = min(inst_vs, key=semantic_cmp)
newest = avail_vs[-1]
if avail_vs and current != newest:
if semantic_cmp(current) < semantic_cmp(newest):
result.append(
{"Package": pkg, "Current": current, "Newest": newest}
)
print_result(installed, specs, result)
def parse_specs(raw_specs):
"""Return the list of requested packages (specs)
Versions are removed from packages.
"""
packages = list()
for spec in (spec for spec in raw_specs.split("\n") if spec.strip() != ""):
packages += map(
lambda x: x.split("=", 1)[0], ast.literal_eval(spec.split(": ")[1])
)
LOGGER.debug(f"List of specs from conda env: {packages}")
return packages
pkg_helper = CondaPackageHelper(container)
pkg_helper.check_updatable_packages(specifications_only)
LOGGER.info(pkg_helper.get_outdated_summary(specifications_only))
LOGGER.info(f"\n{pkg_helper.get_outdated_table()}\n")