~jb753/clusterfunc

312638fd0faaedc91616d58641018c6c8f478b76 — James Brind 11 months ago 1bb6922
Implement Vinokur clustering
M clusterfunc/double.py => clusterfunc/double.py +142 -151
@@ 4,6 4,47 @@ from clusterfunc.exceptions import ClusteringException
import clusterfunc.check
import clusterfunc.util
import clusterfunc.single
import clusterfunc.plot

import warnings


def _vinokur(ds, N):
    """Two sided analytic clustering function after Vinokur."""

    s0 = 1.0 / N / ds[0]
    s1 = N * ds[1]

    A = np.sqrt(s0 * s1)
    B = np.sqrt(s0 / s1)

    xi = np.linspace(0.0, 1.0, N)

    if np.abs(B - 1.0) < 0.001:
        # Eqn. (52)
        u = xi * (1.0 + 2.0 * (B - 1.0) * (xi - 0.5) * (1.0 - xi))
    elif B < 1.0:
        # Solve Eqn. (49)
        Dx = _invert_sinx_x(B)
        assert np.isclose(np.sin(Dx) / Dx, B, rtol=1e-1)
        # Eqn. (50)
        u = 0.5 * (1.0 + np.tan(Dx * (xi - 0.5)) / np.tan(Dx / 2.0))
    elif B >= 1.0:
        # Solve Eqn. (46)
        Dy = _invert_sinhx_x(B)
        assert np.isclose(np.sinh(Dy) / Dy, B, rtol=1e-1)
        # Eqn. (47)
        u = 0.5 * (1.0 + np.tanh(Dy * (xi - 0.5)) / np.tanh(Dy / 2.0))
    else:
        raise Exception(f"Unexpected B={B}, s0={s0}, s1={s1}")

    t = u / (A + (1.0 - A) * u)

    # Force to unit interval
    t -= t[0]
    t /= t[-1]

    return t


def _invert_sinhx_x(y):


@@ 60,179 101,129 @@ def _invert_sinx_x(y):
    return np.where(y < 0.26938972, x_low, x_high)


def clu(ds, N):
    """Two sided analytic clustering function after Vinokur."""

    s0 = 1./N/ds[0]
    s1 = N*ds[1]

    A = np.sqrt(s0 * s1)
    B = np.sqrt(s0 / s1)

    xi = np.linspace(0.0, 1.0, N)

    if np.abs(B - 1.0) < 0.001:
        # Eqn. (52)
        u = xi * (1.0 + 2.0 * (B - 1.0) * (xi - 0.5) * (1.0 - xi))
    elif B < 1.0:
        # Solve Eqn. (49)
        Dx = _invert_sinx_x(B)
        assert np.isclose(np.sin(Dx) / Dx, B, rtol=1e-1)
        # Eqn. (50)
        u = 0.5 * (1.0 + np.tan(Dx * (xi - 0.5)) / np.tan(Dx / 2.0))
    elif B >= 1.0:
        # Solve Eqn. (46)
        Dy = _invert_sinhx_x(B)
        assert np.isclose(np.sinh(Dy) / Dy, B, rtol=1e-1)
        # Eqn. (47)
        u = 0.5 * (1.0 + np.tanh(Dy * (xi - 0.5)) / np.tanh(Dy / 2.0))
    else:
        breakpoint()
        raise Exception(f"Unexpected B={B}, s0={s0}, s1={s1}")

    t = u / (A + (1.0 - A) * u)

    # Force to unit interval
    t-=t[0]
    t/=t[-1]

    assert t[0] == 0.
    assert np.isclose(t[-1] , 1.)

    return t

def clu_free(ds, dmax, ERmax, mult=8):
    """"""

    n = 1
    maxiter = 50
def _unit_fixed(dx0, dx1, N, rtol=1e-3):
    """General double-side clustering with fixed number of points"""
    maxiter = 100
    dx = np.array([dx0, dx1])
    dx_in = np.copy(dx)
    for _ in range(maxiter):
        N = mult*n + 1
        x = clu(ds, N)
        dx = np.diff(x)
        ER = clusterfunc.util.ER(x)
        if (ER < ERmax).all() and (dx <= dmax).all():
        x = _vinokur(dx_in, N)
        dx_out = np.diff(x)[(0, -1),]
        fac_dx = dx_out / dx
        if (np.abs(fac_dx - 1.0) < rtol).all():
            break
        else:
            n += 1

            dx_in /= fac_dx
    return x

