ストリーム処理でKafka TopicのKeyを扱う in KSQL with デモスクリプト
はじめに
Kafkaをベースとしたストリーム処理では、Kafka Topicに流れるイベントを取り込み、処理後に別のTopicに書き込む事を繋げる事によりパイプラインを構築します。ksqlDBの様なストリーム処理基盤はこの処理を開発者から隠蔽化し、SQLを用いてそのロジックのみに注力できるよう補助してくれます。
一方、この隠蔽化によってksqlDBが行なっている内部処理の多くは開発者にはタッチする事が出来ず、出来た場合でも直感的に扱えない事も多くあります。Kafka TopicのKeyもその一つです。
今回はksqlDBにおけるStreamの概念と、Topic Keyを扱う方法について説明します。
StreamとTable
ksqlDBのストリーム処理では、Topicと紐付けるデータモデルを定義しそのモデルに対してクエリを実行する形でデータにアクセスします。具体的にはStream
とTable
で:
- STREAM - Topicを流れるイベントを、時系列を維持したデータの流れとして体現。ステートレス。
- TABLE - Topicを流れるイベントからKey単位でデータの最新状態をマテリアライズ。ステートフル。
同じKafka上で扱うデータですが、モデルによってksqlDBにおける扱いも、そしてそれを支える内部の仕組みも異なります。1 ただksqlDBのクエリ上はどちらもCREATE
構文を利用してSQL同様の手順で作成します。例としてStreamの定義のサンプルは以下の様になります。
CREATE STREAM Clickstream (
IP VARCHAR,
USERID INT,
REMOTE_USER VARCHAR,
TIME VARCHAR,
_TIME INT,
REQUEST VARCHAR,
STATUS VARCHAR,
BYTES VARCHAR,
REFERRER VARCHAR,
AGENT VARCHAR
) WITH (
KAFKA_TOPIC='datagen-topic', VALUE_FORMAT='JSON');
ここではdatagen-topic
というJSONでシリアライズされたTopicに対してClickstream
という名前のStreamデータモデルを定義しています。
CSASとCTAS
Topicをモデル化したStreamやTableを定義したとして、今度はそのモデルに対して処理をする必要性が出てきます。
一般的なデータフロープログラミングのモデルではプログラム内で処理をチェイニングした結果を別Topicに出力しますが、SQLにはそのような構文モデルはありません。一方リレーショナルDBではストアドプロシージャ等を利用しますが、DB毎にその仕様は異なります。ksqlDBではSQLに近い構文でありながら、かつより汎用的な方法でデータの加工処理とストアを結び付ける必要があります。
ksqlDBではもっと汎用的なSELECT
とCREATE STREAM
を組み合わせる事により処理とストアを結びつけます。具体的にはCREATE STREAM AS SELECT
構文を利用します。
CREATE STREAM EventsWithoutKey
WITH (KAFKA_TOPIC='404events', VALUE_FORMAT='JSON')
AS SELECT
IP,
USERID,
_TIME TIME_IN_INT,
STATUS,
BYTES
FROM Clickstream
WHERE STATUS = '404'
EMIT CHANGES;
ここでは前述したClickstream
というStreamから必要なカラムを指定してSELECTした結果を新たなStreamとして定義する処理です。さらにWHERE句を利用してフィルタリングした結果のみ抽出し、かつ新たに404events
というTopicに対して出力しています。物理的にはdatagen-topic
にあるデータが変換/加工され404events
という新たなTopicに登録されています。
当然この構文はksqlDBでのストリーム処理には頻出構文であり、CSAS (CREATE STREAM AS SELECT
)、CTAS (CREATE TABLE AS SELECT
)と呼ばれます。
Keyとデータモデル
CREATE STREAM
とCREATE TABLE
ではTopicのKeyの扱い方が異なります。
以前のバージョンではStreamでもTableでもROWKEY
という名称でTopicのKeyに紐づくフィールドをモデルに追加します。このROWKEY
は物理的にはTopicのKeyでありながら、Valueを扱うksqlDBから参照出来るという、他のフィールドとは扱いも振る舞いも異なります。また、コミュニティではこの勝手に追加されるROWKEY
というフィールドの扱い方に関して多くの混乱を招きました。ストリーム処理をSQLで処理をする上では直感的では無いという判断でした。
このKeyの扱い方はksqlDB 0.10で大きく変わりました。これは一律ではなくStreamとTableで定義を変える事で:
- CREATE TABLE - 明示的にKeyを指定する必要があり、それには
PRIMARY KEY
と指定するフィールドが必要。 - CREATE STREAM - キーをそもそも指定しない。
このアプローチはTABLEの構文としても自然であり、かつSTREAMを扱う際にはKeyが存在すらしないという潔いものです。結果としてこの変更と思想が以降のksqlDBのコミュニティに広がりました。
StreamとKey
それでもStreamでKeyを扱いたいというのが本エントリの主旨です。
実際にStreamでKeyを指定する必要性がある場合は存在し、具体的にはJOINの際にはKey指定したものしかJOIN出来ません。この為StreamでもKeyを指定する事は可能になっています。
具体的にはフィールドにKey
と指定する事により、そのフィールドをValueの一部ではなくKeyとして扱います。先程のCREATE STREAM
を例に取ると:
CREATE STREAM ClickstreamWithKey (
IP VARCHAR Key,
USERID INT,
REMOTE_USER VARCHAR,
TIME VARCHAR,
_TIME INT,
REQUEST VARCHAR,
STATUS VARCHAR,
BYTES VARCHAR,
REFERRER VARCHAR,
AGENT VARCHAR
) WITH (
KAFKA_TOPIC='datagen-topic', VALUE_FORMAT='JSON');
同じTopicを参照していますが、ここで生成されるStreamにはレコードKeyにIP
が指定され、と言うよりValueからKeyに移動します。
この振る舞いを実際に確認するにはCSAS
で再定義したものに新たなTopicを割り当て、その結果を比較する必要があります。これら2つのStreamに以下のCSASを適用すると:
CREATE STREAM TransformedEvents
WITH (KAFKA_TOPIC='events', VALUE_FORMAT='JSON')
AS SELECT
IP,
USERID,
_TIME TIME_IN_INT,
STATUS,
BYTES
FROM Clickstream -- Keyがある方はClickstreamWithKeyと指定
EMIT CHANGES;
通常のCREATE STREAM
で生成した場合、Topicにはとなり、Key
を指定したStreamに対するCSAS
の結果はとなります。IPがTopicのKeyに移っている事が確認できます。
KeyをValueの中にも持つ
先述した通り、StreamにおいてKeyを扱う事は混乱を招く恐れがある為、JOIN等明確な利用がある場合のみに利用する事が推奨されます。それでもJoinもするがValueの中でもKeyを参照する、つまりksqlDB内でKeyを参照したいというユースケースも存在します。
この場合、KeyのコピーをValue内に持たせるする必要がありますが、ハックに近い対応が必要となります。
KeyをValueにコピーする関数は存在し、AS_VALUE関数を利用すればコピー出来ます。
しかしながら、AS_VALUE
はTableを前提とした関数であり、Tableには明示的に PRIMARY KEYを指定するのでKeyとそのフィールド名も参照出来ます。しかしながら、Streamの場合にはKeyは必須ではなく、先程のKey
を利用した結果にあるように、Keyには値のみ、値のみがKeyに移ります。(この例ではIP
のフィールド。)
PRIMARY KEYの様にフィールド名を指定してAS_VALUE
を使いたい、つまりROWKEY
に対してAS_VALUE
を実行したいのですが:
CREATE STREAM TransformedEventsWithKey
WITH (KAFKA_TOPIC='events-with-key', VALUE_FORMAT='JSON')
AS SELECT
AS_VALUE(ROWKEY) AS IP,
USERID,
_TIME TIME_IN_INT,
STATUS,
BYTES
FROM ClickstreamWithKey
EMIT CHANGES;
このクエリは構文エラーとなります。AS_VALUE
はTableのPRIMARY KEY
は引数として受け付けますが、値だけのROWKEYは受け付けません。つまりAS_VALUE
は通常の使用法ではStreamに対しては利用出来無いことになります。2
ハックとしての解答は以下になります:
CREATE STREAM TransformedEventsWithKey
WITH (KAFKA_TOPIC='events-with-key', VALUE_FORMAT='JSON')
AS SELECT
IP AS ROWKEY,
AS_VALUE(IP) AS IP,
USERID,
_TIME TIME_IN_INT,
STATUS,
BYTES
FROM ClickstreamWithKey
EMIT CHANGES;
ROWKEY
に対してIP
というフィールド名を割り当て、そのフィールドをAS_VALUE
で利用する方法になります。上記クエリを分解解釈すると以下となります:
- StreamにはないPRIMARY KEYのフィールドを
IP
として明示的に指定。 - そのフィールドを
AS_VALUE
で参照。この際元々あるフィールドと同名で定義。
妙な構文になりますが、結果は:と期待通りの結果となります。
こちらの一連の動きが確認出来るデモスクリプトを用意しました。是非実際に動かして挙動を確かめてください。
おわりに
このハック的なアプローチを見ると「ksqlDBは面倒くさい。直感的ではない。」と思うかも知れません。確かにハックだけを見るとその通りで、無意味な制約のようにも思えます。しかしながらこれには背景があり、より直感的なデータモデル定義へと変更した事による副作用である事を理解して頂ければと思います。また、ksqlDB、というよりデータフロー処理内でKeyを参照するというのは特殊な要件です。このハックを怪しい要件/ロジックのスメルと捉える事もできます。
何より、やや面倒くさいksqlDBにおけるKeyの扱いを理解すると、ksqlDBの仕組みやデータモデルへの理解が深まります。ksqlDBの裏側を少し垣間見る機会と思って頂ければ幸いです。
実際のステート管理はksqlDBが内部で利用するKafka Streamsの仕組みを利用している。 ↩︎
これはStreamの構文だからエラーではなく、そもそも
AS_VALUE
にROWKEY
を指定出来ないという仕様によるもの。 ↩︎