Skip to content

spatial_graph_algorithms.denoise

Score, filter, and evaluate likely false edges in SpatialGraph objects.

Usage

from spatial_graph_algorithms.denoise import EdgeFilterer, score_edges
from spatial_graph_algorithms.metrics import evaluate_denoising

scores = score_edges(sg, method="jaccard")
sg_clean = EdgeFilterer().by_fraction(
    sg, scores, fraction=0.10, method="jaccard"
)
report = evaluate_denoising(sg_clean, scores=scores, method="jaccard")

API Reference

spatial_graph_algorithms.denoise.SCORE_POLARITY = {'betweenness': 'positive', 'jaccard': 'negative', 'square_bipartite': 'negative', 'community': 'negative', 'random_walk': 'negative', 'degree': 'positive'} module-attribute

spatial_graph_algorithms.denoise.EdgeScorer

Compute edge suspicion scores from graph topology.

Each method accepts a CSR adjacency matrix and returns a dict mapping canonical edge tuples (min(u, v), max(u, v)) to float scores. Score polarity (high = suspect vs low = suspect) is method-specific — see :data:SCORE_POLARITY and each method's docstring.

Methods:

Name Description
betweenness

Edge betweenness centrality. High = suspect.

jaccard

Jaccard neighbourhood overlap. Low = suspect. Unipartite only.

square_bipartite

4-cycle count. Low = suspect. Bipartite only.

community

Louvain intra-community consistency. Low = suspect.

random_walk

Stochastic walk visitation frequency. Low = suspect.

degree

Max endpoint degree. High = suspect. O(n), no graph construction.