N = 31
ds = (1e-3, 5e-3)
dmax = 0.1
x = clu(ds, N)
x = clu_free(ds, dmax, 1.2)
dx = np.diff(x)
# dx_end = 1./N/np.array([slopes[0], 1./slopes[1]])
ER = clusterfunc.util.ER(x)
import matplotlib.pyplot as plt
import clusterfunc.plot
clusterfunc.plot.plot(x)
clusterfunc.plot.plot_ER(x)
fig, ax = plt.subplots()
ax.plot(dx)
ax.plot([0.,len(x)-2], ds, 'k*')
plt.show()

def _unit_fixed_N(Dmin0, Dmin1, Dmax, ERmax, N):
    """"""
    if N < 3:
        raise ClusteringException(
            f"Not enough points N={N} to double cluster, need N>=3"
        )

    def _guess_xj(Dmin0, Dmin1, Dmax, ERmax, Na, Nb, xj):
        # Initially assume exact halves
        La = xj
        Lb = 1.0 - xj

        # Evaluate the halves
        xa = clusterfunc.single.fixed_N(0.0, La, Dmin0, Dmax, ERmax, Na)
        xb1 = clusterfunc.single.fixed_N(0.0, Lb, Dmin1, Dmax, ERmax, Nb)
        xb = 1.0 - np.flip(xb1)
        x = np.concatenate((xa, xb[1:]))

        dxa = np.diff(xa)[-1]
        dxb = np.diff(xb)[0]

        return x, dxa, dxb

    # Initial guess of splits
    frac = None
    xj = None

    g = (0.5, 0.4, 0.6, 0.3, 0.7)
    for xjg in g:
        for fracg in g:
            Na, Nb = clusterfunc.util.split_cells(N, fracg)
def _unit_free(dx0, dx1, dmax, ERmax, mult=8, rtol=1e-3):
    """General double-side clustering with free number of points"""
    n = 1
    maxiter = 1000
    flag = False
    with warnings.catch_warnings():
        warnings.filterwarnings("error")
        for _ in range(maxiter):
            N = mult * n + 1
            try:
                _guess_xj(Dmin0, Dmin1, Dmax, ERmax, Na, Nb, xjg)
                xj = xjg
                frac = fracg
            except ClusteringException:
                x = _unit_fixed(dx0, dx1, N, rtol)
            except RuntimeWarning:
                n += 1
                continue
            dx = np.diff(x)
            ER = clusterfunc.util.ER(x)
            if (ER < ERmax).all() and (dx <= dmax).all():
                flag = True
                break
            else:
                n += 1

    if xj is None:
        raise ClusteringException('Could not find initial guess of join point')
    if not flag:
        raise ClusteringException("Could not double cluster")

    return x

    # Iterate on the number of cells split to equalise ER
    maxiter = 100
    k_frac = 0.1  # Constant of proportionality for changes in frac wrt ER
    print('iterating frac')
    for _ in range(maxiter):

        # Split number of cells 
        Na, Nb = clusterfunc.util.split_cells(N, frac)
def fixed(x0, x1, dx0, dx1, N):
    """Double-sided clustering between two values with with fixed number of points.

        print('iterating xj')
        for i in range(maxiter):
            x, dxa, dxb = _guess_xj(Dmin0, Dmin1, Dmax, ERmax, Na, Nb, xj)
    Generate a grid vector x of length N from x0 to x1. Use Vinokur stretching
    from specified spacings at x0 and x1. Expansion ratio and maximum spacing
    are not controlled.

            if dxa > dxb:
                ERnow = dxa / dxb
            else:
                ERnow = dxb / dxa
    Parameters
    ----------
    x0: float
        Start value.
    x1: float
        End value.
    dx0 float
        Boundary spacing at x0.
    dx1 float
        Boundary spacing at x1.
    N: int
        Number of points in the grid vector.

            print(i,':', frac, xj, ERnow)
            if ERnow < ERmax:
                print('xj converged, breaking')
                break
            else:
                xj -= dxa - dxb
            print('new xj', xj)
    Returns
    -------
    x: array
        Grid vector of clustered points.

        ER = clusterfunc.util.ER(x)[(0,-1),]
        dER = np.diff(ER)[0]
        if np.abs(dER)<0.01:
            print('ER converged, breaking')
            break
    """

        frac -= np.clip(k_frac*dER,-0.05, 0.05)
        frac = np.clip(frac, 0.2, 0.8)
    if np.isclose(x0, x1):
        raise ClusteringException(
            f"Cannot distribute points without distinct start and end points, got x0={x0} and x1={x1}"
        )

        print('new frac', frac)
    assert len(x) == N
    # Scale the unit distribution between the given values
    Dx = x1 - x0
    Dxa = np.abs(Dx)
    x = x0 + Dx * _unit_fixed(dx0 / Dxa, dx1 / Dxa, N)

    return x

