~jb753/clusterfunc

b7056ffaaa9fa01c3fa650796e43077c228addc7 — James Brind 1 year, 23 days ago 329a1e8
working
2 files changed, 115 insertions(+), 122 deletions(-)

M clusterfunc.py
M test_clusterfunc.py
M clusterfunc.py => clusterfunc.py +96 -115
@@ -6,16 +6,15 @@ class ClusteringException(BaseException):
    """Allow catching of clustering errors specifically."""
    pass

-def check_normalised(x, Dst, Dmax, ERmax, rtol=1e-9):
+def check_normalised(x, Dmin, Dmax, ERmax, rtol=1e-9):
    """Verify that a normalised clustering vector obeys the desired limits, raise if not."""


    # Check x

    if not x[0] == 0.:
        raise ClusteringException(f'Normalised start value x[0]={x[0]}, expected 0.0.')

-    if not np.isclose(x[-1],1.):
+    if not np.isclose(x[-1],1.,rtol=rtol):
        raise ClusteringException(f'Normalised end value x[-1]={x[-1]}, expected 1.0.')

    if not ((x>=0.) & (x<= 1. + rtol)).all():


@@ -30,8 +29,8 @@ def check_normalised(x, Dst, Dmax, ERmax, rtol=1e-9):
    if not (dx > 0.).all():
        raise ClusteringException(f'Normalised spacing min(dx)={dx.min()}, expected all > 0.0.')

-    if not np.isclose(dx[0], Dst):
-        raise ClusteringException(f'Normalised start spacing dx[0]={dx[0]}, expected {Dst}')
+    if not np.isclose(dx[0], Dmin):
+        raise ClusteringException(f'Normalised start spacing dx[0]={dx[0]}, expected {Dmin}')

    if not (dx <= Dmax).all():
        raise ClusteringException(f'Normalised spacing max(dx)={dx.max()} exceeds target {Dmax}.')


@@ -43,122 +42,135 @@ def check_normalised(x, Dst, Dmax, ERmax, rtol=1e-9):
    if not (ER <= ERmax).all():
        raise ClusteringException(f'Expansion ratio max(ER)={ER.max()} exceeds target {ERmax}.')

-def normalised_N_fixed(Dst, Dmax, ERmax, N, check=True):
-    """Single-sided clustering on the unit interval with specified number of point."""

-    if Dmax <= Dst:
-        raise ClusteringException(f'Dmax={Dmax} should be > Dst={Dst}')
+    if not (ER >= 1. - rtol).all():
+        raise ClusteringException(f'Expansion ratio min(ER)={ER.min()} less than unity.')

-    # Number of points needed to expand Dst to Dmax at full stretch
-    # Expanding for more points will give D > Dmax, which we don't want
-    N1a = np.ceil(np.log(Dmax / Dst) / np.log(ERmax)).astype(int)+1
+def unit_fixed_N(Dmin, Dmax, ERmax, N, check=True):
+    """Single-sided clustering on the unit interval with a specified number of points.

-    # Length at which we will need to cap the spacing
-    L1a = Dst * (1.-ERmax**(N1a-1))/(1.-ERmax)
+    There are four cases in decreasing order of grid points:
+        1) Too many points for a uniform grid at Dmin.
+        2) Can reach unit length without capping the spacing.
+        3) A section with capped spacing is required.
+        4) Not enough points to reach unit length at full stretch.
+    """

-    # Number of points for unit length at full stretch
-    # Any less than this, we cannot reach
-    N1b = np.ceil(np.log(1.-(1.-ERmax)/Dst)/np.log(ERmax)).astype(int) + 1
+    if Dmax <= Dmin:
+        raise ClusteringException(f'Dmax={Dmax} should be > Dmin={Dmin}')

-    # Number of points for uniform spacing at Dst
-    Nunif = np.ceil(1./Dst)
+    if ERmax <= 1:
+        raise ClusteringException(f'ERmax={ERmax} should be > 1')

+    # Rule out having too many points for a uniform spacing at Dmin
+    Nunif = np.ceil(1./Dmin).astype(int)+1
    if N > Nunif:
        raise ClusteringException(
-        f'Too many points N={N} for a uniform spacing of Dst={Dst}.'
+        f'Too many points N={N} for a uniform spacing of Dmin={Dmin}.'
        )


-    if N < N1b:
+    # Rule out too few points to reach unit length, regardless of capping
+    Nfull = np.ceil(np.log(Dmax / Dmin) / np.log(ERmax)).astype(int)+1
+    Lfull = Dmin * (1.-ERmax**(Nfull-1))/(1.-ERmax)
+    if N < Nfull and Lfull < 1.:
        raise ClusteringException(
-        f'Insufficient points to reach unit length with Dst={Dst}, ERmax={ERmax} (for any Dmax)'
+        f'Not enough points N={N} to reach unit length for ERmax={ERmax}, Dmin={Dmin}, regardless of Dmax (L={Lfull}, Nfull={Nfull}).'
        )

-    if N <= N1a or L1a > 1.:
-        # No need to cap spacing
-        # Reduce expansion ratio to achieve unit length

-        # Closure to return L and dLdER as function of ER
-        def _iter(ERi):
-            # Length error
-            L = Dst * (1.0 - ERi ** (N - 1)) / (1.0 - ERi) - 1.0
-            # Derivative
-            dL = (
-                Dst
-                * ((N - 2.0) * ERi**N - (N - 1.0) * ERi ** (N - 1.0) + ERi)
-                / (1.0 - ERi) ** 2.0
-                / ERi
-            )
-            return L, dL

-        # Solve for the ER that gives unit length
-        ER = root_scalar(_iter, x0=ERmax, fprime=True).root
-        print(ER)
+    # Assume we do not need to cap the spacing
+    # What expansion ratio is needed, and what is the max spacing?
+    Nm1, Nm2, Nm3 = N-1, N-2, N-3
+    Nm1Nm2 = Nm1*Nm2
+    def _iter_ER(ERi):
+        y = ERi - 1. + Dmin*(1.-ERi**Nm1)
+        dy = 1. - Dmin*Nm1*ERi**Nm2
+        d2y = -Dmin*Nm1Nm2*ERi**Nm3
+        return y, dy, d2y

-        # Evaluate coordinates
-        dx = Dst * ER ** np.arange(0, N - 1)
-        x = cumsum0(dx)
+    # If we cannot reach regardless of capping, give up
+    if _iter_ER(ERmax)[0] > 0.:
+        raise ClusteringException(
+        f'Insufficient points N={N} to reach unit length with Dmin={Dmin}, ERmax={ERmax}, regardless of capping at Dmax.'
+        )

+    # Find root
+    soln = root_scalar(_iter_ER, x0=ERmax, fprime2=True, maxiter=200)
+    assert soln.converged
+    ERnocap = soln.root
+    Dnocap = Dmin*ERnocap**Nm2

+    # If capping constraint is satisfied, but ER is too big, we don't have enough points
+    if (ERnocap > ERmax and Dnocap <= Dmax):
+        raise ClusteringException(
+        f'Insufficient points to reach unit length with Dmin={Dmin}, ERmax={ERmax}, no capping to Dmax needed.'
+        )
+    # If ERnocap and Dnocap satisfy our constraints, we have the solution
+    elif ERnocap <= ERmax and Dnocap <= Dmax:
+        dx = Dmin*ERnocap**np.arange(Nm1)
+    # We need to have a capped section
    else:
        # If we used all N points, D would exceed Dmax, so uniform needed

+        # Length as a function of spacing cap, with ERmax constant
+        def _iter_Dmax(ERi, Dmaxi):
+            Nstretch_i = np.ceil(np.log(Dmaxi / Dmin) / np.log(ERi)).astype(int)+1
+            Lstretch_i = Dmin * (1.-ERi**(Nstretch_i-1))/(1.-ERi)
+            dx1max_i = Dmin * ERi ** (Nstretch_i-2)
+            Ncap_i = N - Nstretch_i
+            Lcap_i = Ncap_i * 0.5 * (dx1max_i + Dmaxi)
+            return Lstretch_i + Lcap_i - 1.

-        # How far can we go with a linear increase in spacing?
-        N2 = N - N1a
-        dx1max = Dst * ERmax ** (N1a-2)
-        L2 = N2 * 0.5 * (dx1max + Dmax)
-        if L1a + L2 < 1.:
-            raise ClusteringException(f'Not enough points to cluster with Dmax={Dmax} cap, total length only {L1+L2}.')
+        # If we use all points at max ER and don't have enough length, will not work
+        LL = _iter_Dmax(ERmax, Dmax)
+        if LL < 0.:
+            raise ClusteringException(f'Not enough points to cluster with Dmin={Dmin}, ERmax={ERmax}, Dmax={Dmax} capping, total length only {LL+1}.')

        # Binary search for the cap distance that gets closest to unit length
-        def _iter(Dmaxi):
-            N1ai = np.ceil(np.log(Dmaxi / Dst) / np.log(ERmax)).astype(int)+1
-            L1ai = Dst * (1.-ERmax**(N1ai-1))/(1.-ERmax)
-            dx1max = Dst * ERmax ** (N1ai-2)
-            N2i = N - N1ai
-            L2i = N2i * 0.5 * (dx1max + Dmaxi)
-            return L1ai + L2i - 1.

        Dmax_high = Dmax
-        Dmax_low = Dst
+        Dmax_low = Dmin
        while Dmax_high-Dmax_low > 0.01*Dmax:
            Dmax_new = 0.5*(Dmax_high+Dmax_low)
-            err_new = _iter(Dmax_new)
+            err_new = _iter_Dmax(ERmax, Dmax_new)
            if err_new > 0:
                Dmax_high = Dmax_new
            else:
                Dmax_low = Dmax_new

-        def _iter(ERi):
-            N1ai = np.ceil(np.log(Dmax_high / Dst) / np.log(ERi)).astype(int)+1
-            L1ai = Dst * (1.-ERi**(N1ai-1))/(1.-ERi)
-            dx1max = Dst * ERi ** (N1ai-2)
-            N2i = N - N1ai
-            L2i = N2i * 0.5 * (dx1max + Dmax_high)
-            return L1ai + L2i - 1.

-        ER = root_scalar(_iter, x0=ERmax).root
+        # Fine-tune the expansion ratio for exactly unit length
+        soln = root_scalar(_iter_Dmax, bracket=(1.+1e-9, ERmax), args=(Dmax_high,))
+        assert soln.converged
+        ER = soln.root

-        N1a_cap = np.ceil(np.log(Dmax_high / Dst) / np.log(ER)).astype(int)+1
-        N2_cap = N - N1a_cap
-        dx1 = Dst * ER ** np.arange(0, N1a_cap-1)
-        dx2 = np.linspace(dx1[-1],Dmax_high, N2_cap)
+        # Evaluate the clustering
+        Nstretch = np.ceil(np.log(Dmax_high / Dmin) / np.log(ER)).astype(int)+1
+        Ncap = N - Nstretch
+        dx1 = Dmin * ER ** np.arange(0, Nstretch-1)
+        dx2 = np.linspace(dx1[-1],Dmax_high, Ncap)
        dx = np.concatenate((dx1, dx2))
-        x = cumsum0(dx)

+    x = cumsum0(dx)

    x /= x[-1]

    if check:
        if not len(x) == N:
            raise ClusteringException(f'Incorrect number of points len(x)={len(x)}, expected {N}.')
-        check_normalised(x, Dst, Dmax, ERmax)
+        check_normalised(x, Dmin, Dmax, ERmax)

    return x

-def normalised_N_free(Dst, Dmax, ERmax, mult=8, check=True):
+def unit_free_N(Dmin, Dmax, ERmax, mult=8, check=True):
    """Single-sided clustering on the unit interval with free number of points."""

    # Find high and low guesses for n, N = mult*n + 1
    nlow = np.round(1./Dmax/mult).astype(int)
-    nhigh = np.round(1./Dst/mult).astype(int)
+    nhigh = np.minimum(np.round(1./Dmin/mult).astype(int),512) # Limit to protect against overflow
    if nlow > nhigh:
        nlow = 1
    if nhigh == nlow:
        raise ClusteringException(f'Unable to find distinct guesses for numbers of points with mult={mult}, try decreasing mult')

+    Nhigh = nhigh * mult + 1
+    x = unit_fixed_N(Dmin, Dmax, ERmax, Nhigh, check=False)

    # Loop until converged
    while nhigh-nlow > 1:


@@ -169,15 +181,15 @@ def normalised_N_free(Dst, Dmax, ERmax, mult=8, check=True):

        # Attempt to cluster
        try:
-            x = normalised_N_fixed(Dst, Dmax, ERmax, Nnew, check=False)
-            # new n is valid, use as high limiit
+            x = unit_fixed_N(Dmin, Dmax, ERmax, Nnew, check=False)
+            # New n is valid, use as high limit
            nhigh = nnew
        except ClusteringException:
-            # new n is not valid, use as low limiit
+            # New n is not valid, use as low limit
            nlow = nnew

    if check:
-        check_normalised(x, Dst, Dmax, ERmax)
+        check_normalised(x, Dmin, Dmax, ERmax)

    return x



@@ -186,34 +198,3 @@ def normalised_N_free(Dst, Dmax, ERmax, mult=8, check=True):
def cumsum0(x, axis=None):
    return np.insert(np.cumsum(x, axis=axis), 0, 0.0, axis=axis)


-import matplotlib.pyplot as plt




-Dst = 0.001
-Dmax = 0.08
-ER = 1.1
-N = 57

-x1 = normalised_N_fixed(Dst, Dmax, ER, N)
-x2 = normalised_N_free(Dst, Dmax, ER)

-fig, ax = plt.subplots()
-ax.plot(x1,'-x')
-ax.plot(x2,'-x')
-ax.set_title('x')
-plt.show()

-# fig, ax = plt.subplots()
-# ax.set_title('dx')
-# ax.plot(np.diff(xx),'k-x')
-# plt.show()

-# fig, ax = plt.subplots()
-# ax.plot(dx,'k-x')
-# plt.show()
-# fig, ax = plt.subplots()
-# ax.plot(x,'k-x')
-# plt.show()
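The uncapped branch of the new unit_fixed_N solves for the expansion ratio ER that makes the geometric series of spacings Dmin, Dmin*ER, ..., Dmin*ER^(N-2) sum to exactly one: multiplying Dmin*(1 - ER**(N-1))/(1 - ER) = 1 through by (1 - ER) gives the residual y = ER - 1 + Dmin*(1 - ER**(N-1)) used in _iter_ER, with dy and d2y its first and second derivatives for the Halley-style root_scalar call. A minimal standalone sketch of that relation, using arbitrary example values for Dmin and N (not taken from this commit):

import numpy as np
from scipy.optimize import root_scalar

Dmin, N = 0.01, 40  # arbitrary example values, not from the commit

def resid(ER):
    # y == 0  <=>  Dmin*(1 - ER**(N-1))/(1 - ER) == 1, i.e. the N-1 spacings sum to unit length
    y = ER - 1.0 + Dmin * (1.0 - ER ** (N - 1))
    dy = 1.0 - Dmin * (N - 1) * ER ** (N - 2)
    d2y = -Dmin * (N - 1) * (N - 2) * ER ** (N - 3)
    return y, dy, d2y

# Start well above unity so the iteration finds the non-trivial root rather than ER = 1
ER = root_scalar(resid, x0=1.1, fprime2=True).root
dx = Dmin * ER ** np.arange(N - 1)  # N-1 spacings between N points
print(ER, dx.sum())  # dx.sum() should be 1.0 to solver tolerance
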

M test_clusterfunc.py => test_clusterfunc.py +19 -7
@@ -1,14 +1,26 @@
import clusterfunc
+import numpy as np

-def test_normalised_N_fixed():
+def test_cluster():

-    Dst = 0.001
-    Dmax = 0.5
-    ER = 1.2
-    for N in range(32,100):
+    for Dmin in (2e-1, 1e-1, 1e-2, 1e-3, 1e-4, 1e-5):
+        for ER in (1.02, 1.05, 1.1, 1.2):

-        clusterfunc.normalised_N_fixed(Dst, Dmax, ER, N)
+            # Number of points for uniform, limit to not be too huge
+            Nmax = np.minimum(np.floor(1./Dmin).astype(int), 512)

+            for Dmax in (0.05, 0.1, 0.2, 1.):

+                if Dmax <= Dmin:
+                    continue

+                # Minimum number of points with capping
+                Nmin = len(clusterfunc.unit_free_N(Dmin, Dmax, ER, mult=1))

+                # Test without capping
+                for N in range(Nmin,Nmax+1):
+                    clusterfunc.unit_fixed_N(Dmin, Dmax, ER, N)


if __name__=='__main__':
-    test_normalised_N_fixed()
+    test_cluster()
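
For reference, the demo script removed from the bottom of clusterfunc.py maps onto the renamed API roughly as follows. This is a sketch against this commit, reusing the removed demo's values (Dmin=0.001, Dmax=0.08, ER=1.1, N=57) and importing the module instead of running at import time; the assertions simply mirror what check_normalised already enforces.

import numpy as np
import clusterfunc

Dmin = 0.001  # first spacing
Dmax = 0.08   # cap on spacing
ER = 1.1      # maximum expansion ratio
N = 57        # number of points

# Fixed and free number of points on the unit interval
x1 = clusterfunc.unit_fixed_N(Dmin, Dmax, ER, N)
x2 = clusterfunc.unit_free_N(Dmin, Dmax, ER)

# Mirror the properties that check_normalised enforces
assert len(x1) == N
assert np.isclose(x1[-1], 1.) and np.isclose(np.diff(x1)[0], Dmin)
print(len(x1), len(x2))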