initial commit

This commit is contained in:
Andre Henriques 2024-12-07 22:41:01 +00:00
commit c127b2b907
7 changed files with 4179 additions and 0 deletions

1
.dockerignore Normal file
View File

@ -0,0 +1 @@
target/

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
/target
.env

3858
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

12
Cargo.toml Normal file
View File

@ -0,0 +1,12 @@
# Manifest for the demo HTTP service implemented in src/main.rs.
[package]
name = "unwritten-application"
version = "0.1.0"
edition = "2021"
[dependencies]
# HTTP router/server used for the endpoints registered in main().
axum = "0.7.9"
# Async runtime (`#[tokio::main]`, `tokio::spawn`, `tokio::net::TcpListener`).
tokio = { version = "1", features = ["full"] }
# In-memory dataframes; the "lazy" feature provides `LazyFrame`.
polars = { version = "0.44.2", features = ["lazy"] }
# NOTE(review): src/main.rs only uses `tokio_postgres` directly; this entry
# presumably exists to switch on the `with-uuid-1` feature for it — confirm,
# and consider enabling the feature on `tokio-postgres` itself instead.
postgres = { version = "0.19.9", features = ["with-uuid-1"] }
tokio-postgres = "0.7.12"
# `uuid::Uuid` is the Rust type the uuid columns are read into.
uuid = "1"

17
Dockerfile Normal file
View File

@ -0,0 +1,17 @@
# Stage 1 ("build"): compile the release binary inside the Rust image.
FROM docker.io/rust:1-slim-bullseye AS build
WORKDIR /build
# Only the manifest, the lockfile and the sources are needed for the build.
ADD Cargo.toml .
ADD Cargo.lock .
ADD src/ src
RUN cargo build --release
# Stage 2: runtime image that receives only the compiled binary.
FROM docker.io/debian:bullseye-slim
WORKDIR /app
# The binary is named after the package in Cargo.toml; it is renamed to
# `unwritten` inside the image.
COPY --from=build /build/target/release/unwritten-application unwritten
# src/main.rs reads the PSQL_CONN environment variable at startup and listens
# on 0.0.0.0:3000, so the container must be started with PSQL_CONN set.
CMD ["./unwritten"]

15
schema.sql Normal file
View File

@ -0,0 +1,15 @@
-- Recreates the demo schema from scratch.
--
-- `transactions` references `accounts` through foreign keys, so it has to be
-- dropped first; dropping `accounts` while `transactions` still exists would
-- be rejected because of the dependency.
-- (The previous script dropped a non-existent table named `account`, so the
-- `accounts` table was never actually reset.)
drop table if exists transactions;
drop table if exists accounts;

-- One row per account. `name` is nullable and defaults to the empty string.
-- NOTE(review): gen_random_uuid() is assumed to be available on the target
-- server without extra extensions — confirm for the deployed version.
create table if not exists accounts (
    uuid uuid primary key default gen_random_uuid (),
    name text default ''
);

-- One row per transfer of `amount` from account `source` to account `target`.
-- `source` and `target` are nullable foreign keys into `accounts`.
create table if not exists transactions (
    uuid uuid primary key default gen_random_uuid (),
    source uuid references accounts (uuid),
    target uuid references accounts (uuid),
    amount double precision not null
);

274
src/main.rs Normal file
View File

