Event Sourcing in Rust on top of PostgreSQL

hseeberger/evented

evented

evented is a library for Event Sourcing, in which state changes are persisted as events. It originated from eventsourced, but is tightly coupled to PostgreSQL, which lets it offer additional strong consistency features, and it adds other features such as event metadata.

The core abstraction of evented is an EventSourcedEntity, which is identified by an ID. An Entity implementation defines its state and event handling, and the associated Command implementations define its command handling.

When an event-sourced entity receives a command, the respective command handler is called. It returns either a sequence of events to be persisted, together with their metadata, or a rejection. If events are returned, they are persisted transactionally, and an optional EventListener is invoked as part of that transaction. Concurrency is handled by optimistic locking via per-entity versions.
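
The optimistic locking described above can be illustrated with a minimal in-memory sketch. It is not evented's implementation: `EventLog`, `PersistError` and the rule "version = number of persisted events" are assumptions made for illustration, and a `HashMap` stands in for the PostgreSQL event table.

```rust
use std::collections::HashMap;

/// Hypothetical error type for this sketch; not part of evented's API.
#[derive(Debug, PartialEq, Eq)]
pub enum PersistError {
    /// Another writer persisted events after `expected` was read.
    VersionConflict { expected: u64, actual: u64 },
}

/// In-memory stand-in for an event table, keyed by entity ID.
#[derive(Debug, Default)]
pub struct EventLog {
    events: HashMap<u32, Vec<String>>,
}

impl EventLog {
    /// In this sketch the version of an entity is the number of its persisted events.
    pub fn version(&self, id: u32) -> u64 {
        self.events.get(&id).map_or(0, |events| events.len() as u64)
    }

    /// Appends `new_events` only if the entity is still at `expected_version`.
    /// Returns the new version on success.
    pub fn append(
        &mut self,
        id: u32,
        expected_version: u64,
        new_events: Vec<String>,
    ) -> Result<u64, PersistError> {
        let actual = self.version(id);
        if actual != expected_version {
            return Err(PersistError::VersionConflict {
                expected: expected_version,
                actual,
            });
        }
        self.events.entry(id).or_default().extend(new_events);
        Ok(self.version(id))
    }
}
```

A writer that read version 0 and loses the race to another writer gets a `VersionConflict` instead of silently overwriting; it would then typically reload the entity and retry the command.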

When an event-sourced entity is created, its events are loaded and its state is constructed by applying them via its event handler. This state is then used by the command handlers to decide whether a command should be accepted, resulting in events to be persisted and applied, or rejected.
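
As a rough sketch of this load-then-decide cycle, independent of evented's actual types: `CounterEvent`, `CounterState`, `replay` and `decide_decrease` are hypothetical names, and the events are passed in directly instead of being loaded from PostgreSQL.

```rust
/// Hypothetical event type for this sketch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CounterEvent {
    Increased(u64),
    Decreased(u64),
}

/// Hypothetical entity state, rebuilt purely from events.
#[derive(Debug, Default, PartialEq, Eq)]
pub struct CounterState(u64);

impl CounterState {
    /// Event handler: applies one already persisted event to the state.
    fn handle_event(&mut self, event: CounterEvent) {
        match event {
            CounterEvent::Increased(inc) => self.0 += inc,
            CounterEvent::Decreased(dec) => self.0 -= dec,
        }
    }

    pub fn value(&self) -> u64 {
        self.0
    }
}

/// Constructs the state by applying all events in order, starting from the default.
pub fn replay(events: impl IntoIterator<Item = CounterEvent>) -> CounterState {
    let mut state = CounterState::default();
    for event in events {
        state.handle_event(event);
    }
    state
}

/// Command handler: uses the replayed state to accept (events) or reject (error).
pub fn decide_decrease(
    state: &CounterState,
    dec: u64,
) -> Result<Vec<CounterEvent>, &'static str> {
    if state.value() < dec {
        Err("underflow")
    } else {
        Ok(vec![CounterEvent::Decreased(dec)])
    }
}
```

Replaying `Increased(5)` followed by `Decreased(2)` yields a state of 3, so a decrease by 3 is accepted while a decrease by 4 is rejected.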

It is also possible to create asynchronous Projections. These process events transactionally: populating their view(s) and storing the sequence number of the processed event happen in the same transaction. Currently only events-by-type projections are supported.
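
Why a projection stores the sequence number together with its view can be seen in a small in-memory sketch. `SequencedEvent` and `TotalProjection` are hypothetical and not evented's API; with PostgreSQL, the two field updates in `process` would be two writes in one transaction.

```rust
/// Hypothetical event as seen by a projection: sequence number plus payload.
#[derive(Debug, Clone, Copy)]
pub struct SequencedEvent {
    pub seq_no: u64,
    pub inc: u64,
}

/// Hypothetical projection: a running total plus the last processed sequence number.
#[derive(Debug, Default)]
pub struct TotalProjection {
    total: u64,
    last_seq_no: Option<u64>,
}

impl TotalProjection {
    /// Applies the event unless it was already processed.
    /// Returns `true` if the view was updated, `false` if the event was skipped.
    pub fn process(&mut self, event: SequencedEvent) -> bool {
        if self.last_seq_no.is_some_and(|last| event.seq_no <= last) {
            return false;
        }
        // View update and sequence number change together, so a redelivered
        // event can never be counted twice.
        self.total += event.inc;
        self.last_seq_no = Some(event.seq_no);
        true
    }

    pub fn total(&self) -> u64 {
        self.total
    }

    pub fn last_seq_no(&self) -> Option<u64> {
        self.last_seq_no
    }
}
```

Because the stored sequence number always matches the state of the view, a projection that restarts can resume after the last processed event, and an event delivered a second time is skipped.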

The following shows a simple example from the tests which uses an event listener to build a synchronous and consistent view. This particular view probably does not need to be consistent, but real-world use cases for consistent views include uniqueness checks, e.g. for the email addresses of user entities.

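// Imports from the supporting crates used in this example. The remaining names
// (Entity, Command, EventListener, EventWithMetadata, BoxError and `with_metadata`)
// also need imports; their paths are not shown in this excerpt.
use serde::{Deserialize, Serialize};
use sqlx::{Row, Transaction};
use time::OffsetDateTime;
use uuid::Uuid;

#[derive(Debug, Default, PartialEq, Eq)]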
pub struct Counter(u64);

impl Entity for Counter {
    type Id = Uuid;
    type Event = Event;
    type Metadata = Metadata;

    const TYPE_NAME: &'static str = "counter";

    fn handle_event(&mut self, event: Self::Event) {
        match event {
            Event::Increased { inc, .. } => self.0 += inc,
            Event::Decreased { dec, .. } => self.0 -= dec,
        }
    }
}

#[derive(Debug, Serialize, Deserialize)]
pub enum Event {
    Increased { id: Uuid, inc: u64 },
    Decreased { id: Uuid, dec: u64 },
}

#[derive(Debug)]
pub struct Increase(u64);

impl Command for Increase {
    type Entity = Counter;
    type Rejection = Overflow;

    async fn handle(
        self,
        id: &<Self::Entity as Entity>::Id,
        entity: &Self::Entity,
    ) -> Result<
        Vec<
            impl Into<
                EventWithMetadata<
                    <Self::Entity as Entity>::Event,
                    <Self::Entity as Entity>::Metadata,
                >,
            >,
        >,
        Self::Rejection,
    > {
        let Increase(inc) = self;
        if entity.0 > u64::MAX - inc {
            Err(Overflow)
        } else {
            let increased = Event::Increased { id: *id, inc };
            let metadata = Metadata {
                timestamp: OffsetDateTime::now_utc(),
            };
            Ok(vec![increased.with_metadata(metadata)])
        }
    }
}

#[derive(Debug, PartialEq, Eq)]
pub struct Overflow;

#[derive(Debug)]
pub struct Decrease(u64);

impl Command for Decrease {
    type Entity = Counter;
    type Rejection = Underflow;

    async fn handle(
        self,
        id: &<Self::Entity as Entity>::Id,
        entity: &Self::Entity,
    ) -> Result<
        Vec<
            impl Into<
                EventWithMetadata<
                    <Self::Entity as Entity>::Event,
                    <Self::Entity as Entity>::Metadata,
                >,
            >,
        >,
        Self::Rejection,
    > {
        let Decrease(dec) = self;
        if entity.0 < dec {
            Err(Underflow)
        } else {
            let decreased = Event::Decreased { id: *id, dec };
            let metadata = Metadata {
                timestamp: OffsetDateTime::now_utc(),
            };
            Ok(vec![decreased.with_metadata(metadata)])
        }
    }
}

#[derive(Debug, PartialEq, Eq)]
pub struct Underflow;

#[derive(Debug, Serialize, Deserialize)]
pub struct Metadata {
    timestamp: OffsetDateTime,
}

struct Listener;

impl EventListener<Event> for Listener {
    async fn listen(
        &mut self,
        event: &Event,
        tx: &mut Transaction<'_, sqlx::Postgres>,
    ) -> Result<(), BoxError> {
        match event {
            Event::Increased { id, inc } => {
                let value = sqlx::query("SELECT value FROM counters WHERE id = $1")
                    .bind(id)
                    .fetch_optional(&mut **tx)
                    .await
                    .map_err(Box::new)?
                    .map(|row| row.try_get::<i64, _>(0))
                    .transpose()?;
                match value {
                    Some(value) => {
                        sqlx::query("UPDATE counters SET value = $1 WHERE id = $2")
                            .bind(value + *inc as i64)
                            .bind(id)
                            .execute(&mut **tx)
                            .await
                            .map_err(Box::new)?;
                    }

                    None => {
                        sqlx::query("INSERT INTO counters VALUES ($1, $2)")
                            .bind(id)
                            .bind(*inc as i64)
                            .execute(&mut **tx)
                            .await
                            .map_err(Box::new)?;
                    }
                }
                Ok(())
            }

            // All other events (here: Decreased) leave this view untouched.
            _ => Ok(()),
        }
    }
}
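
The uniqueness use case mentioned before the example can be sketched in memory as follows. This is not evented code: `Store` and `register` are hypothetical, and a `HashSet` plays the role of a view table that would be updated by the listener in the same PostgreSQL transaction as the events.

```rust
use std::collections::HashSet;

/// Hypothetical store holding both the events and a consistent view of taken emails.
#[derive(Debug, Default)]
pub struct Store {
    /// Persisted "user registered" events, reduced to the email for brevity.
    events: Vec<String>,
    /// View maintained by the listener step; always in sync with `events`.
    emails: HashSet<String>,
}

impl Store {
    /// Persists a registration only if the listener step accepts the email.
    pub fn register(&mut self, email: &str) -> Result<(), String> {
        // Listener step: a duplicate fails here, before anything is persisted,
        // which mimics a listener error rolling back the whole transaction.
        if !self.emails.insert(email.to_string()) {
            return Err(format!("email already taken: {email}"));
        }
        self.events.push(email.to_string());
        Ok(())
    }

    pub fn event_count(&self) -> usize {
        self.events.len()
    }
}
```

Since the view is updated in the same step as the events, a second registration with the same email is rejected and no event is persisted for it. An asynchronously updated view could lag behind and let the duplicate through.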

More examples can be found in the examples directory.

License

This code is open source software licensed under the Apache 2.0 License.