def _unit_free_N(Dmin0, Dmin1, Dmax, ERmax, mult=8):
    """Double-sided clustering on the unit interval with free number of points."""

    # Start from a low guess
    n = 1
    x = None
    for _ in range(1000):
        try:
            x = _unit_fixed_N(Dmin0, Dmin1, Dmax, ERmax, n*mult + 1)
            flag = True
            break
        except ClusteringException:
            n += 1
            continue
def free(x0, x1, dx0, dx1, dmax, ERmax, mult=8):
    """Double-sided clustering between two values with with fixed number of points.

    Generate a grid vector x from x0 to x1. Use Vinokur stretching from
    specified spacings at x0 and x1. Increase the number of points until
    maximum spacing and expansion ratio criteria are satisfied.

    Parameters
    ----------
    x0: float
        Start value.
    x1: float
        End value.
    dx0 float
        Boundary spacing at x0.
    dx1 float
        Boundary spacing at x1.
    dmax: float
        Maximum spacing.
    ERmax: float
        Expansion ratio > 1.
    mult: int
        Choose a number of cells divisible by this factor.

    Returns
    -------
    x: array
        Grid vector of clustered points.

    """

    if np.isclose(x0, x1):
        raise ClusteringException(
            f"Cannot distribute points without distinct start and end points, got x0={x0} and x1={x1}"
        )

    if x is None:
        raise ClusteringException('Could not bracked valid clustering')
    # Scale the unit distribution between the given values
    Dx = x1 - x0
    Dxa = np.abs(Dx)
    x = x0 + Dx * _unit_free(dx0 / Dxa, dx1 / Dxa, dmax / Dxa, ERmax, mult)

    return x

M clusterfunc/single.py => clusterfunc/single.py +61 -55
@@ 6,31 6,31 @@ from scipy.optimize import root_scalar
import clusterfunc.check


def _unit_fixed_N(Dmin, Dmax, ERmax, N, check=True):
def _unit_fixed(dmin, dmax, ERmax, N, check=True):
    """Single-sided clustering on the unit interval with specified number of points."""

    if N < 2:
        raise ClusteringException(f"Need at least two points to cluster, N={N}")

    if Dmax <= Dmin:
        raise ClusteringException(f"Dmax={Dmax} should be > Dmin={Dmin}")
    if dmax <= dmin:
        raise ClusteringException(f"dmax={dmax} should be > dmin={dmin}")

    if ERmax <= 1:
        raise ClusteringException(f"ERmax={ERmax} should be > 1")

    # Rule out having too many points for a uniform spacing at Dmin
    Nunif = np.ceil(1.0 / Dmin).astype(int) + 1
    # Rule out having too many points for a uniform spacing at dmin
    Nunif = np.ceil(1.0 / dmin).astype(int) + 1
    if N > Nunif:
        raise ClusteringException(
            f"Too many points N={N} for a uniform spacing of Dmin={Dmin}."
            f"Too many points N={N} for a uniform spacing of dmin={dmin}."
        )

    # Rule out too few points to reach unit length, regardless of capping
    Nfull = np.ceil(np.log(Dmax / Dmin) / np.log(ERmax)).astype(int) + 1
    Lfull = Dmin * (1.0 - ERmax ** (Nfull - 1)) / (1.0 - ERmax)
    Nfull = np.ceil(np.log(dmax / dmin) / np.log(ERmax)).astype(int) + 1
    Lfull = dmin * (1.0 - ERmax ** (Nfull - 1)) / (1.0 - ERmax)
    if N < Nfull and Lfull < 1.0:
        raise ClusteringException(
            f"Not enough points N={N} to reach unit length for ERmax={ERmax}, Dmin={Dmin}, regardless of Dmax., L={Lfull}, Nfull={Nfull}"
            f"Not enough points N={N} to reach unit length for ERmax={ERmax}, dmin={dmin}, regardless of dmax., L={Lfull}, Nfull={Nfull}"
        )

    # Assume we do not need to cap the spacing


