Elixir轻量级进程如何成就百万级并发爬虫?

Elixir凭借其Actor模型与轻量级进程,天然支持高并发。本爬虫利用此优势,结合HTTPoison与Floki,高效实现多任务网页抓取、链接提取与数据解析,是处理大规模数据采集的理想方案。

下面是一个使用 Elixir 实现的高并发爬虫示例。Elixir 的 Actor 模型和轻量级进程使其非常适合构建高并发的网络应用。

defmodule ConcurrentCrawlerdo

  @moduledoc """

  高并发网页爬虫实现

  使用 Elixir 的并发特性和 HTTPoison 库

  """

use Task

require Logger

  @user_agent "Mozilla/5.0 (compatible; ConcurrentCrawler/1.0; +http://example.com/bot.html)"

  @default_concurrency50

  @request_timeout10_000

  def start_link(urls, concurrency \\ @default_concurrency) when is_list(urls) do

   Task.start_link(fn -> crawl(urls, concurrency) end)

  end

  @doc """

  主爬取函数

  """

  def crawl(urls, concurrency) do

   # 创建任务监督器

{:ok, supervisor} = Task.Supervisor.start_link(max_children:concurrency)

   # 创建流处理管道

   urls

|> Stream.map(&String.trim/1)

|> Stream.filter(&valid_url?/1)

|> Stream.uniq()

|> Task.Supervisor.async_stream_nolink(

     supervisor,

     &process_url/1,

     max_concurrency:concurrency,

     timeout: @request_timeout* 2

   )

|> Stream.run()

  end

  defp process_url(url) do

   Logger.info("Processing: #{url}")

   case fetch_url(url) do

{:ok, %{status_code:200, body:body}} ->

       # 提取数据并解析链接

       data = extract_data(body, url)

       links = extract_links(body, url)


       # 存储数据(这里只是打印,实际应用中可存入数据库)

       Logger.info("Extracted #{length(links)} links from #{url}")

       Logger.debug("Data: #{inspect(data, limit: 3)}")


       # 可以选择继续爬取新发现的链接

       # crawl(links, concurrency) # 注意:这需要额外的循环控制逻辑


{:ok, url, data, links}

{:ok, response} ->

       Logger.warning("Failed to fetch #{url}: Status #{response.status_code}")

{:error, url, response.status_code}

{:error, reason} ->

       Logger.error("Error fetching #{url}: #{inspect(reason)}")

{:error, url, reason}

   end

  end

  defp fetch_url(url) do

   headers = [

{"User-Agent", @user_agent},

{"Accept", "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"}

   ]

   HTTPoison.get(url, headers,

     timeout: @request_timeout,

     recv_timeout: @request_timeout,

     follow_redirect:true

   )

  end

  defp extract_links(html, base_url) do

case Floki.parse_document(html) do

{:ok, document} ->

       document

|> Floki.find("a")

|> Floki.attribute("href")

|> Enum.map(&absolute_url(&1, base_url))

|> Enum.filter(&valid_url?/1)

|> Enum.uniq()

     _ ->

       []

   end

  end

  defp extract_data(html, url) do

   # 这里可以根据需要提取特定数据

   # 例如标题、元描述、正文内容等


case Floki.parse_document(html) do

{:ok, document} ->

       title =

         document

|> Floki.find("title")

|> Floki.text()

|> String.trim()

       description =

         document

|> Floki.find("meta[name=\"description\"]")

|> Floki.attribute("content")

|> List.first()

|> Kernel.||("")

       %{

         url: url,

         title: title,

         description: description,

         crawled_at: DateTime.utc_now()

       }

     _ ->

       %{url: url, title: "", description: "", crawled_at: DateTime.utc_now()}

   end

  end

  defp absolute_url(url, base) when is_binary(url) do

   case URI.parse(url) do

     %URI{scheme: scheme} when scheme in ["http", "https"] ->

       url

     _ ->

       base_uri = URI.parse(base)

       absolute_path = Path.expand(url, base_uri.path || "/")

       URI.merge(base, absolute_path) |> to_string()

   end

  end

  defp valid_url?(url) do

   case URI.parse(url) do

     %URI{scheme: scheme, host: host} when scheme in ["http", "https"] and not is_nil(host) ->

       true

     _ ->

       false

   end

  end

end

# 配置和启动应用

defmodule Crawler.Application do

  use Application

  def start(_type, _args) do

   # 配置HTTPoison

   :hackney_pool.start_pool(:crawler_pool, [

     timeout: 15_000,

     max_connections: 100

   ])

   children = [

     {Task.Supervisor, name: Crawler.TaskSupervisor},

     {ConcurrentCrawler, [["https://example.com/page1", "https://example.com/page2"]]}

   ]

   opts = [strategy: :one_for_one, name: Crawler.Supervisor]

   Supervisor.start_link(children, opts)

  end

end

具体说明

1、添加依赖到 mix.exs:

defp depsdo

  [

{:httpoison, "~> 1.8"},

{:floki, "~> 0.32.0"},

{:hackney, "~> 1.17"}

  ]

end

2、运行爬虫:

# 直接运行

urls = ["https://example.com", "https://example.org"]

ConcurrentCrawler.crawl(urls, 20) # 20是并发数

# 或者作为监督树的一部分启动

{:ok, pid} = ConcurrentCrawler.start_link(urls, 20)

这个实现利用了 Elixir 的并发特性,能够高效地处理大量网页抓取任务,同时保持良好的可维护性和扩展性。

总之,该爬虫充分展现了Elixir在并发处理上的强大能力。代码结构清晰,易于扩展,您可根据实际需求添加代理、去重或分布式存储等功能,以构建健壮的爬虫系统。

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容