Azure Databricksの特徴量ストア機能を使ってみた

こんばんは。最近、Databricksの特徴量ストア機能を触ってみたので、分かったことなどを備忘メモで纏めておきたいと思います。

それではまいります。

Contents

Azure Databricksの特徴量ストア機能とは

特徴量ストアとは – Azure Databricks | Microsoft Learn

  • 機械学習のモデル構築や予測で利用する特徴量をデータサイエンティストが検索して共有できるように(=再利用できるように)する一元化されたリポジトリ
  • これによって、チーム間で車輪の再発明(同じ特徴量を必要とする複数のチームが、それぞれ独立して同じ開発を行ってしまうなど)を防ぐことに役立つ
  • Databricksでは、特徴量ストアの実体はDeltaテーブルで実装されている
  • ので、既存のDelta Tableを特徴量ストアとして利用することもできる
  • Unity Catalogを利用している場合は、その恩恵(アクセス制御等のセキュリティ、データ系列、タグ付け、ワークスペース間共有)もうけることができる
  • オンラインストア、オフラインストアの2つのタイプに分類でき、予測のワークロード(バッチ or リアルタイム)に応じて使い分けることができる
  • オンラインストアは、リアルタイム推論で求められる低遅延の要件を満たすために利用されるオプション。現在、Azure CosmosDB, Azure MySQL, Azure SQL Databaseがオンラインストアとして利用できる(CosmosDBが一番できることが多い)

なるほど。公式ドキュメントを読んで思った疑問は以下。

Unity Catalogに普通にDelta Tableを作るのと、特徴量ストアとして作成するのは何が違う?

>これは明確な議論は見つけられなかったので推測ですが、おそらく特徴量ストアとして保存した場合は、特徴量ストアのAPIが持つ豊富な特徴量操作関連の機能(例えば、特徴量ストアテーブル間の結合構文の簡略化、等)を利用できる点がメリットになるのではないかと思います。

ので、Databricksで特徴量エンジニアリング後の特徴量情報を保存する際には、基本特徴量ストアとして保持しておけば良いのかなと思いました。

オンラインストアってオフラインストアと比べてそんなに低遅延なの?

特徴量ストアとは – Azure Databricks | Microsoft Learn

オフラインストアを利用する場合は、予測時にもDatabricksの特徴量ストアから特徴量データを取得して予測を行います。

一方でオンラインストアを利用する場合は、あらかじめDatabricks特徴量ストアのデータをCosmosDBなどのオフラインストアに”公開”する作業を実施しておき、予測時、アプリケーションはオンラインストアから特徴量データを取得する流れになります。

推測ですが、いずれもアプリケーションからみるとリモートのストア(Databricksのストレージなのか、CosmosDBなどなのかは違うけど)から特徴量を取得することになると思うので、データ転送のレイテンシでは差がないけど、データストア上で必要な特徴量をクエリする際のパフォーマンスで差が出てくるのかなと。これはすなわち、DatabricksのPhotonエンジンと、オンラインストアのデータベースエンジンの性能差になるのかな・・?Photonエンジンでのクエリもだいぶ高速な気はするけど・・・

ここは検証したような記事はこの記事を書いている時点では見つけられなかったので、また時間をみつけて検証してみようと思います。

とりあえずは、より低遅延が求められる場合はオンラインストアを利用する、くらいで覚えておこうと思います。

実際に特徴量ストア機能を使ってみた

以下の公式ドキュメントで紹介されているサンプルノートブックを参考に、この機能を試してみました。(今回はオフラインストアを)

ポイントインタイム サポートで時系列特徴テーブルを使用する – Azure Databricks | Microsoft Learn

feature-store-time-series-example – Databricks (microsoft.com)

Databricksのノートブックで、MLクラスターを使って以下のコードを書いていきます。

1、ダミー特徴量データを生成

IoTのシナリオで、デバイスセンサーが収集する時系列データを再現します。

