Elixir Agent详解

本章我们将创建一个名为 KV.Bucket 的模块。这个模块负责存储可被不同进程读写的键值对。

如果你跳过了“入门”手册,或者是太久以前读的,那么建议你最好重新阅读一下关于 进程 的那一章。 它是本节所内容的起点。

状态的麻烦

Elixir是一种“(变量值)不可变”的语言。默认情况下,没有什么是被共享的。 如果想要提供某种状态,通过其创建可以从不同地方访问的“桶”,我们有两种选择:

我们之前介绍过进程,但 ETS 是个新东西,在后面的章节中再去探讨。 而当用到进程时,我们很少会去自己动手从底层做起,而是用Elixir 和 OTP 中抽象出来的东西代替:

  • Agent - 对状态简单的封装
  • GenServer - “通用的服务器”(进程)。它封装了状态,提供了同步或异步调用,支持代码热更新等等
  • GenEvent - “通用的事件”管理器,允许向多个接收者发布事件消息
  • Task - 计算处理的异步单元,可以派生出进程并稍后收集计算结果

我们在本“进阶”手册中会逐一讨论这些抽象物。 记住它们都是在进程基础上实现的,使用Erlang虚拟机提供的基本特性, 如sendreceivespawnlink

Agents

Agent 是对状态简单的封装。 如果你想要一个可以保存状态的地方(进程),那么Agent就是不二之选。 让我们在工程里启动一个iex对话:

$iex -S mix

然后“玩弄”一下Agent:

