개인프로젝트/twitter 실시간 데이터 프로젝트(ELK stack)

twitter 실시간 데이터 python으로 저장하기

장경훈 2022. 10. 21. 14:16

우선 앞서서 docker로 개발환경을 성공적으로 만들었었다. 그렇다면 이제는 트위터 api의 데이터가 어떤 식으로 불러와지는지 먼저 확인하는 작업이 필요하다고 생각한다. 데이터가 어떤 식으로 들어와 지는지 부터 한번 확인해 봤다. 

 

1.python 트위터 데이터 logging으로 저장하기

#send_data.py
import requests
import os
import json
import logging



def bearer_oauth(r):
    r.headers["Authorization"] = f"각자의 twitter api bearer 코드입력"
    r.headers["User-Agent"] = "v2FilteredStreamPython"
    return r


def get_rules():
    response = requests.get(
        "https://api.twitter.com/2/tweets/search/stream/rules", auth=bearer_oauth
    )
    if response.status_code != 200:
        raise Exception(
            "Cannot get rules (HTTP {}): {}".format(response.status_code, response.text)
        )
    print(json.dumps(response.json()))
    return response.json()


def delete_all_rules(rules):
    if rules is None or "data" not in rules:
        return None

    ids = list(map(lambda rule: rule["id"], rules["data"]))
    payload = {"delete": {"ids": ids}}
    response = requests.post(
        "https://api.twitter.com/2/tweets/search/stream/rules",
        auth=bearer_oauth,
        json=payload
    )
    if response.status_code != 200:
        raise Exception(
            "Cannot delete rules (HTTP {}): {}".format(
                response.status_code, response.text
            )
        )
    print(json.dumps(response.json()))


def set_rules(delete): 
    # You can adjust the rules if needed
    sample_rules = [ 
        {"value": "날씨 lang:ko"}, #내가 스트리밍할 키워드와 작성된 국가설정
    ]
    payload = {"add": sample_rules}
    response = requests.post(
        "https://api.twitter.com/2/tweets/search/stream/rules",
        auth=bearer_oauth,
        json=payload,
    )
    if response.status_code != 201:
        raise Exception(
            "Cannot add rules (HTTP {}): {}".format(response.status_code, response.text)
        )
    print(json.dumps(response.json()))


def get_stream(set):
    mylogger= logging.getLogger() 
    mylogger.setLevel(logging.DEBUG) #디버그 이상급 데이터를 저장
    myhandler = logging.FileHandler('/usr/share/mylog/data.log',encoding='utf-8') #파일위치 작성
    mylogger.addHandler(myhandler) 
    
    response = requests.get(
        "https://api.twitter.com/2/tweets/search/stream", auth=bearer_oauth, stream=True,
    )
    print(response.status_code) 
    if response.status_code != 200:
        raise Exception(
            "Cannot get stream (HTTP {}): {}".format(
                response.status_code, response.text
            )
        )
    while True:
        try:
            for response_line in response.iter_lines():
                if response_line: #읽어드린 데이터가 개행문자를 포함하고 있으면 데이터가 깨짐 현상이 발생할 
                		#수 있기 때문에 제거후 데이터 저장
                    json_response = json.loads(response_line)
                    par_text=json_response['data']['text'].replace("\n", ' ')
                    mylogger.info(par_text)
                    
        except ChunkedEncodingError:
            continue

def main():

    rules = get_rules()
    delete = delete_all_rules(rules)
    set = set_rules(delete)
    get_stream(set)


if __name__ == "__main__":
    main()
#참고한 github
https://github.com/twitterdev/Twitter-API-v2-sample-code/blob/main/Filtered-Stream/filtered_stream.py

우선 기본적인 코드 예시는 위의 주소의 코드를 사용했다. 추가적으로 파일에 log 형식으로 데이터를 저장하고 싶었기 때문에 logging 모듈을 사용해서 데이터를 저장하려고 했다. 여기서 나에게 필요한 데이터는 결국 텍스트 데이터이지만 이 twitter api의 기본 요소는 id도 포함되어 있기 때문에 나는 text부분만을 읽어서 파일에 저장시켰다.  

테스트를 위해서 '날씨'에 대한 트위터 데이터를 수집한 모습

python 컨테이너 안에서 send_data.py 스크립트를 실행하니 위의 이미지와 같이 정상적으로 데이터가 저장되는 모습을 볼 수 있다. 하지만 여기서 저장된 양식을 보면,  어차피 나는 사람들의 의견 부분만 필요하기 때문에  RT, @id와 같이 필요 없는 데이터 값과 데이터 중복이 생기고 있다. 그렇기 때문에 추후에 Logstash를 사용해서 데이터를 전 처리할 예정이다.

 

2. filebeat -> logstash -> elasticsearch 파이프라인 정상적으로 동작하는지 확인

우선 앞서 파일에 트위터 실시간 데이터가 저장되는 모습을 볼 수 있었다. 이제 이 데이터를 filebeat에서 logstash를 거쳐 elasticsearch로 이동되는지에 대해서 확인해 보려고 한다.

#logstash pipeline
input {
  beats {
    port => 5044
  }
}


output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "test"
  }
}

우선적으로 데이터가 정상적으로 들어가는지에 대해서 확인하고 어떤 형태를 가지고 있는지 확인하기 위해서 단순하게 filebeat에서 input을 받아 elasticsearch로 output하는 코드를 작성해서 테스트해보려고 했다.

 

kibana console에서 테스트 한 이미지

실제 테스트를 하기 위해 수집한 '날씨' 키워드 데이터의 도큐먼트를 보니 filebeat에서 데이터를 정상적으로 받아서 저장된 모습을 볼 수 있다. 여기서 내가 필요한 필드는 message, @timestamp 부분만 필요할 것이므로 필요 없는 부분은 추후 logstash로 전처리를 할 예정이다.

 

 

우선적으로 python -> filebeat -> logstash-> elasticsearch로 실시간 데이터가 정상적으로 전송되는것을 볼 수 있었다.  이 정도까지 온 것만으로도 뭔가 프로젝트의 50%는 성공한 것 같은 기분이 든다. 이제 데이터를 전처리하고 kibana 대시보드로 시각화까지 해서 프로젝트를 성공적으로 마무리하고 싶다.