
508 lines
17 KiB
Raw Normal View History

2024-10-16 15:53:15 +00:00
from __future__ import annotations
import argparse
import logging
import os
import re
import sys
import yaml
from packaging.requirements import InvalidRequirement, Requirement
BASE_COLLECTIONS_PATH = '/usr/share/ansible/collections'
# regex for a comment at the start of a line, or embedded with leading space(s)
COMMENT_RE = re.compile(r'(?:^|\s+)#.*$')
# obviously already satisfied or unwanted
'ansible', 'ansible-base', 'python', 'ansible-core',
# general python test requirements
'tox', 'pycodestyle', 'yamllint', 'pylint',
'flake8', 'pytest', 'pytest-xdist', 'coverage', 'mock', 'testinfra',
# test requirements highly specific to Ansible testing
'ansible-lint', 'molecule', 'galaxy-importer', 'voluptuous',
# already present in image for py3 environments
'yaml', 'pyyaml', 'json',
logger = logging.getLogger(__name__)
class CollectionDefinition:
This class represents the dependency metadata for a collection
should be replaced by logic to hit the Galaxy API if made available
def __init__(self, collection_path):
self.reference_path = collection_path
# NOTE: Filenames should match constants.DEAFULT_EE_BASENAME and constants.YAML_FILENAME_EXTENSIONS.
meta_file_base = os.path.join(collection_path, 'meta', 'execution-environment')
ee_exists = False
for ext in ('yml', 'yaml'):
meta_file = f"{meta_file_base}.{ext}"
if os.path.exists(meta_file):
with open(meta_file, 'r') as f:
self.raw = yaml.safe_load(f)
ee_exists = True
if not ee_exists:
self.raw = {'version': 1, 'dependencies': {}}
# Automatically infer requirements for collection
for entry, filename in [('python', 'requirements.txt'), ('system', 'bindep.txt')]:
candidate_file = os.path.join(collection_path, filename)
if has_content(candidate_file):
self.raw['dependencies'][entry] = filename
def target_dir(self):
namespace, name = self.namespace_name()
return os.path.join(
BASE_COLLECTIONS_PATH, 'ansible_collections',
namespace, name
def namespace_name(self):
"Returns 2-tuple of namespace and name"
path_parts = [p for p in self.reference_path.split(os.path.sep) if p]
return tuple(path_parts[-2:])
def get_dependency(self, entry):
"""A collection is only allowed to reference a file by a relative path
which is relative to the collection root
req_file = self.raw.get('dependencies', {}).get(entry)
if req_file is None:
return None
if os.path.isabs(req_file):
raise RuntimeError(
'Collections must specify relative paths for requirements files. '
f'The file {req_file} specified by {self.reference_path} violates this.'
return req_file
def line_is_empty(line):
return bool((not line.strip()) or line.startswith('#'))
def read_req_file(path):
"""Provide some minimal error and display handling for file reading"""
if not os.path.exists(path):
print(f'Expected requirements file not present at: {os.path.abspath(path)}')
with open(path, 'r') as f:
def pip_file_data(path):
pip_content = read_req_file(path)
pip_lines = []
for line in pip_content.split('\n'):
if line_is_empty(line):
if line.startswith('-r') or line.startswith('--requirement'):
_, new_filename = line.split(None, 1)
new_path = os.path.join(os.path.dirname(path or '.'), new_filename)
return pip_lines
def bindep_file_data(path):
sys_content = read_req_file(path)
sys_lines = []
for line in sys_content.split('\n'):
if line_is_empty(line):
return sys_lines
def process_collection(path):
"""Return a tuple of (python_dependencies, system_dependencies) for the
collection install path given.
Both items returned are a list of dependencies.
:param str path: root directory of collection (this would contain galaxy.yml file)
col_def = CollectionDefinition(path)
py_file = col_def.get_dependency('python')
pip_lines = []
if py_file:
pip_lines = pip_file_data(os.path.join(path, py_file))
sys_file = col_def.get_dependency('system')
bindep_lines = []
if sys_file:
bindep_lines = bindep_file_data(os.path.join(path, sys_file))
return (pip_lines, bindep_lines)
def process(data_dir=BASE_COLLECTIONS_PATH,
Build a dictionary of Python and system requirements from any collections
installed in data_dir, and any user specified requirements.
Excluded requirements, if any, will be inserted into the return dict.
Example return dict:
'python': {
'collection.a': ['abc', 'def'],
'collection.b': ['ghi'],
'user': ['jkl'],
'exclude: ['abc'],
'system': {
'collection.a': ['ZYX'],
'user': ['WVU'],
'exclude': ['ZYX'],
'excluded_collections': [
paths = []
path_root = os.path.join(data_dir, 'ansible_collections')
# build a list of all the valid collection paths
if os.path.exists(path_root):
for namespace in sorted(os.listdir(path_root)):
if not os.path.isdir(os.path.join(path_root, namespace)):
for name in sorted(os.listdir(os.path.join(path_root, namespace))):
collection_dir = os.path.join(path_root, namespace, name)
if not os.path.isdir(collection_dir):
files_list = os.listdir(collection_dir)
if 'galaxy.yml' in files_list or 'MANIFEST.json' in files_list:
# populate the requirements content
py_req = {}
sys_req = {}
for path in paths:
col_pip_lines, col_sys_lines = process_collection(path)
col_def = CollectionDefinition(path)
namespace, name = col_def.namespace_name()
key = f'{namespace}.{name}'
if col_pip_lines:
py_req[key] = col_pip_lines
if col_sys_lines:
sys_req[key] = col_sys_lines
# add on entries from user files, if they are given
if user_pip:
col_pip_lines = pip_file_data(user_pip)
if col_pip_lines:
py_req['user'] = col_pip_lines
if exclude_pip:
col_pip_exclude_lines = pip_file_data(exclude_pip)
if col_pip_exclude_lines:
py_req['exclude'] = col_pip_exclude_lines
if user_bindep:
col_sys_lines = bindep_file_data(user_bindep)
if col_sys_lines:
sys_req['user'] = col_sys_lines
if exclude_bindep:
col_sys_exclude_lines = bindep_file_data(exclude_bindep)
if col_sys_exclude_lines:
sys_req['exclude'] = col_sys_exclude_lines
retval = {
'python': py_req,
'system': sys_req,
if exclude_collections:
# This file should just be a newline separated list of collection names,
# so reusing bindep_file_data() to read it should work fine.
excluded_collection_list = bindep_file_data(exclude_collections)
if excluded_collection_list:
retval['excluded_collections'] = excluded_collection_list
return retval
def has_content(candidate_file):
"""Beyond checking that the candidate exists, this also assures
that the file has something other than whitespace,
which can cause errors when given to pip.
if not os.path.exists(candidate_file):
return False
with open(candidate_file, 'r') as f:
content =
return bool(content.strip().strip('\n'))
def strip_comments(reqs: dict[str, list]) -> dict[str, list]:
Filter any comments out of the Python collection requirements input.
:param dict reqs: A dict of Python requirements, keyed by collection name.
:return: Same as the input parameter, except with no comment lines.
result: dict[str, list] = {}
for collection, lines in reqs.items():
for line in lines:
# strip comments
if (base_line := COMMENT_RE.sub('', line.strip())):
result.setdefault(collection, []).append(base_line)
return result
def should_be_excluded(value: str, exclusion_list: list[str]) -> bool:
Test if `value` matches against any value in `exclusion_list`.
The exclusion_list values are either strings to be compared in a case-insensitive
manner against value, OR, they are regular expressions to be tested against the
value. A regular expression will contain '~' as the first character.
:return: True if the value should be excluded, False otherwise.
for exclude_value in exclusion_list:
if exclude_value[0] == "~":
pattern = exclude_value[1:]
if re.fullmatch(pattern.lower(), value.lower()):
return True
elif exclude_value.lower() == value.lower():
return True
return False
def filter_requirements(reqs: dict[str, list],
exclude: list[str] | None = None,
exclude_collections: list[str] | None = None,
is_python: bool = True) -> list[str]:
Given a dictionary of Python requirement lines keyed off collections,
return a list of cleaned up (no source comments) requirements
annotated with comments indicating the sources based off the collection keys.
Currently, non-pep508 compliant Python entries are passed through. We also no
longer attempt to normalize names (replace '_' with '-', etc), other than
lowercasing it for exclusion matching, since we no longer are attempting
to combine similar entries.
:param dict reqs: A dict of either Python or system requirements, keyed by collection name.
:param list exclude: A list of requirements to be excluded from the output.
:param list exclude_collections: A list of collection names from which to exclude all requirements.
:param bool is_python: This should be set to True for Python requirements, as each
will be tested for PEP508 compliance. This should be set to False for system requirements.
:return: A list of filtered and annotated requirements.
exclusions: list[str] = []
collection_ignore_list: list[str] = []
if exclude:
exclusions = exclude.copy()
if exclude_collections:
collection_ignore_list = exclude_collections.copy()
annotated_lines: list[str] = []
uncommented_reqs = strip_comments(reqs)
for collection, lines in uncommented_reqs.items():
# Bypass this collection if we've been told to ignore all requirements from it.
if should_be_excluded(collection, collection_ignore_list):
logger.debug("# Excluding all requirements from collection '%s'", collection)
for line in lines:
# Determine the simple name based on type of requirement
if is_python:
parsed_req = Requirement(line)
name =
except InvalidRequirement:
"Passing through non-PEP508 compliant line '%s' from collection '%s'",
line, collection
annotated_lines.append(line) # We intentionally won't annotate these lines (multi-line?)
# bindep system requirements have the package name as the first "word" on the line
name = line.split(maxsplit=1)[0]
if collection.lower() not in {'user', 'exclude'}:
lower_name = name.lower()
if lower_name in EXCLUDE_REQUIREMENTS:
logger.debug("# Excluding requirement '%s' from '%s'", name, collection)
if should_be_excluded(lower_name, exclusions):
logger.debug("# Explicitly excluding requirement '%s' from '%s'", name, collection)
annotated_lines.append(f'{line} # from collection {collection}')
return annotated_lines
def parse_args(args=None):
parser = argparse.ArgumentParser(
'ansible-builder introspection; injected and used during execution environment build'
subparsers = parser.add_subparsers(
help='The command to invoke.',
return parser.parse_args(args)
def run_introspect(args, log):
data = process(args.folder,
exclude_collections=args.exclude_collections)'# Dependency data for %s', args.folder)
excluded_collections = data.pop('excluded_collections', None)
data['python'] = filter_requirements(
exclude=data['python'].pop('exclude', []),
data['system'] = filter_requirements(
exclude=data['system'].pop('exclude', []),
print(yaml.dump(data, default_flow_style=False))
if args.write_pip and data.get('python'):
write_file(args.write_pip, data.get('python') + [''])
if args.write_bindep and data.get('system'):
write_file(args.write_bindep, data.get('system') + [''])
def create_introspect_parser(parser):
introspect_parser = parser.add_parser(
help='Introspects collections in folder.',
'Loops over collections in folder and returns data about dependencies. '
'This is used internally and exposed here for verification. '
'This is targeted toward collection authors and maintainers.'
introspect_parser.add_argument('--sanitize', action='store_true',
'folder', default=BASE_COLLECTIONS_PATH, nargs='?',
'Ansible collections path(s) to introspect. '
'This should have a folder named ansible_collections inside of it.'
'--user-pip', dest='user_pip',
help='An additional file to combine with collection pip requirements.'
'--user-bindep', dest='user_bindep',
help='An additional file to combine with collection bindep requirements.'
'--exclude-bindep-reqs', dest='exclude_bindep',
help='An additional file to exclude specific bindep requirements from collections.'
'--exclude-pip-reqs', dest='exclude_pip',
help='An additional file to exclude specific pip requirements from collections.'
'--exclude-collection-reqs', dest='exclude_collections',
help='An additional file to exclude all requirements from the listed collections.'
'--write-pip', dest='write_pip',
help='Write the combined pip requirements file to this location.'
'--write-bindep', dest='write_bindep',
help='Write the combined bindep requirements file to this location.'
return introspect_parser
def write_file(filename: str, lines: list) -> bool:
parent_dir = os.path.dirname(filename)
if parent_dir and not os.path.exists(parent_dir):
logger.warning('Creating parent directory for %s', filename)
new_text = '\n'.join(lines)
if os.path.exists(filename):
with open(filename, 'r') as f:
if == new_text:
logger.debug("File %s is already up-to-date.", filename)
return False
logger.warning('File %s had modifications and will be rewritten', filename)
with open(filename, 'w') as f:
return True
def main():
args = parse_args()
if args.action == 'introspect':
run_introspect(args, logger)
logger.error("An error has occurred.")
if __name__ == '__main__':