воскресенье, 30 июня 2019 г.

Машинное обучение через решающие деревья

Это краткий пересказ курса: Введение в Data Science и машинное обучение с дополнением 2 тем: градиентный бустинг и решающие деревья в Spark.

Предподготовка данных на Python

Первым этапом машинного обучения - предподготовка данных. Это приведение данные в удобный для обучения вид.
Основных требования к разметке данных для решающих деревьев два:
* отсутствие текстовых полей
* отсутствие пустых строк

Раньше мы уже рассматривали, как работать с данными в Spark через Dataframe и Rdd.
Python , в среде datascientist, является самым распространенным языком для работы с данными.
Рассмотрим основные приемы для обработки таблиц:

* Загрузка и фильтрация
#numpy и pandas - основные библиотеки для работы с данными
import pandas as pd
import numpy as np

#загрузка данных cvs в Pandas Dataframe
sp = pd.read_csv('StudentsPerformance.csv')

#размерности таблицы:
sp.shape #(1000, 8)

#пред просмотр нескольких строк таблицы:
sp.head()

#создание DataFrame из 2 хэш массивов:
pd.DataFrame({'col1': ser1, 'col2': ser2})

#обращение к столбцам и колонкам по их номерам:
sp.iloc[[0,3,10], [0,5,-1]]

#фильтрация dataframe
#каждое условие должно заключаться в скобки ()
sp[sp.gender == 'female']
sp.query("gender == 'female'") 

#создание новой колонки на основе другой
sp['total'] = df['col1'] + df['col2']

* Работа с датой и временем
#преобразование unix timestamp к типу даты
dfs["date"] = pd.to_datetime(dfs.timestamp, unit='s')

#типа даты объектный и содержит методы для извлечений частей: день недели, года и т.д.
dfs["day"] = dfs.date.dt.date

#если дата в ключе, то к ее составляющим можно обращаться без фильтрации
stock['2010'].shape  #только 2010 год
stock['2010-02':'2011-03'].shape #Диапазон дат

#разгруппировать дни на строки по 2ч (первая строка будет с данными, остальные пустые)
stock.resample('2h').asfreq()

#группировать до более крупного периода (аналог group by с aggregate = mean)
stock.resample('1w').mean()

* join и группировка
#join df1 с df2 по столбцу column
df_join = df1.merge(df2, how='outer', on='column')

#группировка по полю gender и получение среднего по столбцу "writing score"
sp.groupby("gender").aggregate({'writing score': 'mean'})

#distinct
user_date.user_id.nunique()

#удаление дубликатов по колонкам subset:
dfe.drop_duplicates(subset=['user_id','day'])

#получение списка значений timestamp в одной ячейке по сгруппированному значению user_id (аналог listagg в бд)
dfe.groupby('user_id').timestamp.apply(list).head()

#pivot нескольких значений в оной колонке на несколько колонок
#для каждого юзера сформируется N колонок action (где N число значений в ней), в каждую из них ляжет count(distinct step_id)
user_scores = df.pivot_table(index='user_id', columns='action', values='step_id', aggfunc='count', fill_value=0)

* Работа с плавающим окном данных (аналог window в бд)
#Плавающее окно размером = 3 предыдущие строки (min_periods - если строк меньше 3, то окно сжимается до 1)
stock.rolling(3, min_periods=1).mean()

#нарастающий итог; 3 - размер окна
stock.expanding(3).mean()

#угасающий нарастающий итог с коэф. = 0,7
stock.ewm(alpha=0.7).mean()

#Пример: сглаживание графика (среднее за 10 дней)
stock['Open'].rolling(10, min_periods=1).mean().plot()

#Сравнение текущей строки с предыдущей (Аналог Lag/Lead в бд) 
df.groupby('user_id').timestamp.apply(list).apply(np.diff).values

От себя добавлю, что с моей точки зрения, данные проще подготавливать через SQL, чем простыней кода в Python или Scala.

Решающие деревья

Решающее дерево представляет из себя дерево где в узлах находится вопрос, а в ребрах ответ на него.
Задача машинного обучения - подобрать такую последовательность вопросов, чтобы определить основную тенденцию и как можно быстрей дойти до однозначного ответа.

Пример дерева предсказания - выживет ли человек на Титанике: 1 уровень пол, 2 уровень - Возраст или класс: sklearn tree DecisionTreeClassifier

