앞서서 데이터를 정리하는것 까지는 마무리했다. 이제 이 현재 실시간 날씨에 대해서 사람들이 긍정적인지 부정적인지를 알아보기 위해서 파이프라인을 구축하여 확인해 볼 것이다. https://www.youtube.com/watch?v=sG6eg6i0DC0&t=2103s 한국 Elastic 사용자 그룹 유튜브 링크 영상을 참고하여 구축했다.
1. python에서 meachine learning 모델 Elastic Search로 저장
#deploy_model.py
import elasticsearch
from pathlib import Path
from eland.ml.pytorch import PyTorchModel
from eland.ml.pytorch.transformers import TransformerModel
tm= TransformerModel("matthewburke/korean_sentiment", "text_classification")
tmp_path="models"
Path(tmp_path).mkdir(parents=True, exist_ok=True)
model_path, config, vocab_path = tm.save(tmp_path)
es= elasticsearch.Elasticsearch("http://172.18.0.2:9200") #elasticsearch로 전송
ptm= PyTorchModel(es, tm.elasticsearch_model_id())
ptm.import_model(model_path= model_path, config_path=None, vocab_path=vocab_path, config=config)
위의 코드는 Hugging face페이지에서 korean NLP 모델을 다운하여 elasticsearch로 보내는 코드이다. 여기서 나는 docker compose를 작성했을 때 elasticsearch의 주소를 172.18.0.2로 고정시켰기 때문에 모델을 보내는 주소또한 동일하게 작성해줬다.
이 코드를 돌리기 위해서 필요한 라이브러리는 아래와 같다.
torch==1.11.0
tqdm
transformers
sentence-transformers
eland==8.2
elasticsearch==8.4.2
이 또한 앞선 docker로 환경 구축하기에서 다 정리했던 부분이지만 그래도 다시 정리해봤다. 이제 python 컨테이너에서 deploy_model.py 파일을 실행시켜 정상적으로 elastic search에 전송되었는지 확인해봤다.
위의 이미지와 같이 정상적으로 학습된 모델이 저장되어있는것을 확인할 수 있었다. 이제 ingest pipeline을 구축해서 실시간 데이터에 적용시켜서 확인해 볼 것이다.
2. Elastic search ingest pipeline 구축하기
우선 나는 kibana 콘솔로 ingest pipeline을 구축했으나 이미지로 보면 가독성이 떨어질거 같아서 프로세스를 정리하려고 한다.
#kr-sentiment
[
{
"inference": { #머신러닝 모델 data필드에 적용
"model_id": "matthewburke__korean_sentiment",
"target_field": "predict",
"field_map": {
"data": "text_field"
},
"inference_config": { #필드 결과 이름 변경
"text_classification": {
"classification_labels": [
"부정적",
"긍정적"
]
}
}
}
},
{
"set": { #상위필드로 예측 꺼내기
"field": "sentiment-predict",
"value": "{{{predict.predicted_value}}}"
}
},
{
"set": { #상위필드로 긍부정확률 꺼내기
"field": "predict-probability",
"value": "{{{predict.prediction_probability}}}"
}
},
{
"remove": { #예측 필드 제거
"field": "predict"
}
}
]
위에 코드에 간단하게 파트별로 간단하게 주석을 달아주었다. 이제 각각의 부분을 세세하게 정리해보면 아래와 같다
- inference
- model_id : 저장되어 있는 모델인
- target_field : 머신러닝을 완료한 결과를 저장할 필드
- field_map : 머신러닝에 들어가게 될 텍스트 필드 (입력받은 실시간 데이터 텍스트)
- inference_config : 기본적인 결과가 LABEL_0, LABEL_1 이였기 때문에 알아보기 편하게 부정적, 긍정적으로 결과 변환
- set : 기본적으로 predict 필드 즉 결과 필드에 필요없는 부분이 있기 때문에 필요한 부분인 예측결과와 확률을 꺼내서 따로 필드로 생성
- remove: 필요없는 나머지 데이터가 있는 predict필드 삭제
3.Logstash pipeline output 코드변경
output {
elasticsearch {
hosts => ["elasticsearch:9200"]
index => "test"
document_id => "%{fingerprint}"
pipeline => "kr_sentiment"
}
}
이전에 테스트했던 코드와 달라진 부분은 pipeline을 추가해줘서 elastic search의 ingest pipeline을 적용시켜 데이터를 저장시키는 형태로 수정해주었다.
4.테스트 결과
아주 정상적으로 결과가 출력되는 모습을 볼 수 있다. 사실 여기까지 오는데도 아직 많이 부족해서 시간이 오래걸리게 된 것 같다. 그래도 이제 kibana dashboard만 만들면 이번 프로젝트를 성공적으로 끝낼 수 있을 것 같아서 뿌듯한 감정이 든다. 이번 프로젝트를 하면서 사소하거나 크게 문제가 되었던 부분들은 따로 정리해서 마지막 포스팅에 올릴 예정이다.
'개인프로젝트 > twitter 실시간 데이터 프로젝트(ELK stack)' 카테고리의 다른 글
실시간 데이터 프로젝트 회고 (1) | 2022.11.10 |
---|---|
kibana dashboard 제작 (0) | 2022.11.09 |
Logstash 실시간 데이터 처리 (0) | 2022.10.29 |
twitter 실시간 데이터 python으로 저장하기 (0) | 2022.10.21 |
[docker] python+filebeat+elk 환경 만들기 (0) | 2022.10.17 |