Apache Sparkの構造化ストリーミングとはどういうものかを理解する

こんばんは。最近Databricksを勉強する過程でSparkの技術に触れる機会が増えてきました。

今日は、その中の一つ、構造化ストリーミングという技術について色々と調べたので、わかったことをメモしておこうと思います。

なお、最近この手の技術の理解のとっかかりにChatGPTを活用していますので、この記事もそのやり取りを大いに引用していきます笑

ただ、ChatGPTは間違ったこともそれっぽく答えてくるので、勉強しはじめには、それが本当に正しいのか、判断がつかないことも多々あります笑(この記事を書いている時点では、まさにその状態なので、誤りをそのまま掲載している可能性もあります) この点は、なるべく原典となる情報を紹介してもらうようにするか、ない場合でも、今後勉強を深めていくうちにわかっていくだろう & その時にChatGPTの説明と食い違っているところがあれば、それも相まってより記憶に残る知識になるだろうと、まぁ楽観的に考えています笑

さて、前置きが長くなりましたが、本題に入ります。

*以下の文章はほとんどがChatGPTによって生成された文章です。

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

Contents

構造化ストリーミングの概要

Apache Sparkの構造化ストリーミング(Structured Streaming)は、Spark SQLエンジン上に構築された、リアルタイムデータ処理エンジンだよ。構造化ストリーミングのすごいところは、バッチ処理とストリーミング処理を統一したAPIを提供している点。つまり、バッチで書いたコードを少し修正するだけでストリーミングデータにも対応できるんだ。

主な特徴

  • 容易なスケーリングと回復力: 自動でスケーリングしてくれるし、ノードがダウンしても自動的に回復する。
  • イベントタイム処理: データが生成された時間に基づいて処理ができる。これは、ネットワーク遅延などでデータが遅れた場合でも正確に処理が行えるってこと。
  • Exactly-once 処理: データを一度だけ正確に処理する機能。これは、データの重複や損失を防ぐために重要。

要するに、構造化ストリーミングはリアルタイムデータ処理の難しさを抽象化してくれて、開発者が簡単に扱えるようにしているんだ。バッチ処理とほとんど同じようなコードでストリーミング処理ができるので、かなり便利だよ。

なぜ「構造化」とついているのか

“構造化”という名前がついてるのは、データにスキーマ(つまり、構造)を持っているからだよ。

・普通のストリーミング

  • データ: 生のバイトストリームとして扱われることが多い。
  • 柔軟性: スキーマがないため、非常に柔軟。
  • 処理: スキーマがないため、データを理解して処理するのは手間がかかる場合がある。

・構造化ストリーミング

  • データ: 行と列で構成されたテーブル形式(データフレーム)。
  • 柔軟性: スキーマがあるため、型の安全性やクエリの最適化が容易。
  • 処理: SQLライクなクエリで簡単にデータ処理ができる。

構造化ストリーミングでは、普通のバッチ処理で行われるような高度なデータ操作が、ストリーミングデータに対しても可能だ。SQLでアグリゲーションしたり、ウィンドウ関数を使ったりとかね。

要するに、”構造化”ってのはこのような便利な機能をもたらしてくれるわけさ。

(以下私の補足)

上記は、公式ガイドに図付きで解説がありますね。

データの入力があると、内部的にはそれに対する処理の結果をテーブルの形で構造化して保持し、

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

出力シンクとして設定しているデータソースに(規定では)差分を書き出す、という動作をするようですね。

そうすると、出力シンクに出力される前のResult Tableは、処理によっては長い間データを保持し続けないといけなくて、メモリなどが多く消費されてしまうことへの配慮も必要だったりするのかなという疑問が。

(ここからまたChatGPTによる解説)

いい疑問だね。実際には、構造化ストリーミングは全てのデータをメモリに保持しているわけではないんだ。

  1. ステートレスなオペレーション: 例えば、mapやfilterといったステートレスな変換は、データが到着したらすぐに処理して出力するだけ。これにはメモリにデータを保持する必要がない。
  2. ウィンドウ付きオペレーション: 一定時間や一定量のデータに対してのみ計算を行う場合(ウィンドウ)もある。この場合、そのウィンドウのデータだけがメモリに保持される。
  3. ステートフルオペレーション: 一部の計算(例:ランキング、累積和)ではステート(状態)を保持する必要があるが、そのステートも一定量にトリミングされるか、ディスクに保存される。
  4. ウォーターマーク: 古いデータをどれだけ保持するかを制御するための概念もある。ウォーターマークが指定された時間を過ぎたデータは削除される。
  5. チェックポイント: さらに、チェックポイント機能を使ってステートをディスクに定期的に保存することも可能。
  6. 出力: そして最終的には、計算結果はシンク(出力先)に送られる。この時点でメモリからは削除される。

