# Source code for networkx.algorithms.flow.utils

"""
网络流算法实用类和函数。
"""

from collections import deque

import networkx as nx

# Public API of this module: the names exported by a star-import.
# Every entry is a class or function defined in this file.
__all__ = [
    "CurrentEdge",
    "Level",
    "GlobalRelabelThreshold",
    "build_residual_network",
    "detect_unboundedness",
    "build_flow_dict",
]


class CurrentEdge:
    """Cyclic cursor over the out-edges of a node.

    The cursor exposes one ``(key, value)`` item of the wrapped mapping at a
    time. Advancing past the last item rewinds the cursor to the first item
    and raises ``StopIteration`` to signal the wrap-around.
    """

    __slots__ = ("_edges", "_it", "_curr")

    def __init__(self, edges):
        self._edges = edges
        # An empty mapping has no item to point at, so the cursor stays unset.
        if edges:
            self._rewind()

    def _rewind(self):
        """Restart iteration and point the cursor at the first item."""
        items = iter(self._edges.items())
        self._it = items
        self._curr = next(items)

    def get(self):
        """Return the ``(key, value)`` item the cursor currently points at."""
        return self._curr

    def move_to_next(self):
        """Advance the cursor by one item.

        Raises
        ------
        StopIteration
            When the items are exhausted; the cursor is rewound to the first
            item before the exception propagates.
        """
        try:
            upcoming = next(self._it)
        except StopIteration:
            self._rewind()
            raise
        else:
            self._curr = upcoming


class Level:
    """Active and inactive nodes in a level."""

    __slots__ = ("active", "inactive")

    def __init__(self):
        # Two independent, initially empty node sets.
        self.active, self.inactive = set(), set()


class GlobalRelabelThreshold:
    """Measurement of work before the global relabeling heuristic is applied.

    The threshold is ``(n + m) / freq``. A falsy ``freq`` (``None`` or ``0``)
    yields an infinite threshold, so it is never reached.
    """

    def __init__(self, n, m, freq):
        if freq:
            limit = (n + m) / freq
        else:
            limit = float("inf")
        self._threshold = limit
        self._work = 0

    def add_work(self, work):
        """Accumulate ``work`` units onto the running total."""
        self._work += work

    def is_reached(self):
        """Return True once the accumulated work meets the threshold."""
        return self._threshold <= self._work

    def clear_work(self):
        """Reset the accumulated work to zero."""
        self._work = 0


