Source code for rpmrh.rpm
"""RPM-related classes and procedures."""
import operator
import re
from functools import partialmethod
from pathlib import Path
from typing import Any
from typing import Callable
from typing import ClassVar
from typing import Tuple
from typing import Union
from typing import cast
import attr
from attr.validators import instance_of
from .util import system_import
_rpm = system_import("rpm")
# type aliases and helpers
CompareOperator = Callable[[Any, Any], bool]
# .el7_4 format
LONG_DIST_RE = re.compile(
r"""
(\. # short dist tag starts with a dot…
[^\W\d_]+ # … followed by at least one letter…
\d+) # … and ended by at least one digit
[^.]* # any other characters up to the next dot
""",
flags=re.VERBOSE,
)
# Argument normalization
_DEFAULT_EPOCH: int = 0
_DEFAULT_ARCH: str = "src"
def _normalize_epoch(epoch: Union[str, bytes, int, None]) -> int:
"""Normalize epoch value into proper integer."""
return int(epoch) if epoch is not None else _DEFAULT_EPOCH
def _normalize_architecture(architecture: Union[str, None]) -> str:
"""Normalize architecture value into string."""
return architecture if architecture is not None else _DEFAULT_ARCH
def _normalize_path(path: Union[str, Path]) -> Path:
"""Normalize path arguments into canonical absolute paths"""
return Path(path).resolve()
def _ensure_text(data: Union[str, bytes], *, encoding: str = "utf-8") -> str:
"""Decode input data into Unicode string if necessary"""
return data.decode(encoding) if isinstance(data, bytes) else data
[docs]@attr.s(slots=True, cmp=False, frozen=True, hash=True)
class Metadata:
"""Generic RPM metadata.
This class should act as a basis for all the RPM-like objects,
providing common comparison and other "dunder" methods.
"""
#: Regular expression for extracting epoch from an NEVRA string
_EPOCH_RE: ClassVar = re.compile(r"(\d+):")
#: Regular expression for splitting up NVR string
_NVRA_RE: ClassVar = re.compile(
r"""
^
(?P<name>\S+)- # package name
(?P<version>[\w.]+)- # package version
(?P<release>\w+(?:\.[\w+]+)+?) # package release, with required dist tag
(?:\.(?P<arch>\w+))? # optional package architecture
(?:\.rpm)? # optional rpm extension
$
""",
flags=re.VERBOSE,
)
#: RPM name
name: str = attr.ib(validator=instance_of(str))
#: RPM version
version: str = attr.ib(validator=instance_of(str))
#: RPM release
release: str = attr.ib(validator=instance_of(str))
#: Optional RPM epoch
epoch: int = attr.ib(
validator=instance_of(int), default=_DEFAULT_EPOCH, converter=_normalize_epoch
)
#: RPM architecture
arch: str = attr.ib(
validator=instance_of(str),
default=_DEFAULT_ARCH,
converter=_normalize_architecture,
)
# Alternative constructors
[docs] @classmethod
def from_nevra(cls, nevra: str) -> "Metadata":
"""Parse a string NEVRA and converts it to respective fields.
Keyword arguments:
nevra: The name-epoch:version-release-arch to parse.
Returns:
New instance of Metadata.
Raises:
ValueError: The :ref:`nevra` argument is not valid NEVRA string.
"""
arguments = {}
# Extract the epoch, if present
def replace_epoch(match):
arguments["epoch"] = match.group(1)
return ""
nvra = cls._EPOCH_RE.sub(replace_epoch, nevra, count=1)
# Parse the rest of the string
match = cls._NVRA_RE.match(nvra)
if not match:
message = "Invalid NEVRA string: {}".format(nevra)
raise ValueError(message)
arguments.update(
(name, value)
for name, value in match.groupdict().items()
if value is not None
)
return cls(**arguments)
# Derived attributes
@property
def nvr(self) -> str:
""":samp:`{name}-{version}-{release}` string of the RPM object"""
return "{s.name}-{s.version}-{s.release}".format(s=self)
@property
def nevra(self) -> str:
""":samp:`{name}-{epoch}:{version}-{release}.{arch}` string of the RPM object"""
return "{s.name}-{s.epoch}:{s.version}-{s.release}.{s.arch}".format(s=self)
@property
def label(self) -> Tuple[str, str, str]:
"""Label compatible with RPM's C API."""
return (str(self.epoch), self.version, self.release)
@property
def canonical_file_name(self):
"""Canonical base file name of a package with this metadata."""
if self.epoch:
format = "{s.name}-{s.epoch}:{s.version}-{s.release}.{s.arch}.rpm"
else:
format = "{s.name}-{s.version}-{s.release}.{s.arch}.rpm"
return format.format(s=self)
# Comparison methods
def _compare(self, other: "Metadata", oper: CompareOperator) -> bool:
"""Generic comparison of two RPM-like objects.
Keyword arguments:
other: The object to compare with
oper: The operator to use for the comparison.
Returns:
bool: The result of the comparison.
NotImplemented: Incompatible operands.
"""
try:
if self.name == other.name:
return oper(_rpm.labelCompare(self.label, other.label), 0)
else:
return oper(self.name, other.name)
except AttributeError:
return NotImplemented
__eq__ = cast(CompareOperator, partialmethod(_compare, oper=operator.eq))
__ne__ = cast(CompareOperator, partialmethod(_compare, oper=operator.ne))
__lt__ = cast(CompareOperator, partialmethod(_compare, oper=operator.lt))
__le__ = cast(CompareOperator, partialmethod(_compare, oper=operator.le))
__gt__ = cast(CompareOperator, partialmethod(_compare, oper=operator.gt))
__ge__ = cast(CompareOperator, partialmethod(_compare, oper=operator.ge))
# String representations
def __str__(self) -> str:
return self.nevra
[docs]@attr.s(slots=True, frozen=True, hash=True, cmp=False)
class LocalPackage:
"""Existing RPM package on local file system."""
#: Resolved path to the RPM package
path: Path = attr.ib(converter=_normalize_path)
#: Metadata of the package
metadata: Metadata = attr.ib(validator=instance_of(Metadata))
@path.validator
def _existing_file_path(self, _attribute, path):
"""The path must point to an existing file.
Raises:
FileNotFoundError: The path does not points to a file.
"""
if not path.is_file():
raise FileNotFoundError(path)
@metadata.default
def _file_metadata(self) -> Metadata:
"""Read metadata from an RPM file.
Keyword arguments:
file: The IO object to read the metadata from.
It has to provide a file descriptor – in-memory
files are unsupported.
Returns:
New instance of Metadata.
"""
transaction = _rpm.TransactionSet()
# Ignore missing signatures warning
transaction.setVSFlags(_rpm._RPMVSF_NOSIGNATURES)
with self.path.open(mode="rb") as file:
header = transaction.hdrFromFdno(file.fileno())
# Decode the metadata
metadata = {
"name": _ensure_text(header[_rpm.RPMTAG_NAME]),
"version": _ensure_text(header[_rpm.RPMTAG_VERSION]),
"release": _ensure_text(header[_rpm.RPMTAG_RELEASE]),
"epoch": header[_rpm.RPMTAG_EPOCHNUM],
}
# For source RPMs the architecture reported is a binary one
# for some reason
if header[_rpm.RPMTAG_SOURCEPACKAGE]:
metadata["arch"] = "src"
else:
metadata["arch"] = _ensure_text(header[_rpm.RPMTAG_ARCH])
return Metadata(**metadata)
# Path-like protocol
def __fspath__(self) -> str:
return str(self.path)
# String representation
def __str__(self):
return self.__fspath__()
# Utility functions
[docs]def shorten_dist_tag(metadata: Metadata) -> Metadata:
"""Shorten release string by removing extra parts of dist tag.
Examples:
- abcde-1.0-1.el7_4 → abcde-1.0-1.el7
- binutils-3.6-4.el8+4 → binutils-3.6-4.el8
- abcde-1.0-1.fc27 → abcde-1.0-1.fc27
Keyword arguments:
metadata: The metadata to shorten.
Returns:
Potentially modified metadata.
"""
return attr.evolve(metadata, release=LONG_DIST_RE.sub(r"\1", metadata.release))