當 Item 在 Spider 中被收集之后,它將會被傳遞到 Item Pipeline,一些組件會按照一定的順序執(zhí)行對 Item 的處理。
每個 item pipeline 組件(有時稱之為“Item Pipeline”)是實現(xiàn)了簡單方法的 Python 類。他們接收到 Item 并通過它執(zhí)行一些行為,同時也決定此 Item 是否繼續(xù)通過 pipeline,或是被丟棄而不再進行處理。
以下是 item pipeline 的一些典型應用:
編寫你自己的 item pipeline 很簡單,每個 item pipeline 組件是一個獨立的 Python 類,同時必須實現(xiàn)以下方法:
每個 item pipeline 組件都需要調(diào)用該方法,這個方法必須返回一個 Item (或任何繼承類)對象, 或是拋出 DropItem 異常,被丟棄的 item 將不會被之后的 pipeline 組件所處理。
參數(shù):
此外,他們也可以實現(xiàn)以下方法:
當 spider 被開啟時,這個方法被調(diào)用。
參數(shù):
當 spider 被關(guān)閉時,這個方法被調(diào)用
參數(shù):
spider (Spider 對象) – 被關(guān)閉的 spider
If present, this classmethod is called to create a pipeline instance from a Crawler. It must return a new instance of the pipeline. Crawler object provides access to all Scrapy core components like settings and signals; it is a way for pipeline to access them and hook its functionality into Scrapy.
參數(shù):
crawler (Crawler object) – crawler that uses this pipeline
讓我們來看一下以下這個假設的 pipeline,它為那些不含稅(price_excludes_vat 屬性)的 item 調(diào)整了 price 屬性,同時丟棄了那些沒有價格的 item:
from scrapy.exceptions import DropItem
class PricePipeline(object):
vat_factor = 1.15
def process_item(self, item, spider):
if item['price']:
if item['price_excludes_vat']:
item['price'] = item['price'] * self.vat_factor
return item
else:
raise DropItem("Missing price in %s" % item)
以下 pipeline 將所有(從所有 spider 中)爬取到的 item,存儲到一個獨立地 items.jl 文件,每行包含一個序列化為 JSON 格式的 item:
import json
class JsonWriterPipeline(object):
def __init__(self):
self.file = open('items.jl', 'wb')
def process_item(self, item, spider):
line = json.dumps(dict(item)) + "\n"
self.file.write(line)
return item
注解
JsonWriterPipeline 的目的只是為了介紹怎樣編寫 item pipeline,如果你想要將所有爬取的 item 都保存到同一個 JSON 文件, 你需要使用 Feed exports 。
In this example we’ll write items to MongoDB using pymongo. MongoDB address and database name are specified in Scrapy settings; MongoDB collection is named after item class.
The main point of this example is to show how to use from_crawler() method and how to clean up the resources properly.
注解
Previous example (JsonWriterPipeline) doesn’t clean up resources properly. Fixing it is left as an exercise for the reader. import pymongo
class MongoPipeline(object):
def __init__(self, mongo_uri, mongo_db):
self.mongo_uri = mongo_uri
self.mongo_db = mongo_db
@classmethod
def from_crawler(cls, crawler):
return cls(
mongo_uri=crawler.settings.get('MONGO_URI'),
mongo_db=crawler.settings.get('MONGO_DATABASE', 'items')
)
def open_spider(self, spider):
self.client = pymongo.MongoClient(self.mongo_uri)
self.db = self.client[self.mongo_db]
def close_spider(self, spider):
self.client.close()
def process_item(self, item, spider):
collection_name = item.__class__.__name__
self.db[collection_name].insert(dict(item))
return item
一個用于去重的過濾器,丟棄那些已經(jīng)被處理過的 item。讓我們假設我們的 item 有一個唯一的 id,但是我們 spider 返回的多個 item 中包含有相同的 id:
from scrapy.exceptions import DropItem
class DuplicatesPipeline(object):
def __init__(self):
self.ids_seen = set()
def process_item(self, item, spider):
if item['id'] in self.ids_seen:
raise DropItem("Duplicate item found: %s" % item)
else:
self.ids_seen.add(item['id'])
return item
為了啟用一個 Item Pipeline 組件,你必須將它的類添加到 ITEM_PIPELINES 配置,就像下面這個例子:
ITEM_PIPELINES = {
'myproject.pipelines.PricePipeline': 300,
'myproject.pipelines.JsonWriterPipeline': 800,
}
分配給每個類的整型值,確定了他們運行的順序,item 按數(shù)字從低到高的順序,通過 pipeline,通常將這些數(shù)字定義在 0-1000 范圍內(nèi)。