scaffolding hermes flow and audit logging
This commit is contained in:
+243
-15
@@ -16,6 +16,7 @@ pub use crate::sessions::{SessionSnapshot, SessionStartRequest};
|
||||
|
||||
use crate::{
|
||||
analytics::AnalyticsStore,
|
||||
audit::AuditStore,
|
||||
bets::{
|
||||
BetsStore, ACCEPTANCE_ACCEPTED, ACCEPTANCE_REJECTED_INVALID_MARKET,
|
||||
ACCEPTANCE_REJECTED_INVALID_SESSION, ACCEPTANCE_REJECTED_TOO_LATE,
|
||||
@@ -43,6 +44,7 @@ pub struct AppState {
|
||||
experiments: ExperimentsStore,
|
||||
localization: LocalizationStore,
|
||||
analytics: AnalyticsStore,
|
||||
audit: AuditStore,
|
||||
markets: MarketsStore,
|
||||
users: UsersStore,
|
||||
sessions: SessionsStore,
|
||||
@@ -65,6 +67,7 @@ impl AppState {
|
||||
experiments: ExperimentsStore::new(),
|
||||
localization: LocalizationStore::new(),
|
||||
analytics: AnalyticsStore::new(),
|
||||
audit: AuditStore::new(),
|
||||
markets: MarketsStore::new(),
|
||||
users: UsersStore::new(),
|
||||
sessions: SessionsStore::new(),
|
||||
@@ -87,7 +90,8 @@ impl AppState {
|
||||
})
|
||||
.await;
|
||||
|
||||
self.sessions
|
||||
let session = self
|
||||
.sessions
|
||||
.start_session(
|
||||
user.id,
|
||||
request,
|
||||
@@ -98,7 +102,24 @@ impl AppState {
|
||||
default_experiment_variant: self.config.default_experiment_variant.clone(),
|
||||
},
|
||||
)
|
||||
.await
|
||||
.await;
|
||||
|
||||
self.audit
|
||||
.record(
|
||||
"session_started",
|
||||
Some(session.session_id),
|
||||
Some(session.user_id),
|
||||
Utc::now(),
|
||||
[
|
||||
("locale_code", session.locale_code.clone()),
|
||||
("device_platform", session.device_platform.clone()),
|
||||
("app_version", session.app_version.clone()),
|
||||
("experiment_variant", session.experiment_variant.clone()),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
session
|
||||
}
|
||||
|
||||
pub async fn current_session(&self) -> Option<SessionSnapshot> {
|
||||
@@ -129,6 +150,40 @@ impl AppState {
|
||||
}
|
||||
}
|
||||
|
||||
async fn record_bet_audit(
|
||||
&self,
|
||||
action: &str,
|
||||
session: Option<&SessionSnapshot>,
|
||||
request: &BetIntentRequest,
|
||||
reason: &str,
|
||||
accepted_odds_version_id: Option<Uuid>,
|
||||
) {
|
||||
let mut attributes = vec![
|
||||
("reason".to_string(), reason.to_string()),
|
||||
("session_id".to_string(), request.session_id.to_string()),
|
||||
("event_id".to_string(), request.event_id.to_string()),
|
||||
("market_id".to_string(), request.market_id.to_string()),
|
||||
("outcome_id".to_string(), request.outcome_id.to_string()),
|
||||
];
|
||||
|
||||
if let Some(accepted_odds_version_id) = accepted_odds_version_id {
|
||||
attributes.push((
|
||||
"accepted_odds_version_id".to_string(),
|
||||
accepted_odds_version_id.to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
self.audit
|
||||
.record(
|
||||
action,
|
||||
session.map(|session| session.session_id),
|
||||
session.map(|session| session.user_id),
|
||||
Utc::now(),
|
||||
attributes,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
pub async fn submit_bet_intent(&self, request: BetIntentRequest) -> BetIntentResponse {
|
||||
let Some(existing) = self
|
||||
.bets
|
||||
@@ -143,7 +198,7 @@ impl AppState {
|
||||
|
||||
async fn submit_bet_intent_fresh(&self, request: BetIntentRequest) -> BetIntentResponse {
|
||||
let Some(session) = self.current_session().await else {
|
||||
return self
|
||||
let response = self
|
||||
.bets
|
||||
.record(self.build_bet_intent_record(
|
||||
&request,
|
||||
@@ -153,10 +208,21 @@ impl AppState {
|
||||
None,
|
||||
))
|
||||
.await;
|
||||
|
||||
self.record_bet_audit(
|
||||
"bet_rejected",
|
||||
None,
|
||||
&request,
|
||||
ACCEPTANCE_REJECTED_INVALID_SESSION,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
return response;
|
||||
};
|
||||
|
||||
if session.ended_at.is_some() || session.session_id != request.session_id {
|
||||
return self
|
||||
let response = self
|
||||
.bets
|
||||
.record(self.build_bet_intent_record(
|
||||
&request,
|
||||
@@ -166,10 +232,21 @@ impl AppState {
|
||||
None,
|
||||
))
|
||||
.await;
|
||||
|
||||
self.record_bet_audit(
|
||||
"bet_rejected",
|
||||
Some(&session),
|
||||
&request,
|
||||
ACCEPTANCE_REJECTED_INVALID_SESSION,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
return response;
|
||||
}
|
||||
|
||||
let Some(market) = self.markets.market(request.market_id).await else {
|
||||
return self
|
||||
let response = self
|
||||
.bets
|
||||
.record(self.build_bet_intent_record(
|
||||
&request,
|
||||
@@ -179,10 +256,21 @@ impl AppState {
|
||||
None,
|
||||
))
|
||||
.await;
|
||||
|
||||
self.record_bet_audit(
|
||||
"bet_rejected",
|
||||
Some(&session),
|
||||
&request,
|
||||
ACCEPTANCE_REJECTED_INVALID_MARKET,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
return response;
|
||||
};
|
||||
|
||||
if market.event_id != request.event_id {
|
||||
return self
|
||||
let response = self
|
||||
.bets
|
||||
.record(self.build_bet_intent_record(
|
||||
&request,
|
||||
@@ -192,10 +280,21 @@ impl AppState {
|
||||
None,
|
||||
))
|
||||
.await;
|
||||
|
||||
self.record_bet_audit(
|
||||
"bet_rejected",
|
||||
Some(&session),
|
||||
&request,
|
||||
ACCEPTANCE_REJECTED_INVALID_MARKET,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
return response;
|
||||
}
|
||||
|
||||
let Some(outcome) = self.markets.outcome(request.outcome_id).await else {
|
||||
return self
|
||||
let response = self
|
||||
.bets
|
||||
.record(self.build_bet_intent_record(
|
||||
&request,
|
||||
@@ -205,10 +304,21 @@ impl AppState {
|
||||
None,
|
||||
))
|
||||
.await;
|
||||
|
||||
self.record_bet_audit(
|
||||
"bet_rejected",
|
||||
Some(&session),
|
||||
&request,
|
||||
ACCEPTANCE_REJECTED_INVALID_MARKET,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
return response;
|
||||
};
|
||||
|
||||
if outcome.market_id != market.id || request.event_id != market.event_id {
|
||||
return self
|
||||
let response = self
|
||||
.bets
|
||||
.record(self.build_bet_intent_record(
|
||||
&request,
|
||||
@@ -218,10 +328,21 @@ impl AppState {
|
||||
None,
|
||||
))
|
||||
.await;
|
||||
|
||||
self.record_bet_audit(
|
||||
"bet_rejected",
|
||||
Some(&session),
|
||||
&request,
|
||||
ACCEPTANCE_REJECTED_INVALID_MARKET,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
return response;
|
||||
}
|
||||
|
||||
if Utc::now() >= market.lock_at {
|
||||
return self
|
||||
let response = self
|
||||
.bets
|
||||
.record(self.build_bet_intent_record(
|
||||
&request,
|
||||
@@ -231,10 +352,21 @@ impl AppState {
|
||||
None,
|
||||
))
|
||||
.await;
|
||||
|
||||
self.record_bet_audit(
|
||||
"bet_rejected",
|
||||
Some(&session),
|
||||
&request,
|
||||
ACCEPTANCE_REJECTED_TOO_LATE,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
return response;
|
||||
}
|
||||
|
||||
let accepted_odds_version_id = self.current_odds(market.id).await.map(|odds| odds.id);
|
||||
self.bets
|
||||
let response = self.bets
|
||||
.record(self.build_bet_intent_record(
|
||||
&request,
|
||||
Some(session.user_id),
|
||||
@@ -242,7 +374,18 @@ impl AppState {
|
||||
ACCEPTANCE_ACCEPTED,
|
||||
accepted_odds_version_id,
|
||||
))
|
||||
.await
|
||||
.await;
|
||||
|
||||
self.record_bet_audit(
|
||||
"bet_accepted",
|
||||
Some(&session),
|
||||
&request,
|
||||
ACCEPTANCE_ACCEPTED,
|
||||
accepted_odds_version_id,
|
||||
)
|
||||
.await;
|
||||
|
||||
response
|
||||
}
|
||||
|
||||
pub async fn bet_intent(&self, bet_intent_id: Uuid) -> Option<BetIntentResponse> {
|
||||
@@ -250,10 +393,28 @@ impl AppState {
|
||||
}
|
||||
|
||||
pub async fn end_session(&self) -> Result<SessionSnapshot, AppError> {
|
||||
self.sessions
|
||||
let session = self
|
||||
.sessions
|
||||
.end_session()
|
||||
.await
|
||||
.map_err(|_| AppError::not_found("No active session"))
|
||||
.map_err(|_| AppError::not_found("No active session"))?;
|
||||
|
||||
self.audit
|
||||
.record(
|
||||
"session_ended",
|
||||
Some(session.session_id),
|
||||
Some(session.user_id),
|
||||
Utc::now(),
|
||||
[
|
||||
("locale_code", session.locale_code.clone()),
|
||||
("device_platform", session.device_platform.clone()),
|
||||
("app_version", session.app_version.clone()),
|
||||
("experiment_variant", session.experiment_variant.clone()),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
Ok(session)
|
||||
}
|
||||
|
||||
pub async fn admin_create_event_manifest(
|
||||
@@ -266,6 +427,20 @@ impl AppState {
|
||||
self.markets.insert_market(market.clone()).await;
|
||||
}
|
||||
|
||||
self.audit
|
||||
.record(
|
||||
"event_created",
|
||||
None,
|
||||
None,
|
||||
Utc::now(),
|
||||
[
|
||||
("event_id", created.event.id.to_string()),
|
||||
("sport_type", created.event.sport_type.clone()),
|
||||
("status", created.event.status.clone()),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
Ok(created)
|
||||
}
|
||||
|
||||
@@ -279,6 +454,22 @@ impl AppState {
|
||||
|
||||
self.markets.insert_market(market.clone()).await;
|
||||
let _ = self.events.insert_market(market.event_id, market.clone()).await;
|
||||
|
||||
self.audit
|
||||
.record(
|
||||
"market_created",
|
||||
None,
|
||||
None,
|
||||
Utc::now(),
|
||||
[
|
||||
("market_id", market.id.to_string()),
|
||||
("event_id", market.event_id.to_string()),
|
||||
("market_type", market.market_type.clone()),
|
||||
("status", market.status.clone()),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
Ok(market)
|
||||
}
|
||||
|
||||
@@ -290,7 +481,23 @@ impl AppState {
|
||||
return Err(AppError::not_found("Market not found"));
|
||||
};
|
||||
|
||||
Ok(self.markets.publish_odds(odds).await)
|
||||
let created = self.markets.publish_odds(odds).await;
|
||||
|
||||
self.audit
|
||||
.record(
|
||||
"odds_version_published",
|
||||
None,
|
||||
None,
|
||||
Utc::now(),
|
||||
[
|
||||
("odds_version_id", created.id.to_string()),
|
||||
("market_id", created.market_id.to_string()),
|
||||
("version_no", created.version_no.to_string()),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
Ok(created)
|
||||
}
|
||||
|
||||
pub async fn admin_publish_settlement(
|
||||
@@ -301,7 +508,24 @@ impl AppState {
|
||||
return Err(AppError::not_found("Market not found"));
|
||||
};
|
||||
|
||||
Ok(self.settlements.upsert(market.event_id, settlement).await)
|
||||
let created = self.settlements.upsert(market.event_id, settlement).await;
|
||||
|
||||
self.audit
|
||||
.record(
|
||||
"result_settled",
|
||||
None,
|
||||
None,
|
||||
Utc::now(),
|
||||
[
|
||||
("settlement_id", created.id.to_string()),
|
||||
("market_id", created.market_id.to_string()),
|
||||
("event_id", market.event_id.to_string()),
|
||||
("winning_outcome_id", created.winning_outcome_id.to_string()),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
Ok(created)
|
||||
}
|
||||
|
||||
pub async fn next_event(&self) -> Option<EventSnapshot> {
|
||||
@@ -362,6 +586,10 @@ impl AppState {
|
||||
self.analytics.counts().await
|
||||
}
|
||||
|
||||
pub async fn audit_counts(&self) -> (usize, usize) {
|
||||
self.audit.counts().await
|
||||
}
|
||||
|
||||
pub async fn markets_for_event(&self, event_id: Uuid) -> Option<Vec<MarketSnapshot>> {
|
||||
self.markets.markets_for_event(event_id).await
|
||||
}
|
||||
|
||||
@@ -0,0 +1,87 @@
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use tokio::sync::RwLock;
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct AuditStore {
|
||||
inner: Arc<RwLock<AuditState>>,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct AuditState {
|
||||
event_types_by_name: HashMap<String, Uuid>,
|
||||
events: Vec<AuditEventSnapshot>,
|
||||
attributes: Vec<AuditEventAttributeSnapshot>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
#[allow(dead_code)]
|
||||
struct AuditEventSnapshot {
|
||||
id: Uuid,
|
||||
audit_event_type_id: Uuid,
|
||||
session_id: Option<Uuid>,
|
||||
user_id: Option<Uuid>,
|
||||
occurred_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
#[allow(dead_code)]
|
||||
struct AuditEventAttributeSnapshot {
|
||||
id: Uuid,
|
||||
audit_event_id: Uuid,
|
||||
attribute_key: String,
|
||||
attribute_value: String,
|
||||
}
|
||||
|
||||
impl AuditStore {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
inner: Arc::new(RwLock::new(AuditState::default())),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn record<I, K, V>(
|
||||
&self,
|
||||
action: impl Into<String>,
|
||||
session_id: Option<Uuid>,
|
||||
user_id: Option<Uuid>,
|
||||
occurred_at: DateTime<Utc>,
|
||||
attributes: I,
|
||||
) where
|
||||
I: IntoIterator<Item = (K, V)>,
|
||||
K: Into<String>,
|
||||
V: Into<String>,
|
||||
{
|
||||
let mut state = self.inner.write().await;
|
||||
let action = action.into();
|
||||
let event_type_id = *state
|
||||
.event_types_by_name
|
||||
.entry(action)
|
||||
.or_insert_with(Uuid::new_v4);
|
||||
|
||||
let audit_event_id = Uuid::new_v4();
|
||||
state.events.push(AuditEventSnapshot {
|
||||
id: audit_event_id,
|
||||
audit_event_type_id: event_type_id,
|
||||
session_id,
|
||||
user_id,
|
||||
occurred_at,
|
||||
});
|
||||
|
||||
for (key, value) in attributes {
|
||||
state.attributes.push(AuditEventAttributeSnapshot {
|
||||
id: Uuid::new_v4(),
|
||||
audit_event_id,
|
||||
attribute_key: key.into(),
|
||||
attribute_value: value.into(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn counts(&self) -> (usize, usize) {
|
||||
let state = self.inner.read().await;
|
||||
(state.events.len(), state.attributes.len())
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
#![forbid(unsafe_code)]
|
||||
|
||||
pub mod analytics;
|
||||
pub mod audit;
|
||||
pub mod admin;
|
||||
pub mod bets;
|
||||
pub mod app_state;
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use axum::{extract::Extension, Json};
|
||||
use chrono::{DateTime, Utc};
|
||||
use serde::Serialize;
|
||||
|
||||
use crate::app_state::AppState;
|
||||
@@ -10,6 +11,7 @@ pub struct HealthResponse {
|
||||
pub environment: String,
|
||||
pub version: String,
|
||||
pub uptime_ms: u128,
|
||||
pub server_time: DateTime<Utc>,
|
||||
pub database_ready: bool,
|
||||
pub redis_ready: bool,
|
||||
}
|
||||
@@ -21,6 +23,7 @@ pub async fn handler(Extension(state): Extension<AppState>) -> Json<HealthRespon
|
||||
environment: state.config.environment.clone(),
|
||||
version: state.config.app_version.clone(),
|
||||
uptime_ms: state.uptime_ms(),
|
||||
server_time: Utc::now(),
|
||||
database_ready: state.database_ready(),
|
||||
redis_ready: state.redis_ready(),
|
||||
})
|
||||
|
||||
@@ -11,10 +11,91 @@ async fn health_returns_ok() {
|
||||
|
||||
let response = app
|
||||
.oneshot(Request::builder().uri("/health").body(Body::empty()).unwrap())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(response.status(), StatusCode::OK);
|
||||
|
||||
let body = to_bytes(response.into_body(), usize::MAX).await.unwrap();
|
||||
let json: json::Value = json::from_slice(&body).unwrap();
|
||||
assert!(json["server_time"].as_str().is_some());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn audit_logging_records_session_and_bet() {
|
||||
let state = AppState::new(AppConfig::default(), None, None);
|
||||
let app = build_router(state.clone());
|
||||
|
||||
let session_response = app
|
||||
.clone()
|
||||
.oneshot(
|
||||
Request::builder()
|
||||
.method("POST")
|
||||
.uri("/api/v1/session/start")
|
||||
.header("content-type", "application/json")
|
||||
.body(Body::from("{}"))
|
||||
.unwrap(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(response.status(), StatusCode::OK);
|
||||
let session_body = to_bytes(session_response.into_body(), usize::MAX).await.unwrap();
|
||||
let session_json: json::Value = json::from_slice(&session_body).unwrap();
|
||||
let session_id = session_json["session_id"].as_str().unwrap().to_string();
|
||||
|
||||
let event_response = app
|
||||
.clone()
|
||||
.oneshot(Request::builder().uri("/api/v1/feed/next").body(Body::empty()).unwrap())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let event_body = to_bytes(event_response.into_body(), usize::MAX).await.unwrap();
|
||||
let event_json: json::Value = json::from_slice(&event_body).unwrap();
|
||||
let event_id = event_json["id"].as_str().unwrap().to_string();
|
||||
|
||||
let markets_response = app
|
||||
.clone()
|
||||
.oneshot(
|
||||
Request::builder()
|
||||
.uri(format!("/api/v1/events/{event_id}/markets"))
|
||||
.body(Body::empty())
|
||||
.unwrap(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let markets_body = to_bytes(markets_response.into_body(), usize::MAX).await.unwrap();
|
||||
let markets_json: json::Value = json::from_slice(&markets_body).unwrap();
|
||||
let market_id = markets_json[0]["id"].as_str().unwrap().to_string();
|
||||
let outcome_id = markets_json[0]["outcomes"][0]["id"].as_str().unwrap().to_string();
|
||||
|
||||
let request = json::json!({
|
||||
"session_id": session_id,
|
||||
"event_id": event_id,
|
||||
"market_id": market_id,
|
||||
"outcome_id": outcome_id,
|
||||
"idempotency_key": "audit-bet-001",
|
||||
"client_sent_at": Utc::now().to_rfc3339(),
|
||||
})
|
||||
.to_string();
|
||||
|
||||
let response = app
|
||||
.oneshot(
|
||||
Request::builder()
|
||||
.method("POST")
|
||||
.uri("/api/v1/bets/intent")
|
||||
.header("content-type", "application/json")
|
||||
.body(Body::from(request))
|
||||
.unwrap(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(response.status(), StatusCode::CREATED);
|
||||
|
||||
let (audit_events, audit_attributes) = state.audit_counts().await;
|
||||
assert!(audit_events >= 2);
|
||||
assert!(audit_attributes >= 10);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
Reference in New Issue
Block a user