'''
网站网址:http://www.meinuzi.com/
'''
import random
from pyquery import PyQuery as pq
import requests
# Pool of HTTP proxies; one entry is picked at random below and reused for
# every request made by this script.
# requests looks proxies up by the lower-case URL scheme, so the key has to be
# 'http' -- an upper-case 'HTTP' key is never matched and the proxy would be
# silently bypassed.
proxies = [
    {'http': 'http://122.140.5.115:9999'},
    {'http': 'http://113.101.96.66:8080'},
    {'http': 'http://113.124.86.24:9999'},
    {'http': 'http://121.13.252.58:41564'},
    {'http': 'http://61.216.185.88:60808'},
    {'http': 'http://58.20.184.187:9091'},
    {'http': 'http://183.236.123.242:8060'},
    {'http': 'http://116.9.163.205:58080'},
    {'http': 'http://222.74.73.202:42055'},
    {'http': 'http://183.247.202.208:30001'},
    {'http': 'http://39.108.101.55:1080'},
    {'http': 'http://47.105.91.226:8118'},
]
proxie = random.choice(proxies)

# Candidate User-Agent values; one is chosen at random when the module loads.
user_agent_list = [
    'Chrome/86.0.4240.198',
    'Chrome/101.0.4951.64',
    'Chrome/96.0.4664.45',
    'Chrome/94.0.4606.41',
]

# Headers sent with every request. 'Referer' is also used by Save_img as the
# base address for image URLs.
headers = {
    'User-Agent': random.choice(user_agent_list),
    'Referer': 'http://www.meinuzi.com',
}
def Req_url(url):
    """Request ``url`` and return the decoded HTML.

    The request is sent with the module-level ``headers`` and ``proxie``.

    :param url: address of the page to download
    :return: the response text, or the sentinel string ``'over'`` when the
        request fails or the server answers with an error status
    """
    try:
        # The timeout keeps a dead proxy or server from hanging the crawl.
        r = requests.get(url, headers=headers, proxies=proxie, timeout=15)
        r.raise_for_status()
        r.encoding = r.apparent_encoding
        return r.text
    except requests.RequestException:
        # Callers compare against this sentinel to detect a failed request.
        return 'over'
def Parse_page(html):
    """Parse a listing page and download the picture behind every entry.

    For each ``.m-list-main li`` item the link of the picture page and the
    ``alt`` text of its thumbnail are read and handed to ``Get_img``.

    :param html: HTML text of the listing page
    :return: None
    """
    doc = pq(html)
    for i, item in enumerate(doc('.m-list-main li').items(), start=1):
        page_url = item('.u-img a').attr('href')
        name = item('.u-img img').attr('alt')
        if not page_url:
            # An entry without a link has no picture page to fetch.
            continue
        print("保存第", i, '张图')
        Get_img(page_url, name)
def Get_img(page_url, name):
    """Open a picture page, locate its image and save it.

    :param page_url: address of the picture page
    :param name: name passed on to ``Save_img`` for the output file
    :return: None
    """
    html = Req_url(page_url)
    if html == 'over':
        # Req_url signals a failed request with this sentinel.
        print('skip, page could not be loaded:', page_url)
        return
    img_baseurl = pq(html)('.g-mn .m-list-content img').attr('src')
    if not img_baseurl:
        # No matching <img> on the page, so there is nothing to download.
        print('skip, no image found on:', page_url)
        return
    Save_img(img_baseurl, name)
def Save_img(img_baseurl, name):
    """Download one image and store it under ``./images_1``.

    :param img_baseurl: image address as found in the page; resolved
        against ``headers['Referer']``
    :param name: file name (without extension) of the saved image
    :return: None
    """
    import os
    from urllib.parse import urljoin

    # urljoin copes with absolute URLs and with paths lacking a leading '/',
    # both of which plain string concatenation would turn into a broken URL.
    img_url = urljoin(headers['Referer'], img_baseurl)
    try:
        resp = requests.get(img_url, headers=headers, proxies=proxie, timeout=15)
        # Do not write an error page to disk as if it were an image.
        resp.raise_for_status()
    except requests.RequestException as exc:
        print('download failed:', img_url, exc)
        return
    houzui = img_url.split('.')[-1]  # file extension
    os.makedirs('./images_1', exist_ok=True)
    with open('./images_1/{}.{}'.format(name, houzui), 'wb') as f:
        f.write(resp.content)