Список вопросов для обучения мы должны подготовить на этапе 1 - "Предподготовка данных". Также для каждой строки со списком признаков мы должны дать ответ.

Рассмотрим на примере небольшой выборки данных с классификацией Кошек и Собак по признакам 'Шерстист', 'Гавкает', 'Лазает по деревьям'.
Обучение происходит на выборке, в которой известен ответ.
 Шерстист  Гавкает  Лазает по деревьям      Вид
       1        1                   0  собачка
       1        1                   0  собачка
       1        1                   0  собачка
       1        1                   0  собачка
       1        0                   1    котик
       1        0                   1    котик
       1        0                   1    котик
       1        0                   1    котик
       1        1                   1    котик
       0        0                   1    котик

Обучение дерева используя python библиотеку sklearn:
#подключение библиотек
from sklearn import tree
import pandas as pd
import numpy as nm
import matplotlib.pyplot as plt
import seaborn as sns

#чтение таблицы
cat = pd.read_csv('https://stepik.org/media/attachments/course/4852/cats.csv')

#в переменной X передаем признаки с вопросами
X = cat[['Шерстист', 'Гавкает', 'Лазает по деревьям']]

#в y - ответ для обучения
y=cat['Вид']

#создаем объект - решающее дерево из библиотеки sklearn
clf = tree.DecisionTreeClassifier(criterion='entropy')

#производим обучение дерева на заготовленных вопросах и ответах:
clf.fit(X, y)

#посмотрим на получившееся дерево
tree.plot_tree(clf, feature_names=list(X),
     class_names=['Cat', 'Dog'],
     filled=True)
plt.show()
Визуализация дерева (видно, что для однозначное классификации достаточно признака "Лазает по деревьям"):
sklearn tree DecisionTreeClassifier
Если не влезать в детали реализации, то это все. Имея обученное дерево мы дальше можем его применять для предсказания данных без ответов.
Но рассмотрим все же алгоритм обучения подробней:

1. Определяем начальную неопределенность (энтропию ответов).
Может быть ответы уже распределены так, что ничего дальше делать не надо.
Формула энтропии:
Энтропия = - SUM{1 .. чиcло категорий в ответе} (вероятность * Log2( вероятность) )
#вероятность получить собачку в ответах
p0=len(cat[cat['Вид'] == 'собачка'])/len(cat)

#и котика
p1=len(cat[cat['Вид'] == 'котик'])/len(cat)

#энтропия ответов
EY = -p0*nm.log2(p0) + -p1*nm.log2(p1)

2. Определим вероятность каждого параметра по формуле:
( Кол-во строк, где признак = 0 и Ответ = Категория 1 ) / ( Кол-во строк, где признак = 0)
Вероятность для признака 'Шерстист' == 0
p01 = len(cat[(cat['Шерстист']==0) & (cat['Вид'] == 'собачка')]) / len(cat[(cat['Шерстист']==0)])
p02 = len(cat[(cat['Шерстист']==0) & (cat['Вид'] == 'котик')]) / len(cat[(cat['Шерстист']==0)])

Так повторяется для других признаков и значений в признаках

3. Для каждого значения признака определяется его мера неопределенности - энтропия.
Энтропия для признака 'Шерстист' со значением = 0
-(p01*nm.log2(p01) + p02*nm.log2(p02))

4. Определяем прирост информации (снижение неопределенности) при переходе с параметра в вершине на параметр в следующем шаге.
Для этого воспользуемся формулой:
IG = исходная энтропия (1) - энтропия по фиче N (2)
Энтропия по фиче N = (число значений левой ветви (0) / общее число наблюдений * Энтропия лев. ветви) + (число правой (1) / общее число наблюд. * энтропия правой)
#общее число наблюдений
N=len(cat)
#число значений левой ветви (0)
n0 = len(cat[(cat['Шерстист']==0)])   
#число правой (1)
n1 = len(cat[(cat['Шерстист']==1)])

#энтропия по левой ветви (где признак 'Шерстист' = 0)
e0 = -(p01*nm.log2(p01) + p02*nm.log2(p02))

#энтропия по правой ветви (где признак 'Шерстист' = 1)
e1 = -(p10*nm.log2(p10) + p20*nm.log2(p20))

