
Olá, Habr! Decidi me afastar de Scala, Idris e outros FP por um tempo e falar um pouco sobre o Event Store - um banco de dados no qual os eventos podem ser salvos em fluxos de eventos. Como no bom e velho livro, também temos 4 mosqueteiros, e o quarto é DDD. Primeiro, eu uso o Event Storming para selecionar comandos, eventos e entidades associados a eles. Então, com base neles, salvarei o estado do objeto e o restaurarei. Eu estarei fazendo um TodoList regular neste artigo. Para obter detalhes, bem-vindo em cat.
Conteúdo
- Os Três Mosqueteiros - Fonte de Eventos, Tempestade de Eventos e Armazenamento de Eventos - Entre na Batalha: Parte 1 - Experimentando o Armazenamento de Eventos DB
Links
Imagens de fontes docker image
Armazenamento de
eventos Soucing de
eventos Tempestade de eventos
Na verdade, o Armazenamento de Eventos é um banco de dados projetado para armazenar eventos. Ela também sabe como criar assinaturas de eventos para que possam ser processados de alguma forma. Existem também projeções que também reagem a eventos e, a partir delas, acumulam alguns dados. Por exemplo, durante o evento TodoCreated, você pode aumentar algum tipo de contador Contagem na projeção. Por enquanto, nesta parte, estarei usando o Armazenamento de eventos como Read e Write Db. Mais adiante, nos artigos a seguir, criarei um banco de dados separado para leitura, no qual os dados serão gravados com base nos eventos armazenados no banco de dados para gravação no Armazenamento de Eventos. Haverá também um exemplo de como fazer "Viagem no tempo", revertendo o sistema ao estado em que se encontrava no passado.
E então vamos começar o Event Stroming. Normalmente, para a sua implementação, reúnem-se todos os interessados e especialistas que contam quais os eventos da área temática que o software irá simular. Por exemplo, para software de fábrica - ProductManufactured. Para o jogo - Danos sofridos. Para software financeiro - dinheiro creditado na conta e assim por diante. Como nossa área temática é tão simples quanto TodoList, teremos poucos eventos. E assim, vamos escrever os eventos da nossa área temática (domínio) no quadro.

Agora vamos adicionar os comandos que acionam esses eventos.

A seguir, vamos agrupar esses eventos e comandos em torno da entidade com uma mudança no estado ao qual estão associados.

