Chamagne Bastien
Développeur indépendant à Pau

Un système de notifications distribué

Modèle Acteur Système Distribué Elixir Erlang

Objectif

Nous allons créer un système de notification distribué en utilisant la distribution native et le module :pg.

Le module :pg

Ce module permet d’avoir un registre d’acteur distribué. La différence par rapport à :global c’est qu’on peut avoir plusieurs acteurs derrière un même nom. Ce module fonctionne avec la distribution native.

Concrètement, on l’utilise pour faire du pub/sub (ce que l’on utilisera ici), du broadcasting et des pools d’acteurs.

Le système de notifications

On va rester simple et faire une dernière fois tout dans iex. J’ai choisi d’avoir deux modules : Un GenServer pour chaque souscription qui affichera dans la console la notification et un module Aggregate qui permet de l’abstraire.

Bonne pratique : Le module d’aggrégat permet de refactorer par la suite sans avoir d’impact sur les autres modules qui utilisent les notifications. De plus, c’est ce module que l’on va tester, pas les modules sous-jacents qui sont des détails d’implémentation.

defmodule Notification.Subscriber do
  use GenServer

  defstruct [:subscriber_name, :group_name]

  # API
  def start_link({subscriber_name, group_name}) do
    GenServer.start_link(__MODULE__, {subscriber_name, group_name})
  end

  def stop(pid) do
    GenServer.stop(pid)
  end

  def publish(pid, message) do
    GenServer.cast(pid, {:notification, message})
  end

  # CALLBACKS
  @impl true
  def init({subscriber_name, group_name}) do
    :pg.join(group_name, self())
    {:ok, %__MODULE__{subscriber_name: subscriber_name, group_name: group_name}}
  end

  @impl true
  def handle_cast({:notification, message}, state) do
    IO.puts("[#{state.group_name}] #{state.subscriber_name} received: #{message}")
    {:noreply, state}
  end

  @impl true
  def terminate(_reason, state) do
    :pg.leave(state.group_name, self())
    :ok
  end
end

Dans la fonction init/1, on voit l’utilisation de :pg.join/2. Grâce à cet appel, on enregistre le registre distribué. Tous les abonnés à un même groupe seront dans le même registre. Le module expose une fonction publish/2 qui est un cast (un message dont on n’attend pas de réponse) vers le serveur. Celui-ci affichera dans la console une ligne de notification.

Voyons maintenant le module d’aggregat qui va abstraire l’autre :

defmodule Notification do
  def subscribe(group_name, subscriber_name)
      when is_atom(group_name) and
             is_binary(subscriber_name) do
    Notification.Subscriber.start_link({subscriber_name, group_name})
  end

  def unsubscribe(pid) when is_pid(pid) do
    Notification.Subscriber.stop(pid)
  end

  def publish(group_name, message) when is_atom(group_name) and is_binary(message) do
    members = :pg.get_members(group_name)

    Enum.each(members, fn pid ->
      Notification.Subscriber.publish(pid, message)
    end)
  end

  def list_subscribers(group_name) when is_atom(group_name) do
    :pg.get_members(group_name)
  end
end

Bonne pratique : L’utilisation des guards partout dans ce module est un garde-fou. Si les checks passent ici, on peut avoir la certitude dans les modules enfants du “type” des variables.

Pour tester, ouvrez plusieurs noeuds nommés :

iex --name node1@127.0.0.1
iex --name node2@127.0.0.1

Le module :pg doit être démarré manuellement sur chaque nœud :

iex(node1@127.0.0.1)1> :pg.start_link
iex(node2@127.0.0.1)1> :pg.start_link

Sur node1, abonnons Alice aux notifications de sports :

iex(node1@127.0.0.1)2> Notification.subscribe(:sports, "Alice")
{:ok, #PID<0.129.0>}

Sur node2, abonnons Bob aux notifications de sports et listons les abonnés :

iex(node2@127.0.0.1)2> Notification.subscribe(:sports, "Bob")
{:ok, #PID<0.123.0>}
iex(node2@127.0.0.1)3> Notification.list_subscribers(:sports)
[#PID<0.123.0>]

Ah ! Un seul abonné… ce n’est pas ce que l’on attendait. En fait, c’est parce que les nœuds ne sont pas encore connectés ! Pour ce faire, il suffit d’utiliser Node.connect/1 sur un des nœuds :

iex(node1@127.0.0.1)3> Node.connect :"node2@127.0.0.1"
true
iex(node1@127.0.0.1)4> Notification.list_subscribers(:sports)
[#PID<0.129.0>, #PID<20340.126.0>]

Voilà, le module :pg a fait son travail : à peine les nœuds se connectent qu’ils partagent déjà leurs registres. Enfin, il reste à tester l’envoi de notifications :

iex(node1@127.0.0.1)5> Notification.publish(:sports, "Match X vs Y starting in 5min")
:ok

Alice reçoit la notification sur node1 :

[sports] Alice received: "Match X vs Y starting in 5min"

Bob reçoit la notification sur node2 :

[sports] Bob received: "Match X vs Y starting in 5min"

Conclusion

Comme on a pu le voir, le module :pg s’occupe vraiment de tout pour nous. On n’a vraiment pas l’impression de travailler avec plusieurs nœuds, le langage s’occupe de tout pour nous. Le modèle d’acteur est véritablement un modèle de programmation concurrent. D’ailleurs, ici on utilise deux nœuds, mais on pourrait en avoir des centaines que ce serait pareil. Le code est quasiment le même que si j’avais utilisé Registry, sans l’option {:via, _, _}.

Dans le prochain article, on parlera de supervision.