@ -0,0 +1,274 @@
use std::sync::Mutex;
use axum::{
response::IntoResponse,
routing::{delete, get, put},
Extension, Router,
};
use polars::prelude::*;
use tokio_postgres::NoTls;
/// Fills the database with random demo data.
///
/// Inserts 10000 accounts with random names and then 50 batches of up to
/// 20000 random transactions between them. Generation is refused while the
/// `accounts` table already contains rows, so the demo only ever holds one
/// set of random data.
///
/// Returns a short plain-text status message. Failures are logged to stdout
/// and reported to the caller through the message.
async fn generate_random_data(
    Extension(client): Extension<Arc<tokio_postgres::Client>>,
) -> impl IntoResponse {
    const CREATE_ACCOUNTS: &str = "
        INSERT INTO accounts ( name )
        SELECT ( md5(random()::text) )
        FROM generate_series(1, 10000) s(i);";
    const CREATE_TRANSACTIONS: &str = "
        INSERT INTO transactions(source, target, amount)
        SELECT a1.uuid, a2.uuid, (random() * 100000)
        FROM accounts as a1
        CROSS JOIN LATERAL (SELECT uuid FROM accounts as a2 ORDER BY random() limit 5) as a2
        WHERE a2.uuid <> a1.uuid order by random() limit 20000;
        ";
    const TRANSACTION_BATCHES: usize = 50;

    // A failing count query used to be reported as "please delete data",
    // hiding the real problem; it is now logged and reported as its own error.
    let existing_accounts: i64 = match client
        .query_one("SELECT count(uuid) from accounts;", &[])
        .await
    {
        Ok(row) => row.get(0),
        Err(err) => {
            println!("Failed to count accounts: {}", err);
            return "Failed to count accounts";
        }
    };
    if existing_accounts > 0 {
        return "PLEASE DELETE DATA BEFORE GENERATING NEW DATA";
    }

    // The INSERT statements return no rows, so `execute` is used rather than `query`.
    if let Err(err) = client.execute(CREATE_ACCOUNTS, &[]).await {
        println!("Failed to create accounts: {}", err);
        return "Failed to create accounts";
    }

    // The original author observed that their Postgres setup seemed to limit
    // a single insert to about 40000 rows, hence several smaller batches.
    for _ in 0..TRANSACTION_BATCHES {
        if let Err(err) = client.execute(CREATE_TRANSACTIONS, &[]).await {
            println!("Failed to create transactions: {}", err);
            return "Failed to create transactions";
        }
    }
    "DATA CREATED!"
}
/// Removes every transaction and then every account from the database.
///
/// Transactions are deleted first, followed by accounts. Returns a short
/// plain-text status message; failures are logged to stdout.
async fn clear_data(
    Extension(client): Extension<Arc<tokio_postgres::Client>>,
) -> impl IntoResponse {
    if let Err(err) = client.execute("delete from transactions;", &[]).await {
        println!("Failed to delete transactions: {}", err);
        return "Failed to delete transactions";
    }
    println!("Deleted transactions");

    // Deleting accounts seems to take a long time; the assumption is that
    // this comes from the foreign keys the database has to check.
    if let Err(err) = client.execute("delete from accounts;", &[]).await {
        println!("Failed to delete accounts: {}", err);
        return "Failed to delete accounts";
    }
    println!("Deleted accounts");

    "DATA DELETED!"
}
async fn get_data(
Extension(client): Extension<Arc<tokio_postgres::Client>>,
Extension(state): Extension<Arc<State>>,
) -> impl IntoResponse {
let accounts = client.query("SELECT uuid, name from accounts;", &[]).await;
if accounts.is_err() {
println!(
"Could not get accounts from database: {}",
accounts.unwrap_err()
);
return "Could not get accounts from database";
}
let accounts = accounts.unwrap();
let account_df = 'acount_df: {
let mut uuids: Vec<String> = Vec::with_capacity(accounts.len());
let mut names: Vec<String> = Vec::with_capacity(accounts.len());
for account in &accounts {
let uuid: uuid::Uuid = account.get(0);
uuids.push(uuid.into());
names.push(account.get(1));
}
let c1 = Column::new("uuid".into(), uuids);
let c2 = Column::new("name".into(), names);
let df = DataFrame::new(vec![c1, c2]);
if df.is_err() {
println!("Failed to create account dataframe: {}", df.unwrap_err());
return "Failed to create account dataframe".into();
}
break 'acount_df df.unwrap();
};
println!("loaded accounts");
let transactions = client
.query(
"SELECT uuid, amount, source, target from transactions;",
&[],
)
.await;
if transactions.is_err() {
println!(
"Could not get transactions from database: {}",
transactions.unwrap_err()
);
return "Could not get transactions from database";
}
let transactions = transactions.unwrap();
let transactions_df = 'transctions_df: {
let mut uuids: Vec<String> = Vec::with_capacity(accounts.len());
let mut ammounts: Vec<f64> = Vec::with_capacity(accounts.len());
let mut sources: Vec<String> = Vec::with_capacity(accounts.len());
let mut targets: Vec<String> = Vec::with_capacity(accounts.len());
for transaction in &transactions {
let uuid: uuid::Uuid = transaction.get(0);
uuids.push(uuid.into());
ammounts.push(transaction.get(1));
let source: uuid::Uuid = transaction.get(2);
sources.push(source.into());
let target: uuid::Uuid = transaction.get(3);
targets.push(target.into());
}
let c1 = Column::new("uuid".into(), uuids);
let c2 = Column::new("amount".into(), ammounts);
let c3 = Column::new("source".into(), sources);
let c4 = Column::new("target".into(), targets);
let df = DataFrame::new(vec![c1, c2, c3, c4]);
if df.is_err() {
println!(
"Failed to create transacions dataframe: {}",
df.unwrap_err()
);
return "Failed to create transactions dataframe".into();
}
break 'transctions_df df.unwrap();
};
println!("loaded transactions");
let mut account_dataframe = state.accounts_dataframe.lock().unwrap();
*account_dataframe = Some(account_df.lazy());
let mut transactions_dataframe = state.transactions_dataframe.lock().unwrap();
*transactions_dataframe = Some(transactions_df.lazy());
println!("df generated");
return "DATA INJESTED!";
}
/// Returns the ingested accounts (`uuid`, `name`) rendered as a text table.
///
/// Responds with "Injest Data First" when nothing has been ingested yet. If
/// materialising the frame fails, the error is logged to stdout and a short
/// error message is returned instead of panicking.
async fn accounts(Extension(state): Extension<Arc<State>>) -> impl IntoResponse {
    // Clone the lazy plan under the lock and release the lock right away
    // (the guard is a temporary of this statement), so the collect below
    // neither blocks other requests nor can poison the mutex.
    let plan = state
        .accounts_dataframe
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
        .clone();
    let Some(plan) = plan else {
        return "Injest Data First".to_string();
    };
    match plan.select([col("uuid"), col("name")]).collect() {
        Ok(frame) => frame.to_string(),
        Err(err) => {
            println!("Failed to collect accounts dataframe: {}", err);
            "Failed to load accounts".to_string()
        }
    }
}
/// Returns the ingested transactions (`uuid`, `source`, `target`, `amount`)
/// rendered as a text table.
///
/// Responds with "Injest Data First" when nothing has been ingested yet. If
/// materialising the frame fails, the error is logged to stdout and a short
/// error message is returned instead of panicking.
async fn transactions(Extension(state): Extension<Arc<State>>) -> impl IntoResponse {
    // Clone the lazy plan under the lock and release the lock right away
    // (the guard is a temporary of this statement), so the collect below
    // neither blocks other requests nor can poison the mutex.
    let plan = state
        .transactions_dataframe
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
        .clone();
    let Some(plan) = plan else {
        return "Injest Data First".to_string();
    };
    let selected = plan.select([
        col("uuid"),
        col("source"),
        col("target"),
        col("amount"),
    ]);
    match selected.collect() {
        Ok(frame) => frame.to_string(),
        Err(err) => {
            println!("Failed to collect transactions dataframe: {}", err);
            "Failed to load transactions".to_string()
        }
    }
}
struct State {
accounts_dataframe: Arc<Mutex<Option<LazyFrame>>>,
transactions_dataframe: Arc<Mutex<Option<LazyFrame>>>,
}
impl State {
fn new() -> State {
return State {
accounts_dataframe: Arc::new(Mutex::new(None)),
transactions_dataframe: Arc::new(Mutex::new(None)),
};
}
}
/// Entry point: connects to Postgres and serves the HTTP API on port 3000.
///
/// # Panics
///
/// Panics when `PSQL_CONN` is not set, when the database connection cannot
/// be established, when the listen address cannot be bound, or when the
/// server stops with an error. These are all fatal for the process.
#[tokio::main]
async fn main() {
    let Ok(psql_conn) = std::env::var("PSQL_CONN") else {
        panic!("Please provide PSQL_CONN");
    };

    let (client, connection) = tokio_postgres::connect(psql_conn.as_str(), NoTls)
        .await
        .expect("failed to connect to Postgres using PSQL_CONN");

    // The connection object drives the actual socket I/O; it has to be polled
    // on its own task for the client to make progress.
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("connection error: {}", e);
        }
    });

    let client = Arc::new(client);
    let state = Arc::new(State::new());

    let app = Router::new()
        .route("/injest", get(get_data))
        .route("/generate_random", put(generate_random_data))
        .route("/delete_data", delete(clear_data))
        .route("/accounts", get(accounts))
        .route("/transactions", get(transactions))
        .layer(Extension(client))
        .layer(Extension(state));

    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000")
        .await
        .expect("failed to bind 0.0.0.0:3000");
    axum::serve(listener, app)
        .await
        .expect("HTTP server terminated with an error");
}