def main():
    """Crawl listing pages one after another until no page is left.

    Crawling starts at page ``num`` and stops when ``Req_url`` returns
    ``'over'`` (taken to mean the page does not exist) or when the page
    exists but its ``.m-list-main ul`` element holds no text.

    :return: the string ``'程序结束'``
    """
    num = 7  # index of the first page to crawl
    while True:
        page_url = 'http://www.meinuzi.com/index_' + str(num) + '.html'
        html = Req_url(page_url)
        # Short-circuit: the page is only parsed when the request succeeded.
        if html == 'over' or pq(html)('.m-list-main ul').text() == '':
            print('程序结束')
            return '程序结束'
        print('**********第', num, '页**********')
        Parse_page(html)
        num += 1
# Runs the crawler immediately when this module is loaded.
main()
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver import ActionChains
from PIL import Image
# https://www.om.cn/register
# Placeholder credentials; they are copied into `email` / `password` below.
Email='test@test.com'
PASSWORD='123456'
# Page opened in the browser.
url='https://auth.geetest.com/login/'
# A Chrome session is started at load time.
driver=webdriver.Chrome()
# Explicit-wait helper bound to the driver, with a 20-second limit.
wait=WebDriverWait(driver,20)
driver.get(url)
email=Email
password=PASSWORD
def get_geetest_button():
    """Wait for the initial verification button to become clickable.

    :return: the button element
    """
    locator = (By.CLASS_NAME, 'geetest_btn_click')
    return wait.until(EC.element_to_be_clickable(locator))
def get_position():
    """Locate the captcha image on the page.

    :return: tuple ``(top, bottom, left, right)`` built from the element's
        location and size
    """
    element = wait.until(
        EC.presence_of_element_located((By.CLASS_NAME, 'geetest_bg'))
    )
    time.sleep(2)
    origin = element.location
    extent = element.size
    top = origin['y']
    left = origin['x']
    return (top, top + extent['height'], left, left + extent['width'])
def get_geetest_image():
    """Take a screenshot and cut the captcha image out of it.

    :return: the cropped captcha as a PIL image
    """
    from io import BytesIO

    top, bottom, left, right = get_position()
    print('验证码位置:', top, bottom, left, right)
    # get_screenshot_as_png() returns raw PNG bytes; they must be decoded
    # into a PIL image before they can be cropped.
    screenshot = Image.open(BytesIO(driver.get_screenshot_as_png()))
    # PIL expects the crop box as (left, upper, right, lower).
    captcha = screenshot.crop((left, top, right, bottom))
    return captcha
def slider():
    """Wait for the slider handle to become clickable.

    :return: the slider element
    """
    # element_to_be_clickable takes ONE locator tuple, not two arguments.
    handle = wait.until(
        EC.element_to_be_clickable((By.CLASS_NAME, 'geetest_arrow'))
    )
    return handle
def is_pixel_equal(image1, image2, x, y, threshold=60):
    """Tell whether two images have (nearly) the same pixel at ``(x, y)``.

    :param image1: first image
    :param image2: second image
    :param x: x position
    :param y: y position
    :param threshold: per-channel difference that must NOT be reached for
        the pixels to count as equal (defaults to the former fixed value 60)
    :return: True when the first three channels each differ by less than
        ``threshold``, otherwise False
    """
    # Read the pixel at the same position from both images.
    pixel1 = image1.load()[x, y]
    pixel2 = image2.load()[x, y]
    return all(
        abs(pixel1[channel] - pixel2[channel]) < threshold
        for channel in range(3)
    )
