Source code for bandhic.utils.topdom

# -*- coding: utf-8 -*-
# topdom.py

"""
TopDom: A Python implementation of the TopDom algorithm for detecting TADs (Topologically Associating Domains) in Hi-C data.
Author: Weibing Wang
Date: 2025-06-11
Email: wangweibing@xidian.edu.cn

This module provides functions to detect TADs from Hi-C matrices using the TopDom algorithm.
It is translated from the original R implementation and adapted for use with the `band_hic_matrix` class from the BandHiC package.

Reference:
1. Shin, H. et al. TopDom: an efficient and deterministic method for identifying topological domains in genomes. Nucleic Acids Research 44, e70 (2016). https://doi.org/10.1093/nar/gkw064

"""

import numpy as np
import pandas as pd
import time
import scipy.stats as stats
import scipy.sparse as sparse
import bandhic as bh
from typing import Union

__all__ = [
    "call_tad",
    "topdom",
]

[docs] def call_tad(hic_matrix:Union[bh.band_hic_matrix, np.ndarray], resolution:int, chrom_short:str, window_size_bp:int=200000, min_TAD_size:int=None, stat_filter:bool=True, verbose:bool=False): """ Detect TADs from a Hi-C matrix using the TopDom algorithm. Different from the `TopDom` function, this function accepts a Hi-C matrix in either `band_hic_matrix` format or as a dense numpy array. This function is designed to be used with the BandHiC package and provides a convenient interface for TAD detection. Parameters ---------- hic_matrix : band_hic_matrix or np.ndarray The Hi-C matrix, either as a band_hic_matrix object or a dense numpy array. resolution : int The resolution of the Hi-C matrix. chrom_short : str The chromosome name without 'chr' prefix. window_size_bp : int, optional The size of the window to consider for detecting TADs, in base pairs. Default is 200000. min_TAD_size : int, optional The minimum size of a TAD to be considered valid, in base pairs. Default is None. stat_filter : bool, optional Whether to apply statistical filtering to remove false positives. Default is True. verbose : bool, optional Whether to print detailed information during processing. Default is False. Returns ------- domains : pd.DataFrame DataFrame containing detected TADs with columns 'chr', 'from.id', 'from.coord', 'to.id', 'to.coord', 'tag'. bins : pd.DataFrame DataFrame containing bin information columns 'id', 'chr', 'from.coord', 'to.coord', 'local.ext', 'mean.cf', 'pvalue'. """ n = hic_matrix.shape[0] from_cord = np.arange(0, n)*resolution to_cord = np.arange(1, n+1)*resolution chrom = np.repeat('chr'+chrom_short, n) bins = pd.DataFrame({'id': np.arange(n), 'chr': chrom, 'from.coord': from_cord, 'to.coord': to_cord}) domains, bins = topdom(hic_matrix, bins, window_size_bp // resolution, verbose=False, stat_filter=stat_filter) if min_TAD_size is None: return domains, bins if not isinstance(min_TAD_size, int): raise ValueError("min_TAD_size must be an integer representing the minimum TAD size in base pairs.") if min_TAD_size < resolution: raise ValueError("min_TAD_size must be greater than or equal to the resolution of the Hi-C matrix.") # Merge small TADs merged_TAD = domains.copy() left_index = [] merged_TAD['size_bin'] = merged_TAD['to.id']-merged_TAD['from.id'] merged_TAD.reset_index(drop=True, inplace=True) for index in merged_TAD.index: row = merged_TAD.loc[index, :] if row.size_bin < min_TAD_size//resolution and row.tag != 'gap': if left_index: last_index = left_index[-1] last_tag = merged_TAD.at[last_index, 'tag'] last_mean_cf = bins.at[row['from.id'], 'mean.cf'] else: last_index = None last_tag = None if index+1 < merged_TAD.shape[0]: next_tag = merged_TAD.at[index+1, 'tag'] next_mean_cf = bins.at[row['to.id'], 'mean.cf'] else: next_tag = None if last_tag == 'gap' and next_tag == 'gap': left_index.append(index) continue elif last_tag == 'gap' or not last_index: # Merge to next TAD merged_TAD.at[index+1, 'from.id'] = row['from.id'] merged_TAD.at[index+1, 'from.coord'] = row['from.coord'] merged_TAD.at[index+1, 'size_bin'] += row['size_bin'] elif next_tag == 'gap': # Merge to last TAD if last_index: merged_TAD.at[last_index, 'to.id'] = row['to.id'] merged_TAD.at[last_index, 'to.coord'] = row['to.coord'] merged_TAD.at[last_index, 'size_bin'] += row['size_bin'] else: left_index.append(index) continue else: # Merge based on mean contact frequency if last_mean_cf >= next_mean_cf: # Merge to last TAD if last_index: merged_TAD.at[last_index, 'to.id'] = row['to.id'] merged_TAD.at[last_index, 'to.coord'] = row['to.coord'] merged_TAD.at[last_index, 'size_bin'] += row['size_bin'] else: left_index.append(index) continue else: merged_TAD.at[index+1, 'from.id'] = row['from.id'] merged_TAD.at[index+1, 'from.coord'] = row['from.coord'] merged_TAD.at[index+1, 'size_bin'] += row['size_bin'] else: left_index.append(index) merged_TAD = merged_TAD.loc[left_index, :] return merged_TAD, bins
[docs] def topdom(hic_matrix, bins, window_size, stat_filter=True, verbose=False): """ Detect TADs using TopDom algorithm. Parameters ---------- hic_matrix : band_hic_matrix or np.ndarray The Hi-C matrix, either as a band_hic_matrix object or a dense numpy array. bins : pd.DataFrame DataFrame containing bin information with columns 'chr', 'from.coord', 'to.coord'. window_size : int Size of the window to consider for detecting TADs. stat_filter : bool, optional Whether to apply statistical filtering to remove false positives. Default is True. verbose : bool, optional Whether to print detailed information during processing. Default is False. Returns ------- domains : pd.DataFrame DataFrame containing detected TADs with columns 'chr', 'from.id', 'from.coord', 'to.id', 'to.coord', 'tag'. bins : pd.DataFrame Updated DataFrame containing bin information with additional columns 'local.ext', 'mean.cf', 'pvalue'. """ n_bins=hic_matrix.shape[0] mean_cf=np.zeros(n_bins) pvalue=np.ones(n_bins) local_ext=np.array(n_bins*[-0.5]) if verbose: print("#########################################################################") print("Step 1 : Generating binSignals by computing bin-level contact frequencies") print("#########################################################################") ptm=time.process_time() # def get_mean(i,mat_data,window_size): # diamond=Get_Diamond_Matrix(mat_data=hic_diag,i=i,size=window_size) # return np.mean(diamond) for i in range(n_bins): diamond=Get_Diamond_Matrix(mat_data=hic_matrix,i=i,size=window_size) mean_cf[i]=np.mean(diamond) eltm=time.process_time()-ptm if verbose: print("Step 1 Running Time : %f"%(eltm)) print("Step 1 : Done !!") print("#########################################################################") print("Step 2 : Detect TD boundaries based on binSignals") print("#########################################################################") ptm=time.process_time() gap_idx=Which_Gap_Region2(matrix_data=hic_matrix,w=window_size) proc_regions=Which_Process_Region(rmv_idx=gap_idx,n_bins=n_bins,min_size=3) #print(gap_idx) #print(proc_regions) for i,row in proc_regions.iterrows(): start=row['start'] end=row['end'] local_ext[start:end]=Detect_Local_Extreme(x=mean_cf[start:end]) eltm=time.process_time() if verbose: print("Step 2 Running Time : %f"%(eltm)) print("Step 2 : Done !!") print("#########################################################################") print("Step 3 : Statistical Filtering of false positive TD boundaries") print("#########################################################################") if stat_filter: ptm=time.process_time() if verbose: print("-- Matrix Scaling....") # scale_matrix_data = matrix_data.ravel(order='C').copy() # scale_matrix_data = scale_matrix_data.astype(np.float64) # for i in range(2*window_size): # scale_matrix_data[np.arange(i,n_bins*n_bins,1+n_bins)]= scale(scale_matrix_data[np.arange(i,n_bins*n_bins,1+n_bins)]) # #scale_matrix_data[np.arange(i*n_bins,n_bins*n_bins,1+n_bins)]= scale(scale_matrix_data[np.arange(i*n_bins,n_bins*n_bins,1+n_bins)]) # scale_matrix_data=scale_matrix_data.reshape(matrix_data.shape) scale_hic_diag=hic_matrix.copy() scale_hic_diag=scale_hic_diag.astype(np.float64) for i in range(2*window_size): if isinstance(scale_hic_diag,bh.band_hic_matrix): diag=scale_hic_diag.diag(i) diag=scale_array(diag) scale_hic_diag.set_diag(i,diag) elif isinstance(scale_hic_diag,np.ndarray): row_idx=np.arange(n_bins-i) col_idx=row_idx+i scale_hic_diag[row_idx,col_idx]= scale_array(scale_hic_diag[row_idx,col_idx]) else: raise ValueError("scale_hic_diag must be a band_hic_matrix or np.ndarray") #np.savetxt('../test/scale_hic_diag.txt',scale_hic_diag.data,delimiter='\t') if verbose: print("-- Compute p-values by Wilcox Ranksum Test") for i,row in proc_regions.iterrows(): start=row['start'] end=row['end'] if verbose: print("Process Regions from %d to %d"%(start,end)) pvalue[start:end]=Get_Pvalue(scale_hic_diag,start,end,local_ext,size=window_size,scale=1) #print(np.sum(pvalue[start:end]<=0.05)) #print(pvalue) if verbose: print("-- Done!") print("-- Filtering False Positives") local_ext[np.intersect1d(np.argwhere(local_ext==-1),np.argwhere(pvalue<=0.05))]=-2 local_ext[np.argwhere(local_ext==-1)]=0 local_ext[np.argwhere(local_ext==-2)]=-1 if verbose: print("-- Done!") eltm=time.process_time() print("Step 3 Running Time : %f"%(eltm)) print("Step 3 : Done!") signal_idx=np.argwhere(local_ext==-1) gap_idx=np.argwhere(local_ext==-0.5) #print(signal_idx) domains=Convert_Bin_To_Domain( bins, signal_idx, gap_idx, pvalues=pvalue, pvalue_cut=0.05) #print(domains[domains['tag']=='boundary']) bins['local.ext']=local_ext bins['mean.cf']=mean_cf bins['pvalue']=pvalue return domains,bins
def scale_array(x: np.ndarray) -> np.ndarray: """ Scale the input array to have zero mean and unit variance. Parameters ---------- x : np.ndarray Input array to be scaled. Returns ------- np.ndarray Scaled array. """ if x.size == 0: return x _mean = np.mean(x) _std = np.std(x) if _std < 10*np.finfo(_std.dtype).eps: _std = 1.0 # Avoid division by small standard deviation return (x - np.mean(x)) / (np.std(x)) # Avoid division by zero def Get_Diamond_Matrix(mat_data,i,size): n_bins=mat_data.shape[0] if(i==n_bins-1): return(0) lowerbound=max(0,i-size+1) upperbound=min(i+size,n_bins-1) return mat_data[lowerbound:(i+1),(i+1):(upperbound+1)] def Which_Gap_Region2(matrix_data,w): n_bins=matrix_data.shape[0] gap=np.zeros(n_bins) for i in range(n_bins): if np.sum(matrix_data[i,max(0,i-w):min(i+w,n_bins)])==0: gap[i]=-0.5 idx=np.argwhere(gap==-0.5).ravel() return idx def Which_Process_Region(rmv_idx,n_bins,min_size=3): gap_idx=rmv_idx proc_regions=[] proc_set=np.setdiff1d(range(n_bins),rmv_idx) n_proc_set=len(proc_set) i=0 while(i<n_proc_set-1): start=proc_set[i] j=i+1 while(j<n_proc_set): if proc_set[j]-proc_set[j-1]<=1: j=j+1 else: proc_regions.append([start,proc_set[j-1]+1]) i=j break if j>=n_proc_set-1: proc_regions.append([start,proc_set[j-1]+1]) break df_proc_regions=pd.DataFrame(proc_regions,columns=['start','end']) df_proc_regions=df_proc_regions[abs(df_proc_regions['end']-df_proc_regions['start'])>min_size] return df_proc_regions def Detect_Local_Extreme(x): n_bins=len(x) ret=np.zeros(n_bins) x[np.isnan(x)]=0 if(n_bins<=3): #print(np.min(x)) #print(np.min(y)) ret[np.argmin(x)]=-1 ret[np.argmax(x)]=1 return ret new_point=Data_Norm(x=np.arange(n_bins),y=x) x=new_point[1] cp=Change_Point(x=np.arange(n_bins),y=x) if len(cp[0])<=2: return ret if len(cp[0])==n_bins: return ret for i in range(1,len(cp[0])-1): if x[cp[0][i]] >= x[cp[0][i]-1] and x[cp[0][i]] >= x[cp[0][i]+1]: ret[cp[0][i]] = 1 elif x[cp[0][i]] < x[cp[0][i]-1] and x[cp[0][i]] < x[cp[0][i]+1]: ret[cp[0][i]] = -1 min_val = min( x[ cp[0][i-1] ], x[ cp[0][i] ] ) max_val = max( x[ cp[0][i-1] ], x[ cp[0][i] ] ) if min( x[cp[0][i-1]:cp[0][i]+1] ) < min_val: ret[ cp[0][i-1] + np.argmin( x[cp[0][i-1]:cp[0][i]]+1 ) ] = -1 if max( x[cp[0][i-1]:cp[0][i]+1] ) > max_val: ret[ cp[0][i-1] + np.argmax( x[cp[0][i-1]:cp[0][i]]+1 ) ] = 1 return ret def Data_Norm(x,y): ret_x=np.zeros(len(x)) ret_y=np.zeros(len(y)) ret_x[0]=x[0] ret_y[0]=y[0] diff_x=np.diff(x) diff_y=np.diff(y) epsilon=1e-8 scale_x=1/np.mean(np.abs(diff_x)+epsilon) scale_y=1/np.mean(np.abs(diff_y)+epsilon) for i in range(1,len(x)): ret_x[i]=ret_x[i-1]+(diff_x[i-1]*scale_x) ret_y[i]=ret_y[i-1]+(diff_y[i-1]*scale_y) return [ret_x,ret_y] def Change_Point(x,y): if len(x)!=len(y): print("ERROR : The length of x and y should be the same") return 0 n_bins=len(x) Fv=np.array(n_bins*[np.nan]) Ev=np.array(n_bins*[np.nan]) cp=[0] i=0 Fv[0]=0 while i<n_bins-1: j=i+1 Fv[j]=np.sqrt((x[j]-x[i])**2+(y[j]-y[i])**2) while j<n_bins-1: j=j+1 #k=list(range(i+1,j)) Ev[j] = np.sum( np.abs( (y[j]-y[i] )*x[i+1:j] - (x[j] -x[i])*y[i+1:j] - (x[i]*y[j]) + (x[j]*y[i]) ) ) / np.sqrt( (x[j]-x[i])**2 + (y[j] - y[i] )**2 ) Fv[j] = np.sqrt( (x[j]-x[i])**2 + (y[j] - y[i])**2 ) - ( np.sum( np.abs( (y[j]-y[i] )*x[i+1:j] - (x[j] -x[i])*y[i+1:j] - (x[i]*y[j]) + (x[j]*y[i]) ) ) / np.sqrt( (x[j]-x[i])**2 + (y[j] - y[i] )**2 ) ) if (np.isnan(Fv[j])) or (np.isnan(Fv[j-1])): j=j-1 cp.append(j) break if Fv[j]<Fv[j-1]: j=j-1 cp.append(j) break i=j cp.append(n_bins-1) return[cp,Fv,Ev] def Get_Pvalue(matrix_data,start,end,local_ext,size,scale=1): n_bins=end-start pvalue=np.ones(n_bins) index_array=np.intersect1d(np.arange(start,end-1),np.where(local_ext==-1)[0]) if index_array.shape[0]: for i in index_array: dia=Get_Diamond_Matrix2(matrix_data,start,end,i,size=size).ravel() ups=Get_Upstream_Triangle(matrix_data,start,end,i,size=size).ravel() downs=Get_Downstream_Triangle(matrix_data,start,end,i,size=size).ravel() pvalue[i-start]=stats.ranksums(dia*scale,np.append(ups,downs),alternative='less',nan_policy='omit').pvalue pvalue[np.isnan(pvalue)]=1 return pvalue def Get_Diamond_Matrix2(mat_data,start,end,i,size): n_bins=end-start new_mat=np.array([[np.nan]*size]*size) for k in range(size): lower = min(i+1,end-1) upper = min(i+size+1,end) if i-k>=start and i<end-1: new_mat[size-k-1,0:upper-lower]=mat_data[i-k,lower:upper] return new_mat def Get_Upstream_Triangle(mat_data,start,end,i,size): lower=max(start,i-size) tmp_mat=mat_data[lower:(i+1),lower:(i+1)] return tmp_mat[np.triu_indices(tmp_mat.shape[0],k=1)] def Get_Downstream_Triangle(mat_data,start,end,i,size): if i==end-1: return np.nan upper=min(i+size+1,end) tmp_mat=mat_data[i+1:upper,i+1:upper] return tmp_mat[np.triu_indices(tmp_mat.shape[0],k=1)] def Convert_Bin_To_Domain(bins,signal_idx,gap_idx,pvalues=None,pvalue_cut=None): n_bins=len(bins) rmv_idx=np.setdiff1d(np.arange(n_bins),gap_idx) proc_region=Which_Process_Region(rmv_idx,n_bins,min_size=0) #print(proc_region) from_coord=bins.loc[proc_region['start'],'from.coord'] n_procs=len(proc_region) gap=pd.DataFrame({'from.coord':from_coord,'tag':['gap']*n_procs}) rmv_idx2=np.append(signal_idx,gap_idx) rmv_idx2=np.sort(rmv_idx2) proc_region=Which_Process_Region(rmv_idx2,n_bins,min_size=0) from_coord=bins.loc[proc_region['start'],'from.coord'] n_procs=len(proc_region) #print(proc_region) domain=pd.DataFrame({'from.coord':from_coord,'tag':['domain']*n_procs}) #summary=gap.append(domain) summary=pd.concat([gap,domain]) #print(summary) rmv_idx=np.setdiff1d(np.arange(n_bins),signal_idx) proc_region=Which_Process_Region(rmv_idx,n_bins,min_size=1) n_procs=len(proc_region) if n_procs>0: from_coord=bins.loc[proc_region['start'],'from.coord'] boundary=pd.DataFrame({'from.coord':from_coord,'tag':['boundary']*n_procs}) #summary=summary.append(boundary) summary=pd.concat([summary,boundary]) summary=summary.sort_values(by=['from.coord']) #print(summary) # todo #to_coord=summary['from.coord'].iloc[1:len(summary)].append(pd.Series([bins.loc[n_bins-1,'to.coord']])) to_coord=pd.concat([summary['from.coord'].iloc[1:len(summary)],pd.Series([bins.loc[n_bins-1,'to.coord']])]) to_coord.index=summary.index summary['to.coord']=to_coord summary['from.id']=summary.index to_id=list(summary.index)[1:len(summary.index)] to_id.append(n_bins) to_id=pd.Series(to_id,index=summary.index) summary['to.id']=to_id # summary['size']=summary['to.coord']-summary['from.coord'] # summary['span']=summary['to.id']-summary['from.id'] summary['chr']=bins['chr'].values[0] summary=summary[['chr','from.id','from.coord','to.id','to.coord','tag']] #print(summary) if np.any(pvalues) and pvalue_cut!=None: for i,row in summary.iterrows(): if row['tag']=='domain': domain_bins_idx=np.arange(row['from.id'],row['to.id']) p_value_constr=np.argwhere(pvalues[domain_bins_idx]<pvalue_cut) if(len(domain_bins_idx)==len(p_value_constr)): row['tag']='boundary' return summary