Meus comandos simplesmente se transformarão em nomes de métodos de serviço. Vamos começar a implementação.
Primeiro, vamos descrever os eventos no código.
public interface IDomainEvent
{
// . id Event Strore
Guid EventId { get; }
// . Event Store
long EventNumber { get; set; }
}
public sealed class TodoCreated : IDomainEvent
{
//Id Todo
public Guid Id { get; set; }
// Todo
public string Name { get; set; }
public Guid EventId => Id;
public long EventNumber { get; set; }
}
public sealed class TodoRemoved : IDomainEvent
{
public Guid EventId { get; set; }
public long EventNumber { get; set; }
}
public sealed class TodoCompleted: IDomainEvent
{
public Guid EventId { get; set; }
public long EventNumber { get; set; }
}
Agora, nosso núcleo é uma entidade:
public sealed class Todo : IEntity<TodoId>
{
private readonly List<IDomainEvent> _events;
public static Todo CreateFrom(string name)
{
var id = Guid.NewGuid();
var e = new List<IDomainEvent>(){new TodoCreated()
{
Id = id,
Name = name
}};
return new Todo(new TodoId(id), e, name, false);
}
public static Todo CreateFrom(IEnumerable<IDomainEvent> events)
{
var id = Guid.Empty;
var name = String.Empty;
var completed = false;
var ordered = events.OrderBy(e => e.EventNumber).ToList();
if (ordered.Count == 0)
return null;
foreach (var @event in ordered)
{
switch (@event)
{
case TodoRemoved _:
return null;
case TodoCreated created:
name = created.Name;
id = created.Id;
break;
case TodoCompleted _:
completed = true;
break;
default: break;
}
}
if (id == default)
return null;
return new Todo(new TodoId(id), new List<IDomainEvent>(), name, completed);
}
private Todo(TodoId id, List<IDomainEvent> events, string name, bool isCompleted)
{
Id = id;
_events = events;
Name = name;
IsCompleted = isCompleted;
Validate();
}
public TodoId Id { get; }
public IReadOnlyList<IDomainEvent> Events => _events;
public string Name { get; }
public bool IsCompleted { get; private set; }
public void Complete()
{
if (!IsCompleted)
{
IsCompleted = true;
_events.Add(new TodoCompleted()
{
EventId = Guid.NewGuid()
});
}
}
public void Delete()
{
_events.Add(new TodoRemoved()
{
EventId = Guid.NewGuid()
});
}
private void Validate()
{
if (Events == null)
throw new ApplicationException(" ");
if (string.IsNullOrWhiteSpace(Name))
throw new ApplicationException(" ");
if (Id == default)
throw new ApplicationException(" ");
}
}
Conectamos à Loja de Eventos:
services.AddSingleton(sp =>
{
// TCP .
// . .
var con = EventStoreConnection.Create(new Uri("tcp://admin:changeit@127.0.0.1:1113"), "TodosConnection");
con.ConnectAsync().Wait();
return con;
});
E assim, a parte principal. Armazenamento e leitura de eventos da própria Loja de Eventos:
public sealed class EventsRepository : IEventsRepository
{
private readonly IEventStoreConnection _connection;
public EventsRepository(IEventStoreConnection connection)
{
_connection = connection;
}
public async Task<long> Add(Guid collectionId, IEnumerable<IDomainEvent> events)
{
var eventPayload = events.Select(e => new EventData(
//Id
e.EventId,
//
e.GetType().Name,
// Json (True|False)
true,
//
Encoding.UTF8.GetBytes(JsonSerializer.Serialize((object)e)),
//
Encoding.UTF8.GetBytes((string)e.GetType().FullName)
));
//
var res = await _connection.AppendToStreamAsync(collectionId.ToString(), ExpectedVersion.Any, eventPayload);
return res.NextExpectedVersion;
}
public async Task<List<IDomainEvent>> Get(Guid collectionId)
{
var results = new List<IDomainEvent>();
long start = 0L;
while (true)
{
var events = await _connection.ReadStreamEventsForwardAsync(collectionId.ToString(), start, 4096, false);
if (events.Status != SliceReadStatus.Success)
return results;
results.AddRange(Deserialize(events.Events));
if (events.IsEndOfStream)
return results;
start = events.NextEventNumber;
}
}
public async Task<List<T>> GetAll<T>() where T : IDomainEvent
{
var results = new List<IDomainEvent>();
Position start = Position.Start;
while (true)
{
var events = await _connection.ReadAllEventsForwardAsync(start, 4096, false);
results.AddRange(Deserialize(events.Events.Where(e => e.Event.EventType == typeof(T).Name)));
if (events.IsEndOfStream)
return results.OfType<T>().ToList();
start = events.NextPosition;
}
}
private List<IDomainEvent> Deserialize(IEnumerable<ResolvedEvent> events) =>
events
.Where(e => IsEvent(e.Event.EventType))
.Select(e =>
{
var result = (IDomainEvent)JsonSerializer.Deserialize(e.Event.Data, ToType(e.Event.EventType));
result.EventNumber = e.Event.EventNumber;
return result;
})
.ToList();
private static bool IsEvent(string eventName)
{
return eventName switch
{
nameof(TodoCreated) => true,
nameof(TodoCompleted) => true,
nameof(TodoRemoved) => true,
_ => false
};
}
private static Type ToType(string eventName)
{
return eventName switch
{
nameof(TodoCreated) => typeof(TodoCreated),
nameof(TodoCompleted) => typeof(TodoCompleted),
nameof(TodoRemoved) => typeof(TodoRemoved),
_ => throw new NotImplementedException(eventName)
};
}
}
O armazenamento de entidades parece muito simples. Obtemos os eventos de entidade do EventStore e os restauramos a partir deles, ou simplesmente salvamos os eventos de entidade.
public sealed class TodoRepository : ITodoRepository
{
private readonly IEventsRepository _eventsRepository;
public TodoRepository(IEventsRepository eventsRepository)
{
_eventsRepository = eventsRepository;
}
public Task SaveAsync(Todo entity) => _eventsRepository.Add(entity.Id.Value, entity.Events);
public async Task<Todo> GetAsync(TodoId id)
{
var events = await _eventsRepository.Get(id.Value);
return Todo.CreateFrom(events);
}
public async Task<List<Todo>> GetAllAsync()
{
var events = await _eventsRepository.GetAll<TodoCreated>();
var res = await Task.WhenAll(events.Where(t => t != null).Where(e => e.Id != default).Select(e => GetAsync(new TodoId(e.Id))));
return res.Where(t => t != null).ToList();
}
}
O serviço em que ocorre o trabalho com o repositório e a entidade:
public sealed class TodoService : ITodoService
{
private readonly ITodoRepository _repository;
public TodoService(ITodoRepository repository)
{
_repository = repository;
}
public async Task<TodoId> Create(TodoCreateDto dto)
{
var todo = Todo.CreateFrom(dto.Name);
await _repository.SaveAsync(todo);
return todo.Id;
}
public async Task Complete(TodoId id)
{
var todo = await _repository.GetAsync(id);
todo.Complete();
await _repository.SaveAsync(todo);
}
public async Task Remove(TodoId id)
{
var todo = await _repository.GetAsync(id);
todo.Delete();
await _repository.SaveAsync(todo);
}
public async Task<List<TodoReadDto>> GetAll()
{
var todos = await _repository.GetAllAsync();
return todos.Select(t => new TodoReadDto()
{
Id = t.Id.Value,
Name = t.Name,
IsComplete = t.IsCompleted
}).ToList();
}
public async Task<List<TodoReadDto>> Get(IEnumerable<TodoId> ids)
{
var todos = await Task.WhenAll(ids.Select(i => _repository.GetAsync(i)));
return todos.Where(t => t != null).Select(t => new TodoReadDto()
{
Id = t.Id.Value,
Name = t.Name,
IsComplete = t.IsCompleted
}).ToList();
}
}
Bem, na verdade, até agora nada impressionante. No próximo artigo, quando eu adicionar um banco de dados separado para leitura, tudo brilhará com cores diferentes. Isso vai nos suspender imediatamente a consistência com o tempo. Armazenamento de eventos e banco de dados SQL em uma base mestre-escravo. Um ES branco e muitos MS SQL pretos dos quais lêem os dados.
Digressão lírica. À luz dos acontecimentos recentes, não pude deixar de brincar sobre o escravo mestre e os brancos negros. Ehe, a era está passando, vamos contar aos nossos netos que vivíamos numa época em que as bases durante a replicação eram chamadas de mestre e escravo.
Em sistemas onde há muita leitura e pouca escrita de dados (a maioria deles), isso aumentará a velocidade do trabalho. Na verdade, a replicação do próprio mestre escravo, visa o fato de que sua escrita fica mais lenta (assim como com os índices), mas em contrapartida, a leitura é acelerada distribuindo a carga por vários bancos de dados.