def get_gap(image1, image2):
    """Find the horizontal offset of the gap.

    Columns are scanned from x = 60 to the right; the first column that
    contains a pixel differing between the two images is the result.

    :param image1: picture without the gap
    :param image2: picture with the gap
    :return: x offset of the first differing column, or 60 when none differs
    """
    start = 60
    width = image1.size[0]
    height = image1.size[1]
    for column in range(start, width):
        if any(
            not is_pixel_equal(image1, image2, column, row)
            for row in range(height)
        ):
            return column
    return start
def get_track(distance):
    """Build a movement track for the given offset.

    The motion accelerates (a = 2) over the first 4/5 of the distance and
    decelerates (a = -3) afterwards, sampled every 0.2 time units.

    Each step is the difference between consecutive *rounded cumulative*
    positions, so fractions dropped by rounding are carried into later steps,
    and the final position is clamped to ``distance``. The steps therefore
    add up to ``round(distance)`` instead of drifting away from it.

    :param distance: offset to cover
    :return: list of integer steps; empty when ``distance`` is not positive
    """
    track = []          # movement track
    current = 0         # exact (unrounded) displacement so far
    emitted = 0         # displacement already handed out as integer steps
    mid = distance * 4 / 5  # switch from acceleration to deceleration here
    t = 0.2             # sampling interval
    v = 0               # initial speed
    while current < distance:
        a = 2 if current < mid else -3
        v0 = v
        # current speed: v = v0 + a*t
        v = v0 + a * t
        # distance moved in this interval: x = v0*t + 1/2*a*t*t
        move = v0 * t + 1 / 2 * a * t * t
        # Never travel past the target.
        current = min(current + move, distance)
        step = round(current) - emitted
        emitted += step
        track.append(step)
    return track
def move_to_gap(slider, tracks):
    """Drag the slider along the given track and drop it.

    :param slider: slider element to drag
    :param tracks: sequence of horizontal offsets, applied one by one
    :return: None
    """
    press = ActionChains(driver).click_and_hold(slider)
    press.perform()
    for offset in tracks:
        step = ActionChains(driver).move_by_offset(xoffset=offset, yoffset=0)
        step.perform()
    # Short pause before letting go of the slider.
    time.sleep(0.5)
    drop = ActionChains(driver).release()
    drop.perform()
# Fetch the initial verification button and click it when the module loads.
button=get_geetest_button()
button.click()
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver import ActionChains
# --- Basic usage ---
# Declare the browser object.
browser = webdriver.Chrome()
try:
    # Open the page, type a query and submit it.
    browser.get('https://www.baidu.com')
    search_box = browser.find_element(By.ID, 'kw')  # avoid shadowing builtin input()
    search_box.send_keys('python')
    search_box.send_keys(Keys.ENTER)
    wait = WebDriverWait(browser, 10)
    wait.until(EC.presence_of_element_located((By.ID, 'content_left')))
    print('=' * 50)
    print(browser.current_url)
    print('=' * 50)
    print(browser.get_cookies())
    print('=' * 50)
    # print(browser.page_source)
except Exception as exc:
    # Keep the following demos running, but do not hide what went wrong.
    print('error', exc)
finally:
    # quit() ends the whole session on both the success and the error path.
    browser.quit()
# --- Interacting with nodes ---
driver = webdriver.Chrome()
try:
    driver.get('https://www.taobao.com')
    search_box = driver.find_element(By.ID, 'q')  # avoid shadowing builtin input()
    search_box.send_keys('手机')
    # search_box.clear()  # clear the field
    button = driver.find_element(By.CLASS_NAME, 'btn-search')
    button.click()
except Exception as exc:
    # Keep the following demos running, but do not hide what went wrong.
    print('error', exc)
finally:
    # Single cleanup point instead of one close() per branch.
    driver.quit()
# --- Action chains (drag and drop) ---
driver = webdriver.Chrome()
try:
    driver.get('https://www.runoob.com/try/try.php?filename=jqueryui-api-droppable')
    driver.switch_to.frame('iframeResult')  # switch into the frame
    source = driver.find_element(By.ID, 'draggable')
    target = driver.find_element(By.ID, 'droppable')
    actions = ActionChains(driver)
    actions.drag_and_drop(source, target)
    actions.perform()