import pandas as pd
import numpy as np
from pyspark.sql.functions import *
 
wavelength_lo, wavelength_hi = 209.291, 213.111
ppm_lo, ppm_hi = 35, 623.99
temp_lo, temp_hi = 15.01, 25.99
humidity_lo, humidity_hi = 35.16, 43.07
 
def is_person_in_the_room(wavelength, ppm, temp, humidity):
  return (
    (wavelength < (wavelength_lo + (wavelength_hi - wavelength_lo) * .45)) &
    (ppm > (.9 * ppm_hi)) &
    (temp > ((temp_hi + temp_lo) / 2)) &
    (humidity > (humidity_hi * .6))
  )
 
def generate_dataset(start, end):
  def generate_sensor_df(features):
    return pd.DataFrame({
      'room': np.random.choice(3, end-start),
      'ts': start + np.random.choice(end-start, end-start, replace=False) + np.random.uniform(-0.99, 0.99, end-start),
      **features
    }).sort_values(by=['ts'])    
  
  wavelength_df = generate_sensor_df({
    'wavelength': np.random.normal(np.mean([wavelength_lo, wavelength_hi]), 2, end-start),
  })
  temp_df = generate_sensor_df({
    'temp': np.random.normal(np.mean([temp_lo, temp_hi]), 4, end-start),
    'humidity': np.random.normal(np.mean([humidity_lo, humidity_hi]), 2, end-start), 
  })
  
  ppm_bern = np.random.binomial(1, 0.3, end-start)
  ppm_normal_1 = np.random.normal(ppm_lo, 8, end-start)
  ppm_normal_2 = np.random.normal(ppm_hi, 3, end-start)
  ppm_df = generate_sensor_df({
    'ppm': ppm_bern*ppm_normal_1+(1-ppm_bern)*ppm_normal_2
  })
  
  df = pd.DataFrame({
    'room': np.random.choice(3, end-start),    
    'ts': np.random.uniform(start, end, end-start)
  }).sort_values(by=['ts'])
  for right_df in [wavelength_df, ppm_df, temp_df]:
    df = pd.merge_asof(
      df, 
      right_df, 
      on='ts', 
      by='room'
    )
  df['person'] = is_person_in_the_room(df['wavelength'], df['ppm'], df['temp'], df['humidity'])
  
  wavelength_df['wavelength'] += np.random.uniform(-1, 1, end-start) * 0.2
  ppm_df['ppm'] += np.random.uniform(-1, 1, end-start) * 2
  temp_df['temp'] += np.random.uniform(-1, 1, end-start) 
  temp_df['humidity'] += np.random.uniform(-1, 1, end-start)
  
  light_sensors = spark.createDataFrame(wavelength_df) \
    .withColumn("ts", col("ts").cast('timestamp')) \
    .select(col("room").alias("r"), col("ts").alias("light_ts"), col("wavelength"))
  temp_sensors = spark.createDataFrame(temp_df) \
    .withColumn("ts", col("ts").cast('timestamp')) \
    .select("room", "ts", "temp", "humidity")
  co2_sensors = spark.createDataFrame(ppm_df) \
    .withColumn("ts", col("ts").cast('timestamp')) \
    .select(col("room").alias("r"), col("ts").alias("co2_ts"), col("ppm"))
  ground_truth = spark.createDataFrame(df[['room', 'ts', 'person']]) \
    .withColumn("ts", col("ts").cast('timestamp'))  
 
  return temp_sensors, light_sensors, co2_sensors, ground_truth  
 
temp_sensors, light_sensors, co2_sensors, ground_truth = generate_dataset(1458031648, 1458089824)
fixed_temps = temp_sensors.select("room", "ts", "temp").sample(False, 0.01).withColumn("temp", temp_sensors.temp + 0.25)

ダミー特徴量データ1:温度センサーデータ

ダミー特徴量データ2:光センサーデータ

