December 11, 2020

Using Event Sourcing and CQRS with Incident - Part 2

Event Sourcing and CQRS are design patterns that are great for some domains. The Incident library will help implement them without compromising other parts of your application.

Using Event Sourcing and CQRS with Incident - Part 2

This is the second part of a series of posts that I will present on how your application can use Event Sourcing and CQRS for specific domains with an open-source library that I am developing called Incident. If you haven't read the first part I highly recommend it as it will describe the library goals, some important Event Sourcing and CQRS concepts, library configuration, and basic usage. Check it out in Using Event Sourcing and CQRS with Incident - Part 1.

To recap, we are using Incident for the Account domain in a Bank system. In the first part we:

  • set up the library in the application;
  • defined the projection schema and migration;
  • defined a command for opening an account;
  • defined an event when the account is opened;
  • defined the aggregate state data structure;
  • defined the aggregate logic to execute the open account command and to apply the account opened event;
  • defined the logic in the event handler for the account opened event;
  • defined the command handler;

In this second part, we will be adding one more scenario, deposit money into the bank account.

Deposit Money Command

Let's define a Deposit Money command that will contain all the needed data. You can use Elixir structs or Ecto Schemas with Changesets, as long as it implements valid?/1:

defmodule Bank.Commands.DepositMoney do
  @behaviour Incident.Command

  use Ecto.Schema
  import Ecto.Changeset

  @primary_key false
  embedded_schema do
    field(:aggregate_id, :string)
    field(:amount, :integer)
  end

  @required_fields ~w(aggregate_id amount)a

  @impl true
  def valid?(command) do
    data = Map.from_struct(command)

    %__MODULE__{}
    |> cast(data, @required_fields)
    |> validate_required(@required_fields)
    |> validate_number(:amount, greater_than: 0)
    |> Map.get(:valid?)
  end
end

Money Deposited Event

We also need a data structure that will define the data for our Money Deposited event. Similar to commands, event data can be implemented using Ecto Schema or Elixir structs.

defmodule Bank.Events.MoneyDeposited do
  use Ecto.Schema

  @primary_key false
  embedded_schema do
    field(:aggregate_id, :string)
    field(:amount, :integer)
    field(:version, :integer)
  end
end

Aggregate Logic

An Incident aggregate will implement two functions for each scenario:

  • execute/1 will receive a command and based on any logic return an event or an error tuple, and;
  • apply/2 that will receive an event and a state, and return a new state;

For our depositing money logic, the only condition we check is the existence of the account when executing the command otherwise we return an error tuple. When applying the event, we update the aggregate state with the new balance.

defmodule Bank.BankAccount do
  @behaviour Incident.Aggregate

  alias Bank.BankAccountState
  alias Bank.Commands.{DepositMoney, OpenAccount}
  alias Bank.Events.{AccountOpened, MoneyDeposited}
  
  # Hidden code for execute the Open Account command
  
  @impl true
  def execute(%DepositMoney{aggregate_id: aggregate_id, amount: amount}) do
    case BankAccountState.get(aggregate_id) do
      %{aggregate_id: aggregate_id} = state when not is_nil(aggregate_id) ->
        new_event = %MoneyDeposited{
          aggregate_id: aggregate_id,
          amount: amount,
          version: state.version + 1
        }

        {:ok, new_event, state}

      %{aggregate_id: nil} ->
        {:error, :account_not_found}
    end
  end

  # Hidden code for apply the Account Opened event
  
  @impl true
  def apply(%{event_type: "MoneyDeposited"} = event, state) do
    %{
      state
      | balance: state.balance + event.event_data["amount"],
        version: event.version,
        updated_at: event.event_date
    }
  end
end

Event Handler Logic

As the Event Handler is the connection between the command side and the query side in the domain, we need to specify what we want to happen when the Money Deposit event is successful.

An Incident event handler implements listen/2 that pattern matches the event type. In the MoneyDeposited example, we ask the aggregate to return the new state based on its logic, and we build the data to be projected in our bank account projection, including the required fields version, event_id, and event_date.