except Exception as exc:
    # Keep the following demos running, but do not hide what went wrong.
    print('error', exc)
finally:
    # The browser used to be closed only on failure; release it always.
    driver.quit()
# --- Executing JavaScript, e.g. scrolling to the bottom ---
driver = webdriver.Chrome()
try:
    driver.get('https://www.zhihu.com/explore')
    driver.execute_script('window.scrollTo(0,document.body.scrollHeight)')
    driver.execute_script('alert("To Bottom")')
except Exception as exc:
    print('error', exc)
finally:
    # Release the browser on both the success and the error path.
    driver.quit()
# --- Reading an attribute ---
driver = webdriver.Chrome()
try:
    driver.get('https://www.zhihu.com/explore')
    logo = driver.find_element(By.ID, 'special')
    print(logo)
    print(logo.get_attribute('class'))
except Exception as exc:
    print('error', exc)
finally:
    # Single cleanup point instead of one close() per branch.
    driver.quit()
# --- Reading text ---
driver = webdriver.Chrome()
try:
    driver.get('https://www.zhihu.com/explore')
    res = driver.find_element(By.CLASS_NAME, 'ExploreSpecialCard-contentTitle')
    print(res.text)
except Exception as exc:
    print('error', exc)
finally:
    # Single cleanup point instead of one close() per branch.
    driver.quit()
# --- Reading id, location, tag name and size ---
driver = webdriver.Chrome()
try:
    driver.get('https://www.zhihu.com/explore')
    res = driver.find_element(By.CLASS_NAME, 'ExploreSpecialCard-contentTitle')
    print('id', res.id)
    print('位置', res.location)
    print('标签', res.tag_name)
    print('大小', res.size)
except Exception as exc:
    print('error', exc)
finally:
    # Single cleanup point instead of one close() per branch.
    driver.quit()
# --- Explicit waits ---
driver = webdriver.Chrome()
try:
    driver.get('https://www.taobao.com')
    # Create a WebDriverWait object with the maximum waiting time.
    wait = WebDriverWait(driver, 10)
    # until() takes the condition to wait for;
    # presence_of_element_located means the node has appeared.
    search_box = wait.until(EC.presence_of_element_located((By.ID, 'q')))  # until the node with id='q' is loaded
    button = wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, '.btn-search')))  # until the node is clickable
    print(search_box, button)
except Exception as exc:
    print('error', exc)
finally:
    # Single cleanup point instead of one close() per branch.
    driver.quit()
# --- Going back and forward ---
driver = webdriver.Chrome()
try:
    driver.get('https://www.taobao.com/')
    driver.get('https://www.baidu.com/')
    driver.get('https://www.python.org/')
    driver.back()  # go back
    time.sleep(1)
    driver.forward()  # go forward
except Exception as exc:
    print('error', exc)
finally:
    # Single cleanup point instead of one close() per branch.
    driver.quit()
import csv
from pyquery import PyQuery as pq
import requests
import json
def Get_HTML(url):
    """Download ``url`` and return its decoded text.

    :param url: address of the page to download
    :return: the response text, or ``None`` when the request fails or the
        server answers with an error status
    """
    headers = {'User-Agent': 'Mozilla/5.0'}
    try:
        # The timeout keeps an unresponsive server from blocking forever.
        r = requests.get(url, headers=headers, timeout=15)
        r.raise_for_status()
        r.encoding = r.apparent_encoding
        return r.text
    except requests.RequestException as exc:
        print("Error!", exc)
        return None
