From e3ed43148a8113c53ca1f7646e6073ca3521810e Mon Sep 17 00:00:00 2001 From: Aleksandr Date: Thu, 1 Jan 2026 09:37:00 +0300 Subject: [PATCH] nigga --- core/src/errors.rs | 2 +- core/src/requests/uploads.rs | 2 +- core/src/service/api_looks.rs | 2 + core/src/service/mod.rs | 1 + core/src/uploads.rs | 74 ++++++++++++-- http/Cargo.toml | 4 +- http/src/client/mod.rs | 78 +++++++++------ http/src/client/uploads.rs | 100 +++++++++++++++++++ http/src/format.rs | 1 + http/src/requests/mod.rs | 4 +- http/src/requests/uploads.rs | 10 +- http/src/server/context.rs | 13 ++- http/src/server/handler.rs | 17 ++-- http/src/server/routes/uploads.rs | 159 ++++++++++++++++++++++++++++++ 14 files changed, 408 insertions(+), 59 deletions(-) create mode 100644 http/src/client/uploads.rs diff --git a/core/src/errors.rs b/core/src/errors.rs index 7ca9557..334fe4c 100644 --- a/core/src/errors.rs +++ b/core/src/errors.rs @@ -107,7 +107,7 @@ pub enum Aux { InvalidRole(#[from] auth::InvalidRole), /// Internal failure. #[display("{_0}")] - InternalError(CompactString), + InternalError(String), /// object store failure. #[display("{_0}")] ObjectStore(CompactString), diff --git a/core/src/requests/uploads.rs b/core/src/requests/uploads.rs index f455364..e31c517 100644 --- a/core/src/requests/uploads.rs +++ b/core/src/requests/uploads.rs @@ -85,7 +85,7 @@ pub mod abort { #[data] pub struct Args { - pub upload: file::Id, + pub upload: upload::Id, } #[data(copy)] diff --git a/core/src/service/api_looks.rs b/core/src/service/api_looks.rs index 68f9ba8..5028228 100644 --- a/core/src/service/api_looks.rs +++ b/core/src/service/api_looks.rs @@ -35,4 +35,6 @@ impl Session {project!{ fn badges() -> marks::Badges; fn tabs() -> tabs::Tabs; + + fn uploads() -> uploads::Uploads; }} diff --git a/core/src/service/mod.rs b/core/src/service/mod.rs index 40930ab..a00dce0 100644 --- a/core/src/service/mod.rs +++ b/core/src/service/mod.rs @@ -40,6 +40,7 @@ trait_set! { + marks::Genres + marks::Badges + tabs::Tabs + + uploads::Uploads ; } diff --git a/core/src/uploads.rs b/core/src/uploads.rs index f3d70ad..66e4ab6 100644 --- a/core/src/uploads.rs +++ b/core/src/uploads.rs @@ -1,11 +1,73 @@ +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + use eva::{bytes::Bytes, data}; -use futures::stream::BoxStream; +use futures::stream::Stream; -#[data] -pub enum Action { - Push(Bytes), - Abort, +#[data(error)] +pub enum AbortReason { + #[display("the user uploaded more than expected")] + Overuploading, + #[display("{_0}")] + Other(String), } -pub type UploadStream<'a> = BoxStream<'a, Action>; +#[data] +pub enum Chunk { + Data(Bytes), + Aborted(AbortReason), +} + +type BoxedStream<'a> = Pin + Send + Sync + 'a>>; + +pub struct UploadStream<'a> { + stream: BoxedStream<'a>, + left: Option, +} + +impl Unpin for UploadStream<'_> {} + +impl<'a> Stream for UploadStream<'a> { + type Item = Chunk; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.left == Some(0) { + return Poll::Ready(None); + } + + let this = self.get_mut(); + let underlying = Pin::new(&mut this.stream); + let res = Stream::poll_next(underlying, cx); + match &res { + Poll::Ready(Some(Chunk::Data(data))) => { + this.left = this.left.map(|x| x.saturating_sub(data.len())); + } + Poll::Ready(Some(Chunk::Aborted(..))) => { + this.left = Some(0); + } + _ => {} + } + + res + } +} + +impl<'a> UploadStream<'a> { + pub const fn left(&self) -> Option { + self.left + } + + pub const fn unknown_size(stream: BoxedStream<'a>) -> Self { + Self { stream, left: None } + } + + pub const fn known_size(stream: BoxedStream<'a>, total_size: usize) -> Self { + Self { + stream, + left: Some(total_size), + } + } +} diff --git a/http/Cargo.toml b/http/Cargo.toml index 685d832..3fbca04 100644 --- a/http/Cargo.toml +++ b/http/Cargo.toml @@ -28,12 +28,12 @@ futures.workspace = true eva.workspace = true eyre.workspace = true -axum = { optional = true, version = "0.8.4", default-features = false, features = ["http2", "ws", "macros"] } +axum = { optional = true, version = "0.8.4", default-features = false, features = ["http2", "ws", "macros", "multipart"] } http-body-util = { version = "0.1.3", optional = true } tower-http = { optional = true, version = "0.6.6", default-features = false, features = ["cors"] } serde_urlencoded = { version = "0.7.1", optional = true } tokio = { workspace = true, optional = true } -reqwest = { optional = true, version = "0.12.23", default-features = false } +reqwest = { optional = true, version = "0.12.23", default-features = false, features = ["multipart", "stream"] } fastrace-axum = { optional = true, version = "0.1.3" } diff --git a/http/src/client/mod.rs b/http/src/client/mod.rs index 5ff99fc..486019a 100644 --- a/http/src/client/mod.rs +++ b/http/src/client/mod.rs @@ -1,5 +1,7 @@ use std::sync::Arc; +use serde::{Deserialize, Serialize}; + use eva::{ error::ShitHappens, str::{CompactString, format_compact as c}, @@ -19,6 +21,7 @@ use viendesu_core::{ messages::Messages, tabs::Tabs, threads::Threads, + uploads::Uploads, users::Users, }, types::session, @@ -39,6 +42,8 @@ mod users; mod marks; mod tabs; +mod uploads; + struct DoRequest<'c, P> { client: &'c mut HttpClient, method: Method, @@ -49,7 +54,7 @@ impl<'c, P, C, H> CallStep for DoRequest<'c, P> where C: Send + Sync, P: Send + Sync + FnMut(C) -> (CompactString, H), - H: Request, + H: Request + Serialize, { type Ok = H::Response; type Err = H::Error; @@ -81,15 +86,7 @@ impl HttpClient { } } - async fn do_request( - &self, - method: Method, - path: &str, - request: R, - ) -> Response - where - R: Request, - { + fn endpoint(&self, path: &str) -> String { let mut endpoint = self.options.endpoint.clone(); if endpoint.ends_with('/') { endpoint.push_str(path.strip_prefix('/').unwrap_or(path)); @@ -97,7 +94,42 @@ impl HttpClient { endpoint.push_str(path); } - let mut req = self.client.request(method.clone(), endpoint); + endpoint + } + + fn load_response(&self, data: &[u8]) -> Response + where + O: for<'de> Deserialize<'de>, + E: for<'de> Deserialize<'de>, + { + #[derive(Deserialize)] + #[serde(untagged)] + enum GenericResponse { + Success { ok: O }, + Error { error: errors::Generic }, + } + + let resp: GenericResponse = + self.options.format.load(data).map_err(|e| { + Aux::Deserialization(format!("failed to deserialize response: {e:#}")) + })?; + + match resp { + GenericResponse::Error { error } => Err(error), + GenericResponse::Success { ok } => Ok(ok), + } + } + + async fn do_request( + &self, + method: Method, + path: &str, + request: R, + ) -> Response + where + R: Request + Serialize, + { + let mut req = self.client.request(method.clone(), self.endpoint(path)); if method == Method::GET { req = req.query(&request); } else { @@ -111,35 +143,19 @@ impl HttpClient { } if let Some(sess) = self.session { - req = req.header("authorization", format!("Bearer {}", sess.to_str())); + req = req.bearer_auth(sess); } let response = self .client .execute(req.build().expect("shit happens")) .await - .map_err(|e| Aux::InternalError(c!("failed to make request: {e:#}")))?; + .map_err(|e| Aux::InternalError(format!("failed to make request: {e:#}")))?; let bytes = response .bytes() .await - .map_err(|e| Aux::InternalError(c!("failed to read bytes: {e:#}")))?; - - #[derive(serde::Deserialize)] - #[serde(untagged)] - enum GenericResponse { - Success { ok: O }, - Error { error: errors::Generic }, - } - - let resp: GenericResponse = - self.options.format.load(&bytes).map_err(|e| { - Aux::Deserialization(format!("failed to deserialize response: {e:#}")) - })?; - - match resp { - GenericResponse::Error { error } => Err(error), - GenericResponse::Success { ok } => Ok(ok), - } + .map_err(|e| Aux::InternalError(format!("failed to read bytes: {e:#}")))?; + self.load_response(&bytes) } } diff --git a/http/src/client/uploads.rs b/http/src/client/uploads.rs new file mode 100644 index 0000000..f483fbd --- /dev/null +++ b/http/src/client/uploads.rs @@ -0,0 +1,100 @@ +use super::*; + +use futures::stream::StreamExt as _; + +use viendesu_core::{ + requests::uploads::{abort, finish, list_pending, start}, + uploads::Chunk, +}; + +use crate::requests::uploads as requests; + +struct FinishUpload<'a> { + client: &'a mut HttpClient, +} + +impl CallStep for FinishUpload<'_> { + type Ok = finish::Ok; + type Err = finish::Err; + + async fn call(&mut self, args: finish::Args) -> Response { + use reqwest::multipart; + + let finish::Args { id, stream } = args; + + // I don't know why anyone would re-use same stream, but let it be. + let content_length = stream.left(); + let endpoint = self.client.endpoint(&c!("/uploads/{id}")); + let mut request = self.client.client.post(endpoint); + if let Some(session) = self.client.session { + request = request.bearer_auth(session); + } + let body = reqwest::Body::wrap_stream(stream.map(|c| match c { + Chunk::Aborted(error) => Err(error), + Chunk::Data(data) => Ok(data), + })); + request = request.multipart(multipart::Form::new().part( + "file", + if let Some(size) = content_length { + multipart::Part::stream_with_length(body, size as u64) + } else { + multipart::Part::stream(body) + }, + )); + + let response = self + .client + .client + .execute(request.build().expect("shit happens")) + .await + .map_err(|e| Aux::InternalError(format!("failed to make request: {e}")))?; + let bytes = response + .bytes() + .await + .map_err(|e| Aux::InternalError(format!("failed to read response: {e}")))?; + + self.client.load_response(&bytes) + } +} + +impl Uploads for HttpClient { + fn list_pending( + &mut self, + ) -> impl CallStep { + self.do_call(Method::GET, |list_pending::Args {}| { + (c!("/uploads"), requests::ListPending {}) + }) + } + + fn start(&mut self) -> impl CallStep { + self.do_call( + Method::POST, + |start::Args { + name, + hash, + class, + size, + }| { + ( + c!("/uploads"), + requests::Start { + file_name: name, + hash, + class, + size, + }, + ) + }, + ) + } + + fn abort(&mut self) -> impl CallStep { + self.do_call(Method::DELETE, |abort::Args { upload }| { + (c!("/uploads/{upload}"), requests::Abort {}) + }) + } + + fn finish(&mut self) -> impl CallStep { + FinishUpload { client: self } + } +} diff --git a/http/src/format.rs b/http/src/format.rs index d1bf7e0..2c54fb5 100644 --- a/http/src/format.rs +++ b/http/src/format.rs @@ -18,6 +18,7 @@ impl Format { match mime { "application/json" => Ok(Json), + "*/*" => Ok(Json), _ => Err(E), } } diff --git a/http/src/requests/mod.rs b/http/src/requests/mod.rs index 8bd9d12..d3db682 100644 --- a/http/src/requests/mod.rs +++ b/http/src/requests/mod.rs @@ -1,8 +1,6 @@ pub mod status_code; -pub trait Request: - Send + Sync + 'static + for<'de> serde::Deserialize<'de> + serde::Serialize -{ +pub trait Request: Send + Sync + 'static { type Response: IsResponse; type Error: IsResponse + std::fmt::Display; } diff --git a/http/src/requests/uploads.rs b/http/src/requests/uploads.rs index 309898d..6e0eaf0 100644 --- a/http/src/requests/uploads.rs +++ b/http/src/requests/uploads.rs @@ -4,7 +4,7 @@ use eva::data; use crate::requests::status_code; -use viendesu_core::{errors, requests::uploads as reqs, types::file}; +use viendesu_core::{errors, requests::uploads as reqs, types::{file, upload}, uploads::UploadStream}; #[data] pub struct ListPending {} @@ -16,7 +16,7 @@ status_code::map!(reqs::list_pending::Err => []); #[data] pub struct Start { pub file_name: Option, - pub hash: file::Hash, + pub hash: Option, pub class: file::ClassKind, pub size: NonZeroU64, } @@ -25,8 +25,10 @@ impl_req!(Start => [reqs::start::Ok; reqs::start::Err]); status_code::direct!(reqs::start::Ok => CREATED); status_code::map!(reqs::start::Err => [QuotaExceeded, SimUpQuotaExceeded]); -#[data] -pub struct Finish {} +pub struct Finish { + pub id: upload::Id, + pub stream: UploadStream<'static>, +} impl_req!(Finish => [reqs::finish::Ok; reqs::finish::Err]); status_code::direct!(reqs::finish::Ok => OK); diff --git a/http/src/server/context.rs b/http/src/server/context.rs index 8cae6d8..1dfb390 100644 --- a/http/src/server/context.rs +++ b/http/src/server/context.rs @@ -21,7 +21,7 @@ use viendesu_core::errors::{Aux, AuxResult}; #[non_exhaustive] pub struct Context { pub request: R, - pub parts: Parts, + pub parts: Option, pub response_format: Format, } @@ -30,15 +30,18 @@ impl Context { where P: for<'de> Deserialize<'de> + Send + 'static, { - extract::path(&mut self.parts).await + extract::path(self.parts.as_mut().unwrap()).await } pub fn query<'this, T: serde::Deserialize<'this>>(&'this self) -> AuxResult { - extract::query(&self.parts) + extract::query(self.parts.as_ref().unwrap()) } } -pub async fn load_args(req: AxumRequest) -> Result, AxumResponse> { +pub async fn load_args(req: AxumRequest) -> Result, AxumResponse> +where + R: for<'de> serde::Deserialize<'de>, +{ let (parts, body) = req.with_limited_body().into_parts(); let response_format = @@ -79,7 +82,7 @@ pub async fn load_args(req: AxumRequest) -> Result, Ok(Context { request, - parts, + parts: Some(parts), response_format, }) } diff --git a/http/src/server/handler.rs b/http/src/server/handler.rs index 78c3493..a80a1df 100644 --- a/http/src/server/handler.rs +++ b/http/src/server/handler.rs @@ -1,5 +1,7 @@ use eva::fut::Fut; +use serde::Deserialize; + use std::{marker::PhantomData, sync::Arc}; use axum::{ @@ -123,9 +125,12 @@ impl> FinishedHandler> FinishedHandler(make_request: M) -> FinishedHandler> where - R: ServerRequest, + R: ServerRequest + for<'de> Deserialize<'de>, T: Types, M: MakeRequest, { @@ -153,7 +158,7 @@ where pub fn post(make_request: M) -> FinishedHandler> where - R: ServerRequest, + R: ServerRequest + for<'de> Deserialize<'de>, T: Types, M: MakeRequest, { @@ -162,7 +167,7 @@ where pub fn patch(make_request: M) -> FinishedHandler> where - R: ServerRequest, + R: ServerRequest + for<'de> Deserialize<'de>, T: Types, M: MakeRequest, { @@ -171,7 +176,7 @@ where pub fn delete(make_request: M) -> FinishedHandler> where - R: ServerRequest, + R: ServerRequest + for<'de> Deserialize<'de>, T: Types, M: MakeRequest, { diff --git a/http/src/server/routes/uploads.rs b/http/src/server/routes/uploads.rs index d9828e4..db38d7d 100644 --- a/http/src/server/routes/uploads.rs +++ b/http/src/server/routes/uploads.rs @@ -1,5 +1,164 @@ use super::*; +use crate::{ + requests::uploads::{Abort, Finish, ListPending, Start}, + server::{ + handler::{Handler, delete, get, post}, + request::extract, + response, + }, +}; + +use futures::stream; +use tokio::{sync::mpsc, task::JoinHandle}; + +use viendesu_core::{ + errors::Aux, + requests::uploads::{abort, finish, list_pending, start}, + service::uploads::Uploads as _, + types::upload, + uploads::{AbortReason, Chunk, UploadStream}, +}; + +use axum::{ + extract::{FromRequest, Multipart, Request as AxumRequest}, + response::Response as AxumResponse, +}; + +async fn populate_stream(req: AxumRequest, tx: &mpsc::Sender) -> Result<(), Aux> { + let mut multipart = Multipart::from_request(req, &()) + .await + .map_err(|e| Aux::Deserialization(format!("failed to load multipart request: {e}")))?; + + let Some(mut field) = multipart + .next_field() + .await + .map_err(|e| Aux::Deserialization(format!("failed to read multipart field: {e}")))? + else { + return Err(Aux::Deserialization("expected one multipart field".into())); + }; + + while let Some(chunk) = field + .chunk() + .await + .map_err(|e| Aux::Deserialization(format!("failed to read multipart chunk: {e}")))? + { + if tx.send(Chunk::Data(chunk)).await.is_err() { + break; + } + } + + Ok(()) +} + +async fn load_upload_context(request: AxumRequest) -> Result, AxumResponse> { + struct State { + handle: JoinHandle<()>, + rx: mpsc::Receiver, + } + + impl Drop for State { + fn drop(&mut self) { + // FUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUUCK. + // Most likely it's actually unneeded, BUT + // let's not leave dangling tasks for whatever reason. + self.handle.abort(); + } + } + + let (mut parts, body) = request.into_parts(); + let id: upload::Id = extract::path(&mut parts).await?; + let response_format = + extract::response_format(&parts).map_err(|e| response::err(Default::default(), e))?; + + // This fucking sucks to the extent I can't express with words, the way its done + // is horrible, the reason behind this is even more, I wish authors of axum very pleasant + // requirement of self-referrential types. + // + // + // Ногами. + let (tx, rx) = mpsc::channel(1); + let handle = tokio::spawn(async move { + let tx = tx; + if let Err(e) = populate_stream(AxumRequest::from_parts(parts, body), &tx).await { + _ = tx + .send(Chunk::Aborted(AbortReason::Other(format!("{e}")))) + .await; + } + }); + let stream = stream::unfold(State { rx, handle }, async move |mut state| { + if let Some(c) = state.rx.recv().await { + Some((c, state)) + } else { + None + } + }); + + Ok(Ctx { + request: Finish { + id, + stream: UploadStream::unknown_size(Box::pin(stream)), + }, + parts: None, + response_format, + }) +} + pub fn make(router: RouterScope) -> RouterScope { router + .route( + "/", + get(async |mut session: SessionOf, ctx: Ctx| { + let ListPending {} = ctx.request; + session + .uploads() + .list_pending() + .call(list_pending::Args {}) + .await + }), + ) + .route( + "/{id}", + Handler::post(load_upload_context).exec( + async |mut session: SessionOf, ctx: Ctx| { + let Finish { id, stream } = ctx.request; + + session + .uploads() + .finish() + .call(finish::Args { id, stream }) + .await + }, + ), + ) + .route( + "/{id}", + delete(async |mut session: SessionOf, mut ctx: Ctx| { + let upload: upload::Id = ctx.path().await?; + let Abort {} = ctx.request; + + session.uploads().abort().call(abort::Args { upload }).await + }), + ) + .route( + "/", + post(async |mut session: SessionOf, ctx: Ctx| { + let Start { + file_name, + hash, + class, + size, + } = ctx.request; + session + .uploads() + .start() + .call(start::Args { + name: file_name, + hash, + class, + size, + }) + .await + }), + ) }