add study flow endpoints
This commit is contained in:
@@ -0,0 +1,109 @@
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use serde::Deserialize;
|
||||
use tokio::sync::RwLock;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::sessions::SessionSnapshot;
|
||||
|
||||
#[derive(Clone, Debug, Deserialize)]
|
||||
pub struct AnalyticsBatchRequest {
|
||||
pub events: Vec<AnalyticsEventInput>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize)]
|
||||
pub struct AnalyticsEventInput {
|
||||
pub event_name: String,
|
||||
pub occurred_at: DateTime<Utc>,
|
||||
pub attributes: Option<Vec<AttributeInput>>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize)]
|
||||
pub struct AttributeInput {
|
||||
pub key: String,
|
||||
pub value: String,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct AnalyticsStore {
|
||||
inner: Arc<RwLock<AnalyticsState>>,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct AnalyticsState {
|
||||
event_types_by_name: HashMap<String, Uuid>,
|
||||
events: Vec<AnalyticsEventSnapshot>,
|
||||
attributes: Vec<AnalyticsEventAttributeSnapshot>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
#[allow(dead_code)]
|
||||
struct AnalyticsEventSnapshot {
|
||||
id: Uuid,
|
||||
analytics_event_type_id: Uuid,
|
||||
session_id: Uuid,
|
||||
user_id: Uuid,
|
||||
occurred_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
#[allow(dead_code)]
|
||||
struct AnalyticsEventAttributeSnapshot {
|
||||
id: Uuid,
|
||||
analytics_event_id: Uuid,
|
||||
attribute_key: String,
|
||||
attribute_value: String,
|
||||
}
|
||||
|
||||
impl AnalyticsStore {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
inner: Arc::new(RwLock::new(AnalyticsState::default())),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn ingest(&self, session: &SessionSnapshot, request: AnalyticsBatchRequest) -> usize {
|
||||
let mut state = self.inner.write().await;
|
||||
let mut inserted = 0usize;
|
||||
|
||||
for event in request.events {
|
||||
let analytics_event_type_id = *state
|
||||
.event_types_by_name
|
||||
.entry(event.event_name.clone())
|
||||
.or_insert_with(Uuid::new_v4);
|
||||
|
||||
let analytics_event_id = Uuid::new_v4();
|
||||
state.events.push(AnalyticsEventSnapshot {
|
||||
id: analytics_event_id,
|
||||
analytics_event_type_id,
|
||||
session_id: session.session_id,
|
||||
user_id: session.user_id,
|
||||
occurred_at: event.occurred_at,
|
||||
});
|
||||
|
||||
let Some(attributes) = event.attributes else {
|
||||
inserted += 1;
|
||||
continue;
|
||||
};
|
||||
|
||||
for attribute in attributes {
|
||||
state.attributes.push(AnalyticsEventAttributeSnapshot {
|
||||
id: Uuid::new_v4(),
|
||||
analytics_event_id,
|
||||
attribute_key: attribute.key,
|
||||
attribute_value: attribute.value,
|
||||
});
|
||||
}
|
||||
|
||||
inserted += 1;
|
||||
}
|
||||
|
||||
inserted
|
||||
}
|
||||
|
||||
pub async fn counts(&self) -> (usize, usize) {
|
||||
let state = self.inner.read().await;
|
||||
(state.events.len(), state.attributes.len())
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user