だから、無限にメモリが溢れるという心配は基本的にはないんだよ。ちゃんと設計されているからね。

構造化ストリーミング機能の進化の歴史

余談ですが、構造化ストリーミングはApache 2.x系から登場したとあったので、Apache Spark全体の機能追加の歴史と、構造化ストリーミングのアップデートについても整理しておきます。

(ChatGPTにお願いすると、これもパッとマトリクスにまとめてくれて、超便利・・・)

— 以下、ChatGPTによるコメント —

Apache Sparkの歴史を少しまとめてみるね。ただし、情報は2022年1月時点のもので、それ以降のアップデートは反映されてないから注意してね。

バージョン発表時期主な機能アップデート
Spark 1.02014年5月初の公開バージョン。RDD(Resilient Distributed Dataset)導入
Spark 1.12014年9月パフォーマンス改善、APIの改善
Spark 1.22014年12月Python APIの強化、クラスターメトリクスの追加
Spark 1.32015年3月DataFramesの導入
Spark 1.42015年6月SparkRの導入、Machine Learning Pipelinesの導入
Spark 1.52015年9月DataFramesのパフォーマンス改善、新しいデータソースAPI
Spark 1.62016年1月パフォーマンスの大幅な改善、データフレームAPIの強化
Spark 2.02016年7月構造化ストリーミングの導入、DataSet APIの導入
Spark 2.12017年2月ストリーミングの改善、新しいMLアルゴリズム
Spark 2.22017年7月グラフ処理の強化、PythonとRのAPI改善
Spark 2.32018年2月Kubernetesサポート、Pandas UDFサポート
Spark 2.42018年11月Barrier Execution Modeの導入、Avroサポート
Spark 3.02020年6月Adaptive Query Execution(AQE)、新しいUI、Python 3.xサポート
Spark 3.12021年2月ANSI SQLサポートの改善、パフォーマンスの最適化

このテーブルには大きな変更点や新機能だけをピックアップしてるから、より詳細な情報は公式のリリースノートでチェックしてみてね。

次に構造化ストリーミングの進化に焦点を当ててみるよ。ただ、この情報も2022年1月時点のものだから、それ以降のアップデートは入ってないことに注意してね。

ョン発表時期主な機能アップデートと進化
Spark 2.02016年7月構造化ストリーミングの初版リリース。基本的なストリーミング処理が可能。
Spark 2.12017年2月ウィンドウ関数、イベントタイム、ウォーターマークがサポートされる。
Spark 2.22017年7月フォールトトレランスが強化され、コンパクトなファイルシンクのサポート。
Spark 2.32018年2月コンティニュアスモードが追加。低レイテンシのストリーミングが可能に。
Spark 2.42018年11月Kafkaのサポート強化、Pythonのサポート改善。
Spark 3.02020年6月パフォーマンス改善、新たなソースとシンクのサポート、AQEのサポート。
Spark 3.12021年2月セキュリティ強化、より多くのデータソースへの対応。

構造化ストリーミングは、Spark 2.xで登場してから結構な進化を遂げているよ。機能が追加されるたびに、より柔軟で高機能なストリーミング処理が可能になってきているんだ。もちろん、これは大まかなまとめだから、もっと詳しくは公式ドキュメントをチェックしてみてね。

構造化ストリーミングの技術的なポイント

構造化ストリーミングの内部動作はかなり面白いよ。ざっくりと主要なポイントを挙げてみるね。

マイクロバッチとコンティニュアス処理

デフォルトでは「マイクロバッチ」処理が行われる。このモードでは、ストリーミングデータを小さなバッチに分けて、それぞれを個別に処理することで、エンドツーエンドで100ミリ秒以下のレイテンシを実現。ただし、Spark2.3以降は、「コンティニュアス処理」モードも登場し、これによってほぼリアルタイムでの処理も可能で、こちらは1ミリ秒以下のレイテンシを実現可能。

イベントタイムとウォーターマーク

構造化ストリーミングは「イベントタイム」に基づいた処理ができる。ウォーターマークという概念を使って、遅延データに対応したり、状態を効率的に管理したりする。

ステートフルオペレーション

集計やジョインといったステートフルなオペレーションもサポートしている。内部的にはステートを管理するための状態ストアが用いられていて、これが高度な処理を可能にしている。

出力とエクザクトリー・ワンス保証