#общая энтропия признака
e = e0 * n0/N + e1 * n1/N

#считаем прирост информации для этого признака:
IG = EY - e

Повторяем операцию для других признаков.

5. Определяется признак, по которому происходит наибольший прирост информации (снижение неопределенности)
По этой фиче происходит сплит дерева на 2 ветви.

6. На следующем этапе пункты 2-5 повторяются на новом наборе данных, т.к. список значений и ответов будет другим, т.к. он разделен на 2 части.
Повтор происходит пока неопределенность не упадет до 0 или мы не достигнем лимита числа повторений.

Полный расчет прироста информации на python
cat = pd.read_csv('https://stepik.org/media/attachments/course/4852/cats.csv')

#1 - энтропия ответов
p0=len(cat[cat['Вид'] == 'собачка'])/len(cat)
p1=len(cat[cat['Вид'] == 'котик'])/len(cat)
EY = -p0*nm.log2(p0) + -p1*nm.log2(p1)

#2-3 энтропия для значений внутри признака
def get_entropy(column):
    p0 = len(cat[(cat[column]==0)])
    p01 = len(cat[(cat[column]==0) & (cat['Вид'] == 'собачка')])/p0
    p02 = len(cat[(cat[column]==0) & (cat['Вид'] == 'котик')])/p0    
    p1 = len(cat[(cat[column]==1)])
    p11 = len(cat[(cat[column]==1) & (cat['Вид'] == 'собачка')])/p1
    p12 = len(cat[(cat[column]==1) & (cat['Вид'] == 'котик')])/p1
    
    return([-(p01*nm.log2(p01) + p02*nm.log2(p02)), -(p11*nm.log2(p11) + p12*nm.log2(p12))])
 

#4 - общая энтропия по признаку
def get_EYX(column):
    N=len(cat)
    n0 = len(cat[(cat[column]==0)])   
    n1 = len(cat[(cat[column]==1)])
    
    e = get_entropy(column)
    
    return nm.nan_to_num(e[0]) * n0/N + nm.nan_to_num(e[1]) * n1/N
   
#5 - вывод прироста информации по каждой из фич:   
for column in cat:
    print(column, nm.round(EY - get_EYX(column),2))
    
# Шерстист 0.08
# Гавкает 0.61
# Лазает по деревьям 0.97 # - наибольший прирост информации


Из описания этого алгоритма видно его основной недостаток - это жадный алгоритм, который ищет лучшее решение на одном уровне дерева.
Задача построения оптимального дерева является NP полной, т.е. жадность не гарантирует оптимальность дерева в целом.
Чтобы найти оптимальный вариант дерева, нужны перебрать все его варианты построения.

Также к минусам можно отнести переобучаемость дерева, когда рекурсия ветвления дерева ни как не может найти однозначное решение,
то в ход идут даже очень малозначимые признаки, которые в реальности не отражают общую тенденцию, а решают какие то случайные отклонения признака.

Далее рассмотрим варианты подбора параметров дерева, чтобы уменьшить второй минус - чрезмерное переобучение.

Тренировочная и тестовая выборка

Все последующие исследование будем проводить над реальным набором данных с сайта kaggle.com - список пассажиров Титаника с их характеристиками и ответом: умер или выжил

Загрузим и предобработаем датасет, чтобы на нем можно было строить решающее дерево:
titanic_data = pd.read_csv('train.csv')

#выберем переменные для обучения
X = titanic_data.drop(["PassengerId", "Survived", "Name", "Ticket", "Cabin"], axis=1)

#делает пивот строковых значений в числовые колонки
X = pd.get_dummies(X)

##заполним пропущенные значения - медианной
X = X.fillna({'Age': X.Age.median()})

#колонка с ответом: выжил или погиб
y = titanic_data.Survived

Одним из способов определения переобучилось ли дерево или нет, является проверка его на тестовой выборке, которую оно не видело.
Если % правильных решений на trainig выборке выше, чем на тестовой, то это однозначный признак того, что дерево ушло в решение частных проблем и не нашло глобальной зависимости, которая проявляется в тестовых и тренировочных данных одновременно.

Если у нас в наличие только тренировочный набор, то имеет смысл его вручную разделить на тренировочный и тестовый:
from sklearn.model_selection import train_test_split 

