ストリーム処理でKafka TopicのKeyを扱う in KSQL with デモスクリプト

はじめに

Kafkaをベースとしたストリーム処理では、Kafka Topicに流れるイベントを取り込み、処理後に別のTopicに書き込む事を繋げる事によりパイプラインを構築します。ksqlDBの様なストリーム処理基盤はこの処理を開発者から隠蔽化し、SQLを用いてそのロジックのみに注力できるよう補助してくれます。

一方、この隠蔽化によってksqlDBが行なっている内部処理の多くは開発者にはタッチする事が出来ず、出来た場合でも直感的に扱えない事も多くあります。Kafka TopicのKeyもその一つです。

今回はksqlDBにおけるStreamの概念と、Topic Keyを扱う方法について説明します。

StreamとTable

ksqlDBのストリーム処理では、Topicと紐付けるデータモデルを定義しそのモデルに対してクエリを実行する形でデータにアクセスします。具体的にはStreamTableで:

  • STREAM - Topicを流れるイベントを、時系列を維持したデータの流れとして体現。ステートレス。
  • TABLE - Topicを流れるイベントからKey単位でデータの最新状態をマテリアライズ。ステートフル。

同じKafka上で扱うデータですが、モデルによってksqlDBにおける扱いも、そしてそれを支える内部の仕組みも異なります。1 ただksqlDBのクエリ上はどちらもCREATE構文を利用してSQL同様の手順で作成します。例としてStreamの定義のサンプルは以下の様になります。

CREATE STREAM Clickstream (
    IP VARCHAR,
    USERID INT,
    REMOTE_USER VARCHAR,
    TIME VARCHAR,
    _TIME INT,
    REQUEST VARCHAR,
    STATUS VARCHAR,
    BYTES VARCHAR,
    REFERRER VARCHAR,
    AGENT VARCHAR
) WITH (
KAFKA_TOPIC='datagen-topic', VALUE_FORMAT='JSON');

ここではdatagen-topicというJSONでシリアライズされたTopicに対してClickstreamという名前のStreamデータモデルを定義しています。

CSASとCTAS

Topicをモデル化したStreamやTableを定義したとして、今度はそのモデルに対して処理をする必要性が出てきます。

一般的なデータフロープログラミングのモデルではプログラム内で処理をチェイニングした結果を別Topicに出力しますが、SQLにはそのような構文モデルはありません。一方リレーショナルDBではストアドプロシージャ等を利用しますが、DB毎にその仕様は異なります。ksqlDBではSQLに近い構文でありながら、かつより汎用的な方法でデータの加工処理とストアを結び付ける必要があります。

ksqlDBではもっと汎用的なSELECTCREATE STREAMを組み合わせる事により処理とストアを結びつけます。具体的にはCREATE STREAM AS SELECT構文を利用します。

CREATE STREAM EventsWithoutKey
WITH (KAFKA_TOPIC='404events', VALUE_FORMAT='JSON')
AS SELECT
    IP,
    USERID,
    _TIME TIME_IN_INT,
    STATUS,
    BYTES
FROM Clickstream
WHERE STATUS = '404'
EMIT CHANGES;

ここでは前述したClickstreamというStreamから必要なカラムを指定してSELECTした結果を新たなStreamとして定義する処理です。さらにWHERE句を利用してフィルタリングした結果のみ抽出し、かつ新たに404eventsというTopicに対して出力しています。物理的にはdatagen-topic にあるデータが変換/加工され404eventsという新たなTopicに登録されています。

当然この構文はksqlDBでのストリーム処理には頻出構文であり、CSAS (CREATE STREAM AS SELECT)、CTAS (CREATE TABLE AS SELECT)と呼ばれます。

Keyとデータモデル

CREATE STREAMCREATE TABLEではTopicのKeyの扱い方が異なります。

以前のバージョンではStreamでもTableでもROWKEYという名称でTopicのKeyに紐づくフィールドをモデルに追加します。このROWKEYは物理的にはTopicのKeyでありながら、Valueを扱うksqlDBから参照出来るという、他のフィールドとは扱いも振る舞いも異なります。また、コミュニティではこの勝手に追加されるROWKEYというフィールドの扱い方に関して多くの混乱を招きました。ストリーム処理をSQLで処理をする上では直感的では無いという判断でした。

このKeyの扱い方はksqlDB 0.10で大きく変わりました。これは一律ではなくStreamとTableで定義を変える事で:

  • CREATE TABLE - 明示的にKeyを指定する必要があり、それにはPRIMARY KEYと指定するフィールドが必要。
  • CREATE STREAM - キーをそもそも指定しない。

このアプローチはTABLEの構文としても自然であり、かつSTREAMを扱う際にはKeyが存在すらしないという潔いものです。結果としてこの変更と思想が以降のksqlDBのコミュニティに広がりました。

StreamとKey

それでもStreamでKeyを扱いたいというのが本エントリの主旨です。

実際にStreamでKeyを指定する必要性がある場合は存在し、具体的にはJOINの際にはKey指定したものしかJOIN出来ません。この為StreamでもKeyを指定する事は可能になっています。

具体的にはフィールドにKeyと指定する事により、そのフィールドをValueの一部ではなくKeyとして扱います。先程のCREATE STREAMを例に取ると:

