110 lines
2.8 KiB
Rust
110 lines
2.8 KiB
Rust
use std::{collections::HashMap, sync::Arc};
|
|
|
|
use chrono::{DateTime, Utc};
|
|
use serde::Deserialize;
|
|
use tokio::sync::RwLock;
|
|
use uuid::Uuid;
|
|
|
|
use crate::sessions::SessionSnapshot;
|
|
|
|
#[derive(Clone, Debug, Deserialize)]
|
|
pub struct AnalyticsBatchRequest {
|
|
pub events: Vec<AnalyticsEventInput>,
|
|
}
|
|
|
|
#[derive(Clone, Debug, Deserialize)]
|
|
pub struct AnalyticsEventInput {
|
|
pub event_name: String,
|
|
pub occurred_at: DateTime<Utc>,
|
|
pub attributes: Option<Vec<AttributeInput>>,
|
|
}
|
|
|
|
#[derive(Clone, Debug, Deserialize)]
|
|
pub struct AttributeInput {
|
|
pub key: String,
|
|
pub value: String,
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
pub struct AnalyticsStore {
|
|
inner: Arc<RwLock<AnalyticsState>>,
|
|
}
|
|
|
|
#[derive(Default)]
|
|
struct AnalyticsState {
|
|
event_types_by_name: HashMap<String, Uuid>,
|
|
events: Vec<AnalyticsEventSnapshot>,
|
|
attributes: Vec<AnalyticsEventAttributeSnapshot>,
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
#[allow(dead_code)]
|
|
struct AnalyticsEventSnapshot {
|
|
id: Uuid,
|
|
analytics_event_type_id: Uuid,
|
|
session_id: Uuid,
|
|
user_id: Uuid,
|
|
occurred_at: DateTime<Utc>,
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
#[allow(dead_code)]
|
|
struct AnalyticsEventAttributeSnapshot {
|
|
id: Uuid,
|
|
analytics_event_id: Uuid,
|
|
attribute_key: String,
|
|
attribute_value: String,
|
|
}
|
|
|
|
impl AnalyticsStore {
|
|
pub fn new() -> Self {
|
|
Self {
|
|
inner: Arc::new(RwLock::new(AnalyticsState::default())),
|
|
}
|
|
}
|
|
|
|
pub async fn ingest(&self, session: &SessionSnapshot, request: AnalyticsBatchRequest) -> usize {
|
|
let mut state = self.inner.write().await;
|
|
let mut inserted = 0usize;
|
|
|
|
for event in request.events {
|
|
let analytics_event_type_id = *state
|
|
.event_types_by_name
|
|
.entry(event.event_name.clone())
|
|
.or_insert_with(Uuid::new_v4);
|
|
|
|
let analytics_event_id = Uuid::new_v4();
|
|
state.events.push(AnalyticsEventSnapshot {
|
|
id: analytics_event_id,
|
|
analytics_event_type_id,
|
|
session_id: session.session_id,
|
|
user_id: session.user_id,
|
|
occurred_at: event.occurred_at,
|
|
});
|
|
|
|
let Some(attributes) = event.attributes else {
|
|
inserted += 1;
|
|
continue;
|
|
};
|
|
|
|
for attribute in attributes {
|
|
state.attributes.push(AnalyticsEventAttributeSnapshot {
|
|
id: Uuid::new_v4(),
|
|
analytics_event_id,
|
|
attribute_key: attribute.key,
|
|
attribute_value: attribute.value,
|
|
});
|
|
}
|
|
|
|
inserted += 1;
|
|
}
|
|
|
|
inserted
|
|
}
|
|
|
|
pub async fn counts(&self) -> (usize, usize) {
|
|
let state = self.inner.read().await;
|
|
(state.events.len(), state.attributes.len())
|
|
}
|
|
}
|