Source code in src/spatial_graph_algorithms/denoise/__init__.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
class EdgeScorer:
    """Compute edge suspicion scores from graph topology.

    Each method accepts a CSR adjacency matrix and returns a dict mapping
    canonical edge tuples ``(min(u, v), max(u, v))`` to float scores.
    Score polarity (high = suspect vs low = suspect) is method-specific —
    see :data:`SCORE_POLARITY` and each method's docstring.

    Methods
    -------
    betweenness(adj)
        Edge betweenness centrality. High = suspect.
    jaccard(adj)
        Jaccard neighbourhood overlap. Low = suspect. Unipartite only.
    square_bipartite(adj)
        4-cycle count. Low = suspect. Bipartite only.
    community(adj, *, n_runs)
        Louvain intra-community consistency. Low = suspect.
    random_walk(adj, *, walks_per_node, return_chance, seed)
        Stochastic walk visitation frequency. Low = suspect.
    degree(adj)
        Max endpoint degree. High = suspect. O(n), no graph construction.
    """

    def betweenness(
        self,
        adj: scipy.sparse.csr_matrix,
    ) -> dict[tuple[int, int], float]:
        """Score edges by edge betweenness centrality.

        High centrality signals a critical bridge — a characteristic of
        long-range false connections that span otherwise distinct communities.

        Parameters
        ----------
        adj : scipy.sparse.csr_matrix
            Symmetric unweighted adjacency matrix.

        Returns
        -------
        dict[tuple[int, int], float]
            Edge → normalised betweenness in ``[0, 1]``.  High = suspect.

        Notes
        -----
        Uses ``networkx.edge_betweenness_centrality``, runtime ``O(VE)``.
        Slow on graphs with more than a few thousand nodes.

        Examples
        --------
        >>> from spatial_graph_algorithms.simulate import generate
        >>> from spatial_graph_algorithms.denoise import EdgeScorer
        >>> sg = generate(n=100, seed=0)
        >>> scores = EdgeScorer().betweenness(sg.adjacency_matrix)
        >>> all(0.0 <= v <= 1.0 for v in scores.values())
        True
        """
        G = nx.from_scipy_sparse_array(adj)
        centrality = nx.edge_betweenness_centrality(G, normalized=True)
        return {_canonical(u, v): s for (u, v), s in centrality.items()}

    def jaccard(
        self,
        adj: scipy.sparse.csr_matrix,
    ) -> dict[tuple[int, int], float]:
        """Score edges by Jaccard neighbourhood overlap.

        Edges connecting nodes with few shared neighbours are more likely to
        be false connections bridging unrelated graph regions.

        Parameters
        ----------
        adj : scipy.sparse.csr_matrix
            Symmetric unweighted adjacency matrix for a **unipartite** graph.

        Returns
        -------
        dict[tuple[int, int], float]
            Edge → Jaccard score in ``[0, 1]``.  Low = suspect.

        Raises
        ------
        ValueError
            If *adj* represents a bipartite graph.

        Examples
        --------
        >>> from spatial_graph_algorithms.simulate import generate
        >>> from spatial_graph_algorithms.denoise import EdgeScorer
        >>> sg = generate(n=100, seed=0)
        >>> scores = EdgeScorer().jaccard(sg.adjacency_matrix)
        >>> all(0.0 <= v <= 1.0 for v in scores.values())
        True
        """
        G = nx.from_scipy_sparse_array(adj)
        if nx.is_bipartite(G):
            raise ValueError(
                "jaccard scorer is designed for unipartite graphs. "
                "Use square_bipartite for bipartite graphs."
            )
        scores: dict[tuple[int, int], float] = {}
        rows, cols = adj.nonzero()
        for r, c in zip(rows, cols):
            if r >= c:
                continue
            nb_u = set(adj[r].indices)
            nb_v = set(adj[c].indices)
            union = len(nb_u | nb_v)
            scores[(int(r), int(c))] = len(nb_u & nb_v) / union if union > 0 else 0.0
        return scores

    def square_bipartite(
        self,
        adj: scipy.sparse.csr_matrix,
    ) -> dict[tuple[int, int], float]:
        """Score edges by 4-cycle participation in a bipartite graph.

        True edges in a spatial bipartite graph typically close many 4-cycles
        (shared neighbours in the opposite partition).  False edges span
        unrelated partitions and form fewer squares.

        Parameters
        ----------
        adj : scipy.sparse.csr_matrix
            Symmetric unweighted adjacency matrix for a **bipartite** graph.

        Returns
        -------
        dict[tuple[int, int], float]
            Edge → 4-cycle count (as float).  Low = suspect.

        Raises
        ------
        ValueError
            If *adj* does not represent a bipartite graph.

        Examples
        --------
        >>> from spatial_graph_algorithms.simulate import generate
        >>> from spatial_graph_algorithms.denoise import EdgeScorer
        >>> sg = generate(n=80, mode="knn_bipartite", seed=0)
        >>> scores = EdgeScorer().square_bipartite(sg.adjacency_matrix)
        >>> all(v >= 0 for v in scores.values())
        True
        """
        G = nx.from_scipy_sparse_array(adj)
        if not nx.is_bipartite(G):
            raise ValueError(
                "square_bipartite scorer requires a bipartite graph. "
                "Use jaccard for unipartite graphs."
            )
        top_nodes, bottom_nodes = nx.bipartite.sets(G)
        top_adj = {u: set(G[u]) for u in top_nodes}
        bottom_adj = {v: set(G[v]) for v in bottom_nodes}

        edge_scores: dict[tuple[int, int], float] = {}
        for u, v in G.edges():
            if u in bottom_nodes:
                u, v = v, u  # ensure u ∈ top, v ∈ bottom
            b_u = top_adj[u] - {v}
            t_v = bottom_adj[v] - {u}
            count = sum(len(top_adj[w].intersection(b_u)) for w in t_v)
            edge_scores[_canonical(u, v)] = float(count)
        return edge_scores

    def square_bipartite_linalg(
        self,
        adj: scipy.sparse.csr_matrix,
    ) -> dict[tuple[int, int], float]:
        """Alternative square_bipartite implementation using sparse matrix multiplication."""
        # W3 = A @ A @ A - diag(A @ A) @ A - A @ diag(A @ A) + A.multiply(A).multiply(A)
        # (total 3-paths) - (i->k->i->j) - (i->j->k->j) + (i->j->i->j)
        adj      = adj.astype(np.int64)

        adj2     = adj @ adj
        adj2_diag   = adj2.diagonal().astype(np.int64)

        adj3     = adj2 @ adj

        w3 = (
            adj3
            - scipy.sparse.diags(adj2_diag, format="csr", dtype=np.int64) @ adj
            - adj @ scipy.sparse.diags(adj2_diag, format="csr", dtype=np.int64)
            + adj.multiply(adj).multiply(adj)
        )

        # only take scores if there is an edge in the original graph
        w3 = w3.multiply(adj > 0)

        # align w3 values with adj edges
        scores: dict[tuple[int, int], float] = {}
        rows, cols = adj.nonzero()
        for r, c in zip(rows, cols):
            scores[(int(r), int(c))] = float(w3[r, c])
        return scores


    def community(
        self,
        adj: scipy.sparse.csr_matrix,
        *,
        n_runs: int = 5,
    ) -> dict[tuple[int, int], float]:
        """Score edges by Louvain community membership consistency.

        Runs the Louvain algorithm (via igraph) *n_runs* times.  Each edge
        is scored by the fraction of runs it was classified as intra-community.
        Edges that persistently bridge communities are likely false connections.

        Parameters
        ----------
        adj : scipy.sparse.csr_matrix
            Symmetric unweighted adjacency matrix.
        n_runs : int
            Independent Louvain repetitions.  Higher = more stable scores.
            Default 5.

        Returns
        -------
        dict[tuple[int, int], float]
            Edge → intra-community fraction in ``[0, 1]``.  Low = suspect.

        Examples
        --------
        >>> from spatial_graph_algorithms.simulate import generate
        >>> from spatial_graph_algorithms.denoise import EdgeScorer
        >>> sg = generate(n=100, seed=0)
        >>> scores = EdgeScorer().community(sg.adjacency_matrix, n_runs=3)
        >>> all(0.0 <= v <= 1.0 for v in scores.values())
        True
        """
        rows, cols = adj.nonzero()
        ig_edges = [(int(r), int(c)) for r, c in zip(rows, cols) if r < c]
        G_ig = _igraph.Graph(n=adj.shape[0], edges=ig_edges, directed=False)

        canonical_edges = {
            (int(min(r, c)), int(max(r, c)))
            for r, c in zip(rows, cols)
            if r != c
        }
        within: dict[tuple[int, int], int] = {e: 0 for e in canonical_edges}

        for _ in range(n_runs):
            partition = G_ig.community_multilevel()
            membership = partition.membership
            for e in canonical_edges:
                if membership[e[0]] == membership[e[1]]:
                    within[e] += 1

        return {e: cnt / n_runs for e, cnt in within.items()}

    def random_walk(
        self,
        adj: scipy.sparse.csr_matrix,
        *,
        walks_per_node: int = 32,
        return_chance: float = 0.3,
        expansions: int = 1,
        seed: int | None = None,
    ) -> dict[tuple[int, int], float]:
        """Score edges by stochastic walk visitation frequency.

        Performs *walks_per_node* random walks from every node.  At each step
        there is a *return_chance* of attempting to return home.  A walk only
        counts when the current position has **two edge-disjoint paths** back
        to the origin in the locally discovered subgraph — edges that act as
        bridges (typical of false connections) will have their walks discarded
        more often, yielding lower scores.

        After normalisation, the visitation matrix is raised to the power
        ``2 ** expansions`` via repeated matrix squaring (*expansions* = 1
        computes ``Y @ Y``), amplifying score separation between true and
        false edges.

        Parameters
        ----------
        adj : scipy.sparse.csr_matrix
            Symmetric unweighted adjacency matrix.
        walks_per_node : int
            Walks launched per origin node.  Default 32.
        return_chance : float
            Per-step probability of attempting a return.  Default 0.3.
        expansions : int
            Number of matrix-squaring steps applied after normalisation.
            Default 1 (computes ``Y @ Y``).  Set to 0 to skip.
        seed : int, optional
            Random seed for reproducibility.

        Returns
        -------
        dict[tuple[int, int], float]
            Edge → normalised visitation frequency.  Low = suspect.

        Warns
        -----
        ResourceWarning
            When *adj* has more than 2 000 nodes (O(n²) memory footprint).

        Examples
        --------
        >>> from spatial_graph_algorithms.simulate import generate
        >>> from spatial_graph_algorithms.denoise import EdgeScorer
        >>> sg = generate(n=60, seed=0)
        >>> scores = EdgeScorer().random_walk(sg.adjacency_matrix, walks_per_node=4, seed=0)
        >>> len(scores) == sg.n_edges
        True
        """
        import warnings

        n = adj.shape[0]
        if n > 2000:
            warnings.warn(
                f"random_walk scorer allocates a ({n}×{n}) visitation matrix "
                f"({n * n * 8 / 1e6:.0f} MB). "
                "Consider betweenness or jaccard for large graphs.",
                ResourceWarning,
                stacklevel=2,
            )

        rng = np.random.default_rng(seed)
        G = nx.from_scipy_sparse_array(adj)
        adj_list = [list(G.neighbors(i)) for i in range(n)]
        visit = np.zeros((n, n), dtype=np.float64)

        for origin in range(n):
            for _ in range(walks_per_node):
                current = origin
                path: list[tuple[int, int]] = []
                discovered = nx.Graph()
                discovered.add_node(origin)

                while True:
                    nbs = adj_list[current]
                    if not nbs:
                        break
                    nxt = int(rng.choice(nbs))
                    path.append((current, nxt))

                    discovered.add_edge(current, nxt, weight=1)

                    # 1-hop BFS to enrich the discovered subgraph (matches
                    # the original's mini-BFS with range(1))
                    for nb in adj_list[nxt]:
                        if not discovered.has_edge(nxt, nb):
                            discovered.add_edge(nxt, nb, weight=1)

                    if rng.random() < return_chance:
                        # A walk only counts when two edge-disjoint paths exist
                        # from the current position back to the origin.
                        # Find the first path, remove its edges, then verify a
                        # second path still exists — this selectively discards
                        # walks that traversed bridges or near-bridges.
                        temp = discovered.copy()
                        try:
                            first = nx.shortest_path(temp, source=current,
                                                     target=origin,
                                                     weight="weight")
                            for k in range(len(first) - 1):
                                temp.remove_edge(first[k], first[k + 1])
                            nx.shortest_path(temp, source=current,
                                             target=origin, weight="weight")
                        except nx.NetworkXNoPath:
                            path = []
                        break

                    current = nxt

                for src, dst in path:
                    visit[origin, dst] += 1
                    visit[origin, src] += 1

        # Row-normalise
        row_sums = visit.sum(axis=1)
        with np.errstate(divide="ignore", invalid="ignore"):
            norm_factors = np.where(row_sums > 0, 1.0 / row_sums, 0.0)
        Y = visit * norm_factors[:, np.newaxis]

        # Matrix-squaring expansion: amplifies score separation
        for _ in range(expansions):
            Y = np.dot(Y, Y)

        scores: dict[tuple[int, int], float] = {}
        rows, cols = adj.nonzero()
        for r, c in zip(rows, cols):
            if r >= c:
                continue
            avg = (Y[r, c] + Y[c, r]) / 2.0
            scores[(int(r), int(c))] = float(avg)
        return scores


    def degree(
        self,
        adj: scipy.sparse.csr_matrix,
    ) -> dict[tuple[int, int], float]:
        """Score edges by the maximum degree of their two endpoints.

        False edges inflate the degree of their endpoints above the baseline
        set by true local connections.  Scoring each edge by
        ``max(deg_u, deg_v)`` highlights edges that touch anomalously
        high-degree nodes.

        Degree is read directly from the CSR ``indptr`` array via
        ``np.diff(adj.indptr)`` — O(n), no graph construction required.

        Parameters
        ----------
        adj : scipy.sparse.csr_matrix
            Symmetric unweighted adjacency matrix.

        Returns
        -------
        dict[tuple[int, int], float]
            Edge → ``max(deg_u, deg_v)`` as float.  High = suspect.

        Examples
        --------
        >>> from spatial_graph_algorithms.simulate import generate
        >>> from spatial_graph_algorithms.denoise import EdgeScorer
        >>> sg = generate(n=100, seed=0)
        >>> scores = EdgeScorer().degree(sg.adjacency_matrix)
        >>> len(scores) == sg.n_edges
        True
        """
        degrees = np.diff(adj.indptr).astype(np.float64)
        scores: dict[tuple[int, int], float] = {}
        rows, cols = adj.nonzero()
        for r, c in zip(rows, cols):
            if r >= c:
                continue
            scores[(int(r), int(c))] = float(max(degrees[r], degrees[c]))
        return scores

