PythonからElasticsearchへや大量にクエリ(indexだけではなくcreateやupdate、deleteも)を投げる処理を行う必要があったので、Bulkでまとめて処理をさせることになった。 Bulk処理をさせる場合、helpersがよく使用されているので今回はそれを利用する。以下のように記述することで、createやupdate、deleteについてbulk処理を行わせる事ができる。以下の例では、dataに入っている各データをcreateでbulk処理させている(Elasticsearchへの接続処理等は省略)。

es_index = 'INDEX'
es_doc = 'DOC'

query = []
for row in data:
    # idを固定にする場合は、'_id'で指定できる。
    datas.append({'_op_type':'create','_index':es_index,'_type':es_doc,'_source':row})

    if len(data) >= 1000:
        helpers.bulk(client=es_conn,actions=data,refresh=True,chunk_size=1000,request_timeout=150)

if len(data) > 0:
    helpers.bulk(client=es_conn,actions=data,refresh=True,chunk_size=1000,request_timeout=150)

createやdelete、updateといった指定は _op_type で指定できる。 また、Bulk処理はhelpers.bulkのChunk(デフォルトでは500件)単位で処理を行うようなのだが、クエリが1個でもエラーになるとそれ以降のChunkは実行されないようだ(エラーになったクエリを持つChunkが実行した他のクエリは実行される)。 なので、500件以上の件数をbulk処理する場合は、chunkの件数も変更した方がいいだろう。Chunkの数はstreaming_bulkでデフォルト500となっている(helpers.bulkはhelpers.streaming_bulkのラッパー)のだけど、bulkで呼び出す際にも指定できるのでそれで対応している。

ちなみに、エラーメッセージはこの状態だと出力されないので、もし必要であればtryでbulk処理をさせて、例外発生時には「print(sys.exc_info())」で処理をさせることで出力ができる。