Amazon Elasticsearch Service で _ingest と _bulk API を使ってインデクシングして Kibana で可視化する

先日、Amazon Elasticsearch Service で、Elasticsearch 5.1 がサポートされました。

<li>↓Youtubeビデオ(日本語字幕付けたので是非ご覧ください)</li>

[youtube https://www.youtube.com/watch?v=QBwmVGlMOZU&w=560&h=315]

<li>↓AWSブログ(翻訳、、)</li>

Elasticsearch 5 now available on Amazon Elasticsearch Service

<li>↓SlideShare(日本語化するかな、、)</li>

[slideshare id=HJRjQ0uGUaQJQI&w=595&h=485&fb=0&mw=0&mh=0&style=border:1px solid #CCC; border-width:1px; margin-bottom:5px; max-width: 100%;&sc=no]

Elasticsearch 5で追加された目玉機能の一つとして、Ingest Nodeが挙げられると思います。 今までインデクシングする前段で構造化するための変換処理的なところをLogStashで行うケースが多かったものが、Elasticsearchの中で出来るようになった、と。

以前 それなりの時系列ログデータが欲しい時 | shinodogg.com というエントリにも書いた、NASA-HTTPのデータを使ったKibanaのデモというかハンズオン資料があるのですが、そのELKスタックも今回のリリースで少し古い感じになってきたので、アップデートを兼ねてIngest Nodeを使ってみることにしました。

_ingest APIを使ったシミュレーション

コレ系の操作はKibanaのDev Toolsから。 Screen Shot 2017-03-02 at 14.06.14//embedr.flickr.com/assets/client-code.js

_ingest APIにはシミュレートする機能があり、今回はNASA-HTTPで使われているApacheのログ形式に合わせて、messageというフィールドに %{COMMONAPACHELOG} 形式のデータが入ってきたら〜という形にします。

COMMONAPACHELOGに関しては↓のGrok Patternsにある、 https://github.com/elastic/elasticsearch/blob/master/modules/ingest-common/src/main/resources/patterns/grok-patterns ↓のような定義になっています。(こういうのをイチイチ自分でやるのメンドイので嬉しいですね)

%{IPORHOST:clientip} %{HTTPDUSER:ident} %{USER:auth} [%{HTTPDATE:timestamp}] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-)
COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent}
HTTPD20_ERRORLOG [%{HTTPDERROR_DATE:timestamp}] [%{LOGLEVEL:loglevel}] (?:[client %{IPORHOST:clientip}] ){0,1}%{GREEDYDATA:errormsg}

更に、dateプロセッサーで、現在日付ではなく、ログの中にある日付を @timestamp にするようにして、

"date": {
  "field": "timestamp",
  "formats" : ["dd/MMM/yyyy:HH:mm:ss Z"]
}

date_index_name プロセッサーで、日付毎に nasa-logs-YYYY-MM-DD という形式でインデックスが作れるようにします。

"date_index_name": {
  "field": "@timestamp",
  "index_name_prefix": "nasa-logs-",
  "date_rounding": "d"
}

以下のようなデータを食わせて、結果を見てみると、

"docs" : [
  {"_source":
    {"message": "199.72.81.55 - - [01/Nov/2015:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245"}
  }
]

↓のようにイイ感じになりました :)

{
  "docs": [
    {
      "doc": {
        "_index": "",
        "_type": "_type",
        "_id": "_id",
        "_source": {
          "request": "/history/apollo/",
          "auth": "-",
          "ident": "-",
          "verb": "GET",
          "message": "199.72.81.55 - - [01/Nov/2015:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245",
          "@timestamp": "2015-11-01T04:00:01.000Z",
          "response": "200",
          "bytes": "6245",
          "clientip": "199.72.81.55",
          "httpversion": "1.0",
          "timestamp": "01/Nov/2015:00:00:01 -0400"
        },
        "_ingest": {
          "timestamp": "2017-03-02T05:06:09.960+0000"
        }
      }
    }
  ]
}

_ingest APIを使ったパイプラインの設定

シミュレーションした内容をそのまま設定してやる感じ。 これもKibana上で↓のように。_ingest/pipeline/パイプライン名