処理結果は多様なデータシンク(Kafka, Parquet, Delta Lakeなど)に出力できる。また、内部的にはチェックポイントとWrite-Ahead Logs(WAL)を使って、「エクザクトリー・ワンス」の処理を保証している。

フォールトトレランス

ノード障害が発生した場合でも、チェックポイントとWALのおかげで、データの一貫性を保ちつつ処理を再開することができる。

このように、構造化ストリーミングは内部的にかなり高度な技術を使って、使いやすさ、パフォーマンス、信頼性を両立しているんだ。ちなみに、これらの多くはApache Sparkのバッチ処理エンジンに由来するものだよ。

マイクロバッチとコンティニュアス処理

簡単に言うと、マイクロバッチは「小分けにして処理」、コンティニュアスは「来たらすぐ処理」って感じ。マイクロバッチは設定や運用が比較的簡単だけど、レイテンシーが少し高くなる可能性がある。一方で、コンティニュアスモードは低レイテンシーが要求される状況で便利だけど、設定や管理が少し複雑になる可能性があるんだ。


マイクロバッチコンティニュアス
動作小バッチで順次処理レコードごとにリアルタイム処理
データの扱いバッチ単位で処理レコード単位で処理
レイテンシー秒単位(設定に依存)ミリ秒単位
リソース使用不連続(ジョブごと)連続的
容易さ比較的簡単少し難しい場合も

イベントタイムとウォーターマーク

イベントタイム

  • 動作: データレコードが生成された時刻に基づいて処理する。つまり、システムがデータを受信した時刻(処理時刻)ではなく、実際にイベントが発生した時刻で分析する。
  • 実現できること: タイムウィンドウに基づいた集計や分析が可能。例えば、特定の時間帯に発生したイベントを集計するとか、過去5分間の平均値を計算するといったことができる。

ウォーターマーク

  • 動作: 「遅延データをどれくらい許容するか」を設定する概念。ウォーターマークが設定された時間よりも前のイベントタイムを持つデータは、集計や処理から除外される可能性がある。
  • 実現できること: 遅延データの影響を制限しながら、リアルタイムでの処理を続けることができる。これにより、システムは遅延データによる影響を受けずに、より効率的にリソースを使って処理できる。

例えば、ウォーターマークを5分に設定した場合、現在時刻が12:05なら、12:00以前のイベントタイムを持つデータは無視される可能性があるんだ。これで、リアルタイム処理がスムーズに行えるわけ。

また、実装例で説明するとより理解が深まるよね。PythonでのSparkの構造化ストリーミングを例に取ってみよう。

from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col

spark = SparkSession.builder \
    .appName("EventTimeExample") \
    .getOrCreate()

# ソースはKafkaと仮定
streamingInputDF = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "event_topic") \
    .load()

# timestampカラムをイベントタイムとして使う
streamingInputDF = streamingInputDF \
    .selectExpr("CAST(value AS STRING)", "timestamp") \
    .withWatermark("timestamp", "10 seconds")

# 10秒ごとのウィンドウで集計
windowedCounts = streamingInputDF \
    .groupBy(window(col("timestamp"), "10 seconds")) \
    .count()

query = windowedCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()

上の例にもウォーターマークを含めているけど、ウォーターマークを設定すると、その時間より古いイベントデータは考慮から外れる。この例では10秒に設定している。

.withWatermark("timestamp", "10 seconds")

この設定で、タイムスタンプが10秒以上古いデータは、新しいウィンドウの集計には含まれなくなるんだ。

実装はかなりシンプルに書いてるから、実際のプロダクション環境ではもっと細かい設定が必要かもしれないけど、基本的なアイディアはこんな感じ。これでイベントタイムとウォーターマークの実装についての理解が少しは深まったかな?

ステートフルオペレーション

  • 動作: ストリームの各イベントが状態(ステート)を持つことができる。つまり、前後のイベントとの関連性を持って処理ができる。
  • 実現できること: セッションのように連続するイベントのまとまりを処理したり、イベントごとのランキングや累積和、平均などを計算するのが可能。
  • コード例:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("StatefulOperationsExample") \
    .getOrCreate()

# ソースはKafkaと仮定
streamingInputDF = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "event_topic") \
    .load()

streamingInputDF = streamingInputDF.selectExpr("CAST(value AS STRING)")

# ステートフルオペレーションをするための設定
def update_sum(new_values, running_sum):
    if running_sum is None:
        running_sum = 0
    return sum(new_values, running_sum)  

# DStreamを使ってステートを更新
streamingCountsDF = streamingInputDF \
    .map(lambda x: ("key", int(x[0]))) \
    .updateStateByKey(update_sum)

