最近牙疼,吃什么都不香,为了省点拔牙钱,排队办市民卡,郁闷,于是写文章解闷。
嗯……那就从一个简单的爬虫开始吧。
运行环境:Ubuntu 16.04
语言:Python2.7
数据库:Redis,mongoDB
中间件:RabbitMQ
进程管理:Supervisor
如无必要,勿增实体。为什么要加这么多花里胡哨的东西来占用资源?咳咳,就是为了玩玩。
需求文档:
- 1.爬取某平台达人数据,包含主页数据和视频粉丝等详情数据;
- 2.每周更新一次数据;
- 3.可随时插入或更新某达人详情数据。
画个巨Low的架构图来看看:
技术评估:
1.爬取频率:测测测!不要盲目自信设个延迟就溜了!需求一实际上是最基础的数据爬取和存储,还没涉及到数据处理层。但由于需要登录的cookie我只有一个,爬取频率要控制好(请求后延迟或消费者回调延迟都可以),线程嘛也不敢多开(python的multiprocessing模块中pool() + pool.map()开启多线程很方便,谨慎食用咯)。而多条MQ队列同时发布和消费本身就是多线程啊,How to DO?切记,如果没有十足的把握(让爬虫足够稳定),别把不同的消费者写在同一个脚本里。除非……把爬虫脚本写成API,通过发送请求来回调不同的队列消息,效果也不错,这样甚至能够通过设置请求频率来控制爬取频率。
2.数据去重:每周更新一次数据意味着每周都要爬一次该平台的所有达人,一般每周新增的达人不多,所以要在爬取时做去重处理。首先说一下为什么选择了mongoDB而不是MySQL?由于该平台爬取到的是json数据(包含list),故用mongoDB存储再好不过(懒人必备,这样就不用考虑存哪些字段,全拿下就行了,后期再按需求处理)。而对于json数据的处理,尽可能和爬取分开,在爬取过程中批量处理结构化数据,会非常影响爬虫效率,又或者,将爬取和数据处理分开在不同线程处理(切勿让数据处理延长单个线程的时间),可服务器资源有限的前提下这并不是一个高效的方法。既然不用MySQL,那去重我选择Redis。用极限思维来考虑,当爬虫级别上千万以后(这里还没考虑高并发读写),每一次查询都会消耗不少时间,爬虫效率也会逐渐降低,所以要在读写上尽可能提高效率。把键值对存到Redis的哈希表(上锁)来实现去重,效果可以媲美MySQL。
3.稳定性:有这么多花里胡哨的东西,稳定性要考虑的可多咯。首先是爬虫的稳定性,要控制频率,关于重试和超时,不要设置多次重试,就这一个cookie,失败了赶紧把URL重新塞回队列,等下一次爬取就行(非常有必要延迟一段时间后再启动下一次)。如果更加谨慎,可以新增一条与原队列延迟不同的「回笼」队列,来专门处理异常请求。RabbitMQ的话要考虑宕机的问题(谁知道什么时候挂机),这是要做好队列持久化;不小心断开连接也要做好信息不丢失的处理,没处理完的消息,保持下次连上的时候发布者能够重新发送;还有非常非常重要的——自动连接,如果想省心地挂上爬虫就和朋友去嗨皮,最好让RabbitMQ不检测心跳,这样就不会造成服务端主动断开连接(特别是延迟较大的时候)。以上几点也是我不采用Python自带的Queue模块的原因,显然使用中间件对队列的可控性更高,可灵活应对更多特殊情况。
4.灵活性:这是针对可随时插入或更新某条数据而言的。比方说,距离上次数据更新已经过去一星期了,公司运营小姐姐想要最新的某个达人的报价,怎么办?就这个问题我可以新增一条队列,叫「应急」队列什么的都行。这时她可以把这个达人的ID扔到「应急」队列中优先爬取(RabbitMQ的界面化操作非常友好),但这会有一个问题,万一不小心输错ID了怎么办?RabbitMQ可是照样发送的呀。不行不行,为了让运营部门省心,我可以开放一个API接口,当ID插入队列之前,我会判断该达人ID是否存在于达人库中,如果不存在该达人则不插入该消息。同理,公司不同岗位的同事的数据需求都可能不同,可按需开放接口。涉及到共用一个数据库,还要考虑如何去避免数据混淆(可构造列表存放不同时间维度的数据,同时也是为了处理时数据对齐)。
5.资源分配:如果可以的话,RabbitMQ一台,Redis一台,爬虫一台,数据库一台,代理的话,免费的可以用西刺(很不稳定)……当然,我只是玩玩,我全部用一台,嘻嘻。
程序设计
来了,要开始了,兄弟
创建虚拟环境
用python做项目要养成各项目间环境独立的习惯(当然如果使用的是docker那就方便多了) 现在我创建一个名为/spiders的文件夹并在该文件下创建名为env_py2.7虚拟环境:
root@ubuntu:~# cd /spiders
root@ubuntu:/spiders# virtualenv env_py2.7
激活环境与退出环境:
root@ubuntu:/spiders# source env_py2.7/bin/activate
(env_py2.7) root@ubuntu:/spiders# deactivate
创建项目并安装所有依赖
在/spiders文件夹下创建/xingtu文件夹,代码和配置等就放在这下面了。
root@ubuntu:/spiders/xingtu# tree
.
├── agent_proxy.py # 此文件存放user-agents和代理
├── cookie.txt # 此文件存放cookie
├── get_kol_detail.py # 爬取达人详细数据的脚本
├── get_kol_list.py # 爬取达人列表
├── requirements.txt # 打包依赖
├── send_id_api.py # 更新达人数据的接口
├── send_pages.py # 发布url消息
└── xingtu_supervisord.conf # 进程管理配置
0 directories, 8 files
这个小爬虫要用到的库及作用如下,我把所有依赖写进requirements.txt:
pika # 连接操作rabbitmq
requests # 爬虫
redis # 连接redis
pymongo # 连接mongoDB
flask # 编写API
flask_cors
pandas # 数据处理
gevent # 并发处理
在虚拟环境中直接安装所有依赖:
(env_py2.7) root@ubuntu:~# pip install -r /spiders/xingtu/requirements.txt
连接RabbitMQ
记得把heartbeat设为0,省去手动重连的后顾之忧:
credentials = pika.PlainCredentials(账号, 密码)
connection = pika.BlockingConnection(pika.ConnectionParameters(host='服务器IP',
credentials=credentials,
heartbeat=0 )) # 设置heartbeat为0,意味着不检测心跳,server端不会主动断开连接
channel = connection.channel()
创建发布队列,发送URL(send_pages.py)
创建一个名为'xingtu_page'的队列来开始发布我们的爬取链接吧:
def get_page(page):
# url = '爬取 {} 的URL'.format(page) # 更新url的page参数
channel.queue_declare(queue='xingtu_page', durable=True)
channel.basic_publish(exchange='',
routing_key='xingtu_page',
body=url,
properties=pika.BasicProperties(
delivery_mode=2, # 消息持久化
))
if __name__ == '__main__':
pool = Pool(1) #只开一个线程
pool.map(get_page, range(1,2000)) #多线程工作
pool.close()
pool.join()
connection.close()
爬取达人列表(get_kol_list.py)
关于爬取中的超时和重试,可以自己定义,这里直接用requests封装好的:
req = requests.Session()
req.mount('http://', HTTPAdapter(max_retries=1))
req.mount('https://', HTTPAdapter(max_retries=1))
消费者回调函数:
def callback(ch, method, properties, body):
try:
'''调用get_kol函数爬取'''
get_kol_id(body)
time.sleep(20)
ch.basic_ack(delivery_tag=method.delivery_tag) # 消费者每次收到消息,要通知一声:已经收到,如果消费者连接断了,rabbitmq会重新把消息放到队列里,下次消费者可以连接的时候,就能重新收到丢失消息
except:
'''请求失败则把url重新放回队列'''
channel.basic_publish(exchange='',
routing_key='xingtu_page',
body=body,
properties=pika.BasicProperties(
delivery_mode=2, # 消息持久化
))
time.sleep(120) # 爬取失败就等两分钟再继续
爬取并且发布消息到新队列::
def get_kol_id(url):
headers = {'User-Agent': agent_proxy.get_agent()}
cookies = open(u'/spiders/xingtu/cookie.txt','r').read() # cookie写在cookie.txt里面
cookies_dict = {}
'''对原始cookies构造字典格式'''
for line in cookies.split(';'):
key, value = line.split('=', 1) # 1代表只分一次,得到两个数据
key = key.replace(' ', '')
cookies_dict[key] = value
list_kol = req.get(url, headers=headers, cookies=cookies_dict, timeout=20, verify=False).json() # 这里使用req是因为上面对resquests设置了重试机制
for author in list_kol['data']['authors']:
id_xingtu = author['id']
id_douyin = author['core_user_id']
if my_redis.hget('hash_kolid', id_douyin) == None: # redis 通过查询hash表去重
my_redis.hsetnx('hash_kolid', id_douyin, id_xingtu) # 将id写入redis,key为抖音id,value为达人在该平台id
channel.queue_declare(queue='xingtu_kol_id', durable=True) # 创建队列,发布达人的该平台id
channel.basic_publish(exchange='',
routing_key='xingtu_kol_id',
body=id_xingtu,
properties=pika.BasicProperties(
delivery_mode=2, # 消息持久化
))
else:
pass
if list(db_kl.find({'core_user_id': id_douyin})) == []: # 插入mongodb时去重.防止redis数据丢失,mongodb依然能够正常去重保存
db_kl.insert(author)
else:
pass
开始消费:
if __name__ == '__main__':
channel.basic_consume('xingtu_page',
callback,
auto_ack=False)
channel.start_consuming()
爬取达人详细数据(get_kol_detail.py)
消费者回调函数
def callback(ch, method, properties, body):
try:
'''调用get_kol_detail函数爬取'''
get_kol_detail(body)
time.sleep(30)
ch.basic_ack(delivery_tag=method.delivery_tag) # 消费者每次收到消息,要通知一声:已经收到,如果消费者连接断了,rabbitmq会重新把消息放到队列里,下次消费者可以连接的时候,就能重新收到丢失消息
except:
'''请求失败则把id重新放回队列'''
channel.basic_publish(exchange='',
routing_key='xingtu_kol_id',
body=body,
properties=pika.BasicProperties(
delivery_mode=2, # 消息持久化
))
time.sleep(120)
根据达人在该平台ID爬取达人详细数据:
def get_kol_detail(id_xingtu):
print id_xingtu
headers = {'User-Agent': agent_proxy.get_agent()}
cookies = open(u'/spiders/xingtu/cookie.txt','r').read() # cookie写在cookie.txt里面
cookies_dict = {}
'''对原始cookies构造字典格式'''
for line in cookies.split(';'):
key, value = line.split('=', 1) # 1代表只分一次,得到两个数据
key = key.replace(' ', '')
cookies_dict[key] = value
'''列出所有待爬取url'''
url_kol_index = '爬取的{}URL1'.format(id_xingtu) # 达人主页信息
url_kol_order = '爬取的{}URL2'.format(id_xingtu) # 达人任务信息
url_kol_item = '爬取的{}URL3'.format(id_xingtu) # 达人个人作品数据,作品总数据,代表作品数据
url_kol_video = '爬取的{}URL4'.format(id_xingtu) # 达人近15条视频数据
url_kol_daily_fans = '爬取的{}URL5'.format(id_xingtu) # 达人粉丝趋势变化
url_kol_fans_detail = '爬取的{}URL6'.format(id_xingtu) # 达人粉丝详情
url_dict = [{'kol_index': url_kol_index},
{'kol_order': url_kol_order},
{'kol_item': url_kol_item},
{'kol_video': url_kol_video},
{'kol_daily_fans': url_kol_daily_fans},
{'kol_fans_detail': url_kol_fans_detail}]
for url in url_dict:
time.sleep(5)
kol_detail = req.get( url.values()[0], headers=headers, cookies=cookies_dict, timeout=20, verify=False ).json()
if kol_detail['msg'] == u'成功':
db_kd.update( {'id': id_xingtu }, {'$set': { url.keys()[0] : kol_detail }}, upsert=True ) # upsert=True,则存在时修改,不存在时插入
else:
db_kd.update({'id': kol_detail['data']['nothing']}, {'$set': {url.keys()[0]: kol_detail}}, upsert=True) # 这行一定会报错,从而执行回调函数中的异常处理,因为kol_detail['data']['nothing']根本不存在
实时更新达人详细数据(send_id_api.py)
请求的json数据是该达人在抖音平台的ID(这里并未对请求的数据进行加密,如果是企业级开发的话对API的安全性和并发性能应做更加谨慎的设计),如果达人在该平台则提示「已添加」,否则提示「未收录」。
server = flask.Flask(__name__)
CORS(server, resources=r'/*')
@server.route('/douyinid', methods=['get', 'post'])
def registerPost():
# post请求获取请求的参数,返回结果类型是json
content = request.get_json()
if list(db_kl.find({'core_user_id': int(content['id_douyin'].encode('utf-8'))})) == []: # content['id_douyin']为unicode编码,需要转成utf-8再int
reply = jsonify({'Error': '该平台暂未收录该达人数据'})
else:
data_kol = pd.DataFrame(list(db_kl.find({'core_user_id': int(content['id_douyin'].encode('utf-8'))})))
channel.basic_publish(exchange='',
routing_key='xingtu_kol_id',
body=data_kol.loc[0]['id'], # 消息为该平台id
properties=pika.BasicProperties(
delivery_mode=2, # 消息持久化
))
reply = jsonify({'Success': '该达人已添加进爬取队列'})
reply.headers['Access-Control-Allow-Origin'] = '*'
reply.headers['Access-Control-Allow-Methods'] = 'POST'
reply.headers['Access-Control-Allow-Headers'] = 'x-requested-with,content-type'
return reply
启动flask API服务:
if __name__ == '__main__':
WSGIServer(('0.0.0.0', 端口), server).serve_forever()
启动项目
所有脚本的启动我就交给supervisor这个进程管理工具啦,配置也是相当地简单,每一个脚本的运行单独作为一个进程(当然可以通过定义类来把所有脚本写成一个框架,但进程相互独立也有好处,至少在没留log文件的时候调试起来要方便的多),在/spiders/xingtu/xingtu_supervisord.conf中写入以下配置:
[program:send_pages]
command=/spiders/env_py2.7/bin/python /spiders/xingtu/send_pages.py
#priority是权重,数字越大,优先级越高
priority=999
autostart=true
autorestart=unexpected
#startsecs表示进程启动多长时间后视为正常运行
startsecs=0
startretries=3
redirect_stderr=true
stdout_logfile=/tmp/stdout.log
stdout_logfile_maxbytes=10MB
stdout_logfile_backups=10
[program:get_kol_list]
command=/spiders/env_py2.7/bin/python /spiders/xingtu/get_kol_list.py
priority=998
autostart=true
autorestart=unexpected
startsecs=0
startretries=3
redirect_stderr=true
stdout_logfile=/tmp/stdout.log
stdout_logfile_maxbytes=10MB
stdout_logfile_backups=10
[program:get_kol_detail]
command=/spiders/env_py2.7/bin/python /spiders/xingtu/get_kol_detail.py
priority=997
autostart=true
autorestart=unexpected
startsecs=0
startretries=3
redirect_stderr=true
stdout_logfile=/tmp/stdout.log
stdout_logfile_maxbytes=10MB
stdout_logfile_backups=10
[program:send_id_api]
command=/spiders/env_py2.7/bin/python /spiders/xingtu/send_id_api.py
priority=996
autostart=true
autorestart=unexpected
startsecs=0
startretries=3
redirect_stderr=true
stdout_logfile=/tmp/stdout.log
stdout_logfile_maxbytes=10MB
stdout_logfile_backups=10
我的supervisor是在全局环境中的python2.7中安装的,激活环境:
root@ubuntu:~# source activate python2.7
通过supervisor运行所有脚本:
(python2.7) root@ubuntu:~# supervisord -c /spiders/xingtu/xingtu_supervisord.conf
查看supervisor的运行状态:
(python2.7) root@ubuntu:~# supervisorctl -c /spiders/xingtu/xingtu_supervisord.conf status
get_kol_detail RUNNING pid 100986, uptime 0:00:03
get_kol_list RUNNING pid 100987, uptime 0:00:03
send_id_api RUNNING pid 100985, uptime 0:00:03
send_pages EXITED Jun 03 04:54 PM
我还可以通过浏览器访问supervisor的服务端,对所有进程进行重启,暂停等操作:
看一下RabbitMQ的控制面板:
查看一下redis:
root@ubuntu:~# redis-cli -a 密码
127.0.0.1:6379> hgetall hash_kolid
1) "3891903335244664"
2) "6680368576479625224"
3) "59359411419"
4) "6638835397155618820"
......
找一个已经在数据库的达人ID,测试一下更新达人详细数据的API:
这个入门级的小爬虫,大功告成啦!
写在后面
今天是高考的日子,前几天我无意中翻出了我去年毕业答辩的论文,想起了我的恩师们,同学们,我的大学舍友,历历在目。每一年毕业季会让人走向一座更高的山,跨向更广的海,但也让我们学会承受离别带来的伤感。
有时候我会后悔,为什么上学时候不好好学习,这个世界太多太多的魅力源自于science and technology,而自己只能羡慕别人的飞跃。但更多时候是欣喜,我的舍友既没有和我讨论薛定谔的猫也没有辩证忒休斯之船,但我记得,去年世界杯我们因为C罗的帽子戏法狂欢一宿,每年S系列赛的时候我们都会拿好小凳子围着给中国队加油,每次检查作业的前一晚都会手机闪光灯都会「派上用场」,每一次考试都会花光前面16周的记忆和运气......当流年成为历史,当老去成为定势,自己就会小心翼翼擦拭掉这些尘封回忆的灰,静默怀念。
OK,其实这部分后记本是前记,没有牢骚,就不会有这篇文章。我想说什么呢?正好可以用我毕业论文的一句话来表达:
大学的路虽走到了终点,而求知求学的征途长路漫漫。
记于2019/6/7,初夏