Source code for dynex_scikit_plugin.utilities

# The following traversal code is adapted from NumPy's implementation.

# Copyright (c) 2005-2022, NumPy Developers.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
#        notice, this list of conditions and the following disclaimer.
#
#     * Redistributions in binary form must reproduce the above
#        copyright notice, this list of conditions and the following
#        disclaimer in the documentation and/or other materials provided
#        with the distribution.
#
#     * Neither the name of the NumPy Developers nor the names of any
#        contributors may be used to endorse or promote products derived
#        from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

# Modifications are licensed under the Apache 2.0 Software license.

# Copyright 2023 D-Wave Systems Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import typing

import numpy as np
import numpy.typing as npt

__all__ = ["corrcoef", "cov", "dot_2d"]


[docs]def corrcoef(x: npt.ArrayLike, *, out: typing.Optional[np.ndarray] = None, rowvar: bool = True, copy: bool = True, ) -> np.ndarray: """A drop-in replacement for :func:`numpy.corrcoef`. This method is modified to avoid unnecessary memory usage when working with :class:`numpy.memmap` arrays. It does not support the full range of arguments accepted by :func:`numpy.corrcoef`. Additionally, in the case that a row of ``x`` is fixed, this method will return a correlation value of 0 rather than :class:`numpy.nan`. Args: x: See :func:`numpy.corrcoef`. out: Output argument. This must be the exact kind that would be returned if it was not used. rowvar: See :func:`numpy.corrcoef`. copy: If ``True``, ``x`` is not modified by this function. Returns: See :func:`numpy.corrcoef`. """ c = cov(x, out=out, rowvar=rowvar, copy=copy) try: d = np.diag(c) except ValueError: # scalar covariance # nan if incorrect value (nan, inf, 0), 1 otherwise return c / c stddev = np.sqrt(d.real) # the places that stddev == 0 are exactly the places that the columns # are fixed. We can safely ignore those when dividing np.divide(c, stddev[:, None], out=c, where=stddev[:, None] != 0) np.divide(c, stddev[None, :], out=c, where=stddev[None, :] != 0) # Clip real and imaginary parts to [-1, 1]. This does not guarantee # abs(a[i,j]) <= 1 for complex arrays, but is the best we can do without # excessive work. np.clip(c.real, -1, 1, out=c.real) if np.iscomplexobj(c): np.clip(c.imag, -1, 1, out=c.imag) return c
[docs]def cov(m: npt.ArrayLike, *, out: typing.Optional[np.ndarray] = None, rowvar: bool = True, copy: bool = True, ) -> np.ndarray: """A drop-in replacement for :func:`numpy.cov`. This method is modified to avoid unnecessary memory usage when working with :class:`numpy.memmap` arrays. It does not support the full range of arguments accepted by :func:`numpy.cov`. Args: m: See :func:`numpy.cov`. out: Output argument. This must be the exact kind that would be returned if it was not used. rowvar: See :func:`numpy.cov`. copy: If ``True``, ``x`` is not modified by this function. Returns: See :func:`numpy.cov`. """ # we want to modify X, so if copy=True we make a copy and re-call if copy: if hasattr(m, "flush"): # we could do a lot of fiddling here, but it's easier to just # disallow this case and rely on the user making a modifiable # X raise ValueError("memmap arrays cannot be copied easily") return cov(np.array(m), rowvar=rowvar, copy=False, out=out) # handle array-like if isinstance(m, np.memmap): X = m else: X = np.atleast_2d(np.asarray(m, dtype=np.result_type(m, np.float64))) if X.ndim != 2: raise ValueError("X must have 2 dimensions") if not rowvar and X.shape[0] != 1: X = X.T # Get the product of frequencies and weights avg = np.average(X, axis=1) # Determine the normalization fact = max(X.shape[1] - 1, 0) X -= avg[:, None] if hasattr(m, "flush"): X.flush() X_T = X.T out = dot_2d(X, X_T.conj(), out=out) out *= np.true_divide(1, fact) if hasattr(out, "flush"): out.flush() return out
[docs]def dot_2d(a: npt.ArrayLike, b: npt.ArrayLike, *, out: typing.Optional[np.ndarray] = None, chunksize: int = int(1e+9), ) -> np.ndarray: """A drop-in replacment for :func:`numpy.dot` for 2d arrays. This method is modified to avoid unnecessary memory usage when working with :class:`numpy.memmap` arrays. Args: a: See :func:`numpy.dot`. ``a.ndim`` must be 2. b: See :func:`numpy.dot`. ``b.ndim`` must be 2. out: See :func:`numpy.dot`. chunksize: The number of bytes that should be created by each step of the multiplication. This is used to keep the total memory usage low when multiplying :class:`numpy.memmap` arrays. Returns: See :func:`numpy.dot`. """ if not isinstance(a, np.memmap): a = np.asarray(a) if not isinstance(b, np.memmap): b = np.asarray(b) if a.ndim != 2: raise ValueError("a must be a 2d array") if b.ndim != 2: raise ValueError("b must be a 2d array") if out is None: out = np.empty((a.shape[0], b.shape[1]), dtype=np.result_type(a, b)) elif out.shape[0] != a.shape[0] or out.shape[1] != b.shape[1]: raise ValueError(f"out must be a ({a.shape[0]}, {b.shape[1]}) array") is_memmap = hasattr(out, "flush") num_rows = max(chunksize // (out.dtype.itemsize * out.shape[1]), 1) for start in range(0, out.shape[0], num_rows): np.dot(a[start:start+num_rows, :], b, out=out[start:start+num_rows, :]) if is_memmap: out.flush() return out