def parse_page(html):
    """Extract the ranking entries from the page.

    Values are read element by element, so text containing spaces stays in
    one piece, and the fields are combined with ``zip`` so a page with fewer
    entries than expected no longer raises ``IndexError``.

    :param html: HTML text of the ranking page; may be empty or ``None``
    :return: dict mapping each entry's index text to a dict with the keys
        '序号', '名称', '链接', '演员' and '评分'; empty when ``html`` is empty
    """
    if not html:
        # Nothing was downloaded, so there is nothing to parse.
        return {}
    doc = pq(html)

    def texts(selector):
        # Text of every element matched by the selector, one item each.
        return [node.text() for node in doc(selector).items()]

    title = texts('.li03 a')       # names
    actor = texts('.li04 span')    # cast  # NOTE(review): assumes one span per entry -- confirm
    index = texts('.li01 b')       # rank numbers
    href = [node.attr('href') for node in doc('.li03 a').items()]  # links
    score = texts('.li05 span')    # ratings
    # Save as a CSV file:
    # with open('1905电影排行榜.csv','a+') as csvfile:
    #     writer = csv.writer(csvfile,)
    #     writer.writerow(['序号', '名称', '链接', '演员', '评分'])
    #     for i in range(len(title)):
    #         writer.writerow([index[i],title[i],href[i],actor[i],score[i]])
    # Collect as a dictionary.
    Result = {}
    for number, name, link, cast, rating in zip(index, title, href, actor, score):
        Result[number] = {
            '序号': number,
            '名称': name,
            '链接': link,
            '演员': cast,
            '评分': rating,
        }
    return Result
def write_to_txtfile(item):
    """Append ``item`` as one JSON line to '1905电影排行榜.txt'.

    :param item: JSON-serialisable object to store
    :return: None
    """
    with open('1905电影排行榜.txt', mode='a+', encoding='UTF-8') as out:
        record = json.dumps(item, ensure_ascii=False)
        out.write(record + '\n')
def main():
    """Download the ranking page and parse it into a dictionary.

    The export steps below are kept as commented-out examples.
    """
    url = 'https://www.1905.com/vod/top/lst/'
    item = parse_page(Get_HTML(url))
    # Write the dictionary to a TXT file:
    # for i in range(1,len(item)+1):
    #     write_to_txtfile(item[str(i)])
    # Write the dictionary to a CSV file:
    # with open('1905电影排行榜_111.csv','a+',encoding='utf-8') as csvfile:
    #     fieldnames = ['序号', '名称', '链接', '演员', '评分']
    #     writer=csv.DictWriter(csvfile,fieldnames=fieldnames)
    #     writer.writeheader()
    #     for i in range(len(item)):
    #         writer.writerow(item[str(i+1)])
# Runs the scraper immediately when this module is loaded.
main()
import pymysql
from pyquery import PyQuery as pq
import requests
def Get_HTML(url):
    """Download ``url`` and return its decoded text.

    :param url: address of the page to download
    :return: the response text, or ``None`` when the request fails or the
        server answers with an error status
    """
    headers = {'User-Agent': 'Mozilla/5.0'}
    try:
        # The timeout keeps an unresponsive server from blocking forever.
        r = requests.get(url, headers=headers, timeout=15)
        r.raise_for_status()
        r.encoding = r.apparent_encoding
        return r.text
    except requests.RequestException as exc:
        print("Error!", exc)
        return None
def parse_page(html):
    """Extract the ranking entries from the page.

    Values are read element by element, so text containing spaces stays in
    one piece, and the fields are combined with ``zip`` so a page with fewer
    entries than expected no longer raises ``IndexError``.

    :param html: HTML text of the ranking page; may be empty or ``None``
    :return: dict mapping each entry's index text to a dict with the keys
        '序号', '名称', '链接', '演员' and '评分'; empty when ``html`` is empty
    """
    if not html:
        # Nothing was downloaded, so there is nothing to parse.
        return {}
    doc = pq(html)

    def texts(selector):
        # Text of every element matched by the selector, one item each.
        return [node.text() for node in doc(selector).items()]

    title = texts('.li03 a')       # names
    actor = texts('.li04 span')    # cast  # NOTE(review): assumes one span per entry -- confirm
    index = texts('.li01 b')       # rank numbers
    href = [node.attr('href') for node in doc('.li03 a').items()]  # links
    score = texts('.li05 span')    # ratings
    # Collect as a dictionary.
    Result = {}
    for number, name, link, cast, rating in zip(index, title, href, actor, score):
        result = {
            '序号': number,
            '名称': name,
            '链接': link,
            '演员': cast,
            '评分': rating,
        }
        # print(result)
        Result[number] = result
    return Result
def write_to_mysql(dict):
    """Insert one record into the ``movie`` table.

    The dictionary keys are used as column names and the values are passed
    as query parameters.

    :param dict: mapping of column name to value (the parameter name shadows
        the builtin but is kept for compatibility with existing callers)
    :return: None
    """
    db = pymysql.connect(host='******', user='root', password='******', port=3306, db='xiaomishop')
    table = 'movie'
    keys = ','.join(dict.keys())
    values = ','.join(['%s'] * len(dict))
    sql = 'insert into {table}({keys}) values ({values})'.format(table=table, keys=keys, values=values)
    try:
        # The cursor is closed when the with-block ends.
        with db.cursor() as cursor:
            if cursor.execute(sql, tuple(dict.values())):
                print("successful")
        db.commit()
    except pymysql.MySQLError as exc:
        print("failed", exc)
        db.rollback()
    finally:
        # Close the connection even when an unexpected error escapes.
        db.close()
def update_data(dict):
    """Insert a record, or update it when its primary key already exists.

    Builds an ``INSERT ... ON DUPLICATE KEY UPDATE`` statement from the
    dictionary keys; the values are passed twice as parameters, once for the
    insert part and once for the update part.

    :param dict: mapping of column name to value (the parameter name shadows
        the builtin but is kept for compatibility with existing callers)
    :return: None
    """
    # '3306' used to be passed as `passwd`, an alias of `password`; it is the
    # port number and belongs in `port`.
    db = pymysql.connect(host='******', user='root', password='******', port=3306, db='xiaomishop')
    table = 'movie'  # table name
    keys = ','.join(dict.keys())
    values = ','.join(['%s'] * len(dict))
    sql = 'insert into {table}({keys}) values ({values}) ON DUPLICATE KEY UPDATE'.format(table=table, keys=keys, values=values)
    update = ','.join([' {key}=%s'.format(key=key) for key in dict])
    sql += update
    try:
        # The cursor is closed when the with-block ends.
        with db.cursor() as cursor:
            if cursor.execute(sql, tuple(dict.values()) * 2):
                print("successful")
        db.commit()
    except pymysql.MySQLError as exc:
        print("failed", exc)
        db.rollback()
    finally:
        # Close the connection even when an unexpected error escapes.
        db.close()
# Query
def search():
    """Print the row count and every row of the ``movie`` table.

    :return: None
    """
    # '3306' used to be passed as `passwd`, an alias of `password`; it is the
    # port number and belongs in `port`.
    # NOTE(review): the password is hard-coded here -- consider loading it
    # from configuration instead of keeping it in the source.
    db = pymysql.connect(host='127.0.0.1', user='root', password='581581', port=3306, db='xiaomishop')
    sql = 'select * from movie'
    try:
        # The cursor is closed when the with-block ends.
        with db.cursor() as cursor:
            cursor.execute(sql)
            print(cursor.rowcount)
            row = cursor.fetchone()
            while row:
                print('Row:', row)
                row = cursor.fetchone()
    except pymysql.MySQLError as exc:
        print("Error", exc)
    finally:
        # Close the connection even when an unexpected error escapes.
        db.close()
def main():
    """Download and parse the ranking page, then print the stored rows.

    Writing to the database is kept as a commented-out example.
    """
    url = 'https://www.1905.com/vod/top/lst/'
    item = parse_page(Get_HTML(url))
    # for i in range(len(item)):
    #     # write_to_mysql(item[str(i+1)])  # insert
    #     update_data(item[str(i+1)])  # update
    search()  # query
# Runs the scraper immediately when this module is loaded.
main()
# Another beautiful day!