Source code for bandhic.utils.topdom

# -*- coding: utf-8 -*-
# topdom.py

"""
TopDom: A Python implementation of the TopDom algorithm for detecting TADs (Topologically Associating Domains) in Hi-C data.
Author: Weibing Wang
Date: 2025-06-11
Email: wangweibing@xidian.edu.cn

This module provides functions to detect TADs from Hi-C matrices using the TopDom algorithm.
It is translated from the original R implementation and adapted for use with the `band_hic_matrix` class from the BandHiC package.

Reference:
1. Shin, H. et al. TopDom: an efficient and deterministic method for identifying topological domains in genomes. Nucleic Acids Research 44, e70 (2016). https://doi.org/10.1093/nar/gkw064

"""

import numpy as np
import pandas as pd
import time
import scipy.stats as stats
import scipy.sparse as sparse
import bandhic as bh
from typing import Union

__all__ = [
    "call_tad",
    "topdom",
]


[docs]def call_tad( hic_matrix: Union[bh.band_hic_matrix, np.ndarray], resolution: int, chrom_short: str, window_size_bp: int = 200_000, min_TAD_size: int = None, stat_filter: bool = True, verbose: bool = False, ): """ Detect TADs from a Hi-C matrix using the TopDom algorithm. Different from the `TopDom` function, this function accepts a Hi-C matrix in either `band_hic_matrix` format or as a dense numpy array. This function is designed to be used with the BandHiC package and provides a convenient interface for TAD detection. Parameters ---------- hic_matrix : band_hic_matrix or np.ndarray The Hi-C matrix, either as a band_hic_matrix object or a dense numpy array. resolution : int The resolution of the Hi-C matrix. chrom_short : str The chromosome name without 'chr' prefix. window_size_bp : int, optional The size of the window to consider for detecting TADs, in base pairs. Default is 200000. min_TAD_size : int, optional The minimum size of a TAD to be considered valid, in base pairs. Default is None. stat_filter : bool, optional Whether to apply statistical filtering to remove false positives. Default is True. verbose : bool, optional Whether to print detailed information during processing. Default is False. Returns ------- domains : pd.DataFrame DataFrame containing detected TADs with columns 'chr', 'from.id', 'from.coord', 'to.id', 'to.coord', 'tag'. bins : pd.DataFrame DataFrame containing bin information columns 'id', 'chr', 'from.coord', 'to.coord', 'local.ext', 'mean.cf', 'pvalue'. """ n = hic_matrix.shape[0] from_cord = np.arange(0, n) * resolution to_cord = np.arange(1, n + 1) * resolution chrom = np.repeat("chr" + chrom_short, n) bins = pd.DataFrame( { "id": np.arange(n), "chr": chrom, "from.coord": from_cord, "to.coord": to_cord, } ) domains, bins = topdom( hic_matrix, bins, window_size_bp // resolution, verbose=False, stat_filter=stat_filter, ) if min_TAD_size is None: return domains, bins if not isinstance(min_TAD_size, int): raise ValueError( "min_TAD_size must be an integer representing the minimum TAD size in base pairs." ) if min_TAD_size < resolution: raise ValueError( "min_TAD_size must be greater than or equal to the resolution of the Hi-C matrix." ) # Merge small TADs merged_TAD = domains.copy() left_index = [] merged_TAD["size_bin"] = merged_TAD["to.id"] - merged_TAD["from.id"] merged_TAD.reset_index(drop=True, inplace=True) for index in merged_TAD.index: row = merged_TAD.loc[index, :] if row.size_bin < min_TAD_size // resolution and row.tag != "gap": if left_index: last_index = left_index[-1] last_tag = merged_TAD.at[last_index, "tag"] last_mean_cf = bins.at[row["from.id"], "mean.cf"] else: last_index = None last_tag = None if index + 1 < merged_TAD.shape[0]: next_tag = merged_TAD.at[index + 1, "tag"] next_mean_cf = bins.at[row["to.id"], "mean.cf"] else: next_tag = None if last_tag == "gap" and next_tag == "gap": left_index.append(index) continue elif last_tag == "gap" or not last_index: # Merge to next TAD merged_TAD.at[index + 1, "from.id"] = row["from.id"] merged_TAD.at[index + 1, "from.coord"] = row["from.coord"] merged_TAD.at[index + 1, "size_bin"] += row["size_bin"] elif next_tag == "gap": # Merge to last TAD if last_index: merged_TAD.at[last_index, "to.id"] = row["to.id"] merged_TAD.at[last_index, "to.coord"] = row["to.coord"] merged_TAD.at[last_index, "size_bin"] += row["size_bin"] else: left_index.append(index) continue else: # Merge based on mean contact frequency if last_mean_cf >= next_mean_cf: # Merge to last TAD if last_index: merged_TAD.at[last_index, "to.id"] = row["to.id"] merged_TAD.at[last_index, "to.coord"] = row["to.coord"] merged_TAD.at[last_index, "size_bin"] += row[ "size_bin" ] else: left_index.append(index) continue else: merged_TAD.at[index + 1, "from.id"] = row["from.id"] merged_TAD.at[index + 1, "from.coord"] = row["from.coord"] merged_TAD.at[index + 1, "size_bin"] += row["size_bin"] else: left_index.append(index) merged_TAD = merged_TAD.loc[left_index, :] return merged_TAD, bins
[docs]def topdom(hic_matrix, bins, window_size, stat_filter=True, verbose=False): """ Detect TADs using TopDom algorithm. Parameters ---------- hic_matrix : band_hic_matrix or np.ndarray The Hi-C matrix, either as a band_hic_matrix object or a dense numpy array. bins : pd.DataFrame DataFrame containing bin information with columns 'chr', 'from.coord', 'to.coord'. window_size : int Size of the window to consider for detecting TADs. stat_filter : bool, optional Whether to apply statistical filtering to remove false positives. Default is True. verbose : bool, optional Whether to print detailed information during processing. Default is False. Returns ------- domains : pd.DataFrame DataFrame containing detected TADs with columns 'chr', 'from.id', 'from.coord', 'to.id', 'to.coord', 'tag'. bins : pd.DataFrame Updated DataFrame containing bin information with additional columns 'local.ext', 'mean.cf', 'pvalue'. """ n_bins = hic_matrix.shape[0] mean_cf = np.zeros(n_bins) pvalue = np.ones(n_bins) local_ext = np.array(n_bins * [-0.5]) if verbose: print( "#########################################################################" ) print( "Step 1 : Generating binSignals by computing bin-level contact frequencies" ) print( "#########################################################################" ) ptm = time.process_time() # def get_mean(i,mat_data,window_size): # diamond=Get_Diamond_Matrix(mat_data=hic_diag,i=i,size=window_size) # return np.mean(diamond) for i in range(n_bins): diamond = Get_Diamond_Matrix( mat_data=hic_matrix, i=i, size=window_size ) avg = np.mean(diamond) if avg is not np.ma.masked: mean_cf[i] = avg else: mean_cf[i] = 0 eltm = time.process_time() - ptm if verbose: print("Step 1 Running Time : %f" % (eltm)) print("Step 1 : Done !!") print( "#########################################################################" ) print("Step 2 : Detect TD boundaries based on binSignals") print( "#########################################################################" ) ptm = time.process_time() gap_idx = Which_Gap_Region2(matrix_data=hic_matrix, w=window_size, k=25) proc_regions = Which_Process_Region( rmv_idx=gap_idx, n_bins=n_bins, min_size=3 ) # print(gap_idx) # print(proc_regions) for i, row in proc_regions.iterrows(): start = row["start"] end = row["end"] local_ext[start:end] = Detect_Local_Extreme(x=mean_cf[start:end]) eltm = time.process_time() if verbose: print("Step 2 Running Time : %f" % (eltm)) print("Step 2 : Done !!") print( "#########################################################################" ) print("Step 3 : Statistical Filtering of false positive TD boundaries") print( "#########################################################################" ) if stat_filter: ptm = time.process_time() if verbose: print("-- Matrix Scaling....") # scale_matrix_data = matrix_data.ravel(order='C').copy() # scale_matrix_data = scale_matrix_data.astype(np.float64) # for i in range(2*window_size): # scale_matrix_data[np.arange(i,n_bins*n_bins,1+n_bins)]= scale(scale_matrix_data[np.arange(i,n_bins*n_bins,1+n_bins)]) # #scale_matrix_data[np.arange(i*n_bins,n_bins*n_bins,1+n_bins)]= scale(scale_matrix_data[np.arange(i*n_bins,n_bins*n_bins,1+n_bins)]) # scale_matrix_data=scale_matrix_data.reshape(matrix_data.shape) scale_hic_diag = hic_matrix.copy() scale_hic_diag = scale_hic_diag.astype(np.float64) for i in range(2 * window_size): if isinstance(scale_hic_diag, bh.band_hic_matrix): diag = scale_hic_diag.diag(i) diag = scale_array(diag) scale_hic_diag.set_diag(i, diag) elif isinstance(scale_hic_diag, np.ndarray): row_idx = np.arange(n_bins - i) col_idx = row_idx + i scale_hic_diag[row_idx, col_idx] = scale_array( scale_hic_diag[row_idx, col_idx] ) else: raise ValueError( "scale_hic_diag must be a band_hic_matrix or np.ndarray" ) # np.savetxt('../test/scale_hic_diag.txt',scale_hic_diag.data,delimiter='\t') if verbose: print("-- Compute p-values by Wilcox Ranksum Test") for i, row in proc_regions.iterrows(): start = row["start"] end = row["end"] if verbose: print("Process Regions from %d to %d" % (start, end)) pvalue[start:end] = Get_Pvalue( scale_hic_diag, start, end, local_ext, size=window_size, scale=1, ) # print(np.sum(pvalue[start:end]<=0.05)) # print(pvalue) if verbose: print("-- Done!") print("-- Filtering False Positives") local_ext[ np.intersect1d( np.argwhere(local_ext == -1), np.argwhere(pvalue <= 0.05) ) ] = -2 local_ext[np.argwhere(local_ext == -1)] = 0 local_ext[np.argwhere(local_ext == -2)] = -1 if ( isinstance(hic_matrix, bh.band_hic_matrix) and hic_matrix.mask_row_col is not None ): local_ext[hic_matrix.mask_row_col] = -0.5 if verbose: print("-- Done!") eltm = time.process_time() print("Step 3 Running Time : %f" % (eltm)) print("Step 3 : Done!") signal_idx = np.argwhere(local_ext == -1) gap_idx = np.argwhere(local_ext == -0.5) # print(signal_idx) domains = Convert_Bin_To_Domain( bins, signal_idx, gap_idx, pvalues=pvalue, pvalue_cut=0.05 ) # print(domains[domains['tag']=='boundary']) bins["local.ext"] = local_ext bins["mean.cf"] = mean_cf bins["pvalue"] = pvalue return domains, bins
def scale_array(x: np.ndarray) -> np.ndarray: """ Scale the input array to have zero mean and unit variance. Parameters ---------- x : np.ndarray Input array to be scaled. Returns ------- np.ndarray Scaled array. """ if x.size == 0: return x _mean = np.mean(x) _std = np.std(x) if _std < 10 * np.finfo(_std.dtype).eps: _std = 1.0 # Avoid division by small standard deviation return (x - np.mean(x)) / (np.std(x)) # Avoid division by zero def Get_Diamond_Matrix(mat_data, i, size): n_bins = mat_data.shape[0] if i == n_bins - 1: return 0 lowerbound = max(0, i - size + 1) upperbound = min(i + size, n_bins - 1) return mat_data[lowerbound : (i + 1), (i + 1) : (upperbound + 1)] def Which_Gap_Region2(matrix_data, w, k=0): n_bins = matrix_data.shape[0] gap = np.zeros(n_bins) if ( isinstance(matrix_data, bh.band_hic_matrix) and matrix_data.mask_row_col is not None ): gap_idx = np.where(matrix_data.mask_row_col)[0] if gap_idx.size == 0: short_runs = [] breaks = np.where(np.diff(gap_idx) != 1)[0] + 1 runs = np.split(gap_idx, breaks) # short_runs = [(r[0], r[-1]) for r in runs if r.size < k] short_runs = [r for r in runs if r.size > k] return np.concatenate(short_runs) for i in range(n_bins): if np.sum(matrix_data[i, max(0, i - w) : min(i + w, n_bins)]) == 0: gap[i] = -0.5 idx = np.argwhere(gap == -0.5).ravel() return idx def Which_Process_Region(rmv_idx, n_bins, min_size=3): gap_idx = rmv_idx proc_regions = [] proc_set = np.setdiff1d(range(n_bins), rmv_idx) n_proc_set = len(proc_set) i = 0 while i < n_proc_set - 1: start = proc_set[i] j = i + 1 while j < n_proc_set: if proc_set[j] - proc_set[j - 1] <= 1: j = j + 1 else: proc_regions.append([start, proc_set[j - 1] + 1]) i = j break if j >= n_proc_set - 1: proc_regions.append([start, proc_set[j - 1] + 1]) break df_proc_regions = pd.DataFrame(proc_regions, columns=["start", "end"]) df_proc_regions = df_proc_regions[ abs(df_proc_regions["end"] - df_proc_regions["start"]) > min_size ] return df_proc_regions def Detect_Local_Extreme(x): n_bins = len(x) ret = np.zeros(n_bins) x[np.isnan(x)] = 0 if n_bins <= 3: # print(np.min(x)) # print(np.min(y)) ret[np.argmin(x)] = -1 ret[np.argmax(x)] = 1 return ret new_point = Data_Norm(x=np.arange(n_bins), y=x) x = new_point[1] cp = Change_Point(x=np.arange(n_bins), y=x) if len(cp[0]) <= 2: return ret if len(cp[0]) == n_bins: return ret for i in range(1, len(cp[0]) - 1): if x[cp[0][i]] >= x[cp[0][i] - 1] and x[cp[0][i]] >= x[cp[0][i] + 1]: ret[cp[0][i]] = 1 elif x[cp[0][i]] < x[cp[0][i] - 1] and x[cp[0][i]] < x[cp[0][i] + 1]: ret[cp[0][i]] = -1 min_val = min(x[cp[0][i - 1]], x[cp[0][i]]) max_val = max(x[cp[0][i - 1]], x[cp[0][i]]) if min(x[cp[0][i - 1] : cp[0][i] + 1]) < min_val: ret[cp[0][i - 1] + np.argmin(x[cp[0][i - 1] : cp[0][i]] + 1)] = -1 if max(x[cp[0][i - 1] : cp[0][i] + 1]) > max_val: ret[cp[0][i - 1] + np.argmax(x[cp[0][i - 1] : cp[0][i]] + 1)] = 1 return ret def Data_Norm(x, y): ret_x = np.zeros(len(x)) ret_y = np.zeros(len(y)) ret_x[0] = x[0] ret_y[0] = y[0] diff_x = np.diff(x) diff_y = np.diff(y) epsilon = 1e-8 scale_x = 1 / np.mean(np.abs(diff_x) + epsilon) scale_y = 1 / np.mean(np.abs(diff_y) + epsilon) for i in range(1, len(x)): ret_x[i] = ret_x[i - 1] + (diff_x[i - 1] * scale_x) ret_y[i] = ret_y[i - 1] + (diff_y[i - 1] * scale_y) return [ret_x, ret_y] def Change_Point(x, y): if len(x) != len(y): print("ERROR : The length of x and y should be the same") return 0 n_bins = len(x) Fv = np.array(n_bins * [np.nan]) Ev = np.array(n_bins * [np.nan]) cp = [0] i = 0 Fv[0] = 0 while i < n_bins - 1: j = i + 1 Fv[j] = np.sqrt((x[j] - x[i]) ** 2 + (y[j] - y[i]) ** 2) while j < n_bins - 1: j = j + 1 # k=list(range(i+1,j)) Ev[j] = np.sum( np.abs( (y[j] - y[i]) * x[i + 1 : j] - (x[j] - x[i]) * y[i + 1 : j] - (x[i] * y[j]) + (x[j] * y[i]) ) ) / np.sqrt((x[j] - x[i]) ** 2 + (y[j] - y[i]) ** 2) Fv[j] = np.sqrt((x[j] - x[i]) ** 2 + (y[j] - y[i]) ** 2) - Ev[j] if (np.isnan(Fv[j])) or (np.isnan(Fv[j - 1])): j = j - 1 cp.append(j) break if Fv[j] < Fv[j - 1]: j = j - 1 cp.append(j) break i = j cp.append(n_bins - 1) return [cp, Fv, Ev] def Get_Pvalue(matrix_data, start, end, local_ext, size, scale=1): n_bins = end - start pvalue = np.ones(n_bins) index_array = np.intersect1d( np.arange(start, end - 1), np.where(local_ext == -1)[0] ) if index_array.shape[0]: for i in index_array: dia = Get_Diamond_Matrix2( matrix_data, start, end, i, size=size ).ravel() ups = Get_Upstream_Triangle( matrix_data, start, end, i, size=size ).ravel() downs = Get_Downstream_Triangle( matrix_data, start, end, i, size=size ).ravel() if ( isinstance(matrix_data, bh.band_hic_matrix) and matrix_data.mask is not None ): dia = dia.compressed() ups = ups.compressed() downs = downs.compressed() pvalue[i - start] = stats.ranksums( dia * scale, np.append(ups, downs), alternative="less", nan_policy="omit", ).pvalue pvalue[np.isnan(pvalue)] = 1 return pvalue def Get_Diamond_Matrix2(mat_data, start, end, i, size): n_bins = end - start new_mat = np.array([[np.nan] * size] * size) for k in range(size): lower = min(i + 1, end - 1) upper = min(i + size + 1, end) if i - k >= start and i < end - 1: new_mat[size - k - 1, 0 : upper - lower] = mat_data[ i - k, lower:upper ] return new_mat def Get_Upstream_Triangle(mat_data, start, end, i, size): lower = max(start, i - size) tmp_mat = mat_data[lower : (i + 1), lower : (i + 1)] return tmp_mat[np.triu_indices(tmp_mat.shape[0], k=1)] def Get_Downstream_Triangle(mat_data, start, end, i, size): if i == end - 1: return np.nan upper = min(i + size + 1, end) tmp_mat = mat_data[i + 1 : upper, i + 1 : upper] return tmp_mat[np.triu_indices(tmp_mat.shape[0], k=1)] def Convert_Bin_To_Domain( bins, signal_idx, gap_idx, pvalues=None, pvalue_cut=None ): n_bins = len(bins) rmv_idx = np.setdiff1d(np.arange(n_bins), gap_idx) proc_region = Which_Process_Region(rmv_idx, n_bins, min_size=0) # print(proc_region) from_coord = bins.loc[proc_region["start"], "from.coord"] n_procs = len(proc_region) gap = pd.DataFrame({"from.coord": from_coord, "tag": ["gap"] * n_procs}) rmv_idx2 = np.append(signal_idx, gap_idx) rmv_idx2 = np.sort(rmv_idx2) proc_region = Which_Process_Region(rmv_idx2, n_bins, min_size=0) from_coord = bins.loc[proc_region["start"], "from.coord"] n_procs = len(proc_region) # print(proc_region) domain = pd.DataFrame( {"from.coord": from_coord, "tag": ["domain"] * n_procs} ) # summary=gap.append(domain) summary = pd.concat([gap, domain]) # print(summary) rmv_idx = np.setdiff1d(np.arange(n_bins), signal_idx) proc_region = Which_Process_Region(rmv_idx, n_bins, min_size=1) n_procs = len(proc_region) if n_procs > 0: from_coord = bins.loc[proc_region["start"], "from.coord"] boundary = pd.DataFrame( {"from.coord": from_coord, "tag": ["boundary"] * n_procs} ) # summary=summary.append(boundary) summary = pd.concat([summary, boundary]) summary = summary.sort_values(by=["from.coord"]) # print(summary) # todo # to_coord=summary['from.coord'].iloc[1:len(summary)].append(pd.Series([bins.loc[n_bins-1,'to.coord']])) to_coord = pd.concat( [ summary["from.coord"].iloc[1 : len(summary)], pd.Series([bins.loc[n_bins - 1, "to.coord"]]), ] ) to_coord.index = summary.index summary["to.coord"] = to_coord summary["from.id"] = summary.index to_id = list(summary.index)[1 : len(summary.index)] to_id.append(n_bins) to_id = pd.Series(to_id, index=summary.index) summary["to.id"] = to_id # summary['size']=summary['to.coord']-summary['from.coord'] # summary['span']=summary['to.id']-summary['from.id'] summary["chr"] = bins["chr"].values[0] summary = summary[ ["chr", "from.id", "from.coord", "to.id", "to.coord", "tag"] ] # print(summary) if np.any(pvalues) and pvalue_cut != None: for i, row in summary.iterrows(): if row["tag"] == "domain": domain_bins_idx = np.arange(row["from.id"], row["to.id"]) p_value_constr = np.argwhere( pvalues[domain_bins_idx] < pvalue_cut ) if len(domain_bins_idx) == len(p_value_constr): row["tag"] = "boundary" return summary