iex> {:ok, agent} = Agent.start_link fn -> [] end
{:ok, #PID<0.57.0>}
iex> Agent.update(agent, fn list -> ["eggs"|list] end)
:ok
iex> Agent.get(agent, fn list -> list end)
["eggs"]
iex> Agent.stop(agent)
:ok

这里用某个初始状态(空列表)启动了一个agent,然后执行了一个命令来修改这个状态, 加了一个新的列表项到头部。Agent.update/3的第二个参数是一个匿名函数: 它使用agent当前状态为输入,返回想要的新状态。 最终我们获取整个列表。Agent.get/3函数的第二个参数是个匿名函数: 它使用当前状态为输入,返回的值就是Agent.get/3的返回值。 一旦我们用完agent,我们调用Agent.stop/1来终止agent进程。

现在我们用Agent来实现KV.Bucket。当时在开始之前,我们先写些测试。 新建文件test/kv/bucket_test.exs(回想一下.exs文件),内容是:

defmodule KV.BucketTest do
  use ExUnit.Case, async: true

  test "stores values by key" do
    {:ok, bucket} = KV.Bucket.start_link
    assert KV.Bucket.get(bucket, "milk") == nil

    KV.Bucket.put(bucket, "milk", 3)
    assert KV.Bucket.get(bucket, "milk") == 3
  end
end

我们的第一条测试很直白:启动一个KV.Bucket,然后执行get/2put/2操作。 最后判断结果。我们不需要显式地停止agent进程。 因为该test里面用到的agent进程是链接到测试进程的,测试进程一结束它就会跟着结束。

同时还要注意我们向 ExUnit.Case 传递了一个 async:true 的选项。 这个选项使得该测试用例与其它同样包含 :async 选项的测试用例并行执行。 这种方式能够更好地利用计算机多核的能力。但要注意,这样的话,测试用例不能依赖或改变某些全局的值。 比如测试需要向文件系统里写入文字,或者注册进程,或者访问数据库等。 你在放置:async标记前必须考虑会不会在两个测试之间造成资源竞争。

不管是不是异步执行的,很明显我们的测试会失败,因为该实现的功能一个都没实现。

为了修复失败的用例,我们来创建文件 lib/kv/bucket.ex,输入以下内容。 你可以不看下方的代码,自己随便尝试着创建agent的行为:

defmodule KV.Bucket do
  @doc """
  Starts a new bucket.
  """
  def start_link do
    Agent.start_link(fn -> %{} end)
  end

  @doc """
  Gets a value from the `bucket` by `key`.
  """
  def get(bucket, key) do
    Agent.get(bucket, &Map.get(&1, key))
  end

  @doc """
  Puts the `value` for the given `key` in the `bucket`.
  """
  def put(bucket, key, value) do
    Agent.update(bucket, &Map.put(&1, key, value))
  end
end

我们使用图(Map)来存储我们的键和值。函数捕捉符号 & 在《入门》中介绍过。 现在KV.Bucket模块定义好了,测试都通过了!你可以执行mix test试试。

ExUnit回调函数

在继续为 KV.Bucket 加入更多功能之前,先讲一讲ExUnit的回调函数。 你可能已经想到,每一个 KV.Bucket 的测试用例都需要用到bucket。 它要在该测试用例启动时设置好,还要在该测试用例结束时停止。 幸运的是,ExUnit支持回调函数,使我们跳过这重复机械的任务。

让我们使用回调机制重写刚才的测试:

defmodule KV.BucketTest do
  use ExUnit.Case, async: true

  setup do
    {:ok, bucket} = KV.Bucket.start_link
    {:ok, bucket: bucket}
  end

  test "stores values by key", %{bucket: bucket} do
    assert KV.Bucket.get(bucket, "milk") == nil

    KV.Bucket.put(bucket, "milk", 3)
    assert KV.Bucket.get(bucket, "milk") == 3
  end
end

我们首先利用 setup/1 宏,创建了设置bucket的回调函数。 这个函数会在每条测试用例执行前被执行一次,并且是与测试在同一个进程里。

注意我们需要一个机制来传递创建好的 bucket 的pid给测试用例。 我们使用 测试上下文 来达到这个目的。 当在回调函数里返回 {:ok, bucket: bucket} 的时候, ExUnit会把该返回值元祖(字典)的第二个元素merge进测试上下文中。 测试上下文是一个图,我们可以在测试用例的定义中匹配它,从而获取这个上下文的值给用例中的代码使用:

test "stores values by key", %{bucket: bucket} do
  # `bucket` is now the bucket from the setup block
end

更多信息可以参考ExUnit.Case模块文档, 以及回调函数

其它Agent行为

除了“读取”或者“修改”agent的状态,agent还允许我们使用 函数Agent.get_and_update/2 “读取并修改”它维持的状态。 我们用这个函数来实现删除 KV.Bucket.delete/2 功能---从bucket中删除一个值,并返回该值:

@doc """
Deletes `key` from `bucket`.

Returns the current value of `key`, if `key` exists.
"""
def delete(bucket, key) do
  Agent.get_and_update(bucket, &Map.pop(&1, key))
end

现在轮到你来给上面的代码写个测试啦。 你可以阅读Agent模块的文档 获取更多信息。

Agent中的C/S模式

在进入下一章之前,让我们讨论一下agent中的C/S二元模式。 先来展开刚刚写好的delete/2 函数:

def delete(bucket, key) do
  Agent.get_and_update(bucket, fn dict->
    Map.pop(dict, key)
  end)
end

我们传递给agent的函数中的任何东西,都会出现在agent的进程里。 在这里,因为agent进程负责接收和回复我们的消息,因此可以说agent进程就是个服务器(服务端)。 而那个方法之外的任何东西,都被看成是在客户端的范围内。

这个区别很重要。如果有大量的工作要做,你必须考虑这个工作是放在客户端还是在服务器上执行。比如:

def delete(bucket, key) do
  :timer.sleep(1000) # puts client to sleep
  Agent.get_and_update(bucket, fn dict ->
    :timer.sleep(1000) # puts server to sleep
    Map.pop(dict, key)
  end)
end

当服务器上执行一个很耗时的工作时,所有其它对该服务器的请求都必须等待,直到那个工作完成。 这会造成客户端的超时。

下一章我们会探索通用服务器GenServer,它在概念上对服务器与客户端的隔离更明显。

教程来源于Github,感谢Jancd大佬的无私奉献,致敬!

技术教程推荐

快速上手Kotlin开发 -〔张涛〕

从0开发一款iOS App -〔朱德权〕

Node.js开发实战 -〔杨浩〕

说透中台 -〔王健〕

分布式系统案例课 -〔杨波〕

恋爱必修课 -〔李一帆〕

超级访谈:对话张雪峰 -〔张雪峰〕

Serverless进阶实战课 -〔静远〕

云计算的必修小课 -〔吕蕴偲〕