実装しながら学ぶランダムフォレスト

実践

本記事では、決定木を基礎とした ランダムフォレスト (Random Forest) の仕組みと、その実装方法について詳しく解説します。
決定木の基本概念から、ランダムフォレストがどのように複数の決定木を組み合わせることで高い汎化性能を実現しているのか説明していきます。また、実装コード(カスタムの決定木およびランダムフォレストの実装)も合わせて提示するので、実際に動作を確認しながら学習を進めることができます。


ランダムフォレストとは?

ランダムフォレストは、多数の決定木を組み合わせたアンサンブル学習手法です。
各決定木は、元のデータセットからブートストラップサンプル(重複を許すランダムサンプリング)を用いて学習され、さらに各ノードでランダムな特徴選択を行います。
この仕組みにより、各木の相関が低減され、最終的な多数決により高い汎化性能が実現されます。

コード例: ブートストラップサンプルの生成

def _bootstrap_sample(self, X, y):
n_samples = X.shape[0]
indices = np.random.choice(n_samples, size=n_samples, replace=True)
return X[indices], y[indices]

数式で理解するランダムフォレスト

ブートストラップサンプル

各決定木は、元のデータセット D からサイズ N のブートストラップサンプル Di を生成して学習します。
このプロセスにより、各木は異なるデータ分布に基づいて学習され、過学習のリスクが軽減されます。

ランダムな特徴選択

各決定木のノードでは、全特徴量からランダムに一部(例: ‘sqrt’ や ‘log2’ に基づく個数)の特徴量を選び、その中から最適な分割基準を見つけ出します。

コード例: ランダムな特徴選択の決定

def _get_max_features(self, n_features):
if isinstance(self.max_features, int):
return self.max_features
elif self.max_features == 'sqrt':
return int(np.sqrt(n_features))
elif self.max_features == 'log2':
return int(np.log2(n_features))
else:
return n_features # 全特徴量を利用

多数決

最終的な予測は、各決定木の予測結果を多数決によって統合することで行われます。\ もし各木の予測が y^(1),y^(2),,y^(K) ならば、最終予測は y^=mode(y^(1),y^(2),,y^(K)) と決定されます。

コード例: 多数決による予測

def predict(self, X):
tree_preds = np.array([tree.predict(X) for tree in self.trees])
tree_preds = np.swapaxes(tree_preds, 0, 1)
y_pred = [Counter(tree_pred).most_common(1)[0][0] for tree_pred in tree_preds]
return np.array(y_pred)

カスタム決定木の実装

決定木の実装では、各ノードの構造(Node クラス)と、再帰的に木を成長させるための DecisionTreeClassifier クラスを定義しています。
情報利得(Gini不純度の減少)を基に最適な分割を見つけ、葉ノードでは最頻値(多数決)を返す仕組みとなっています。

コード例: Node クラスの定義

class Node:
def __init__(self, feature_index=None, threshold=None, left=None, right=None, *, value=None):
self.feature_index = feature_index
self.threshold = threshold
self.left = left
self.right = right
self.value = value

def is_leaf_node(self):
return self.value is not None

コード例: 決定木の成長と最適分割

def _grow_tree(self, X, y, depth=0):
n_samples, n_features = X.shape
n_labels = len(np.unique(y))

if (depth >= self.max_depth or n_labels == 1 or n_samples < self.min_samples_split):
leaf_value = self._most_common_label(y)
return Node(value=leaf_value)

feature_idxs = np.random.choice(n_features, self.n_features, replace=False)
best_feat, best_thresh = self._best_split(X, y, feature_idxs)
if best_feat is None:
leaf_value = self._most_common_label(y)
return Node(value=leaf_value)

left_idxs = X[:, best_feat] <= best_thresh
right_idxs = X[:, best_feat] > best_thresh

left = self._grow_tree(X[left_idxs], y[left_idxs], depth + 1)
right = self._grow_tree(X[right_idxs], y[right_idxs], depth + 1)

return Node(feature_index=best_feat, threshold=best_thresh, left=left, right=right)

カスタムランダムフォレストの実装

ランダムフォレストでは、上記の決定木を多数生成し、それぞれにブートストラップサンプルを適用して学習させます。
また、各木でランダムな特徴選択を行い、多数決により最終的な予測を行うことで、個々の木の誤差を補完しています。

コード例: ランダムフォレストの fit メソッド

def fit(self, X, y):
self.trees = []
n_features = X.shape[1]
max_features = self._get_max_features(n_features)

for _ in range(self.n_estimators):
X_sample, y_sample = self._bootstrap_sample(X, y)
tree = DecisionTreeClassifier(max_depth=self.max_depth,
min_samples_split=self.min_samples_split,
n_features=max_features)
tree.fit(X_sample, y_sample)
self.trees.append(tree)

5. 完成版コード

以下に、カスタムの決定木とランダムフォレストの実装コードをまとめています。

from collections import Counter
import numpy as np

# ---------------------------
# Custom Decision Tree Classifier
# ---------------------------
class Node:
def __init__(self, feature_index=None, threshold=None, left=None, right=None, *, value=None):
# Index of the feature used for the split
self.feature_index = feature_index
# Threshold value for the split
self.threshold = threshold
# Left and right child nodes
self.left = left
self.right = right
# Predicted class if the node is a leaf
self.value = value

def is_leaf_node(self):
return self.value is not None

class DecisionTreeClassifier:
def __init__(self, min_samples_split=2, max_depth=100, n_features=None):
# Minimum number of samples required to split a node
self.min_samples_split = min_samples_split
# Maximum depth of the tree
self.max_depth = max_depth
# Number of features to consider for each split; if None, use all features
self.n_features = n_features
self.root = None

