Background
- Goal: search Baidu for a given set of keywords, then scrape the ads that appear on the first three result pages.
Tech stack summary
time
import time
now = lambda: time.perf_counter()
# pause for 5 seconds
time.sleep(5)
>>> time.time()  # Unix timestamp
1613220602.8661115
>>> time.localtime()  # local time
time.struct_time(tm_year=2021, tm_mon=2, tm_mday=13, tm_hour=20, tm_min=49, tm_sec=57, tm_wday=5, tm_yday=44, tm_isdst=0)
>>> time.asctime(time.localtime())  # human-readable time string
'Sat Feb 13 20:35:31 2021'
>>> time.asctime(time.localtime(time.time()))
'Sat Feb 13 20:58:19 2021'
>>> time.strftime('%Y%m%d', time.localtime())  # formatted time
'20210213'
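The inverse of strftime is strptime, which parses a formatted string back into a struct_time; a minimal sketch:
import time

# parse a date string back into a struct_time (inverse of strftime)
t = time.strptime('20210213', '%Y%m%d')
print(t.tm_year, t.tm_mon, t.tm_mday)  # 2021 2 13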
os
# file/directory methods
>>> import os
>>> os.getcwd()  # current working directory
>>> os.chdir(path)  # change the current working directory to path
>>> os.listdir(path)  # return the names of the folders and files under path
>>> os.mkdir(path)  # create a folder
>>> os.rmdir(path)  # remove an empty folder -- raises OSError if it is not empty
>>> os.remove(path)  # delete a file
>>> os.rename(oldName, newName)  # rename a file
>>> os.stat(path)  # get information about the given path
# os.stat() fields:
# st_atime last access time; st_mtime last modification time; st_ctime creation time (on Windows)
>>> os.stat(os.getcwd())
os.stat_result(st_mode=16749, st_ino=1688849860301781, st_dev=3370681046, st_nlink=1, st_uid=0, st_gid=0, st_size=61440, st_atime=1613227168, st_mtime=1613227168, st_ctime=1523514771)
>>> os.stat(os.getcwd()).st_ctime
1523514771.5762281
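The calls above operate on a single directory; for walking a whole tree, os.walk is the usual tool -- a minimal sketch that prints every file and its size under the current directory:
import os

# walk the current directory tree and print each file's full path and size
for root, dirs, files in os.walk(os.getcwd()):
    for name in files:
        full = os.path.join(root, name)
        print(full, os.path.getsize(full))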
The os.path module
import os
# getting a file's attribute information
os.path.abspath(path)  # return the absolute path
os.path.dirname(path)  # return the directory part of the path
os.path.basename(path)  # return the file name
os.path.exists(path)  # whether the path exists: True if it does, otherwise False
os.path.expanduser('~')  # return the current user's home directory
>>> os.path.expanduser('~tt')  # expand to user tt's home directory
'C:\\Users\\tt'
>>> os.path.getmtime(r't.py')  # return the last modification time
1612868196.803879
>>> os.path.getsize(r't.py')  # return the file size
1862
os.path.isabs(path)  # whether the path is absolute
os.path.isfile(path)  # whether the path is a file
os.path.isdir(path)  # whether the path is a directory
>>> os.path.join(os.getcwd(), 't', 'a', 'c')  # join directory, folder and file names into one path
'c:\\users\\chen.huaiyu\\desktop\\t\\a\\c'
>>> os.path.split(os.path.join(os.path.expanduser('~tt'), r't.py'))  # split a path into (dirname, basename), returned as a tuple
('C:\\Users\\tt', 't.py')
>>> os.path.splitext(os.path.join(os.path.expanduser('~tt'), r't.py'))  # split into (path without extension, extension)
('C:\\Users\\tt\\t', '.py')
>>> os.path.splitext(r't.py')
('t', '.py')
logging
The logging module defines five log levels, increasing in severity from 1 to 5:
1. DEBUG -- detailed information for diagnosing problems;
2. INFO -- key checkpoints confirming the program runs as expected;
3. WARNING -- something unexpected happened;
4. ERROR -- a serious problem occurred and some functionality is broken;
5. CRITICAL -- a fatal error, the program can no longer run.
The four main components of logging:
1. Loggers -- the interface application code uses to emit log records;
2. Handlers -- send log records to their destinations;
3. Filters -- decide which log records actually get recorded;
4. Formatters -- control the output format of log records.
Ways to use the logging module:
1. Use the module-level functions provided by logging;
# logging.info(msg, *args)
logging.info('%s is %d years old', 'Tom', 10)
# exc_info: True - append exception information to the log record
# stack_info: default False; True - append the stack trace to the log record
# extra: a dict supplying values for custom fields in the message format
>>> import logging
>>> LOG_FORMAT = '%(asctime)s - %(levelname)s - %(user)s[%(ip)s] - %(message)s'
>>> DATE_FORMAT = '%m/%d/%Y %H:%M:%S %p'
>>> logging.basicConfig(format=LOG_FORMAT, datefmt=DATE_FORMAT)
>>> logging.warning('Someone deleted the log file.', exc_info=True, stack_info=True, extra={'user':'Tom', 'ip':'10.10.10.10'})
02/14/2021 21:11:43 PM - WARNING - Tom[10.10.10.10] - Someone deleted the log file.
NoneType: None
Stack (most recent call last):
File "<string>", line 1, in <module>
File "D:\...\lib\idlelib\run.py", line 144, in main
ret = method(*args, **kwargs)
File "D:\...\lib\idlelib\run.py", line 474, in runcode
exec(code, self.locals)
File "<pyshell#70>", line 1, in <module>
2. Use the four main components of the logging system: loggers, handlers, filters, and formatters.
1) A logger needs handlers to actually output its log records;
2) one logger can have multiple handlers;
3) different handlers can send the log to different destinations;
4) each handler can have multiple filters;
5) each handler can set its own formatter.
(A minimal wiring sketch follows the processing-flow list below.)
3. The logging processing flow:
1) level filtering on the logger;
2) the logger's filters;
3) level filtering on each of the logger's handlers;
4) each handler's filters.
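A minimal sketch wiring the four components together explicitly; the logger name 'demo' and the messages are illustrative only:
import logging

# 1) logger, 2) handler, 3) filter, 4) formatter, wired together by hand
logger = logging.getLogger('demo')
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()            # send records to the console
handler.setLevel(logging.INFO)               # handler-level filtering
handler.addFilter(logging.Filter('demo'))    # only records from the 'demo' logger (and its children)
handler.setFormatter(logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
logger.addHandler(handler)
logger.debug('dropped by the handler level (INFO)')
logger.info('this record reaches the console')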
4. Ways to configure logging:
1) create the loggers, handlers, and formatters explicitly in code;
2) use a logging config file and read it with fileConfig();
3) build a dict holding the configuration and pass it to dictConfig() (a sketch follows the config-file example below).
import logging.config
# read the logging config file
logging.config.fileConfig('logging.conf')
# create a logger
logger = logging.getLogger('simpleExample')
# emit log records
logger.debug('debug message')
logger.info('info message')
logger.warning('warn message')
logger.error('error message')
logger.critical('critical message')
#
# the config file logging.conf looks like this:
# loggers
[loggers]
keys=root,simpleExample
# handlers
[handlers]
keys=fileHandler,consoleHandler
# formatters
[formatters]
keys=simpleFormatter
[logger_root]
level=DEBUG
handlers=fileHandler
[logger_simpleExample]
level=DEBUG
handlers=consoleHandler
# qualname is required: the logger's name in the hierarchy, the one application code passes to getLogger()
qualname=simpleExample
# set propagate to 0 so records are not passed up to ancestor loggers
propagate=0
[handler_consoleHandler]
# StreamHandler sends log records to a stream (here stdout)
class=StreamHandler
args=(sys.stdout,)
level=DEBUG
formatter=simpleFormatter
[handler_fileHandler]
# FileHandler sends log records to a file on disk
class=FileHandler
args=('logger.log', 'a')
level=ERROR
formatter=simpleFormatter
[formatter_simpleFormatter]
format=%(asctime)s - %(name)s - %(levelname)s - %(message)s
datefmt=%Y-%m-%d %H:%M:%S
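Option 3) above, dictConfig(), can express the same configuration as the file; a minimal sketch reusing the same logger/handler/formatter names:
import logging.config

LOGGING = {
    'version': 1,
    'formatters': {
        'simpleFormatter': {
            'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            'datefmt': '%Y-%m-%d %H:%M:%S',
        },
    },
    'handlers': {
        'consoleHandler': {
            'class': 'logging.StreamHandler',
            'level': 'DEBUG',
            'formatter': 'simpleFormatter',
            'stream': 'ext://sys.stdout',
        },
        'fileHandler': {
            'class': 'logging.FileHandler',
            'level': 'ERROR',
            'formatter': 'simpleFormatter',
            'filename': 'logger.log',
            'mode': 'a',
        },
    },
    'loggers': {
        'simpleExample': {
            'level': 'DEBUG',
            'handlers': ['consoleHandler'],
            'propagate': False,
        },
    },
    'root': {'level': 'DEBUG', 'handlers': ['fileHandler']},
}

logging.config.dictConfig(LOGGING)
logging.getLogger('simpleExample').info('configured via dictConfig')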
pandas
import pandas as pd
pd.DataFrame(data, columns=column)  # create a DataFrame
pd.concat([df1, df2], axis=0, ignore_index=True)  # axis=0: concatenate row-wise
df.fillna('-', inplace=True)  # fill missing values with '-'
pd.read_csv(path, engine='python', encoding='GBK')  # read a csv; engine='python' is required when the file name contains Chinese characters
# pd.read_csv(path, engine='python', encoding='utf-8-sig')  # utf-8 with BOM
pd.read_excel(path)  # read an Excel file
df.merge(df_url, how='left', on='Netloc')  # left join on Netloc
df.merge(result, how='left', left_on='广告主', right_on='搜索词')  # join on left_on & right_on
df.append(df1)  # append rows (deprecated in newer pandas; see the note below)
df['URL'].apply(lambda x: urlparse(x).netloc)  # apply a lambda to extract the main domain
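Note: DataFrame.append was deprecated in pandas 1.4 and removed in 2.0; on recent versions the row append above can be written with pd.concat instead -- a minimal sketch:
import pandas as pd

df = pd.DataFrame({'a': [1]})
df1 = pd.DataFrame({'a': [2]})
# equivalent of the deprecated df.append(df1)
df = pd.concat([df, df1], ignore_index=True)
print(df)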
selenium
# example 1
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
driver = webdriver.Chrome()  # open the browser
driver.implicitly_wait(10)  # implicit wait: if an element is not found, retry every 0.5s for up to 10s, then raise
driver.maximize_window()  # maximize the browser window
driver.get('https://www.baidu.com')
elem = driver.find_element_by_name('wd')  # locate the search box
elem.clear()  # clear the search box
elem.send_keys('a')  # type the letter a
elem.send_keys(Keys.RETURN)  # press Enter
driver.close()  # close the browser
# scroll slowly down to the bottom of the page
def scroll():
    ini_height, check_height = 0, 0
    while 1:
        driver.execute_script(
            'window.scrollTo({top:536 + %s, behavior:"smooth"})'
            % check_height)
        time.sleep(0.5)
        check_height = driver.execute_script('return document.documentElement.scrollTop || window.pageYOffset || document.body.scrollTop;')
        if ini_height == check_height:
            break
        ini_height = check_height
# exception classes
from selenium.common.exceptions import NoSuchElementException, StaleElementReferenceException
# locating elements: find_element_by_*
driver.find_element_by_name(name)
driver.find_element_by_xpath(xpath)
# writing test cases with Selenium
# excerpted
import unittest
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
class PythonOrgSearch(unittest.TestCase):  # declaring this as a test case: TestCase
    def setUp(self):  # part of initialization, runs before every test method
        self.driver = webdriver.Chrome()
    def test_search_in_python_org(self):  # test methods always start with test
        driver = self.driver
        driver.get('https://www.baidu.com')
        self.assertIn('百度一下', driver.title)
        elem = driver.find_element_by_name('wd')
        elem.send_keys('Fergus')
        elem.send_keys(Keys.RETURN)
        assert 'No result found' not in driver.page_source
    def tearDown(self):  # runs after every test method, used for cleanup
        self.driver.close()
if __name__ == "__main__":
    unittest.main()
# The driver.get method opens the site at the given URL. WebDriver waits until the whole page has loaded (strictly, until the "onload" event has fired) before handing control back to the test script. If the page loads a lot of content via AJAX, WebDriver may not be able to tell when the page has actually finished loading:
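For such AJAX-heavy pages the usual remedy is an explicit wait; a minimal sketch using WebDriverWait with an expected condition (the 10-second timeout is arbitrary):
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

driver = webdriver.Chrome()
driver.get('https://www.baidu.com')
# block for up to 10 seconds until the search box is present, then continue
elem = WebDriverWait(driver, 10).until(
    EC.presence_of_element_located((By.NAME, 'wd')))
elem.send_keys('Fergus')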
urlparse
# extracting the domain
'''Fields of the ParseResult returned by parse.urlparse(), e.g.
ParseResult(scheme='https', netloc='www.cnblogs.com', path='/angelyan/', params='', query='', fragment='')
scheme: the protocol
netloc: the domain
path: the path
params: parameters
query: the query string, usually the GET parameters of the url
fragment: the anchor, used to jump straight to a given position in the page
'''
from urllib import parse
url = "http://xx.xx.xx:8000/get_account.json?page_size=20&page_index=1&user_id=456"
parse.urlparse(url)
Out[33]: ParseResult(scheme='http', netloc='xx.xx.xx:8000', path='/get_account.json', params='', query='page_size=20&page_index=1&user_id=456', fragment='')
parse.urlparse(url).netloc
Out[34]: 'xx.xx.xx:8000'
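The query component can be decoded further with parse.parse_qs; a minimal sketch on the same URL:
from urllib import parse

url = "http://xx.xx.xx:8000/get_account.json?page_size=20&page_index=1&user_id=456"
# decode the query string into a dict of parameter name -> list of values
params = parse.parse_qs(parse.urlparse(url).query)
print(params)  # {'page_size': ['20'], 'page_index': ['1'], 'user_id': ['456']}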
Miscellaneous
# counting
from collections import Counter
dic = Counter()
for i in lis:
    dic[i] += 1
# reducing (here: concatenating a list of lists)
from functools import reduce
all_str = reduce(lambda x, y: x + y, split_netloc)
# filtering
filter(lambda x: x[1] < 2 and len(x[0]) > 1, sorted(dic.items(), key=lambda kv: (kv[1], kv[0]), reverse=True))
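The counting and flattening can also be done in one pass by feeding the flattened iterable straight to Counter; a minimal sketch on made-up data:
from collections import Counter
from itertools import chain

split_netloc = [['www', 'example', 'com'], ['shop', 'example', 'cn']]
# flatten the list of lists and count every token in one pass
dic = Counter(chain.from_iterable(split_netloc))
# keep tokens seen at most twice whose length is greater than 1
rare = [k for k, v in dic.items() if v <= 2 and len(k) > 1]
print(dic, rare)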
Code
# -*- coding: utf-8 -*-
"""
Created on Sat Jan 9 16:08:57 2021
@author: Fergus
"""
import os
import time
import logging.config
import pandas as pd
from selenium import webdriver
from urllib.parse import urlparse
from selenium.webdriver.common.keys import Keys
#from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import NoSuchElementException, StaleElementReferenceException
now = lambda : time.perf_counter()
# logging configuration
PATH = os.path.join(os.path.expanduser('~'), r'CheckAD.conf')
logging.config.fileConfig(PATH)
logger = logging.getLogger('CheckAD')
def scroll():
    # scroll slowly down to the bottom of the page
    ini_height, check_height = 0, 0
    while True:
        # smooth-scroll one step at a time
        driver.execute_script(
            'window.scrollTo({top:536 + %s, behavior:"smooth"})'
            % check_height)
        time.sleep(0.5)
        check_height = driver.execute_script(
            'return document.documentElement.scrollTop || \
            window.pageYOffset || document.body.scrollTop;')
        if ini_height == check_height:
            break
        ini_height = check_height
def search(keyword):
    try:
        # locate the search box
        elem = driver.find_element_by_name('wd')
    except NoSuchElementException:
        input('Solve the slider captcha manually, or refresh the page (F5), then press Enter')
        elem = driver.find_element_by_name('wd')
    finally:
        elem.clear()
        elem.send_keys(keyword)
        elem.send_keys(Keys.RETURN)
        time.sleep(0.3)
def parser(xpath):
    try:
        element = driver.find_element_by_xpath(xpath)
    except NoSuchElementException:
        return '1'
    except StaleElementReferenceException:
        logger.info('ERROR:\n', exc_info=True)
    except Exception as e:
        logger.info('parse: %s' % e)
    else:
        return element.text
def parsers(xpath):
    try:
        element = driver.find_elements_by_xpath(xpath)
    except NoSuchElementException:
        logger.info('ERROR:\n', exc_info=True)
    except StaleElementReferenceException:
        logger.info('ERROR:\n', exc_info=True)
    except Exception as e:
        logger.info('parsers: %s' % e, exc_info=True)
    else:
        return element
def getAd():
    # regular ads
    ## headline & headline url
    ad = parsers('//h3/a[@data-is-main-url="true"]')
    ad_headline = [i.text for i in ad]
    ad_headline_url = [i.get_attribute('data-landurl') for i in ad]
    ## displayed url
    lis = ['京公网安备11000002000001号', '京ICP证030173号', '展开', '']
    url = parsers('//div/div[2]/a/span[1] | //div/div[3]/a/span[1]')
    explicit_url = [i.text for i in url if i.text not in lis]
    return ad_headline, ad_headline_url, explicit_url
def getBrandAd():
    # brand ads
    ## headline & headline url
    brandad = parsers('//h2/a')
    brandad_headline = [i.text for i in brandad
                        if '想在此推广' not in i.text and i.text != '官方']
    brandad_headline_url = [i.get_attribute('ourl') for i in brandad
                            if i.get_attribute('ourl') is not None]
    ## displayed url
    url = parsers('//span[@class="ec-pc_brand_tip-official-site"]')
    explicit_brandurl = [i.text for i in url]
    return brandad_headline, brandad_headline_url, explicit_brandurl
def next_page():
    try:
        driver.find_element_by_partial_link_text('下一页').click()
        time.sleep(0.5)
    except NoSuchElementException:
        logger.info('ERROR:\n', exc_info=True)
    except Exception as e:
        input('%s\nSolve the slider captcha manually, or refresh the page (F5), then press Enter' % e)
        driver.find_element_by_partial_link_text('下一页').click()
        time.sleep(0.5)
    finally:
        scroll()
def output(keyword):
    global result
    # current page number
    cur_page = parser('//div[@id="page"]/div/strong/span[2]')
    # headline & headline url & displayed url
    ad_headline, ad_headline_url, explicit_url = getAd()
    brandad_headline, brandad_headline_url, explicit_brandurl = getBrandAd()
    # output - ads: search term, page number, ad headline & landing page & displayed url
    try:
        rows = len(ad_headline)
        df1 = pd.DataFrame({'搜索词': [keyword] * rows,
                            '广告': ['广告'] * rows,
                            '页码': [cur_page] * rows,
                            '标题': ad_headline,
                            '落地页': ad_headline_url,
                            '显式url': explicit_url
                            })
        # add a placeholder row so the keyword is still marked in the output
        if not df1.shape[0]:
            df1 = df1.append(pd.DataFrame([[keyword, '广告', cur_page, '',
                                            '', '']], columns=df1.columns))
        rows = len(brandad_headline)
        df2 = pd.DataFrame({'搜索词': [keyword] * rows,
                            '广告': ['品牌广告'] * rows,
                            '页码': [cur_page] * rows,
                            '标题': brandad_headline,
                            '落地页': brandad_headline_url,
                            '显式url': explicit_brandurl
                            })
        # add a placeholder row so the keyword is still marked in the output
        if not df2.shape[0]:
            df2 = df2.append(pd.DataFrame([[keyword, '品牌广告', cur_page,
                                            '', '', '']], columns=df1.columns))
        df = df1.append(df2)
        result = result.append(df)
        #print('\n', result, '\nRuntime: {:.3f}Min'.format((now() - st
        #                                                   )/60))
    except Exception as e:
        # part of the data could not be scraped
        lis1 = []
        lis1.extend([keyword, '广告', cur_page])
        lis1.append(ad_headline)
        lis1.append(ad_headline_url)
        lis1.append(explicit_url)
        lis2 = []
        lis2.extend([keyword, '品牌广告', cur_page])
        lis2.append(brandad_headline)
        lis2.append(brandad_headline_url)
        lis2.append(explicit_brandurl)
        lis3 = []
        lis3.append(lis1)
        lis3.append(lis2)
        lis3.append(['Exception: %s | %s' % (keyword, e)])
        err.append(lis3)
        #print(err)
    print('Runtime: {:.3f}Min'.format((now() - st)/60))
def connectDB():
    # connect to the database
    from sqlalchemy import create_engine
    # sql server
    ss = 'mssql+pymssql://%s:%s@%s:%s/%s'
    try:
        engine = create_engine(ss % ('sa', 'cs_holly123', '192.168.60.110'
                                     , '1433', 'Account Management'))
    except Exception:
        raise
    else:
        logger.info('Database connection established, reading data...')
        return engine
def getDB(deadline):
    with connectDB().begin() as conn:
        # advertisers with spend in the past half year and no spend in the past 3 days
        sql = '''
        SELECT *
        FROM ( SELECT 用户名, AM, b.广告主, 网站名称, URL, ad_hy.[近半年消费(AD)]
                    , D.近3天消费
               FROM basicInfo b
               LEFT JOIN (SELECT 广告主, ISNULL(sum(HY.sum_),0) '近半年消费(AD)'
                          FROM basicInfo b
                          LEFT JOIN(SELECT 用户名, sum(金额) sum_
                                    FROM 消费
                                    WHERE 日期 BETWEEN DATEADD(DD, -200, '{}') AND '{}'
                                      AND 类别 in ('搜索点击', '新产品', '自主投放', '超投')
                                    GROUP BY 用户名) HY
                          ON HY.用户名 = b.用户名
                          GROUP BY 广告主) ad_hy
               ON b.广告主 = ad_hy.广告主
               LEFT JOIN ( SELECT 广告主, ISNULL(sum(HY.sum_),0) '近3天消费'
                           FROM basicInfo b
                           LEFT JOIN(SELECT 用户名, sum(金额) sum_
                                     FROM 消费
                                     WHERE 日期 BETWEEN DATEADD(DD, -3, '{}') AND '{}'
                                       AND 类别 in ('搜索点击', '新产品', '自主投放', '超投')
                                     GROUP BY 用户名) HY
                           ON HY.用户名 = b.用户名
                           GROUP BY 广告主) D
               ON D.广告主 = b.广告主) T
        WHERE T.[近半年消费(AD)] > 0
          AND T.近3天消费 = 0
        '''.replace('{}', deadline)
        df = pd.DataFrame(conn.execute(sql).fetchall()
                          , columns=['用户名', 'AM', '广告主', '网站名称', 'URL'
                                     , '近半年有消费(AD)', '近3日无消费(AD)'])
        return df
def getNetlocKeywords():
    # split the URLs: extract the core token of each domain
    from functools import reduce
    from collections import Counter
    global df
    # get the domain
    df['Netloc'] = df['URL'].apply(lambda x: urlparse(x).netloc)
    netloc = list(set(df['Netloc']))
    # extract the core token from each domain
    ## split each url on '.', count the tokens, keep those with count <= 2 and length > 1
    split_netloc = list(map(lambda x: x.split('.'), netloc))
    all_str = reduce(lambda x, y: x + y, split_netloc)
    ### count
    dic = Counter()
    for i in all_str:
        dic[i] = dic[i] + 1
    ### filter
    letters = filter(lambda x: x[1] <= 2 and len(x[0]) > 1,
                     sorted(dic.items(), key=lambda kv: (kv[1], kv[0])
                            , reverse=True))
    keywords = [i[0] for i in letters]
    # url, keywords
    df_url = pd.DataFrame([[netloc[n], j] for n, i in enumerate(split_netloc
                           ) for j in keywords if j in i]
                          , columns=['Netloc', 'Key'])
    # return
    df = df.merge(df_url, how='left', on='Netloc')
def exclude():
    # keywords to exclude
    input('Tips: check the exclusion file on the desktop, press Enter when it is correct\n')
    path = r'c:\users\chen.huaiyu\desktop\excludeKeywords.csv'
    csv = pd.read_csv(path, engine='python', encoding='GBK')
    return set(csv['搜索词'])
def fromExcel():
    # 1. read the excel file; 2. split the keyword strings; 3. map each keyword back to its advertiser; 4. return
    from re import split
    from functools import reduce
    path = r'c:\users\chen.huaiyu\desktop\账户关键词.xlsx'
    inputKeywords = pd.read_excel(path)
    inputKeywords.drop(columns=inputKeywords.columns[0], inplace=True)
    # split the keyword strings
    keywords = list(set(reduce(lambda x, y: x + y, inputKeywords['关键词'].apply(
        lambda x: split(r',|,', x)))))
    keywords.remove('物料已删除')
    keywords.remove('')
    # match keywords & advertisers
    adAndKeyword = [(inputKeywords['广告主'][n], j) for n, i in enumerate(
        inputKeywords['关键词'].apply(lambda x: split(r',|,', x)))
        for j in keywords if j in i]
    df = pd.DataFrame(adAndKeyword, columns=['广告主', '搜索关键词'])
    df = inputKeywords.merge(df, on='广告主', how='left')
    df.fillna('-', inplace=True)
    df = df.loc[df.apply(lambda x: x['搜索关键词'] in x['关键词'], axis=1), :]
    return keywords, df
def getKeywords():
    # collect the search terms
    if choice == 'from DB':
        getNetlocKeywords()
        search_words = list(set(df['网站名称']) | set(df['广告主']
                            ) | set(df['Key']) - exclude())
    elif choice == 'from Excel':
        search_words, _ = fromExcel()
    return search_words
def combine():
    if choice == 'from DB':
        # merge the scraped results with the source data and write them out
        df_ad = df.merge(result, how='left', left_on='广告主', right_on='搜索词')
        df_website = df.merge(result, how='left', left_on='网站名称'
                              , right_on='搜索词')
        df_url = df.merge(result, how='left', left_on='Key', right_on='搜索词')
        merge = pd.concat((df_ad, df_website, df_url), axis=0, ignore_index=True)
        merge.fillna('-', inplace=True)
        #
        # prepare the filter columns
        #
        ## netloc of the landing page
        merge['landurl_netloc'] = merge['落地页'].apply(
            lambda x: urlparse(x).netloc)
        ## displayed url == netloc of the registered URL
        merge['filter1'] = merge['Netloc'] == merge['显式url']
        ## core token of the registered URL in the displayed url
        merge['filter2'] = merge.apply(
            lambda x: x['Key'] in x['显式url'], axis=1)
        ## Key in land_netloc
        merge['filter3'] = merge.apply(
            lambda x: x['Key'] in x['landurl_netloc'], axis=1)
        merge.to_excel(r'c:/users/chen.huaiyu/desktop/CheckAD.xlsx')
    elif choice == 'from Excel':
        _, DF = fromExcel()
        merge = DF.merge(result, how='left', left_on='搜索关键词'
                         , right_on='搜索词')
        merge.to_excel(r'c:/users/chen.huaiyu/desktop/CheckADKey.xlsx')
    # error records
    er = pd.DataFrame(err, columns=['搜索词', '广告', 'Note'])
    er.to_excel(r'c:/users/chen.huaiyu/desktop/error.xlsx')
    # the keyword list itself
    kw = pd.DataFrame(keywords)
    kw.to_csv(r'c:/users/chen.huaiyu/desktop/kw.csv', encoding='GBK')
if __name__ == '__main__':
    st = now()
    # set the cutoff date
    DL = input('Set the cutoff date, e.g. 20210121\n')
    choice = input('Where do the keywords come from? (from DB/from Excel)')
    logger.info('\nStart, cutoff date set to: %s\n' % DL)
    # read the database
    df = getDB(DL)
    # output containers
    result = pd.DataFrame()
    err = []
    # search terms
    keywords = getKeywords()
    # start the browser and begin searching
    #chrome_options = Options()
    #chrome_options.add_argument('--headless')     # do not show the browser window
    #chrome_options.add_argument('--disable-gpu')  # disable GPU acceleration
    driver = webdriver.Chrome()
    driver.implicitly_wait(10)
    driver.maximize_window()
    driver.get('https://www.baidu.com')
    for n, keyword in enumerate(keywords):
        try:
            # search
            search(keyword)
            scroll()
            output(keyword)
            # only the first 3 pages need to be checked
            for i in range(2):
                next_page()
                output(keyword)
            # progress report (n + 1 avoids division by zero on the first keyword)
            logger.info('{}\n{} keywords in total, No.{} done, {:.0%} complete, about {:.1f} min left...'.format(
                time.ctime(), len(keywords), n + 1, (n + 1)/len(keywords)
                , ((now() - st)/(n + 1)*(len(keywords) - n - 1))/60))
        except KeyboardInterrupt as e:
            print(e)
            break
        except Exception as e:
            logger.info(e, exc_info=True)
            continue
    # when finished, merge the results and write them out
    combine()
    driver.quit()