PTT Crawler

In [ ]:
import requests
from requests_html import HTML
import json
import os
import glob
import warnings
import pandas as pd

warnings.filterwarnings('ignore')
warnings.simplefilter(action='ignore', category=pd.errors.SettingWithCopyWarning)
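The crawler depends on the third-party `requests_html` package (published on PyPI as `requests-html`); if it is not already available in the environment, the optional cell below installs it.

In [ ]:
# Optional: install the HTML-parsing dependency if it is missing.
# !pip install requests-html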
In [ ]:
lost_resp = [] # responses that failed to parse, kept here for later inspection or retry
In [ ]:
def fetch(url): # request the given url
    resp = requests.get(url, cookies={'over18': '1'}) # send the over-18 confirmation cookie
    return resp # return the response

def parse_article_entries(doc): # parse a PTT board index page
    html = HTML(html=doc) # parse the string into an HTML object
    post_entries = html.find('div.r-ent') # find every post entry on the page (20 per page)
    return post_entries # return the entries found

def parse_ptt_post(resp): # parse a PTT post page
    try:
        html = HTML(html=resp.text) # parse the string into an HTML object
        data = {            # fields to return:
            'poster': '',   #   author
            'forum': '',    #   board
            'title': '',    #   title
            'time': '',     #   post time
            'content': '',  #   body text
            'comments': [], #   pushes (comments)
            'ip': ''        #   poster ip
        }
        elements = html.find('div#main-content>*')
        author_label, author = elements[0].find('div>*')
        forum_label, forum = elements[1].find('div>*')
        title_label, title = elements[2].find('div>*')
        time_label, time = elements[3].find('div>*')
        pre_text = f'{author_label.text}{author.text}\n{forum_label.text}{forum.text}\n{title_label.text}{title.text}\n{time_label.text}{time.text}\n'
        end_text = [element for element in html.find('div#main-content>span.f2') if '※ 發信站:' in element.text or '※ 編輯:' in element.text][0].text
        
        data['poster'] = author.text
        data['forum'] = forum.text
        data['title'] = title.text
        _, month, day, time, year = time.text.split() # split() without an argument tolerates the double space before single-digit days
        month = {'Jan': '01',
                 'Feb': '02',
                 'Mar': '03',
                 'Apr': '04',
                 'May': '05',
                 'Jun': '06',
                 'Jul': '07',
                 'Aug': '08',
                 'Sep': '09',
                 'Oct': '10',
                 'Nov': '11',
                 'Dec': '12'}[month]
        day = day.zfill(2) # zero-pad single-digit days
        data['time'] = f'{year}-{month}-{day} {time}'
        
        data['content'] = html.find('div#main-content')[0].text.replace(pre_text, '').split(end_text)[0]
        if data['content'][-3:]=='-- ':
            data['content'] = data['content'][:-3]
        
        for element in html.find('div#main-content>span.f2'):
            if '※ 發信站:' in element.text:
                data['ip'] = element.text.split('來自: ')[1].split(' (')[0]
        
        commenters = [] # comment authors seen so far
        for push_tag, commenter, comment, comment_time in zip(html.find('div#main-content>div.push>span.push-tag'),
                                                              html.find('div#main-content>div.push>span.push-userid'),
                                                              html.find('div#main-content>div.push>span.push-content'),
                                                              html.find('div#main-content>div.push>span.push-ipdatetime')): # tag, commenter, comment text, and ip/time of each push
            if len(comment_time.text.split(' '))==3:
                time = comment_time.text.split(' ')[1].replace('/', '-') + ' ' + comment_time.text.split(' ')[2]
                ip = comment_time.text.split(' ')[0]
            elif len(comment_time.text.split(' '))==2:
                time = comment_time.text
                ip = ''
            else:
                time = ip = ''

            if commenter.text not in commenters: # PTT lists pushes chronologically, so one long comment may be split across pushes; pushes from the same commenter are concatenated
                data['comments']+=[
                    {
                        'push_tag': {'推': 'P', '噓': 'N', '→': '', '': ''}[push_tag.text],
                        'poster': commenter.text,
                        'content': comment.text[2:],
                        'time': time,
                        'ip': ip
                    }
                ]
                commenters.append(commenter.text)

            else:
                data['comments'][commenters.index(commenter.text)]['content']+=comment.text[2:]
                
        return data # return the parsed data
    
    except Exception as e:
        print('\n')
        print('Error found:')
        lost_resp.append(resp)
        print(data['title'], e)
        print('\n')
        return

def parse_save_ptt_post(post_url, print_result=False, use_thread=True): # parse a PTT post and save it as json
    import threading # for optional multi-threaded handling
    def task(post_url):
        resp = fetch(post_url) # fetch the post url
        if resp.status_code==404: raise Exception('Page not found.')

        data = parse_ptt_post(resp) # parse the post
        if data:
            data['url'] = post_url # attach the post url

            json_object = json.dumps(data, indent=4, ensure_ascii=False) # serialize to json

            name = f"{data['forum']}_{data['url'].split(data['forum']+'/')[1].replace('.', '_').replace('_html', '')}" # target file name
            if not data['forum']:
                name = data['url'].split('bbs/')[-1].replace('/', '_').replace('.', '_').replace('_html', '')

            with open(f'posts/{name}.json', 'w', encoding='utf8') as f: # write the json file
                f.write(json_object)
                if print_result: print(f'post saved as `posts/{name}.json`.')
                
        return

    if use_thread:
        thread = threading.Thread(target=task, args=(post_url,)) # hand the work to a worker thread
        thread.start()
    else:
        task(post_url)

def update(): # re-crawl and overwrite every previously saved post
    import threading

    def update_post(post_url): # re-fetch and re-save one post
        try:
            parse_save_ptt_post(post_url)
            print('Page updated', f': {post_url}')
        except Exception as e:
            print(e, f': {post_url}')
            pass
        
    files = glob.glob('posts/*.json') # all saved post json files
    saved_posts = [json.load(open(file, encoding='utf8'))['url'] for file in files] # urls of all saved posts
    for post_url in saved_posts: # loop over the urls and re-crawl each post
        thread = threading.Thread(target=update_post, args=(post_url,))

        thread.start()
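Before launching the full crawl, a single post can be parsed directly to sanity-check the parser. The sketch below assumes a placeholder URL: `sample_url` is hypothetical and should be replaced with a real post URL from the target board.

In [ ]:
# Minimal sketch: fetch and parse one post, then inspect the result.
# `sample_url` is a hypothetical placeholder, not a real post.
sample_url = 'https://www.ptt.cc/bbs/HatePolitics/M.0000000000.A.000.html'
sample_resp = fetch(sample_url)
sample_data = parse_ptt_post(sample_resp)
if sample_data:
    print(sample_data['title'], sample_data['time'], len(sample_data['comments']))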
In [ ]:
if not os.path.isdir('posts'): # folder where posts are saved
    os.mkdir('posts')
In [ ]:
n_target_post = 10000 # number of posts to crawl
update_posts = False # whether to re-crawl posts that are already saved

find_latest = True # start from the newest posts and crawl backwards
current_page = 10 # index page to start crawling from; ignored when find_latest==True

post_urls = []
post_page_ids = []

forum = 'HatePolitics' # name of the board to crawl

saved_posts = [json.load(open(file, encoding='utf8'))['url'] for file in glob.glob('posts/*.json')] # urls of already saved posts

resp = fetch(f'https://www.ptt.cc/bbs/{forum}/index.html') # start from the newest index page
html = HTML(html=resp.text) # parse into an HTML object

if find_latest:
    current_page = int(html.find('div.btn-group-paging>a')[1].attrs['href'].split('/')[-1].split('.')[0].replace('index', '')) + 1 # the second paging link points to the previous index page, so add 1 to get the newest page number

continue_crawling = True
while continue_crawling: # crawl loop
    url = f'https://www.ptt.cc/bbs/{forum}/index{current_page}.html'
    resp = fetch(url)
    post_entries = parse_article_entries(resp.text)
    
    for post_entry in post_entries:
        post_url = '' # deleted posts have no title link; keep post_url defined for the status line below
        try:
            post_url = 'https://www.ptt.cc' + post_entry.find('div.title>a', first=True).attrs['href']
            if update_posts or post_url not in saved_posts:
                parse_save_ptt_post(post_url)
                n_target_post-=1
        except Exception as e:
            pass
        
        print('Current page:', current_page, 'url: ', post_url, f'\tTargets left: {n_target_post}', ' '*10, end='\r')

        if n_target_post==0:
            continue_crawling = False
            break
    
    current_page-=1
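Once the crawl finishes, the saved JSON files can be pulled into a pandas DataFrame for analysis, which is presumably what the `pandas` import above is for. A minimal sketch, assuming the files in `posts/` were written by `parse_save_ptt_post`:

In [ ]:
# Minimal sketch: load every saved post into a DataFrame (one row per post).
records = []
for file in glob.glob('posts/*.json'):
    with open(file, encoding='utf8') as f:
        post = json.load(f)
    records.append({
        'forum': post['forum'],
        'poster': post['poster'],
        'title': post['title'],
        'time': post['time'],
        'n_comments': len(post['comments']),
        'url': post['url'],
    })
df = pd.DataFrame(records)
df.head()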