ダミー特徴量データ3:CO2濃度センサーデータ

目的変数ダミーデータ:その部屋に人がいたかどうか

2,Unity Catalog上のDeltaテーブル(特徴量ストア)にダミー特徴量データを保存

特徴量ストアの機能を使うには、FeatureStoreClientライブラリを使ってクライアントを生成します。

from databricks.feature_store.client import FeatureStoreClient
from databricks.feature_store.entities.feature_lookup import FeatureLookup
 
fs = FeatureStoreClient()

その上で、特徴量ストア用のUnity Catalogのカタログ、スキーマを作成し、

%sql
USE CATALOG ml;
CREATE SCHEMA IoTAnalysis;

先ほど作成したダミーデータを特徴量ストア用のテーブルに保存します。

# Creates a time-series feature table for the temperature sensor data using the room as a primary key and the time as the timestamp key.
fs.create_table(
    "ml.IoTAnalysis.temp_sensors",
    primary_keys=["room", "ts"],
    timestamp_keys=["ts"],
    df=temp_sensors,
    description="Readings from temperature and humidity sensors",
)
 
# Creates a time-series feature table for the light sensor data using the room as a primary key and the time as the timestamp key.
 
# For Databricks Runtime 13.2 for Machine Learning or above:
fs.create_table(
    "ml.IoTAnalysis.light_sensors",
    primary_keys=["r", "light_ts"],
    timestamp_keys=["light_ts"],
    df=light_sensors,
    description="Readings from light sensors",
)
 
# Creates a time-series feature table for the CO2 sensor data using the room as a primary key and the time as the timestamp key. 
 
# For Databricks Runtime 13.2 for Machine Learning or above:
fs.create_table(
    "ml.IoTAnalysis.co2_sensors",
    primary_keys=["r", "co2_ts"],
    timestamp_keys=["co2_ts"],
    df=co2_sensors,
    description="Readings from CO2 sensors",
)

3,特徴量データの更新

特徴量ストアのデータは例えば以下のようにして更新することもできます。

更新用データ

temp_ft = fs.read_table("ml.IoTAnalysis.temp_sensors").drop('temp')
temp_update_df = fixed_temps.join(temp_ft, ["room", "ts"])

display(temp_ft)
display(temp_update_df)

write_tableのmergeモードを使って、更新できるようです。

fs.write_table("ml.IoTAnalysis.temp_sensors", temp_update_df, mode="merge")

4,特徴量ストアのデータを利用して学習用データセットの作成

続いて学習用データセットの作成です。以下のようにFeatureLookup関数を使うことで、複数の特量ストアテーブルのデータからキーに一致した特徴量を自動的に集めることができます。

# Split Train & Test Data
training_labels, test_labels = ground_truth.randomSplit([0.75, 0.25])
 
# Create point-in-time feature lookups that define the features for the training set. Each point-in-time lookup must include a `lookup_key` and `timestamp_lookup_key`.
feature_lookups = [
    FeatureLookup(
        table_name="ml.IoTAnalysis.temp_sensors",
        feature_names=["temp", "humidity"],
        rename_outputs={
          "temp": "room_temperature",
          "humidity": "room_humidity"
        },
        lookup_key="room",
        timestamp_lookup_key="ts"
    ),
    FeatureLookup(
        table_name="ml.IoTAnalysis.light_sensors",
        feature_names=["wavelength"],
        rename_outputs={"wavelength": "room_light"},
        lookup_key="room",
        timestamp_lookup_key="ts",      
    ),
    FeatureLookup(
        table_name="ml.IoTAnalysis.co2_sensors",
        feature_names=["ppm"],
        rename_outputs={"ppm": "room_co2"},
        lookup_key="room",
        timestamp_lookup_key="ts",      
    ),  
]
 
training_set = fs.create_training_set(
    training_labels,
    feature_lookups=feature_lookups,
    exclude_columns=["room", "ts"],
    label="person",
)
training_df = training_set.load_df()

