본문으로 바로가기
728x90

소개

 

파이썬에서 엘라스틱서치과 연동하여 기본 CRUD 사용법에 대해 알아보겠습니다.

 

우선 엘라스틱서치에서 사용하는 용어를 알아야 할 것입니다.

 

RDBMS Elasticsearch
Database Index
Table Type
Row Document
.. ..

 

 

Python Elasticsearch Client — Elasticsearch 8.0.0 documentation

This client was designed as very thin wrapper around Elasticsearch’s REST API to allow for maximum flexibility. This means that there are no opinions in this client; it also means that some of the APIs are a little cumbersome to use from Python. We have

elasticsearch-py.readthedocs.io

 

코드

 

전체 코드입니다.

from elasticsearch import Elasticsearch

def create_index():
    if not es.indices.exists(index=index):
        return es.indices.create(index=index)

def delete_index():
    if es.indices.exists(index=index):
        return es.indices.delete(index=index)

def insert(body):
    return es.index(index=index, doc_type=doc_type, body=body)

def delete(data):
    if data is None:
        data = {"match_all": {}}
    else:
        data = {"match": data}
    body = {"query": data }
    return es.delete_by_query(index, body=body)

def delete_by_id(id):
    return es.delete(index, id=id)

def search(data=None):
    if data is None:
        data = {"match_all": {}}
    else:
        data = {"match": data}
    body = {"query": data }
    res = es.search(index=index, body=body)
    return res
    
def update(id, doc):
    body = {
        'doc': doc
    }
    res = es.update(index=index, id=id, body=body, doc_type=doc_type)
    return res

if __name__ == '__main__':
    url = '192.168.111.176'
    port = '9200'
    index = 'news'
    doc_type = 'daum'
    
    es = Elasticsearch(f'{url}:{port}')

 

 

인덱스 생성/삭제

 

기존에 index 가 존재하면 create 시 에러가 발생합니다.

elasticsearch.exceptions.RequestError: RequestError(400, 'resource_already_exists_exception', 'index [news/XV4Yx-nFSWK93-T9rB0uQA]
already exists')

 

다음과 같이 처리해주시면 깔끔하겠습니다.

def create_index(index):
    if not es.indices.exists(index=index):
        return es.indices.create(index=index)

 

삭제시에도 성공적이면 아래와 같이 데이터가 반환되며 비정상은 None 을 반환합니다.

es = Elasticsearch(f'{url}:{port}')
# r = create_index(index)
dr = delete_index(index)
if dr is not None:
    print('delete ok')
    print(res_delete)
else:
    print('delete no')
delete ok
{'acknowledged': True}

 

 

삽입

 

데이터 삽입 부분입니다.

def insert(body):
    return es.index(index=index, doc_type=doc_type, body=body)
    
...

index = 'news'
doc_type = 'daum'
data = {
        'date': '202008112229',
        'category': 'society',
        'newspaper': 'KBS',
        'title': '큰 비 온다는 경보에도 수영등산 처벌은?',
        'content': '집중호우와 산사태 경보가 내려졌는데도, 입산이 통제된 산에 올라가거나 바다에서 수영을 즐기던 동호회원들이 적발되거나 구조됐습니다.',
        'url': 'https://news.v.daum.net/v/20200811222929678'

    }
ir = insert(data)
print(ir)

 

{'_index': 'news', '_type': 'daum', '_id': 'HQjQ3XMBFvHGaPMSz9aD', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 0, '_primary_term': 1}

 

 

조회

 

데이터 조회 부분입니다.

 

body{"query":{"match_all": {}}} 를 넣으면 모든 데이터를 조회합니다.

def search(index, data=None):
    if data is None:
        data = {"match_all": {}}
    else:
        data = {"match": data}
    body = {"query": data }
    res = es.search(index=index, body=body)
    return res
    
...

sr = search()
pprint.pprint(sr)
{'_shards': {'failed': 0, 'skipped': 0, 'successful': 1, 'total': 1},
 'hits': {'hits': [{'_id': 'HgjS3XMBFvHGaPMSn9YM',
                    '_index': 'news',
                    '_score': 1.0,
                    '_source': {'category': 'society',
                                'content': '집중호우와 산사태 경보가 내려졌는데도, 입산이 통제된 산에 '
                                           '올라가거나 바다에서 수영을 즐기던 동호회원들이 적발되거나 '
                                           '구조됐습니다.',
                                'date': '202008112229',
                                'newspaper': 'KBS',
                                'site': 'daum',
                                'title': '큰 비 온다는 경보에도 수영등산 처벌은?',
                                'url': 'https://news.v.daum.net/v/20200811222929678'},
                    '_type': 'daum'}],
          'max_score': 1.0,
          'total': {'relation': 'eq', 'value': 2}},
 'timed_out': False,
 'took': 17}

 

단일 필드로 조회할 수 있구요.

def search(index, data=None):
    if data is None:
        data = {"match_all": {}}
    else:
        data = {"match": data}
    body = {"query": data }
    res = es.search(index=index, body=body)
    return res
    
...

sr = search(index, {'category': 'society'})
pprint.pprint(sr)
{'_shards': {'failed': 0, 'skipped': 0, 'successful': 1, 'total': 1},
 'hits': {'hits': [{'_id': 'HgjS3XMBFvHGaPMSn9YM',
                    '_index': 'news',
                    '_score': 1.0,
                    '_source': {'category': 'society',
                                'content': '집중호우와 산사태 경보가 내려졌는데도, 입산이 통제된 산에 '
                                           '올라가거나 바다에서 수영을 즐기던 동호회원들이 적발되거나 '
                                           '구조됐습니다.',
                                'date': '202008112229',
                                'newspaper': 'KBS',
                                'site': 'daum',
                                'title': '큰 비 온다는 경보에도 수영등산 처벌은?',
                                'url': 'https://news.v.daum.net/v/20200811222929678'},
                    '_type': 'daum'}],
          'max_score': 1.0,
          'total': {'relation': 'eq', 'value': 2}},
 'timed_out': False,
 'took': 17}

 

필드를 여러개 넣으면 다음과 같은 에러가 발생합니다.

r = search(index, {'category': 'society', 'newspaper': 'KBS'})
elasticsearch.exceptions.RequestError: RequestError(400, 'parsing_exception', "[match] query doesn't support multiple fields, found [category] and [newspaper]")

 

 

삭제

 

조회와 마찬가지로 필드로하여 삭제할 시에는 delete_by_query 함수를 사용합니다.

def delete(index, data):
    if data is None:
        data = {"match_all": {}}
    else:
        data = {"match": data}
    body = {"query": data }
    return es.delete_by_query(index, body=body)
    
...

dr = delete(index, {'category': 'society'})
print(dr)

sr = search(index)
pprint.pprint(sr)
{'took': 819, 'timed_out': False, 'total': 1, 'deleted': 1, 'batches': 1, 'version_conflicts': 0, 'noops': 0, 'retries': {'bulk': 0, 'search': 0}, 'throttled_millis': 0, 'requests_per_second': -1.0, 'throttled_until_millis': 0, 'failures': []}

{'_shards': {'failed': 0, 'skipped': 0, 'successful': 1, 'total': 1},
 'hits': {'hits': [],
          'max_score': None,
          'total': {'relation': 'eq', 'value': 0}},
 'timed_out': False,
 'took': 2}

 

id 를 이용하여 삭제할 경우에는 delete 함수를 사용합니다.

def delete_by_id(id):
    return es.delete(index, id=id)
    
...

data = {
        'date': '202008112229',
        'category': 'society',
        'newspaper': 'KBS',
        'title': '큰 비 온다는 경보에도 수영등산 처벌은?',
        'content': '집중호우와 산사태 경보가 내려졌는데도, 입산이 통제된 산에 올라가거나 바다에서 수영을 즐기던 동호회원들이 적발되거나 구조됐습니다.',
        'url': 'https://news.v.daum.net/v/20200811222929678'

    }
    
ir = insert(data)

dr = delete_by_id(ir['_id'])
print(dr)

sr = search()
pprint.pprint(sr)
{'_index': 'news', '_type': '_doc', '_id': 'Lwj63XMBFvHGaPMSFdYH', '_version': 2, 'result': 'deleted', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 6, '_primary_term': 1}

{'_shards': {'failed': 0, 'skipped': 0, 'successful': 1, 'total': 1},
 'hits': {'hits': [],
          'max_score': None,
          'total': {'relation': 'eq', 'value': 0}},
 'timed_out': False,
 'took': 2}

 

 

업데이트

 

id 를 이용해서 업데이트합니다.

delete 와 마찬가지로 update_by_query 함수가 있지만, 사용법이 까다롭습니다.

def update(id, doc):
    body = {
        'doc': doc
    }
    res = es.update(index=index, id=id, body=body, doc_type=doc_type)
    return res

...

data = {
        'date': '202008112229',
        'category': 'society',
        'newspaper': 'KBS',
        'title': '큰 비 온다는 경보에도 수영등산 처벌은?',
        'content': '집중호우와 산사태 경보가 내려졌는데도, 입산이 통제된 산에 올라가거나 바다에서 수영을 즐기던 동호회원들이 적발되거나 구조됐습니다.',
        'url': 'https://news.v.daum.net/v/20200811222929678'

    }
    
ir = insert(data)
ur = update(ir['_id'], {'newspaper': 'SBS'})
pprint.pprint(ur)

sr = search()
pprint.pprint(sr)
{'_id': 'eAgz3nMBFvHGaPMSPNaO',
 '_index': 'news',
 '_primary_term': 1,
 '_seq_no': 1,
 '_shards': {'failed': 0, 'successful': 1, 'total': 2},
 '_type': 'daum',
 '_version': 2,
 'result': 'updated'}
 
 {'_shards': {'failed': 0, 'skipped': 0, 'successful': 1, 'total': 1},
 'hits': {'hits': [{'_id': 'eAgz3nMBFvHGaPMSPNaO',
                    '_index': 'news',
                    '_score': 1.0,
                    '_source': {'category': 'society',
                                'content': '집중호우와 산사태 경보가 내려졌는데도, 입산이 통제된 산에 '
                                           '올라가거나 바다에서 수영을 즐기던 동호회원들이 적발되거나 '
                                           '구조됐습니다.',
                                'date': '202008112229',
                                'newspaper': 'SBS',
                                'title': '큰 비 온다는 경보에도 수영등산 처벌은?',
                                'url': 'https://news.v.daum.net/v/20200811222929678'},
                    '_type': 'daum'}],
          'max_score': 1.0,
          'total': {'relation': 'eq', 'value': 1}},
 'timed_out': False,
 'took': 2}