どういう問題か?
CloudWatchLogsのとあるロググループに蓄積されるログをサブスクリプションフィルターで全て取得して、データをS3に出力するkinesis streamを作成した。
その際のログは、出力前にlamba関数による変換を有効にしていた。
大量のログ(合計数十万レコード程度)の処理を試みたところ、以下のエラーが発生した。
エラーログ
Some records failed while calling PutRecordBatch to Firehose stream, retrying. Individual error codes: ServiceUnavailableException,ServiceUnavailableException,ServiceUnavailableException,ServiceUnavailableException,ServiceUnavailableException,ServiceUnavailableException,ServiceUnavailableException,ServiceUnavailableException,ServiceUnavailableException,ServiceUnavailableException,ServiceUnavailableException,ServiceUnavailableException,ServiceUnavailableException,ServiceUnavailableException,ServiceUnavailableException,...
原因
調べてみると、どうもPutRecordBatch()
apiに関するエラーのようで、boto3のAPIリファレンスを確認してみると次のような記述を発見した。
If PutRecordBatch throws ServiceUnavailableException , back off and retry. If the exception persists, it is possible that the throughput limits have been exceeded for the delivery stream.
実は変換用lambda関数では、最終的に変換したログをputRocordBatch()で元のStreamに突っ込んでいるのだが、どうやらこのAPIの実行に失敗していることがわかった。
ちなみにこのPutRecordBatchについてはfirehoseのドキュメントに制限値が以下のように記載されていた。
PutRecordBatch オペレーションは、呼び出しごとに 500 レコードまたは 4 MiB のどちらか小さい方を受け取ることができます。この制限は変更できません。
おそらくだが、CloudWatchLogsに蓄積されるログのスループットが高すぎて、変換用Lambda関数に渡されるログのステップ数がAPIの制限を超えてしまったのだと考えられる。
kinesis data firehose では、変換用Lambda関数にわたすログを予め決めた制限値に達するまでバッファし、達した段階でまとめて変換を開始する。今回はこのバッファされた容量が大きすぎると予想した。
解決策
上記の原因が本当なら以下の2種類の方法で解決できるはずである。
- 変換用Lambda関数にわたすログの容量をPutRecordBatchで処理できるサイズまで小さくする。
- 変換用Lambda関数側でPutRecordBatchの制限値内で処理できる単位で分割してAPIをたたくようにする。
- サブスクリプションフィルターで不要なログはフィルタリングする。
今回はもともとLambdaで提供されている設計図そのままでいきたかったこともあり、1の方法で対処をした。
バッファするサイズをどうするか、これはkinesis firehose の設定の以下の部分で編集できる。
(注意:timeoutが15minとなっているが、kinesis の変換として使用する場合には5minがタイムアウト限界値となることに注意したい。)
今回は設定できる最も小さい値、
- バッファサイズ:1MiB
- バッファインターバル:60sec
にそれぞれ変更して試したところ、上記のエラーは確認されなくなった。 もしこの設定でもエラーが引き続き確認される場合は、方法2,3による対処を考えたい。
また変換に関しては他にも制約があるため、その他の条件も併せて確認されたい。