1.对于爬取静态网页:
思路就是直接拼接每个页面的url,得到一个超大的url 的list,然后一个一个的request.get(url),然后xpath解析内容,存入mongo或者用logger.info(json.dumps(data)) 存为csv文件
- 对于动态加载的网页,一个思路就是用抓包工具分析其数据接口:我是用的fiddler,利用fiddler查看打开网页时数据加载的接口,一般是 {JSON} 格式,找到接口的地址,分析其接口中参数的规律,然后又是老操作(拼接出所有页面的数据的请求接口,然后再一个一个的遍历请求接口,得到json格式的数据),这个其实对于数据清洗来说很方便,不用做什么格式解析.
3.Fiddler 突然不能抓取请求了怎么办
3.1 Fiddler 打开后,浏览器不能打开网页,我的办法是卸载Fiddler ,重装,简单粗暴(因为Fiddler 只有6m大小,方便快捷)
迅雷下载: https://dl.softmgr.qq.com/original/Development/FiddlerSetup_5.0.20202.18177.exe
4.源代码:
import json
import multiprocessing
import os
import random
import time
import requests
from pprint import pprint
from fake_useragent import UserAgent
from loguru import logger
from motor.motor_asyncio import AsyncIOMotorClient
# from redis import Redis
from redis import Redis
class ETh():
def redis_connect(self):
# myredis=Redis(db=6)
myredis=Redis(db=6)
return myredis
def get_all_url(self):
for i in range(1, 62):
url=f"https://explorer-web.api.btc.com/v1/eth/txns/0?page={i}&size=150"
self.redis_connect().lpush("redis_connect_urls", url)
print(f'push 第{i}页 to redis')
self.redis_connect().close()
def read_page(self, url):
# 获取页面信息,发起请求
# headers 代理,避免ip被封
headers={
"Host": "explorer-web.api.btc.com",
"Connection": "keep-alive",
"Accept": "application/json",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.61 Safari/537.36",
"Content-Type": "application/x-www-form-urlencoded;charset=UTF-8",
"Origin": "https://eth.btc.com",
"Sec-Fetch-Site": "same-site",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Dest": "empty",
"Referer": "https://eth.btc.com/blockinfo/0",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "zh-CN,zh;q=0.9"}
ua=UserAgent()
headers["User-Agent"]=ua.random
time.sleep(random.randint(0, 4))
res=requests.get(url, headers=headers)
s=requests.session()
s.keep_alive=False
requests.adapters.DEFAULT_RETRIES=5
# 得到整个网页页面
page_text=json.loads(res.text) # type: str
data_list=page_text["data"]["list"] # type: list
# pprint(data_list)
return data_list
# 字符串转时间戳
def str_to_timestamp(self, str_time):
timeArray=time.strptime(str_time, "%Y-%m-%d %H:%M:%S")
timeStamp=int(time.mktime(timeArray))
return timeStamp
# 清洗数据
def clean_data(self, url):
eth_datas=[]
data_list=self.read_page(url)
time="2015-07-30 23:26:13"
blocktime=self.str_to_timestamp(time)
for data in data_list:
"""
{'amount': '800.000000000000000000',
'created_ts': 1435634773,
'fee': '0',
'gas_price': '0',
'gas_used': 0,
'id': 1,
'internal_tx': [],
'receiver_hash': '0x1cfcf7517f0c08459720942b647ad192aa9c8828',
'receiver_type': 0,
'sender_hash': '0xGENESIS000000000000000000000000000000000',
'sender_type': 0,
'status': '',
'tx_hash': 'GENESIS_1cfcf7517f0c08459720942b647ad192aa9c8828',
'tx_type': ''},
"""
# todo
"""
{
"_id" : "0xfffffffead5f0ed224bf9ef7019599e9bf00c1a0fa726f316125570c5787f2a3",
"blockHash" : "0xc88ff379a6b6e85aa15fb5f0dbeb4606db191d5870920c804732a91fd9f0881f",
"blockNumber" : 6694829,
"contractAddress" : null,
"cumulativeGasUsed" : 4225411,
"from" : "0x52bc44d5378309ee2abf1539bf71de1b7d7be3b5",
"gasUsed" : 21000,
"logs" : [ ],
"status" : "0x1",
"to" : "0x7c20f2d7f49e5f4ade906d439d0e32240d32fc71",
"transactionIndex" : 68,
"gas" : 50000,
"gasPrice" : 3000000000,
"input" : "0x",
"nonce" : "0x9b602d",
"value" : 1000767082999010048,
"blockTime" : 1542083404
},
"""
data_dic={
"_id": data["tx_hash"],
"blockHash": "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3",
"blockNumber": 0,
"contractAddress": "null",
"cumulativeGasUsed": 0,
"gasUsed": data['gas_used'],
"gasPrice": data['gas_price'],
"logs": [],
"status": data["status"],
"value": data["amount"],
"nonce": "0x0000000000000042",
"from": data["sender_hash"],
"to": data["receiver_hash"],
"blockTime": blocktime
}
# pprint(data_dic)
eth_datas.append(data_dic)
# pprint(eth_datas)
# pprint(len(eth_datas)) # 150
return eth_datas
# 存入mongo库
def save_to_mongo(self, eth_datas):
# 保存到mongo数据库中
db_url='localhost'
db_port=27017
db_name="eth"
db_collection="eth_0"
# 建立连接
client=AsyncIOMotorClient(db_url, db_port)
# 连接某个库名字
db=client[db_name][db_collection]
db.insert_many([i for i in eth_datas])
print('inserted %d docs' % (len(eth_datas)))
# 存入csv文件
def save_to_json(self, eth_datas):
process_id=os.getpid()
logger.add(
# f"/backup/etc/receipt/receipts.json",
# 加入进程号,避免数据存储时候错乱
f"D:/etc_check_data/receipt34555/receipt_{process_id}.json",
level="INFO",
format="{message}",
rotation="1024 MB",
enqueue=True
)
for data in eth_datas:
logger.info(json.dumps(data))
# 获取所有eth_block_0 tx数据
def get_all_eth_data(n):
eth=ETh()
while True:
try:
net=eth.redis_connect().rpop('redis_connect_urls')
if net is None:
break
url=bytes.decode(net)
eth_datas=eth.clean_data(url)
logger.debug(f" process {n + 1} 开始爬取第 {url.split('page=')[1].split('&size')[0]} 页 {url}")
# 存mongo
eth.save_to_mongo(eth_datas)
# 存为json
# eth.save_to_json(eth_datas)
except Exception as e:
logger.debug(e)
if __name__ == '__main__':
# ETh().get_all_eth_data()
# 多进程
# 生成url连接存入redis
ETh().get_all_url()
# 爬取并存入
process_count=16
pool=multiprocessing.Pool(process_count)
for i in range(process_count):
pool.apply_async(get_all_eth_data, (i,))
pool.close()
pool.join()