图片爬虫(小改动)

'''
网站网址:http://www.meinuzi.com/
'''

import random
from pyquery import PyQuery as pq
import requests

proxies=[
         {'HTTP':'http://122.140.5.115:9999'},
         {'HTTP':'http://113.101.96.66:8080'},
         {'HTTP':'http://113.124.86.24:9999'},
         {'HTTP':'http://121.13.252.58:41564'},
         {'HTTP':'http://61.216.185.88:60808'},
         {'HTTP':'http://58.20.184.187:9091'},
         {'HTTP':'http://183.236.123.242:8060'},
         {'HTTP':'http://116.9.163.205:58080'},
         {'HTTP':'http://222.74.73.202:42055'},
         {'HTTP':'http://183.247.202.208:30001'},
         {'HTTP':'http://39.108.101.55:1080'},
         {'HTTP':'http://47.105.91.226:8118'},
         ]
proxie=random.choice(proxies)
user_agent_list = ['Chrome/86.0.4240.198',
                   'Chrome/101.0.4951.64',
                   'Chrome/96.0.4664.45',
                   'Chrome/94.0.4606.41'
                   ]
headers = {'User-Agent': random.choice(user_agent_list),
           'Referer':'http://www.meinuzi.com'}

'''
请求url,返回HTML
'''
def Req_url(url):
    try:
        r=requests.get(url,headers=headers,proxies=proxie)
        r.raise_for_status()
        r.encoding=r.apparent_encoding
        return r.text
    except:
        # print('ERROR!')
        return 'over'

'''
解析初始页面,找到图片页面的网址
'''
def Parse_page(html):
    doc=pq(html)
    items=doc('.m-list-main li').items()
    i=1
    for item in items:
        page_url=item('.u-img a').attr('href')
        name=item('.u-img img').attr('alt')
        print("保存第",i,'张图')
        Get_img(page_url,name)
        i+=1

'''
解析图片页面的网址,找到每张图片的url
'''
def Get_img(page_url,name):
    html=Req_url(page_url)
    doc=pq(html)
    img_baseurl=doc('.g-mn .m-list-content img').attr('src')
    Save_img(img_baseurl,name)

'''
根据得到的图片url,保存到文件夹
'''
def Save_img(img_baseurl,name):
    img_url=headers['Referer']+img_baseurl
    data=requests.get(img_url,headers=headers,proxies=proxie,timeout=15).content
    houzui=img_url.split('.')[-1]
    with open('./images_1/{}.{}'.format(name,houzui),'wb') as f:
        f.write(data)


def main():

    num = 7  # 从第num页开始

    '''
    当返回值为‘over’的时候,说明页面不存在,即已经爬取全部页面
    '''
    while True:
        page_url='http://www.meinuzi.com/index_'+str(num)+'.html'

        html = Req_url(page_url)
        if html != 'over':
            '''
            页面存在但无图片数据,标签中只有换行符时
            '''
            doc=pq(html)
            item = doc('.m-list-main ul').text()
            if item!='' \
                     '':
                print('**********第',num,'页**********')
                Parse_page(html)
                num+=1
            else:
                print('程序结束')
                return '程序结束'
        else:
            print('程序结束')
            return '程序结束'
main()






验证码识别

import time
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver import ActionChains
from PIL import Image
#https://www.om.cn/register
Email='test@test.com'
PASSWORD='123456'
url='https://auth.geetest.com/login/'
driver=webdriver.Chrome()
wait=WebDriverWait(driver,20)
driver.get(url)
email=Email
password=PASSWORD

def get_geetest_button():
    '''
    获取初始验证按钮
    :return: 按钮对象
    '''
    button=wait.until(EC.element_to_be_clickable((By.CLASS_NAME,'geetest_btn_click')))
    return button