query = streamingCountsDF \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .start()

query.awaitTermination()

この例では、Kafkaからイベントを読み込み、各イベントに”key”というキーをつけて累積和を計算しているんだ。updateStateByKey関数がステートフルオペレーションの鍵となる部分で、ここで新しい値と既存のステート(この場合は累積和)を更新している。

ステートフルオペレーションを使うと、これまでのイベントに依存した複雑な処理も可能になるんだ。どうだろ、これでステートフルオペレーションについてちょっとはクリアになったかな?

フォールトトレランス

  • 動作: システムの一部が故障しても、全体の動作に致命的な影響を与えないように設計されている。
  • 実現できること: システム全体がダウンすることなく、故障した部分を修復したり、別のリソースに切り替えることができる。
  • コード例:

Sparkの構造化ストリーミングでは、フォールトトレランスは大体以下のような方法で確保されている。

  1. チェックポイント: ストリーミングの状態やメタデータを定期的にディスクに保存する。
  2. WAL (Write-Ahead Logs): 受信したデータをすぐにディスクに書き込む。
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("FaultToleranceExample") \
    .getOrCreate()

# チェックポイントとWALの設定
spark.conf.set("spark.sql.streaming.checkpointLocation", "/path/to/checkpoint/dir")
spark.conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")

streamingInputDF = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:

Exactly Once

  • 動作: 同じメッセージやイベントが一回だけ処理されるように設計されている。つまり、何らかの障害やリトライが発生しても、二重に処理されることはない。
  • 実現できること: データの一貫性を高める。たとえば、金額の計算や在庫のアップデートなどで、二重に処理されると困るような場合に便利。
  • コード例:

以下は、Kafkaをシンクとして使い、エグザクトリーワンスを確保するサンプルコード。

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ExactlyOnceExample") \
    .getOrCreate()

streamingInputDF = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "input_topic") \
    .load()

# 何らかの変換処理
transformedDF = streamingInputDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# エグザクトリーワンスを確保するためにKafkaに書き出す
query = transformedDF \
    .writeStream \
    .outputMode("append") \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("checkpointLocation", "/path/to/checkpoint/dir") \
    .option("topic", "output_topic") \
    .start()

query.awaitTermination()

このコードでは、checkpointLocationを指定してチェックポイントを設定している。これによって、ジョブが何らかの理由で再起動した場合でも、以前の状態から安全に処理を再開できる。

まとめ

最後に、構造化ストリーミングの機能がビジネスにもたらす価値についてもまとめてもらった(ほんとなんでもできるよChatGPT・・・)

構造化ストリーミングがビジネスにもたらす価値はかなり大きいよ。ざっくり言うと、以下のようなポイントが挙げられるね。

  1. リアルタイム分析: データをバッチ処理するのではなく、リアルタイムで処理と分析ができるから、即時のビジネスインサイトが手に入る。
  2. スケーラビリティ: Apache Sparkの基盤上で動いているから、データ量が増えても安心。スケーリングも自由自在。
  3. データ品質: “Exactly Once”、”At Least Once”、”At Most Once”といった処理セマンティクスを選べるから、データの品質を犠牲にすることなく処理ができる。
  4. 開発効率: SQLライクなインターフェースを持っているので、開発者が手早く、かつ効率的にアプリケーションを作れる。
  5. 多様なデータソースとシンク: Kafka, AWS S3, Azure Blob Storageなど、様々なデータソースとシンクに対応しているから、既存のインフラにも柔軟に組み込める。
  6. 運用コスト: フォールトトレランスと自動リカバリ機能があるから、システム全体の運用コストが削減できる。
  7. 統合エコシステム: 構造化ストリーミングはSparkのエコシステム(例えば、Spark SQL, MLLib等)ともシームレスに統合できる。これで一貫したデータ処理と分析が可能。

要するに、リアルタイムのデータ処理が必要なビジネスケースが増えている今、構造化ストリーミングはそのニーズを高いレベルで満たしてくれる強力なツールってわけさ。

ChatGPTとの対話を通して、大いに勉強になりました。

私の対話の記録が少しでも参考になりましたら幸いです。

おしまい

この記事を気に入っていただけたらシェアをお願いします!

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です

ABOUT US
Yuu113
初めまして。ゆうたろうと申します。 兵庫県出身、東京でシステムエンジニアをしております。現在は主にデータ分析、機械学習を活用してビジネスモデリングに取り組んでいます。 日々学んだことや経験したことを整理していきたいと思い、ブログを始めました。旅行、カメラ、IT技術、江戸文化が大好きですので、これらについても記事にしていきたいと思っています。