Apache Kafkaに入門した

by Taichi Nakashima,

Apache kafka

最近仕事でApache Kafkaの導入を進めている.Kafkaとは何か? どこで使われているのか? どのような理由で作られたのか? どのように動作するのか(特にメッセージの読み出しについて)? を簡単にまとめておく(メッセージングはまだまだ勉強中なのでおかしなところがあればツッコミをいただければ幸いです).

バージョンは 0.8.2 を対象に書いている.

Apache Kafkaとは?

2011年にLinkedInから公開されたオープンソースの分散メッセージングシステムである.Kafkaはウェブサービスなどから発せられる大容量のデータ(e.g., ログやイベント)を高スループット/低レイテンシに収集/配信することを目的に開発されている.公式のトップページに掲載されているセールスポイントは以下の4つ.

  • Fast とにかく大量のメッセージを扱うことができる
  • Scalable Kafkaはシングルクラスタで大規模なメッセージを扱うことができダウンタイムなしでElasticかつ透過的にスケールすることができる
  • Durable メッセージはディスクにファイルとして保存され,かつクラスタ内でレプリカが作成されるためデータの損失を防げる(パフォーマンスに影響なくTBのメッセージを扱うことができる)
  • Distributed by Design クラスタは耐障害性のある設計になっている

どこで使われているのか?

Use Casesをあげると,メッセージキューやウェブサイトのアクティビティのトラッキング(LinkedInのもともとのUse Case),メトリクスやログの収集,StormSamzaを使ったストリーム処理などがあげられる.

利用している企業は例えばTwitterやNetflix,Square,Spotify,Uberなどがある(cf. Powered By).

Kafkaの初期衝動

Kafkaのデザインを理解するにはLinkedInでなぜKafkaが必要になったのかを理解するのが早い.それについては2012年のIEEEの論文“Building LinkedIn’s Real-time Activity Data Pipeline”を読むのが良い.簡単にまとめると以下のようになる.

LinkedInでは大きく2つのデータを扱っている.1つはウェブサイトから集められる大量のユーザのアクティビティデータ.これらをHadoop(バッチ処理)を通して機械学習しレコメンド/ニュースフィードなどサービスの改善に用いている.それだけではなくこれらのデータはサービスの監視(セキュリティなど)にも用いている.2つ目はシステムのログ.これらをリアルタイムで処理してサービスのモニタリングを行っている.これらは近年のウェブサービスではよく見かける風景.

問題はそれぞれのデータの流れが1本道になっていたこと.アクティビティデータはバッチ処理に特化していたためリアルタイム処理ができない,つまりサービス監視には遅れが生じていた.同様にシステムのログは,リアルタイム処理のみに特化していたため長期間にわたるキャパシティプランニングやシステムのデバッグには使えなかった.サービスを改善するにはそれぞれタイプの異なるデータフィードを最小コストで統合できるようにする必要があった.またLinkedInのようにデータがビジネスのコアになる企業ではそのデータを様々なチームが簡単に利用できる必要があった.

これら問題を解決するために大ボリュームのあらゆるデータを収集し様々なタイプのシステム(バッチ/リアルタイム)からそれを読めるようにする統一的ななメッセージプラットフォームの構築が始まった.

最初は既存のメッセージシステム(論文にはActiveMQを試したとある)の上に構築しようとした.しかしプロダクションレベルのデータを流すと以下のような問題が生じた.

  • 並列でキューのメッセージを読むにはメッセージごとに誰に読まれたかを記録する必要がある(mutex).そのため大量のデータを扱うとメモリが足りなくなった.メモリが足りなくなると大量のRamdom IOが発生しパフォーマンスに深刻な影響がでた
  • バッチ処理/リアルタイム処理の両方でキューを読むには少なくとも2つのデータのコピーが必要になり非効率になった

このような問題から新しいメッセージシステム,Kafkaの開発が必要になった.Kafkaが目指したのは以下.

  • あらゆる種類のデータ/大容量のデータを統一的に扱う
  • 様々なタイプのシステム(バッチ/リアルタイム)が同じデータを読める
  • 高スループットでデータを処理する(並列でデータを読める)

どのように動作するのか?(概要)

KafkaはBroker(クラスタ)とProducer,Consumerという3つのコンポーネントで構成される.Producerはメッセージの配信を行いConsumerはメッセージの購読を行う.そしてKafkaのコアであるBrokerはクラスタを構成しProducerとConsumerの間でメッセージの受け渡しを行うキューとして動作する.

http://kafka.apache.org/images/producer_consumer.png

メッセージのやりとり

KafkaはTopicを介してメッセージのやりとりを行う.Topicとはメッセージのフィードのようなものである.例えば,検索に関わるデータを"Search"というTopic名でBrokerに配信しておき,検索に関わるデータが欲しいConsumerは"Search"というTopic名を使ってそれをBrokerから購読する.

Pull vs Push

