import time
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver import ActionChains
from PIL import Image
#https://www.om.cn/register
Email='test@test.com'
PASSWORD='123456'
url='https://auth.geetest.com/login/'
driver=webdriver.Chrome()
wait=WebDriverWait(driver,20)
driver.get(url)
email=Email
password=PASSWORD
def get_geetest_button():
'''
获取初始验证按钮
:return: 按钮对象
'''
button=wait.until(EC.element_to_be_clickable((By.CLASS_NAME,'geetest_btn_click')))
return button
def get_position():
'''
获取验证码位置
:return: 验证码位置元组
'''
img=wait.until(EC.presence_of_element_located((By.CLASS_NAME,'geetest_bg')))
time.sleep(2)
location=img.location
size=img.size
top,bottom,left,right=location['y'],location['y']+size['height'],location['x'],location['x']+size['width']
return (top,bottom,left,right)
def get_geetest_image():
'''
获取验证码图片
:return: 图片对象
'''
top, bottom, left, right=get_position()
print('验证码位置:',top,bottom,left,right)
screenshot=driver.get_screenshot_as_png()
captcha=screenshot.crop((top,bottom,left,right))
return captcha
def slider():
'''
获取滑块
:return:滑块对象
'''
slider=wait.until(EC.element_to_be_clickable(By.CLASS_NAME,'geetest_arrow'))
return slider
def is_pixel_equal(image1,image2,x,y):
'''
判断像素是否相同
:param image1: 图片1
:param image2: 图片2
:param x: 位置x
:param y: 位置y
:return: 像素是否相同
'''
#取两个像素点
pixel1=image1.load()[x,y]
pixel2=image2.load()[x,y]
threshold=60
if abs(pixel1[0]-pixel2[0])<threshold and abs(pixel1[1]-pixel2[1])<threshold and abs(pixel1[2]-pixel2[2])<threshold:
return True
else:
return False
def get_gap(image1,image2):
'''
获取缺口偏移量
:param image1: 不带缺口图片
:param image2: 带缺口图片
:return: 偏移量
'''
left=60
for i in range(left,image1.size[0]):
for j in range(image1.size[1]):
if not is_pixel_equal(image1,image2,i,j):
left=i
return left
return left
def get_track(distance):
'''
根据偏移量获取移动轨迹
:param distance: 偏移量
:return: 移动轨迹
'''
#移动轨迹
track=[]
#当前位移
current=0
#减速阈值
mid=distance*4/5
#计算间隔
t=0.2
v=0 #初速度
while current < distance:
if current<mid:
#加速度为2
a=2
else:
a=-3
#初速度v0
v0=v
#当前速度v=v0+at
v=v0+a*t
#移动距离x=v0*t+1/2*a*t*t
move = v0 * t + 1 / 2 * a * t * t
#当前位移
current+=move
#加入轨迹
track.append(round(move))
return track
def move_to_gap(slider,tracks):
'''
拖动到滑块缺口处
:param slider: 滑块
:param tracks: 轨迹
:return:
'''
ActionChains(driver).click_and_hold(slider).perform()
for x in tracks:
ActionChains(driver).move_by_offset(xoffset=x,yoffset=0).perform()
time.sleep(0.5)
ActionChains(driver).release().perform()
button=get_geetest_button()
button.click()
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver import ActionChains
'''基本使用'''
#声明浏览器对象
browser=webdriver.Chrome()
try:
'''访问页面'''
browser.get('https://www.baidu.com')
input=browser.find_element(By.ID,'kw')
input.send_keys('python')
input.send_keys(Keys.ENTER)
wait=WebDriverWait(browser,10)
wait.until(EC.presence_of_element_located((By.ID,'content_left')))
print('='*50)
print(browser.current_url)
print('=' * 50)
print(browser.get_cookies())
print('=' * 50)
# print(browser.page_source)
browser.quit()
except:
browser.close()
'''节点交互'''
driver=webdriver.Chrome()
try:
driver.get('https://www.taobao.com')
input=driver.find_element(By.ID,'q')
input.send_keys('手机')
# input.clear() #清除
button=driver.find_element(By.CLASS_NAME,'btn-search')
button.click()
driver.close()
except:
driver.close()
'''动作链(拖拉)'''
driver=webdriver.Chrome()
try:
driver.get('https://www.runoob.com/try/try.php?filename=jqueryui-api-droppable')
driver.switch_to.frame('iframeResult') #切换frame
source=driver.find_element(By.ID,'draggable')
target=driver.find_element(By.ID,'droppable')
actions=ActionChains(driver)
actions.drag_and_drop(source,target)
actions.perform()
except:
driver.close()
'''执行JavaScript,比如下拉进度条'''
driver=webdriver.Chrome()
try:
driver.get('https://www.zhihu.com/explore')
driver.execute_script('window.scrollTo(0,document.body.scrollHeight)')
driver.execute_script('alert("To Bottom")')
except:
print('error')
driver.close()
'''获取属性'''
driver=webdriver.Chrome()
try:
driver.get('https://www.zhihu.com/explore')
logo=driver.find_element(By.ID,'special')
print(logo)
print(logo.get_attribute('class'))
driver.close()
except:
print('error')
driver.close()
'''获取文本'''
driver=webdriver.Chrome()
try:
driver.get('https://www.zhihu.com/explore')
res=driver.find_element(By.CLASS_NAME,'ExploreSpecialCard-contentTitle')
print(res.text)
driver.close()
except:
print('error')
driver.close()
'''获取id、位置、标签吗和大小'''
driver=webdriver.Chrome()
try:
driver.get('https://www.zhihu.com/explore')
res=driver.find_element(By.CLASS_NAME,'ExploreSpecialCard-contentTitle')
print('id',res.id)
print('位置',res.location)
print('标签',res.tag_name)
print('大小',res.size)
driver.close()
except:
print('error')
driver.close()
'''延时显式等待'''
driver=webdriver.Chrome()
try:
driver.get('https://www.taobao.com')
#引入WebDriverWait对象,设置最长等待时间
wait=WebDriverWait(driver,10)
#调用until方法,传入等待条件;EC.presence_of_element_located代表节点出现
input=wait.until(EC.presence_of_element_located((By.ID,'q'))) #直到 id='q'的结点加载出来
button=wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR,'.btn-search'))) #直到节点可点击
print(input,button)
driver.close()
except:
print('error')
driver.close()
'''前进和后退'''
driver=webdriver.Chrome()
try:
driver.get('https://www.taobao.com/')
driver.get('https://www.baidu.com/')
driver.get('https://www.python.org/')
driver.back() #后退
time.sleep(1)
driver.forward() #前进
driver.close()
except:
print('error')
driver.close()
import csv
from pyquery import PyQuery as pq
import requests
import json
def Get_HTML(url):
headers = {'User-Agent': 'Mozilla/5.0'}
try:
r = requests.get(url, headers=headers)
r.raise_for_status()
r.encoding = r.apparent_encoding
return r.text
except:
print("Error!")
def parse_page(html):
doc=pq(html)
title=doc('.li03 a').text().split(' ') #名称
actor=doc('.li04 span').text().split(' ') #演员表
index=doc('.li01 b').text().split(' ') #序号
href=[] #链接
for i in doc('.li03 a').items(): #遍历
href.append(i.attr('href'))
score=doc('.li05 span').text().split(' ') #评分
#保存成CSV文件
# with open('1905电影排行榜.csv','a+') as csvfile:
# writer = csv.writer(csvfile,)
# writer.writerow(['序号', '名称', '链接', '演员', '评分'])
# for i in range(len(title)):
# writer.writerow([index[i],title[i],href[i],actor[i],score[i]])
#保存为字典
Result={}
for i in range(100):
result={
'序号':index[i],
'名称':title[i],
'链接':href[i],
'演员':actor[i],
'评分':score[i]
}
Result[index[i]]=result
return Result
def write_to_txtfile(item):
with open('1905电影排行榜.txt', 'a+', encoding='UTF-8') as f:
f.write(json.dumps(item, ensure_ascii=False) + '\n')
def main():
url = 'https://www.1905.com/vod/top/lst/'
html = Get_HTML(url)
item=parse_page(html)
#字典写成TXT文件
# for i in range(1,len(item)+1):
# write_to_txtfile(item[str(i)])
#字典写成CSV文件
# with open('1905电影排行榜_111.csv','a+',encoding='utf-8') as csvfile:
# fieldnames = ['序号', '名称', '链接', '演员', '评分']
# writer=csv.DictWriter(csvfile,fieldnames=fieldnames)
# writer.writeheader()
# for i in range(len(item)):
# writer.writerow(item[str(i+1)])
main()
import pymysql
from pyquery import PyQuery as pq
import requests
def Get_HTML(url):
headers = {'User-Agent': 'Mozilla/5.0'}
try:
r = requests.get(url, headers=headers)
r.raise_for_status()
r.encoding = r.apparent_encoding
return r.text
except:
print("Error!")
def parse_page(html):
doc=pq(html)
title=doc('.li03 a').text().split(' ') #名称
actor=doc('.li04 span').text().split(' ') #演员表
index=doc('.li01 b').text().split(' ') #序号
href=[] #链接
for i in doc('.li03 a').items(): #遍历
href.append(i.attr('href'))
score=doc('.li05 span').text().split(' ') #评分
#保存为字典
Result={}
for i in range(100):
result={
'序号':index[i],
'名称':title[i],
'链接':href[i],
'演员':actor[i],
'评分':score[i]
}
# print(result)
Result[index[i]]=result
return Result
def write_to_mysql(dict):
'''
字典数据写到mysql
:param dict:
:return: Null
'''
db=pymysql.connect(host='******',user='root',password='******',port=3306,db='xiaomishop')
cursor=db.cursor()
table='movie'
keys=','.join(dict.keys())
values=','.join(['%s']*len(dict))
sql='insert into {table}({keys}) values ({values})'.format(table=table,keys=keys,values=values)
try:
if cursor.execute(sql,tuple(dict.values())):
print("successful")
db.commit()
except:
print("failed")
db.rollback()
db.close()
def update_data(dict):
'''
若主键存在,进行更新操作;否则进行插入数据
:param dict:
:return:
'''
db = pymysql.connect(host='******', user='root', password='******', passwd='3306', db='xiaomishop')
cursor=db.cursor()
table='movie' #表名
keys=','.join(dict.keys())
values=','.join(['%s']*len(dict))
sql='insert into {table}({keys}) values ({values}) ON DUPLICATE KEY UPDATE'.format(table=table,keys=keys,values=values)
update=','.join([' {key}=%s'.format(key=key) for key in dict])
sql+=update
try:
if cursor.execute(sql,tuple(dict.values())*2):
print("successful")
db.commit()
except:
print("failed")
db.rollback()
db.close()
#查询
def search():
db = pymysql.connect(host='127.0.0.1', user='root', password='581581', passwd='3306', db='xiaomishop')
cursor = db.cursor()
sql = 'select * from movie'
try:
cursor.execute(sql)
print(cursor.rowcount)
row = cursor.fetchone()
while row:
print('Row:', row)
row = cursor.fetchone()
except:
print("Error")
db.close()
def main():
url = 'https://www.1905.com/vod/top/lst/'
html = Get_HTML(url)
item=parse_page(html)
# for i in range(len(item)):
# # write_to_mysql(item[str(i+1)]) #写入
# update_data(item[str(i+1)]) #更新
search() #查询
main()
from urllib.parse import urlencode
import requests
import re
def Get_page(page_num):
headers={
'Host':'so.toutiao.com',
'user-agent':'Mozilla/5.0',
'cookie':'tt=acd600b5ac865033f0ee83a63ed44675; '
}
base_url='https://so.toutiao.com/search?'
params = {
'keyword': '街拍',
'pd': 'atlas',
'dvpf': 'pc',
'aid': '4916',
'page_num': page_num,
'search_json': '%7B%22from_search_id%22%3A%2220220104115420010212192151532E8188%22%2C%22origin_keyword%22%3A%22%E8%A1%97%E6%8B%8D%22%2C%22image_keyword%22%3A%22%E8%A1%97%E6%8B%8D%22%7D',
'rawJSON': 1,
'search_id': '2022062517173701021219402539E36546'
}
url=base_url+urlencode(params)
try:
r=requests.get(url,headers=headers)
if r.status_code==200:
return r.json()
except:
print('Failed!')
def get_images(json):
images = json['rawData']['data']
for image in images:
yield {
'name': image['text'],
'url':image['img_url']
}
def save_image(dict):
data=requests.get(dict['url']).content
#把“和 :都替换成‘,’,否则python路径会识别错误
name=dict['name'].replace(':',',')
name=name.replace('"',',')
name=name.replace('?','')
name=name.split('|')[-1] #碰到‘|’就取最后一个
houzui=dict['url'].split('.')[-1] #截取后缀,比如JPEG,png等图片的格式名称
with open("./images/{}.{}".format(name,houzui),'wb') as f:
f.write(data)
def main():
for page_num in range(1,20):
json=Get_page(page_num)
dict=get_images(json)
for d in dict:
save_image(d)
main()
tesseract的安装使用及配置问题解决: https://blog.csdn.net/Alexa_/article/details/121192132
Tesserocr报错:“RuntimeError: Failed to init API, possibly an invalid tessdata path:”解决办法:https://blog.csdn.net/Aiy5249/article/details/118055261