@nx._dispatchable(edge_attrs={"capacity": float("inf")}, returns_graph=True)
def build_residual_network(G, capacity):
    """Build a residual network from ``G``.

    The residual network :samp:`R` has the same nodes as :samp:`G`.
    :samp:`R` is a DiGraph that contains a pair of edges :samp:`(u, v)` and
    :samp:`(v, u)` whenever :samp:`(u, v)` is not a self-loop and at least
    one of :samp:`(u, v)` and :samp:`(v, u)` exists in :samp:`G` with a
    positive capacity.

    For each edge :samp:`(u, v)` in :samp:`R`, :samp:`R[u][v]['capacity']`
    is equal to the capacity of :samp:`(u, v)` in :samp:`G` if it exists
    there, or zero otherwise. If the capacity is infinite,
    :samp:`R[u][v]['capacity']` holds a high arbitrary finite value that does
    not affect the solution of the problem. This value is stored in
    :samp:`R.graph['inf']`.

    This function sets only the ``'capacity'`` edge attribute and
    :samp:`R.graph['inf']`. The ``'flow'`` edge attributes and
    :samp:`R.graph['flow_value']` are not written here; presumably they are
    populated by the flow algorithms that consume :samp:`R`, following the
    convention :samp:`R[u][v]['flow'] == -R[v][u]['flow']`.

    Parameters
    ----------
    G : NetworkX graph
        Input graph. Directed and undirected graphs are supported;
        multigraphs are rejected.
    capacity : string
        Name of the edge attribute holding the capacity. Edges without this
        attribute are treated as having infinite capacity.

    Returns
    -------
    R : nx.DiGraph
        The residual network.

    Raises
    ------
    NetworkXError
        If ``G`` is a MultiGraph or MultiDiGraph.
    """
    if G.is_multigraph():
        raise nx.NetworkXError("MultiGraph and MultiDiGraph not supported (yet).")

    R = nx.DiGraph()
    R.__networkx_cache__ = None  # Disable caching
    R.add_nodes_from(G)

    float_inf = float("inf")
    # Extract edges with positive capacities. Self loops excluded.
    edge_list = [
        (u, v, attr)
        for u, v, attr in G.edges(data=True)
        if u != v and attr.get(capacity, float_inf) > 0
    ]

    # Simulate infinity with three times the sum of the finite edge capacities
    # or any positive value if the sum is zero. This allows the
    # infinite-capacity edges to be distinguished for unboundedness detection
    # and directly participate in residual capacity calculation. If the maximum
    # flow is finite, these edges cannot appear in the minimum cut and thus
    # guarantee correctness. Since the residual capacity of an
    # infinite-capacity edge is always at least 2/3 of inf, while that of a
    # finite-capacity edge is at most 1/3 of inf, if an operation moves more
    # than 1/3 of inf units of flow to t, there must be an infinite-capacity
    # s-t path in G.
    inf = (
        3
        * sum(
            attr[capacity]
            for _, _, attr in edge_list
            if capacity in attr and attr[capacity] != float_inf
        )
        or 1
    )

    if G.is_directed():
        for u, v, attr in edge_list:
            # Clamp missing/infinite capacities to the finite stand-in.
            r = min(attr.get(capacity, inf), inf)
            if not R.has_edge(u, v):
                # Both (u, v) and (v, u) must be present in the residual
                # network.
                R.add_edge(u, v, capacity=r)
                R.add_edge(v, u, capacity=0)
            else:
                # The edge (u, v) was added when (v, u) was visited.
                R[u][v]["capacity"] = r
    else:
        for u, v, attr in edge_list:
            # Add a pair of edges with equal residual capacities.
            r = min(attr.get(capacity, inf), inf)
            R.add_edge(u, v, capacity=r)
            R.add_edge(v, u, capacity=r)

    # Record the value simulating infinity.
    R.graph["inf"] = inf

    return R
@nx._dispatchable(
    graphs="R",
    preserve_edge_attrs={"R": {"capacity": float("inf")}},
    preserve_graph_attrs=True,
)
def detect_unboundedness(R, s, t):
    """Detect an infinite-capacity s-t path in R.

    Performs a breadth-first search from ``s`` that follows only edges whose
    ``'capacity'`` equals ``R.graph['inf']``.

    Raises
    ------
    NetworkXUnbounded
        If ``t`` is reached through such edges.
    """
    inf = R.graph["inf"]
    visited = {s}
    frontier = deque([s])
    while frontier:
        node = frontier.popleft()
        for nbr, data in R[node].items():
            # Look up the capacity first, then the visited set, so a missing
            # 'capacity' key is reported for every edge examined.
            is_infinite = data["capacity"] == inf
            if not is_infinite or nbr in visited:
                continue
            if nbr == t:
                raise nx.NetworkXUnbounded(
                    "Infinite capacity path, flow unbounded above."
                )
            visited.add(nbr)
            frontier.append(nbr)


@nx._dispatchable(graphs={"G": 0, "R": 1}, preserve_edge_attrs={"R": {"flow": None}})
def build_flow_dict(G, R):
    """Build a flow dictionary from a residual network.

    Returns a dict keyed by each node ``u`` of ``G`` whose value maps every
    neighbor ``v`` in ``G[u]`` to 0, overridden by ``R[u][v]['flow']`` for
    each edge of ``R`` leaving ``u`` that carries a strictly positive flow.
    """
    result = {}
    for node in G:
        # Start every edge of G at zero flow.
        outgoing = dict.fromkeys(G[node], 0)
        for nbr, data in R[node].items():
            flow = data["flow"]
            if flow > 0:
                outgoing[nbr] = flow
        result[node] = outgoing
    return result