#разбиваем на тестовые и тренировочные признаки и ответы в соотношении 67 к 33.
#random_state - задает зерно случайности при выборке строк. Если его не задать, то при каждом запуске в тест и train будут попадать разные строки.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state = 42)


Произведем обучение дерева на обучающей выборке и сравним потом с тестовой, которую дерево не видело:
#обучение дерева
clf.fit(X_train, y_train)

# число правильных результатов 97% на обучающей выборке
clf.score(X, y) 

# на тестовой выборке всего 78% правильных результатов
clf.score(X_test, y_test) 


Самым простым способом снижения переобучения - это уменьшение глубины дерева. Уменьшение числа вопросов физически не дает дереву уйти в частности.
Остается найти такую высоту дерева, при которой дерево давало бы наилучший резальутат на тестовой выборке, которую оно не видело.
Это можно сделать простым перебором:
from sklearn.model_selection import cross_val_score

#переменная, куда будем записывать высоту дерева и % правильных ответов на тестовой и тренировочной выборке
scores_data = pd.DataFrame()

#цикл по высоте дерева от 1 до 100
for mxd in range(1,100):
    #создаем и обучаем дерево с жестко заданой высотой
    clf = tree.DecisionTreeClassifier(criterion='entropy', max_depth = mxd)
    clf.fit(X_train, y_train)
    
    #тренировочные и тестовые скоры дерева
    tr_scr = clf.score(X_train, y_train)
    tst_scr = clf.score(X_test, y_test)
    
    #еще дополнительное разбиение и тестирование:
    #   * обучение происходит на 4 частях, а на 1 предсказание и тестирование
    #   ** и так по кругу 5 раз
    # в mean_cross_scr записываем среднее значение кроссвалидации
    mean_cross_scr = cross_val_score(clf, X_train, y_train, cv=5).mean()
    
    #помещаем замеры в массив
    tmp_scr_data = pd.DataFrame({'max_depth': [mxd], 'tr_scr': [tr_scr], 'tst_scr': [tst_scr], 'mean_cross_scr': [mean_cross_scr]})
    scores_data = scores_data.append(tmp_scr_data)
    
#определим глубину дерева, на которой переобучение минимально
#используем для этого значение кроссвалидации
scores_data[scores_data.mean_cross_scr == scores_data.mean_cross_scr.max()]

#визуализируем скор, в зависимости от глубины
plt.plot(scores_data['max_depth'], scores_data['train'], label='train')
plt.plot(scores_data['max_depth'], scores_data['test'], label='test')
plt.plot(scores_data['max_depth'], scores_data['mean_cross_score'], label='mean_cross_score')
plt.ylabel('score')
plt.xlabel('depth')
plt.legend()
plt.show()

sklearn tree DecisionTreeClassifier score
Еще параметры, которые можно настраивать у дерева:
* min_samples_split - минимальный размер сэмпла для сплита. Если в листе меньше этого числа наблюдений, то он не будет сплититься.
* min_samples_leaf - минимальное число элементов в листе. Если в дочерний лист попадет меньше этого числа элементов, то родитель не будет сплититься.
Эти параметры можно задавать, если дерево разбивается на слишком частных значениях (далеких от половины)

Дополнительные параметры оценки качества дерева

Кроме % правильных ответов, качество дерево можно оценивать более детально:
TP - true positive (правильное предсказание срабатывания)
FP - false positive (ложное срабатывание)
FN - false negative (должно было сработать, но нет)
Precission = TP / (TP + FP) --точность (нахождение ответа не переплачивая лишними срабатываниями) --важна, если мало ресурсов
Recall = TP / (TP + FN) -- полнота (сколько % от общего числа мы нашли) -- важно если много ресурсов
показатели независимы, выбирать нужный в зависимости от числа ресурсов

Комбинированная средняя метрика F = 2 * (precision * recall) / (precision + recall)

F меру не всегда надо доводить до максимума, к примеру:
инет магазин. Предсказали со 100% вероятностью, что человек купит салат. Человек купил и ушел из магазина.
Все равно нужно предложить колу с чипсами (высокомаржинальный товар), чтобы получить дополнительную прибыль.
Это можно как то внести в модель в виде параметра или не завышать F меру (в данном случае Recall - пусть будут лишние покупки)

