# -*- coding: utf-8 -*-
# HiCKRy.py
"""
HiCKRy: A tool for Hi-C contact matrix bias correction
Author: Weibing Wang
Date: 2025-06-11
Email: wangweibing@xidian.edu.cn
This module provides functions to load Hi-C interaction data, compute bias values, and apply the Knight-Ruiz normalization algorithm.
It is designed to work with Hi-C data in the `.hic` format and can handle large datasets efficiently.
This code is modified from the original HiCKRy implementation by the Ay Lab, we added some features and optimizations for usability.
These are two ways to run this script:
1. Command Line Interface (CLI):
python HiCKRy.py -i interactions.gz -f fragments.gz -o bias_output.gz
2. Import as a module:
from bandhic.utils.HiCKRy import hickry
bias, is_valid = hickry(hic_coo, verbose=True)
This module is part of the BandHiC package, which provides a banded Hi-C matrix toolkit.
Reference:
1. Kaul, A., Bhattacharyya, S. & Ay, F. Identifying statistically significant chromatin contacts from Hi-C data with FitHiC2. Nature Protocols 15, 991–1012 (2020).
"""
import gzip
import argparse
import sys
import numpy as np
import scipy.sparse as sps
import time
__all__ = [
"compute_bin_bias",
]
[docs]
def compute_bin_bias(hic_coo,verbose=False):
"""
Compute bias values for Hi-C contact matrices using the Knight-Ruiz normalization algorithm.
Parameters
----------
hic_coo : scipy.sparse.coo_array
A sparse COO matrix representing Hi-C contact data.
verbose : bool, optional
If True, print detailed information during processing. Default is False.
Returns
-------
bias : numpy.ndarray
A 1D array containing the bias values for each bin in the Hi-C matrix.
is_valid : bool
A boolean indicating whether the bias vector is valid (mean and median within typical range).
Notes
-----
This function removes a specified percentage of the most sparse bins from the Hi-C matrix before computing
the bias values. The Knight-Ruiz normalization algorithm is applied to the modified matrix to compute the bias.
The function iteratively removes bins with low interaction counts until a valid bias vector is obtained.
The bias vector is expected to have a mean and median close to 1, indicating balanced interaction frequencies across bins.
"""
zero_ratio=1-np.unique(hic_coo.row[hic_coo.row!=hic_coo.col]).shape[0]/hic_coo.shape[0]
is_valid=False
hic_coo=hic_coo+hic_coo.T
for sparseToRemoveT in np.arange(zero_ratio,1,0.05):
if verbose:
print('%.2f of bins are removed' % sparseToRemoveT)
try:
bias,is_valid = returnBias(hic_coo, sparseToRemoveT, verbose)
except Exception as e:
if verbose:
print(e)
continue
#checkBias(bias)
if is_valid:
return bias.ravel(),is_valid
else:
if not is_valid:
print("WARNING... Bias vector has a median or mean outside of typical range (0.8, 1.2).")
return bias.ravel(),is_valid
def parse_args(arguments):
parser = argparse.ArgumentParser(description="Check help flag")
parser.add_argument("-i", "--interactions", help="Path to the interactions file to generate bias values",required=True, type=str)
parser.add_argument("-f", "--fragments", help="Path to the interactions file to generate bias values",required=True, type=str)
parser.add_argument("-o", "--output", help="Full path to output the generated bias file to", required=True, type=str)
parser.add_argument("-x", "--percentOfSparseToRemove", help="Percent of diagonal to remove", required=False, type=float, default=0.05)
return parser.parse_args()
def loadfastfithicInteractions(interactionsFile, fragsFile, verbose):
if verbose:
print("Creating sparse matrix...")
startT = time.time()
with gzip.open(fragsFile, 'rt') as frag:
ctr = 0
fragDic = {}
revFrag = []
for lines in frag:
line = lines.rstrip().split()
chrom = line[0]
mid = int(line[2])
if chrom not in fragDic:
fragDic[chrom]={}
fragDic[chrom][mid]=ctr
revFrag.append((chrom,mid))
ctr+=1
x = []
y = []
z = []
with gzip.open(interactionsFile, 'rt') as ints:
for lines in ints:
line = lines.rstrip().split()
chrom1 = line[0]
mid1 = int(line[1])
chrom2 = line[2]
mid2 = int(line[3])
z.append(float(line[4]))
x.append(fragDic[chrom1][mid1])
y.append(fragDic[chrom2][mid2])
x = np.asarray(x)
y = np.asarray(y)
z = np.asarray(z)
sparseMatrix = sps.coo_array((z, (x,y)), shape=(ctr,ctr))
endT = time.time()
if verbose:
print("Sparse matrix creation took %s seconds" % (endT-startT))
return sparseMatrix, revFrag
def returnBias(rawMatrix, perc, verbose):
rawMatrix=rawMatrix.copy()
R = rawMatrix.sum()
mtxAndRemoved = removeZeroDiagonalCSR(rawMatrix, perc,verbose)
if verbose:
print("Sparse rows removed")
initialSize = rawMatrix.shape
if verbose:
print("Initial matrix size: %s rows and %s columns" % (initialSize[0], initialSize[1]))
rawMatrix = mtxAndRemoved[0]
removed = mtxAndRemoved[1]
newSize = rawMatrix.shape
if verbose:
print("New matrix size: %s rows and %s columns" % (newSize[0], newSize[1]))
print("Normalizing with KR Algorithm")
result = knightRuizAlg(rawMatrix,verbose=verbose)
colVec = result[0]
#x = sps.diags(colVec.flatten(), 0, format='csr')
bias = computeBiasVector(colVec)
is_valid=checkBias(bias,verbose)
biasWZeros = addZeroBiases(removed, bias)
return biasWZeros,is_valid
def removeZeroDiagonalCSR(mtx, perc, verbose):
iteration = 0
toRemove = []
ctr = 0
rowSums = mtx.sum(axis=0)
rowSums = list(np.array(rowSums).reshape(-1,))
rowSums = list(enumerate(rowSums))
rowSums.sort(key=lambda tup: tup[1])
size = len(rowSums)
rem = int(np.ceil(perc * size))
if verbose:
print("Removing %s percent of most sparse bins" % (perc))
print("... corresponds to %s total rows" % (rem))
valToRemove = rowSums[rem][1]
#print(valToRemove)
if verbose:
print("... corresponds to all bins with less than or equal to %s total interactions" % valToRemove)
for value in rowSums:
if value[1] <= valToRemove:
toRemove.append(value[0])
list(set(toRemove))
toRemove.sort()
mtx = dropcols_coo(mtx, toRemove)
for num in toRemove:
if iteration != 0:
num -= iteration
removeRowCSR(mtx,num)
iteration +=1
return [mtx, toRemove]
def computeBiasVector(x):
one = np.ones((x.shape[0],1))
x = one/x
sums = np.sum(x)
avg = (1.0*sums)/x.shape[0]
bias = np.divide(x,avg)
return bias
def addZeroBiases(lst, vctr):
for values in lst:
vctr = np.insert(vctr,values,-1,axis=0)
return vctr
def dropcols_coo(M, idx_to_drop):
idx_to_drop = np.unique(idx_to_drop)
C = M.tocoo()
keep = ~np.in1d(C.col, idx_to_drop)
C.data, C.row, C.col = C.data[keep], C.row[keep], C.col[keep]
C.col -= idx_to_drop.searchsorted(C.col) # decrement column indices
C._shape = (C.shape[0], C.shape[1] - len(idx_to_drop))
return C.tocsr()
def removeRowCSR(mat, i):
if not isinstance(mat, (sps.csr_array,sps.csr_matrix)):
raise ValueError("works only for CSR format -- use .tocsr() first")
n = mat.indptr[i+1] - mat.indptr[i]
if n > 0:
mat.data[mat.indptr[i]:-n] = mat.data[mat.indptr[i+1]:]
mat.data = mat.data[:-n]
mat.indices[mat.indptr[i]:-n] = mat.indices[mat.indptr[i+1]:]
mat.indices = mat.indices[:-n]
mat.indptr[i:-1] = mat.indptr[i+1:]
mat.indptr[i:] -= n
mat.indptr = mat.indptr[:-1]
mat._shape = (mat._shape[0]-1, mat._shape[1])
def knightRuizAlg(A, tol=1e-6, f1 = False, verbose=False):
n = A.shape[0]
e = np.ones((n,1), dtype = np.float64)
res = []
Delta = 3
delta = 0.1
x0 = np.copy(e)
g = 0.9
etamax = eta = 0.1
stop_tol = tol*0.5
x = np.copy(x0)
rt = tol**2.0
v = x * (A.dot(x))
rk = 1.0 - v
rho_km1 = ((rk.transpose()).dot(rk))[0,0]
rho_km2 = rho_km1
rout = rold = rho_km1
MVP = 0 #we'll count matrix vector products
i = 0 #outer iteration count
if f1:
print ("it in. it res\n"),
while rout > rt: #outer iteration
i += 1
if i > 30:
if verbose:
print('iteration times is overflow!')
break
k = 0
y = np.copy(e)
innertol = max(eta ** 2.0 * rout, rt)
while rho_km1 > innertol: #inner iteration by CG
k += 1
if k == 1:
Z = rk / v
p = np.copy(Z)
#rho_km1 = np.dot(rk.T, Z)
rho_km1 = (rk.transpose()).dot(Z)
else:
beta = rho_km1 / rho_km2
p = Z + beta * p
if k > 10:
if verbose:
print('inner iteation is overflow!')
break
#update search direction efficiently
w = x * A.dot(x * p) + v * p
# alpha = rho_km1 / np.dot(p.T, w)[0,0]
alpha = rho_km1 / (((p.transpose()).dot(w))[0,0])
ap = alpha * p
#test distance to boundary of cone
ynew = y + ap
if np.amin(ynew) <= delta:
if delta == 0:
break
ind = np.where(ap < 0.0)[0]
gamma = np.amin((delta - y[ind]) / ap[ind])
y += gamma * ap
break
if np.amax(ynew) >= Delta:
ind = np.where(ynew > Delta)[0]
gamma = np.amin((Delta - y[ind]) / ap[ind])
y += gamma * ap
break
y = np.copy(ynew)
rk -= alpha * w
rho_km2 = rho_km1
Z = rk / v
#rho_km1 = np.dot(rk.T, Z)[0,0]
rho_km1 = ((rk.transpose()).dot(Z))[0,0]
x *= y
v = x * (A.dot(x))
rk = 1.0 - v
#rho_km1 = np.dot(rk.T, rk)[0,0]
rho_km1 = ((rk.transpose()).dot(rk))[0,0]
rout = rho_km1
MVP += k + 1
#update inner iteration stopping criterion
rat = rout/rold
rold = rout
res_norm = rout ** 0.5
eta_o = eta
eta = g * rat
if g * eta_o ** 2.0 > 0.1:
eta = max(eta, g * eta_o ** 2.0)
eta = max(min(eta, etamax), stop_tol / res_norm)
if f1:
print ("%03i %06i %03.3f %e %e \n" % (i, k, res_norm, rt, rout))
res.append(res_norm)
if f1:
print ("Matrix - vector products = %06i\n" % (MVP))
return [x,i,k]
def checkBias(biasvec,verbose):
is_valid=False
std = np.std(biasvec)
mean = np.mean(biasvec)
median = np.median(biasvec)
if (mean < 0.8 or mean > 1.2):
if verbose:
print("WARNING... Bias vector has a mean outside of typical range (0.8, 1.2).")
print("Consider running with a larger -x option if problems occur")
print("Mean\t%s" % mean)
print("Median\t%s" % median)
print("Std. Dev.\t%s" % std)
is_valid=False
elif (median<0.8 or median > 1.2):
if verbose:
print("WARNING... Bias vector has a median outside of typical range (0.8, 1.2).")
print("Consider running with a larger -x option if problems occur")
print("Mean\t%s" % mean)
print("Median\t%s" % median)
print("Std. Dev.\t%s" % std)
is_valid=False
elif (0.8 <=median <=1.2 and 0.8 <=mean <=1.2):
if verbose:
print("Mean\t%s" % mean)
print("Median\t%s" % median)
print("Std. Dev.\t%s" % std)
is_valid=True
else:
is_valid=False
return is_valid
def outputBias(biasCol, revFrag, outputFilePath):
bpath = outputFilePath
with gzip.open(bpath,'wt') as biasFile:
ctr = 0
for values in np.nditer(biasCol):
chrommidTup = revFrag[ctr]
chrom = chrommidTup[0]
mid = chrommidTup[1]
biasFile.write("%s\t%s\t%s\n" % (chrom, mid, values))
ctr += 1
def main():
args = parse_args(sys.argv[3:])
matrix,revFrag = loadfastfithicInteractions(args.interactions, args.fragments,verbose=True)
bias,is_valid=compute_bin_bias(matrix,verbose=True)
if is_valid:
outputBias(bias, revFrag, args.output)
else:
print('Error!!!')
if __name__=="__main__":
main()