defmodule Bank.BankAccountEventHandler do
  @behaviour Incident.EventHandler

  alias Bank.Projections.BankAccount
  alias Bank.BankAccount, as: Aggregate
  alias Incident.ProjectionStore

  # Hidden code for handle the Account Opened event
  
  @impl true
  def listen(%{event_type: "MoneyDeposited"} = event, state) do
    new_state = Aggregate.apply(event, state)

    data = %{
      aggregate_id: new_state.aggregate_id,
      balance: new_state.balance,
      version: event.version,
      event_id: event.event_id,
      event_date: event.event_date
    }

    ProjectionStore.project(BankAccount, data)
  end
end

Play Time

As the Command Handler was already defined in the first part of this series and does not require any change as all logic is handled by Incident, we can go straight to the terminal.

# Let's generate an account number for the new bank account
iex 1 > account_number = Ecto.UUID.generate()
"f004d517-8b86-45b4-bdfa-29ac41dd3f51"
iex 2 > command_open = %Bank.Commands.OpenAccount{aggregate_id: account_number}
%Bank.Commands.OpenAccount{aggregate_id: "f004d517-8b86-45b4-bdfa-29ac41dd3f51"}

# Opening a new bank account
iex 3 > Bank.BankAccountCommandHandler.receive(command_open)
{:ok,
 %Incident.EventStore.Postgres.Event{
   __meta__: #Ecto.Schema.Metadata<:loaded, "events">,
   aggregate_id: "f004d517-8b86-45b4-bdfa-29ac41dd3f51",
   event_data: %{
     "account_number" => "f004d517-8b86-45b4-bdfa-29ac41dd3f51",
     "aggregate_id" => "f004d517-8b86-45b4-bdfa-29ac41dd3f51",
     "version" => 1
   },
   event_date: #DateTime<2020-12-11 16:23:08.162372Z>,
   event_id: "369cc1d0-e973-42b8-99bf-a15025936fb2",
   event_type: "AccountOpened",
   id: 90,
   inserted_at: #DateTime<2020-12-11 16:23:08.162457Z>,
   version: 1
 }}
 
 # If we read from the projection we get the current state of the account
 iex 4 > Incident.ProjectionStore.get(Bank.Projections.BankAccount, account_number)
 %Bank.Projections.BankAccount{
  __meta__: #Ecto.Schema.Metadata<:loaded, "bank_accounts">,
  account_number: "f004d517-8b86-45b4-bdfa-29ac41dd3f51",
  aggregate_id: "f004d517-8b86-45b4-bdfa-29ac41dd3f51",
  balance: 0,
  event_date: #DateTime<2020-12-11 16:23:08.162372Z>,
  event_id: "369cc1d0-e973-42b8-99bf-a15025936fb2",
  id: 2,
  inserted_at: #DateTime<2020-12-11 16:23:08.189004Z>,
  updated_at: #DateTime<2020-12-11 16:23:08.189004Z>,
  version: 1
}

# Let's create a command for depositing money
iex 5 > command_deposit = %Bank.Commands.DepositMoney{aggregate_id: account_number, amount: 100}
%Bank.Commands.DepositMoney{
  aggregate_id: "f004d517-8b86-45b4-bdfa-29ac41dd3f51",
  amount: 100
}

# Depositing money
iex 6 > Bank.BankAccountCommandHandler.receive(command_deposit)
{:ok,
 %Incident.EventStore.Postgres.Event{
   __meta__: #Ecto.Schema.Metadata<:loaded, "events">,
   aggregate_id: "f004d517-8b86-45b4-bdfa-29ac41dd3f51",
   event_data: %{
     "aggregate_id" => "f004d517-8b86-45b4-bdfa-29ac41dd3f51",
     "amount" => 100,
     "version" => 2
   },
   event_date: #DateTime<2020-12-11 16:31:36.460303Z>,
   event_id: "ce4f72b9-d3ec-47c9-8533-9216c59803e6",
   event_type: "MoneyDeposited",
   id: 91,
   inserted_at: #DateTime<2020-12-11 16:31:36.460401Z>,
   version: 2
 }}