Пример расчета этих показателей в python:
from sklearn.metrics import precision_score
predictions = clf.predict(X_test)
scores_data = pd.DataFrame({'predictions': predictions, 'Y': y_test})
tp = scores_data[scores_data.predictions == scores_data.Y].shape[0]
fp = scores_data[scores_data.predictions != scores_data.Y].shape[0]
precision = tp / (tp + fp)

#одной строкой:
precision = precision_score(y_test, predictions, average='micro') 


Другой пример, когда нам важно спасти с Титаника как можно больше людей, не смотря на затраты ресурсов, даже если вероятность выжить человека низка.
Отсечка для определения класса выжил/погиб = 0,5
Мы можем получить на выходе дерева не класс, а вероятность и отобрать то число людей, с такой вероятностью, которую мы сами захотим:
prediction_prob = best_clf.predict_proba(X_test)

#считаем выжившими людей, у которых вероятность выжить > 0.2 (вместо 0,5 по умолчанию)
y_pred = nm.where(prediction_prob[:, 1] > 0.2, 1, 0)


Автоматический подбор оптимального дерева

К счастью такие циклы не нужно писать вручную каждый раз, в sklearn есть встроенные функции по перебору параметром дерева по заранее заданному словарю:
from sklearn.model_selection import GridSearchCV
clf = tree.DecisionTreeClassifier()

#параметры, которые будем подбирать
#список параметров можно взять из описания объема clf
parameters = {'criterion': ['gini', 'entropy'], 'max_depth': range(1,20), 'min_samples_split': range(2,50, 2), 'min_samples_leaf': range(1,20)}

#cv - групп для кросс-валидации
grid_search_cv_clf = GridSearchCV(clf, parameters, cv=5, n_jobs=-1)

#обучение N деревьев, где N - число сочетаний parameters
grid_search_cv_clf.fit(X_train, y_train)

#просмотр параметров лучшего дерева
grid_search_cv_clf.best_params_ # {'criterion': 'gini', 'max_depth': 6, 'min_samples_leaf': 7, 'min_samples_split': 4}

#лучшее дерево
best_clf = grid_search_cv_clf.best_estimator_
best_clf.score(X_test, y_test)
Если параметров в переборе очень много, то можно немного ускорить:
* Задать n_jobs=-1 для GridSearchCV - это активирует многопоточный перебор параметров дерева. Число потоков = числу ядер.
* уменьшить число групп для кроссвалидации : cv=3
* Замен GridSearchCV использовать RandomizedSearchCV. Этот вариант перебирает не все варианты, а несколько рандомных. Удобно, если нужно быстро проверить предположение.

Случайный лес

Random Forest - создание нескольких деревьев, но каждому дереву выделена часть фичей (колонок) и часть данных (строк)
Всем деревья переданы одинаковые параметры обучения (глубина, размер сплита и прочее). Результатом является усредненная вероятность деревьев, отработавших по своей части фичей и строк.
Причем, чем больше деревьев - тем лучше.

Случайный лес решает сразу 2 проблемы обучения деревьев:
* множеств невысоких деревьев - решает проблему переобучения
* перебор порядка колонок - проблему неоптимальности жадного алгоритма, т.к. выбирает порядок фич почти случайно.

from sklearn.ensemble import RandomForestClassifier
clf_rf = RandomForestClassifier()

#n_estimators - число деревьев
parameters = {'n_estimators': [10, 20, 30], 'max_depth': [2,5,7,10]}
grid_search_cv_clf = GridSearchCV(clf_rf, parameters, cv=5)
grid_search_cv_clf.fit(X_train, y_train)

grid_search_cv_clf.best_params_ # {'max_depth': 5, 'n_estimators': 30}
best_clf = grid_search_cv_clf.best_estimator_
grid_search_cv_clf.best_params_
best_clf.score(X_test, y_test) #0.82

Посмотрим какие фичи дали максимальный прирост точности:
feature_importances_df = pd.DataFrame({'features': X_train.columns, 'feature_importances': best_clf.feature_importances_}).sort_values('feature_importances', ascending=False)
/*
    features  feature_importances
6    Sex_male             0.282525
5  Sex_female             0.221981
0      Pclass             0.150135
1         Age             0.113366
4        Fare             0.111061
2       SibSp             0.032356
3       Parch             0.030536
9  Embarked_S             0.027200
7  Embarked_C             0.021080
8  Embarked_Q             0.009759
*/