display(training_df)

5,機械学習モデルの構築

下の例では、LightGBMを利用してモデルを構築しています。

features_and_label = training_df.columns
training_data = training_df.toPandas()[features_and_label]
 
X_train = training_data.drop(["person"], axis=1)
y_train = training_data.person.astype(int)
 
import lightgbm as lgb
import mlflow.lightgbm
from mlflow.models.signature import infer_signature
 
mlflow.lightgbm.autolog()
 
model = lgb.train(
  {"num_leaves": 32, "objective": "binary"}, 
  lgb.Dataset(X_train, label=y_train.values),
  5
)

構築したモデルは、Feature Storeクライアントを介してモデルレジストリに登録しています。(この方法は今後非推奨になって、モデルもUnity Catalogに登録するようになるはず)

# Register the model in Model Registry.
# When you use `log_model`, the model is packaged with feature metadata so it automatically looks up features from Feature Store at inference.
model_name = "iotdemo_model"

fs.log_model(
  model,
  artifact_path="model_packaged",
  flavor=mlflow.lightgbm,
  training_set=training_set,
  registered_model_name=model_name
)

6,構築したモデルの評価(スコア付け)

モデルを評価するためには、まずモデルレジストリ(or Unity Catalog)に登録した最新バージョンのモデルを取得してきて、

from mlflow.tracking import MlflowClient
def get_latest_model_version(model_name):
    latest_version = 1
    mlflow_client = MlflowClient()
    for mv in mlflow_client.search_model_versions(f"name='{model_name}'"):
      version_int = int(mv.version)
      if version_int > latest_version:
        latest_version = version_int
    return latest_version

検証用データに対してスコアリングを行います。このとき、score_batch関数にモデルと、検証用データを渡すと対応する特徴量(特徴量ストアの一意キーで)を特徴量ストアから取得してきて、あわせてスコアリングを行ってくれるようです。

Databricks FeatureStoreClient — FeatureStore 0.15.1 documentation

scored = fs.score_batch(
  f"models:/{model_name}/{get_latest_model_version(model_name)}",
  test_labels,
  result_type="float",
)

scoredの中身はこんな感じ。検証用ラベル(person)とそれに対応する特徴量(room_temperature~room_co2)、およびモデルによる予測結果(prediction)が含まれています。

この手順は以下ドキュメントでも解説されています。

Databricks Feature Store を使用してモデルをトレーニングする – Azure Databricks | Microsoft Learn

最後にPredictionのスコアをtrue/falseで判定しなおして、精度(Accuracy)を出して評価しています。

from pyspark.sql.types import BooleanType
 
classify_udf = udf(lambda pred: pred > 0.5, BooleanType())
class_scored = scored.withColumn("person_prediction", classify_udf(scored.prediction))
 
display(class_scored.limit(5))
from pyspark.sql.functions import avg, round
display(class_scored.select(round(avg((class_scored.person_prediction == class_scored.person).cast("int")), 3).alias("accuracy")))

以上、Databricksの特徴量ストアを試してみたメモでした。まだまだこの例では特徴量ストアのすべての価値を引き出せていない気はしますが、とりあえず特徴量データセットの生成がいくぶんか簡単になることは分かりました。公式ドキュメントには他のシナリオのサンプルノートブックも多く用意されているようなので、もっと理解を深めるためにそれらも試してみたい。

少しでも参考になれば幸いです。

おしまい

この記事を気に入っていただけたらシェアをお願いします!

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

ABOUT US
Yuu113
初めまして。ゆうたろうと申します。 兵庫県出身、東京でシステムエンジニアをしております。現在は主にデータ分析、機械学習を活用してビジネスモデリングに取り組んでいます。 日々学んだことや経験したことを整理していきたいと思い、ブログを始めました。旅行、カメラ、IT技術、江戸文化が大好きですので、これらについても記事にしていきたいと思っています。