Objectif
Nous allons créer un système de notification distribué en utilisant la distribution native et le module :pg.
Le module :pg
Ce module permet d’avoir un registre d’acteur distribué. La différence par rapport à :global c’est qu’on peut avoir plusieurs acteurs derrière un même nom. Ce module fonctionne avec la distribution native.
Concrètement, on l’utilise pour faire du pub/sub (ce que l’on utilisera ici), du broadcasting et des pools d’acteurs.
Le système de notifications
On va rester simple et faire une dernière fois tout dans iex. J’ai choisi d’avoir deux modules : Un GenServer pour chaque souscription qui affichera dans la console la notification et un module Aggregate qui permet de l’abstraire.
Bonne pratique : Le module d’aggrégat permet de refactorer par la suite sans avoir d’impact sur les autres modules qui utilisent les notifications. De plus, c’est ce module que l’on va tester, pas les modules sous-jacents qui sont des détails d’implémentation.
defmodule Notification.Subscriber do
use GenServer
defstruct [:subscriber_name, :group_name]
# API
def start_link({subscriber_name, group_name}) do
GenServer.start_link(__MODULE__, {subscriber_name, group_name})
end
def stop(pid) do
GenServer.stop(pid)
end
def publish(pid, message) do
GenServer.cast(pid, {:notification, message})
end
# CALLBACKS
@impl true
def init({subscriber_name, group_name}) do
:pg.join(group_name, self())
{:ok, %__MODULE__{subscriber_name: subscriber_name, group_name: group_name}}
end
@impl true
def handle_cast({:notification, message}, state) do
IO.puts("[#{state.group_name}] #{state.subscriber_name} received: #{message}")
{:noreply, state}
end
@impl true
def terminate(_reason, state) do
:pg.leave(state.group_name, self())
:ok
end
end
Dans la fonction init/1, on voit l’utilisation de :pg.join/2. Grâce à cet appel, on enregistre le registre distribué. Tous les abonnés à un même groupe seront dans le même registre. Le module expose une fonction publish/2 qui est un cast (un message dont on n’attend pas de réponse) vers le serveur. Celui-ci affichera dans la console une ligne de notification.
Voyons maintenant le module d’aggregat qui va abstraire l’autre :
defmodule Notification do
def subscribe(group_name, subscriber_name)
when is_atom(group_name) and
is_binary(subscriber_name) do
Notification.Subscriber.start_link({subscriber_name, group_name})
end
def unsubscribe(pid) when is_pid(pid) do
Notification.Subscriber.stop(pid)
end
def publish(group_name, message) when is_atom(group_name) and is_binary(message) do
members = :pg.get_members(group_name)
Enum.each(members, fn pid ->
Notification.Subscriber.publish(pid, message)
end)
end
def list_subscribers(group_name) when is_atom(group_name) do
:pg.get_members(group_name)
end
end
Bonne pratique : L’utilisation des
guardspartout dans ce module est un garde-fou. Si les checks passent ici, on peut avoir la certitude dans les modules enfants du “type” des variables.
Pour tester, ouvrez plusieurs noeuds nommés :
iex --name node1@127.0.0.1
iex --name node2@127.0.0.1
Le module :pg doit être démarré manuellement sur chaque nœud :
iex(node1@127.0.0.1)1> :pg.start_link
iex(node2@127.0.0.1)1> :pg.start_link
Sur node1, abonnons Alice aux notifications de sports :
iex(node1@127.0.0.1)2> Notification.subscribe(:sports, "Alice")
{:ok, #PID<0.129.0>}
Sur node2, abonnons Bob aux notifications de sports et listons les abonnés :
iex(node2@127.0.0.1)2> Notification.subscribe(:sports, "Bob")
{:ok, #PID<0.123.0>}
iex(node2@127.0.0.1)3> Notification.list_subscribers(:sports)
[#PID<0.123.0>]
Ah ! Un seul abonné… ce n’est pas ce que l’on attendait. En fait, c’est parce que les nœuds ne sont pas encore connectés ! Pour ce faire, il suffit d’utiliser Node.connect/1 sur un des nœuds :
iex(node1@127.0.0.1)3> Node.connect :"node2@127.0.0.1"
true
iex(node1@127.0.0.1)4> Notification.list_subscribers(:sports)
[#PID<0.129.0>, #PID<20340.126.0>]
Voilà, le module :pg a fait son travail : à peine les nœuds se connectent qu’ils partagent déjà leurs registres. Enfin, il reste à tester l’envoi de notifications :
iex(node1@127.0.0.1)5> Notification.publish(:sports, "Match X vs Y starting in 5min")
:ok
Alice reçoit la notification sur node1 :
[sports] Alice received: "Match X vs Y starting in 5min"
Bob reçoit la notification sur node2 :
[sports] Bob received: "Match X vs Y starting in 5min"
Conclusion
Comme on a pu le voir, le module :pg s’occupe vraiment de tout pour nous. On n’a vraiment pas l’impression de travailler avec plusieurs nœuds, le langage s’occupe de tout pour nous. Le modèle d’acteur est véritablement un modèle de programmation concurrent. D’ailleurs, ici on utilise deux nœuds, mais on pourrait en avoir des centaines que ce serait pareil. Le code est quasiment le même que si j’avais utilisé Registry, sans l’option {:via, _, _}.
Dans le prochain article, on parlera de supervision.