Skip to content

mecfs_bio.build_system.scheduler.topological_scheduler

A scheduler that builds a dependency graph of tasks, and executes them in topological order

Classes:

Functions:

TopologicalSchedulerSettings

Attributes:

print_progress class-attribute instance-attribute

print_progress: bool = True

dependees_of_targets

dependees_of_targets(
    G: DiGraph, targets: Sequence[AssetId]
) -> nx.DiGraph

Given a dependency graph of assets, find the transitive dependees of the given targets, i.e. the targets plus every node reachable from them (their descendants in the graph).

Source code in mecfs_bio/build_system/scheduler/topological_scheduler.py
def dependees_of_targets(G: nx.DiGraph, targets: Sequence[AssetId]) -> nx.DiGraph:
    """
    Given a dependency graph of assets, find the transitive dependees of given targets.

    The result is a new graph: the subgraph of G induced by the targets themselves
    together with every descendant of a target (every node reachable from it by
    following edges forward).
    """
    keep: set[AssetId] = set(targets)
    for root in targets:
        # Accumulate everything reachable downstream of this target.
        keep |= nx.descendants(G, root)
    # Wrap the subgraph view in DiGraph(...) so the caller gets an independent graph.
    return nx.DiGraph(G.subgraph(keep))

dependees_of_targets_from_tasks

dependees_of_targets_from_tasks(
    tasks: Tasks, targets: Sequence[AssetId]
) -> nx.DiGraph

Given a set of tasks and a list of targets, build a minimal dependency graph containing the targets and all their transitive dependees.

Source code in mecfs_bio/build_system/scheduler/topological_scheduler.py
def dependees_of_targets_from_tasks(
    tasks: Tasks, targets: Sequence[AssetId]
) -> nx.DiGraph:
    """
    Build the dependency graph for `tasks`, then restrict it to `targets` and all
    of their transitive dependees.
    """
    return dependees_of_targets(get_dependency_graph_from_tasks(tasks), targets)

dependencies_of_targets

dependencies_of_targets(
    G: DiGraph, targets: Sequence[AssetId]
) -> nx.DiGraph

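Given a dependency graph of assets, find the transitive dependencies of the given targets, i.e. the targets plus every node from which they can be reached (their ancestors in the graph).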

Source code in mecfs_bio/build_system/scheduler/topological_scheduler.py
def dependencies_of_targets(G: nx.DiGraph, targets: Sequence[AssetId]) -> nx.DiGraph:
    """
    Given a dependency graph of assets, find the transitive dependencies of given targets.

    The result is a new graph: the subgraph of G induced by the targets themselves
    together with every ancestor of a target (every node from which the target can
    be reached by following edges forward).
    """
    keep: set[AssetId] = set(targets)
    for root in targets:
        # Accumulate everything upstream of this target.
        keep |= nx.ancestors(G, root)
    # Wrap the subgraph view in DiGraph(...) so the caller gets an independent graph.
    return nx.DiGraph(G.subgraph(keep))

dependencies_of_targets_from_tasks

dependencies_of_targets_from_tasks(
    tasks: Tasks, targets: Sequence[AssetId]
) -> nx.DiGraph

Given a set of tasks and a list of targets, build a minimal dependency graph containing the targets and all their transitive dependencies.

Source code in mecfs_bio/build_system/scheduler/topological_scheduler.py
def dependencies_of_targets_from_tasks(
    tasks: Tasks, targets: Sequence[AssetId]
) -> nx.DiGraph:
    """
    Build the dependency graph for `tasks`, then restrict it to `targets` and all
    of their transitive dependencies.
    """
    return dependencies_of_targets(get_dependency_graph_from_tasks(tasks), targets)

get_dependency_graph_from_tasks

get_dependency_graph_from_tasks(tasks: Tasks) -> nx.DiGraph

Convert a set of Tasks into a DAG according to their dependency structure.

Source code in mecfs_bio/build_system/scheduler/topological_scheduler.py
def get_dependency_graph_from_tasks(tasks: Tasks) -> nx.DiGraph:
    """
    Convert a set of Tasks into a DAG according to their dependency structure.

    Every asset id in `tasks` becomes a node, and for each dependency of a task an
    edge is added pointing from the dependency's asset id to the task's asset id.
    """
    graph: nx.DiGraph = nx.DiGraph()
    for asset_id, task in tasks.items():
        # Add the node explicitly so tasks without dependencies still appear.
        graph.add_node(asset_id)
        graph.add_edges_from((dep.asset_id, asset_id) for dep in task.deps)
    return graph

topological

topological(
    rebuilder: Rebuilder[Info],
    tasks: Tasks,
    targets: Sequence[AssetId],
    wf: WF,
    info: Info,
    meta_to_path: MetaToPath,
    incremental_save_path: Path | None = None,
    settings: TopologicalSchedulerSettings = TopologicalSchedulerSettings(),
) -> tuple[Mapping[AssetId, Asset], Info]

A scheduler that builds a dependency graph of tasks, and executes them in topological order. Based on Mokhov, Andrey, Neil Mitchell, and Simon Peyton Jones. "Build systems à la carte: Theory and practice." Journal of Functional Programming 30 (2020): e11.

Source code in mecfs_bio/build_system/scheduler/topological_scheduler.py
def topological[
    Info,
](
    rebuilder: Rebuilder[Info],
    tasks: Tasks,
    targets: Sequence[AssetId],
    wf: WF,
    info: Info,
    meta_to_path: MetaToPath,
    incremental_save_path: Path | None = None,
    settings: TopologicalSchedulerSettings = TopologicalSchedulerSettings(),
) -> tuple[Mapping[AssetId, Asset], Info]:
    """
    A scheduler that builds a dependency graph of tasks, and executes them in topological order.
    Based on
    Mokhov, Andrey, Neil Mitchell, and Simon Peyton Jones.
    "Build systems à la carte: Theory and practice." Journal of Functional Programming 30 (2020): e11.
    """
    G = dependencies_of_targets_from_tasks(tasks, targets)
    todo: list[AssetId] = list(nx.topological_sort(G))
    done: set[AssetId] = set()
    store: dict[AssetId, Asset] = {}
    frontier = _get_initial_frontier(G)
    while len(G) > 0:
        _print_progress(
            todo=todo,
            done=done,
            print_progress_list=settings.print_progress,
        )
        node = frontier[0]
        task = tasks[node]
        maybe_asset = get_asset_if_exists(
            meta=task.meta,
            meta_to_path=meta_to_path,
        )

        class SimpleFetch(Fetch):
            def __call__(self, asset_id: AssetId) -> Asset:
                return store[asset_id]

        new_asset, info = rebuilder.rebuild(
            task=task,
            asset=maybe_asset,
            fetch=SimpleFetch(),
            wf=wf,
            info=info,
            meta_to_path=meta_to_path,
        )
        if incremental_save_path is not None:
            rebuilder.save_info(info, incremental_save_path)
        store[node] = new_asset
        frontier, G = _update_frontier_and_G(G=G, completed=node, frontier=frontier)
        done.add(node)
    logger.info(_get_progress_list(todo=todo, done=done))
    return store, info