Functions

betweenness(adj)

Score edges by edge betweenness centrality.

High centrality signals a critical bridge — a characteristic of long-range false connections that span otherwise distinct communities.

Parameters:

Name Type Description Default
adj csr_matrix

Symmetric unweighted adjacency matrix.

required

Returns:

Type Description
dict[tuple[int, int], float]

Edge → normalised betweenness in [0, 1]. High = suspect.

Notes

Uses networkx.edge_betweenness_centrality, runtime O(VE). Slow on graphs with more than a few thousand nodes.

Examples:

>>> from spatial_graph_algorithms.simulate import generate
>>> from spatial_graph_algorithms.denoise import EdgeScorer
>>> sg = generate(n=100, seed=0)
>>> scores = EdgeScorer().betweenness(sg.adjacency_matrix)
>>> all(0.0 <= v <= 1.0 for v in scores.values())
True
Source code in src/spatial_graph_algorithms/denoise/__init__.py
def betweenness(
    self,
    adj: scipy.sparse.csr_matrix,
) -> dict[tuple[int, int], float]:
    """Score edges by edge betweenness centrality.

    High centrality signals a critical bridge — a characteristic of
    long-range false connections that span otherwise distinct communities.

    Parameters
    ----------
    adj : scipy.sparse.csr_matrix
        Symmetric unweighted adjacency matrix.

    Returns
    -------
    dict[tuple[int, int], float]
        Edge → normalised betweenness in ``[0, 1]``.  High = suspect.

    Notes
    -----
    Uses ``networkx.edge_betweenness_centrality``, runtime ``O(VE)``.
    Slow on graphs with more than a few thousand nodes.

    Examples
    --------
    >>> from spatial_graph_algorithms.simulate import generate
    >>> from spatial_graph_algorithms.denoise import EdgeScorer
    >>> sg = generate(n=100, seed=0)
    >>> scores = EdgeScorer().betweenness(sg.adjacency_matrix)
    >>> all(0.0 <= v <= 1.0 for v in scores.values())
    True
    """
    G = nx.from_scipy_sparse_array(adj)
    centrality = nx.edge_betweenness_centrality(G, normalized=True)
    return {_canonical(u, v): s for (u, v), s in centrality.items()}

jaccard(adj)

Score edges by Jaccard neighbourhood overlap.

Edges connecting nodes with few shared neighbours are more likely to be false connections bridging unrelated graph regions.

Parameters:

Name Type Description Default
adj csr_matrix

Symmetric unweighted adjacency matrix for a unipartite graph.

required

Returns:

Type Description
dict[tuple[int, int], float]

Edge → Jaccard score in [0, 1]. Low = suspect.

Raises:

Type Description
ValueError

If adj represents a bipartite graph.

Examples:

>>> from spatial_graph_algorithms.simulate import generate
>>> from spatial_graph_algorithms.denoise import EdgeScorer
>>> sg = generate(n=100, seed=0)
>>> scores = EdgeScorer().jaccard(sg.adjacency_matrix)
>>> all(0.0 <= v <= 1.0 for v in scores.values())
True
Source code in src/spatial_graph_algorithms/denoise/__init__.py
def jaccard(
    self,
    adj: scipy.sparse.csr_matrix,
) -> dict[tuple[int, int], float]:
    """Score edges by Jaccard neighbourhood overlap.

    Edges connecting nodes with few shared neighbours are more likely to
    be false connections bridging unrelated graph regions.

    Parameters
    ----------
    adj : scipy.sparse.csr_matrix
        Symmetric unweighted adjacency matrix for a **unipartite** graph.

    Returns
    -------
    dict[tuple[int, int], float]
        Edge → Jaccard score in ``[0, 1]``.  Low = suspect.

    Raises
    ------
    ValueError
        If *adj* represents a bipartite graph.

    Examples
    --------
    >>> from spatial_graph_algorithms.simulate import generate
    >>> from spatial_graph_algorithms.denoise import EdgeScorer
    >>> sg = generate(n=100, seed=0)
    >>> scores = EdgeScorer().jaccard(sg.adjacency_matrix)
    >>> all(0.0 <= v <= 1.0 for v in scores.values())
    True
    """
    G = nx.from_scipy_sparse_array(adj)
    if nx.is_bipartite(G):
        raise ValueError(
            "jaccard scorer is designed for unipartite graphs. "
            "Use square_bipartite for bipartite graphs."
        )
    scores: dict[tuple[int, int], float] = {}
    rows, cols = adj.nonzero()
    for r, c in zip(rows, cols):
        if r >= c:
            continue
        nb_u = set(adj[r].indices)
        nb_v = set(adj[c].indices)
        union = len(nb_u | nb_v)
        scores[(int(r), int(c))] = len(nb_u & nb_v) / union if union > 0 else 0.0
    return scores

square_bipartite(adj)

Score edges by 4-cycle participation in a bipartite graph.

True edges in a spatial bipartite graph typically close many 4-cycles (shared neighbours in the opposite partition). False edges span unrelated partitions and form fewer squares.

Parameters:

Name Type Description Default
adj csr_matrix

Symmetric unweighted adjacency matrix for a bipartite graph.

required

Returns:

Type Description
dict[tuple[int, int], float]

Edge → 4-cycle count (as float). Low = suspect.

Raises:

Type Description
ValueError

If adj does not represent a bipartite graph.

Examples:

>>> from spatial_graph_algorithms.simulate import generate
>>> from spatial_graph_algorithms.denoise import EdgeScorer
>>> sg = generate(n=80, mode="knn_bipartite", seed=0)
>>> scores = EdgeScorer().square_bipartite(sg.adjacency_matrix)
>>> all(v >= 0 for v in scores.values())
True
Source code in src/spatial_graph_algorithms/denoise/__init__.py
def square_bipartite(
    self,
    adj: scipy.sparse.csr_matrix,
) -> dict[tuple[int, int], float]:
    """Score edges by 4-cycle participation in a bipartite graph.

    True edges in a spatial bipartite graph typically close many 4-cycles
    (shared neighbours in the opposite partition).  False edges span
    unrelated partitions and form fewer squares.

    Parameters
    ----------
    adj : scipy.sparse.csr_matrix
        Symmetric unweighted adjacency matrix for a **bipartite** graph.

    Returns
    -------
    dict[tuple[int, int], float]
        Edge → 4-cycle count (as float).  Low = suspect.

    Raises
    ------
    ValueError
        If *adj* does not represent a bipartite graph.

    Examples
    --------
    >>> from spatial_graph_algorithms.simulate import generate
    >>> from spatial_graph_algorithms.denoise import EdgeScorer
    >>> sg = generate(n=80, mode="knn_bipartite", seed=0)
    >>> scores = EdgeScorer().square_bipartite(sg.adjacency_matrix)
    >>> all(v >= 0 for v in scores.values())
    True
    """
    G = nx.from_scipy_sparse_array(adj)
    if not nx.is_bipartite(G):
        raise ValueError(
            "square_bipartite scorer requires a bipartite graph. "
            "Use jaccard for unipartite graphs."
        )
    top_nodes, bottom_nodes = nx.bipartite.sets(G)
    top_adj = {u: set(G[u]) for u in top_nodes}
    bottom_adj = {v: set(G[v]) for v in bottom_nodes}

    edge_scores: dict[tuple[int, int], float] = {}
    for u, v in G.edges():
        if u in bottom_nodes:
            u, v = v, u  # ensure u ∈ top, v ∈ bottom
        b_u = top_adj[u] - {v}
        t_v = bottom_adj[v] - {u}
        count = sum(len(top_adj[w].intersection(b_u)) for w in t_v)
        edge_scores[_canonical(u, v)] = float(count)
    return edge_scores