@@ 39,9 39,9 @@ def _unit_fixed_N(Dmin, Dmax, ERmax, N, check=True):
    Np1 = N + 1

    def _iter_ER(ERi):
        y = Dmin * (1.0 - ERi**Nm1) / (1.0 - ERi) - 1.0
        y = dmin * (1.0 - ERi**Nm1) / (1.0 - ERi) - 1.0
        dy = (
            Dmin
            dmin
            * (Nm2 * ERi**Np1 - Nm1 * ERi**N + ERi**2)
            / (1.0 - ERi) ** 2
            / ERi**2


@@ 51,11 51,11 @@ def _unit_fixed_N(Dmin, Dmax, ERmax, N, check=True):
    # If we cannot reach irregardless of capping, give up
    if _iter_ER(ERmax)[0] < 0.0:
        raise ClusteringException(
            f"Insufficient points N={N} to reach unit length with Dmin={Dmin}, ERmax={ERmax}, no capping to Dmax needed."
            f"Insufficient points N={N} to reach unit length with dmin={dmin}, ERmax={ERmax}, no capping to dmax needed."
        )

    # Check for a uniform solution first
    err_unif = 1.0 / (N - 1) - Dmin
    err_unif = 1.0 / (N - 1) - dmin
    rtol = 1e-9
    if np.abs(err_unif) < rtol:
        ERnocap = 1.0


@@ 65,55 65,55 @@ def _unit_fixed_N(Dmin, Dmax, ERmax, N, check=True):
        assert soln.converged
        ERnocap = soln.root

    Dnocap = Dmin * ERnocap**Nm2
    Dnocap = dmin * ERnocap**Nm2

    # If capping constraint is satisfied, but ER is too big, we don't have enough points
    if ERnocap > ERmax and Dnocap <= Dmax:
    if ERnocap > ERmax and Dnocap <= dmax:
        raise ClusteringException(
            f"Insufficient points to reach unit length with Dmin={Dmin}, ERmax={ERmax}, no capping to Dmax needed."
            f"Insufficient points to reach unit length with dmin={dmin}, ERmax={ERmax}, no capping to dmax needed."
        )
    # If ERnocap and Dnocap satisfy our constraints, we have the solution
    elif ERnocap <= ERmax and Dnocap <= Dmax:
        dx = Dmin * ERnocap ** np.arange(Nm1)
    elif ERnocap <= ERmax and Dnocap <= dmax:
        dx = dmin * ERnocap ** np.arange(Nm1)
    # We need to have a capped section
    else:
        # Length as a function of spacing cap, with ERmax constant
        def _iter_Dmax(ERi, Dmaxi):
            Nstretch_i = np.ceil(np.log(Dmaxi / Dmin) / np.log(ERi)).astype(int) + 1
            Lstretch_i = Dmin * (1.0 - ERi ** (Nstretch_i - 1)) / (1.0 - ERi)
            dx1max_i = Dmin * ERi ** (Nstretch_i - 2)
        def _iter_dmax(ERi, dmaxi):
            Nstretch_i = np.ceil(np.log(dmaxi / dmin) / np.log(ERi)).astype(int) + 1
            Lstretch_i = dmin * (1.0 - ERi ** (Nstretch_i - 1)) / (1.0 - ERi)
            dx1max_i = dmin * ERi ** (Nstretch_i - 2)
            Ncap_i = N - Nstretch_i
            Lcap_i = Ncap_i * 0.5 * (dx1max_i + Dmaxi)
            Lcap_i = Ncap_i * 0.5 * (dx1max_i + dmaxi)
            return Lstretch_i + Lcap_i - 1.0

        # If we use all points at max ER and don't have enough length, will not work
        LL = _iter_Dmax(ERmax, Dmax)
        LL = _iter_dmax(ERmax, dmax)
        if LL < 0.0:
            raise ClusteringException(
                f"Not enough points to cluster with Dmin={Dmin}, ERmax={ERmax}, Dmax={Dmax} capping, total length only {LL+1}."
                f"Not enough points to cluster with dmin={dmin}, ERmax={ERmax}, dmax={dmax} capping, total length only {LL+1}."
            )

        # Binary search for cap distance that gives closes to unit length
        Dmax_high = Dmax
        Dmax_low = Dmin
        while Dmax_high - Dmax_low > 0.01 * Dmax:
            Dmax_new = 0.5 * (Dmax_high + Dmax_low)
            err_new = _iter_Dmax(ERmax, Dmax_new)
        dmax_high = dmax
        dmax_low = dmin
        while dmax_high - dmax_low > 0.01 * dmax:
            dmax_new = 0.5 * (dmax_high + dmax_low)
            err_new = _iter_dmax(ERmax, dmax_new)
            if err_new > 0:
                Dmax_high = Dmax_new
                dmax_high = dmax_new
            else:
                Dmax_low = Dmax_new
                dmax_low = dmax_new

        # Fine-tune the expansion ratio for exactly unit length
        soln = root_scalar(_iter_Dmax, bracket=(1.0 + 1e-9, ERmax), args=(Dmax_high,))
        soln = root_scalar(_iter_dmax, bracket=(1.0 + 1e-9, ERmax), args=(dmax_high,))
        assert soln.converged
        ER = soln.root

        # Evaluate the clustering
        Nstretch = np.ceil(np.log(Dmax_high / Dmin) / np.log(ER)).astype(int) + 1
        Nstretch = np.ceil(np.log(dmax_high / dmin) / np.log(ER)).astype(int) + 1
        Ncap = N - Nstretch
        dx1 = Dmin * ER ** np.arange(0, Nstretch - 1)
        dx2 = np.linspace(dx1[-1], Dmax_high, Ncap)
        dx1 = dmin * ER ** np.arange(0, Nstretch - 1)
        dx2 = np.linspace(dx1[-1], dmax_high, Ncap)
        dx = np.concatenate((dx1, dx2))

    x = clusterfunc.util.cumsum0(dx)


@@ 126,18 126,18 @@ def _unit_fixed_N(Dmin, Dmax, ERmax, N, check=True):
        )

    if check:
        clusterfunc.check.unit_single(x, Dmin, Dmax, ERmax)
        clusterfunc.check.unit_single(x, dmin, dmax, ERmax)

    return x


