Optimizing Code
In this post, I will discuss how to optimize Python code for numeric math, using an asymmetric least squares baseline correction algorithm as an example. Before we start, it is important to note that not every piece of code needs to be perfectly optimized. The goal is not to be as fast as possible, but as fast as necessary.
In the previous post about baseline correction (asymmetric least squares) we recreated an algorithm from (Baek et al.) to automatically remove baselines from spectra. The most important part of that algorithm was solving the following equation for $\mathbf{z}$:
$$ ( \mathbf{W} + \lambda \mathbf{D}^T\mathbf{D}) \mathbf{z} = \mathbf{Wy}$$
In this equation, $\mathbf{W}$ is a diagonal matrix containing the weights for the baseline fit, and $\mathbf{D}$ is a difference matrix that computes the first derivative; it looks like this:
$$\mathbf{D} = \begin{bmatrix} 0&0&0&0&\dots&0&0&0\\ 1&-1&0&0&\dots&0&0&0\\ 0&1&-1&0&\dots&0&0&0\\ \vdots&&&&\ddots&&&\vdots\\ 0&0&0&0&\dots&1&-1&0\\ 0&0&0&0&\dots&0&1&-1\\ \end{bmatrix}$$The remaining variables in the equation are $\lambda$, a scalar, and $\mathbf{z}$ and $\mathbf{y}$, two vectors containing the baseline and the input spectrum, respectively.
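As a quick sanity check (a minimal sketch with a made-up five-element vector, not part of the original algorithm), we can build a small version of $\mathbf{D}$ and confirm that multiplying it with a vector yields its (negative) first differences, with a zero in the first row:

import numpy as np

# build a 5x5 version of D: +1 on the sub-diagonal, -1 on the shifted main diagonal
n = 5
D = np.zeros((n, n))
rows, cols = np.diag_indices(n - 1)
D[rows + 1, cols] = 1
D[rows + 1, cols + 1] = -1

y = np.array([1.0, 2.0, 4.0, 7.0, 11.0])
print(D @ y)        # [ 0. -1. -2. -3. -4.]  ->  negative first differences, first entry 0
print(-np.diff(y))  # [-1. -2. -3. -4.]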
The code for performing that algorithm looked as follows:
import numpy as np
from numpy.linalg import solve

def apply_fit(y, w, lam):
    """
    apply smooth least squares fit to signal y
    y...   signal, 1D vector
    w...   weights [0,1], length of y
    lam... smoothing factor, >>0
    """
    # create difference matrix
    D = np.zeros((len(y), len(y)))
    diag_row, diag_col = np.diag_indices(len(y)-1)
    diag_row = diag_row + 1
    D[diag_row, diag_col] = 1
    D[diag_row, diag_col+1] = -1
    # create weight matrix
    W = np.diag(w, 0)
    # create matrix A for least squares
    A = (W + lam * D.T @ D)
    z = solve(A, W @ y)
    return z

def logistic(d, m, sigma):
    return 1 / (1 + np.exp(2*(d-(2*sigma-m))/sigma))
def arPLS(y, lam, N, ratio):
    w_old = 0
    w = np.ones(len(y))
    for n in range(N):
        z = apply_fit(y, w, lam)
        diff = (y - z)
        diff_neg = diff[diff < 0]
        diff_pos = diff[diff >= 0]
        w_old = w.copy()
        w[y >= z] = logistic(diff_pos, np.mean(diff_neg), np.std(diff_neg))
        w[y < z] = 1
        if np.linalg.norm(w_old - w)/np.linalg.norm(w_old) < ratio:
            #print("stopped at n = {}".format(n))
            break
    return z, w_old
In order to optimize each part of the algorithm on its own, we will quickly rewrite it as a class:
class arPLS(object):
    def __init__(self):
        pass

    def apply_fit(self, y, w, lam):
        """
        apply smooth least squares fit to signal y
        y...   signal, 1D vector
        w...   weights [0,1], length of y
        lam... smoothing factor, >>0
        """
        # create difference matrix
        D = np.zeros((len(y), len(y)))
        diag_row, diag_col = np.diag_indices(len(y)-1)
        diag_row = diag_row + 1
        D[diag_row, diag_col] = 1
        D[diag_row, diag_col+1] = -1
        # create weight matrix
        W = np.diag(w, 0)
        # create matrix A for least squares
        A = (W + lam * D.T @ D)
        z = solve(A, W @ y)
        return z

    def logistic(self, d, m, sigma):
        return 1 / (1 + np.exp(2*(d-(2*sigma-m))/sigma))

    def __call__(self, y, lam, N, ratio):
        w_old = 0
        w = np.ones(len(y))
        for n in range(N):
            z = self.apply_fit(y, w, lam)
            diff = (y - z)
            diff_neg = diff[diff < 0]
            diff_pos = diff[diff >= 0]
            w_old = w.copy()
            w[y >= z] = self.logistic(diff_pos, np.mean(diff_neg), np.std(diff_neg))
            w[y < z] = 1
            if np.linalg.norm(w_old - w)/np.linalg.norm(w_old) < ratio:
                #print("stopped at n = {}".format(n))
                break
        return z, w_old
General guidelines for optimizing numpy code
But before we get to work on these functions, let's consider a few basic things about optimizing numpy code in Python.
Vectorize, vectorize, vectorize
I am using numpy to perform numeric math in Python. numpy is a library that provides data types for N-dimensional arrays (vectors, matrices, tensors, ...) together with operations on them that wrap fast, compiled code. The main thing to keep in mind when using numpy is that operations should be vectorized as much as possible in order to be fast. Here is an example:
def set_values_loop(a, value):
    for idx in range(len(a)):
        a[idx] = value

a = np.zeros(100000)
%timeit set_values_loop(a, 1)

def set_values_vectorized(a, value):
    a[:] = value

a = np.zeros(100000)
%timeit set_values_vectorized(a, 1)
The vectorized code performs orders of magnitude faster than the one running in a loop.
The takeaway: loops bad, vectorize good
Looking back at our baseline function, there are several lines where we already use vectorized code instead of loops. For example:
diag_row, diag_col = np.diag_indices(len(y)-1)
diag_row = diag_row + 1
D[diag_row, diag_col] = 1
D[diag_row, diag_col+1] = -1
could just as well have been written as a loop that looks like this:
for diag_row in range(1, len(y)):
    D[diag_row, diag_row-1] = 1
    D[diag_row, diag_row] = -1
Similarly, the lines
diff_neg = diff[diff < 0]
diff_pos = diff[diff >= 0]
might have been written as a combination of a for loop and an if statement. Instead, we use logical indexing here, which returns all elements of the numpy array for which the expression inside the square brackets evaluates to True. Logical indexing can also be used to assign values, as we do in the following lines:
w[y>=z] = logistic(diff_pos, np.mean(diff_neg), np.std(diff_neg))
w[y<z] = 1
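To make the mechanics of logical indexing concrete, here is a small standalone example (the array and its values are made up purely for illustration):

a = np.array([3.0, -1.0, 4.0, -1.0, 5.0])
mask = a < 0          # boolean array: [False, True, False, True, False]
print(a[mask])        # reading: selects the negative entries -> [-1. -1.]
a[mask] = 0           # writing: assigns to exactly those entries
print(a)              # [3. 0. 4. 0. 5.]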
Optimize algorithms
Matrix operations, and even just creating matrices, can take a long time. Hence, it is a good idea to only perform them when absolutely necessary.
def stupid_allocate(N=1024):
    for idx in range(10):
        A = np.sum(np.diag(np.ones(N)) @ (idx*np.arange(N)))
    return A

print("stupid_allocate")
%timeit stupid_allocate(10000)

def better_allocate(N=1024):
    ones = np.diag(np.ones(N))
    for idx in range(10):
        A = np.sum(ones @ (idx*np.arange(N)))
    return A

print("better_allocate")
%timeit better_allocate(10000)
Of course, taking a step back to figure out whether there is a more efficient way to do things altogether is also a good idea. Since $A$ can also be calculated directly as idx * N*(N-1)/2, we can save a lot of time:
def clever_allocate(N=1024):
    for idx in range(10):
        A = idx * N*(N-1)/2
    return A

print("clever_allocate")
%timeit clever_allocate(10000)
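To spell out why this works: multiplying by the identity matrix leaves the vector unchanged, so the sum collapses to the closed form for the sum of the first $N$ integers (writing $\mathbf{v} = (0, 1, \dots, N-1)^T$ for np.arange(N)):
$$ A = \sum_{i=0}^{N-1} \left[\mathbf{I}\,(\mathrm{idx}\cdot\mathbf{v})\right]_i = \mathrm{idx} \sum_{i=0}^{N-1} i = \mathrm{idx}\,\frac{N(N-1)}{2} $$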
In the baseline algorithm above, we re-create the matrix D every time we call apply_fit. However, the matrix only depends on the size of y, so we actually only need to create it once. Since we rewrote the algorithm as a class, we only have to change the parts that create or use D, or rather D.T @ D, since D itself is never used directly.
class arPLS_D(arPLS):
    def __init__(self, len_y):
        super().__init__()
        self.DTD = self.create_DTD(len_y)

    def create_DTD(self, len_y):
        D = np.zeros((len_y, len_y))
        diag_row, diag_col = np.diag_indices(len_y-1)
        diag_row = diag_row + 1
        D[diag_row, diag_col] = 1
        D[diag_row, diag_col+1] = -1
        return D.T @ D

    def apply_fit(self, y, w, lam):
        DTD = self.DTD
        # create weight matrix
        W = np.diag(w, 0)
        # create matrix A for least squares
        A = (W + lam * DTD)
        z = solve(A, W @ y)
        return z
Let's compare the improvements we have gained so far by reusing the example from last time:
from numpy.random import randn
import matplotlib.pyplot as plt
%matplotlib notebook
wn = np.linspace(0,1000, 1000)
baseline = np.cos(2*np.pi*wn/500)
feature = np.exp(-(wn-500)**2/20)
noise = 0.1*randn(len(wn))
signal = baseline + feature + noise
%timeit arPLS()(signal, 100, 100, .00001)
z, w_old = arPLS()(signal, 100, 100, .00001)
plt.figure()
plt.plot(wn, signal, label="signal")
plt.plot(wn, z, label="arPLS baseline")
plt.legend()
%timeit arPLS_D(len(signal))(signal, 100, 100, .00001)
z, w_old = arPLS_D(len(signal))(signal, 100, 100, .00001)
plt.figure()
plt.plot(wn, signal, label="signal")
plt.plot(wn, z, label="arPLS baseline, calculate D once")
plt.legend()
If we take another look at D, there is one more improvement we can make: D is mostly filled with zeros, with the non-zero values concentrated around the diagonal. The technical term for this property is sparse. If we tell the solver which elements of the matrix are non-zero, it can skip all the needless work on the zeros. This is what the scipy.sparse module is for. It contains a range of functions to create different kinds of sparse matrices, as well as functions that perform linear algebra on them.
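As a small sanity check (a minimal sketch with a made-up size of 5, anticipating the sp_diags import used below), we can build the difference matrix in sparse form and confirm that it matches the dense construction from apply_fit:

import numpy as np
from scipy.sparse import diags as sp_diags

n = 5
# sparse construction: sub-diagonal of ones, main diagonal 0, -1, ..., -1
D_sparse = sp_diags([np.ones(n - 1), np.hstack([0, -np.ones(n - 1)])], offsets=[-1, 0])

# dense construction, as in apply_fit above
D_dense = np.zeros((n, n))
rows, cols = np.diag_indices(n - 1)
D_dense[rows + 1, cols] = 1
D_dense[rows + 1, cols + 1] = -1

print(np.allclose(D_sparse.toarray(), D_dense))  # True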
from scipy.sparse import diags as sp_diags
from scipy.sparse.linalg import spsolve

class arPLS_D_sparse(arPLS_D):
    def __init__(self, len_y):
        super().__init__(len_y)
        self.DTD = self.create_DTD(len_y)

    def create_DTD(self, len_y):
        diag_0 = np.hstack([0, -1*np.ones(len_y-1)])
        diag_m1 = np.hstack([np.ones(len_y-1)])
        D = sp_diags([diag_m1, diag_0], offsets=[-1, 0])
        return D.T @ D

    def apply_fit(self, y, w, lam):
        DTD = self.DTD
        # create weight matrix
        W = sp_diags(w, 0)
        # create matrix A for least squares
        A = (W + lam * DTD)
        z = spsolve(A, W @ y)
        return z
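Before timing the sparse version, it is worth checking that it still produces the same baseline as the dense implementation. A quick sanity check, reusing the signal and the classes defined above:

# sparse and dense implementations should agree up to solver round-off
z_dense, _ = arPLS_D(len(signal))(signal, 100, 100, .00001)
z_sparse, _ = arPLS_D_sparse(len(signal))(signal, 100, 100, .00001)
print(np.max(np.abs(z_dense - z_sparse)))  # should be tiny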
%timeit arPLS_D_sparse(len(signal))(signal, 100, 100, .00001)
z, w_old = arPLS_D_sparse(len(signal))(signal, 100, 100, .00001)
plt.figure()
plt.plot(wn, signal, label="signal")
plt.plot(wn, z, label="arPLS baseline, calculate D once, sparse")
plt.legend()
Now, this last step significantly improves the computation time. However, using sparse matrices is not a cure-all: if a matrix is not actually sparse, declaring it as sparse will actually slow the computation down:
from timeit import timeit
from tqdm import tqdm

N_total = 2000
iterations = 10
result_sparse = []
result_full = []
# fill an increasing number of diagonals with random values and time D.T @ D
# for both the sparse and the dense (full) representation
N_diags_list = list(range(0, N_total//4, 50))
for N_diags in tqdm(N_diags_list):
    all_diags = [np.random.randn(N_total - abs(idx)) for idx in range(-N_diags, N_diags+1)]
    D = sp_diags(all_diags, list(range(-N_diags, N_diags+1)))
    time_global = {"D": D}
    result_sparse.append(timeit('D.T@D', globals=time_global, number=iterations)/iterations)
    time_global = {"D": D.toarray()}
    result_full.append(timeit('D.T@D', globals=time_global, number=iterations)/iterations)

plt.figure()
filled_diagonals = 2*np.array(N_diags_list) + 1
plt.plot(filled_diagonals, result_full, label="matrix multiplication (full)")
plt.plot(filled_diagonals, result_sparse, label="matrix multiplication (sparse)")
plt.legend()
plt.xlabel("filled diagonals / 1")
plt.ylabel("time per multiplication / s");
And that brings us to the most important guideline for optimization:
Test, test, test
Don't just hope that what you are doing improves the speed of your code; verify it empirically.
I have been using the %timeit Jupyter magic for most of this post, which is a great way to quickly measure runtimes. There is also the Python timeit module, which gives a bit more control over the execution.
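For example, a rough equivalent of the %timeit calls above, written with the standard-library module, might look like this (the repeat and number values are arbitrary choices):

from timeit import repeat

# best of 5 repeats of 1000 runs each, for the vectorized assignment from above
times = repeat("a[:] = 1", setup="import numpy as np; a = np.zeros(100000)",
               repeat=5, number=1000)
print(min(times) / 1000, "seconds per call")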
Bibliography
Baek, S.-J. et al. (2015). Baseline correction using asymmetrically reweighted penalized least squares smoothing. The Analyst, 140(1), pp. 250–257. Available at: http://xlink.rsc.org/?DOI=C4AN01061B.