square_bipartite_linalg(adj)

Alternative square_bipartite implementation using sparse matrix multiplication.

Source code in src/spatial_graph_algorithms/denoise/__init__.py
def square_bipartite_linalg(
    self,
    adj: scipy.sparse.csr_matrix,
) -> dict[tuple[int, int], float]:
    """Alternative square_bipartite implementation using sparse matrix multiplication."""
    # W3 = A @ A @ A - diag(A @ A) @ A - A @ diag(A @ A) + A.multiply(A).multiply(A)
    # (total 3-paths) - (i->k->i->j) - (i->j->k->j) + (i->j->i->j)
    adj      = adj.astype(np.int64)

    adj2     = adj @ adj
    adj2_diag   = adj2.diagonal().astype(np.int64)

    adj3     = adj2 @ adj

    w3 = (
        adj3
        - scipy.sparse.diags(adj2_diag, format="csr", dtype=np.int64) @ adj
        - adj @ scipy.sparse.diags(adj2_diag, format="csr", dtype=np.int64)
        + adj.multiply(adj).multiply(adj)
    )

    # only take scores if there is an edge in the original graph
    w3 = w3.multiply(adj > 0)

    # align w3 values with adj edges
    scores: dict[tuple[int, int], float] = {}
    rows, cols = adj.nonzero()
    for r, c in zip(rows, cols):
        scores[(int(r), int(c))] = float(w3[r, c])
    return scores

community(adj, *, n_runs=5)

Score edges by Louvain community membership consistency.

Runs the Louvain algorithm (via igraph) n_runs times. Each edge is scored by the fraction of runs it was classified as intra-community. Edges that persistently bridge communities are likely false connections.

Parameters:

Name Type Description Default
adj csr_matrix

Symmetric unweighted adjacency matrix.

required
n_runs int

Independent Louvain repetitions. Higher = more stable scores. Default 5.

5

Returns:

Type Description
dict[tuple[int, int], float]

Edge → intra-community fraction in [0, 1]. Low = suspect.

Examples:

>>> from spatial_graph_algorithms.simulate import generate
>>> from spatial_graph_algorithms.denoise import EdgeScorer
>>> sg = generate(n=100, seed=0)
>>> scores = EdgeScorer().community(sg.adjacency_matrix, n_runs=3)
>>> all(0.0 <= v <= 1.0 for v in scores.values())
True
Source code in src/spatial_graph_algorithms/denoise/__init__.py
def community(
    self,
    adj: scipy.sparse.csr_matrix,
    *,
    n_runs: int = 5,
) -> dict[tuple[int, int], float]:
    """Score edges by Louvain community membership consistency.

    Runs the Louvain algorithm (via igraph) *n_runs* times.  Each edge
    is scored by the fraction of runs it was classified as intra-community.
    Edges that persistently bridge communities are likely false connections.

    Parameters
    ----------
    adj : scipy.sparse.csr_matrix
        Symmetric unweighted adjacency matrix.
    n_runs : int
        Independent Louvain repetitions.  Higher = more stable scores.
        Default 5.

    Returns
    -------
    dict[tuple[int, int], float]
        Edge → intra-community fraction in ``[0, 1]``.  Low = suspect.

    Examples
    --------
    >>> from spatial_graph_algorithms.simulate import generate
    >>> from spatial_graph_algorithms.denoise import EdgeScorer
    >>> sg = generate(n=100, seed=0)
    >>> scores = EdgeScorer().community(sg.adjacency_matrix, n_runs=3)
    >>> all(0.0 <= v <= 1.0 for v in scores.values())
    True
    """
    rows, cols = adj.nonzero()
    ig_edges = [(int(r), int(c)) for r, c in zip(rows, cols) if r < c]
    G_ig = _igraph.Graph(n=adj.shape[0], edges=ig_edges, directed=False)

    canonical_edges = {
        (int(min(r, c)), int(max(r, c)))
        for r, c in zip(rows, cols)
        if r != c
    }
    within: dict[tuple[int, int], int] = {e: 0 for e in canonical_edges}

    for _ in range(n_runs):
        partition = G_ig.community_multilevel()
        membership = partition.membership
        for e in canonical_edges:
            if membership[e[0]] == membership[e[1]]:
                within[e] += 1

    return {e: cnt / n_runs for e, cnt in within.items()}

random_walk(adj, *, walks_per_node=32, return_chance=0.3, expansions=1, seed=None)

Score edges by stochastic walk visitation frequency.

Performs walks_per_node random walks from every node. At each step there is a return_chance of attempting to return home. A walk only counts when the current position has two edge-disjoint paths back to the origin in the locally discovered subgraph — edges that act as bridges (typical of false connections) will have their walks discarded more often, yielding lower scores.

After normalisation, the visitation matrix is raised to the power 2 ** expansions via repeated matrix squaring (expansions = 1 computes Y @ Y), amplifying score separation between true and false edges.

Parameters:

Name Type Description Default
adj csr_matrix

Symmetric unweighted adjacency matrix.

required
walks_per_node int

Walks launched per origin node. Default 32.

32
return_chance float

Per-step probability of attempting a return. Default 0.3.

0.3
expansions int

Number of matrix-squaring steps applied after normalisation. Default 1 (computes Y @ Y). Set to 0 to skip.

1
seed int

Random seed for reproducibility.

None

Returns:

Type Description
dict[tuple[int, int], float]

Edge → normalised visitation frequency. Low = suspect.

Warns:

Type Description
ResourceWarning

When adj has more than 2 000 nodes (O(n²) memory footprint).

Examples:

>>> from spatial_graph_algorithms.simulate import generate
>>> from spatial_graph_algorithms.denoise import EdgeScorer
>>> sg = generate(n=60, seed=0)
>>> scores = EdgeScorer().random_walk(sg.adjacency_matrix, walks_per_node=4, seed=0)
>>> len(scores) == sg.n_edges
True
Source code in src/spatial_graph_algorithms/denoise/__init__.py
def random_walk(
    self,
    adj: scipy.sparse.csr_matrix,
    *,
    walks_per_node: int = 32,
    return_chance: float = 0.3,
    expansions: int = 1,
    seed: int | None = None,
) -> dict[tuple[int, int], float]:
    """Score edges by stochastic walk visitation frequency.

    Performs *walks_per_node* random walks from every node.  At each step
    there is a *return_chance* of attempting to return home.  A walk only
    counts when the current position has **two edge-disjoint paths** back
    to the origin in the locally discovered subgraph — edges that act as
    bridges (typical of false connections) will have their walks discarded
    more often, yielding lower scores.

    After normalisation, the visitation matrix is raised to the power
    ``2 ** expansions`` via repeated matrix squaring (*expansions* = 1
    computes ``Y @ Y``), amplifying score separation between true and
    false edges.

    Parameters
    ----------
    adj : scipy.sparse.csr_matrix
        Symmetric unweighted adjacency matrix.
    walks_per_node : int
        Walks launched per origin node.  Default 32.
    return_chance : float
        Per-step probability of attempting a return.  Default 0.3.
    expansions : int
        Number of matrix-squaring steps applied after normalisation.
        Default 1 (computes ``Y @ Y``).  Set to 0 to skip.
    seed : int, optional
        Random seed for reproducibility.

    Returns
    -------
    dict[tuple[int, int], float]
        Edge → normalised visitation frequency.  Low = suspect.

    Warns
    -----
    ResourceWarning
        When *adj* has more than 2 000 nodes (O(n²) memory footprint).

    Examples
    --------
    >>> from spatial_graph_algorithms.simulate import generate
    >>> from spatial_graph_algorithms.denoise import EdgeScorer
    >>> sg = generate(n=60, seed=0)
    >>> scores = EdgeScorer().random_walk(sg.adjacency_matrix, walks_per_node=4, seed=0)
    >>> len(scores) == sg.n_edges
    True
    """
    import warnings

    n = adj.shape[0]
    if n > 2000:
        warnings.warn(
            f"random_walk scorer allocates a ({n}×{n}) visitation matrix "
            f"({n * n * 8 / 1e6:.0f} MB). "
            "Consider betweenness or jaccard for large graphs.",
            ResourceWarning,
            stacklevel=2,
        )

    rng = np.random.default_rng(seed)
    G = nx.from_scipy_sparse_array(adj)
    adj_list = [list(G.neighbors(i)) for i in range(n)]
    visit = np.zeros((n, n), dtype=np.float64)

    for origin in range(n):
        for _ in range(walks_per_node):
            current = origin
            path: list[tuple[int, int]] = []
            discovered = nx.Graph()
            discovered.add_node(origin)

            while True:
                nbs = adj_list[current]
                if not nbs:
                    break
                nxt = int(rng.choice(nbs))
                path.append((current, nxt))

                discovered.add_edge(current, nxt, weight=1)

                # 1-hop BFS to enrich the discovered subgraph (matches
                # the original's mini-BFS with range(1))
                for nb in adj_list[nxt]:
                    if not discovered.has_edge(nxt, nb):
                        discovered.add_edge(nxt, nb, weight=1)

                if rng.random() < return_chance:
                    # A walk only counts when two edge-disjoint paths exist
                    # from the current position back to the origin.
                    # Find the first path, remove its edges, then verify a
                    # second path still exists — this selectively discards
                    # walks that traversed bridges or near-bridges.
                    temp = discovered.copy()
                    try:
                        first = nx.shortest_path(temp, source=current,
                                                 target=origin,
                                                 weight="weight")
                        for k in range(len(first) - 1):
                            temp.remove_edge(first[k], first[k + 1])
                        nx.shortest_path(temp, source=current,
                                         target=origin, weight="weight")
                    except nx.NetworkXNoPath:
                        path = []
                    break

                current = nxt

            for src, dst in path:
                visit[origin, dst] += 1
                visit[origin, src] += 1

    # Row-normalise
    row_sums = visit.sum(axis=1)
    with np.errstate(divide="ignore", invalid="ignore"):
        norm_factors = np.where(row_sums > 0, 1.0 / row_sums, 0.0)
    Y = visit * norm_factors[:, np.newaxis]

    # Matrix-squaring expansion: amplifies score separation
    for _ in range(expansions):
        Y = np.dot(Y, Y)

    scores: dict[tuple[int, int], float] = {}
    rows, cols = adj.nonzero()
    for r, c in zip(rows, cols):
        if r >= c:
            continue
        avg = (Y[r, c] + Y[c, r]) / 2.0
        scores[(int(r), int(c))] = float(avg)
    return scores

degree(adj)

Score edges by the maximum degree of their two endpoints.

False edges inflate the degree of their endpoints above the baseline set by true local connections. Scoring each edge by max(deg_u, deg_v) highlights edges that touch anomalously high-degree nodes.

Degree is read directly from the CSR indptr array via np.diff(adj.indptr) — O(n), no graph construction required.

Parameters:

Name Type Description Default
adj csr_matrix

Symmetric unweighted adjacency matrix.

required

Returns:

Type Description
dict[tuple[int, int], float]

Edge → max(deg_u, deg_v) as float. High = suspect.

Examples:

>>> from spatial_graph_algorithms.simulate import generate
>>> from spatial_graph_algorithms.denoise import EdgeScorer
>>> sg = generate(n=100, seed=0)
>>> scores = EdgeScorer().degree(sg.adjacency_matrix)
>>> len(scores) == sg.n_edges
True
Source code in src/spatial_graph_algorithms/denoise/__init__.py
def degree(
    self,
    adj: scipy.sparse.csr_matrix,
) -> dict[tuple[int, int], float]:
    """Score edges by the maximum degree of their two endpoints.

    False edges inflate the degree of their endpoints above the baseline
    set by true local connections.  Scoring each edge by
    ``max(deg_u, deg_v)`` highlights edges that touch anomalously
    high-degree nodes.

    Degree is read directly from the CSR ``indptr`` array via
    ``np.diff(adj.indptr)`` — O(n), no graph construction required.

    Parameters
    ----------
    adj : scipy.sparse.csr_matrix
        Symmetric unweighted adjacency matrix.

    Returns
    -------
    dict[tuple[int, int], float]
        Edge → ``max(deg_u, deg_v)`` as float.  High = suspect.

    Examples
    --------
    >>> from spatial_graph_algorithms.simulate import generate
    >>> from spatial_graph_algorithms.denoise import EdgeScorer
    >>> sg = generate(n=100, seed=0)
    >>> scores = EdgeScorer().degree(sg.adjacency_matrix)
    >>> len(scores) == sg.n_edges
    True
    """
    degrees = np.diff(adj.indptr).astype(np.float64)
    scores: dict[tuple[int, int], float] = {}
    rows, cols = adj.nonzero()
    for r, c in zip(rows, cols):
        if r >= c:
            continue
        scores[(int(r), int(c))] = float(max(degrees[r], degrees[c]))
    return scores

spatial_graph_algorithms.denoise.EdgeFilterer

Remove edges from a SpatialGraph using scored edge rankings.

All methods accept a :class:SpatialGraph and a scores dict (from :class:EdgeScorer) and return a new SpatialGraph with:

  • Fewer edges in the adjacency matrix.
  • edge_metadata["is_removed"] boolean column (created fresh if edge_metadata was None).
  • The largest connected component extracted — nodes that lose all edges after filtering are removed so the returned graph is always connected.

Methods:

Name Description
by_fraction

Remove the worst fraction of scored edges.

by_threshold

Remove edges above or below an explicit score value.

by_percentile

Remove edges outside a percentile band.

by_optimal_f1

Oracle: sweep thresholds, pick the one maximising F1 vs ground truth.

