728x90
소개
파이썬에서 엘라스틱서치과 연동하여 기본 CRUD 사용법에 대해 알아보겠습니다.
우선 엘라스틱서치에서 사용하는 용어를 알아야 할 것입니다.
RDBMS | Elasticsearch |
Database | Index |
Table | Type |
Row | Document |
.. | .. |
코드
전체 코드입니다.
from elasticsearch import Elasticsearch
def create_index():
if not es.indices.exists(index=index):
return es.indices.create(index=index)
def delete_index():
if es.indices.exists(index=index):
return es.indices.delete(index=index)
def insert(body):
return es.index(index=index, doc_type=doc_type, body=body)
def delete(data):
if data is None:
data = {"match_all": {}}
else:
data = {"match": data}
body = {"query": data }
return es.delete_by_query(index, body=body)
def delete_by_id(id):
return es.delete(index, id=id)
def search(data=None):
if data is None:
data = {"match_all": {}}
else:
data = {"match": data}
body = {"query": data }
res = es.search(index=index, body=body)
return res
def update(id, doc):
body = {
'doc': doc
}
res = es.update(index=index, id=id, body=body, doc_type=doc_type)
return res
if __name__ == '__main__':
url = '192.168.111.176'
port = '9200'
index = 'news'
doc_type = 'daum'
es = Elasticsearch(f'{url}:{port}')
인덱스 생성/삭제
기존에 index 가 존재하면 create 시 에러가 발생합니다.
elasticsearch.exceptions.RequestError: RequestError(400, 'resource_already_exists_exception', 'index [news/XV4Yx-nFSWK93-T9rB0uQA]
already exists')
다음과 같이 처리해주시면 깔끔하겠습니다.
def create_index(index):
if not es.indices.exists(index=index):
return es.indices.create(index=index)
삭제시에도 성공적이면 아래와 같이 데이터가 반환되며 비정상은 None 을 반환합니다.
es = Elasticsearch(f'{url}:{port}')
# r = create_index(index)
dr = delete_index(index)
if dr is not None:
print('delete ok')
print(res_delete)
else:
print('delete no')
delete ok
{'acknowledged': True}
삽입
데이터 삽입 부분입니다.
def insert(body):
return es.index(index=index, doc_type=doc_type, body=body)
...
index = 'news'
doc_type = 'daum'
data = {
'date': '202008112229',
'category': 'society',
'newspaper': 'KBS',
'title': '큰 비 온다는 경보에도 수영등산 처벌은?',
'content': '집중호우와 산사태 경보가 내려졌는데도, 입산이 통제된 산에 올라가거나 바다에서 수영을 즐기던 동호회원들이 적발되거나 구조됐습니다.',
'url': 'https://news.v.daum.net/v/20200811222929678'
}
ir = insert(data)
print(ir)
{'_index': 'news', '_type': 'daum', '_id': 'HQjQ3XMBFvHGaPMSz9aD', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 0, '_primary_term': 1}
조회
데이터 조회 부분입니다.
body에 {"query":{"match_all": {}}} 를 넣으면 모든 데이터를 조회합니다.
def search(index, data=None):
if data is None:
data = {"match_all": {}}
else:
data = {"match": data}
body = {"query": data }
res = es.search(index=index, body=body)
return res
...
sr = search()
pprint.pprint(sr)
{'_shards': {'failed': 0, 'skipped': 0, 'successful': 1, 'total': 1},
'hits': {'hits': [{'_id': 'HgjS3XMBFvHGaPMSn9YM',
'_index': 'news',
'_score': 1.0,
'_source': {'category': 'society',
'content': '집중호우와 산사태 경보가 내려졌는데도, 입산이 통제된 산에 '
'올라가거나 바다에서 수영을 즐기던 동호회원들이 적발되거나 '
'구조됐습니다.',
'date': '202008112229',
'newspaper': 'KBS',
'site': 'daum',
'title': '큰 비 온다는 경보에도 수영등산 처벌은?',
'url': 'https://news.v.daum.net/v/20200811222929678'},
'_type': 'daum'}],
'max_score': 1.0,
'total': {'relation': 'eq', 'value': 2}},
'timed_out': False,
'took': 17}
단일 필드로 조회할 수 있구요.
def search(index, data=None):
if data is None:
data = {"match_all": {}}
else:
data = {"match": data}
body = {"query": data }
res = es.search(index=index, body=body)
return res
...
sr = search(index, {'category': 'society'})
pprint.pprint(sr)
{'_shards': {'failed': 0, 'skipped': 0, 'successful': 1, 'total': 1},
'hits': {'hits': [{'_id': 'HgjS3XMBFvHGaPMSn9YM',
'_index': 'news',
'_score': 1.0,
'_source': {'category': 'society',
'content': '집중호우와 산사태 경보가 내려졌는데도, 입산이 통제된 산에 '
'올라가거나 바다에서 수영을 즐기던 동호회원들이 적발되거나 '
'구조됐습니다.',
'date': '202008112229',
'newspaper': 'KBS',
'site': 'daum',
'title': '큰 비 온다는 경보에도 수영등산 처벌은?',
'url': 'https://news.v.daum.net/v/20200811222929678'},
'_type': 'daum'}],
'max_score': 1.0,
'total': {'relation': 'eq', 'value': 2}},
'timed_out': False,
'took': 17}
필드를 여러개 넣으면 다음과 같은 에러가 발생합니다.
r = search(index, {'category': 'society', 'newspaper': 'KBS'})
elasticsearch.exceptions.RequestError: RequestError(400, 'parsing_exception', "[match] query doesn't support multiple fields, found [category] and [newspaper]")
삭제
조회와 마찬가지로 필드로하여 삭제할 시에는 delete_by_query 함수를 사용합니다.
def delete(index, data):
if data is None:
data = {"match_all": {}}
else:
data = {"match": data}
body = {"query": data }
return es.delete_by_query(index, body=body)
...
dr = delete(index, {'category': 'society'})
print(dr)
sr = search(index)
pprint.pprint(sr)
{'took': 819, 'timed_out': False, 'total': 1, 'deleted': 1, 'batches': 1, 'version_conflicts': 0, 'noops': 0, 'retries': {'bulk': 0, 'search': 0}, 'throttled_millis': 0, 'requests_per_second': -1.0, 'throttled_until_millis': 0, 'failures': []}
{'_shards': {'failed': 0, 'skipped': 0, 'successful': 1, 'total': 1},
'hits': {'hits': [],
'max_score': None,
'total': {'relation': 'eq', 'value': 0}},
'timed_out': False,
'took': 2}
id 를 이용하여 삭제할 경우에는 delete 함수를 사용합니다.
def delete_by_id(id):
return es.delete(index, id=id)
...
data = {
'date': '202008112229',
'category': 'society',
'newspaper': 'KBS',
'title': '큰 비 온다는 경보에도 수영등산 처벌은?',
'content': '집중호우와 산사태 경보가 내려졌는데도, 입산이 통제된 산에 올라가거나 바다에서 수영을 즐기던 동호회원들이 적발되거나 구조됐습니다.',
'url': 'https://news.v.daum.net/v/20200811222929678'
}
ir = insert(data)
dr = delete_by_id(ir['_id'])
print(dr)
sr = search()
pprint.pprint(sr)
{'_index': 'news', '_type': '_doc', '_id': 'Lwj63XMBFvHGaPMSFdYH', '_version': 2, 'result': 'deleted', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 6, '_primary_term': 1}
{'_shards': {'failed': 0, 'skipped': 0, 'successful': 1, 'total': 1},
'hits': {'hits': [],
'max_score': None,
'total': {'relation': 'eq', 'value': 0}},
'timed_out': False,
'took': 2}
업데이트
id 를 이용해서 업데이트합니다.
delete 와 마찬가지로 update_by_query 함수가 있지만, 사용법이 까다롭습니다.
def update(id, doc):
body = {
'doc': doc
}
res = es.update(index=index, id=id, body=body, doc_type=doc_type)
return res
...
data = {
'date': '202008112229',
'category': 'society',
'newspaper': 'KBS',
'title': '큰 비 온다는 경보에도 수영등산 처벌은?',
'content': '집중호우와 산사태 경보가 내려졌는데도, 입산이 통제된 산에 올라가거나 바다에서 수영을 즐기던 동호회원들이 적발되거나 구조됐습니다.',
'url': 'https://news.v.daum.net/v/20200811222929678'
}
ir = insert(data)
ur = update(ir['_id'], {'newspaper': 'SBS'})
pprint.pprint(ur)
sr = search()
pprint.pprint(sr)
{'_id': 'eAgz3nMBFvHGaPMSPNaO',
'_index': 'news',
'_primary_term': 1,
'_seq_no': 1,
'_shards': {'failed': 0, 'successful': 1, 'total': 2},
'_type': 'daum',
'_version': 2,
'result': 'updated'}
{'_shards': {'failed': 0, 'skipped': 0, 'successful': 1, 'total': 1},
'hits': {'hits': [{'_id': 'eAgz3nMBFvHGaPMSPNaO',
'_index': 'news',
'_score': 1.0,
'_source': {'category': 'society',
'content': '집중호우와 산사태 경보가 내려졌는데도, 입산이 통제된 산에 '
'올라가거나 바다에서 수영을 즐기던 동호회원들이 적발되거나 '
'구조됐습니다.',
'date': '202008112229',
'newspaper': 'SBS',
'title': '큰 비 온다는 경보에도 수영등산 처벌은?',
'url': 'https://news.v.daum.net/v/20200811222929678'},
'_type': 'daum'}],
'max_score': 1.0,
'total': {'relation': 'eq', 'value': 1}},
'timed_out': False,
'took': 2}
'Language > Python' 카테고리의 다른 글
파이썬 쓰레드(Thread) 알아보기 - 2 : 뮤텍스(Lock) (2) | 2020.08.12 |
---|---|
파이썬 쓰레드(Thread) 알아보기 - 1 (0) | 2020.08.12 |
파이썬 엘라스틱서치(Elasticsearch) 연동 -1 : 원격 설정 (0) | 2020.08.11 |
파이썬 명령행 옵션, 인자 사용하기 : argparse (0) | 2020.08.10 |
파이썬 추상 클래스(abc) 사용하기 (0) | 2020.08.10 |