CREATE STREAM ClickstreamWithKey (
    IP VARCHAR Key,
    USERID INT,
    REMOTE_USER VARCHAR,
    TIME VARCHAR,
    _TIME INT,
    REQUEST VARCHAR,
    STATUS VARCHAR,
    BYTES VARCHAR,
    REFERRER VARCHAR,
    AGENT VARCHAR
) WITH (
KAFKA_TOPIC='datagen-topic', VALUE_FORMAT='JSON');

同じTopicを参照していますが、ここで生成されるStreamにはレコードKeyにIPが指定され、と言うよりValueからKeyに移動します。

この振る舞いを実際に確認するにはCSASで再定義したものに新たなTopicを割り当て、その結果を比較する必要があります。これら2つのStreamに以下のCSASを適用すると:

CREATE STREAM TransformedEvents
WITH (KAFKA_TOPIC='events', VALUE_FORMAT='JSON')
AS SELECT
    IP,
    USERID,
    _TIME TIME_IN_INT,
    STATUS,
    BYTES
FROM Clickstream -- Keyがある方はClickstreamWithKeyと指定
EMIT CHANGES;

通常のCREATE STREAMで生成した場合、Topicには

CREATE STREAM without Key - Key
CREATE STREAM without Key - Value
となり、Keyを指定したStreamに対するCSASの結果は
CREATE STREAM with Key - Key
CREATE STREAM with Key - Value
となります。IPがTopicのKeyに移っている事が確認できます。

KeyをValueの中にも持つ

先述した通り、StreamにおいてKeyを扱う事は混乱を招く恐れがある為、JOIN等明確な利用がある場合のみに利用する事が推奨されます。それでもJoinもするがValueの中でもKeyを参照する、つまりksqlDB内でKeyを参照したいというユースケースも存在します。

この場合、KeyのコピーをValue内に持たせるする必要がありますが、ハックに近い対応が必要となります。

KeyをValueにコピーする関数は存在し、AS_VALUE関数を利用すればコピー出来ます。

しかしながら、AS_VALUEはTableを前提とした関数であり、Tableには明示的に PRIMARY KEYを指定するのでKeyとそのフィールド名も参照出来ます。しかしながら、Streamの場合にはKeyは必須ではなく、先程のKeyを利用した結果にあるように、Keyには値のみ、値のみがKeyに移ります。(この例ではIPのフィールド。)

PRIMARY KEYの様にフィールド名を指定してAS_VALUEを使いたい、つまりROWKEYに対してAS_VALUEを実行したいのですが:

CREATE STREAM TransformedEventsWithKey
WITH (KAFKA_TOPIC='events-with-key', VALUE_FORMAT='JSON')
AS SELECT
    AS_VALUE(ROWKEY) AS IP,
    USERID,
    _TIME TIME_IN_INT,
    STATUS,
    BYTES
FROM ClickstreamWithKey
EMIT CHANGES;

このクエリは構文エラーとなります。AS_VALUEはTableのPRIMARY KEYは引数として受け付けますが、値だけのROWKEYは受け付けません。つまりAS_VALUEは通常の使用法ではStreamに対しては利用出来無いことになります。2

ハックとしての解答は以下になります:

CREATE STREAM TransformedEventsWithKey
WITH (KAFKA_TOPIC='events-with-key', VALUE_FORMAT='JSON')
AS SELECT
    IP AS ROWKEY,
    AS_VALUE(IP) AS IP,
    USERID,
    _TIME TIME_IN_INT,
    STATUS,
    BYTES
FROM ClickstreamWithKey
EMIT CHANGES;

ROWKEYに対してIPというフィールド名を割り当て、そのフィールドをAS_VALUEで利用する方法になります。上記クエリを分解解釈すると以下となります:

  • StreamにはないPRIMARY KEYのフィールドをIPとして明示的に指定。
  • そのフィールドをAS_VALUEで参照。この際元々あるフィールドと同名で定義。

妙な構文になりますが、結果は:

CREATE STREAM with Key and Value - Key
CREATE STREAM with Key and Value - Value
と期待通りの結果となります。

こちらの一連の動きが確認出来るデモスクリプトを用意しました。是非実際に動かして挙動を確かめてください。

おわりに

このハック的なアプローチを見ると「ksqlDBは面倒くさい。直感的ではない。」と思うかも知れません。確かにハックだけを見るとその通りで、無意味な制約のようにも思えます。しかしながらこれには背景があり、より直感的なデータモデル定義へと変更した事による副作用である事を理解して頂ければと思います。また、ksqlDB、というよりデータフロー処理内でKeyを参照するというのは特殊な要件です。このハックを怪しい要件/ロジックのスメルと捉える事もできます。

何より、やや面倒くさいksqlDBにおけるKeyの扱いを理解すると、ksqlDBの仕組みやデータモデルへの理解が深まります。ksqlDBの裏側を少し垣間見る機会と思って頂ければ幸いです。


  1. 実際のステート管理はksqlDBが内部で利用するKafka Streamsの仕組みを利用している。 ↩︎

  2. これはStreamの構文だからエラーではなく、そもそもAS_VALUEROWKEYを指定出来ないという仕様によるもの。 ↩︎

hashi
hashi
Admin - Manager, Solutions Engineering, Korea/Japan

畑Noob