Aleksandr 2026-01-01 09:37:00 +03:00
parent b828917b4a
commit e3ed43148a
14 changed files with 408 additions and 59 deletions

View file

@ -107,7 +107,7 @@ pub enum Aux {
    InvalidRole(#[from] auth::InvalidRole),
    /// Internal failure.
    #[display("{_0}")]
    InternalError(CompactString),
    InternalError(String),
    /// object store failure.
    #[display("{_0}")]
    ObjectStore(CompactString),

View file

@ -85,7 +85,7 @@ pub mod abort {
    #[data]
    pub struct Args {
        pub upload: file::Id,
        pub upload: upload::Id,
    }
    #[data(copy)]

View file

@ -35,4 +35,6 @@ impl<S: IsSession> Session<S> {project!{
    fn badges() -> marks::Badges;
    fn tabs() -> tabs::Tabs;
    fn uploads() -> uploads::Uploads;
}}

View file

@ -40,6 +40,7 @@ trait_set! {
        + marks::Genres
        + marks::Badges
        + tabs::Tabs
        + uploads::Uploads
    ;
}

View file

@ -1,11 +1,73 @@
use std::{
    pin::Pin,
    task::{Context, Poll},
};
use eva::{bytes::Bytes, data};
use futures::stream::BoxStream;
use futures::stream::Stream;
#[data]
pub enum Action {
    Push(Bytes),
    Abort,
#[data(error)]
pub enum AbortReason {
    #[display("the user uploaded more than expected")]
    Overuploading,
    #[display("{_0}")]
    Other(String),
}
pub type UploadStream<'a> = BoxStream<'a, Action>;
#[data]
pub enum Chunk {
    Data(Bytes),
    Aborted(AbortReason),
}
type BoxedStream<'a> = Pin<Box<dyn Stream<Item = Chunk> + Send + Sync + 'a>>;
pub struct UploadStream<'a> {
    stream: BoxedStream<'a>,
    left: Option<usize>,
}
impl Unpin for UploadStream<'_> {}
impl<'a> Stream for UploadStream<'a> {
    type Item = Chunk;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.left == Some(0) {
            return Poll::Ready(None);
        }
        let this = self.get_mut();
        let underlying = Pin::new(&mut this.stream);
        let res = Stream::poll_next(underlying, cx);
        match &res {
            Poll::Ready(Some(Chunk::Data(data))) => {
                this.left = this.left.map(|x| x.saturating_sub(data.len()));
            }
            Poll::Ready(Some(Chunk::Aborted(..))) => {
                this.left = Some(0);
            }
            _ => {}
        }
        res
    }
}
impl<'a> UploadStream<'a> {
    pub const fn left(&self) -> Option<usize> {
        self.left
    }
    pub const fn unknown_size(stream: BoxedStream<'a>) -> Self {
        Self { stream, left: None }
    }
    pub const fn known_size(stream: BoxedStream<'a>, total_size: usize) -> Self {
        Self {
            stream,
            left: Some(total_size),
        }
    }
}
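
For orientation, here is a minimal consumer sketch (not part of this commit): it wraps an in-memory chunk sequence in an UploadStream and drains it. It assumes the module above is exported as viendesu_core::uploads, that eva::bytes::Bytes is the usual bytes::Bytes, and that #[data(error)] gives AbortReason a Display impl; the drain helper and example values are hypothetical.

use eva::bytes::Bytes;
use futures::{stream, StreamExt as _};
use viendesu_core::uploads::{Chunk, UploadStream};

// Collect all data chunks; stop with an error if the producer aborted.
async fn drain(mut upload: UploadStream<'_>) -> Result<Vec<u8>, String> {
    let mut buf = Vec::new();
    while let Some(chunk) = upload.next().await {
        match chunk {
            Chunk::Data(data) => buf.extend_from_slice(&data),
            Chunk::Aborted(reason) => return Err(reason.to_string()),
        }
    }
    Ok(buf)
}

async fn example() {
    let chunks = stream::iter([
        Chunk::Data(Bytes::from_static(b"hello ")),
        Chunk::Data(Bytes::from_static(b"world")),
    ]);
    // The total size is known up front, so `left()` counts down and the
    // stream yields None once 11 bytes have been produced.
    let upload = UploadStream::known_size(Box::pin(chunks), 11);
    assert_eq!(drain(upload).await.unwrap(), b"hello world".to_vec());
}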

View file

@ -28,12 +28,12 @@ futures.workspace = true
eva.workspace = true
eyre.workspace = true
axum = { optional = true, version = "0.8.4", default-features = false, features = ["http2", "ws", "macros"] }
axum = { optional = true, version = "0.8.4", default-features = false, features = ["http2", "ws", "macros", "multipart"] }
http-body-util = { version = "0.1.3", optional = true }
tower-http = { optional = true, version = "0.6.6", default-features = false, features = ["cors"] }
serde_urlencoded = { version = "0.7.1", optional = true }
tokio = { workspace = true, optional = true }
reqwest = { optional = true, version = "0.12.23", default-features = false }
reqwest = { optional = true, version = "0.12.23", default-features = false, features = ["multipart", "stream"] }
fastrace-axum = { optional = true, version = "0.1.3" }

View file

@ -1,5 +1,7 @@
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use eva::{
    error::ShitHappens,
    str::{CompactString, format_compact as c},
@ -19,6 +21,7 @@ use viendesu_core::{
        messages::Messages,
        tabs::Tabs,
        threads::Threads,
        uploads::Uploads,
        users::Users,
    },
    types::session,
@ -39,6 +42,8 @@ mod users;
mod marks;
mod tabs;
mod uploads;
struct DoRequest<'c, P> {
    client: &'c mut HttpClient,
    method: Method,
@ -49,7 +54,7 @@ impl<'c, P, C, H> CallStep<C> for DoRequest<'c, P>
where
    C: Send + Sync,
    P: Send + Sync + FnMut(C) -> (CompactString, H),
    H: Request,
    H: Request + Serialize,
{
    type Ok = H::Response;
    type Err = H::Error;
@ -81,15 +86,7 @@ impl HttpClient {
        }
    }
    async fn do_request<R>(
        &self,
        method: Method,
        path: &str,
        request: R,
    ) -> Response<R::Response, R::Error>
    where
        R: Request,
    {
    fn endpoint(&self, path: &str) -> String {
        let mut endpoint = self.options.endpoint.clone();
        if endpoint.ends_with('/') {
            endpoint.push_str(path.strip_prefix('/').unwrap_or(path));
@ -97,7 +94,42 @@ impl HttpClient {
            endpoint.push_str(path);
        }
        let mut req = self.client.request(method.clone(), endpoint);
        endpoint
    }
    fn load_response<O, E>(&self, data: &[u8]) -> Response<O, E>
    where
        O: for<'de> Deserialize<'de>,
        E: for<'de> Deserialize<'de>,
    {
        #[derive(Deserialize)]
        #[serde(untagged)]
        enum GenericResponse<O, E> {
            Success { ok: O },
            Error { error: errors::Generic<E> },
        }
        let resp: GenericResponse<O, E> =
            self.options.format.load(data).map_err(|e| {
                Aux::Deserialization(format!("failed to deserialize response: {e:#}"))
            })?;
        match resp {
            GenericResponse::Error { error } => Err(error),
            GenericResponse::Success { ok } => Ok(ok),
        }
    }
    async fn do_request<R>(
        &self,
        method: Method,
        path: &str,
        request: R,
    ) -> Response<R::Response, R::Error>
    where
        R: Request + Serialize,
    {
        let mut req = self.client.request(method.clone(), self.endpoint(path));
        if method == Method::GET {
            req = req.query(&request);
        } else {
@ -111,35 +143,19 @@ impl HttpClient {
        }
        if let Some(sess) = self.session {
            req = req.header("authorization", format!("Bearer {}", sess.to_str()));
            req = req.bearer_auth(sess);
        }
        let response = self
            .client
            .execute(req.build().expect("shit happens"))
            .await
            .map_err(|e| Aux::InternalError(c!("failed to make request: {e:#}")))?;
            .map_err(|e| Aux::InternalError(format!("failed to make request: {e:#}")))?;
        let bytes = response
            .bytes()
            .await
            .map_err(|e| Aux::InternalError(c!("failed to read bytes: {e:#}")))?;
        #[derive(serde::Deserialize)]
        #[serde(untagged)]
        enum GenericResponse<O, E> {
            Success { ok: O },
            Error { error: errors::Generic<E> },
        }
        let resp: GenericResponse<R::Response, R::Error> =
            self.options.format.load(&bytes).map_err(|e| {
                Aux::Deserialization(format!("failed to deserialize response: {e:#}"))
            })?;
        match resp {
            GenericResponse::Error { error } => Err(error),
            GenericResponse::Success { ok } => Ok(ok),
        }
            .map_err(|e| Aux::InternalError(format!("failed to read bytes: {e:#}")))?;
        self.load_response(&bytes)
    }
}
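
As a side note, here is a small standalone sketch (not from this commit) of how the untagged GenericResponse envelope above resolves: a body with an "ok" field deserializes into Success, one with an "error" field into Error. serde_json and the placeholder payload types are assumptions for illustration; the real code goes through self.options.format.load and wraps errors in errors::Generic.

use serde::Deserialize;

#[derive(Deserialize)]
#[serde(untagged)]
enum GenericResponse<O, E> {
    Success { ok: O },
    Error { error: E },
}

fn demo() {
    // `{"ok": ...}` matches the first variant that deserializes successfully.
    let ok: GenericResponse<u32, String> = serde_json::from_str(r#"{"ok": 1}"#).unwrap();
    assert!(matches!(ok, GenericResponse::Success { ok: 1 }));
    // `{"error": ...}` fails the Success shape and falls through to Error.
    let err: GenericResponse<u32, String> = serde_json::from_str(r#"{"error": "quota exceeded"}"#).unwrap();
    assert!(matches!(err, GenericResponse::Error { .. }));
}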

http/src/client/uploads.rs (new file, 100 added lines)
View file

@ -0,0 +1,100 @@
use super::*;
use futures::stream::StreamExt as _;
use viendesu_core::{
    requests::uploads::{abort, finish, list_pending, start},
    uploads::Chunk,
};
use crate::requests::uploads as requests;

struct FinishUpload<'a> {
    client: &'a mut HttpClient,
}

impl CallStep<finish::Args> for FinishUpload<'_> {
    type Ok = finish::Ok;
    type Err = finish::Err;

    async fn call(&mut self, args: finish::Args) -> Response<Self::Ok, Self::Err> {
        use reqwest::multipart;
        let finish::Args { id, stream } = args;
        // I don't know why anyone would reuse the same stream, but let it be.
        let content_length = stream.left();
        let endpoint = self.client.endpoint(&c!("/uploads/{id}"));
        let mut request = self.client.client.post(endpoint);
        if let Some(session) = self.client.session {
            request = request.bearer_auth(session);
        }
        let body = reqwest::Body::wrap_stream(stream.map(|c| match c {
            Chunk::Aborted(error) => Err(error),
            Chunk::Data(data) => Ok(data),
        }));
        request = request.multipart(multipart::Form::new().part(
            "file",
            if let Some(size) = content_length {
                multipart::Part::stream_with_length(body, size as u64)
            } else {
                multipart::Part::stream(body)
            },
        ));
        let response = self
            .client
            .client
            .execute(request.build().expect("shit happens"))
            .await
            .map_err(|e| Aux::InternalError(format!("failed to make request: {e}")))?;
        let bytes = response
            .bytes()
            .await
            .map_err(|e| Aux::InternalError(format!("failed to read response: {e}")))?;
        self.client.load_response(&bytes)
    }
}

impl Uploads for HttpClient {
    fn list_pending(
        &mut self,
    ) -> impl CallStep<list_pending::Args, Ok = list_pending::Ok, Err = list_pending::Err> {
        self.do_call(Method::GET, |list_pending::Args {}| {
            (c!("/uploads"), requests::ListPending {})
        })
    }

    fn start(&mut self) -> impl CallStep<start::Args, Ok = start::Ok, Err = start::Err> {
        self.do_call(
            Method::POST,
            |start::Args {
                name,
                hash,
                class,
                size,
            }| {
                (
                    c!("/uploads"),
                    requests::Start {
                        file_name: name,
                        hash,
                        class,
                        size,
                    },
                )
            },
        )
    }

    fn abort(&mut self) -> impl CallStep<abort::Args, Ok = abort::Ok, Err = abort::Err> {
        self.do_call(Method::DELETE, |abort::Args { upload }| {
            (c!("/uploads/{upload}"), requests::Abort {})
        })
    }

    fn finish(&mut self) -> impl CallStep<finish::Args, Ok = finish::Ok, Err = finish::Err> {
        FinishUpload { client: self }
    }
}
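
For reference, a hypothetical caller-side sketch (not part of this commit) that pushes an in-memory payload through finish. The import path for the Uploads trait, the push_bytes helper name, and the ignored result are assumptions; a prior start call is presumed to have produced the upload id.

use eva::bytes::Bytes;
use futures::stream;
use viendesu_core::{requests::uploads::finish, types::upload, uploads::{Chunk, UploadStream}};
// `Uploads` here stands for whichever trait `HttpClient` implements above (assumed path).
use viendesu_core::service::uploads::Uploads as _;

async fn push_bytes(client: &mut HttpClient, id: upload::Id, payload: Bytes) {
    let len = payload.len();
    // Single-chunk body; a real caller would forward chunks from disk or the network.
    let chunks = stream::iter([Chunk::Data(payload)]);
    let stream = UploadStream::known_size(Box::pin(chunks), len);
    // `finish()` returns the CallStep built above; `call` drives the multipart POST.
    let _outcome = client.finish().call(finish::Args { id, stream }).await;
}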

View file

@ -18,6 +18,7 @@ impl Format {
        match mime {
            "application/json" => Ok(Json),
            "*/*" => Ok(Json),
            _ => Err(E),
        }
    }

View file

@ -1,8 +1,6 @@
pub mod status_code;
pub trait Request:
    Send + Sync + 'static + for<'de> serde::Deserialize<'de> + serde::Serialize
{
pub trait Request: Send + Sync + 'static {
    type Response: IsResponse;
    type Error: IsResponse + std::fmt::Display;
}

View file

@ -4,7 +4,7 @@ use eva::data;
use crate::requests::status_code;
use viendesu_core::{errors, requests::uploads as reqs, types::file};
use viendesu_core::{errors, requests::uploads as reqs, types::{file, upload}, uploads::UploadStream};
#[data]
pub struct ListPending {}
@ -16,7 +16,7 @@ status_code::map!(reqs::list_pending::Err => []);
#[data]
pub struct Start {
    pub file_name: Option<file::BaseName>,
    pub hash: file::Hash,
    pub hash: Option<file::Hash>,
    pub class: file::ClassKind,
    pub size: NonZeroU64,
}
@ -25,8 +25,10 @@ impl_req!(Start => [reqs::start::Ok; reqs::start::Err]);
status_code::direct!(reqs::start::Ok => CREATED);
status_code::map!(reqs::start::Err => [QuotaExceeded, SimUpQuotaExceeded]);
#[data]
pub struct Finish {}
pub struct Finish {
    pub id: upload::Id,
    pub stream: UploadStream<'static>,
}
impl_req!(Finish => [reqs::finish::Ok; reqs::finish::Err]);
status_code::direct!(reqs::finish::Ok => OK);

View file

@ -21,7 +21,7 @@ use viendesu_core::errors::{Aux, AuxResult};
#[non_exhaustive]
pub struct Context<R: ServerRequest> {
    pub request: R,
    pub parts: Parts,
    pub parts: Option<Parts>,
    pub response_format: Format,
}
@ -30,15 +30,18 @@ impl<R: ServerRequest> Context<R> {
    where
        P: for<'de> Deserialize<'de> + Send + 'static,
    {
        extract::path(&mut self.parts).await
        extract::path(self.parts.as_mut().unwrap()).await
    }
    pub fn query<'this, T: serde::Deserialize<'this>>(&'this self) -> AuxResult<T> {
        extract::query(&self.parts)
        extract::query(self.parts.as_ref().unwrap())
    }
}
pub async fn load_args<R: ServerRequest>(req: AxumRequest) -> Result<Context<R>, AxumResponse> {
pub async fn load_args<R: ServerRequest>(req: AxumRequest) -> Result<Context<R>, AxumResponse>
where
    R: for<'de> serde::Deserialize<'de>,
{
    let (parts, body) = req.with_limited_body().into_parts();
    let response_format =
@ -79,7 +82,7 @@ pub async fn load_args<R: ServerRequest>(req: AxumRequest) -> Result<Context<R>,
    Ok(Context {
        request,
        parts,
        parts: Some(parts),
        response_format,
    })
}

View file

@ -1,5 +1,7 @@
use eva::fut::Fut;
use serde::Deserialize;
use std::{marker::PhantomData, sync::Arc};
use axum::{
@ -123,9 +125,12 @@ impl<R: ServerRequest, M, T: Types, Cx: MakeContext<R>> FinishedHandler<R, M, T,
            .await
            .map_err(|e| response::err(resp_format, e))?;
        if let Some(token) = extract::session_token(&context.parts)
            .map_err(|e| response::err(resp_format, e))?
        if let Some(token) = context
            .parts
            .as_ref()
            .and_then(|p| extract::session_token(p).transpose())
        {
            let token = token.map_err(|e| response::err(resp_format, e))?;
            session
                .authz()
                .authenticate(token)
@ -144,7 +149,7 @@
pub fn get<R, T, M>(make_request: M) -> FinishedHandler<R, M, T, impl MakeContext<R>>
where
    R: ServerRequest,
    R: ServerRequest + for<'de> Deserialize<'de>,
    T: Types,
    M: MakeRequest<T, R>,
{
@ -153,7 +158,7 @@
pub fn post<R, T, M>(make_request: M) -> FinishedHandler<R, M, T, impl MakeContext<R>>
where
    R: ServerRequest,
    R: ServerRequest + for<'de> Deserialize<'de>,
    T: Types,
    M: MakeRequest<T, R>,
{
@ -162,7 +167,7 @@
pub fn patch<R, T, M>(make_request: M) -> FinishedHandler<R, M, T, impl MakeContext<R>>
where
    R: ServerRequest,
    R: ServerRequest + for<'de> Deserialize<'de>,
    T: Types,
    M: MakeRequest<T, R>,
{
@ -171,7 +176,7 @@
pub fn delete<R, T, M>(make_request: M) -> FinishedHandler<R, M, T, impl MakeContext<R>>
where
    R: ServerRequest,
    R: ServerRequest + for<'de> Deserialize<'de>,
    T: Types,
    M: MakeRequest<T, R>,
{

View file

@ -1,5 +1,164 @@
use super::*;
use crate::{
    requests::uploads::{Abort, Finish, ListPending, Start},
    server::{
        handler::{Handler, delete, get, post},
        request::extract,
        response,
    },
};
use futures::stream;
use tokio::{sync::mpsc, task::JoinHandle};
use viendesu_core::{
    errors::Aux,
    requests::uploads::{abort, finish, list_pending, start},
    service::uploads::Uploads as _,
    types::upload,
    uploads::{AbortReason, Chunk, UploadStream},
};
use axum::{
    extract::{FromRequest, Multipart, Request as AxumRequest},
    response::Response as AxumResponse,
};

async fn populate_stream(req: AxumRequest, tx: &mpsc::Sender<Chunk>) -> Result<(), Aux> {
    let mut multipart = Multipart::from_request(req, &())
        .await
        .map_err(|e| Aux::Deserialization(format!("failed to load multipart request: {e}")))?;
    let Some(mut field) = multipart
        .next_field()
        .await
        .map_err(|e| Aux::Deserialization(format!("failed to read multipart field: {e}")))?
    else {
        return Err(Aux::Deserialization("expected one multipart field".into()));
    };
    while let Some(chunk) = field
        .chunk()
        .await
        .map_err(|e| Aux::Deserialization(format!("failed to read multipart chunk: {e}")))?
    {
        if tx.send(Chunk::Data(chunk)).await.is_err() {
            break;
        }
    }
    Ok(())
}

async fn load_upload_context(request: AxumRequest) -> Result<Ctx<Finish>, AxumResponse> {
    struct State {
        handle: JoinHandle<()>,
        rx: mpsc::Receiver<Chunk>,
    }
    impl Drop for State {
        fn drop(&mut self) {
            // Most likely this is actually unneeded, but let's not leave
            // dangling tasks behind for whatever reason.
            self.handle.abort();
        }
    }
    let (mut parts, body) = request.into_parts();
    let id: upload::Id = extract::path(&mut parts).await?;
    let response_format =
        extract::response_format(&parts).map_err(|e| response::err(Default::default(), e))?;
    // This sucks to an extent I can't put into words: the way it's done is
    // horrible, and the reason behind it is even worse. I wish the authors of
    // axum a very pleasant time with their requirement of self-referential types.
    //
    // With feet.
    let (tx, rx) = mpsc::channel(1);
    let handle = tokio::spawn(async move {
        let tx = tx;
        if let Err(e) = populate_stream(AxumRequest::from_parts(parts, body), &tx).await {
            _ = tx
                .send(Chunk::Aborted(AbortReason::Other(format!("{e}"))))
                .await;
        }
    });
    let stream = stream::unfold(State { rx, handle }, async move |mut state| {
        if let Some(c) = state.rx.recv().await {
            Some((c, state))
        } else {
            None
        }
    });
    Ok(Ctx {
        request: Finish {
            id,
            stream: UploadStream::unknown_size(Box::pin(stream)),
        },
        parts: None,
        response_format,
    })
}

pub fn make<T: Types>(router: RouterScope<T>) -> RouterScope<T> {
    router
        .route(
            "/",
            get(async |mut session: SessionOf<T>, ctx: Ctx<ListPending>| {
                let ListPending {} = ctx.request;
                session
                    .uploads()
                    .list_pending()
                    .call(list_pending::Args {})
                    .await
            }),
        )
        .route(
            "/{id}",
            Handler::post(load_upload_context).exec(
                async |mut session: SessionOf<T>, ctx: Ctx<Finish>| {
                    let Finish { id, stream } = ctx.request;
                    session
                        .uploads()
                        .finish()
                        .call(finish::Args { id, stream })
                        .await
                },
            ),
        )
        .route(
            "/{id}",
            delete(async |mut session: SessionOf<T>, mut ctx: Ctx<Abort>| {
                let upload: upload::Id = ctx.path().await?;
                let Abort {} = ctx.request;
                session.uploads().abort().call(abort::Args { upload }).await
            }),
        )
        .route(
            "/",
            post(async |mut session: SessionOf<T>, ctx: Ctx<Start>| {
                let Start {
                    file_name,
                    hash,
                    class,
                    size,
                } = ctx.request;
                session
                    .uploads()
                    .start()
                    .call(start::Args {
                        name: file_name,
                        hash,
                        class,
                        size,
                    })
                    .await
            }),
        )
}