Kinesis FirehoseでAmazon Elasticsearch Serviceにデータを送信してKibanaで可視化する

ここのところ、それなりの時系列ログデータが欲しい時というポストで紹介したNASA-HTTPアクセスログを使って↓のようにAmazonESにAWS LambdaやElasticsearchのingest nodeを使ってデータを投入してみましたが、

AWS Step FunctionsとAWS Lambdaを使ってサーバーレスにAmazon Elasticsearch Serviceにログデータを投入する Amazon Elasticsearch Service で ingest と bulk API を使ってインデクシングして Kibana で可視化する

今回は、AWS的には本命と言いますか、Kinesisにデータを放り込んでおけば、あとはよしなにElasticsearch Serviceまでヨロシクやってくれる、Kinesis Firehoseでデータを投入したいと思います。

ということで、 @hamburger_kidAWSブログ上で翻訳してくれた、 Kinesis Firehoseを使用してApache WebログをAmazon Elasticsearch Serviceに送信する に沿ってやっていきたいと思います。

昨年12月上旬の #CTONight で発表させていただいた段階では、まだ↓のKinesis FirehoseのデータをAWS Lambdaでトランスフォームする機能はアナウンスだけにとどまっていましたが、こちら、昨年の年末に Amazon Kinesis Firehose では、データストアにロードする前にストリーミングデータを準備して変換できるようになりました。 のようにローンチされています。 [slideshare id=i2uQw0x1RGfxkH?startSlide=26&w=595&h=485&fb=0&mw=0&mh=0&style=border:1px solid #CCC; border-width:1px; margin-bottom:5px; max-width: 100%;&sc=no]

ということで、今回は↓のようなイメージでKinesis FirehoseとAWS Lambdaを使ってAmazon Elasticsearch Searviceにデータを取り込んでいきたいと思います。 Kinesis FH, Lambda, AmazonES

Amazon Elasticsearch Serviceのドメインを作成する

AmazonESのドメインを作成します。 Screen Shot 2017-05-01 at 10.47.21//embedr.flickr.com/assets/client-code.js

現在はm4.largeがデフォルトになっています。こちらは用途にあわせて。 Screen Shot 2017-05-01 at 10.47.51//embedr.flickr.com/assets/client-code.js

今回はAmazon ESのエンドポイントへのは会社からのIPアドレスのみにしました。テンプレートを選んでIPアドレスを入力するだけ。Elasticsearchはランサム攻撃が問題になったりしていますので、IAMやIPアドレスで制御するようにしましょう。より詳細に関しては、AWS Security Blog の How to Control Access to Your Amazon Elasticsearch Service Domain をご覧ください。 Screen Shot 2017-05-02 at 10.44.14//embedr.flickr.com/assets/client-code.js

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "*"
      },
      "Action": "es:*",
      "Condition": {
        "IpAddress": {
          "aws:SourceIp": [
            "xxx.xxx.xxx.xxx/32",
            "xxx.xxx.xxx.xxx/32"
          ]
        }
      },
      "Resource": "arn:aws:es:us-west-2:xxxxxxxxxxxx:domain/nasa-http/*"
    }  
  ]
}

Kinesis Firehoseのストリームを作成する

Amazon ESをDestinationにしたKinesis Firehoseストリームを作成します。 Amazon_Kinesis_Firehose//embedr.flickr.com/assets/client-code.js インデックスのローテーションに関してはAmazon Kinesis Firehose Data DeliveryIndex Rotation for the Amazon ES Destinationにも詳細が記載されていますが、 『Depending on the rotation option you choose, Firehose appends a portion of the UTC arrival timestamp to your specified index name, and rotates the appended timestamp accordingly.』 ということで、到着のUTCタイムスタンプということで、例えばファイルの中のタイムスタンプの項目を見た上でのローテーションのようなことは現状では出来無さそうな予感です。

Apacheの生ログをElasticsearchが取り込めるようにJSONに変換するブループリントなLambda Functionを選択(今まではLogstashやIngest Nodeで行っていたような処理) Amazon_Kinesis_Firehose_ApacheLog_To_Json//embedr.flickr.com/assets/client-code.js

ソースコードは後からじっくり読むとして、Lambda Functionを作成します。 Lambda_Management_Console//embedr.flickr.com/assets/client-code.js

Lambda Functionにはベーシックなロギング周りな権限を持つIAM Roleを付けてあげて、タイムアウトの時間を長めにしておきます。(Lambdaの処理にかかる時間は、後からFirehose側の振る舞いについて出てきますが、そこでの設定値によるかなと思います) Lambda_Management_Console_role//embedr.flickr.com/assets/client-code.js

