Test harness:
To build a modular test rig around this, it seemed easiest to wrap stuff up in a class. I'm pretty sure I left everything else in the original implementation alone. I'm just using `time` to measure how long the function takes with various input values. I'm too impatient to run it with your original parameters, so I'm mostly using smaller ones. I have basically no idea what your code actually does, so it's possible I'm doing something wrong in how I test it.
Cleanup:
This is fairly opinionated, and only helps performance by ~10%; each individual change is about as likely to hurt performance as to help.
- I greatly prefer to use Python built-ins over numpy/scipy etc. for data-structure manipulation (see the sketch after this list).
- Why is `stable_cumsum` a function? It looks like you had an assertion in there or something that got removed...
- Your variable names are pretty opaque. Without domain knowledge, I can't do much to fix this. You're also reassigning variables more than necessary, which is worth avoiding.
- I golfed the math a little and I think it reads better.
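As one concrete example of the built-ins preference (the `score` function and `n` here are made-up stand-ins, not from your code): the original `infer_dimension` fills a numpy array and calls `argmax`, while the cleaned version expresses the same search directly with `max` over a `range`:
import numpy as np

def score(rank):  # stand-in for assess_dimension
    return -(rank - 3) ** 2

n = 10

# numpy style: materialize every score, then argmax
ll = np.empty(n)
ll[0] = -np.inf  # rank 0 is never a valid answer
for rank in range(1, n):
    ll[rank] = score(rank)

# built-ins: the same search, with no intermediate array
best = max(range(1, n), key=score, default=0)

assert best == ll.argmax() == 3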
Math:
The algorithm seems to be O(n^3). `calculate_pa` is (presumably) the bottleneck. What can we do to simplify it?
- We can pull the `+ log1p(self.n_samples)` term outside the sum and just use multiplication (see the sketch after this list).
- A sum of logs is a log of the product, and `log1p` is just "ln(x + 1)", so we can move the log calculation outside the aggregation.
- But then I think we're stuck. The calculation doesn't actually work in this form because of a math-overflow error, and I haven't figured out any fundamental simplification that would touch the complexity class.
- Your domain knowledge may be able to push this forward. In particular, I noticed that I could get it to work for very small tests by just omitting the `+1` from the product term. It looks like the exact math in that area doesn't really matter too much; maybe there's a lighter-weight calculation you could be maximizing?
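To make those two rewrites concrete, here's a minimal sketch on made-up numbers (the `terms` values are arbitrary stand-ins for the per-pair quantities in `calculate_pa`). It checks that the constant factors out as a simple count, that the sum of `log1p` terms equals the log of a product of `(1 + term)` factors, and hints at why the product form blows up:
from math import log1p, log, prod, isclose

terms = [0.5, 2.0, 10.0]  # stand-ins for the per-pair values
n_samples = 100

# original shape: the constant is added inside the sum
inside = sum(log1p(t) + log1p(n_samples) for t in terms)

# rewrite 1: pull the constant out and multiply by the number of terms
outside = sum(log1p(t) for t in terms) + log1p(n_samples) * len(terms)
assert isclose(inside, outside)

# rewrite 2: sum of logs == log of product (the +1 must stay inside each factor)
as_product = log(prod(1 + t for t in terms)) + log1p(n_samples) * len(terms)
assert isclose(inside, as_product)

# with thousands of factors, that product overflows a float long before
# the equivalent sum of logs would -- hence the RuntimeWarning below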
Code:
import numpy as np
from math import log, log1p
from operator import mul
from functools import reduce
from time import time
from scipy import linalg
from scipy.special import gammaln
from sklearn.datasets import make_multilabel_classification
from tqdm import tqdm
class Original:
def __init__(self, verbose):
self.verbose = verbose
def log(self, *message):
if self.verbose:
print(*message)
def infer_dimension(self, spectrum, n_samples):
ll = np.empty_like(spectrum)
ll[0] = -np.inf # we don't want to return n_components = 0
for rank in (tqdm(range(1, spectrum.shape[0]))
if self.verbose else range(1, spectrum.shape[0])):
ll[rank] = self.assess_dimension(spectrum, rank, n_samples)
return ll.argmax()
def stable_cumsum(self, arr, axis=None, rtol=1e-05, atol=1e-08):
out = np.cumsum(arr, axis=axis, dtype=np.float64)
expected = np.sum(arr, axis=axis, dtype=np.float64)
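        # NOTE: 'expected' is computed but never used; presumably a removed sanity check (see Cleanup notes above).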
return out
def assess_dimension(self, spectrum, rank, n_samples):
n_features = spectrum.shape[0]
if not 1 <= rank < n_features:
raise ValueError("the tested rank should be in [1, n_features - 1]")
eps = 1e-15
if spectrum[rank - 1] < eps:
return -np.inf
pu = -rank * log1p(2.)
for i in range(1, rank + 1):
pu += (gammaln((n_features - i + 1) / 2.) - log1p(np.pi) * (n_features - i + 1) / 2.)
pl = np.sum(np.log(spectrum[:rank]))
pl = -pl * n_samples / 2.
v = max(eps, np.sum(spectrum[rank:]) / (n_features - rank))
pv = -np.log1p(v) * n_samples * (n_features - rank) / 2.
m = n_features * rank - rank * (rank + 1.) / 2.
pp = log1p(2. * np.pi) * (m + rank) / 2.
pa = 0.
spectrum_ = spectrum.copy()
spectrum_[rank:n_features] = v
for i in range(rank):
for j in range(i + 1, len(spectrum)):
pa += log1p((spectrum[i] - spectrum[j]) *
(1. / spectrum_[j] - 1. / spectrum_[i])) + log1p(n_samples)
ll = pu + pl + pv + pp - pa / 2. - rank * log1p(n_samples) / 2.
return ll
def find_components(self, data):
try:
X = data.to_numpy()
except:
X = data
self.log('Data is already in numpy format, or cannot be converted')
n_samples = X.shape[0]
av = np.mean(data, axis=0)
X -= av
self.log("Calculating S...")
U, S, Vt = linalg.svd(X, full_matrices=False)
self.log("Calculating explained variance...")
explained_variance_ = (S ** 2) / (n_samples - 1)
self.log("Calculating total variance variance...")
total_var = explained_variance_.sum()
self.log("Calculating explained variance ratio...")
explained_variance_ratio_ = explained_variance_ / total_var
self.log("Inferring dimensions...")
n_components = self.infer_dimension(explained_variance_, n_samples)
if 0 < n_components < 1.0:
self.log("n_components not complete | Calculating ratio cumsum...")
ratio_cumsum = self.stable_cumsum(explained_variance_ratio_)
n_components = np.searchsorted(ratio_cumsum, n_components,
side='right') + 1
return n_components
class Cleaned:
def __init__(self, verbose):
self.verbose = verbose
def log(self, *message):
if self.verbose:
print(*message)
def infer_dimension(self):
return max(
range(1, self.spectrum.shape[0]),
key=self.assess_dimension,
default=0
)
def stable_cumsum(self, arr, axis=None):
# Why is this broken out at all?
return np.cumsum(arr, axis=axis, dtype=np.float64)
def assess_dimension(self, rank):
assert 1 <= rank < self.n_features, "the tested rank should be in [1, n_features - 1]"
epsilon = 1e-15
if self.spectrum[rank - 1] < epsilon:
return -np.inf
else:
pu = (-rank * log1p(2.)) + sum(
gammaln((self.n_features - i + 1) / 2.) - log1p(np.pi) * (self.n_features - i + 1) / 2.
for i in range(1, rank + 1)
)
pl = -np.sum(np.log(self.spectrum[:rank])) * self.n_samples / 2.
v = max(epsilon, np.sum(self.spectrum[rank:]) / (self.n_features - rank))
pv = -np.log1p(v) * self.n_samples * (self.n_features - rank) / 2.
m = self.n_features * rank - rank * (rank + 1.) / 2.
pp = log1p(2. * np.pi) * (m + rank) / 2.
pa = self.calculate_pa(rank, v)
return pu + pl + pv + pp - pa / 2. - rank * log1p(self.n_samples) / 2.
def calculate_pa(self, rank, v):
_spectrum = self.spectrum.copy()
_spectrum[rank:self.n_features] = v
i_spectrum = 1. / _spectrum
return sum(
log1p(
(self.spectrum[i] - self.spectrum[j]) * (i_spectrum[j] - i_spectrum[i])
) + log1p(self.n_samples)
for i in range(rank)
for j in range(i + 1, self.n_features)
)
def find_components(self, X):
self.n_samples = X.shape[0]
centered = X - np.mean(X, axis=0)
U, S, Vt = linalg.svd(centered, full_matrices=False)
self.spectrum = (S ** 2) / (self.n_samples - 1)
self.n_features = self.spectrum.shape[0]
n_components = self.infer_dimension()
if 0 < n_components < 1.0:
explained_variance_ratio = self.spectrum / self.spectrum.sum()
ratio_cumsum = self.stable_cumsum(explained_variance_ratio)
return np.searchsorted(ratio_cumsum, n_components, side='right') + 1
else:
return n_components
class Optimized(Cleaned):
def calculate_pa(self, rank, v):
_spectrum = self.spectrum.copy()
_spectrum[rank:self.n_features] = v
i_spectrum = 1. / _spectrum
        # count of (i, j) pairs: sum over i < rank of (n_features - 1 - i) = rank * (2 * n_features - rank - 1) / 2
        # https://www.wolframalpha.com/input/?i=sum%28sum%281+for+j+from+i+%2B+1+to+s+-+1%29+for+i+from+0+to+r+-+1%29
        number_of_terms = -0.5 * rank * (rank - (2 * len(self.spectrum)) + 1)
constant_part = log1p(self.n_samples) * number_of_terms
        # Active version: the log-of-product form with the "+1" omitted from each factor (see the
        # Math notes above); it overflows on all but tiny inputs. The commented-out block below
        # keeps the +1 and matches the original's sum of logs exactly.
return log(reduce(mul, (
((self.spectrum[i] - self.spectrum[j]) * (i_spectrum[j] - i_spectrum[i]))
for i in range(rank)
for j in range(i + 1, self.n_features)
))) + constant_part
#return sum(
# log(
# 1 + ((self.spectrum[i] - self.spectrum[j]) * (i_spectrum[j] - i_spectrum[i]))
# )
# for i in range(rank)
# for j in range(i + 1, self.n_features)
#) + constant_part
def make_test(samples, features, classes):
X, _ = make_multilabel_classification(n_samples=samples, n_features=features, n_classes=classes)
return (samples, features, classes, X, None)
def time_function(f, *args, post_process=lambda x: x):
start = time()
retval = f(*args)
end = time()
return post_process(retval), end - start
def print_row(*args):
strings = [f'{a:^12}' if isinstance(a, str) else (f'{a:>12}' if isinstance(a, int) else f'{a:>12.6}')
for a in args]
print('|'.join(strings))
def main():
candidates = {
"Original": Original(False).find_components,
"Cleaned": Cleaned(False).find_components,
"Optimized": Optimized(False).find_components,
}
tests = [
make_test(100, 50, 2),
make_test(1000, 500, 2),
]
print_row('samples', 'features', 'classes', *candidates.keys(), 'answer')
for samples, features, classes, data, known in tests:
results = [time_function(f, data.copy(), post_process=int)
for f in candidates.values()]
answers = [
answer
for answer in [known, *(r[0] for r in results)]
if answer is not None
]
assert 1 == len(set(answers)), f'1 != len({"; ".join(map(str, answers))})'
print_row(samples, features, classes, *(r[1] for r in results), results[0][0])
if __name__ == '__main__':
main()
Prints:
  samples   |  features  |  classes   |  Original  |  Cleaned   | Optimized  |   answer   
         100|          50|           2|    0.166063|    0.153894|    0.151417|          49
SO_254392.py:185: RuntimeWarning: overflow encountered in double_scalars
return log(reduce(mul, (
Traceback (most recent call last):
File "SO_254392.py", line 236, in <module>
main()
File "SO_254392.py", line 232, in main
assert 1 == len(set(answers)), f'1 != len({"; ".join(map(str, answers))})'
AssertionError: 1 != len(499; 499; 1)
or, if we cut out the failed optimization path:
  samples   |  features  |  classes   |  Original  |  Cleaned   |   answer   
         100|          50|           2|    0.161394|    0.154552|          49
        1000|         500|           2|     44.1136|     35.9878|         499