PUT _ingest/pipeline/nasa-http
{
  "description": "nasa-http",
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": ["%{COMMONAPACHELOG}"]
      }
    },
    {
      "date": {
        "field": "timestamp",
        "formats" : ["dd/MMM/yyyy:HH:mm:ss Z"]
      }
    },
    {
      "date_index_name": {
       "field": "@timestamp",
        "index_name_prefix": "nasa-logs-",
        "date_rounding": "d"
      }
    }
  ]
}

GET _ingest/pipeline/nasa-http してやると、登録されたパイプラインの情報が確認できます

{
  "nasa-http": {
    "description": "nasa-http",
    "processors": [
      {
        "grok": {
          "field": "message",
          "patterns": [
            "%{COMMONAPACHELOG}"
          ]
        }
      },
      {
        "date": {
          "field": "timestamp",
          "formats": [
            "dd/MMM/yyyy:HH:mm:ss Z"
          ]
        }
      },
      {
        "date_index_name": {
          "field": "@timestamp",
          "index_name_prefix": "nasa-logs-",
          "date_rounding": "d"
        }
      }
    ]
  }
}

Pythonスクリプトを使って _bulk APIでデータを登録

やることは以下。

  • 1. Nasaのサイトからgzファイルをダウンロード
  • 2. ダウンロードしたgzファイルを開いて行ごとにIngest Nodeに食わせる為にJSON
  • 3. BulkでElasticsearchにインデクシングするために10000行ごとに固めてPost
  • 本当はAWS Lambdaで/tmp使ってやろうと思ったけど、時間かかって5分で収まらなかったのでEC2で…

    StepFunctions使ってホゲホゲとかやっても良いかなぁ〜

    gzをダウンロードして、

    ftp = FTP()
    ftp.connect(host='ita.ee.lbl.gov')
    ftp.login(user='anonymous', passwd="")
    ftp.set_pasv(True)
    ftp.retrbinary("RETR /traces/NASA_access_log_Jul95.gz", open('【ダウンロード先】/NASA_access_log_Jul95.gz', 'wb').write)
    ftp.retrbinary("RETR /traces/NASA_access_log_Aug95.gz", open('【ダウンロード先】/NASA_access_log_Aug95.gz', 'wb').write)
    

    gzを開いて行ごとに読み込んで

    for file in glob.glob('【ダウンロード先】/*'):
        f = gzip.open(file)
        session = requests.Session()
        line = f.readline()
    

    メタデータとコンテンツのJSONを作ってPOSTで送信。

    1万件で1回送信なのでrequests.Sessionがどれだけ効くのかはアレですが、、、

    meta = "{"index": {"_index":"nasa-logs-", "_type":"nasa-http"}} n"
    jsonstring = ""
    while line:
        i += 1
        line = f.readline()
        dict = {"message": line}
    
        try:
            jsonstring = jsonstring + meta + json.dumps(dict) + "n"
    
            if i % 10000 == 1:
                r = session.post("https://AmazonESのエンドポイント/_bulk?pipeline=nasa-http", data=jsonstring)
                print r.status_code
                jsonstring = ""
    
        except UnicodeDecodeError:
            print line
    

    当初は_bulkを使わずに1件1件↓のようにインデクシングしてたのですが、なかなか終わらず、、、

    requests.post("https://AmazonESのエンドポイント/nasa-logs-/nasa-http/?pipeline=nasa-http", data=jsonstring)
    

    マニュアル(https://www.elastic.co/guide/en/elasticsearch/reference/5.1/docs-bulk.html)みたら直感的にメタデータ+実データの組み合わせを作って突っ込むだけだったので(とは言え、改行コード入れてなくて"no requests added"ってエラーが出てStackOverFlowみたりしたけど…)、サクっと出来ました。

    Kibanaで可視化

    Kibanaからデータが見れるようになったので、 Screen Shot 2017-03-02 at 17.31.52//embedr.flickr.com/assets/client-code.js

    例えば、HTTPのレスポンスコードでパイチャート作ったりできたりします :) Screen Shot 2017-03-02 at 17.36.08//embedr.flickr.com/assets/client-code.js