# -*- coding: utf-8 -*-
"""
CPU HiCCUPS pipeline for a single chromosome (banded version).
Assumptions:
- Input is a BandHiC `band_hic_matrix` C_raw (float, unnormalized), KR vector `kr` (len=N), and distance-dependent expected vector `expected_dist`.
- Normalization: elementwise within the stored band C_norm[i,j] = C_raw[i,j] * kr[i] * kr[j], still band-stored.
- Operates directly on the band (no tiling, no margin).
Pipeline:
1) Single resolution:
- For each pixel (i,j) compute expected eBL/eDonut/eH/eV for BL/Donut/H/V masks.
- Log-binning on expected to get bin indices (bBL,bDonut,bH,bV).
- Record observed = round(C_norm[i,j]) and build hist[bin, obs].
- Estimate per-bin thresholds + FDR tables via Poisson + reverse cumulative.
- Compute peak = observed - max(thresholds) to obtain a peak matrix.
- Second pass: for peak>0 pixels require
* far from diagonal (|i-j| > peak_width)
* local maxima within peak_width window
* valid expected values (>1e-6)
* OE thresholds + FDR satisfied
→ generate Feature2D.
- Centroid-cluster Feature2D.
2) Multi-resolution:
- Run single-resolution HiCCUPS per resolution.
- Merge via merge_all_resolutions (Juicer-like 5kb/10kb/25kb priority).
3) Output BEDPE.
Note: conceptual/testing implementation; not optimized for huge matrices.
- Only search loops where |i-j| * resolution <= max_loop_dist_bp (default 8 Mb, like CPU HiCCUPS).
- Use KR neighborhood mask (kr_neighborhood) to filter low-quality bins (mirrors Java HiCCUPS removeLowMapQFeatures).
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Dict, List, Tuple
import numpy as np
import math
import scipy as sp
import bandhic as bh
import numpy.ma as ma
import os
import time
# Parallel/Numba imports for fast HiCCUPS pass
from joblib import Parallel, delayed
from numba import njit
import multiprocessing as mp
__all__ = [
"hiccups",
"load_norm_vector_from_hic",
"load_norm_vector_from_cooler",
]
# =========================================================
# Normalization vector loaders
# =========================================================
def load_norm_vector_from_hic(hic_file, chrom, resolution, norm):
    """
    Load a normalization vector from a .hic file for a given chromosome and resolution.

    Parameters
    ----------
    hic_file : str
        Path to the .hic file.
    chrom : str
        Chromosome name, e.g., 'chr1' or '1'.
    resolution : int
        Bin size in base pairs.
    norm : str
        Normalization method, e.g., 'KR', 'VC', etc.

    Returns
    -------
    np.ndarray
        Normalization vector for the specified chromosome and resolution.

    Raises
    ------
    ImportError
        If hicstraw is not installed.
    RuntimeError
        If the normalization vector cannot be found for the specified parameters.
    """
    try:
        import hicstraw
    except ImportError:
        raise ImportError(
            "hicstraw is required for reading .hic files. Please install it via 'pip install hic-straw'."
        )
    # hicstraw chromosome names omit the 'chr' prefix.
    chrom_short = (
        chrom.replace("chr", "") if chrom.startswith("chr") else chrom
    )
    try:
        hic = hicstraw.HiCFile(hic_file)
        # Numeric chromosomes are passed to getNormVector as int;
        # X/Y/M stay strings.
        chrom_short_specific = (
            int(chrom_short)
            if chrom_short not in ["X", "Y", "M"]
            else chrom_short
        )
        kr = hic.getMatrixZoomData(
            chrom_short, chrom_short, "observed", norm, "BP", resolution
        ).getNormVector(chrom_short_specific)
    except Exception as exc:
        # Chain the underlying hicstraw error for easier debugging.
        raise RuntimeError(
            f"Normalization vector not found in .hic for {chrom} at {resolution} bp, norm={norm}"
        ) from exc
    return np.asarray(kr, dtype=float)
def load_norm_vector_from_cooler(cool_path, chrom, resolution, norm):
    """
    Load a normalization vector from a .cool or .mcool file for a given chromosome and resolution.

    Parameters
    ----------
    cool_path : str
        Path to the .cool or .mcool file.
    chrom : str
        Chromosome name, e.g., 'chr1'.
    resolution : int
        Bin size in base pairs.
    norm : str
        Normalization column name in the bin table, e.g., 'KR', 'VC', 'weight'.

    Returns
    -------
    np.ndarray
        Normalization vector for the specified chromosome and resolution.

    Raises
    ------
    RuntimeError
        If the normalization vector cannot be found for the specified parameters.
    """
    import cooler

    # Multi-resolution .mcool files store each resolution in a sub-group;
    # a plain .cool file is opened directly (the old code always appended
    # the /resolutions/ suffix, which fails for .cool inputs).
    uri = (
        f"{cool_path}::/resolutions/{resolution}"
        if cool_path.endswith(".mcool")
        else cool_path
    )
    try:
        clr = cooler.Cooler(uri)
        # Materialize the bin table once, then select the norm column for
        # the requested chromosome.
        bins = clr.bins()[:]
        kr = bins.loc[bins["chrom"] == chrom, norm].values
    except Exception as exc:
        raise RuntimeError(
            f"Normalization vector not found in cooler for {chrom} at {resolution} bp, norm={norm}"
        ) from exc
    return np.asarray(kr, dtype=float)
# =========================================================
# Data structures
# =========================================================
@dataclass(order=True)
class Feature2D:
    """A 2D genomic feature (loop anchor pair) with arbitrary float attributes.

    Ordering compares the genomic coordinate fields only: ``color`` and
    ``attrs`` are excluded from comparisons, because including the dict-valued
    ``attrs`` (as the previous version did) makes sorting two features with
    identical coordinates raise TypeError (dicts do not support ``<``).
    """

    chr1: str
    start1: int
    end1: int
    chr2: str
    start2: int
    end2: int
    # Display color; purely cosmetic, excluded from ordering.
    color: Tuple[int, int, int] = field(default=(0, 0, 0), compare=False)
    # Free-form float attributes keyed by the module-level constant names.
    attrs: Dict[str, float] = field(default_factory=dict, compare=False)

    def get(self, key: str, default=None):
        """Return attribute ``key``, or ``default`` when absent."""
        return self.attrs.get(key, default)

    def set_attr(self, key: str, val: float):
        """Store ``val`` (coerced to float) under ``key``."""
        self.attrs[key] = float(val)

    def get_float(self, key: str) -> float:
        """Return attribute ``key`` as a float; raises KeyError when absent."""
        return float(self.attrs[key])

    def has_attr(self, key: str) -> bool:
        """Return True when attribute ``key`` is present."""
        return key in self.attrs
@dataclass
class HiCCUPSConfig:
    """Tunable parameters for the HiCCUPS loop-calling pipeline.

    Defaults mirror Juicer-like CPU HiCCUPS settings. Distances are in
    bins unless the field name ends in ``_bp``.
    """

    resolution: int  # bp
    window: int = 10  # donut window (in bins)
    peak_width: int = 2  # peak half-width (in bins)
    w1: int = 40  # max bin index (for log-binning)
    fdr: float = 0.1  # FDR target (Juicer-like)
    max_count: int = 10000  # max observed tracked in hist
    cluster_radius_bp: int = 20000  # centroid merge radius (bp)
    # OE thresholds, similar to Juicer oeThreshold1/2/3
    fdrsum: float = 0.02  # max allowed sum of the four per-mask FDR values
    oe1: float = 1.5  # observed/expected ratio vs H and V crosshairs
    oe2: float = 1.75  # observed/expected ratio vs BL box and donut
    oe3: float = 2.0  # stronger ratio required vs BL box OR donut
    max_loop_dist_bp: int = 8_000_000  # max loop search distance (bp)
    kr_neighborhood: int = (
        5  # KR neighborhood radius (bins), like Java HiCCUPS.krNeighborhood
    )
    norm: str = "KR"  # normalization vector name, e.g. 'KR' or 'VC'
    n_jobs: int = -1  # Number of parallel jobs
# String keys for Feature2D.attrs used throughout the pipeline.
OBSERVED = "observed"  # rounded normalized count at the pixel
PEAK = "peak"  # observed minus the max per-mask threshold
EXPECTEDBL = "expectedBL"  # expected value from the bottom-left box mask
EXPECTEDDONUT = "expectedDonut"  # expected value from the donut mask
EXPECTEDH = "expectedH"  # expected value from the horizontal crosshair
EXPECTEDV = "expectedV"  # expected value from the vertical crosshair
BINBL = "binBL"  # log-binned expected index (BL box)
BINDONUT = "binDonut"  # log-binned expected index (donut)
BINH = "binH"  # log-binned expected index (horizontal)
BINV = "binV"  # log-binned expected index (vertical)
FDRBL = "fdrBL"  # per-mask FDR values looked up from the FDR tables
FDRDONUT = "fdrDonut"
FDRH = "fdrH"
FDRV = "fdrV"
RADIUS = "radius"  # cluster radius after centroid merging
CENTROID1 = "centroid1"  # cluster centroid coordinates (bp)
CENTROID2 = "centroid2"
NUMCOLLAPSED = "numCollapsed"  # number of pixels merged into the cluster
def reverse_cumulative(arr: np.ndarray, axis: int = 1) -> np.ndarray:
    """Cumulative sum taken from the end of the array toward the start.

    For 1D input, ``result[i]`` equals ``arr[i:].sum()``; for 2D input the
    same is computed along ``axis``. Any other rank raises ValueError.
    """
    ndim = arr.ndim
    if ndim == 1:
        return np.flip(np.cumsum(np.flip(arr)))
    if ndim == 2:
        flipped = np.flip(arr, axis=axis)
        return np.flip(flipped.cumsum(axis=axis), axis=axis)
    raise ValueError("reverse_cumulative only supports 1D or 2D arrays.")
def _iter_band_indices(mat: bh.band_hic_matrix):
"""
Iterate over valid upper-triangular banded matrix entries.
Parameters
----------
mat : band_hic_matrix
Input banded Hi-C matrix.
Yields
------
i : int
Row index (bin index).
j : int
Column index (bin index), j > i.
k : int
Diagonal offset (j - i).
Notes
-----
Yields all valid upper-triangular (i < j) entries within the stored band, skipping masked entries.
"""
bin_num = mat.bin_num
diag_num = mat.diag_num
for i in range(bin_num):
for k in range(1, diag_num):
j = i + k
if j >= bin_num:
break
if mat.mask is not None and mat.mask[i, k]:
continue
yield i, j, k
# ----------------------------------------------------------
# Numba + joblib accelerated diagonal worker for HiCCUPS
# ----------------------------------------------------------
from numba import njit
import numpy as np
import math
@njit(cache=True, fastmath=True)
def _hiccups_diag_worker_numba(
    data,
    mask,
    expected_dist,
    kr,
    k_start,
    k_end,
    window,
    peak_w,
    max_loop_k,
    w1,
    max_count,
):
    """
    Efficient diagonal worker for HiCCUPS banded matrix computation.

    Parameters
    ----------
    data : np.ndarray
        Normalized Hi-C banded data array of shape (N, diag_num).
    mask : np.ndarray
        Boolean mask array of the same shape as `data`, True for masked entries.
    expected_dist : np.ndarray
        Expected values as a function of diagonal offset. Its length must
        equal data.shape[1] (diag_num): throughout this function,
        ``dist < len(expected_dist)`` is also used as the bound check for
        column indexing into `data` and `mask`.
    kr : np.ndarray
        Normalization vector of length N.
    k_start : int
        Starting diagonal offset (inclusive).
    k_end : int
        Ending diagonal offset (exclusive).
    window : int
        Donut window size (in bins).
    peak_w : int
        Peak half-width (in bins).
    max_loop_k : int
        Maximum diagonal offset to consider (in bins).
    w1 : int
        Number of log-binned expected bins.
    max_count : int
        Maximum observed count tracked in histograms.

    Returns
    -------
    histBL, histDonut, histH, histV : np.ndarray
        Observed-count histograms per expected bin, each (w1, max_count+1).
    upd_ptr : int
        Number of valid entries in the parallel arrays below.
    ii_arr, kk_arr, oo_arr : np.ndarray
        Row index, diagonal offset, and observed count per valid pixel.
    e_bl_arr, e_dn_arr, e_h_arr, e_v_arr : np.ndarray
        Expected values (BL box, donut, horizontal, vertical) per pixel.
    bBL_arr, bDo_arr, bH_arr, bV_arr : np.ndarray
        Log-binned expected-value indices per pixel.

    Notes
    -----
    This function is Numba-accelerated and processes a range of diagonal
    offsets of the banded matrix. It computes expected values for the BL
    box, donut, horizontal and vertical crosshair masks, assigns log-binned
    indices, and accumulates observed counts for thresholding and FDR.

    Bug fix vs the previous version: the crosshair mid-line subtractions
    read ``data[ii, dist]`` / ``mask[ii, dist]`` BEFORE checking
    ``dist < Nexp``. For rows above ``i`` (vertical) and columns right of
    ``j`` (horizontal), ``dist`` can exceed the stored band width, which
    was an out-of-bounds read. All four mid-line loops now check the bound
    before indexing.
    """
    N, diag_num = data.shape
    Nexp = len(expected_dist)
    histBL = np.zeros((w1, max_count + 1), np.int64)
    histDonut = np.zeros_like(histBL)
    histH = np.zeros_like(histBL)
    histV = np.zeros_like(histBL)
    # Parallel arrays for per-pixel results (upper bound on size).
    ii_arr = np.empty((N * (k_end - k_start),), dtype=np.int32)
    kk_arr = np.empty_like(ii_arr)
    oo_arr = np.empty_like(ii_arr)
    e_bl_arr = np.empty((N * (k_end - k_start),), dtype=np.float32)
    e_dn_arr = np.empty_like(e_bl_arr)
    e_h_arr = np.empty_like(e_bl_arr)
    e_v_arr = np.empty_like(e_bl_arr)
    bBL_arr = np.empty((N * (k_end - k_start),), dtype=np.int16)
    bDo_arr = np.empty_like(bBL_arr)
    bH_arr = np.empty_like(bBL_arr)
    bV_arr = np.empty_like(bBL_arr)
    upd_ptr = 0
    # Log-binning base: bins grow by a factor of 2**0.33 per index.
    lognorm = math.log(2.0**0.33)

    def bin_val(e):
        # Map an expected value to its log-bin index, clamped to [0, w1).
        if e <= 1.0 or math.isnan(e) or math.isinf(e):
            return 0
        idx = int(math.floor(math.log(e) / lognorm))
        if idx < 0:
            idx = 0
        if idx >= w1:
            idx = w1 - 1
        return idx

    for k in range(k_start, k_end):
        diagDist = k
        # Skip diagonals too close to the main diagonal, beyond the loop
        # search range, or without a usable distance expectation.
        if diagDist <= peak_w:
            continue
        if diagDist >= max_loop_k:
            continue
        if diagDist >= Nexp:
            continue
        if expected_dist[diagDist] <= 0.0 or math.isnan(
            expected_dist[diagDist]
        ):
            continue
        d_diag = expected_dist[diagDist]
        for i in range(N - diagDist):
            j = i + diagDist
            if mask[i, diagDist]:
                continue
            if kr[i] == 0.0 or kr[j] == 0.0:
                continue
            # Effective window size, shrunk near the diagonal but always
            # strictly larger than the peak half-width.
            if diagDist > 1:
                wsize = min(window, (diagDist - 1) // 2)
            else:
                wsize = peak_w + 1
            if wsize <= peak_w:
                wsize = peak_w + 1
            # ----- BL box -----
            E_bl = 0.0
            Ed_bl = 0.0
            for ii in range(i + 1, min(i + wsize + 1, N)):
                for jj in range(max(j - wsize, 0), j):
                    if ii < jj:
                        dist = jj - ii
                        if dist < Nexp:
                            v = data[ii, dist]
                            if not mask[ii, dist]:
                                E_bl += v
                                Ed_bl += expected_dist[dist]
            # Remove the inner peak box from the BL sum.
            for ii in range(i + 1, min(i + peak_w + 1, N)):
                for jj in range(max(j - peak_w, 0), j):
                    if ii < jj:
                        dist = jj - ii
                        if dist < Nexp:
                            v = data[ii, dist]
                            if not mask[ii, dist]:
                                E_bl -= v
                                Ed_bl -= expected_dist[dist]
            # Grow the window until the BL sum is informative enough
            # (>= 16 counts), recomputing from scratch each time.
            while E_bl < 16.0 and 2 * wsize < diagDist:
                E_bl = 0.0
                Ed_bl = 0.0
                wsize += 1
                for ii in range(i + 1, min(i + wsize + 1, N)):
                    for jj in range(max(j - wsize, 0), j):
                        if ii < jj:
                            dist = jj - ii
                            if dist < Nexp:
                                v = data[ii, dist]
                                if not mask[ii, dist]:
                                    E_bl += v
                                    Ed_bl += expected_dist[dist]
                                    if (
                                        i + 1 <= ii < i + peak_w + 1
                                        and j - peak_w <= jj < j
                                    ):
                                        E_bl -= v
                                        Ed_bl -= expected_dist[dist]
            # ----- Donut -----
            E_donut = 0.0
            Ed_donut = 0.0
            for ii in range(max(i - wsize, 0), min(i + wsize + 1, N)):
                for jj in range(max(j - wsize, 0), min(j + wsize + 1, N)):
                    if ii < jj:
                        dist = jj - ii
                        if dist < Nexp:
                            v = data[ii, dist]
                            if not mask[ii, dist]:
                                E_donut += v
                                Ed_donut += expected_dist[dist]
            # Remove the inner peak box from the donut sum.
            for ii in range(max(i - peak_w, 0), min(i + peak_w + 1, N)):
                for jj in range(max(j - peak_w, 0), min(j + peak_w + 1, N)):
                    if ii < jj:
                        dist = jj - ii
                        if dist < Nexp:
                            v = data[ii, dist]
                            if not mask[ii, dist]:
                                E_donut -= v
                                Ed_donut -= expected_dist[dist]
            # ----- Vertical crosshair -----
            E_v = 0.0
            Ed_v = 0.0
            for ii in range(max(i - wsize, 0), max(i - peak_w, 0)):
                dist = j - ii
                # dist > diagDist here and may fall outside the stored
                # band: bound-check BEFORE indexing (previous code read
                # data[ii, dist] unconditionally → out-of-bounds).
                if dist < Nexp:
                    v_mid = data[ii, dist]
                    if not mask[ii, dist]:
                        # Remove the crosshair mid-column from the donut.
                        E_donut -= v_mid
                        Ed_donut -= expected_dist[dist]
                for dj in (-1, 0, 1):
                    jj = j + dj
                    if jj < 0 or jj >= N:
                        continue
                    if ii < jj:
                        dist = jj - ii
                        if dist < Nexp:
                            v = data[ii, dist]
                            if not mask[ii, dist]:
                                E_v += v
                                Ed_v += expected_dist[dist]
            for ii in range(min(i + peak_w + 1, N), min(i + wsize + 1, N)):
                dist = j - ii
                if dist < Nexp:
                    v_mid = data[ii, dist]
                    if not mask[ii, dist]:
                        E_donut -= v_mid
                        Ed_donut -= expected_dist[dist]
                for dj in (-1, 0, 1):
                    jj = j + dj
                    if jj < 0 or jj >= N:
                        continue
                    if ii < jj:
                        dist = jj - ii
                        if dist < Nexp:
                            v = data[ii, dist]
                            if not mask[ii, dist]:
                                E_v += v
                                Ed_v += expected_dist[dist]
            # ----- Horizontal crosshair -----
            E_h = 0.0
            Ed_h = 0.0
            for jj in range(max(j - wsize, 0), max(j - peak_w, 0)):
                dist = jj - i
                if dist < Nexp:
                    v_mid = data[i, dist]
                    if not mask[i, dist]:
                        E_donut -= v_mid
                        Ed_donut -= expected_dist[dist]
                for di in (-1, 0, 1):
                    ii = i + di
                    if ii < 0 or ii >= N:
                        continue
                    if ii < jj:
                        dist = jj - ii
                        if dist < Nexp:
                            v = data[ii, dist]
                            if not mask[ii, dist]:
                                E_h += v
                                Ed_h += expected_dist[dist]
            for jj in range(min(j + peak_w + 1, N), min(j + wsize + 1, N)):
                dist = jj - i
                # dist > diagDist here as well: bound-check before
                # indexing (same out-of-bounds fix as the vertical case).
                if dist < Nexp:
                    v_mid = data[i, dist]
                    if not mask[i, dist]:
                        E_donut -= v_mid
                        Ed_donut -= expected_dist[dist]
                for di in (-1, 0, 1):
                    ii = i + di
                    if ii < 0 or ii >= N:
                        continue
                    if ii < jj:
                        dist = jj - ii
                        if dist < Nexp:
                            v = data[ii, dist]
                            if not mask[ii, dist]:
                                E_h += v
                                Ed_h += expected_dist[dist]
            # Scale each mask sum to the pixel's distance expectation and
            # denormalize with the KR factors.
            e_bl = (
                (E_bl * d_diag / Ed_bl) * kr[i] * kr[j] if Ed_bl > 0 else 0.0
            )
            e_dn = (
                (E_donut * d_diag / Ed_donut) * kr[i] * kr[j]
                if Ed_donut > 0
                else 0.0
            )
            e_h = (E_h * d_diag / Ed_h) * kr[i] * kr[j] if Ed_h > 0 else 0.0
            e_v = (E_v * d_diag / Ed_v) * kr[i] * kr[j] if Ed_v > 0 else 0.0
            bBL = bin_val(e_bl)
            bDo = bin_val(e_dn)
            bH = bin_val(e_h)
            bV = bin_val(e_v)
            # Observed count, clamped to the histogram range.
            o = int(round(data[i, diagDist] * kr[i] * kr[j]))
            if o < 0:
                o = 0
            if o > max_count:
                o = max_count
            histBL[bBL, o] += 1
            histDonut[bDo, o] += 1
            histH[bH, o] += 1
            histV[bV, o] += 1
            ii_arr[upd_ptr] = i
            kk_arr[upd_ptr] = diagDist
            oo_arr[upd_ptr] = o
            e_bl_arr[upd_ptr] = e_bl
            e_dn_arr[upd_ptr] = e_dn
            e_h_arr[upd_ptr] = e_h
            e_v_arr[upd_ptr] = e_v
            bBL_arr[upd_ptr] = bBL
            bDo_arr[upd_ptr] = bDo
            bH_arr[upd_ptr] = bH
            bV_arr[upd_ptr] = bV
            upd_ptr += 1
    return (
        histBL,
        histDonut,
        histH,
        histV,
        upd_ptr,
        ii_arr,
        kk_arr,
        oo_arr,
        e_bl_arr,
        e_dn_arr,
        e_h_arr,
        e_v_arr,
        bBL_arr,
        bDo_arr,
        bH_arr,
        bV_arr,
    )
def compute_evalues_and_hist_parallel(
    C_norm: bh.band_hic_matrix,
    expected_dist: np.ndarray,
    conf: HiCCUPSConfig,
    kr: np.ndarray,
    n_jobs: int = -1,
    chunk_size: int = 64,
):
    """
    Compute expected values, log-binned indices, observed counts, and histograms for HiCCUPS in parallel.

    Parameters
    ----------
    C_norm : band_hic_matrix
        Normalized banded Hi-C matrix (elementwise normalized).
    expected_dist : np.ndarray
        Expected counts as a function of diagonal offset (distance dependency);
        length must equal the band width (diag_num).
    conf : HiCCUPSConfig
        HiCCUPS configuration parameters.
    kr : np.ndarray
        Normalization vector (length N).
    n_jobs : int, optional
        Number of parallel jobs to use (default: -1, all CPUs).
    chunk_size : int, optional
        Number of diagonals per parallel worker (default: 64).

    Returns
    -------
    observed : band_hic_matrix
        Matrix of observed counts (rounded).
    eBL, eDo, eH, eV : band_hic_matrix
        Matrices of BL box / Donut / Horizontal / Vertical expected values.
    binBL, binDo, binH, binV : band_hic_matrix
        Log-binned expected-value indices per mask.
    histBL, histDo, histH, histV : np.ndarray
        Observed-count histograms per mask, each (w1, max_count+1).

    Notes
    -----
    Runs the main HiCCUPS diagonal pass in parallel with Numba-accelerated
    workers, then scatters the per-pixel worker results back into banded
    matrices and accumulates the histograms.
    """
    norm_data = C_norm.data
    mask = C_norm.mask
    if mask is None:
        # NOTE(review): relies on the private band_hic_matrix helper to
        # materialize a raw boolean mask of the band shape — confirm this
        # stays available across bandhic versions.
        mask = C_norm._extract_raw_mask(norm_data.shape)
    N, diag_num = norm_data.shape
    max_loop_k = conf.max_loop_dist_bp // conf.resolution
    # Diagonal 0 (self-interactions) is never searched, so chunks start at 1.
    chunks = [
        (k, min(k + chunk_size, diag_num))
        for k in range(1, diag_num, chunk_size)
    ]
    # (The previous version created an unused multiprocessing "fork"
    # context here; joblib's backend choice below is what actually matters.)
    results = Parallel(
        n_jobs=n_jobs,
        backend="multiprocessing",
        prefer="processes",
    )(
        delayed(_hiccups_diag_worker_numba)(
            norm_data,
            mask,
            expected_dist,
            kr,
            k0,
            k1,
            conf.window,
            conf.peak_width,
            max_loop_k,
            conf.w1,
            conf.max_count,
        )
        for (k0, k1) in chunks
    )
    histBL = np.zeros((conf.w1, conf.max_count + 1), dtype=np.int64)
    histDo = np.zeros_like(histBL)
    histH = np.zeros_like(histBL)
    histV = np.zeros_like(histBL)
    observed = bh.zeros((N, N), diag_num=diag_num, dtype=np.int32)
    eBL = bh.zeros((N, N), diag_num=diag_num, dtype=np.float32)
    eDo = bh.zeros((N, N), diag_num=diag_num, dtype=np.float32)
    eH = bh.zeros((N, N), diag_num=diag_num, dtype=np.float32)
    eV = bh.zeros((N, N), diag_num=diag_num, dtype=np.float32)
    binBL = bh.zeros((N, N), diag_num=diag_num, dtype=np.int16)
    binDo = bh.zeros((N, N), diag_num=diag_num, dtype=np.int16)
    binH = bh.zeros((N, N), diag_num=diag_num, dtype=np.int16)
    binV = bh.zeros((N, N), diag_num=diag_num, dtype=np.int16)
    # Accumulate histograms and scatter the per-pixel parallel arrays
    # (only the first p entries of each worker's buffers are valid).
    for (
        hBL,
        hDo,
        hH,
        hV,
        p,
        ii,
        kk,
        oo,
        e_bl_arr,
        e_dn_arr,
        e_h_arr,
        e_v_arr,
        bBL_arr,
        bDo_arr,
        bH_arr,
        bV_arr,
    ) in results:
        histBL += hBL
        histDo += hDo
        histH += hH
        histV += hV
        if p == 0:
            continue
        ii = ii[:p]
        kk = kk[:p]
        observed.data[ii, kk] = oo[:p]
        eBL.data[ii, kk] = e_bl_arr[:p]
        eDo.data[ii, kk] = e_dn_arr[:p]
        eH.data[ii, kk] = e_h_arr[:p]
        eV.data[ii, kk] = e_v_arr[:p]
        binBL.data[ii, kk] = bBL_arr[:p]
        binDo.data[ii, kk] = bDo_arr[:p]
        binH.data[ii, kk] = bH_arr[:p]
        binV.data[ii, kk] = bV_arr[:p]
    return (
        observed,
        eBL,
        eDo,
        eH,
        eV,
        binBL,
        binDo,
        binH,
        binV,
        histBL,
        histDo,
        histH,
        histV,
    )
# =========================================================
# Step 2: thresholds + FDR tables
# =========================================================
def compute_thresholds_and_fdr(
    hist: np.ndarray,
    conf: HiCCUPSConfig,
    pmf: np.ndarray = None,
):
    """
    Compute per-bin thresholds and FDR tables for HiCCUPS using Poisson statistics.

    Parameters
    ----------
    hist : np.ndarray
        Histogram array of shape (w1, max_count+1), where w1 is the number
        of log-binned expected bins.
    conf : HiCCUPSConfig
        HiCCUPS configuration object (only ``conf.fdr`` is read).
    pmf : np.ndarray, optional
        Unused; kept for backward compatibility. The Poisson PMF is always
        computed internally from each bin's mean observed count.

    Returns
    -------
    threshold : np.ndarray
        Array of per-bin count thresholds (length w1).
    fdrLog : np.ndarray
        Array of FDR values for each bin and observed count (shape w1, max_count+1).

    Notes
    -----
    For each log-binned expected bin, thresholds and FDRs are estimated by
    comparing the Poisson-expected reverse cumulative counts against the
    observed reverse cumulative histogram, following the HiCCUPS approach.
    """
    w1, width = hist.shape
    threshold = np.zeros(w1, dtype=np.float32)
    fdrLog = np.zeros_like(hist, dtype=np.float32)
    rcsHist = reverse_cumulative(hist)
    for idx in range(w1):
        cnt = rcsHist[idx, 0]
        if cnt <= 0:
            # Empty expected bin: leave threshold/FDR at zero.
            continue
        mean_obs = (hist[idx] * np.arange(width)).sum() / cnt
        if mean_obs <= 0:
            mean_obs = 1e-3
        pmf_row = sp.stats.poisson.pmf(np.arange(width), mean_obs)
        expected = rcsHist[idx, 0] * pmf_row
        rcsExpected = reverse_cumulative(expected)
        for j in range(width):
            # Once the observed reverse-cumulative count reaches zero it
            # stays zero, so the FDR condition can never be satisfied;
            # stopping here is equivalent to the old behavior and avoids
            # the division-by-zero it produced.
            if rcsHist[idx, j] <= 0:
                break
            if rcsExpected[j] / rcsHist[idx, j] <= conf.fdr:
                threshold[idx] = (width - 2) if j == 0 else (j - 1)
                break
        for j in range(width):
            s2 = rcsHist[idx, j]
            if s2 > 0:
                fdrLog[idx, j] = rcsExpected[j] / s2
            else:
                break
    return threshold, fdrLog
# =========================================================
# Step 3: Feature2D + FDR + centroid merging
# =========================================================
def generate_peak_feature(
    chr_name: str,
    res: int,
    i: int,
    j: int,
    observed: int,
    peak_val: float,
    e_bl: float,
    e_dn: float,
    e_h: float,
    e_v: float,
    b_bl: int,
    b_dn: int,
    b_h: int,
    b_v: int,
) -> Feature2D:
    """
    Generate a Feature2D object representing a candidate HiCCUPS peak.

    Parameters
    ----------
    chr_name : str
        Chromosome name.
    res : int
        Bin size in base pairs.
    i, j : int
        Bin indices of the two anchors (order-independent).
    observed : int
        Observed count at (i, j).
    peak_val : float
        Peak value (observed minus threshold).
    e_bl, e_dn, e_h, e_v : float
        Expected values for the BL box, donut, horizontal, and vertical masks.
    b_bl, b_dn, b_h, b_v : int
        Log-binned expected indices for the same four masks.

    Returns
    -------
    Feature2D
        Feature2D object with all relevant attributes set.
    """
    # Anchors are stored in genomic order regardless of the input order.
    lo = min(i, j) * res
    hi = max(i, j) * res
    feat = Feature2D(
        chr1=chr_name,
        start1=lo,
        end1=lo + res,
        chr2=chr_name,
        start2=hi,
        end2=hi + res,
        color=(0, 0, 0),
    )
    for key, value in (
        (OBSERVED, observed),
        (PEAK, peak_val),
        (EXPECTEDBL, e_bl),
        (EXPECTEDDONUT, e_dn),
        (EXPECTEDH, e_h),
        (EXPECTEDV, e_v),
        (BINBL, b_bl),
        (BINDONUT, b_dn),
        (BINH, b_h),
        (BINV, b_v),
    ):
        feat.set_attr(key, value)
    return feat
def add_fdr_to_feature(
    f: Feature2D,
    fdrLogBL: np.ndarray,
    fdrLogDonut: np.ndarray,
    fdrLogH: np.ndarray,
    fdrLogV: np.ndarray,
):
    """
    Assign per-mask FDR values to a Feature2D object.

    Looks up each mask's FDR table at (the feature's log-bin index for that
    mask, the observed count clamped to the table width) and stores the
    result under the corresponding FDR attribute key.

    Parameters
    ----------
    f : Feature2D
        Feature2D object to update.
    fdrLogBL, fdrLogDonut, fdrLogH, fdrLogV : np.ndarray
        FDR tables for the BL box, donut, horizontal, and vertical masks.
    """
    # Clamp the observed count to the last column of the FDR tables.
    obs_idx = min(int(f.get_float(OBSERVED)), fdrLogBL.shape[1] - 1)
    for out_key, table, bin_key in (
        (FDRBL, fdrLogBL, BINBL),
        (FDRDONUT, fdrLogDonut, BINDONUT),
        (FDRH, fdrLogH, BINH),
        (FDRV, fdrLogV, BINV),
    ):
        f.set_attr(out_key, table[int(f.get_float(bin_key)), obs_idx])
def fdr_thresholds_satisfied(
    f: Feature2D,
    conf: HiCCUPSConfig,
) -> bool:
    """
    Evaluate whether a Feature2D passes HiCCUPS FDR and OE thresholds.

    Parameters
    ----------
    f : Feature2D
        Feature2D object with all relevant attributes.
    conf : HiCCUPSConfig
        HiCCUPS configuration parameters.

    Returns
    -------
    bool
        True if the feature passes all FDR and observed/expected thresholds, False otherwise.

    Notes
    -----
    This function implements the combined FDR and OE thresholding logic for
    HiCCUPS loop candidates.

    NOTE(review): this definition is immediately shadowed by a second
    ``fdr_thresholds_satisfied`` defined later in this module, so this
    stricter variant is dead code — confirm which version is intended and
    remove the other.
    """
    obs = round(f.get_float(OBSERVED))
    expBL = f.get_float(EXPECTEDBL)
    expDn = f.get_float(EXPECTEDDONUT)
    expH = f.get_float(EXPECTEDH)
    expV = f.get_float(EXPECTEDV)
    fBL = f.get_float(FDRBL)
    fDn = f.get_float(FDRDONUT)
    fH = f.get_float(FDRH)
    fV = f.get_float(FDRV)
    # Clusters of more than one pixel skip the fdrsum test below.
    numCollapsed = (
        int(f.get_float(NUMCOLLAPSED)) if f.has_attr(NUMCOLLAPSED) else 1
    )
    # All four expected values must be meaningfully positive.
    if min(expBL, expDn, expH, expV) <= 1e-6:
        return False
    # Observed/expected ratio checks (Juicer oeThreshold1/2/3) plus the
    # combined FDR-sum test for singleton clusters.
    if not (
        obs > conf.oe2 * expBL
        and obs > conf.oe2 * expDn
        and obs > conf.oe1 * expH
        and obs > conf.oe1 * expV
        and (obs > conf.oe3 * expBL or obs > conf.oe3 * expDn)
        and (numCollapsed > 1 or (fBL + fDn + fH + fV) <= conf.fdrsum)
    ):
        return False
    # Each individual per-mask FDR must also be within the target.
    fdr_total = max(fBL, fDn, fH, fV)
    if fdr_total > conf.fdr:
        return False
    return True
def fdr_thresholds_satisfied(
    f: Feature2D,
    conf: HiCCUPSConfig,
) -> bool:
    """
    Evaluate whether a Feature2D passes HiCCUPS FDR and OE thresholds.

    Parameters
    ----------
    f : Feature2D
        Feature2D object with all relevant attributes.
    conf : HiCCUPSConfig
        HiCCUPS configuration parameters.

    Returns
    -------
    bool
        True if the feature passes all FDR and observed/expected thresholds, False otherwise.

    Notes
    -----
    This function implements the combined FDR and OE thresholding logic for
    HiCCUPS loop candidates.

    NOTE(review): this redefinition shadows the stricter variant defined
    just above, and most of its checks are commented out, leaving only the
    numCollapsed/fdrsum test. This looks like a debugging relaxation —
    confirm the intended check set before release.
    """
    # NOTE(review): obs and the exp*/fdr_total values below are computed but
    # unused while the corresponding checks are commented out; reading them
    # via get_float still raises KeyError if the attributes are missing.
    obs = round(f.get_float(OBSERVED))
    expBL = f.get_float(EXPECTEDBL)
    expDn = f.get_float(EXPECTEDDONUT)
    expH = f.get_float(EXPECTEDH)
    expV = f.get_float(EXPECTEDV)
    fBL = f.get_float(FDRBL)
    fDn = f.get_float(FDRDONUT)
    fH = f.get_float(FDRH)
    fV = f.get_float(FDRV)
    numCollapsed = (
        int(f.get_float(NUMCOLLAPSED)) if f.has_attr(NUMCOLLAPSED) else 1
    )
    # if min(expBL, expDn, expH, expV) <= 1e-6:
    #     return False
    if not (
        # obs > conf.oe2 * expBL and
        # obs > conf.oe2 * expDn and
        # obs > conf.oe1 * expH and
        # obs > conf.oe1 * expV and
        # (obs > conf.oe3 * expBL or obs > conf.oe3 * expDn) and
        (numCollapsed > 1 or (fBL + fDn + fH + fV) <= conf.fdrsum)
    ):
        return False
    # fdr_total = max(fBL, fDn, fH, fV)
    # if fdr_total > conf.fdr:
    #     return False
    return True
def coalesce_pixels_to_centroid(
    feats: List[Feature2D],
    conf: HiCCUPSConfig,
) -> List[Feature2D]:
    """
    Cluster candidate loop pixels into centroids using a fixed radius.

    Parameters
    ----------
    feats : list of Feature2D
        List of candidate loop Feature2D objects.
    conf : HiCCUPSConfig
        HiCCUPS configuration parameters.

    Returns
    -------
    merged : list of Feature2D
        List of merged centroid Feature2D objects, each carrying centroid
        coordinates, cluster radius, and the number of collapsed pixels.

    Notes
    -----
    Pixels are clustered in the (start1, start2) plane with a fixed radius
    (`cluster_radius_bp`). The pixel with the highest observed count seeds
    each cluster; every pixel within the radius of the running centroid is
    absorbed and the centroid is recomputed until no more pixels join.
    """
    if not feats:
        return []
    # Deduplicate by position, keeping the pixel with the highest observed
    # count at each (chrom, start1, start2).
    best_by_pos = {}
    for feat in feats:
        pos = (feat.chr1, feat.start1, feat.start2)
        incumbent = best_by_pos.get(pos)
        if incumbent is None or feat.get_float(OBSERVED) > incumbent.get_float(
            OBSERVED
        ):
            best_by_pos[pos] = feat
    pool = list(best_by_pos.values())
    radius = conf.cluster_radius_bp
    merged: List[Feature2D] = []
    while pool:
        # Strongest remaining pixel seeds the next cluster.
        pool.sort(key=lambda p: p.get_float(OBSERVED), reverse=True)
        seed = pool.pop(0)
        cluster = [seed]
        cx, cy = seed.start1, seed.start2
        grew = True
        while grew:
            grew = False
            leftovers = []
            for feat in pool:
                if math.hypot(feat.start1 - cx, feat.start2 - cy) <= radius:
                    cluster.append(feat)
                    grew = True
                else:
                    leftovers.append(feat)
            pool = leftovers
            if grew:
                # Recenter on the arithmetic mean of the cluster members.
                cx = int(sum(p.start1 for p in cluster) / len(cluster))
                cy = int(sum(p.start2 for p in cluster) / len(cluster))
        seed.set_attr(NUMCOLLAPSED, len(cluster))
        seed.set_attr(CENTROID1, cx)
        seed.set_attr(CENTROID2, cy)
        seed.set_attr(
            RADIUS,
            max(math.hypot(p.start1 - cx, p.start2 - cy) for p in cluster),
        )
        merged.append(seed)
    return merged
# =========================================================
# Step 4: Single-resolution HiCCUPS
# =========================================================
def run_hiccups_single_resolution(
    hic_file: str,
    chr_name: str,
    conf: HiCCUPSConfig,
):
    """
    Run the single-resolution HiCCUPS pipeline for a given chromosome.

    Parameters
    ----------
    hic_file : str
        Path to the input Hi-C file (.hic, .cool, or .mcool).
    chr_name : str
        Chromosome name (e.g., 'chr1').
    conf : HiCCUPSConfig
        HiCCUPS configuration parameters.

    Returns
    -------
    merged_loops : list of Feature2D
        Merged loop features detected at this resolution that pass the
        final FDR filter.
    peak : band_hic_matrix
        Peak matrix (observed minus threshold) for all pixels.

    Notes
    -----
    This function runs the full single-resolution HiCCUPS pipeline:
    loads the matrix, computes normalization, expected values, thresholds,
    peaks, and merges loops.
    """
    res = conf.resolution
    diag = conf.max_loop_dist_bp // res
    # Load matrix, KR, expected
    t_load0 = time.perf_counter()
    print(f"Loading Hi-C matrix for {chr_name} at {res} bp resolution...")
    C_norm = bh.straw_chr(
        hic_file,
        chrom=chr_name,
        resolution=res,
        normalization=conf.norm,
        diag_num=diag,
    )
    ext = os.path.splitext(hic_file)[1].lower()
    if ext == ".hic":
        kr = load_norm_vector_from_hic(hic_file, chr_name, res, conf.norm)
    elif ext in [".cool", ".mcool"]:
        kr = load_norm_vector_from_cooler(hic_file, chr_name, res, conf.norm)
    else:
        raise ValueError(f"Unsupported Hi-C file format: {ext}")
    t_load1 = time.perf_counter()
    print(
        f"[TIME] data loading ({chr_name}, {res} bp): {t_load1 - t_load0:.3f} s"
    )
    N = C_norm.bin_num
    kr = kr[:N]
    # Bins with NaN/zero/inf normalization factors are unusable; mask the
    # whole row/column (mirrors Java HiCCUPS low-map-quality removal).
    mask_row_col = np.isnan(kr) | (kr <= 0) | (np.isinf(kr))
    C_norm.add_mask_row_col(mask_row_col=mask_row_col)
    diag_num = C_norm.diag_num
    # straw_chr was called with normalization=conf.norm, so C_norm is
    # presumably already normalized; the distance expectation is then the
    # per-diagonal mean of the normalized band. TODO(review): confirm.
    expected_dist = C_norm.mean(axis="diag").data
    print("Computing expected values and histograms...")
    t0 = time.perf_counter()
    (
        observed,
        eBL,
        eDonut,
        eH,
        eV,
        binBL,
        binDonut,
        binH,
        binV,
        histBL,
        histDonut,
        histH,
        histV,
    ) = compute_evalues_and_hist_parallel(
        C_norm, expected_dist, conf, kr, n_jobs=conf.n_jobs
    )
    t1 = time.perf_counter()
    print(f"[TIME] compute_evalues_and_hist_parallel: {t1 - t0:.3f} s")
    t0 = time.perf_counter()
    print("Computing thresholds and FDR tables...")
    thrBL, fdrBL = compute_thresholds_and_fdr(histBL, conf)
    thrDo, fdrDo = compute_thresholds_and_fdr(histDonut, conf)
    thrH, fdrH = compute_thresholds_and_fdr(histH, conf)
    thrV, fdrV = compute_thresholds_and_fdr(histV, conf)
    t1 = time.perf_counter()
    print(f"[TIME] thresholds + FDR: {t1 - t0:.3f} s")
    t0 = time.perf_counter()
    print("Identifying peaks...")
    # First pass: peak = observed - max(per-mask thresholds), vectorized.
    thr_bl_mat = thrBL[binBL.data]
    thr_do_mat = thrDo[binDonut.data]
    thr_h_mat = thrH[binH.data]
    thr_v_mat = thrV[binV.data]
    sb_mat = np.maximum.reduce(
        [
            thr_bl_mat,
            thr_do_mat,
            thr_h_mat,
            thr_v_mat,
        ]
    )
    peak_data = observed.data - sb_mat
    peak = bh.band_hic_matrix(
        peak_data,
        diag_num=diag_num,
        mask=C_norm.mask,
        mask_row_col=C_norm.mask_row_col,
        band_data_input=True,
    )
    t1 = time.perf_counter()
    print(f"[TIME] peak matrix construction: {t1 - t0:.3f} s")
    t0 = time.perf_counter()
    print("Filtering peaks...")
    # Second pass: FDR and OE threshold filtering via vectorized mask
    # updates. NOTE(review): the Juicer-style local-maxima check within
    # peak_width is currently not applied — confirm whether it should be
    # reinstated.
    peak.mask[
        :, : conf.peak_width + 1
    ] = True  # mask out diagonals within peak_width
    peak.mask = np.logical_or(peak.mask, peak.data <= 0)
    peak.mask = np.logical_or(peak.mask, np.isnan(peak.data))
    peak.mask = np.logical_or(peak.mask, np.isinf(peak.data))
    # Require all four expected values to be meaningfully positive.
    peak.mask = np.logical_or(
        np.minimum.reduce([eBL.data, eDonut.data, eH.data, eV.data]) <= 1e-6,
        peak.mask,
    )
    # Observed/expected ratio thresholds (Juicer oeThreshold1/2/3).
    peak.mask = np.logical_or(observed.data <= conf.oe2 * eBL.data, peak.mask)
    peak.mask = np.logical_or(
        observed.data <= conf.oe2 * eDonut.data, peak.mask
    )
    peak.mask = np.logical_or(observed.data <= conf.oe1 * eH.data, peak.mask)
    peak.mask = np.logical_or(observed.data <= conf.oe1 * eV.data, peak.mask)
    peak.mask = np.logical_or(
        np.logical_and(
            observed.data <= conf.oe3 * eBL.data,
            observed.data <= conf.oe3 * eDonut.data,
        ),
        peak.mask,
    )
    # All four per-mask FDR values must be within conf.fdr.
    observed_clip = np.clip(observed.data, 0, conf.max_count)
    peak.mask = np.logical_or(
        np.maximum.reduce(
            [
                fdrBL[binBL.data, observed_clip],
                fdrDo[binDonut.data, observed_clip],
                fdrH[binH.data, observed_clip],
                fdrV[binV.data, observed_clip],
            ]
        )
        > conf.fdr,
        peak.mask,
    )
    del observed_clip
    candidates: List[Feature2D] = []
    rows, cols = np.where(~peak.mask)
    for i, k in zip(rows, cols):
        diagDist = k
        j = i + diagDist
        val = peak.data[i, k]
        e_bl = eBL.data[i, k]
        e_dn = eDonut.data[i, k]
        e_hh = eH.data[i, k]
        e_vv = eV.data[i, k]
        o = observed.data[i, k]
        bBL = int(binBL.data[i, k])
        bDo = int(binDonut.data[i, k])
        bH = int(binH.data[i, k])
        bV = int(binV.data[i, k])
        f = generate_peak_feature(
            chr_name,
            conf.resolution,
            i,
            j,
            o,
            val,
            e_bl,
            e_dn,
            e_hh,
            e_vv,
            bBL,
            bDo,
            bH,
            bV,
        )
        add_fdr_to_feature(f, fdrBL, fdrDo, fdrH, fdrV)
        candidates.append(f)
    t1 = time.perf_counter()
    print(f"[TIME] local maxima + filtering: {t1 - t0:.3f} s")
    t0 = time.perf_counter()
    merged_loops = coalesce_pixels_to_centroid(candidates, conf)
    # Final per-cluster FDR/OE gate.
    filtered_loops = [
        loop for loop in merged_loops if fdr_thresholds_satisfied(loop, conf)
    ]
    t1 = time.perf_counter()
    print(f"[TIME] centroid merging: {t1 - t0:.3f} s")
    return filtered_loops, peak
# =========================================================
# Step 5: multi-resolution HiCCUPS + merging
# =========================================================
def euclid_bp(f1: Feature2D, f2: Feature2D) -> float:
    """Euclidean distance in base pairs between the (start1, start2) anchors of two features."""
    return math.hypot(f1.start1 - f2.start1, f1.start2 - f2.start2)
def extract_reproducible_centroids(
    list_a: List[Feature2D], list_b: List[Feature2D], radius_bp: int
) -> List[Feature2D]:
    """
    Return the features of ``list_b`` that are reproduced in ``list_a``,
    i.e. that have at least one same-chromosome feature of ``list_a``
    within ``radius_bp``.
    """
    reproducible = []
    for candidate in list_b:
        matched = any(
            ref.chr1 == candidate.chr1 and euclid_bp(ref, candidate) <= radius_bp
            for ref in list_a
        )
        if matched:
            reproducible.append(candidate)
    return reproducible
def extract_peaks_near_centroids(
    peaks: List[Feature2D], centroids: List[Feature2D], radius_bp: int
) -> List[Feature2D]:
    """Keep the peaks lying within ``radius_bp`` of any same-chromosome centroid."""
    return [
        peak
        for peak in peaks
        if any(
            peak.chr1 == centroid.chr1 and euclid_bp(peak, centroid) <= radius_bp
            for centroid in centroids
        )
    ]
def extract_peaks_not_near_centroids(
    peaks: List[Feature2D], centroids: List[Feature2D], radius_bp: int
) -> List[Feature2D]:
    """Keep the peaks farther than ``radius_bp`` from every same-chromosome centroid."""
    return [
        peak
        for peak in peaks
        if not any(
            peak.chr1 == centroid.chr1 and euclid_bp(peak, centroid) <= radius_bp
            for centroid in centroids
        )
    ]
def get_peaks_near_diagonal(
    peaks: List[Feature2D], max_dist_bp: int
) -> List[Feature2D]:
    """Return the peaks whose anchor separation |start2 - start1| is at most ``max_dist_bp``."""
    return [f for f in peaks if abs(f.start2 - f.start1) <= max_dist_bp]
def get_strong_peaks(
    peaks: List[Feature2D], min_observed: float
) -> List[Feature2D]:
    """Return the peaks whose observed count is at least ``min_observed``."""
    return [f for f in peaks if f.get_float(OBSERVED) >= min_observed]
def remove_duplicates(features: List[Feature2D]) -> List[Feature2D]:
    """Deduplicate features by (chr1, start1, start2), keeping the one with the largest PEAK value."""
    strongest: Dict[Tuple[str, int, int], Feature2D] = {}
    for feat in features:
        key = (feat.chr1, feat.start1, feat.start2)
        current = strongest.get(key)
        if current is None or feat.get_float(PEAK) > current.get_float(PEAK):
            strongest[key] = feat
    return list(strongest.values())
def merge_all_resolutions(
    looplists: Dict[int, List[Feature2D]]
) -> List[Feature2D]:
    """
    Merge loop lists across resolutions, mimicking Juicer's
    HiCCUPSUtils.mergeAllResolutions:

    - If 5 kb and/or 10 kb lists are available:
        * both present: keep 5 kb peaks near reproducible 10 kb centroids,
          add 10 kb peaks without a nearby 5 kb centroid, then add 5 kb peaks
          that are near the diagonal or have strong observed signal;
        * only one present: use that list directly.
    - If 25 kb is available: add 25 kb peaks far from the merged centroids
      (or take the 25 kb list as-is when nothing was merged yet).
    - If none of 5/10/25 kb were used: return the deduplicated union of all
      resolutions.
    """
    merged: List[Feature2D] = []
    used_priority_res = False
    has5 = bool(looplists.get(5000))
    has10 = bool(looplists.get(10000))
    has25 = bool(looplists.get(25000))

    if has5 or has10:
        if has5 and has10:
            five = looplists[5000]
            ten = looplists[10000]
            radius = 2 * 10000
            # 5 kb peaks confirmed by a nearby reproducible 10 kb centroid.
            c5 = extract_reproducible_centroids(ten, five, radius)
            combined = extract_peaks_near_centroids(five, c5, radius)
            # 10 kb peaks with no 5 kb counterpart.
            c10 = extract_reproducible_centroids(five, ten, radius)
            combined.extend(extract_peaks_not_near_centroids(ten, c10, radius))
            # Rescue 5 kb peaks close to the diagonal or with strong signal.
            combined.extend(get_peaks_near_diagonal(five, 110000))
            combined.extend(get_strong_peaks(five, 100))
            merged = remove_duplicates(combined)
        elif has5:
            merged = remove_duplicates(looplists[5000])
        else:
            merged = remove_duplicates(looplists[10000])
        used_priority_res = True

    if has25:
        peaks25 = looplists[25000]
        if used_priority_res:
            radius = 2 * 25000
            c25 = extract_reproducible_centroids(merged, peaks25, radius)
            merged.extend(extract_peaks_not_near_centroids(peaks25, c25, radius))
            merged = remove_duplicates(merged)
        else:
            merged = remove_duplicates(peaks25)
        used_priority_res = True

    # None of 5/10/25 kb contributed: fall back to the union of all resolutions.
    if not used_priority_res:
        everything: List[Feature2D] = []
        for peaks in looplists.values():
            everything.extend(peaks)
        merged = remove_duplicates(everything)
    return merged
def run_hiccups_multi_resolution(
    hic_file: str,
    chr_name: str,
    configs: Dict[int, HiCCUPSConfig],
):
    """
    Multi-resolution HiCCUPS for one chromosome:
    run the single-resolution pipeline at every configured resolution,
    then merge the per-resolution loop lists (Juicer-style priority merge).
    """
    per_resolution: Dict[int, List[Feature2D]] = {}
    for resolution, cfg in configs.items():
        loops, _peak = run_hiccups_single_resolution(hic_file, chr_name, cfg)
        per_resolution[resolution] = loops
    return merge_all_resolutions(per_resolution)
# -------------------------------------------------------------
# High-level HiCCUPS function API (clean & structured)
# -------------------------------------------------------------
def _default_peak_width(res):
    """Juicer default peak half-width (in bins) for a given resolution; 3 for unknown resolutions."""
    return {1000: 20, 5000: 4, 10000: 2, 25000: 1}.get(res, 3)
def _default_window(res):
    """Juicer default donut window (in bins) for a given resolution; 5 for unknown resolutions."""
    return {1000: 35, 5000: 7, 10000: 5, 25000: 3}.get(res, 5)
from typing import Union
def hiccups(
    hic_file: str,
    chroms: Union[str, list],
    resolutions: list,
    fdr: list = None,
    peak_width: list = None,
    window: list = None,
    thresholds: list = (0.02, 1.5, 1.75, 2.0),
    centroid_radius: list = None,
    max_loop_dist_bp: int = 2_000_000,
    kr_neighborhood: int = 5,
    matrix_size: int = 512,
    norm: str = "KR",
    out_path: str = None,
    n_jobs: int = -1,
):
    """
    Run the HiCCUPS loop-calling algorithm on Hi-C data using the BandHiC data structure.

    This function provides a high-level interface to a BandHiC-based reimplementation
    of the HiCCUPS algorithm. It supports single- or multi-resolution loop detection
    across one or more chromosomes and internally applies multiprocessing to accelerate
    computation on large, high-resolution Hi-C matrices.

    Parameters
    ----------
    hic_file : str
        Path to the input Hi-C file. Supported formats include ``.hic``, ``.cool``,
        and ``.mcool``.
    chroms : str or list of str
        Chromosome name or list of chromosome names (e.g., ``"chr1"`` or
        ``["chr1", "chr2"]``).
    resolutions : list of int
        List of bin resolutions (in base pairs) at which HiCCUPS will be executed.
    fdr : list of float or float, optional
        Target false discovery rate (FDR) for loop detection at each resolution.
        If a single value is provided, it is applied to all resolutions.
    peak_width : list of int or int, optional
        Peak half-width (in bins) for each resolution. Defaults follow Juicer
        recommendations if not specified.
    window : list of int or int, optional
        Donut window size (in bins) for each resolution. Defaults follow Juicer
        recommendations if not specified.
    thresholds : list of float, optional
        Threshold parameters ``[fdrsum, oe1, oe2, oe3]`` used for HiCCUPS filtering
        and cross-resolution merging.
    centroid_radius : list of int or int, optional
        Radius (in base pairs) for clustering nearby loop pixels into centroids
        at each resolution.
    max_loop_dist_bp : int, optional
        Maximum genomic distance (in base pairs) for loop search. Default is
        2,000,000 bp.
    kr_neighborhood : int, optional
        Radius (in bins) used to mask low-quality bins based on the KR
        normalization vector.
    matrix_size : int, optional
        Reserved parameter for compatibility; not used in the current implementation.
    norm : str, optional
        Normalization method to use (e.g., ``"KR"``, ``"VC"``). Default is ``"KR"``.
    out_path : str, optional
        Path to write the output loops in BEDPE format. If ``None``, results
        are not written to disk.
    n_jobs : int, optional
        Number of parallel worker processes to use. ``-1`` uses all available CPUs.

    Returns
    -------
    loops : list of Feature2D
        List of detected chromatin loops after multi-resolution merging.
    """
    num_res = len(resolutions)

    def _expand_param(param, default_fn=None, default_val=None):
        # Normalize a scalar / single-item / per-resolution parameter into a
        # list of length num_res.
        if param is None:
            if default_fn is not None:
                return [default_fn(r) for r in resolutions]
            if default_val is not None:
                return [default_val] * num_res
            raise ValueError(
                "Parameter is None and no default function or value was provided"
            )
        if isinstance(param, (int, float)):
            return [param] * num_res
        # Accept both lists and tuples of per-resolution values.
        if isinstance(param, (list, tuple)):
            param = list(param)
            if len(param) == 1:
                return param * num_res
            if len(param) != num_res:
                raise ValueError(f"Parameter length mismatch for {param}")
            return param
        raise ValueError(f"Unsupported parameter type: {param}")

    fdr = _expand_param(fdr, default_val=0.1)
    peak_width = _expand_param(peak_width, default_fn=_default_peak_width)
    window = _expand_param(window, default_fn=_default_window)
    centroid_radius = _expand_param(centroid_radius, default_val=20000)

    # thresholds is either a scalar (broadcast to all four slots) or a
    # 4-element sequence [fdrsum, oe1, oe2, oe3].
    if isinstance(thresholds, (int, float)):
        thresholds = [thresholds] * 4
    if len(thresholds) != 4:
        raise ValueError("thresholds must have length 4 or be a scalar.")
    fdrsum, oe1, oe2, oe3 = thresholds

    # Build one config per resolution.
    configs_by_res: Dict[int, HiCCUPSConfig] = {}
    for idx, res in enumerate(resolutions):
        conf = HiCCUPSConfig(
            resolution=res,
            window=window[idx],
            peak_width=peak_width[idx],
            fdr=fdr[idx],
            oe1=oe1,
            oe2=oe2,
            oe3=oe3,
            fdrsum=fdrsum,
            cluster_radius_bp=centroid_radius[idx],
            max_loop_dist_bp=max_loop_dist_bp,
            kr_neighborhood=kr_neighborhood,
            norm=norm,
            n_jobs=n_jobs,
        )
        configs_by_res[res] = conf

    # Support chroms as str or list.
    if isinstance(chroms, str):
        chrom_list = [chroms]
    else:
        chrom_list = list(chroms)

    all_loops = []
    for chrom in chrom_list:
        loops_chr = run_hiccups_multi_resolution(
            hic_file,
            chrom,
            configs_by_res,
        )
        all_loops.extend(loops_chr)

    if out_path is not None:
        write_bedpe(all_loops, out_path)
    return all_loops
# =========================================================
# Step 6: BEDPE Output
# =========================================================
def write_bedpe(loops: List[Feature2D], path: str):
    """
    Write detected loops to a BEDPE file with HiCCUPS attribute columns.

    header:
    #chr1 x1 x2 chr2 y1 y2 name score strand1 strand2 color observed expectedBL expectedDonut expectedH expectedV fdrBL fdrDonut fdrH fdrV numCollapsed centroid1 centroid2 radius

    Parameters
    ----------
    loops : list of Feature2D
        Loops to write; per-loop statistics are read from ``feat.attrs``.
    path : str
        Output file path (overwritten if it exists).
    """

    def _attr(feat: Feature2D, key: str) -> float:
        # Missing attributes are written as NaN instead of raising.
        return feat.get_float(key) if key in feat.attrs else float("nan")

    header = [
        "#chr1",
        "x1",
        "x2",
        "chr2",
        "y1",
        "y2",
        "name",
        "score",
        "strand1",
        "strand2",
        "color",
        "observed",
        "expectedBL",
        "expectedDonut",
        "expectedH",
        "expectedV",
        "fdrBL",
        "fdrDonut",
        "fdrH",
        "fdrV",
        "numCollapsed",
        "centroid1",
        "centroid2",
        "radius",
    ]
    with open(path, "w") as f:
        f.write("\t".join(header) + "\n")
        for feat in loops:
            # PEAK is fetched directly (not via _attr): a loop without a peak
            # score is a programming error, so let get_float raise.
            score = feat.get_float(PEAK)
            color_str = ",".join(map(str, feat.color)) if feat.color else ""
            line_items = [
                feat.chr1,
                str(feat.start1),
                str(feat.end1),
                feat.chr2,
                str(feat.start2),
                str(feat.end2),
                ".",  # name
                f"{score:.4g}",
                ".",  # strand1
                ".",  # strand2
                color_str,
                f"{_attr(feat, OBSERVED):.6g}",
                f"{_attr(feat, EXPECTEDBL):.6g}",
                f"{_attr(feat, EXPECTEDDONUT):.6g}",
                f"{_attr(feat, EXPECTEDH):.6g}",
                f"{_attr(feat, EXPECTEDV):.6g}",
                f"{_attr(feat, FDRBL):.6g}",
                f"{_attr(feat, FDRDONUT):.6g}",
                f"{_attr(feat, FDRH):.6g}",
                f"{_attr(feat, FDRV):.6g}",
                f"{_attr(feat, NUMCOLLAPSED):.6g}",
                f"{_attr(feat, CENTROID1):.6g}",
                f"{_attr(feat, CENTROID2):.6g}",
                f"{_attr(feat, RADIUS):.6g}",
            ]
            f.write("\t".join(line_items) + "\n")
# =========================================================
# Simple test function for HiCCUPS
# =========================================================
def test_hiccups_basic():
    """
    Minimal smoke test for HiCCUPS: run the pipeline end-to-end on one
    chromosome at a single resolution and report the loop count. This does
    not validate biological correctness — only that no exception is raised.
    """
    # hic_path = "/Users/wwb/workspace-local/call_loop/data/GSE63525_GM12878_insitu_primary_replicate_combined.hic"
    hic_path = "/Users/wwb/workspace-local/call_loop/data/GSE130275_mESC_WT_combined_1.3B_microc.hic"
    loops = hiccups(
        hic_file=hic_path,
        chroms="chr19",
        resolutions=[1000],
        out_path="../../test/hiccups_mESC_output_1000_chr19.bedpe",
        n_jobs=-1,
    )
    print(f"Test completed. Loops detected: {len(loops)}")
if __name__ == "__main__":
    import time, os, psutil

    proc = psutil.Process(os.getpid())
    mem_before = proc.memory_info().rss / 1024**2
    t0 = time.perf_counter()
    test_hiccups_basic()
    t1 = time.perf_counter()
    # Snapshot memory_info once after the run instead of querying it
    # repeatedly.
    mem_info = proc.memory_info()
    mem_after = mem_info.rss / 1024**2
    # peak_wset is only available on Windows; elsewhere fall back to the
    # post-run RSS as a lower bound on the true peak.
    peak_wset = getattr(mem_info, "peak_wset", None)
    mem_peak = peak_wset / 1024**2 if peak_wset is not None else mem_after
    print(f"[HiCCUPS] time = {t1 - t0:.2f} s")
    print(f"[HiCCUPS] RSS before = {mem_before:.1f} MiB")
    print(f"[HiCCUPS] RSS after = {mem_after:.1f} MiB")
    # Bug fix: mem_peak was previously computed but never reported.
    print(f"[HiCCUPS] RSS peak = {mem_peak:.1f} MiB")