На этом курс машинного обучения на sptepik заканчивается. Сертификат о прохождении.
Но я рассмотрю еще 1 вариант построения дерева и альтернативную реализацию в Spark.

Градиентный бустинг

Каждое дерево должно уменьшать ошибку (компенсировать ошибку предыдущего шага). Для этого вводятся 2 связанных параметра: n_estimators и learning_rate.
Чем меньше learning_rate (шаг отрицательного прироста ошибки), тем больше n_estimators (небольших деревьев) нужно.
У деревьев задается небольшая высота, обычно 2 - 8.
An = An-1 + learning_rate*Bb

Ошибка = (предсказанный ответ - реальный ответ)**2
Направление ошибки (производная) = 2 * (предсказанный ответ - реальный ответ)
Если классификация [0-1], то лучше использовать сигмойдальную активационную функцию.
Описание активационных функций и градиентного спуска для уменьшения ошибки описано тут: Введение в нейронные сети

Преимущество перед random forest - что обучение направленное (не случайное), и хороший результат может быть достигнут на меньшем числе деревьев
from sklearn.ensemble import GradientBoostingClassifier
parameters = {'n_estimators': [10, 15, 20, 30], 'learning_rate': [0.1,0.5,1], 'max_depth': [3,5,7,10], 'min_samples_split': [2,5,7,10,15,20], 'min_samples_leaf': [1,2,5,7,10]}
clf_rf = GradientBoostingClassifier()
grid_search_cv_clf = GridSearchCV(clf_rf, parameters, cv=5, n_jobs=-1)
grid_search_cv_clf.fit(X_train, y_train)
best_clf = grid_search_cv_clf.best_estimator_
grid_search_cv_clf.best_params_ #{'learning_rate': 0.1, 'max_depth': 5, 'min_samples_leaf': 10, 'min_samples_split': 2, 'n_estimators': 15}


Деревья в SparkML

Python не самый быстрый язык, так что для для решающий деревьев есть множество альтернативных библиотек.
Одна из них встроена в экосистему Hadoop/Spark - Spark ML.

Подготовка данных на HiveQL:
create table titanic_train stored as orc as 
select 
cast(trim(Survived) as decimal(1,0)) Survived,
cast(trim(Pclass) as decimal(10,0)) Pclass,
case when trim(Sex) = 'male' then 1 else 0 end Sex_male, 
case when trim(Sex) = 'male' then 0 else 1 end Sex_female, 
coalesce( cast(trim(Age) as decimal(3,0)),28 ) Age,
cast(trim(SibSp) as decimal(10,0)) SibSp,
cast(trim(Parch) as decimal(10,0)) Parch, 
cast(trim(Fare) as decimal(12,4)) Fare,
case when trim(Embarked) = 'C' then 1 else 0 end as Embarked_C,
case when trim(Embarked) = 'Q' then 1 else 0 end as Embarked_Q,
case when trim(Embarked) = 'S' then 1 else 0 end as Embarked_S
from titanic_train__imp
;

Подключение библиотек, загрузка данных в Spark DataFrame и разделение на тестовые и тренировочные выборки:
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.ml.feature.{VectorAssembler, StringIndexer, OneHotEncoderEstimator}  

import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator  
import org.apache.spark.mllib.evaluation.MulticlassMetrics  
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics  
import org.apache.spark.ml.classification.{RandomForestClassifier,GBTClassifier}
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit, CrossValidator, CrossValidatorModel}  
import org.apache.spark.ml.param.ParamMap

//загрузка
val titanic_data = spark.sqlContext.sql("select survived as label, pclass, sex_male, sex_female, age, sibsp, parch, fare,embarked_c, embarked_q, embarked_s from tmp.titanic_train")

//преобразование датафрейма в объект с ответом и вектором фич
val assembler = (new VectorAssembler()).setInputCols(Array("pclass", "sex_male", "sex_female", "age", "sibsp", "parch", "fare", "embarked_c", "embarked_q", "embarked_s")).setOutputCol("features")
val output = assembler.transform(titanic_data).select($"label",$"features")

//разделение на test и train
val splits = output.randomSplit(Array(0.7, 0.3), seed = 42)
val training = splits(0)
val test = splits(1)