Kinesis Firehoseのコンソールに戻って↑で作成したLambda Functionを指定した後、バッファの設定をします。実際の業務で利用する場合にはバッファの設定が大事になります。Kineis Firehoseでは都度データをElasticsearchに送るのではなく一時的にバッファリングして、この設定を元にしたタイミングでまとめて複数ドキュメントをバッチ的にAmazon ESに送ります。小さいインスタンスに対して大容量のデータを頻繁に送り続けるとAmazon ESが耐えられなくなる場合があるかもしれません。 今回はバッファリングを2MBにしてみます。もし2MBに満たなかった場合、インターバルの方で設定した間隔で実行されます。 Amazon_Kinesis_Firehose_buff//embedr.flickr.com/assets/client-code.js

S3上にどの圧縮フォーマットで置くかーとか、エラーの際のログはーとかっていう設定に続いてIAM Role。 Amazon_Kinesis_Firehose_role//embedr.flickr.com/assets/client-code.js

ウィザードに従ってポチポチやっていくと、以下のようなKinesis FirehoseからS3の操作、Amazon ESの操作、Lambda Functionの起動、ロギングといったIAMの設定がジェネレートされて適用されます。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Action": [
        "s3:AbortMultipartUpload",
        "s3:GetBucketLocation",
        "s3:GetObject",
        "s3:ListBucket",
        "s3:ListBucketMultipartUploads",
        "s3:PutObject"
      ],
      "Resource": [
        "arn:aws:s3:::nasa-http-kinesis-firehose",
        "arn:aws:s3:::nasa-http-kinesis-firehose/*"
      ]
    },
    {
      "Sid": "",
      "Effect": "Allow",
      "Action": [
        "lambda:InvokeFunction",
        "lambda:GetFunctionConfiguration"
      ],
      "Resource": "arn:aws:lambda:us-west-2:xxxxxxxxxxxx:function:nasa-http:$LATEST"
    },
    {
      "Sid": "",
      "Effect": "Allow",
      "Action": [
        "es:DescribeElasticsearchDomain",
        "es:DescribeElasticsearchDomains",
        "es:DescribeElasticsearchDomainConfig",
        "es:ESHttpPost",
        "es:ESHttpPut"
      ],
      "Resource": [
        "arn:aws:es:us-west-2:xxxxxxxxxxxx:domain/nasa-http",
        "arn:aws:es:us-west-2:xxxxxxxxxxxx:domain/nasa-http/*"
      ]
    },
    {
      "Sid": "",
      "Effect": "Allow",
      "Action": [
        "es:ESHttpGet"
      ],
      "Resource": [
        "arn:aws:es:us-west-2:xxxxxxxxxxxx:domain/nasa-http/_all/_settings",
        "arn:aws:es:us-west-2:xxxxxxxxxxxx:domain/nasa-http/_cluster/stats",
        "arn:aws:es:us-west-2:xxxxxxxxxxxx:domain/nasa-http/logs*/_mapping/log",
        "arn:aws:es:us-west-2:xxxxxxxxxxxx:domain/nasa-http/_nodes",
        "arn:aws:es:us-west-2:xxxxxxxxxxxx:domain/nasa-http/_nodes/stats",
        "arn:aws:es:us-west-2:xxxxxxxxxxxx:domain/nasa-http/_nodes/*/stats",
        "arn:aws:es:us-west-2:xxxxxxxxxxxx:domain/nasa-http/_stats",
        "arn:aws:es:us-west-2:xxxxxxxxxxxx:domain/nasa-http/logs*/_stats"
      ]
    },
    {
      "Sid": "",
      "Effect": "Allow",
      "Action": [
        "logs:PutLogEvents"
      ],
      "Resource": [
        "arn:aws:logs:us-west-2:xxxxxxxxxxxx:log-group:/aws/kinesisfirehose/nasa-http:log-stream:*"
      ]
    }
  ]
}

EC2からKinesis Firehoseにデータを流し込み

データを流し込む前にKibanaのDev Toolsからマッピングの定義をしておきます。 @timestamp と @timestamp_utc 以外はkeyword(Analyzeしない)で良いかな、と。