BrokerがConsumerにデータをPushするのか(fluentd,logstash,flume),もしくはConsumerがBrokerからデータをPullするのかはメッセージシステムのデザインに大きな影響を与える.もちろんそれぞれにPros/Consはある.KafkaはPull型のConsumerを採用している.それは以下の理由による.

  • Pushだと様々なConsumerを扱うのが難しく,Brokerがデータの転送量などを意識しないといけない.Kafkaの目標は最大限のスピードでデータを消費することだが,(予期せぬアクセスなどで)転送量を見誤るとConsumerを圧倒してまう.PullだとConsumerが消費量を自らが管理できる.
  • Pullだとバッチ処理にも対応できる.Pushだと自らそれを溜め込んだ上でConsumerがそれを扱えるか否かに関わらずそれを送らないといけない
  • (PullでしんどいのはBrokerにデータがまだ届いてない場合のコストだがlong pollingなどでそれに対応している)

メッセージのライフサイクル

BrokerはConsumerがメッセージを購読したかに関わらず設定された期間のみ保持してその後削除する.これはKafkaの大きな特徴の1つである.例えば保存期間を2日間に設定すれば配信後2日間のみデータは保持されその後削除される.

このためConsumerサイドがメッセージをどこまで読んだがを自らが管理する(Brokerが管理する必要がない).普通は順番にメッセージを読んでいくが,Consumerに問題があれば読む位置を巻き戻して復旧することもできる(最悪どれくらいでConsumerを復旧できるかによりデータの保存期間が決まり保持するデータのサイズが決まる).

この2つの特徴のためConsumerはBrokerにも他のBrokerにも大きな影響を与えない.

高速にメッセージを消費する

Kafkaで面白いのはConsumerがBrokerから高速にメッセージを読み込むための仕組みであると思う.これをどのように実現しているかを説明する.

並列でキューを読むのは大変

高速にメッセージを消費するにはBrokerのデータを並列に読む必要がある.そもそも"初期衝動"のところで説明したように複数のConsumerが並列でキューを読むのは大変である.

  • 重複なく送るためにはメッセージごとにどのConsumerに読まれたかを管理する必要がある
  • キューの書き込みまでは順序性が確保されるが並列で読むと複数のConsumerに消費された瞬間順序は失われる

Kafkaのデザインはこれらを解決するようになっている.

Brokerにおけるメッセージの保存

まずBrokerのメッセージの保存方法に特徴がある.KafkaはTopicごとに1つ以上のPartitionという単位でメッセージを保存する.メッセージはそれぞれのPartitionの末尾に追記される.これによりPartitionごとにメッセージの順序性が担保される.例えば以下の図はあるTopicの3つのPartitionにメッセージが追記されていることを示す.

http://kafka.apache.org/images/log_anatomy.png

Partitionの使われ方

Partitionには大きく以下の2つの目的がある.

  • 複数のサーバーでメッセージを分散させるため(1つのサーバーのキャパを超えてメッセージを保存できる)
  • 並列処理のため

どのように並列処理するか? Consumerはグループ単位でメッセージを購読する.そして"1つのPartitionのデータは1つのConsumerグループ内の1つのConsumerにのみ消費される"という制限でこれを実現する(つまりConsumerの並列数はPartition数を超えられない).以下の図は2つのConsumerグループAとBに属する複数のConsumerが並列にメッセージを購読している様子を示す.グループ内では並列処理だがグループ間で見ると伝統的なPub/Subモデル(1対1)のモデルに見える.

この仕組みには以下のような利点がある.

  • あるPartitionを読むConsumerは特定の1つなので,メッセージが誰にどこまで読まれたかまで記録する必要はなくて,単純にどこまで読まれたかを通知しておけばよい
  • 読んでるConsumerは1つなのでConsumerはlazyに読んだ場所を記録しておけばよくて処理に失敗したら再びよみにいけば良い(at-least-once)
  • どのメッセージが読まれたかをlazilyに記録できるためにパフォーマンスを保証できる(Partitionのオーナーを同じように決められ無い場合はスタティックにConsumerを割り当てるか/ランダムにConsumerをロードバランスするしか無い.ランダムにやると複数のプロセスが同時に同じPartitionを購読するのでmutexが必要になりBrokerの処理が重くなる)

順序性の補足

Partition内,つまりConsumer内では順序性が確保される.つまりBrokerに記録された順番で消費される.がPartition間では保証されない.

ProducerはBrokerにメッセージを配信するときにKeyを指定することができる.このKeyにより同じKeyが指定されたメッセージを同じPartitionに保存することができる.Partition内の順序性とKeyで大抵のアプリケーションには問題ない(とのこと).完全な順序性を確保したければPartitionを1つにすれば良い(Consumerも一つになってしまうが).

高スループットへの挑戦

Brokerへの書き込み/読み込みはとにかく速い.LinkedInのベンチマークでは200万 write/sec(Benchmarking Apache Kafka: 2 Million Writes Per Second (On Three Cheap Machines))とある.なぜこれだけ速いのか.以下の2つを組み合わせることにより実現している.

  • write バッファ書き込みを自分たちで実装するのではなくてカーネルのメモリキャシュ機構をがっつり使うようにした(Brokerが動いているサーバーのメモリ32GBのうち28-30GBがキャッシュに使われている)
  • read ページキャッシュからネットワークのsocketへ効率よくデータを受け渡すためにsendfile()を使ってる

まとめ

先に紹介した論文“Building LinkedIn’s Real-time Activity Data Pipeline”は他にも面白いことがたくさん書いてあるのでKafkaを使おうとしているひとはぜひ一度目を通してみるといいと思う.

次回はGo言語を使ってProducerとConsumerを実装する話を書く.

参考