PythonからElasticsearchへや大量にクエリ(indexだけではなくcreateやupdate、deleteも)を投げる処理を行う必要があったので、Bulkでまとめて処理をさせることになった。 Bulk処理をさせる場合、helpersがよく使用されているので今回はそれを利用する。以下のように記述することで、createやupdate、deleteについてbulk処理を行わせる事ができる。以下の例では、dataに入っている各データをcreateでbulk処理させている(Elasticsearchへの接続処理等は省略)。
es_index = 'INDEX'
es_doc = 'DOC'
query = []
for row in data:
# idを固定にする場合は、'_id'で指定できる。
datas.append({'_op_type':'create','_index':es_index,'_type':es_doc,'_source':row})
if len(data) >= 1000:
helpers.bulk(client=es_conn,actions=data,refresh=True,chunk_size=1000,request_timeout=150)
if len(data) > 0:
helpers.bulk(client=es_conn,actions=data,refresh=True,chunk_size=1000,request_timeout=150)
createやdelete、updateといった指定は _op_type
で指定できる。
また、Bulk処理はhelpers.bulkのChunk(デフォルトでは500件)単位で処理を行うようなのだが、クエリが1個でもエラーになるとそれ以降のChunkは実行されないようだ(エラーになったクエリを持つChunkが実行した他のクエリは実行される)。
なので、500件以上の件数をbulk処理する場合は、chunkの件数も変更した方がいいだろう。Chunkの数はstreaming_bulkでデフォルト500となっている(helpers.bulkはhelpers.streaming_bulkのラッパー)のだけど、bulkで呼び出す際にも指定できるのでそれで対応している。
ちなみに、エラーメッセージはこの状態だと出力されないので、もし必要であればtryでbulk処理をさせて、例外発生時には「print(sys.exc_info())」で処理をさせることで出力ができる。