PythonからElasticsearch 5へのBulk処理を行わせる

PythonからElasticsearchへや大量にクエリ(indexだけではなくcreateやupdate、deleteも)を投げる処理を行う必要があったので、Bulkでまとめて処理をさせることになった。
Bulk処理をさせる場合、helpersがよく使用されているので今回はそれを利用する。以下のように記述することで、createやupdate、deleteについてbulk処理を行わせる事ができる。以下の例では、dataに入っている各データをcreateでbulk処理させている(Elasticsearchへの接続処理等は省略)。

Sponsored Links

es_index = 'INDEX'
es_doc = 'DOC'

query = []
for row in data:
    # idを固定にする場合は、'_id'で指定できる。
    datas.append({'_op_type':'create','_index':es_index,'_type':es_doc,'_source':row})
    
    if len(data) >= 1000:
        helpers.bulk(client=es_conn,actions=data,refresh=True,chunk_size=1000,request_timeout=150)

if len(data) > 0:
    helpers.bulk(client=es_conn,actions=data,refresh=True,chunk_size=1000,request_timeout=150)

 

createやdelete、updateといった指定は’_op_type’で指定できる。
また、Bulk処理はhelpers.bulkのChunk(デフォルトでは500件)単位で処理を行うようなのだが、クエリが1個でもエラーになるとそれ以降のChunkは実行されないようだ(エラーになったクエリを持つChunkが実行した他のクエリは実行される)。なので、500件以上の件数をbulk処理する場合は、chunkの件数も変更した方がいいだろう。Chunkの数はstreaming_bulkでデフォルト500となっている(helpers.bulkはhelpers.streaming_bulkのラッパー)のだけど、bulkで呼び出す際にも指定できるのでそれで対応している。

ちなみに、エラーメッセージはこの状態だと出力されないので、もし必要であればtryでbulk処理をさせて、例外発生時には「print(sys.exc_info())」で処理をさせることで出力ができる。

 


Written by blacknon

インフラエンジニア(…のつもり)。 仕事で使うならクライアントはWindowsよりはUNIXの方が好き。 大体いつも眠い。

Leave a Comment

メールアドレスが公開されることはありません。

*