diff --git a/Cargo.lock b/Cargo.lock index e8ec6f0..a452fc1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3981,6 +3981,7 @@ dependencies = [ "sanitize-filename", "serde", "serde_json", + "tokio 1.20.0", ] [[package]] diff --git a/forms/multipart-s3/.env.example b/forms/multipart-s3/.env.example index 8debcff..f5efa7f 100644 --- a/forms/multipart-s3/.env.example +++ b/forms/multipart-s3/.env.example @@ -1,4 +1,6 @@ -AWS_REGION= +RUST_LOG=multipart=info,aws=warn,info + +AWS_REGION=us-east-1 AWS_ACCESS_KEY_ID= AWS_SECRET_ACCESS_KEY= AWS_S3_BUCKET_NAME= diff --git a/forms/multipart-s3/Cargo.toml b/forms/multipart-s3/Cargo.toml index 9dcbe7d..de5e9ac 100644 --- a/forms/multipart-s3/Cargo.toml +++ b/forms/multipart-s3/Cargo.toml @@ -18,3 +18,4 @@ log = "0.4" sanitize-filename = "0.4" serde = { version = "1", features = ["derive"] } serde_json = "1" +tokio = { version = "1.13.1", features = ["io-util", "fs"] } diff --git a/forms/multipart-s3/README.md b/forms/multipart-s3/README.md index 9a3c988..f0a169c 100644 --- a/forms/multipart-s3/README.md +++ b/forms/multipart-s3/README.md @@ -1,7 +1,6 @@ # Multipart + AWS S3 -Upload a file in multipart form to AWS S3 using [AWS S3 SDK](https://crates.io/crates/aws-sdk-s3). -Receive multiple data in multipart form in JSON format and receive it as a struct. +Upload a file in multipart form to AWS S3 using [AWS S3 SDK](https://crates.io/crates/aws-sdk-s3). # Usage @@ -9,13 +8,31 @@ Receive multiple data in multipart form in JSON format and receive it as a struc cd forms/multipart-s3 ``` -1. copy .env.example .env -1. edit .env AWS_ACCESS_KEY_ID=your_key -1. edit .env AWS_SECRET_ACCESS_KEY=your_key -1. edit .env AWS_S3_BUCKET_NAME=your_chosen_region +1. copy `.env.example` to `.env` +1. edit `.env` key `AWS_REGION` = your_bucket_region +1. edit `.env` key `AWS_ACCESS_KEY_ID` = your_key_id +1. edit `.env` key `AWS_SECRET_ACCESS_KEY` = your_key_secret +1. 
edit `.env` key `AWS_S3_BUCKET_NAME` = your_bucket_name ```sh cargo run ``` - +Go to <http://localhost:8080> in your browser. + +Or, start the upload using [HTTPie]: + +```sh +http --form POST :8080/ file@Cargo.toml +http --form POST :8080/ file@Cargo.toml file@README.md meta='{"namespace":"foo"}' +``` + +Or, using cURL: + +```sh +curl -X POST http://localhost:8080/ -F 'file=@Cargo.toml' +curl -X POST http://localhost:8080/ -F 'file=@Cargo.toml' -F 'file=@README.md' -F 'meta={"namespace":"foo"}' +``` + +[httpie]: https://httpie.org +[curl]: https://curl.haxx.se diff --git a/forms/multipart-s3/src/client.rs b/forms/multipart-s3/src/client.rs new file mode 100644 index 0000000..42de2cf --- /dev/null +++ b/forms/multipart-s3/src/client.rs @@ -0,0 +1,105 @@ +use std::env; + +use actix_web::Error; +use aws_config::SdkConfig as AwsConfig; +use aws_sdk_s3::{types::ByteStream, Client as S3Client}; +use futures_util::{stream, StreamExt as _}; +use tokio::{fs, io::AsyncReadExt as _}; + +use crate::{TempFile, UploadedFile}; + +/// S3 client wrapper to expose semantic upload operations. +#[derive(Debug, Clone)] +pub struct Client { + s3: S3Client, + bucket_name: String, +} + +impl Client { + /// Construct S3 client wrapper. 
+ pub fn new(config: &AwsConfig) -> Client { + Client { + s3: S3Client::new(config), + bucket_name: env::var("AWS_S3_BUCKET_NAME").unwrap(), + } + } + + pub fn url(&self, key: &str) -> String { + format!( + "https://{}.s3.{}.amazonaws.com/{key}", + env::var("AWS_S3_BUCKET_NAME").unwrap(), + env::var("AWS_REGION").unwrap(), + ) + } + + pub async fn upload_files( + &self, + temp_files: Vec, + key_prefix: &str, + ) -> Result, Error> { + let uploaded_files = stream::iter(temp_files) + .map(|file| self.upload_and_remove(file, key_prefix)) + // upload files concurrently, up to 2 at a time + .buffer_unordered(2) + .collect() + .await; + + Ok(uploaded_files) + } + + async fn upload_and_remove(&self, file: TempFile, key_prefix: &str) -> UploadedFile { + let uploaded_file = self.upload(&file, key_prefix).await; + file.delete_from_disk().await; + uploaded_file + } + + async fn upload(&self, file: &TempFile, key_prefix: &str) -> UploadedFile { + let filename = file.name(); + let key = format!("{key_prefix}{}", file.name()); + let s3_url = self.put_object_from_file(file.path(), &key).await; + UploadedFile::new(filename, key, s3_url) + } + + async fn put_object_from_file(&self, local_path: &str, key: &str) -> String { + let mut file = fs::File::open(local_path).await.unwrap(); + + let size_estimate = file + .metadata() + .await + .map(|md| md.len()) + .unwrap_or(1024) + .try_into() + .expect("file too big"); + + let mut contents = Vec::with_capacity(size_estimate); + file.read_to_end(&mut contents).await.unwrap(); + + let _res = self + .s3 + .put_object() + .bucket(&self.bucket_name) + .key(key) + .body(ByteStream::from(contents)) + .send() + .await + .expect("Failed to put test object"); + + self.url(key) + } + + pub async fn delete_files(&self, keys: Vec<&str>) { + for key in keys { + self.delete_object(key).await; + } + } + + async fn delete_object(&self, key: &str) { + self.s3 + .delete_object() + .bucket(&self.bucket_name) + .key(key) + .send() + .await + .expect("Couldn't 
delete object"); + } +} diff --git a/forms/multipart-s3/src/main.rs b/forms/multipart-s3/src/main.rs index b50ef9c..cc6aa3d 100644 --- a/forms/multipart-s3/src/main.rs +++ b/forms/multipart-s3/src/main.rs @@ -1,48 +1,64 @@ use std::fs; use actix_multipart::Multipart; -use actix_web::{middleware::Logger, web, App, Error, HttpResponse, HttpServer, Responder}; +use actix_web::{ + get, middleware::Logger, post, web, App, Error, HttpResponse, HttpServer, Responder, +}; use actix_web_lab::respond::Html; use aws_config::meta::region::RegionProviderChain; use dotenv::dotenv; use serde::{Deserialize, Serialize}; +use serde_json::json; +mod client; +mod temp_file; +mod upload_file; mod utils; -use self::utils::{ - s3::Client, - upload::{save_file as upload_save_file, split_payload, UploadFile}, -}; +use self::client::Client; +use self::temp_file::TempFile; +use self::upload_file::UploadedFile; +use self::utils::split_payload; -#[derive(Deserialize, Serialize, Debug)] -pub struct InpAdd { - pub text: String, - pub number: i32, +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct UploadMeta { + namespace: String, } -async fn save_file( +impl Default for UploadMeta { + fn default() -> Self { + Self { + namespace: "default".to_owned(), + } + } +} + +#[post("/")] +async fn upload_to_s3( s3_client: web::Data, mut payload: Multipart, -) -> Result { - let pl = split_payload(&mut payload).await; - println!("bytes={:#?}", pl.0); +) -> Result { + let (data, files) = split_payload(&mut payload).await; + log::info!("bytes = {data:?}"); - let inp_info: InpAdd = serde_json::from_slice(&pl.0).unwrap(); - println!("converter_struct={:#?}", inp_info); - println!("tmpfiles={:#?}", pl.1); + let upload_meta = serde_json::from_slice::(&data).unwrap_or_default(); + log::info!("converter_struct = {upload_meta:?}"); + log::info!("tmp_files = {files:?}"); - // make key - let s3_upload_key = format!("projects/{}/", "posts_id"); + // make key prefix (make sure it 
ends with a forward slash) + let s3_key_prefix = format!("uploads/{}/", upload_meta.namespace); // create tmp file and upload s3 and remove tmp file - let upload_files: Vec = upload_save_file(&s3_client, pl.1, &s3_upload_key) - .await - .unwrap(); - println!("upload_files={:#?}", upload_files); + let uploaded_files = s3_client.upload_files(files, &s3_key_prefix).await?; - Ok(HttpResponse::Ok().into()) + Ok(HttpResponse::Ok().json(json!({ + "uploadedFiles": uploaded_files, + "meta": upload_meta, + }))) } +#[get("/")] async fn index() -> impl Responder { Html(include_str!("./index.html").to_owned()) } @@ -59,6 +75,8 @@ async fn main() -> std::io::Result<()> { log::info!("configuring S3 client"); let aws_region = RegionProviderChain::default_provider().or_else("us-east-1"); let aws_config = aws_config::from_env().region(aws_region).load().await; + + // create singleton S3 client let s3_client = Client::new(&aws_config); log::info!("using AWS region: {}", aws_config.region().unwrap()); @@ -67,11 +85,8 @@ async fn main() -> std::io::Result<()> { HttpServer::new(move || { App::new() - .service( - web::resource("/") - .route(web::get().to(index)) - .route(web::post().to(save_file)), - ) + .service(index) + .service(upload_to_s3) .wrap(Logger::default()) .app_data(web::Data::new(s3_client.clone())) }) diff --git a/forms/multipart-s3/src/temp_file.rs b/forms/multipart-s3/src/temp_file.rs new file mode 100644 index 0000000..78a1bf9 --- /dev/null +++ b/forms/multipart-s3/src/temp_file.rs @@ -0,0 +1,35 @@ +use tokio::fs; + +/// Info for a temporary file to be uploaded to S3. +#[derive(Debug, Clone)] +pub struct TempFile { + path: String, + name: String, +} + +impl TempFile { + /// Constructs info container with sanitized file name. + pub fn new(filename: &str) -> TempFile { + let filename = sanitize_filename::sanitize(filename); + + TempFile { + path: format!("./tmp/{filename}"), + name: filename, + } + } + + /// Returns name of temp file. 
+ pub fn name(&self) -> &str { + &self.name + } + + /// Returns path to temp file. + pub fn path(&self) -> &str { + &self.path + } + + /// Deletes temp file from disk. + pub async fn delete_from_disk(self) { + fs::remove_file(&self.path).await.unwrap(); + } +} diff --git a/forms/multipart-s3/src/upload_file.rs b/forms/multipart-s3/src/upload_file.rs new file mode 100644 index 0000000..1f199d0 --- /dev/null +++ b/forms/multipart-s3/src/upload_file.rs @@ -0,0 +1,25 @@ +use serde::Serialize; + +/// Information about an uploaded file. +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct UploadedFile { + filename: String, + s3_key: String, + s3_url: String, +} + +impl UploadedFile { + /// Construct new uploaded file info container. + pub fn new( + filename: impl Into, + s3_key: impl Into, + s3_url: impl Into, + ) -> Self { + Self { + filename: filename.into(), + s3_key: s3_key.into(), + s3_url: s3_url.into(), + } + } +} diff --git a/forms/multipart-s3/src/utils.rs b/forms/multipart-s3/src/utils.rs new file mode 100644 index 0000000..7462d53 --- /dev/null +++ b/forms/multipart-s3/src/utils.rs @@ -0,0 +1,61 @@ +use actix_multipart::{Field, Multipart}; +use actix_web::web::{Bytes, BytesMut}; +use futures_util::StreamExt as _; +use tokio::{fs, io::AsyncWriteExt as _}; + +use crate::TempFile; + +/// Returns tuple of `meta` field contents and a list of temp file info to upload. 
+pub async fn split_payload(payload: &mut Multipart) -> (Bytes, Vec) { + let mut meta = Bytes::new(); + let mut temp_files = vec![]; + + while let Some(item) = payload.next().await { + let mut field = item.expect("split_payload err"); + let cd = field.content_disposition(); + + if matches!(cd.get_name(), Some(name) if name == "meta") { + // if field name is "meta", just collect those bytes in-memory and return them later + meta = collect_meta(&mut field).await; + } else { + match cd.get_filename() { + Some(filename) => { + // if file has a file name, we stream the field contents into a temp file on + // disk so that large uploads do not exhaust memory + + // create file info + let file_info = TempFile::new(filename); + + // create file on disk from file info + let mut file = fs::File::create(file_info.path()).await.unwrap(); + + // stream field contents to file + while let Some(chunk) = field.next().await { + let data = chunk.unwrap(); + file.write_all(&data).await.unwrap(); + } + + // return file info + temp_files.push(file_info); + } + + None => { + log::warn!("field {:?} is not a file", cd.get_name()); + } + } + } + } + + (meta, temp_files) +} + +async fn collect_meta(field: &mut Field) -> Bytes { + let mut buf = BytesMut::new(); + + while let Some(chunk) = field.next().await { + let chunk = chunk.expect("split_payload err chunk"); + buf.extend(chunk); + } + + buf.freeze() +} diff --git a/forms/multipart-s3/src/utils/mod.rs b/forms/multipart-s3/src/utils/mod.rs deleted file mode 100644 index e23a6f0..0000000 --- a/forms/multipart-s3/src/utils/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod s3; -pub mod upload; diff --git a/forms/multipart-s3/src/utils/s3.rs b/forms/multipart-s3/src/utils/s3.rs deleted file mode 100644 index e36bc17..0000000 --- a/forms/multipart-s3/src/utils/s3.rs +++ /dev/null @@ -1,58 +0,0 @@ -use std::{env, fs, io::Read as _}; - -use aws_config::SdkConfig as AwsConfig; -use aws_sdk_s3::{types::ByteStream, Client as S3Client}; - -#[derive(Debug, 
Clone)] -pub struct Client { - s3: S3Client, - bucket_name: String, -} - -impl Client { - // construct S3 testing client - pub fn new(config: &AwsConfig) -> Client { - Client { - s3: S3Client::new(config), - bucket_name: env::var("AWS_S3_BUCKET_NAME").unwrap(), - } - } - - pub fn url(&self, key: &str) -> String { - format!( - "https://{}.s3.{}.amazonaws.com/{key}", - env::var("AWS_S3_BUCKET_NAME").unwrap(), - env::var("AWS_REGION").unwrap(), - ) - } - - pub async fn put_object(&self, local_path: &str, key: &str) -> String { - let mut file = fs::File::open(local_path).unwrap(); - - let mut contents = - Vec::with_capacity(file.metadata().map(|md| md.len()).unwrap_or(1024) as usize); - file.read_to_end(&mut contents).unwrap(); - - let _res = self - .s3 - .put_object() - .bucket(&self.bucket_name) - .key(key) - .body(ByteStream::from(contents)) - .send() - .await - .expect("Failed to put test object"); - - self.url(key) - } - - pub async fn delete_object(&self, key: &str) { - self.s3 - .delete_object() - .bucket(&self.bucket_name) - .key(key) - .send() - .await - .expect("Couldn't delete object"); - } -} diff --git a/forms/multipart-s3/src/utils/upload.rs b/forms/multipart-s3/src/utils/upload.rs deleted file mode 100644 index df53a19..0000000 --- a/forms/multipart-s3/src/utils/upload.rs +++ /dev/null @@ -1,130 +0,0 @@ -use std::{convert::From, fs, io::Write}; - -use actix_multipart::{Field, Multipart}; -use actix_web::{web, web::Bytes, Error}; -use futures_util::StreamExt as _; -use serde::{Deserialize, Serialize}; - -use crate::utils::s3::Client; - -#[derive(Debug, Clone, Deserialize, Serialize)] -pub struct UploadFile { - pub filename: String, - pub key: String, - pub url: String, -} - -impl From for UploadFile { - fn from(tmp_file: Tmpfile) -> Self { - UploadFile { - filename: tmp_file.name, - key: tmp_file.s3_key, - url: tmp_file.s3_url, - } - } -} - -/* -1. savefile -2. s3 upload -> upload_data -3. 
deletefile -*/ -#[derive(Debug, Clone)] -pub struct Tmpfile { - pub name: String, - pub tmp_path: String, - pub s3_key: String, - pub s3_url: String, -} - -impl Tmpfile { - fn new(filename: &str) -> Tmpfile { - Tmpfile { - name: filename.to_string(), - tmp_path: format!("./tmp/{filename}"), - s3_key: "".to_string(), - s3_url: "".to_string(), - } - } - - async fn s3_upload_and_tmp_remove(&mut self, client: &Client, s3_upload_key: &str) { - self.s3_upload(client, s3_upload_key).await; - self.tmp_remove(); - } - - async fn s3_upload(&mut self, client: &Client, s3_upload_key: &str) { - let key = format!("{s3_upload_key}{}", &self.name); - self.s3_key = key.clone(); - let url = client.put_object(&self.tmp_path, &key.clone()).await; - self.s3_url = url; - } - - fn tmp_remove(&self) { - fs::remove_file(&self.tmp_path).unwrap(); - } -} - -pub async fn split_payload(payload: &mut Multipart) -> (Bytes, Vec) { - let mut tmp_files = vec![]; - let mut data = Bytes::new(); - - while let Some(item) = payload.next().await { - let mut field: Field = item.expect(" split_payload err"); - let content_type = field.content_disposition(); - let name = content_type.get_name().unwrap(); - if name == "data" { - while let Some(chunk) = field.next().await { - data = chunk.expect(" split_payload err chunk"); - } - } else { - match content_type.get_filename() { - Some(filename) => { - let tmp_file = Tmpfile::new(&sanitize_filename::sanitize(&filename)); - let tmp_path = tmp_file.tmp_path.clone(); - let mut f = web::block(move || fs::File::create(&tmp_path)) - .await - .unwrap() - .unwrap(); - while let Some(chunk) = field.next().await { - let data = chunk.unwrap(); - f = web::block(move || f.write_all(&data).map(|_| f)) - .await - .unwrap() - .unwrap(); - } - tmp_files.push(tmp_file.clone()); - } - None => { - println!("file none"); - } - } - } - } - (data, tmp_files) -} - -pub async fn save_file( - client: &Client, - tmp_files: Vec, - s3_upload_key: &str, -) -> Result, Error> { - let mut arr: 
Vec = Vec::with_capacity(tmp_files.len()); - - for item in tmp_files { - let mut tmp_file: Tmpfile = item.clone(); - - tmp_file - .s3_upload_and_tmp_remove(client, s3_upload_key) - .await; - - arr.push(UploadFile::from(tmp_file)); - } - Ok(arr) -} - -#[allow(unused)] -pub async fn delete_object(client: &Client, keys: Vec<&str>) { - for key in keys { - client.delete_object(key).await; - } -}