CAPTCHA recognition

import time
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver import ActionChains
from PIL import Image
from io import BytesIO
#https://www.om.cn/register
Email='test@test.com'
PASSWORD='123456'
url='https://auth.geetest.com/login/'
driver=webdriver.Chrome()
wait=WebDriverWait(driver,20)
driver.get(url)
email=Email
password=PASSWORD

def get_geetest_button():
    '''
    Get the initial verification button
    :return: the button element
    '''
    button=wait.until(EC.element_to_be_clickable((By.CLASS_NAME,'geetest_btn_click')))
    return button


def get_position():
    '''
    Get the position of the CAPTCHA image
    :return: (top, bottom, left, right) tuple
    '''
    img=wait.until(EC.presence_of_element_located((By.CLASS_NAME,'geetest_bg')))
    time.sleep(2)
    location=img.location
    size=img.size
    top,bottom,left,right=location['y'],location['y']+size['height'],location['x'],location['x']+size['width']
    return (top,bottom,left,right)

def get_geetest_image():
    '''
    Get the CAPTCHA image
    :return: PIL Image object
    '''
    top, bottom, left, right=get_position()
    print('CAPTCHA position:',top,bottom,left,right)
    # get_screenshot_as_png() returns raw PNG bytes, so wrap them for PIL;
    # this assumes the page is not scrolled and the device pixel ratio is 1
    screenshot=Image.open(BytesIO(driver.get_screenshot_as_png()))
    # PIL's crop() expects a (left, upper, right, lower) box
    captcha=screenshot.crop((left,top,right,bottom))
    return captcha

def slider():
    '''
    Get the slider
    :return: the slider element
    '''
    # element_to_be_clickable takes a single (By, value) locator tuple
    slider=wait.until(EC.element_to_be_clickable((By.CLASS_NAME,'geetest_arrow')))
    return slider

def is_pixel_equal(image1,image2,x,y):
    '''
    Check whether the pixel at (x, y) is (nearly) the same in both images
    :param image1: first image
    :param image2: second image
    :param x: x coordinate
    :param y: y coordinate
    :return: whether the pixels match
    '''
    # read the pixel at (x, y) from each image
    pixel1=image1.load()[x,y]
    pixel2=image2.load()[x,y]
    threshold=60
    return (abs(pixel1[0]-pixel2[0])<threshold and
            abs(pixel1[1]-pixel2[1])<threshold and
            abs(pixel1[2]-pixel2[2])<threshold)

def get_gap(image1,image2):
    '''
    Find the horizontal offset of the gap
    :param image1: image without the gap
    :param image2: image with the gap
    :return: offset in pixels
    '''
    left=60   # start past the slider's own position on the left edge
    for i in range(left,image1.size[0]):
        for j in range(image1.size[1]):
            if not is_pixel_equal(image1,image2,i,j):
                left=i
                return left
    return left

def get_track(distance):
    '''
    Build a movement track for the given offset
    :param distance: offset in pixels
    :return: list of per-step moves
    '''
    # movement track
    track=[]
    # current displacement
    current=0
    # threshold at which to start decelerating
    mid=distance*4/5
    # time interval per step
    t=0.2
    v=0   # initial velocity
    while current < distance:
        if current<mid:
            # accelerate at a=2
            a=2
        else:
            # decelerate at a=-3
            a=-3
        # initial velocity v0 for this step
        v0=v
        # current velocity: v = v0 + a*t
        v=v0+a*t
        # distance moved this step: x = v0*t + 1/2*a*t^2
        move = v0 * t + 1 / 2 * a * t * t
        # accumulate displacement
        current+=move
        # append the rounded step to the track
        track.append(round(move))
    return track
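
# Quick sanity check (illustrative): the rounded steps should sum to roughly
# the requested distance; per-step rounding can leave it a few pixels off,
# which the CAPTCHA's tolerance usually absorbs.
#   track = get_track(100)
#   print(track, sum(track))   # sum(track) is approximately 100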

def move_to_gap(slider,tracks):
    '''
    Drag the slider to the gap
    :param slider: slider element
    :param tracks: movement track
    :return:
    '''
    ActionChains(driver).click_and_hold(slider).perform()
    for x in tracks:
        ActionChains(driver).move_by_offset(xoffset=x,yoffset=0).perform()
    time.sleep(0.5)
    ActionChains(driver).release().perform()


button=get_geetest_button()
button.click()
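
# A minimal sketch of the remaining flow (untested; it assumes that clicking
# the slider once makes the notched background replace the .geetest_bg image,
# as older Geetest pages did):
image1=get_geetest_image()    # background without the gap
knob=slider()
knob.click()                  # reveal the gap
image2=get_geetest_image()    # background with the gap
gap=get_gap(image1,image2)
track=get_track(gap)
move_to_gap(knob,track)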

Dynamic rendering

import time
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver import ActionChains

'''Basic usage'''
# create the browser object
browser=webdriver.Chrome()
try:
    '''visit a page'''
    browser.get('https://www.baidu.com')
    input=browser.find_element(By.ID,'kw')
    input.send_keys('python')
    input.send_keys(Keys.ENTER)
    wait=WebDriverWait(browser,10)
    wait.until(EC.presence_of_element_located((By.ID,'content_left')))
    print('='*50)
    print(browser.current_url)
    print('=' * 50)
    print(browser.get_cookies())
    print('=' * 50)
    # print(browser.page_source)
    browser.quit()
except:
    browser.close()

'''Interacting with nodes'''
driver=webdriver.Chrome()
try:
    driver.get('https://www.taobao.com')
    input=driver.find_element(By.ID,'q')
    input.send_keys('手机')
    # input.clear()   # clear the input box
    button=driver.find_element(By.CLASS_NAME,'btn-search')
    button.click()
    driver.close()
except:
    driver.close()


'''Action chains (drag and drop)'''
driver=webdriver.Chrome()
try:
    driver.get('https://www.runoob.com/try/try.php?filename=jqueryui-api-droppable')
    driver.switch_to.frame('iframeResult')     # switch into the iframe
    source=driver.find_element(By.ID,'draggable')
    target=driver.find_element(By.ID,'droppable')
    actions=ActionChains(driver)
    actions.drag_and_drop(source,target)
    actions.perform()

except:
    driver.close()

'''Executing JavaScript, e.g. scrolling to the bottom of the page'''
driver=webdriver.Chrome()
try:
    driver.get('https://www.zhihu.com/explore')
    driver.execute_script('window.scrollTo(0,document.body.scrollHeight)')
    driver.execute_script('alert("To Bottom")')
except:
    print('error')
    driver.close()

'''Getting attributes'''
driver=webdriver.Chrome()
try:
    driver.get('https://www.zhihu.com/explore')
    logo=driver.find_element(By.ID,'special')
    print(logo)
    print(logo.get_attribute('class'))
    driver.close()
except:
    print('error')
    driver.close()

'''Getting text'''
driver=webdriver.Chrome()
try:
    driver.get('https://www.zhihu.com/explore')
    res=driver.find_element(By.CLASS_NAME,'ExploreSpecialCard-contentTitle')
    print(res.text)
    driver.close()
except:
    print('error')
    driver.close()

'''Getting the id, location, tag name and size'''
driver=webdriver.Chrome()
try:
    driver.get('https://www.zhihu.com/explore')
    res=driver.find_element(By.CLASS_NAME,'ExploreSpecialCard-contentTitle')
    print('id',res.id)
    print('location',res.location)
    print('tag name',res.tag_name)
    print('size',res.size)
    driver.close()
except:
    print('error')
    driver.close()

'''Explicit waits'''
driver=webdriver.Chrome()
try:
    driver.get('https://www.taobao.com')
# create a WebDriverWait object with a maximum wait time
    wait=WebDriverWait(driver,10)
# until() blocks on a condition; EC.presence_of_element_located means the node has appeared
    input=wait.until(EC.presence_of_element_located((By.ID,'q')))    # wait until the node with id='q' is loaded
    button=wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR,'.btn-search')))    # wait until the node is clickable
    print(input,button)
    driver.close()
except:
    print('error')
    driver.close()
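
'''Implicit waits (for comparison)'''
# A brief counterpart to the explicit wait above, using Selenium's standard
# implicitly_wait(): every find_element call then polls for up to the given
# number of seconds before raising, with no per-node conditions.
driver=webdriver.Chrome()
try:
    driver.implicitly_wait(10)
    driver.get('https://www.taobao.com')
    input=driver.find_element(By.ID,'q')    # retried for up to 10 seconds
    print(input)
    driver.close()
except:
    print('error')
    driver.close()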

'''Going forward and back'''
driver=webdriver.Chrome()
try:
    driver.get('https://www.taobao.com/')
    driver.get('https://www.baidu.com/')
    driver.get('https://www.python.org/')
    driver.back()   # go back
    time.sleep(1)
    driver.forward()  # go forward
    driver.close()
except:
    print('error')
    driver.close()

Using the pyquery library
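
One detail worth knowing before reading parse_page below: when a pyquery selection matches several nodes, .text() joins the text of all of them with single spaces, which is why the code splits on ' ' to recover per-item lists (and why it would break on titles that themselves contain spaces). A minimal illustration with made-up HTML:

from pyquery import PyQuery as pq
demo=pq('<ul><li class="t"><a href="/a">MovieA</a></li>'
        '<li class="t"><a href="/b">MovieB</a></li></ul>')
print(demo('.t a').text())               # 'MovieA MovieB' -- joined with spaces
print(demo('.t a').text().split(' '))    # ['MovieA', 'MovieB'], as parse_page does
print([a.attr('href') for a in demo('.t a').items()])    # ['/a', '/b']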

import csv
from pyquery import PyQuery as pq
import requests
import json

def Get_HTML(url):
    headers = {'User-Agent': 'Mozilla/5.0'}
    try:
        r = requests.get(url, headers=headers)
        r.raise_for_status()
        r.encoding = r.apparent_encoding
        return r.text
    except:
        print("Error!")

def parse_page(html):
    doc=pq(html)
    title=doc('.li03 a').text().split(' ')  # titles
    actor=doc('.li04 span').text().split(' ')  # actors
    index=doc('.li01 b').text().split(' ')     # rank numbers
    href=[]  # links
    for i in doc('.li03 a').items():  # iterate over the matched nodes
        href.append(i.attr('href'))
    score=doc('.li05 span').text().split(' ')  # scores
#save as a CSV file (newline='' stops csv from emitting blank rows on Windows)
    # with open('1905电影排行榜.csv','a+',newline='',encoding='utf-8') as csvfile:
    #     writer = csv.writer(csvfile)
    #     writer.writerow(['序号', '名称', '链接', '演员', '评分'])
    #     for i in range(len(title)):
    #         writer.writerow([index[i],title[i],href[i],actor[i],score[i]])
#collect into a dict keyed by rank
    Result={}
    for i in range(len(index)):   # iterate over what was actually parsed, not a hardcoded 100
        result={
            '序号':index[i],
            '名称':title[i],
            '链接':href[i],
            '演员':actor[i],
            '评分':score[i]
        }
        Result[index[i]]=result
    return Result
def write_to_txtfile(item):
    with open('1905电影排行榜.txt', 'a+', encoding='UTF-8') as f:
        f.write(json.dumps(item, ensure_ascii=False) + '\n')
def main():
    url = 'https://www.1905.com/vod/top/lst/'
    html = Get_HTML(url)
    item=parse_page(html)
#write the dict to a TXT file
    # for i in range(1,len(item)+1):
        # write_to_txtfile(item[str(i)])
#write the dict to a CSV file via DictWriter (newline='' again for Windows)
    # with open('1905电影排行榜_111.csv','a+',newline='',encoding='utf-8') as csvfile:
    #     fieldnames = ['序号', '名称', '链接', '演员', '评分']
    #     writer=csv.DictWriter(csvfile,fieldnames=fieldnames)
    #     writer.writeheader()
    #     for i in range(len(item)):
    #         writer.writerow(item[str(i+1)])

main()

Saving scraped data to MySQL

import pymysql
from pyquery import PyQuery as pq
import requests

def Get_HTML(url):
    headers = {'User-Agent': 'Mozilla/5.0'}
    try:
        r = requests.get(url, headers=headers)
        r.raise_for_status()
        r.encoding = r.apparent_encoding
        return r.text
    except:
        print("Error!")
def parse_page(html):
    doc=pq(html)
    title=doc('.li03 a').text().split(' ')  # titles
    actor=doc('.li04 span').text().split(' ')  # actors
    index=doc('.li01 b').text().split(' ')     # rank numbers
    href=[]  # links
    for i in doc('.li03 a').items():  # iterate over the matched nodes
        href.append(i.attr('href'))
    score=doc('.li05 span').text().split(' ')  # scores
#collect into a dict keyed by rank
    Result={}
    for i in range(len(index)):   # iterate over what was actually parsed
        result={
            '序号':index[i],
            '名称':title[i],
            '链接':href[i],
            '演员':actor[i],
            '评分':score[i]
        }
        # print(result)
        Result[index[i]]=result
    return Result


def write_to_mysql(data):
    '''
    Write one dict of data to MySQL
    :param data: dict mapping column name to value
    :return: None
    '''
    db=pymysql.connect(host='******',user='root',password='******',port=3306,db='xiaomishop')
    cursor=db.cursor()
    table='movie'
    keys=','.join(data.keys())
    values=','.join(['%s']*len(data))
    sql='insert into {table}({keys}) values ({values})'.format(table=table,keys=keys,values=values)
    try:
        if cursor.execute(sql,tuple(data.values())):
            print("successful")
            db.commit()
    except:
        print("failed")
        db.rollback()
    db.close()


def update_data(data):
    '''
    If the primary key already exists, update the row; otherwise insert it
    :param data: dict mapping column name to value
    :return:
    '''
    db = pymysql.connect(host='******', user='root', password='******', port=3306, db='xiaomishop')
    cursor=db.cursor()
    table='movie'    # table name
    keys=','.join(data.keys())
    values=','.join(['%s']*len(data))
    sql='insert into {table}({keys}) values ({values}) ON DUPLICATE KEY UPDATE'.format(table=table,keys=keys,values=values)
    update=','.join([' {key}=%s'.format(key=key) for key in data])
    sql+=update
    try:
        if cursor.execute(sql,tuple(data.values())*2):   # values passed twice: INSERT part, then UPDATE part
            print("successful")
            db.commit()
    except:
        print("failed")
        db.rollback()
    db.close()
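
# For one row, the builder above produces SQL of this shape (illustrative):
#   insert into movie(序号,名称,链接,演员,评分) values (%s,%s,%s,%s,%s)
#   ON DUPLICATE KEY UPDATE 序号=%s, 名称=%s, 链接=%s, 演员=%s, 评分=%s
# which is why the values tuple is supplied twice.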

# querying
def search():
    db = pymysql.connect(host='127.0.0.1', user='root', password='581581', port=3306, db='xiaomishop')
    cursor = db.cursor()
    sql = 'select * from movie'
    try:
        cursor.execute(sql)
        print(cursor.rowcount)
        row = cursor.fetchone()
        while row:
            print('Row:', row)
            row = cursor.fetchone()
    except:
        print("Error")
    db.close()

def main():
    url = 'https://www.1905.com/vod/top/lst/'
    html = Get_HTML(url)
    item=parse_page(html)
    # for i in range(len(item)):
    #     # write_to_mysql(item[str(i+1)])   # insert
    #     update_data(item[str(i+1)])       # upsert (insert or update)
    search()                    # query
main()
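
# ON DUPLICATE KEY UPDATE only fires on a key collision, so the code above
# assumes the movie table's primary key is the rank column. A sketch of a
# matching schema (assumed; the original notes do not show it):
def create_table():
    db = pymysql.connect(host='******', user='root', password='******', port=3306, db='xiaomishop')
    cursor = db.cursor()
    sql = '''CREATE TABLE IF NOT EXISTS movie (
        序号 VARCHAR(16) PRIMARY KEY,
        名称 VARCHAR(255),
        链接 VARCHAR(255),
        演员 VARCHAR(255),
        评分 VARCHAR(16)
    )'''
    cursor.execute(sql)
    db.close()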

Scraping Ajax data

from urllib.parse import urlencode
import requests
import os

def Get_page(page_num):
    headers={
        'Host':'so.toutiao.com',
        'user-agent':'Mozilla/5.0',
        'cookie':'tt=acd600b5ac865033f0ee83a63ed44675; '
    }
    base_url='https://so.toutiao.com/search?'
    params = {
        'keyword': '街拍',
        'pd': 'atlas',
        'dvpf': 'pc',
        'aid': '4916',
        'page_num': page_num,
        # plain JSON here; urlencode() below percent-encodes it exactly once
        # (the original notes carried a pre-encoded copy, which urlencode would encode a second time)
        'search_json': '{"from_search_id":"20220104115420010212192151532E8188","origin_keyword":"街拍","image_keyword":"街拍"}',
        'rawJSON': 1,
        'search_id': '2022062517173701021219402539E36546'
    }
    url=base_url+urlencode(params)
    try:
        r=requests.get(url,headers=headers)
        if r.status_code==200:
            return r.json()
    except:
        print('Failed!')

def get_images(json):
    images = json['rawData']['data']
    for image in images:
        yield {
            'name': image['text'],
            'url':image['img_url']
        }
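
# Assumed response shape (inferred from the code above, not from any docs):
#   {"rawData": {"data": [{"text": "...", "img_url": "https://..."}, ...]}}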

def save_image(item):
    data=requests.get(item['url']).content
    # replace " and : with ',' -- they are invalid in Windows file paths
    name=item['name'].replace(':',',')
    name=name.replace('"',',')
    name=name.replace('?','')
    name=name.split('|')[-1]  # if '|' appears, keep only the last part
    ext=item['url'].split('.')[-1]   # file extension, e.g. jpeg or png
    os.makedirs('./images',exist_ok=True)   # make sure the target folder exists
    with open("./images/{}.{}".format(name,ext),'wb') as f:
        f.write(data)

def main():
    for page_num in range(1,20):
        result=Get_page(page_num)
        if not result:
            continue   # skip pages that failed to load
        for item in get_images(result):
            save_image(item)


main()