Source code in src/spatial_graph_algorithms/denoise/__init__.py
class EdgeFilterer:
    """Remove edges from a SpatialGraph using scored edge rankings.

    All methods accept a :class:`SpatialGraph` and a *scores* dict (from
    :class:`EdgeScorer`) and return a new ``SpatialGraph`` with:

    - Fewer edges in the adjacency matrix.
    - ``edge_metadata["is_removed"]`` boolean column (created fresh if
      ``edge_metadata`` was ``None``).
    - The largest connected component extracted — nodes that lose all edges
      after filtering are removed so the returned graph is always connected.

    Methods
    -------
    by_fraction(sg, scores, *, fraction, method)
        Remove the worst *fraction* of scored edges.
    by_threshold(sg, scores, *, threshold, method)
        Remove edges above or below an explicit score value.
    by_percentile(sg, scores, *, percentile, method)
        Remove edges outside a percentile band.
    by_optimal_f1(sg, scores)
        Oracle: sweep thresholds, pick the one maximising F1 vs ground truth.
    """

    def _apply(
        self,
        sg: SpatialGraph,
        removed_edges: set[tuple[int, int]],
    ) -> SpatialGraph:
        new_adj = _build_new_adjacency(sg.adjacency_matrix, removed_edges)
        new_meta = _build_new_metadata(sg, removed_edges)
        # Build without edge_metadata so _extract_lcc doesn't filter the audit trail.
        # edge_metadata records all original edges (including removed ones); it is
        # set after construction to preserve the full denoising history for evaluate_denoising().
        sg_clean = SpatialGraph(
            adjacency_matrix=new_adj,
            positions=sg.positions,
            reconstructed_positions=sg.reconstructed_positions,
            node_metadata=sg.node_metadata,
            node_id_map=sg.node_id_map,
            keep_lcc=True,
        )
        sg_clean.edge_metadata = new_meta
        return sg_clean

    def by_fraction(
        self,
        sg: SpatialGraph,
        scores: dict[tuple[int, int], float],
        *,
        fraction: float,
        method: str,
    ) -> SpatialGraph:
        """Remove the worst *fraction* of scored edges.

        Parameters
        ----------
        sg : SpatialGraph
            Input graph.
        scores : dict
            Edge scores from :class:`EdgeScorer`.
        fraction : float
            Fraction of edges to remove, in ``[0, 1]``.
        method : str
            Scoring method that produced *scores*.  Score polarity is looked
            up in :data:`SCORE_POLARITY`.

        Returns
        -------
        SpatialGraph
            New graph with edges removed and ``is_removed`` metadata set.

        Raises
        ------
        ValueError
            If *fraction* is not in ``[0, 1]`` or *method* is unknown.

        Examples
        --------
        >>> from spatial_graph_algorithms.simulate import generate
        >>> from spatial_graph_algorithms.denoise import EdgeScorer, EdgeFilterer
        >>> sg = generate(n=100, false_edges_fraction=0.1, seed=0)
        >>> scores = EdgeScorer().jaccard(sg.adjacency_matrix)
        >>> sg_clean = EdgeFilterer().by_fraction(
        ...     sg, scores, fraction=0.05, method="jaccard"
        ... )
        >>> sg_clean.n_edges < sg.n_edges
        True
        """
        if not 0.0 <= fraction <= 1.0:
            raise ValueError(f"fraction must be in [0, 1], got {fraction}")
        remove_high_scores = _score_direction(method)
        sorted_edges = sorted(scores.items(), key=lambda x: x[1], reverse=remove_high_scores)
        n_remove = int(len(sorted_edges) * fraction)
        removed = {e for e, _ in sorted_edges[:n_remove]}
        return self._apply(sg, removed)

    def by_threshold(
        self,
        sg: SpatialGraph,
        scores: dict[tuple[int, int], float],
        *,
        threshold: float,
        method: str,
    ) -> SpatialGraph:
        """Remove edges whose score crosses *threshold*.

        Parameters
        ----------
        sg : SpatialGraph
            Input graph.
        scores : dict
            Edge scores from :class:`EdgeScorer`.
        threshold : float
            Score cut-off value.
        method : str
            Scoring method that produced *scores*.  Score polarity is looked
            up in :data:`SCORE_POLARITY`.

        Returns
        -------
        SpatialGraph
            New graph with edges removed and ``is_removed`` metadata set.

        Examples
        --------
        >>> sg_clean = EdgeFilterer().by_threshold(
        ...     sg, scores, threshold=0.5, method="betweenness"
        ... )
        """
        return self._by_threshold_direction(
            sg, scores, threshold=threshold, remove_high_scores=_score_direction(method)
        )

    def _by_threshold_direction(
        self,
        sg: SpatialGraph,
        scores: dict[tuple[int, int], float],
        *,
        threshold: float,
        remove_high_scores: bool,
    ) -> SpatialGraph:
        """Remove edges above or below *threshold* using a resolved score direction."""
        if remove_high_scores:
            removed = {e for e, s in scores.items() if s >= threshold}
        else:
            removed = {e for e, s in scores.items() if s <= threshold}
        return self._apply(sg, removed)

    def by_percentile(
        self,
        sg: SpatialGraph,
        scores: dict[tuple[int, int], float],
        *,
        percentile: float,
        method: str,
    ) -> SpatialGraph:
        """Remove edges outside a percentile band.

        Parameters
        ----------
        sg : SpatialGraph
            Input graph.
        scores : dict
            Edge scores from :class:`EdgeScorer`.
        percentile : float
            Percentage of edges to remove from the worst end, in ``[0, 100]``.
        method : str
            Scoring method that produced *scores*.  Score polarity is looked
            up in :data:`SCORE_POLARITY`.

        Returns
        -------
        SpatialGraph
            New graph with edges removed and ``is_removed`` metadata set.

        Raises
        ------
        ValueError
            If *percentile* is not in ``[0, 100]`` or *method* is unknown.

        Examples
        --------
        >>> sg_clean = EdgeFilterer().by_percentile(
        ...     sg, scores, percentile=5.0, method="jaccard"
        ... )
        """
        if not 0.0 <= percentile <= 100.0:
            raise ValueError(f"percentile must be in [0, 100], got {percentile}")
        vals = np.array(list(scores.values()))
        remove_high_scores = _score_direction(method)
        if remove_high_scores:
            threshold = float(np.percentile(vals, 100.0 - percentile))
            removed = {e for e, s in scores.items() if s >= threshold}
        else:
            threshold = float(np.percentile(vals, percentile))
            removed = {e for e, s in scores.items() if s <= threshold}
        return self._apply(sg, removed)

    def by_optimal_f1(
        self,
        sg: SpatialGraph,
        scores: dict[tuple[int, int], float],
    ) -> SpatialGraph:
        """Oracle filter: remove edges at the threshold that maximises F1.

        Sweeps every candidate threshold and selects the one that maximises
        the F1 score between removed edges and the ground-truth ``is_false``
        labels.  Both score polarities are tried; the globally best threshold
        wins regardless of direction.

        This is an **oracle method** — it requires ground-truth labels and
        therefore only applies to simulated graphs.  Use it to establish
        the theoretical upper bound for a given scorer.

        Parameters
        ----------
        sg : SpatialGraph
            Input graph.  Must have ``edge_metadata["is_false"]``.
        scores : dict
            Edge scores from :class:`EdgeScorer`.

        Returns
        -------
        SpatialGraph
            Graph filtered at the optimal F1 threshold, with ``is_removed``
            metadata set.

        Raises
        ------
        ValueError
            If ``sg.edge_metadata`` is ``None`` or lacks an ``is_false`` column.

        Examples
        --------
        >>> from spatial_graph_algorithms.simulate import generate
        >>> from spatial_graph_algorithms.denoise import EdgeScorer, EdgeFilterer
        >>> from spatial_graph_algorithms.metrics import evaluate_denoising
        >>> sg = generate(n=200, false_edges_fraction=0.10, seed=42)
        >>> scores = EdgeScorer().jaccard(sg.adjacency_matrix)
        >>> sg_opt = EdgeFilterer().by_optimal_f1(sg, scores)
        >>> report = evaluate_denoising(sg_opt)
        >>> report["f1"] >= 0.0
        True
        """
        if sg.edge_metadata is None or "is_false" not in sg.edge_metadata.columns:
            raise ValueError(
                "by_optimal_f1 requires ground-truth labels: "
                "sg.edge_metadata must have an 'is_false' column."
            )
        meta = sg.edge_metadata
        lo = np.minimum(meta["source"].values, meta["target"].values).astype(np.int64)
        hi = np.maximum(meta["source"].values, meta["target"].values).astype(np.int64)
        is_false_vals = meta["is_false"].values
        labels: dict[tuple[int, int], bool] = {
            (int(lo[i]), int(hi[i])): bool(is_false_vals[i])
            for i in range(len(meta))
        }
        threshold, _best_f1, remove_high_scores = _find_optimal_f1_threshold(scores, labels)
        return self._by_threshold_direction(
            sg, scores, threshold=threshold, remove_high_scores=remove_high_scores
        )

Functions

by_fraction(sg, scores, *, fraction, method)

Remove the worst fraction of scored edges.

Parameters:

Name Type Description Default
sg SpatialGraph

Input graph.

required
scores dict

Edge scores from :class:EdgeScorer.

required
fraction float

Fraction of edges to remove, in [0, 1].

required
method str

Scoring method that produced scores. Score polarity is looked up in :data:SCORE_POLARITY.

required

Returns:

Type Description
SpatialGraph

New graph with edges removed and is_removed metadata set.

Raises:

Type Description
ValueError

If fraction is not in [0, 1] or method is unknown.

Examples:

>>> from spatial_graph_algorithms.simulate import generate
>>> from spatial_graph_algorithms.denoise import EdgeScorer, EdgeFilterer
>>> sg = generate(n=100, false_edges_fraction=0.1, seed=0)
>>> scores = EdgeScorer().jaccard(sg.adjacency_matrix)
>>> sg_clean = EdgeFilterer().by_fraction(
...     sg, scores, fraction=0.05, method="jaccard"
... )
>>> sg_clean.n_edges < sg.n_edges
True
Source code in src/spatial_graph_algorithms/denoise/__init__.py
def by_fraction(
    self,
    sg: SpatialGraph,
    scores: dict[tuple[int, int], float],
    *,
    fraction: float,
    method: str,
) -> SpatialGraph:
    """Remove the worst *fraction* of scored edges.

    Parameters
    ----------
    sg : SpatialGraph
        Input graph.
    scores : dict
        Edge scores from :class:`EdgeScorer`.
    fraction : float
        Fraction of edges to remove, in ``[0, 1]``.
    method : str
        Scoring method that produced *scores*.  Score polarity is looked
        up in :data:`SCORE_POLARITY`.

    Returns
    -------
    SpatialGraph
        New graph with edges removed and ``is_removed`` metadata set.

    Raises
    ------
    ValueError
        If *fraction* is not in ``[0, 1]`` or *method* is unknown.

    Examples
    --------
    >>> from spatial_graph_algorithms.simulate import generate
    >>> from spatial_graph_algorithms.denoise import EdgeScorer, EdgeFilterer
    >>> sg = generate(n=100, false_edges_fraction=0.1, seed=0)
    >>> scores = EdgeScorer().jaccard(sg.adjacency_matrix)
    >>> sg_clean = EdgeFilterer().by_fraction(
    ...     sg, scores, fraction=0.05, method="jaccard"
    ... )
    >>> sg_clean.n_edges < sg.n_edges
    True
    """
    if not 0.0 <= fraction <= 1.0:
        raise ValueError(f"fraction must be in [0, 1], got {fraction}")
    remove_high_scores = _score_direction(method)
    sorted_edges = sorted(scores.items(), key=lambda x: x[1], reverse=remove_high_scores)
    n_remove = int(len(sorted_edges) * fraction)
    removed = {e for e, _ in sorted_edges[:n_remove]}
    return self._apply(sg, removed)

by_threshold(sg, scores, *, threshold, method)

Remove edges whose score crosses threshold.

Parameters:

Name Type Description Default
sg SpatialGraph

Input graph.

required
scores dict

Edge scores from :class:EdgeScorer.

required
threshold float

Score cut-off value.

required
method str

Scoring method that produced scores. Score polarity is looked up in :data:SCORE_POLARITY.

required

Returns:

Type Description
SpatialGraph

New graph with edges removed and is_removed metadata set.

Examples:

>>> sg_clean = EdgeFilterer().by_threshold(
...     sg, scores, threshold=0.5, method="betweenness"
... )
Source code in src/spatial_graph_algorithms/denoise/__init__.py
def by_threshold(
    self,
    sg: SpatialGraph,
    scores: dict[tuple[int, int], float],
    *,
    threshold: float,
    method: str,
) -> SpatialGraph:
    """Remove edges whose score crosses *threshold*.

    Parameters
    ----------
    sg : SpatialGraph
        Input graph.
    scores : dict
        Edge scores from :class:`EdgeScorer`.
    threshold : float
        Score cut-off value.
    method : str
        Scoring method that produced *scores*.  Score polarity is looked
        up in :data:`SCORE_POLARITY`.

    Returns
    -------
    SpatialGraph
        New graph with edges removed and ``is_removed`` metadata set.

    Examples
    --------
    >>> sg_clean = EdgeFilterer().by_threshold(
    ...     sg, scores, threshold=0.5, method="betweenness"
    ... )
    """
    return self._by_threshold_direction(
        sg, scores, threshold=threshold, remove_high_scores=_score_direction(method)
    )

by_percentile(sg, scores, *, percentile, method)

Remove edges outside a percentile band.

Parameters:

Name Type Description Default
sg SpatialGraph

Input graph.

required
scores dict

Edge scores from :class:EdgeScorer.

required
percentile float

Percentage of edges to remove from the worst end, in [0, 100].

required
method str

Scoring method that produced scores. Score polarity is looked up in :data:SCORE_POLARITY.

required

Returns:

Type Description
SpatialGraph

New graph with edges removed and is_removed metadata set.

Raises:

Type Description
ValueError

If percentile is not in [0, 100] or method is unknown.

Examples:

>>> sg_clean = EdgeFilterer().by_percentile(
...     sg, scores, percentile=5.0, method="jaccard"
... )
Source code in src/spatial_graph_algorithms/denoise/__init__.py
def by_percentile(
    self,
    sg: SpatialGraph,
    scores: dict[tuple[int, int], float],
    *,
    percentile: float,
    method: str,
) -> SpatialGraph:
    """Remove edges outside a percentile band.

    Parameters
    ----------
    sg : SpatialGraph
        Input graph.
    scores : dict
        Edge scores from :class:`EdgeScorer`.
    percentile : float
        Percentage of edges to remove from the worst end, in ``[0, 100]``.
    method : str
        Scoring method that produced *scores*.  Score polarity is looked
        up in :data:`SCORE_POLARITY`.

    Returns
    -------
    SpatialGraph
        New graph with edges removed and ``is_removed`` metadata set.

    Raises
    ------
    ValueError
        If *percentile* is not in ``[0, 100]`` or *method* is unknown.

    Examples
    --------
    >>> sg_clean = EdgeFilterer().by_percentile(
    ...     sg, scores, percentile=5.0, method="jaccard"
    ... )
    """
    if not 0.0 <= percentile <= 100.0:
        raise ValueError(f"percentile must be in [0, 100], got {percentile}")
    vals = np.array(list(scores.values()))
    remove_high_scores = _score_direction(method)
    if remove_high_scores:
        threshold = float(np.percentile(vals, 100.0 - percentile))
        removed = {e for e, s in scores.items() if s >= threshold}
    else:
        threshold = float(np.percentile(vals, percentile))
        removed = {e for e, s in scores.items() if s <= threshold}
    return self._apply(sg, removed)

by_optimal_f1(sg, scores)

Oracle filter: remove edges at the threshold that maximises F1.

Sweeps every candidate threshold and selects the one that maximises the F1 score between removed edges and the ground-truth is_false labels. Both score polarities are tried; the globally best threshold wins regardless of direction.

This is an oracle method — it requires ground-truth labels and therefore only applies to simulated graphs. Use it to establish the theoretical upper bound for a given scorer.

Parameters:

Name Type Description Default
sg SpatialGraph

Input graph. Must have edge_metadata["is_false"].

required
scores dict

Edge scores from :class:EdgeScorer.

required

Returns:

Type Description
SpatialGraph

Graph filtered at the optimal F1 threshold, with is_removed metadata set.

Raises:

Type Description
ValueError

If sg.edge_metadata is None or lacks an is_false column.

Examples:

>>> from spatial_graph_algorithms.simulate import generate
>>> from spatial_graph_algorithms.denoise import EdgeScorer, EdgeFilterer
>>> from spatial_graph_algorithms.metrics import evaluate_denoising
>>> sg = generate(n=200, false_edges_fraction=0.10, seed=42)
>>> scores = EdgeScorer().jaccard(sg.adjacency_matrix)
>>> sg_opt = EdgeFilterer().by_optimal_f1(sg, scores)
>>> report = evaluate_denoising(sg_opt)
>>> report["f1"] >= 0.0
True
Source code in src/spatial_graph_algorithms/denoise/__init__.py
def by_optimal_f1(
    self,
    sg: SpatialGraph,
    scores: dict[tuple[int, int], float],
) -> SpatialGraph:
    """Oracle filter: remove edges at the threshold that maximises F1.

    Sweeps every candidate threshold and selects the one that maximises
    the F1 score between removed edges and the ground-truth ``is_false``
    labels.  Both score polarities are tried; the globally best threshold
    wins regardless of direction.

    This is an **oracle method** — it requires ground-truth labels and
    therefore only applies to simulated graphs.  Use it to establish
    the theoretical upper bound for a given scorer.

    Parameters
    ----------
    sg : SpatialGraph
        Input graph.  Must have ``edge_metadata["is_false"]``.
    scores : dict
        Edge scores from :class:`EdgeScorer`.

    Returns
    -------
    SpatialGraph
        Graph filtered at the optimal F1 threshold, with ``is_removed``
        metadata set.

    Raises
    ------
    ValueError
        If ``sg.edge_metadata`` is ``None`` or lacks an ``is_false`` column.

    Examples
    --------
    >>> from spatial_graph_algorithms.simulate import generate
    >>> from spatial_graph_algorithms.denoise import EdgeScorer, EdgeFilterer
    >>> from spatial_graph_algorithms.metrics import evaluate_denoising
    >>> sg = generate(n=200, false_edges_fraction=0.10, seed=42)
    >>> scores = EdgeScorer().jaccard(sg.adjacency_matrix)
    >>> sg_opt = EdgeFilterer().by_optimal_f1(sg, scores)
    >>> report = evaluate_denoising(sg_opt)
    >>> report["f1"] >= 0.0
    True
    """
    if sg.edge_metadata is None or "is_false" not in sg.edge_metadata.columns:
        raise ValueError(
            "by_optimal_f1 requires ground-truth labels: "
            "sg.edge_metadata must have an 'is_false' column."
        )
    meta = sg.edge_metadata
    lo = np.minimum(meta["source"].values, meta["target"].values).astype(np.int64)
    hi = np.maximum(meta["source"].values, meta["target"].values).astype(np.int64)
    is_false_vals = meta["is_false"].values
    labels: dict[tuple[int, int], bool] = {
        (int(lo[i]), int(hi[i])): bool(is_false_vals[i])
        for i in range(len(meta))
    }
    threshold, _best_f1, remove_high_scores = _find_optimal_f1_threshold(scores, labels)
    return self._by_threshold_direction(
        sg, scores, threshold=threshold, remove_high_scores=remove_high_scores
    )

spatial_graph_algorithms.denoise.score_edges(sg, *, method, **kwargs)

Score all edges in sg using the specified method.

Convenience wrapper around :class:EdgeScorer.

Parameters:

Name Type Description Default
sg SpatialGraph

Input graph.

required
method str

One of "betweenness", "jaccard", "square_bipartite", "community", "random_walk".

required
**kwargs object

Forwarded to the scorer method (e.g. n_runs=10 for community).

{}

Returns:

Type Description
dict[tuple[int, int], float]

Canonical edge tuples (min(u, v), max(u, v)) → score.

Raises:

Type Description
ValueError

If method is not recognised.

Examples:

>>> from spatial_graph_algorithms.simulate import generate
>>> from spatial_graph_algorithms.denoise import score_edges
>>> sg = generate(n=100, seed=0)
>>> scores = score_edges(sg, method="jaccard")
>>> len(scores) == sg.n_edges
True
Source code in src/spatial_graph_algorithms/denoise/__init__.py
def score_edges(
    sg: SpatialGraph,
    *,
    method: str,
    **kwargs: object,
) -> dict[tuple[int, int], float]:
    """Score all edges in *sg* using the specified method.

    Convenience wrapper around :class:`EdgeScorer`.

    Parameters
    ----------
    sg : SpatialGraph
        Input graph.
    method : str
        One of ``"betweenness"``, ``"jaccard"``, ``"square_bipartite"``,
        ``"community"``, ``"random_walk"``.
    **kwargs
        Forwarded to the scorer method (e.g. ``n_runs=10`` for community).

    Returns
    -------
    dict[tuple[int, int], float]
        Canonical edge tuples ``(min(u, v), max(u, v))`` → score.

    Raises
    ------
    ValueError
        If *method* is not recognised.

    Examples
    --------
    >>> from spatial_graph_algorithms.simulate import generate
    >>> from spatial_graph_algorithms.denoise import score_edges
    >>> sg = generate(n=100, seed=0)
    >>> scores = score_edges(sg, method="jaccard")
    >>> len(scores) == sg.n_edges
    True
    """
    scorer = EdgeScorer()
    if not hasattr(scorer, method):
        raise ValueError(
            f"Unknown scoring method '{method}'. "
            f"Available: {sorted(SCORE_POLARITY)}."
        )
    return getattr(scorer, method)(sg.adjacency_matrix, **kwargs)

spatial_graph_algorithms.denoise.denoise(sg, *, method, fraction_to_remove=0.05, **kwargs)

Score edges and remove the worst-scored fraction in one call.

Convenience wrapper combining :class:EdgeScorer with :meth:EdgeFilterer.by_fraction. Score polarity is looked up automatically from :data:SCORE_POLARITY.

Parameters:

Name Type Description Default
sg SpatialGraph

Input graph.

required
method str

One of "betweenness", "jaccard", "square_bipartite", "community", "random_walk".

required
fraction_to_remove float

Fraction of edges to remove, in [0, 1]. Default 0.05.

0.05
**kwargs object

Forwarded to the scorer method.

{}

Returns:

Type Description
SpatialGraph

New graph with the worst-scored edges removed and is_removed metadata set.

Examples:

>>> from spatial_graph_algorithms.simulate import generate
>>> from spatial_graph_algorithms.denoise import denoise
>>> sg = generate(n=200, false_edges_fraction=0.10, seed=42)
>>> sg_clean = denoise(sg, method="jaccard", fraction_to_remove=0.05)
>>> sg_clean.n_edges < sg.n_edges
True
Source code in src/spatial_graph_algorithms/denoise/__init__.py
def denoise(
    sg: SpatialGraph,
    *,
    method: str,
    fraction_to_remove: float = 0.05,
    **kwargs: object,
) -> SpatialGraph:
    """Score edges and remove the worst-scored fraction in one call.

    Convenience wrapper combining :class:`EdgeScorer` with
    :meth:`EdgeFilterer.by_fraction`.  Score polarity is looked up
    automatically from :data:`SCORE_POLARITY`.

    Parameters
    ----------
    sg : SpatialGraph
        Input graph.
    method : str
        One of ``"betweenness"``, ``"jaccard"``, ``"square_bipartite"``,
        ``"community"``, ``"random_walk"``.
    fraction_to_remove : float
        Fraction of edges to remove, in ``[0, 1]``.  Default 0.05.
    **kwargs
        Forwarded to the scorer method.

    Returns
    -------
    SpatialGraph
        New graph with the worst-scored edges removed and ``is_removed``
        metadata set.

    Examples
    --------
    >>> from spatial_graph_algorithms.simulate import generate
    >>> from spatial_graph_algorithms.denoise import denoise
    >>> sg = generate(n=200, false_edges_fraction=0.10, seed=42)
    >>> sg_clean = denoise(sg, method="jaccard", fraction_to_remove=0.05)
    >>> sg_clean.n_edges < sg.n_edges
    True
    """
    scores = score_edges(sg, method=method, **kwargs)
    return EdgeFilterer().by_fraction(
        sg, scores, fraction=fraction_to_remove, method=method
    )