# Let's make another deposit (same command for brevity)
iex 7 > Bank.BankAccountCommandHandler.receive(command_deposit)
{:ok,
 %Incident.EventStore.Postgres.Event{
   __meta__: #Ecto.Schema.Metadata<:loaded, "events">,
   aggregate_id: "f004d517-8b86-45b4-bdfa-29ac41dd3f51",
   event_data: %{
     "aggregate_id" => "f004d517-8b86-45b4-bdfa-29ac41dd3f51",
     "amount" => 100,
     "version" => 3
   },
   event_date: #DateTime<2020-12-11 16:33:29.432375Z>,
   event_id: "618903b5-8304-4151-a94d-43ed0b763c39",
   event_type: "MoneyDeposited",
   id: 92,
   inserted_at: #DateTime<2020-12-11 16:33:29.432486Z>,
   version: 3
 }}

# We can list all events for the aggregate id
iex 8> Incident.EventStore.get(account_number)
[
  %Incident.EventStore.Postgres.Event{
    __meta__: #Ecto.Schema.Metadata<:loaded, "events">,
    aggregate_id: "f004d517-8b86-45b4-bdfa-29ac41dd3f51",
    event_data: %{
      "account_number" => "f004d517-8b86-45b4-bdfa-29ac41dd3f51",
      "aggregate_id" => "f004d517-8b86-45b4-bdfa-29ac41dd3f51",
      "version" => 1
    },
    event_date: #DateTime<2020-12-11 16:23:08.162372Z>,
    event_id: "369cc1d0-e973-42b8-99bf-a15025936fb2",
    event_type: "AccountOpened",
    id: 90,
    inserted_at: #DateTime<2020-12-11 16:23:08.162457Z>,
    version: 1
  },
  %Incident.EventStore.Postgres.Event{
    __meta__: #Ecto.Schema.Metadata<:loaded, "events">,
    aggregate_id: "f004d517-8b86-45b4-bdfa-29ac41dd3f51",
    event_data: %{
      "aggregate_id" => "f004d517-8b86-45b4-bdfa-29ac41dd3f51",
      "amount" => 100,
      "version" => 2
    },
    event_date: #DateTime<2020-12-11 16:31:36.460303Z>,
    event_id: "ce4f72b9-d3ec-47c9-8533-9216c59803e6",
    event_type: "MoneyDeposited",
    id: 91,
    inserted_at: #DateTime<2020-12-11 16:31:36.460401Z>,
    version: 2
  },
  %Incident.EventStore.Postgres.Event{
    __meta__: #Ecto.Schema.Metadata<:loaded, "events">,
    aggregate_id: "f004d517-8b86-45b4-bdfa-29ac41dd3f51",
    event_data: %{
      "aggregate_id" => "f004d517-8b86-45b4-bdfa-29ac41dd3f51",
      "amount" => 100,
      "version" => 3
    },
    event_date: #DateTime<2020-12-11 16:33:29.432375Z>,
    event_id: "618903b5-8304-4151-a94d-43ed0b763c39",
    event_type: "MoneyDeposited",
    id: 92,
    inserted_at: #DateTime<2020-12-11 16:33:29.432486Z>,
    version: 3
  }
]

# Now, after the 2 deposits we made, the projection is properly updated.
# Note the version, event_date and event_id fields that are related to the
# last event that updated the projection.
iex 9> Incident.ProjectionStore.get(Bank.Projections.BankAccount, account_number)
%Bank.Projections.BankAccount{
  __meta__: #Ecto.Schema.Metadata<:loaded, "bank_accounts">,
  account_number: "f004d517-8b86-45b4-bdfa-29ac41dd3f51",
  aggregate_id: "f004d517-8b86-45b4-bdfa-29ac41dd3f51",
  balance: 200,
  event_date: #DateTime<2020-12-11 16:33:29.432375Z>,
  event_id: "618903b5-8304-4151-a94d-43ed0b763c39",
  id: 2,
  inserted_at: #DateTime<2020-12-11 16:23:08.189004Z>,
  updated_at: #DateTime<2020-12-11 16:33:29.438631Z>,
  version: 3
}

Recap

In this second part, we added a brand-new but simple scenario in our application. In situations like that, we basically need a command, an event, some logic in the aggregate, and some logic in the event handler.

What Comes Next?

The next posts will cover another simple, such as withdraw money from the account, and complex scenarios, where one event would trigger new commands in sequence.

Check the Incident Github repo for the complete bank application example, including lots of integration tests. Feel free to share it and contribute, the library is somehow new, and any feedback is welcomed. See you soon!