profile
Published on

Architecture with Nest: Event Sourcing - Parte II

Authors
  • avatar
    Name
    Leandro Simões
    Twitter

Event Sourcing é um padrão arquitetural onde o estado da aplicação é determinado pela sequência de eventos que ocorreram, em vez de armazenar apenas o estado atual. Cada mudança de estado é registrada como um evento imutável no Event Store.

Este artigo faz parte da Parte II de uma série onde estou implementando conceitos de arquitetura e padrões arquiteturais usando Nest. A implementação completa pode ser acessada em: https://github.com/lesimoes/architecture-with-nest/tree/part2

Benefícios

Como destacado por Greg Young em seu trabalho sobre CQRS e Event Sourcing, os principais benefícios incluem:

  1. Rastreamento Completo: Histórico completo de todas as mudanças de estado — o fluxo de eventos pode ser reexecutado para reconstruir estado ou para auditoria
  2. Replay de Eventos: Capacidade de reconstruir o estado em qualquer ponto no tempo
  3. Desacoplamento: Eventos podem ser consumidos por múltiplos handlers sem acoplamento
  4. Time Travel: Capacidade de visualizar o estado do sistema em qualquer momento histórico
  5. Consistência Otimista: Controle de versão previne conflitos de concorrência
  6. Versionamento de Modelo: Pode ajudar no versionamento e alterações de modelo, já que o estado atual não está "preso" a um esquema rígido
  7. Performance e Escalabilidade: Permite tratar o armazenamento como "append-only" (somente acréscimo), o que pode melhorar desempenho e escalabilidade em certos casos
  8. Particionamento: A abordagem de eventos facilita sharding e particionamento horizontal, pois o modelo de leitura pode ser distribuído e replicado

Trade-offs e Considerações

É importante considerar que Event Sourcing não é adequado para todos os domínios. Como discutido por Greg Young, existem alguns trade-offs:

Desafios

  1. Reconstrução de Estado: A reconstrução de estado pode implicar em "snapshots" para performance — gravar periodicamente o estado atual para não ter que aplicar todos os eventos desde o início. Neste projeto, contornamos esse problema fazendo um update com o saldo atual da conta sempre que um evento é persistido. No próximo artigo, mostraremos como implementar snapshots de forma mais eficiente.
  2. Complexidade: Lidar com consistência, versões de eventos, evolução de esquema de evento etc, exige disciplina e conhecimento especializado
  3. ROI: Nem todo domínio precisa de Event Sourcing — o retorno sobre investimento pode não valer o custo em domínios simples

Outras Considerações

  • Migração Incremental: Não é necessário adotar CQRS + Event Sourcing de uma só vez em todo o sistema. Pode-se começar a partir de pontos onde há necessidade clara (alto volume de leitura ou escrita, domínio crítico) e evoluir gradualmente
  • Custo vs Benefício: A adoção desses padrões implica mais complexidade — mais modelos, mais infraestrutura, mais especialização — então é importante avaliar se o ganho (em modelagem, desempenho, escalabilidade, manutenção) compensa o custo
  • Consistência Eventual: Em muitos casos, separar leitura e escrita e/ou usar replicação/eventos implica que a consistência imediata entre os modelos (comando vs consulta) pode não existir — ou seja, pode haver uma "consistência eventual"

Conceitos Fundamentais

Event Store

O Event Store é o repositório persistente onde todos os eventos são armazenados sequencialmente e de forma imutável. Cada evento contém:

  • streamId: Identificador do agregado que gerou o evento
  • type: Tipo do evento (ex: DepositMadeEvent, WithdrawMadeEvent)
  • position: Posição sequencial do evento no stream
  • data: Dados serializados do evento

Neste projeto, o Event Store é implementado usando MongoDB:

@Schema()
export class Event {
  @Prop()
  streamId: string

  @Prop()
  type: string

  @Prop()
  position: number

  @Prop({ type: SchemaTypes.Mixed })
  data: Record<string, any>
}

Aggregate Root Versionado

O aggregate root estende VersionedAggregateRoot, que adiciona controle de versão para garantir consistência otimista:

export class VersionedAggregateRoot extends AggregateRoot {
  public versionedId: string
  private [VERSION] = new Version(0)

  get version(): Version {
    return this[VERSION]
  }

  setVersion(version: Version): void {
    this[VERSION] = version
  }
}

A versão é usada para detectar conflitos de concorrência ao persistir eventos, garantindo que o agregado não esteja desatualizado.

Eventos de Domínio

Eventos de domínio representam algo que aconteceu no sistema e são imutáveis. Neste projeto, os eventos são definidos na camada de domínio:

