异步爬虫之多进程和多线程

基本概念

目的:在爬虫中使用异步实现高性能的数据爬取操作

线程和进程的区别

  • 线程具有许多传统进程所具有的特征,故又称为轻型进程(Light—Weight Process)或进程元;而把传统的进程称为重型进程(Heavy—Weight Process),它相当于只有一个线程的任务。在引入了线程的操作系统中,通常一个进程都有若干个线程,至少包含一个线程。
  • 根本区别:进程是操作系统资源分配的基本单位,而线程是处理器任务调度和执行的基本单位
  • 资源开销:每个进程都有独立的代码和数据空间(程序上下文),程序之间的切换会有较大的开销;线程可以看做轻量级的进程,同一类线程共享代码和数据空间,每个线程都有自己独立的运行栈和程序计数器(PC),线程之间切换的开销小

版权声明:本文为CSDN博主「ThinkWon」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/ThinkWon/article/details/102021274

异步爬虫的方式

多进程、多线程

  • 好处:可以为相关阻塞的操作单独开启进程或者线程,阻塞操作可以异步执行。
  • 弊端:无法无限制的开启多线程或者多进程。

进程池、线程池

  • 好处:可以降低系统对进程或者线程的创建和销毁的一个频率,从而很好的降低系统的开销。
  • 弊端:池中进程或线程的数量是有上限的。

基本使用

python 多线程

多线程 方法(1)

直接调用Thread

from threading import Thread  # 线程类


def fun1():
    for i in range(5):
        print(i, end=' ')


def fun2():
    for i in range(5, 10):
        print(i, end=' ')


if __name__ == '__main__':
    t1 = Thread(target=fun1)
    t1.start()  # 子线程执行
    fun2()  # 主线程执行

执行结果为:

05 6 7 8 9 1 2 3 4

多线程 方法(2)

通过类继承的方法来实现,直接继承Thread类并且重写run函数

from threading import Thread


def fun1():
    for i in range(5):
        print(i, end=' ')


def fun2():
    for i in range(5, 10):
        print(i, end=' ')


# 继承Thread重新run函数
class MyThread(Thread):
    def run(self) -> None:
        fun2()


if __name__ == '__main__':
    t = MyThread()
    t.start()  # 开启线程开始运行
    fun1()

执行结果:

50 6 71 2 8 3 4 9

线程池(1)ThreadPoolExecutor

一次性创建一些线程,我们用户直接给线程池提交任务,线程任务的调度交给线程池来完成

from concurrent.futures import ThreadPoolExecutor
import time


def fn(name):
    time.sleep(2)
    print(name)


if __name__ == '__main__':
    # 创建线程池
    startTime = time.time()
    with ThreadPoolExecutor(1000) as t:
        for i in range(1000):
            t.submit(fn, name=f'线程{i}')
    endTime = time.time()
    print(f'运行时间{endTime - startTime}')
    # 等待线程池中的任务全部执行完毕,才能继续执行下面的代码

线程池(2)multiprocessing

import time
from multiprocessing.dummy import Pool
start_time = time.time()


def fake_process(str):
    print("正在执行:", str)
    time.sleep(2)
    print('执行完成:', str)
    return str


process_list = ['1', '2', '3', '4']
poop = Pool()

str_list = poop.map(fake_process, process_list)

end_time = time.time()

print('耗时:', end_time-start_time)
print('进程返回结果', str_list)

python 多进程

多进程 方法(1)

from multiprocessing import Process  # 进程类


def fun1(arg):
    for i in range(10000):
        print(arg, i)


def fun2(arg):
    for i in range(10000):
        print(arg, i)


if __name__ == '__main__':
    p1 = Process(target=fun1, args=('进程1',))
    p2 = Process(target=fun2, args=('进程2',))
    p1.start()
    p2.start()

多进程和多线程如果需要传递参数,参数一定是一个元组,例如 args=('进程1',)

多进程 方法(2)

通过类继承的方法来实现,直接继承Process类并且重写run函数

from multiprocessing import Process  # 进程类


class MyProcess(Process):

    def __init__(self, arg):
        super(MyProcess, self).__init__()
        self.arg = arg

    def run(self) -> None:
        for i in range(10000):
            print(self.arg, i)


if __name__ == '__main__':
    p1 = MyProcess('进程1')
    p2 = MyProcess('进程2')
    p1.start()
    p2.start()

案例:爬取新发地菜价信息

爬取 新发地-价格行情 (xinfadi.com.cn) 中所有信息

新发地菜价价格行情目录

代码:

import requests
from fake_useragent import UserAgent
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor
import time


# 爬取一页数据
def download_one_page(data: dict):
    url = 'http://www.xinfadi.com.cn/getPriceData.html'
    headers = {
        'user-agent': UserAgent().random,
        'referer': 'http://www.xinfadi.com.cn/priceDetail.html'
    }

    resp = requests.post(url=url, headers=headers, data=data)
    data_list = resp.json().get('list')
    # 保存数据
    with open('北京新发地.csv', 'a', encoding='utf-8') as fp:
        for elem in tqdm(data_list, desc=f'下载第 {data["current"]} 页数据 当前状告码:{resp.status_code}', ascii=True):
            info = (elem['prodCat'], elem['prodPcat'], elem['prodName'], elem['lowPrice'], elem['avgPrice'],
                    elem['highPrice'], elem['specInfo']
                    , elem['place'], elem['unitInfo'], elem['pubDate'])
            fp.write(','.join(info) + '\n')


def download_pages(page_start: int, page_end: int, page_limit: int = 20):
    fp = open('北京新发地.csv', 'w', encoding='utf-8')
    title = ['一级分类', '二级分类', '品名', '最低价', '平均价', '最高价', '规格', '产地', '单位', '发布日期']
    fp.write(','.join(title) + '\n')
    fp.close()
    with ThreadPoolExecutor(2048) as t:
        for i in range(page_start, page_end + 1):
            data = {
                'limit': f'{page_limit}',
                'current': f'{i}',
                'pubDateStartTime': '',
                'pubDateEndTime': '',
                'prodPcatid': '',
                'prodCatid': '',
                'prodName': ''
            }
            t.submit(download_one_page, data)


if __name__ == '__main__':
    start_time = time.time()
    download_pages(page_start=1, page_end=100, page_limit=20)
    end_time = time.time()
    print(f'总耗时{end_time - start_time}s')
最后修改:2022 年 02 月 18 日
如果觉得我的文章对你有用,请随意赞赏