PUT _template/template1
{
  "template" : "logs-*",
  "settings": {
    "number_of_shards": 1
  },
  "mappings" : {
    "type1": {
      "_source": {
        "enabled": false
      },
      "properties": {
        "@timestamp": {
          "type": "date"
        },
        "@timestamp_utc": {
          "type": "date"
        },
        "ident": {
          "type": "keyword"
        },
        "bytes": {
          "type": "keyword"
        },
        "request": {
          "type": "keyword"
        },
        "host": {
          "type": "keyword"
        },
        "timezone": {
          "type": "keyword"
        },
        "response": {
          "type": "keyword"
        }
      }
    }  
  }
}

型についてはElasticsearchはバージョン5からStringがText(トークナイズとかAnalyzeする)とKeyword(Analyzeしない。正規化くらい)という2つに分かれてたりもするので、その辺も Text vs. keyword といったポストをご覧いただけるとよろしいかと思います。 [slideshare id=i2uQw0x1RGfxkH?startSlide=32&w=595&h=485&fb=0&mw=0&mh=0&style=border:1px solid #CCC; border-width:1px; margin-bottom:5px; max-width: 100%;&sc=no]

また、Shard数をどうするか?といったところに関しては、今回は少量のデータなので1にしましたが、データが大量になる場合は↓こちらのブログなどをご参考になさっていただければと。 Amazon Elasticsearch Service をはじめよう: シャード数の算出方法

ということで、まずはNASA-HTTPのApacheのログ・ファイルをEC2にダウンロードして解凍しておきます。

$ wget ftp://ita.ee.lbl.gov/traces/NASA_access_log_Jul95.gz
--2017-05-01 07:54:53--  ftp://ita.ee.lbl.gov/traces/NASA_access_log_Jul95.gz
           => ‘NASA_access_log_Jul95.gz’
Resolving ita.ee.lbl.gov (ita.ee.lbl.gov)... 131.243.2.164, 2620:83:8000:102::a4
Connecting to ita.ee.lbl.gov (ita.ee.lbl.gov)|131.243.2.164|:21... connected.
Logging in as anonymous ... Logged in!
==> SYST ... done.    ==> PWD ... done.
==> TYPE I ... done.  ==> CWD (1) /traces ... done.
==> SIZE NASA_access_log_Jul95.gz ... 20676672
==> PASV ... done.    ==> RETR NASA_access_log_Jul95.gz ... done.
Length: 20676672 (20M) (unauthoritative)

NASA_access_log_Jul95.gz              100%[======================================================================>]  19.72M  2.26MB/s    in 8.8s    

2017-05-01 07:55:02 (2.24 MB/s) - ‘NASA_access_log_Jul95.gz’ saved [20676672]

$ wget ftp://ita.ee.lbl.gov/traces/NASA_access_log_Aug95.gz
--2017-05-01 07:55:40--  ftp://ita.ee.lbl.gov/traces/NASA_access_log_Aug95.gz
           => ‘NASA_access_log_Aug95.gz’
Resolving ita.ee.lbl.gov (ita.ee.lbl.gov)... 131.243.2.164, 2620:83:8000:102::a4
Connecting to ita.ee.lbl.gov (ita.ee.lbl.gov)|131.243.2.164|:21... connected.
Logging in as anonymous ... Logged in!
==> SYST ... done.    ==> PWD ... done.
==> TYPE I ... done.  ==> CWD (1) /traces ... done.
==> SIZE NASA_access_log_Aug95.gz ... 16633316
==> PASV ... done.    ==> RETR NASA_access_log_Aug95.gz ... done.
Length: 16633316 (16M) (unauthoritative)

NASA_access_log_Aug95.gz              100%[======================================================================>]  15.86M  2.25MB/s    in 7.1s    

2017-05-01 07:55:48 (2.24 MB/s) - ‘NASA_access_log_Aug95.gz’ saved [16633316]

$ ls -l
total 36440
-rw-rw-r-- 1 ec2-user ec2-user 16633316 May  1 07:55 NASA_access_log_Aug95.gz
-rw-rw-r-- 1 ec2-user ec2-user 20676672 May  1 07:55 NASA_access_log_Jul95.gz

$ gzip -d NASA_access_log_Jul95.gz 
$ gzip -d NASA_access_log_Aug95.gz 
$ ls -l
-rw-rw-r-- 1 ec2-user ec2-user 167813770 May  1 07:55 NASA_access_log_Aug95
-rw-rw-r-- 1 ec2-user ec2-user 205242368 May  1 07:55 NASA_access_log_Jul95

次に、EC2にKinesisエージェントをインストールします。

$ sudo yum install –y aws-kinesis-agent
Loaded plugins: priorities, update-motd, upgrade-helper
amzn-main                                                                                                                     | 2.1 kB  00:00:00     
amzn-updates                                                                                                                  | 2.3 kB  00:00:00     
Resolving Dependencies
--> Running transaction check
---> Package aws-kinesis-agent.noarch 0:1.1.2-1.amzn1 will be installed
--> Processing Dependency: log4j for package: aws-kinesis-agent-1.1.2-1.amzn1.noarch
--> Running transaction check
---> Package log4j.noarch 0:1.2.16-6.4.9.amzn1 will be installed
--> Processing Dependency: jaxp_parser_impl for package: log4j-1.2.16-6.4.9.amzn1.noarch

〜略〜

Running transaction
  Installing : xml-commons-apis-1.3.04-3.6.9.amzn1.noarch                                                                                        1/5 
  Installing : xerces-j2-2.7.1-12.7.19.amzn1.noarch                                                                                              2/5 
  Installing : xml-commons-resolver-1.1-4.18.10.amzn1.noarch                                                                                     3/5 
  Installing : log4j-1.2.16-6.4.9.amzn1.noarch                                                                                                   4/5 
  Installing : aws-kinesis-agent-1.1.2-1.amzn1.noarch                                                                                            5/5 
  Verifying  : aws-kinesis-agent-1.1.2-1.amzn1.noarch                                                                                            1/5 
  Verifying  : xml-commons-apis-1.3.04-3.6.9.amzn1.noarch                                                                                        2/5 
  Verifying  : xerces-j2-2.7.1-12.7.19.amzn1.noarch                                                                                              3/5 
  Verifying  : log4j-1.2.16-6.4.9.amzn1.noarch                                                                                                   4/5 
  Verifying  : xml-commons-resolver-1.1-4.18.10.amzn1.noarch                                                                                     5/5 

Installed:
  aws-kinesis-agent.noarch 0:1.1.2-1.amzn1                                                                                                           

Dependency Installed:
  log4j.noarch 0:1.2.16-6.4.9.amzn1                    xerces-j2.noarch 0:2.7.1-12.7.19.amzn1      xml-commons-apis.noarch 0:1.3.04-3.6.9.amzn1     
  xml-commons-resolver.noarch 0:1.1-4.18.10.amzn1     

Complete!

次に、Kinesisエージェントの設定ファイル(/etc/aws-kinesis/agent.json)を編集します。エンドポイント、Firehoseに連携するファイルなどを設定します。/tmp/log.txtについては後から述べます。

{ 
  "cloudwatch.emitMetrics": true,
  "firehose.endpoint": "https://firehose.us-west-2.amazonaws.com",
  
  "flows": [
    {
      "filePattern": "/tmp/log.txt",
      "deliveryStream": "nasa-http"
      "initialPosition": "START_OF_FILE"
    }
  ]
}

Kinesisエージェントを起動します。この段階では対象ファイルである /tmp/log.txt は何も存在していません。実際のユースケースとしては断続的にファイルに追記されるのをtailしながら〜という形になります。ということで、今回はKinesisエージェントの起動がOKになってからリダイレクトを行います。

# sudo service aws-kinesis-agent start
aws-kinesis-agent startup                                  [  OK  ]

# cat NASA_access_log_Jul95 >> /tmp/log.txt

KibanaのDev Toolsから GET _aliases を叩くと↓のようにインデックスが出来ています。 Console_-_Kibana//embedr.flickr.com/assets/client-code.js

今回流し込んだ1995年7月分のデータはとりこまれているようです。 Screen Shot 2017-05-02 at 12.23.17//embedr.flickr.com/assets/client-code.js

とは言え、実ファイルと比べるといくつかパースできないようなレコードがあったとしても乖離が激しく、

# wc -l NASA_access_log_Jul95 
1891714 NASA_access_log_Jul95

CloudWatch LogsでAWS Lambdaのログを見てみると、おそらくパースのところでかなりFailedになっているので、この辺はBlueprintのままでなく、実際のデータでどのようなエラーが起こっているのかを判別して対処する必要がありそうです。 Screen Shot 2017-05-02 at 12.28.49//embedr.flickr.com/assets/client-code.js

とは言え、

# cat NASA_access_log_Aug95 >> /tmp/log.txt
としてやるだけで、ガシガシAmazon ESにインデックスされていくわけなのでとても便利ですね。 最終的には元ネタのブログと同じようなアンバイになりました。 Screen Shot 2017-05-02 at 12.41.53//embedr.flickr.com/assets/client-code.js

Relevant Search: With Applications for Solr and Elasticsearch
Doug Turnbull John Berryman
Manning Pubns Co
売り上げランキング: 67,587