def get_position():
    '''
    获取验证码位置
    :return: 验证码位置元组
    '''
    img=wait.until(EC.presence_of_element_located((By.CLASS_NAME,'geetest_bg')))
    time.sleep(2)
    location=img.location
    size=img.size
    top,bottom,left,right=location['y'],location['y']+size['height'],location['x'],location['x']+size['width']
    return (top,bottom,left,right)

def get_geetest_image():
    '''
    获取验证码图片
    :return: 图片对象
    '''
    top, bottom, left, right=get_position()
    print('验证码位置:',top,bottom,left,right)
    screenshot=driver.get_screenshot_as_png()
    captcha=screenshot.crop((top,bottom,left,right))
    return captcha

def slider():
    '''
    获取滑块
    :return:滑块对象
    '''
    slider=wait.until(EC.element_to_be_clickable(By.CLASS_NAME,'geetest_arrow'))
    return slider

def is_pixel_equal(image1,image2,x,y):
    '''
    判断像素是否相同
    :param image1: 图片1
    :param image2: 图片2
    :param x: 位置x
    :param y: 位置y
    :return: 像素是否相同
    '''
    #取两个像素点
    pixel1=image1.load()[x,y]
    pixel2=image2.load()[x,y]
    threshold=60
    if abs(pixel1[0]-pixel2[0])<threshold and abs(pixel1[1]-pixel2[1])<threshold and abs(pixel1[2]-pixel2[2])<threshold:
        return True
    else:
        return False

def get_gap(image1,image2):
    '''
    获取缺口偏移量
    :param image1: 不带缺口图片
    :param image2: 带缺口图片
    :return: 偏移量
    '''
    left=60
    for i in range(left,image1.size[0]):
        for j in range(image1.size[1]):
            if not is_pixel_equal(image1,image2,i,j):
                left=i
                return left
    return left

def get_track(distance):
    '''
    根据偏移量获取移动轨迹
    :param distance: 偏移量
    :return: 移动轨迹
    '''
    #移动轨迹
    track=[]
    #当前位移
    current=0
    #减速阈值
    mid=distance*4/5
    #计算间隔
    t=0.2
    v=0   #初速度
    while current < distance:
        if current<mid:
            #加速度为2
            a=2
        else:
            a=-3
        #初速度v0
        v0=v
        #当前速度v=v0+at
        v=v0+a*t
        #移动距离x=v0*t+1/2*a*t*t
        move = v0 * t + 1 / 2 * a * t * t
        #当前位移
        current+=move
        #加入轨迹
        track.append(round(move))
    return track

def move_to_gap(slider,tracks):
    '''
    拖动到滑块缺口处
    :param slider: 滑块
    :param tracks: 轨迹
    :return:
    '''
    ActionChains(driver).click_and_hold(slider).perform()
    for x in tracks:
        ActionChains(driver).move_by_offset(xoffset=x,yoffset=0).perform()
    time.sleep(0.5)
    ActionChains(driver).release().perform()


button=get_geetest_button()
button.click()

动态渲染

import time
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver import ActionChains

'''基本使用'''
#声明浏览器对象
browser=webdriver.Chrome()
try:
    '''访问页面'''
    browser.get('https://www.baidu.com')
    input=browser.find_element(By.ID,'kw')
    input.send_keys('python')
    input.send_keys(Keys.ENTER)
    wait=WebDriverWait(browser,10)
    wait.until(EC.presence_of_element_located((By.ID,'content_left')))
    print('='*50)
    print(browser.current_url)
    print('=' * 50)
    print(browser.get_cookies())
    print('=' * 50)
    # print(browser.page_source)
    browser.quit()
except:
    browser.close()

'''节点交互'''
driver=webdriver.Chrome()
try:
    driver.get('https://www.taobao.com')
    input=driver.find_element(By.ID,'q')
    input.send_keys('手机')
    # input.clear()   #清除
    button=driver.find_element(By.CLASS_NAME,'btn-search')
    button.click()
    driver.close()
except:
    driver.close()


'''动作链(拖拉)'''
driver=webdriver.Chrome()
try:
    driver.get('https://www.runoob.com/try/try.php?filename=jqueryui-api-droppable')
    driver.switch_to.frame('iframeResult')     #切换frame
    source=driver.find_element(By.ID,'draggable')
    target=driver.find_element(By.ID,'droppable')
    actions=ActionChains(driver)
    actions.drag_and_drop(source,target)
    actions.perform()

except:
    driver.close()

'''执行JavaScript,比如下拉进度条'''
driver=webdriver.Chrome()
try:
    driver.get('https://www.zhihu.com/explore')
    driver.execute_script('window.scrollTo(0,document.body.scrollHeight)')
    driver.execute_script('alert("To Bottom")')
except:
    print('error')
    driver.close()

'''获取属性'''
driver=webdriver.Chrome()
try:
    driver.get('https://www.zhihu.com/explore')
    logo=driver.find_element(By.ID,'special')
    print(logo)
    print(logo.get_attribute('class'))
    driver.close()
except:
    print('error')
    driver.close()

'''获取文本'''
driver=webdriver.Chrome()
try:
    driver.get('https://www.zhihu.com/explore')
    res=driver.find_element(By.CLASS_NAME,'ExploreSpecialCard-contentTitle')
    print(res.text)
    driver.close()
except:
    print('error')
    driver.close()

'''获取id、位置、标签吗和大小'''
driver=webdriver.Chrome()
try:
    driver.get('https://www.zhihu.com/explore')
    res=driver.find_element(By.CLASS_NAME,'ExploreSpecialCard-contentTitle')
    print('id',res.id)
    print('位置',res.location)
    print('标签',res.tag_name)
    print('大小',res.size)
    driver.close()
except:
    print('error')
    driver.close()

'''延时显式等待'''
driver=webdriver.Chrome()
try:
    driver.get('https://www.taobao.com')
#引入WebDriverWait对象,设置最长等待时间
    wait=WebDriverWait(driver,10)
#调用until方法,传入等待条件;EC.presence_of_element_located代表节点出现
    input=wait.until(EC.presence_of_element_located((By.ID,'q')))    #直到 id='q'的结点加载出来
    button=wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR,'.btn-search')))    #直到节点可点击
    print(input,button)
    driver.close()
except:
    print('error')
    driver.close()

'''前进和后退'''
driver=webdriver.Chrome()
try:
    driver.get('https://www.taobao.com/')
    driver.get('https://www.baidu.com/')
    driver.get('https://www.python.org/')
    driver.back()   #后退
    time.sleep(1)
    driver.forward()  #前进
    driver.close()
except:
    print('error')
    driver.close()

pyquery库使用

import csv
from pyquery import PyQuery as pq
import requests
import json

def Get_HTML(url):
    headers = {'User-Agent': 'Mozilla/5.0'}
    try:
        r = requests.get(url, headers=headers)
        r.raise_for_status()
        r.encoding = r.apparent_encoding
        return r.text
    except:
        print("Error!")

def parse_page(html):
    doc=pq(html)
    title=doc('.li03 a').text().split(' ')  #名称
    actor=doc('.li04 span').text().split(' ')  #演员表
    index=doc('.li01 b').text().split(' ')     #序号
    href=[]  #链接
    for i in doc('.li03 a').items():  #遍历
        href.append(i.attr('href'))
    score=doc('.li05 span').text().split(' ')  #评分
#保存成CSV文件
    # with open('1905电影排行榜.csv','a+') as csvfile:
    #     writer = csv.writer(csvfile,)
    #     writer.writerow(['序号', '名称', '链接', '演员', '评分'])
    #     for i in range(len(title)):
    #         writer.writerow([index[i],title[i],href[i],actor[i],score[i]])
#保存为字典
    Result={}
    for i in range(100):
        result={
            '序号':index[i],
            '名称':title[i],
            '链接':href[i],
            '演员':actor[i],
            '评分':score[i]
        }
        Result[index[i]]=result
    return Result
def write_to_txtfile(item):
    with open('1905电影排行榜.txt', 'a+', encoding='UTF-8') as f:
        f.write(json.dumps(item, ensure_ascii=False) + '\n')
def main():
    url = 'https://www.1905.com/vod/top/lst/'
    html = Get_HTML(url)
    item=parse_page(html)
#字典写成TXT文件
    # for i in range(1,len(item)+1):
        # write_to_txtfile(item[str(i)])
#字典写成CSV文件
    # with open('1905电影排行榜_111.csv','a+',encoding='utf-8') as csvfile:
    #     fieldnames = ['序号', '名称', '链接', '演员', '评分']
    #     writer=csv.DictWriter(csvfile,fieldnames=fieldnames)
    #     writer.writeheader()
    #     for i in range(len(item)):
    #         writer.writerow(item[str(i+1)])

main()

爬取数据保存到MySQL

import pymysql
from pyquery import PyQuery as pq
import requests

def Get_HTML(url):
    headers = {'User-Agent': 'Mozilla/5.0'}
    try:
        r = requests.get(url, headers=headers)
        r.raise_for_status()
        r.encoding = r.apparent_encoding
        return r.text
    except:
        print("Error!")
def parse_page(html):
    doc=pq(html)
    title=doc('.li03 a').text().split(' ')  #名称
    actor=doc('.li04 span').text().split(' ')  #演员表
    index=doc('.li01 b').text().split(' ')     #序号
    href=[]  #链接
    for i in doc('.li03 a').items():  #遍历
        href.append(i.attr('href'))
    score=doc('.li05 span').text().split(' ')  #评分
#保存为字典
    Result={}
    for i in range(100):
        result={
            '序号':index[i],
            '名称':title[i],
            '链接':href[i],
            '演员':actor[i],
            '评分':score[i]
        }
        # print(result)
        Result[index[i]]=result
    return Result


def write_to_mysql(dict):
    '''
    字典数据写到mysql
    :param dict:
    :return: Null
    '''
    db=pymysql.connect(host='******',user='root',password='******',port=3306,db='xiaomishop')
    cursor=db.cursor()
    table='movie'
    keys=','.join(dict.keys())
    values=','.join(['%s']*len(dict))
    sql='insert into {table}({keys}) values ({values})'.format(table=table,keys=keys,values=values)
    try:
        if cursor.execute(sql,tuple(dict.values())):
            print("successful")
            db.commit()
    except:
        print("failed")
        db.rollback()
    db.close()


def update_data(dict):
    '''
    若主键存在,进行更新操作;否则进行插入数据
    :param dict:
    :return:
    '''
    db = pymysql.connect(host='******', user='root', password='******', passwd='3306', db='xiaomishop')
    cursor=db.cursor()
    table='movie'    #表名
    keys=','.join(dict.keys())
    values=','.join(['%s']*len(dict))
    sql='insert into {table}({keys}) values ({values}) ON DUPLICATE KEY UPDATE'.format(table=table,keys=keys,values=values)
    update=','.join([' {key}=%s'.format(key=key) for key in dict])
    sql+=update
    try:
        if cursor.execute(sql,tuple(dict.values())*2):
            print("successful")
            db.commit()
    except:
        print("failed")
        db.rollback()
    db.close()

#查询
def search():
    db = pymysql.connect(host='127.0.0.1', user='root', password='581581', passwd='3306', db='xiaomishop')
    cursor = db.cursor()
    sql = 'select * from movie'
    try:
        cursor.execute(sql)
        print(cursor.rowcount)
        row = cursor.fetchone()
        while row:
            print('Row:', row)
            row = cursor.fetchone()
    except:
        print("Error")
    db.close()

def main():
    url = 'https://www.1905.com/vod/top/lst/'
    html = Get_HTML(url)
    item=parse_page(html)
    # for i in range(len(item)):
    #     # write_to_mysql(item[str(i+1)])   #写入
    #     update_data(item[str(i+1)])       #更新
    search()                    #查询
main()