Слученный лес с перебором нескольких вариантов:
val rf = new RandomForestClassifier()

val paramGrid = new ParamGridBuilder().
addGrid(rf.numTrees,Array(10,15, 20,30,40)).
addGrid(rf.maxDepth,Array(5,10,15)).
addGrid(rf.impurity,Array("Gini", "Entropy")).
addGrid(rf.minInstancesPerNode,Array(1,2,5,7,10)).
build()


Градиентный бустинг с перебором вариантов:
val rf = new GBTClassifier()

val paramGrid = new ParamGridBuilder().
addGrid(rf.maxIter,Array(50)).
addGrid(rf.stepSize,Array(0.1, 0.5)). //шаг градиента
addGrid(rf.maxDepth,Array(5,10)).
addGrid(rf.impurity,Array("Entropy")).
addGrid(rf.minInstancesPerNode,Array(1,2,5,7)).
build()

Обучение и кросс-валидация
val cv = new CrossValidator().
setEstimator(rf).
setEvaluator(new MulticlassClassificationEvaluator().setMetricName("weightedRecall")).
setEstimatorParamMaps(paramGrid).
setNumFolds(5).
setParallelism(16)

val model = cv.fit(training)


Предсказание и оценка моделей:
//функция для получения лучшей модели имплисится в CrossValidatorModel
implicit class BestParamMapCrossValidatorModel(cvModel: CrossValidatorModel) {
  def bestEstimatorParamMap: ParamMap = {
    cvModel.getEstimatorParamMaps
           .zip(cvModel.avgMetrics)
           .maxBy(_._2)
           ._1
  }
}

//лучшие параметры после перебора:
println(model.bestEstimatorParamMap)
/*
RandomForestClassifier
{
       rfc_cdc763061b6d-impurity: Entropy,
        rfc_cdc763061b6d-maxDepth: 10,
        rfc_cdc763061b6d-minInstancesPerNode: 1,
        rfc_cdc763061b6d-numTrees: 40
}

GradientBoostedTreesModel
{
        gbtc_5e85b054de97-impurity: Entropy,
        gbtc_5e85b054de97-maxDepth: 5,
        gbtc_5e85b054de97-maxIter: 50,
        gbtc_5e85b054de97-minInstancesPerNode: 7,
        gbtc_5e85b054de97-stepSize: 0.1
}
*/

//Рейтинги моделей
val results = model.bestModel.transform(test).select("label", "prediction")
val score = results.filter($"label" === $"prediction").count().toDouble / test.count().toDouble

/*
RandomForestClassifier = 0.8302583025830258
GradientBoostedTreesModel = 0.8118081180811808
*/


Предсказание закрытых данных на сохраненной модели:
val titanic_test = spark.sqlContext.sql("select PassengerId, pclass, sex_male, sex_female, age, sibsp, parch, coalesce(fare,35.627) fare,embarked_c, embarked_q, embarked_s from tmp.titanic_test order by PassengerId")
val assembler = (new VectorAssembler()).setInputCols(Array("pclass", "sex_male", "sex_female", "age", "sibsp", "parch", "fare", "embarked_c", "embarked_q", "embarked_s")).setOutputCol("features")
val output = assembler.transform(titanic_test).select($"features")
val results = model.bestModel.transform(output).select("prediction")

//джойним ответы с пассажирами (для этого нужно предварительно пронумеровать оба df)
val resultsrn = results.withColumn("rownum",monotonically_increasing_id )
val ids = titanic_test.select("PassengerId").withColumn("rownum",monotonically_increasing_id)
val res = ids.join(resultsrn, ids("rownum") === resultsrn("rownum"))

import org.apache.spark.sql.types.IntegerType
res.select($"PassengerId", $"prediction").withColumn("prediction2", res("prediction").cast(IntegerType)).
select($"PassengerId", $"prediction2").coalesce(1).write.csv("/usr/hdp/ml/predict.csv")



Как вывод, какое дерево лучше, приведу результаты сабмитов дата сета Титаника на kaggle
Python sklearn
* GridSearchCV - 0.77033
* RandomForestClassifier - 0.78468
* GradientBoostingClassifier - 0.79425

Spark ML
* RandomForestClassifier - 0.73684
* GradientBoostedTreesModel - 0.74162

2 комментария: