import glob
import json
import os
import threading

import requests
from requests_html import HTML

lost_resp = []  # responses that failed to parse, collected for retrying

def fetch(url):  # request the given url
    # answer "yes" to the over-18 confirmation via the over18 cookie
    resp = requests.get(url, cookies={'over18': '1'})
    return resp  # return the response
def parse_article_entries(doc):  # parse a PTT article-list page
    html = HTML(html=doc)  # parse the raw string into an HTML object
    post_entries = html.find('div.r-ent')  # every entry on the page (20 per page)
    return post_entries  # return the entries found
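# A minimal usage sketch combining fetch() and parse_article_entries(). The
# board url and the 'div.title > a' selector are assumptions about PTT's
# usual index markup, not something the functions above guarantee.
def demo_list_entries(index_url='https://www.ptt.cc/bbs/Gossiping/index.html'):
    resp = fetch(index_url)
    for entry in parse_article_entries(resp.text):
        link = entry.find('div.title > a', first=True)  # deleted posts have no link
        if link:
            print(link.text, '->', 'https://www.ptt.cc' + link.attrs['href'])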
def parse_ptt_post(resp):  # parse a PTT post page
    try:
        html = HTML(html=resp.text)  # parse the raw string into an HTML object
        data = {             # fields to return:
            'poster': '',    # author
            'forum': '',     # board
            'title': '',     # title
            'time': '',      # post time
            'content': '',   # body text
            'comments': [],  # pushes/comments
            'ip': ''         # poster ip
        }
        # the post header is rendered as four metadata rows at the top of
        # div#main-content: author, board, title, time
        elements = html.find('div#main-content>*')
        author_label, author = elements[0].find('div>*')
        forum_label, forum = elements[1].find('div>*')
        title_label, title = elements[2].find('div>*')
        time_label, time = elements[3].find('div>*')
        # text of the header block, used below to strip it from the body
        pre_text = (f'{author_label.text}{author.text}\n'
                    f'{forum_label.text}{forum.text}\n'
                    f'{title_label.text}{title.text}\n'
                    f'{time_label.text}{time.text}\n')
        # the '※ 發信站:' / '※ 編輯:' footer marks the end of the body
        end_text = [element for element in html.find('div#main-content>span.f2')
                    if '※ 發信站:' in element.text or '※ 編輯:' in element.text][0].text
        data['poster'] = author.text
        data['forum'] = forum.text
        data['title'] = title.text
        # the post time is in ctime format, e.g. 'Sat Jun  1 01:10:25 2019';
        # split() (rather than split(' ')) absorbs the double space that
        # appears before single-digit days
        _, month, day, time, year = time.text.split()
        month = {'Jan': '01', 'Feb': '02', 'Mar': '03', 'Apr': '04',
                 'May': '05', 'Jun': '06', 'Jul': '07', 'Aug': '08',
                 'Sep': '09', 'Oct': '10', 'Nov': '11', 'Dec': '12'}[month]
        day = day.zfill(2)  # left-pad single-digit days with a zero
        data['time'] = f'{year}-{month}-{day} {time}'
        # body = everything between the header block and the footer
        data['content'] = html.find('div#main-content')[0].text.replace(pre_text, '').split(end_text)[0]
        if data['content'][-3:] == '-- ':  # drop the trailing '--' signature delimiter
            data['content'] = data['content'][:-3]
        for element in html.find('div#main-content>span.f2'):
            if '※ 發信站:' in element.text:  # this footer line carries the poster ip
                data['ip'] = element.text.split('來自: ')[1].split(' (')[0]
        commenters = []  # commenters seen so far
        # walk the pushes: tag, commenter id, comment text, and ip/datetime
        for push_tag, commenter, comment, comment_time in zip(
                html.find('div#main-content>div.push>span.push-tag'),
                html.find('div#main-content>div.push>span.push-userid'),
                html.find('div#main-content>div.push>span.push-content'),
                html.find('div#main-content>div.push>span.push-ipdatetime')):
            if len(comment_time.text.split(' ')) == 3:  # 'ip mm/dd hh:mm'
                time = comment_time.text.split(' ')[1].replace('/', '-') + ' ' + comment_time.text.split(' ')[2]
                ip = comment_time.text.split(' ')[0]
            elif len(comment_time.text.split(' ')) == 2:  # 'mm/dd hh:mm' without ip
                time = comment_time.text
                ip = ''
            else:
                time = ip = ''
            # PTT lists pushes chronologically, so a long comment can be split
            # across several pushes; merge pushes from the same commenter
            if commenter.text not in commenters:
                data['comments'] += [
                    {
                        'push_tag': {'推': 'P', '噓': 'N', '→': '', '': ''}[push_tag.text],
                        'poster': commenter.text,
                        'content': comment.text[2:],
                        'time': time,
                        'ip': ip
                    }
                ]
                commenters.append(commenter.text)
            else:
                data['comments'][commenters.index(commenter.text)]['content'] += comment.text[2:]
        return data  # return the parsed post
    except Exception as e:
        print('\nError found:')
        lost_resp.append(resp)  # keep the failed response so it can be retried
        print(resp.url, e)  # data['title'] may not be filled in yet, so report the url
        print()
        return
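# A usage sketch for parse_ptt_post() on one article; the url below is a
# hypothetical placeholder and must be swapped for a live PTT article url.
def demo_parse_one(post_url='https://www.ptt.cc/bbs/Gossiping/M.1234567890.A.ABC.html'):
    data = parse_ptt_post(fetch(post_url))
    if data:
        print(data['title'], data['poster'], data['time'])
        print(f"{len(data['comments'])} comments")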
def parse_save_ptt_post(post_url, print_result=False, use_thread=True):  # parse a PTT post and save it as json
    def task(post_url):
        resp = fetch(post_url)  # fetch the post url
        if resp.status_code == 404:
            raise Exception('Page not found.')
        data = parse_ptt_post(resp)  # parse the post
        if data:
            data['url'] = post_url  # record the post url
            json_object = json.dumps(data, indent=4, ensure_ascii=False)  # serialize to json
            if data['forum']:
                # target file name: <board>_<article id>
                name = f"{data['forum']}_{data['url'].split(data['forum']+'/')[1].replace('.', '_').replace('_html', '')}"
            else:  # fall back to the url path when the board could not be parsed
                name = data['url'].split('bbs/')[-1].replace('/', '_').replace('.', '_').replace('_html', '')
            os.makedirs('posts', exist_ok=True)  # make sure the output folder exists
            with open(f'posts/{name}.json', 'w', encoding='utf8') as f:  # write the json file
                f.write(json_object)
            if print_result:
                print(f'post saved as `posts/{name}.json`.')
        return
    if use_thread:
        thread = threading.Thread(target=task, args=(post_url,))  # hand the work to a worker thread
        thread.start()
    else:
        task(post_url)
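# A sketch tying the pieces together: fetch one index page and hand every
# linked post to parse_save_ptt_post(). The url-joining and title selector
# repeat the markup assumptions noted in demo_list_entries() above.
def demo_crawl_page(index_url='https://www.ptt.cc/bbs/Gossiping/index.html'):
    resp = fetch(index_url)
    for entry in parse_article_entries(resp.text):
        link = entry.find('div.title > a', first=True)
        if link:  # skip deleted posts, which have no title link
            parse_save_ptt_post('https://www.ptt.cc' + link.attrs['href'], print_result=True)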
def update():  # re-crawl every post that has already been saved
    def update_post(post_url):  # re-fetch and re-save one post
        try:
            # run synchronously: update() already gives each url its own thread,
            # and this lets the except clause below actually catch failures
            parse_save_ptt_post(post_url, use_thread=False)
            print(f'Page updated: {post_url}')
        except Exception as e:
            print(e, f': {post_url}')
    files = glob.glob('posts/*.json')  # every saved post json file
    saved_posts = []  # urls of every saved post
    for file in files:
        with open(file, encoding='utf8') as f:
            saved_posts.append(json.load(f)['url'])
    for post_url in saved_posts:  # walk the urls and update each post
        thread = threading.Thread(target=update_post, args=(post_url,))
        thread.start()
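# Entry-point sketch: refresh everything already under posts/ when the file
# is run directly. Note that update() returns before its worker threads
# finish, so a short-lived script may want to join them instead.
if __name__ == '__main__':
    update()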