← back

Implement Isolation Forest for Anomaly Detection

#367 · Machine Learning · Medium

⊣ Solve on deep-ml.com

Problem

Implement the Isolation Forest algorithm for anomaly detection. Build an ensemble of random isolation trees, then score each data point based on how quickly it gets isolated (shorter paths indicate anomalies).

Solution

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import numpy as np

class IsolationTree:
    def __init__(self, max_depth):
        self.max_depth = max_depth
        self.split_feature = None
        self.split_value = None
        self.left = None
        self.right = None
        self.size = 0

    def fit(self, X, depth=0):
        self.size = len(X)
        if depth >= self.max_depth or len(X) <= 1:
            return
        feature = np.random.randint(0, X.shape[1])
        min_val, max_val = X[:, feature].min(), X[:, feature].max()
        if min_val == max_val:
            return
        split = np.random.uniform(min_val, max_val)
        self.split_feature = feature
        self.split_value = split
        left_mask = X[:, feature] < split
        self.left = IsolationTree(self.max_depth)
        self.right = IsolationTree(self.max_depth)
        self.left.fit(X[left_mask], depth + 1)
        self.right.fit(X[~left_mask], depth + 1)

    def path_length(self, x):
        if self.split_feature is None:
            return _c(self.size)
        if x[self.split_feature] < self.split_value:
            return 1 + self.left.path_length(x)
        return 1 + self.right.path_length(x)

def _c(n):
    if n <= 1:
        return 0
    return 2 * (np.log(n - 1) + 0.5772156649) - 2 * (n - 1) / n

def isolation_forest(X: np.ndarray, n_trees: int = 100, sample_size: int = 256) -> np.ndarray:
    n = X.shape[0]
    max_depth = int(np.ceil(np.log2(sample_size)))
    trees = []
    for _ in range(n_trees):
        idx = np.random.choice(n, min(sample_size, n), replace=False)
        tree = IsolationTree(max_depth)
        tree.fit(X[idx])
        trees.append(tree)

    scores = np.zeros(n)
    for i in range(n):
        avg_path = np.mean([t.path_length(X[i]) for t in trees])
        scores[i] = 2 ** (-avg_path / _c(sample_size))
    return scores

Explanation

  1. Build an ensemble of isolation trees, each trained on a random subsample.
  2. Each tree recursively partitions data by picking a random feature and random split value. Anomalies are isolated in fewer splits.
  3. For scoring, compute the average path length across all trees. The anomaly score is 2^(-E[h(x)] / c(n)) where c(n) is the average path length in a BST.
  4. Scores near 1 indicate anomalies; scores near 0.5 indicate normal points.

Complexity

  • Time: O(T s log(s)) for building trees + O(T n log(s)) for scoring
  • Space: O(T * s) for tree storage