def _unit_free_N(Dmin, Dmax, ERmax, mult=8):
def _unit_free(dmin, dmax, ERmax, mult=8):
    """Single-sided clustering on the unit interval with free number of points."""

    # Find high and low guesses for n, N = mult*n + 1
    nlow = np.round(1.0 / Dmax / mult).astype(int)
    nlow = np.round(1.0 / dmax / mult).astype(int)
    nhigh = np.minimum(
        np.floor(1.0 / Dmin / mult).astype(int), 1024
        np.floor(1.0 / dmin / mult).astype(int), 1024
    )  # Limit to protect against overflow
    if nlow > nhigh:
        nlow = 1


@@ 148,7 148,7 @@ def _unit_free_N(Dmin, Dmax, ERmax, mult=8):

    Nhigh = nhigh * mult + 1
    try:
        x = _unit_fixed_N(Dmin, Dmax, ERmax, Nhigh, check=True)
        x = _unit_fixed(dmin, dmax, ERmax, Nhigh, check=True)
    except ClusteringException as e:
        print(e)
        raise ClusteringException(


@@ 163,7 163,7 @@ def _unit_free_N(Dmin, Dmax, ERmax, mult=8):

        # Attempt to cluster
        try:
            x = _unit_fixed_N(Dmin, Dmax, ERmax, Nnew, check=True)
            x = _unit_fixed(dmin, dmax, ERmax, Nnew, check=True)
            # New n is valid, use as high limiit
            nhigh = nnew
        except ClusteringException:


@@ 173,7 173,7 @@ def _unit_free_N(Dmin, Dmax, ERmax, mult=8):
    return x


def fixed_N(x0, x1, Dmin, Dmax, ERmax, N, check=True):
def fixed(x0, x1, dmin, dmax, ERmax, N, check=True):
    """Single-sided clustering between two values with with fixed number of points.

    Generate a grid vector x of length N from x0 to x1. Use geometric


@@ 186,9 186,9 @@ def fixed_N(x0, x1, Dmin, Dmax, ERmax, N, check=True):
        Start value.
    x1: float
        End value.
    Dmin: float
        Minimum spacing at zero.
    Dmax: float
    dmin: float
        Minimum spacing at start.
    dmax: float
        Maximum spacing.
    ERmax: float
        Expansion ratio > 1.


@@ 201,17 201,21 @@ def fixed_N(x0, x1, Dmin, Dmax, ERmax, N, check=True):
        Grid vector of clustered points.

    """

    if np.isclose(x0, x1):
        raise ClusteringException(
            f"Cannot distribute points without distinct start and end points, got x0={x0} and x1={x1}"
        )
    dx = x1 - x0
    dxa = np.abs(dx)
    x = x0 + dx * _unit_fixed_N(Dmin / dxa, Dmax / dxa, ERmax, N, check)

    # Scale the unit distribution between the given values
    Dx = x1 - x0
    Dxa = np.abs(Dx)
    x = x0 + Dx * _unit_fixed(dmin / Dxa, dmax / Dxa, ERmax, N, check)

    return x


def free_N(x0, x1, Dmin, Dmax, ERmax, mult=8):
def free(x0, x1, dmin, dmax, ERmax, mult=8):
    """Single-sided clustering between two values with with free number of points.

    Generate a grid vector x from x0 to x1. Use geometric stretching from a


@@ 224,9 228,9 @@ def free_N(x0, x1, Dmin, Dmax, ERmax, mult=8):
        Start value.
    x1: float
        End value.
    Dmin: float
    dmin: float
        Minimum spacing at zero.
    Dmax: float
    dmax: float
        Maximum spacing.
    ERmax: float
        Expansion ratio > 1.


@@ 247,8 251,10 @@ def free_N(x0, x1, Dmin, Dmax, ERmax, mult=8):
                f"got x0={x0} and x1={x1}"
            )
        )
    dx = x1 - x0
    dxa = np.abs(dx)
    x = x0 + dx * _unit_free_N(Dmin / dxa, Dmax / dxa, ERmax, mult)

    # Scale the unit distribution between the given values
    Dx = x1 - x0
    Dxa = np.abs(Dx)
    x = x0 + Dx * _unit_free(dmin / Dxa, dmax / Dxa, ERmax, mult)

    return x

M clusterfunc/symmetric.py => clusterfunc/symmetric.py +9 -9
@@ 6,7 6,7 @@ import clusterfunc.util
import clusterfunc.single


def _unit_fixed_N(Dmin, Dmax, ERmax, N):
def _unit_fixed(Dmin, Dmax, ERmax, N):
    """"""
    if N < 3:
        raise ClusteringException(


@@ 24,8 24,8 @@ def _unit_fixed_N(Dmin, Dmax, ERmax, N):
        Lb = 1.0 - xj

        # Evaluate the halves
        xa = clusterfunc.single.fixed_N(0.0, La, Dmin, Dmax, ERmax, Na)
        xb1 = clusterfunc.single.fixed_N(0.0, Lb, Dmin, Dmax, ERmax, Nb)
        xa = clusterfunc.single.fixed(0.0, La, Dmin, Dmax, ERmax, Na)
        xb1 = clusterfunc.single.fixed(0.0, Lb, Dmin, Dmax, ERmax, Nb)
        xb = 1.0 - np.flip(xb1)
        x = np.concatenate((xa, xb[1:]))



@@ 61,9 61,9 @@ def _unit_fixed_N(Dmin, Dmax, ERmax, N):
    return x


def _unit_free_N(Dmin, Dmax, ERmax, mult=8):
def _unit_free(Dmin, Dmax, ERmax, mult=8):
    """"""
    xa = clusterfunc.single.free_N(0.0, 0.5, Dmin, Dmax, ERmax, mult)
    xa = clusterfunc.single.free(0.0, 0.5, Dmin, Dmax, ERmax, mult)
    assert np.diff(xa).max() * (1.0 + 1e-9) <= Dmax
    xb = 1.0 - np.flip(xa)
    x = np.concatenate((xa, xb[1:]))


@@ 71,7 71,7 @@ def _unit_free_N(Dmin, Dmax, ERmax, mult=8):
    return x


def fixed_N(x0, x1, Dmin, Dmax, ERmax, N):
def fixed(x0, x1, Dmin, Dmax, ERmax, N):
    """Double-sided symmetric clustering with fixed number of points."""

    if np.isclose(x0, x1):


@@ 84,12 84,12 @@ def fixed_N(x0, x1, Dmin, Dmax, ERmax, N):

    dx = x1 - x0
    dxa = np.abs(dx)
    x = x0 + dx * _unit_fixed_N(Dmin / dxa, Dmax / dxa, ERmax, N)
    x = x0 + dx * _unit_fixed(Dmin / dxa, Dmax / dxa, ERmax, N)

    return x


def free_N(x0, x1, Dmin, Dmax, ERmax, mult=8):
def free(x0, x1, Dmin, Dmax, ERmax, mult=8):
    """Double-sided symmetric clustering with free number of points."""

    if np.isclose(x0, x1):


@@ 102,6 102,6 @@ def free_N(x0, x1, Dmin, Dmax, ERmax, mult=8):

    dx = x1 - x0
    dxa = np.abs(dx)
    x = x0 + dx * _unit_free_N(Dmin / dxa, Dmax / dxa, ERmax, mult)
    x = x0 + dx * _unit_free(Dmin / dxa, Dmax / dxa, ERmax, mult)

    return x

A compare.py => compare.py +19 -0
@@ 0,0 1,19 @@
import clusterfunc.symmetric
import clusterfunc.double
import clusterfunc.plot

dmin = 2e-3
dmax = 1e-1
ERmax = 1.2

xd = clusterfunc.double.free(0.0, 1.0, dmin, dmin, dmax, ERmax)
xs = clusterfunc.symmetric.free(0.0, 1.0, dmin, dmax, ERmax)
print(len(xd), len(xs))

clusterfunc.plot.plot(xd)
clusterfunc.plot.plot_ER(xd)
clusterfunc.plot.plot(xs)
clusterfunc.plot.plot_ER(xs)
import matplotlib.pyplot as plt

plt.show()

M tests/test_double.py => tests/test_double.py +43 -20
@@ 7,36 7,28 @@ from clusterfunc.exceptions import ClusteringException


def test_double_unit():
    for Dmin0 in (1e-3, 1e-4):
        for Dmin1_mult in (0.5, 1., 2.):
            for ERmax in (1.1, 1.2):

                Dmin1 = Dmin0*Dmin1_mult

    for Dmin0 in (1e-2, 1e-3, 1e-4, 1e-5):
        for Dmin1_mult in (0.1, 1.0, 10.0):
            for ERmax in (1.05, 1.1, 1.4):
                Dmin1 = Dmin0 * Dmin1_mult
                if Dmin1 >= 0.1:
                    continue
                for Dmax in (0.05, 0.1, 0.2, 1.0):
                    if Dmax <= Dmin0 or Dmax < Dmin1:
                        continue

                # Minimum number of points with capping
                print('--',Dmin0, Dmin1, Dmax, ERmax)
                Nmin = len(
                    clusterfunc.double._unit_free_N(Dmin0, Dmin1, Dmax, ERmax, mult=1)
                    clusterfunc.double._unit_free(Dmin0, Dmin1, Dmax, ERmax, mult=1)
                )
                print(f'Clustered OK with Nmin={Nmin}')
                # Number of points for uniform, limit to not be too huge
                Nmax = np.minimum(2*Nmin, 256)
                print(Nmax)
                # Nmax = Nmin + 8
                clusterfunc.double._unit_fixed_N(Dmin0, Dmin1, Dmax, ERmax, Nmax)
                print(f'Clustered OK with Nmax={Nmax}')
                # print(' ',Nmin, Nmax)

                # Maximum number of points
                Nmax = np.minimum(2 * Nmin, 256)

                # Test without capping
                for N in range(Nmin, Nmax + 1):
                    print(N)

                    # Evaluate
                    x = clusterfunc.double._unit_fixed_N(Dmin0, Dmin1, Dmax, ERmax, N)
                    x = clusterfunc.double._unit_fixed(Dmin0, Dmin1, N)

                    dx = np.diff(x)
                    ER = clusterfunc.util.ER(x)


@@ 46,4 38,35 @@ def test_double_unit():
                    assert np.all(ER <= ERmax * (1.0 + rtol))


test_double_unit()
def test_double_abs():
    """Verify clustering for absolute coordinates."""

    for x0 in (-1.0, 0.0, 2.0):
        for x1 in (-1.0, 0.0, 2.0):
            for Dmin0 in (1e-2, 1e-4):
                for Dmin1_mult in (0.1, 1.0, 10.0):
                    for ERmax in (1.2, 1.4):
                        Dmin1 = Dmin0 * Dmin1_mult
                        if Dmin1 >= 0.1:
                            continue
                        for Dmax in (0.05, 0.1, 0.2, 1.0):
                            if Dmax <= Dmin0 or Dmax < Dmin1:
                                continue

                        if np.isclose(x0, x1):
                            with pytest.raises(ClusteringException):
                                clusterfunc.double.free(
                                    x0, x1, Dmin0, Dmin1, Dmax, ERmax, mult=1
                                )
                        else:
                            x = clusterfunc.double.free(
                                x0, x1, Dmin0, Dmin1, Dmax, ERmax, mult=1
                            )

                            assert np.allclose(x[(0, -1),], (x0, x1))
                            dx = np.diff(x)
                            assert np.diff(np.sign(dx)).all() == 0.0
                            rtol = 1e-6
                            assert np.abs(dx).max() <= Dmax * (1.0 + rtol)
                            ER = clusterfunc.util.ER(x)
                            assert np.all(ER <= ERmax * (1.0 + rtol))

M tests/test_single.py => tests/test_single.py +6 -15
@@ 10,25 10,20 @@ def test_single():

    for Dmin in (2e-1, 1e-1, 1e-2, 1e-3, 1e-4, 1e-5):
        for ER in (1.02, 1.05, 1.1, 1.2):

            # Number of points for uniform, limit to not be too huge
            Nmax = np.minimum(np.floor(1.0 / Dmin).astype(int), 256)

            for Dmax in (0.05, 0.1, 0.2, 1.0):

                if Dmax <= Dmin:
                    continue

                # Minimum number of points with capping
                Nmin = len(
                    clusterfunc.single._unit_free_N(Dmin, Dmax, ER, mult=1)
                )
                Nmin = len(clusterfunc.single._unit_free(Dmin, Dmax, ER, mult=1))

                # Test without capping
                for N in range(Nmin, Nmax + 1):

                    # Evaluate
                    x = clusterfunc.single._unit_fixed_N(Dmin, Dmax, ER, N)
                    x = clusterfunc.single._unit_fixed(Dmin, Dmax, ER, N)

                    # Check
                    clusterfunc.check.unit_single(x, Dmin, Dmax, ER, rtol=1e-9)


@@ 44,13 39,13 @@ def test_unif():
        Dmin = 1.0 / (n - 1)

        # Evaluate
        x = clusterfunc.single._unit_fixed_N(Dmin, Dmax, ER, n)
        x = clusterfunc.single._unit_fixed(Dmin, Dmax, ER, n)

        # Check
        clusterfunc.check.unit_single(x, Dmin, Dmax, ER, rtol=1e-9)


def test_abs():
def test_single_abs():
    """Verify clustering for absolute coordinates."""

    rtol = 1e-9


@@ 64,13 59,9 @@ def test_abs():

                    if np.isclose(x0, x1):
                        with pytest.raises(ClusteringException):
                            clusterfunc.single.free_N(
                                x0, x1, Dmin, Dmax, ER, mult=1
                            )
                            clusterfunc.single.free(x0, x1, Dmin, Dmax, ER, mult=1)
                    else:
                        x = clusterfunc.single.free_N(
                                x0, x1, Dmin, Dmax, ER, mult=1
                            )
                        x = clusterfunc.single.free(x0, x1, Dmin, Dmax, ER, mult=1)

                        assert np.allclose(x[(0, -1),], (x0, x1))
                        dx = np.diff(x)

M tests/test_symmetric.py => tests/test_symmetric.py +10 -20
@@ 6,8 6,8 @@ from clusterfunc.exceptions import ClusteringException


def test_symmetric_unit():
    for Dmin in (1e-1, 1e-2, 1e-3, 1e-4, 1e-5):
        for ER in (1.02, 1.05, 1.1, 1.2):
    for Dmin in (1e-1, 1e-2, 1e-3, 1e-5):
        for ER in (1.05, 1.1, 1.2):
            # Number of points for uniform, limit to not be too huge
            Nmax = np.minimum(np.floor(1.0 / Dmin).astype(int), 512)



@@ 16,21 16,15 @@ def test_symmetric_unit():
                    continue

                # Minimum number of points with capping
                Nmin = len(
                    clusterfunc.symmetric._unit_free_N(Dmin, Dmax, ER, mult=1)
                )
                Nmin = len(clusterfunc.symmetric._unit_free(Dmin, Dmax, ER, mult=1))

                # Test without capping
                for N in range(Nmin, Nmax + 1):
                    print(N)

                for N in range(Nmin, Nmax + 1, 3):
                    # Evaluate
                    x = clusterfunc.symmetric._unit_fixed_N(Dmin, Dmax, ER, N)
                    x = clusterfunc.symmetric._unit_fixed(Dmin, Dmax, ER, N)

                    # Check
                    clusterfunc.check.unit_symmetric(
                        x, Dmin, Dmax, ER, rtol=1e-9
                    )
                    clusterfunc.check.unit_symmetric(x, Dmin, Dmax, ER, rtol=1e-9)

                    dx = np.diff(x)
                    rtol = 1e-9


@@ 42,8 36,8 @@ def test_symmetric_unit():
def test_symmetric_abs():
    rtol = 1e-9
    ER = 1.2
    for x0 in (-1.0, 0.0, 1.0, 2.0):
        for x1 in (-1.0, 0.0, 1.0, 2.0):
    for x0 in (-1.0, 0.0, 2.0):
        for x1 in (-1.0, 0.0, 2.0):
            for Dmin in (1e-1, 1e-2, 1e-4):
                for Dmax in (0.05, 0.1, 0.2, 1.0):
                    if Dmax <= Dmin:


@@ 51,13 45,9 @@ def test_symmetric_abs():

                    if np.isclose(x0, x1):
                        with pytest.raises(ClusteringException):
                            clusterfunc.symmetric.free_N(
                                x0, x1, Dmin, Dmax, ER, mult=1
                            )
                            clusterfunc.symmetric.free(x0, x1, Dmin, Dmax, ER, mult=1)
                    else:
                        x = clusterfunc.symmetric.free_N(
                            x0, x1, Dmin, Dmax, ER, mult=1
                        )
                        x = clusterfunc.symmetric.free(x0, x1, Dmin, Dmax, ER, mult=1)

                        assert np.allclose(x[(0, -1),], (x0, x1))
                        dx = np.diff(x)