export class DepositMadeEvent {
  constructor(
    public readonly accountId: string,
    public readonly amount: Money,
    public readonly balance: number
  ) {}
}

export class WithdrawMadeEvent {
  constructor(
    public readonly accountId: string,
    public readonly amount: Money,
    public readonly balance: number
  ) {}
}

Aplicação de Eventos

Os eventos são aplicados ao agregado através do método apply() do NestJS CQRS, mas com a flag skipHandler: true para evitar processamento imediato:

deposit(money: Money): void {
  this.validateAmount(money.amount);
  this.balance = this.balance.add(money);
  this.versionedId = this.id.id;
  this.apply(
    new DepositMadeEvent(this.id.id, money, this.balance.money.amount),
    { skipHandler: true },
  );
}

Persistência de Eventos

Event Store Publisher

O EventStorePublisher implementa IEventPublisher do NestJS CQRS e intercepta todos os eventos publicados, persistindo-os no Event Store:

@Injectable()
export class EventStorePublisher implements IEventPublisher {
  publish<T extends IEvent = IEvent>(event: T, dispatcher: VersionedAggregateRoot) {
    const serializableEvent = this.eventSerializer.serialize(event, dispatcher)
    return this.eventStore.persist(serializableEvent)
  }

  publishAll<T extends IEvent = IEvent>(events: T[], dispatcher: VersionedAggregateRoot) {
    const serializableEvents = events
      .map((event) => this.eventSerializer.serialize(event, dispatcher))
      .map((serializableEvent, index) => ({
        ...serializableEvent,
        position: dispatcher.version.value + index + 1,
      }))

    return this.eventStore.persist(serializableEvents)
  }
}

Serialização de Eventos

O EventSerializer converte eventos de domínio em um formato serializável para persistência:

serialize<T>(
  event: T,
  dispatcher: VersionedAggregateRoot,
): SerializableEvent<T> {
  const eventType = event?.constructor?.name as string;
  const aggregateId = dispatcher.versionedId;
  return {
    streamId: aggregateId,
    position: dispatcher.version.value + 1,
    type: eventType,
    data: this.toJSON(event),
  };
}

Mongo Event Store

O MongoEventStore é responsável pela persistência física dos eventos no MongoDB, usando transações para garantir atomicidade:

async persist(
  eventOrEvents: SerializableEvent | SerializableEvent[],
): Promise<void> {
  const events = Array.isArray(eventOrEvents)
    ? eventOrEvents
    : [eventOrEvents];

  const session = await this.eventStore.startSession();
  try {
    session.startTransaction();
    await this.eventStore.insertMany(events, { session, ordered: true });
    await session.commitTransaction();
  } catch (error: any) {
    await session.abortTransaction();
    const UNIQUE_CONSTRAINT_ERROR_CODE = 11000;
    if (error?.code === UNIQUE_CONSTRAINT_ERROR_CODE) {
      throw new Error('Events could not be persisted. Aggregate is stale.');
    }
    throw error;
  } finally {
    await session.endSession();
  }
}

O índice único em { streamId: 1, position: 1 } garante que não haja duplicação de eventos e detecta conflitos de versão.

Recuperação de Estado

Para recuperar o estado atual de um agregado, o repositório consulta a última versão no Event Store:

async findByNumber(accountNumber: AccountNumber): Promise<BankAccount | null> {
  const entity = await this.bankAccountRepository.findOne({
    where: { number: accountNumber.number },
  });

  if (!entity) {
    return null;
  }

  const bankAccount = BankAccountMapper.toDomain(entity);
  const lastVersion = await this.eventStore.getLastVersion(
    bankAccount.versionedId,
  );
  bankAccount.setVersion(new Version(lastVersion));

  return bankAccount;
}

Fluxo Completo

  1. Command Handler recebe um comando (ex: MakeDepositCommand)
  2. Repository recupera o agregado do banco de dados e consulta a versão no Event Store
  3. Event Publisher é mesclado ao contexto do agregado usando mergeObjectContext()
  4. Método de Domínio (ex: deposit()) aplica o evento com skipHandler: true
  5. Commit do agregado dispara a publicação do evento
  6. Event Store Publisher intercepta e serializa os eventos
  7. Mongo Event Store persiste os eventos em uma transação
  8. Repository atualiza o estado atual do agregado no banco de dados

Conclusão

Ainda vamos evoluir esse projeto com conceitos como snapshot, saga pattern e implementar bancos diferentes de leitura e escrita, o que irá nos obrigar a considerar a consistência eventual.

Artigo do Greg Young: CQRS Documents