def fit(self, X, y):
self.n_classes_ = len(set(y))
self.n_features_ = X.shape[1]
if self.n_features is None:
self.n_features = self.n_features_
self.root = self._grow_tree(X, y)

def _grow_tree(self, X, y, depth=0):
n_samples, n_features = X.shape
n_labels = len(np.unique(y))

# Stopping conditions: max depth reached, pure node, or insufficient samples
if (depth >= self.max_depth or n_labels == 1 or n_samples < self.min_samples_split):
leaf_value = self._most_common_label(y)
return Node(value=leaf_value)

# Randomly select features for splitting
feature_idxs = np.random.choice(n_features, self.n_features, replace=False)

# Find the best split among the selected features
best_feat, best_thresh = self._best_split(X, y, feature_idxs)
if best_feat is None:
leaf_value = self._most_common_label(y)
return Node(value=leaf_value)

# Determine indices for left and right splits
left_idxs = X[:, best_feat] <= best_thresh
right_idxs = X[:, best_feat] > best_thresh

# Check for empty splits and create leaf nodes if necessary
if np.sum(left_idxs) > 0:
left = self._grow_tree(X[left_idxs], y[left_idxs], depth + 1)
else:
left = Node(value=self._most_common_label(y))

if np.sum(right_idxs) > 0:
right = self._grow_tree(X[right_idxs], y[right_idxs], depth + 1)
else:
right = Node(value=self._most_common_label(y))

return Node(feature_index=best_feat, threshold=best_thresh, left=left, right=right)

def _best_split(self, X, y, feature_idxs):
best_gain = -1
split_idx, split_thresh = None, None

for feat_idx in feature_idxs:
X_column = X[:, feat_idx]
thresholds = np.unique(X_column)
for threshold in thresholds:
gain = self._information_gain(y, X_column, threshold)
if gain > best_gain:
best_gain = gain
split_idx = feat_idx
split_thresh = threshold

return split_idx, split_thresh

def _information_gain(self, y, X_column, split_thresh):
parent_gini = self._gini(y)

left_idxs = X_column <= split_thresh
right_idxs = X_column > split_thresh

# Avoid a split that doesn't divide the dataset
if len(y[left_idxs]) == 0 or len(y[right_idxs]) == 0:
return 0

n = len(y)
n_left, n_right = len(y[left_idxs]), len(y[right_idxs])
gini_left, gini_right = self._gini(y[left_idxs]), self._gini(y[right_idxs])
weighted_gini = (n_left / n) * gini_left + (n_right / n) * gini_right

gain = parent_gini - weighted_gini
return gain

def _gini(self, y):
m = len(y)
return 1.0 - sum((np.sum(y == c) / m) ** 2 for c in np.unique(y))

def _most_common_label(self, y):
# Return majority label; if y is empty, return None
if len(y) == 0:
return None
counter = Counter(y)
most_common = counter.most_common(1)[0][0]
return most_common

def predict(self, X):
return np.array([self._traverse_tree(x, self.root) for x in X])

def _traverse_tree(self, x, node):
# If the branch is missing, return None
if node is None:
return None
if node.is_leaf_node():
return node.value
if x[node.feature_index] <= node.threshold:
return self._traverse_tree(x, node.left)
return self._traverse_tree(x, node.right)

# ---------------------------
# Custom Random Forest Classifier
# ---------------------------
class RandomForestClassifier:
def __init__(self, n_estimators=10, max_features='sqrt', max_depth=None, min_samples_split=2):
# Number of trees in the forest
self.n_estimators = n_estimators
# Number of features to consider at each split: 'sqrt', 'log2', or an integer
self.max_features = max_features
self.max_depth = max_depth
self.min_samples_split = min_samples_split
self.trees = []

def _bootstrap_sample(self, X, y):
n_samples = X.shape[0]
indices = np.random.choice(n_samples, size=n_samples, replace=True)
return X[indices], y[indices]

def _get_max_features(self, n_features):
if isinstance(self.max_features, int):
return self.max_features
elif self.max_features == 'sqrt':
return int(np.sqrt(n_features))
elif self.max_features == 'log2':
return int(np.log2(n_features))
else:
return n_features # Use all features

def fit(self, X, y):
self.trees = []
n_features = X.shape[1]
max_features = self._get_max_features(n_features)

# Build each tree using a bootstrap sample and random feature selection
for _ in range(self.n_estimators):
X_sample, y_sample = self._bootstrap_sample(X, y)
tree = DecisionTreeClassifier(max_depth=self.max_depth,
min_samples_split=self.min_samples_split,
n_features=max_features)
tree.fit(X_sample, y_sample)
self.trees.append(tree)

def predict(self, X):
# Collect predictions from each tree
tree_preds = np.array([tree.predict(X) for tree in self.trees])
# Transpose to shape (n_samples, n_estimators)
tree_preds = np.swapaxes(tree_preds, 0, 1)
# Majority vote for each sample
y_pred = [Counter(tree_pred).most_common(1)[0][0] for tree_pred in tree_preds]
return np.array(y_pred)

6. まとめ

本記事では、ランダムフォレストの基本概念とその実装方法について解説しました。
決定木の各ノードで情報利得やGini不純度を用いて分割を行い、ブートストラップサンプルとランダムな特徴選択を通じて複数の決定木を構築することで、安定した予測を実現するランダムフォレストの仕組みを理解していただけたと思います。

この実装は、学習目的で作成したカスタムコードですが、さらなる改善や最適化の余地があるため、ぜひフィードバックをお寄せください。
この解説が、ランダムフォレストの理解の参考になれば幸いです。

コメント

タイトルとURLをコピーしました