import os
import time
import chardet
import requests
import requests.exceptions
from urllib.parse import urlparse
from bs4 import BeautifulSoup
import asyncio
import traceback

try:
    from .config import get_header, func_timer, connect_mongo
    from .config import TIMEOUT, DEPTH, MONGODB, AMOUNTS, CONTENT_LENGTH, DELAY, QUEUE_SIZE
    from .logger import Logging
except ImportError:
    from config import get_header, func_timer, connect_mongo
    from config import TIMEOUT, DEPTH, MONGODB, AMOUNTS, CONTENT_LENGTH, DELAY, QUEUE_SIZE
    from logger import Logging

_logger = Logging(os.path.splitext(os.path.basename(__file__))[0]).logObject
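
# Whole-site crawler built around an asyncio.Queue: consumer coroutines pop Url
# objects off the queue and hand them to producer(), which downloads the page,
# stores the raw response in MongoDB and pushes newly discovered in-domain links
# back onto the queue. Crawling is bounded by DEPTH, AMOUNTS and QUEUE_SIZE from config.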
class Url(object):
    """Lightweight wrapper holding a URL, its end type, its parent URL and its crawl depth."""
    url = ''
    end_type = 'WEB'
    parent_url = ''
    depth = 0

    def __init__(self, url, end_type, parent_url_obj=None):
        self.url = url
        self.end_type = end_type
        if parent_url_obj:
            self.parent_url = parent_url_obj.url
            self.depth = parent_url_obj.depth + 1
class AllSiteSpider(object):
    def __init__(self):
        self.client = connect_mongo(MONGODB)
        self.movie = self.client[MONGODB['collection']]
        self.filter_set = set()                     # de-duplication set of every URL seen so far
        self.q = asyncio.Queue(maxsize=QUEUE_SIZE)  # URL queue shared by producer/consumers
        self.consume_num = 0
        self.produce_num = 0
        self.stop_extract_links = False
    def start_crawl(self, start_url, end_type):
        self.domain = getattr(urlparse(start_url), 'netloc', '')
        self.end_type = end_type
        url = start_url.strip()
        if url.endswith('/'):
            url = url[:-1]
        self.filter_set.add(url)
        start_url_obj = Url(url, self.end_type)
        self.q.put_nowait(start_url_obj)
        event_loop = asyncio.get_event_loop()
        _logger.info('Starting full-site crawl: start_url=[{}], Depth={}, end_type={}, parent_url=None'.format(
            start_url_obj.url, start_url_obj.depth, start_url_obj.end_type))
        try:
            event_loop.run_until_complete(self.run(event_loop))
        finally:
            event_loop.close()
    async def run(self, loop):
        consumers = [loop.create_task(self.consumer(i)) for i in range(5)]
        await asyncio.wait(consumers)
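
    # Concurrency model: five consumer() tasks share one queue. Each consumed Url is
    # passed to producer(), which may put more Url objects back onto the queue; a
    # consumer exits once the queue is empty after finishing its current item.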
    async def consumer(self, num):
        while True:
            try:
                url_obj = await self.q.get()
                self.consume_num += 1
                _logger.info('Consumer {}: consuming item #{}, QSize={}, Depth={}, URL=[{}]'.format(
                    num, self.consume_num, self.q.qsize(), url_obj.depth, url_obj.url))
                self.q.task_done()
                await self.producer(url_obj)
            finally:
                if self.q.empty():
                    _logger.info('Queue is empty, all tasks finished, exiting the event loop.')
                    break
        await self.q.join()
    async def producer(self, url_obj):
        self.produce_num += 1
        _logger.info('Producer: producing item #{}, QSize={}, Depth={}, URL=[{}]'.format(
            self.produce_num, self.q.qsize(), url_obj.depth, url_obj.url))
        try:
            response = self.download_html(url_obj.url)
            self.save_html(response)
            await asyncio.sleep(DELAY)
            if url_obj.depth < DEPTH:
                if not self.stop_extract_links:
                    links = self.extract_links(response, url_obj)
                    if (self.consume_num + self.q.qsize()) < AMOUNTS:
                        for link_obj in links:
                            await self.q.put(link_obj)
                        _logger.info('Put {} urls into the queue, current QSize={}'.format(
                            len(links) if links else 0, self.q.qsize()))
                    else:
                        _logger.info('Consumed URLs + current QSize is {}, exceeding the site-wide crawl limit of {}; '
                                     'stop putting URLs into the queue, drop the {} extracted urls, '
                                     'and stop extracting links from all pages after this one.'.format(
                            self.consume_num + self.q.qsize(), AMOUNTS, len(links) if links else 0))
                        self.stop_extract_links = True
            else:
                _logger.warning('Current URL depth is {}, maximum depth reached, no more URLs will be '
                                'extracted from this page, current URL=[{}]'.format(url_obj.depth, url_obj.url))
            _logger.info('The global filter set currently holds {} urls'.format(len(self.filter_set)))
        except Exception:
            _logger.error('Producer exception: {}'.format(traceback.format_exc()))
    def download_html(self, url, retry=2):
        _logger.info('Downloading page: start downloading HTML...')
        try:
            response = requests.get(url, headers=get_header(self.end_type), stream=True, timeout=TIMEOUT)
            # only sniff the encoding for reasonably small bodies (reading .content consumes the stream)
            if int(response.headers.get('Content-Length', 0)) < CONTENT_LENGTH:
                response.encoding = chardet.detect(response.content)['encoding']
            return response
        except requests.exceptions.RequestException as e:
            _logger.error('Failed to fetch url: {}, ERROR: {}'.format(url, e))
            if retry > 0:
                _logger.info('Retrying url: {}, {} attempt(s) left'.format(url, retry))
                return self.download_html(url, retry - 1)
    def save_html(self, response):
        doc = {}
        if response and response.status_code not in range(400, 600):
            doc['request'] = {
                'url': response.url,
                # headers are CaseInsensitiveDict objects; convert to plain dicts so they serialize cleanly
                'headers': dict(response.request.headers),
                'method': response.request.method
            }
            doc['response'] = {
                'headers': dict(response.headers),
                'content': response.content,
                'status_code': response.status_code
            }
            doc['end_type'] = self.end_type
            doc['domain'] = getattr(urlparse(response.url), 'netloc', '')
            doc['insert_time'] = format((time.time() * 1000), '0.0f')
        if doc:
            _logger.info('Saving data: parsing and saving the document to MongoDB...')
            self.movie.insert_one(doc)
        else:
            if response:
                _logger.warning('No data parsed, status_code={}, url={}'.format(
                    response.status_code, response.url))
            else:
                _logger.warning('No data parsed')
    def record_urls(self, filename, url):
        with open(filename, 'a', encoding='utf-8') as f:
            f.write(url + '\n')
    def filter_url(self, url, parent_url_obj, filtered_num, record=False):
        url_obj = None
        new_url = url.strip()
        if new_url.endswith('/'):
            new_url = new_url[:-1]
        if new_url not in self.filter_set and self.domain in new_url:
            self.filter_set.add(new_url)
            url_obj = Url(new_url, self.end_type, parent_url_obj)
            if record:
                self.record_urls('unique.txt', new_url)
        else:
            filtered_num += 1
            if record:
                self.record_urls('filtered.txt', new_url)
        return url_obj, filtered_num
    def extract_links(self, response, url_obj):
        if response:
            link_list = []
            filtered_num = 0
            parent_url_obj = url_obj
            parent_url = parent_url_obj.url
            soup = BeautifulSoup(response.text, 'lxml')
            tags = soup.find_all(True)
            for tag in tags:
                url = tag.get('href', None)
                if url is not None:
                    if url.startswith('javascript') or url.startswith('index'):
                        continue
                    elif url.startswith('//'):
                        # protocol-relative link: prepend the parent page's scheme
                        url = '{}:{}'.format(getattr(urlparse(parent_url), 'scheme', ''), url)
                    elif url.startswith('/'):
                        # site-relative link: prepend the parent page's scheme and host
                        url = '{}://{}{}'.format(getattr(urlparse(parent_url), 'scheme', ''),
                                                 getattr(urlparse(parent_url), 'netloc', ''), url)
                    new_url_obj, filtered_num = self.filter_url(url, parent_url_obj, filtered_num)
                    if new_url_obj:
                        link_list.append(new_url_obj)
            _logger.info('Extracted urls: {} extracted from this page, {} filtered out'.format(len(link_list), filtered_num))
            return link_list
        else:
            _logger.error('The page response is None')
            return []
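

if __name__ == '__main__':
    # Minimal usage sketch: the start URL below is a placeholder (an assumption, not part
    # of the original module); the real value and the end_type key depend on the project's
    # config (get_header, MONGODB, DEPTH, AMOUNTS, etc.).
    spider = AllSiteSpider()
    spider.start_crawl('https://example.com', 'WEB')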