mirror of
https://github.com/actix/examples
synced 2024-11-23 22:41:07 +01:00
return uploaded files information in s3 example
This commit is contained in:
parent
4840cfdb68
commit
c8a4544ea1
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -3981,6 +3981,7 @@ dependencies = [
|
||||
"sanitize-filename",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tokio 1.20.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -1,4 +1,6 @@
|
||||
AWS_REGION=
|
||||
RUST_LOG=multipart=info,aws=warn,info
|
||||
|
||||
AWS_REGION=us-east-1
|
||||
AWS_ACCESS_KEY_ID=
|
||||
AWS_SECRET_ACCESS_KEY=
|
||||
AWS_S3_BUCKET_NAME=
|
||||
|
@ -18,3 +18,4 @@ log = "0.4"
|
||||
sanitize-filename = "0.4"
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
tokio = { version = "1.13.1", features = ["io-util", "fs"] }
|
||||
|
@ -1,7 +1,6 @@
|
||||
# Multipart + AWS S3
|
||||
|
||||
Upload a file in multipart form to AWS S3 using [AWS S3 SDK](https://crates.io/crates/aws-sdk-s3).
|
||||
Receive multiple data in multipart form in JSON format and receive it as a struct.
|
||||
|
||||
# Usage
|
||||
|
||||
@ -9,13 +8,31 @@ Receive multiple data in multipart form in JSON format and receive it as a struc
|
||||
cd forms/multipart-s3
|
||||
```
|
||||
|
||||
1. copy .env.example .env
|
||||
1. edit .env AWS_ACCESS_KEY_ID=your_key
|
||||
1. edit .env AWS_SECRET_ACCESS_KEY=your_key
|
||||
1. edit .env AWS_S3_BUCKET_NAME=your_chosen_region
|
||||
1. copy `.env.example` to `.env`
|
||||
1. edit `.env` key `AWS_REGION` = your_bucket_region
|
||||
1. edit `.env` key `AWS_ACCESS_KEY_ID` = your_key_id
|
||||
1. edit `.env` key `AWS_SECRET_ACCESS_KEY` = your_key_secret
|
||||
1. edit `.env` key `AWS_S3_BUCKET_NAME` = your_bucket_name
|
||||
|
||||
```sh
|
||||
cargo run
|
||||
```
|
||||
|
||||
<http://localhost:8080>
|
||||
Go to <http://localhost:8080> in you browser.
|
||||
|
||||
Or, start the upload using [HTTPie]:
|
||||
|
||||
```sh
|
||||
http --form POST :8080/ file@Cargo.toml
|
||||
http --form POST :8080/ file@Cargo.toml file@README.md meta='{"namespace":"foo"}'
|
||||
```
|
||||
|
||||
Or, using cURL:
|
||||
|
||||
```sh
|
||||
curl -X POST http://localhost:8080/ -F 'file=@Cargo.toml'
|
||||
curl -X POST http://localhost:8080/ -F 'file=@Cargo.toml' -F 'file=@README.md' -F 'meta={"namespace":"foo"}'
|
||||
```
|
||||
|
||||
[httpie]: https://httpie.org
|
||||
[curl]: https://curl.haxx.se
|
||||
|
105
forms/multipart-s3/src/client.rs
Normal file
105
forms/multipart-s3/src/client.rs
Normal file
@ -0,0 +1,105 @@
|
||||
use std::env;
|
||||
|
||||
use actix_web::Error;
|
||||
use aws_config::SdkConfig as AwsConfig;
|
||||
use aws_sdk_s3::{types::ByteStream, Client as S3Client};
|
||||
use futures_util::{stream, StreamExt as _};
|
||||
use tokio::{fs, io::AsyncReadExt as _};
|
||||
|
||||
use crate::{TempFile, UploadedFile};
|
||||
|
||||
/// S3 client wrapper to expose semantic upload operations.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Client {
|
||||
s3: S3Client,
|
||||
bucket_name: String,
|
||||
}
|
||||
|
||||
impl Client {
|
||||
/// Construct S3 client wrapper.
|
||||
pub fn new(config: &AwsConfig) -> Client {
|
||||
Client {
|
||||
s3: S3Client::new(config),
|
||||
bucket_name: env::var("AWS_S3_BUCKET_NAME").unwrap(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn url(&self, key: &str) -> String {
|
||||
format!(
|
||||
"https://{}.s3.{}.amazonaws.com/{key}",
|
||||
env::var("AWS_S3_BUCKET_NAME").unwrap(),
|
||||
env::var("AWS_REGION").unwrap(),
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn upload_files(
|
||||
&self,
|
||||
temp_files: Vec<TempFile>,
|
||||
key_prefix: &str,
|
||||
) -> Result<Vec<UploadedFile>, Error> {
|
||||
let uploaded_files = stream::iter(temp_files)
|
||||
.map(|file| self.upload_and_remove(file, key_prefix))
|
||||
// upload files concurrently, up to 2 at a time
|
||||
.buffer_unordered(2)
|
||||
.collect()
|
||||
.await;
|
||||
|
||||
Ok(uploaded_files)
|
||||
}
|
||||
|
||||
async fn upload_and_remove(&self, file: TempFile, key_prefix: &str) -> UploadedFile {
|
||||
let uploaded_file = self.upload(&file, key_prefix).await;
|
||||
file.delete_from_disk().await;
|
||||
uploaded_file
|
||||
}
|
||||
|
||||
async fn upload(&self, file: &TempFile, key_prefix: &str) -> UploadedFile {
|
||||
let filename = file.name();
|
||||
let key = format!("{key_prefix}{}", file.name());
|
||||
let s3_url = self.put_object_from_file(file.path(), &key).await;
|
||||
UploadedFile::new(filename, key, s3_url)
|
||||
}
|
||||
|
||||
async fn put_object_from_file(&self, local_path: &str, key: &str) -> String {
|
||||
let mut file = fs::File::open(local_path).await.unwrap();
|
||||
|
||||
let size_estimate = file
|
||||
.metadata()
|
||||
.await
|
||||
.map(|md| md.len())
|
||||
.unwrap_or(1024)
|
||||
.try_into()
|
||||
.expect("file too big");
|
||||
|
||||
let mut contents = Vec::with_capacity(size_estimate);
|
||||
file.read_to_end(&mut contents).await.unwrap();
|
||||
|
||||
let _res = self
|
||||
.s3
|
||||
.put_object()
|
||||
.bucket(&self.bucket_name)
|
||||
.key(key)
|
||||
.body(ByteStream::from(contents))
|
||||
.send()
|
||||
.await
|
||||
.expect("Failed to put test object");
|
||||
|
||||
self.url(key)
|
||||
}
|
||||
|
||||
pub async fn delete_files(&self, keys: Vec<&str>) {
|
||||
for key in keys {
|
||||
self.delete_object(key).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn delete_object(&self, key: &str) {
|
||||
self.s3
|
||||
.delete_object()
|
||||
.bucket(&self.bucket_name)
|
||||
.key(key)
|
||||
.send()
|
||||
.await
|
||||
.expect("Couldn't delete object");
|
||||
}
|
||||
}
|
@ -1,48 +1,64 @@
|
||||
use std::fs;
|
||||
|
||||
use actix_multipart::Multipart;
|
||||
use actix_web::{middleware::Logger, web, App, Error, HttpResponse, HttpServer, Responder};
|
||||
use actix_web::{
|
||||
get, middleware::Logger, post, web, App, Error, HttpResponse, HttpServer, Responder,
|
||||
};
|
||||
use actix_web_lab::respond::Html;
|
||||
use aws_config::meta::region::RegionProviderChain;
|
||||
use dotenv::dotenv;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::json;
|
||||
|
||||
mod client;
|
||||
mod temp_file;
|
||||
mod upload_file;
|
||||
mod utils;
|
||||
|
||||
use self::utils::{
|
||||
s3::Client,
|
||||
upload::{save_file as upload_save_file, split_payload, UploadFile},
|
||||
};
|
||||
use self::client::Client;
|
||||
use self::temp_file::TempFile;
|
||||
use self::upload_file::UploadedFile;
|
||||
use self::utils::split_payload;
|
||||
|
||||
#[derive(Deserialize, Serialize, Debug)]
|
||||
pub struct InpAdd {
|
||||
pub text: String,
|
||||
pub number: i32,
|
||||
#[derive(Debug, Deserialize, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct UploadMeta {
|
||||
namespace: String,
|
||||
}
|
||||
|
||||
async fn save_file(
|
||||
impl Default for UploadMeta {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
namespace: "default".to_owned(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[post("/")]
|
||||
async fn upload_to_s3(
|
||||
s3_client: web::Data<Client>,
|
||||
mut payload: Multipart,
|
||||
) -> Result<HttpResponse, Error> {
|
||||
let pl = split_payload(&mut payload).await;
|
||||
println!("bytes={:#?}", pl.0);
|
||||
) -> Result<impl Responder, Error> {
|
||||
let (data, files) = split_payload(&mut payload).await;
|
||||
log::info!("bytes = {data:?}");
|
||||
|
||||
let inp_info: InpAdd = serde_json::from_slice(&pl.0).unwrap();
|
||||
println!("converter_struct={:#?}", inp_info);
|
||||
println!("tmpfiles={:#?}", pl.1);
|
||||
let upload_meta = serde_json::from_slice::<UploadMeta>(&data).unwrap_or_default();
|
||||
log::info!("converter_struct = {upload_meta:?}");
|
||||
log::info!("tmp_files = {files:?}");
|
||||
|
||||
// make key
|
||||
let s3_upload_key = format!("projects/{}/", "posts_id");
|
||||
// make key prefix (make sure it ends with a forward slash)
|
||||
let s3_key_prefix = format!("uploads/{}/", upload_meta.namespace);
|
||||
|
||||
// create tmp file and upload s3 and remove tmp file
|
||||
let upload_files: Vec<UploadFile> = upload_save_file(&s3_client, pl.1, &s3_upload_key)
|
||||
.await
|
||||
.unwrap();
|
||||
println!("upload_files={:#?}", upload_files);
|
||||
let uploaded_files = s3_client.upload_files(files, &s3_key_prefix).await?;
|
||||
|
||||
Ok(HttpResponse::Ok().into())
|
||||
Ok(HttpResponse::Ok().json(json!({
|
||||
"uploadedFiles": uploaded_files,
|
||||
"meta": upload_meta,
|
||||
})))
|
||||
}
|
||||
|
||||
#[get("/")]
|
||||
async fn index() -> impl Responder {
|
||||
Html(include_str!("./index.html").to_owned())
|
||||
}
|
||||
@ -59,6 +75,8 @@ async fn main() -> std::io::Result<()> {
|
||||
log::info!("configuring S3 client");
|
||||
let aws_region = RegionProviderChain::default_provider().or_else("us-east-1");
|
||||
let aws_config = aws_config::from_env().region(aws_region).load().await;
|
||||
|
||||
// create singleton S3 client
|
||||
let s3_client = Client::new(&aws_config);
|
||||
|
||||
log::info!("using AWS region: {}", aws_config.region().unwrap());
|
||||
@ -67,11 +85,8 @@ async fn main() -> std::io::Result<()> {
|
||||
|
||||
HttpServer::new(move || {
|
||||
App::new()
|
||||
.service(
|
||||
web::resource("/")
|
||||
.route(web::get().to(index))
|
||||
.route(web::post().to(save_file)),
|
||||
)
|
||||
.service(index)
|
||||
.service(upload_to_s3)
|
||||
.wrap(Logger::default())
|
||||
.app_data(web::Data::new(s3_client.clone()))
|
||||
})
|
||||
|
35
forms/multipart-s3/src/temp_file.rs
Normal file
35
forms/multipart-s3/src/temp_file.rs
Normal file
@ -0,0 +1,35 @@
|
||||
use tokio::fs;
|
||||
|
||||
/// Info for a temporary file to be uploaded to S3.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct TempFile {
|
||||
path: String,
|
||||
name: String,
|
||||
}
|
||||
|
||||
impl TempFile {
|
||||
/// Constructs info container with sanitized file name.
|
||||
pub fn new(filename: &str) -> TempFile {
|
||||
let filename = sanitize_filename::sanitize(filename);
|
||||
|
||||
TempFile {
|
||||
path: format!("./tmp/{filename}"),
|
||||
name: filename,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns name of temp file.
|
||||
pub fn name(&self) -> &str {
|
||||
&self.name
|
||||
}
|
||||
|
||||
/// Returns path to temp file.
|
||||
pub fn path(&self) -> &str {
|
||||
&self.path
|
||||
}
|
||||
|
||||
/// Deletes temp file from disk.
|
||||
pub async fn delete_from_disk(self) {
|
||||
fs::remove_file(&self.path).await.unwrap();
|
||||
}
|
||||
}
|
25
forms/multipart-s3/src/upload_file.rs
Normal file
25
forms/multipart-s3/src/upload_file.rs
Normal file
@ -0,0 +1,25 @@
|
||||
use serde::Serialize;
|
||||
|
||||
/// Information about an uploaded file.
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct UploadedFile {
|
||||
filename: String,
|
||||
s3_key: String,
|
||||
s3_url: String,
|
||||
}
|
||||
|
||||
impl UploadedFile {
|
||||
/// Construct new uploaded file info container.
|
||||
pub fn new(
|
||||
filename: impl Into<String>,
|
||||
s3_key: impl Into<String>,
|
||||
s3_url: impl Into<String>,
|
||||
) -> Self {
|
||||
Self {
|
||||
filename: filename.into(),
|
||||
s3_key: s3_key.into(),
|
||||
s3_url: s3_url.into(),
|
||||
}
|
||||
}
|
||||
}
|
61
forms/multipart-s3/src/utils.rs
Normal file
61
forms/multipart-s3/src/utils.rs
Normal file
@ -0,0 +1,61 @@
|
||||
use actix_multipart::{Field, Multipart};
|
||||
use actix_web::web::{Bytes, BytesMut};
|
||||
use futures_util::StreamExt as _;
|
||||
use tokio::{fs, io::AsyncWriteExt as _};
|
||||
|
||||
use crate::TempFile;
|
||||
|
||||
/// Returns tuple of `meta` field contents and a list of temp file info to upload.
|
||||
pub async fn split_payload(payload: &mut Multipart) -> (Bytes, Vec<TempFile>) {
|
||||
let mut meta = Bytes::new();
|
||||
let mut temp_files = vec![];
|
||||
|
||||
while let Some(item) = payload.next().await {
|
||||
let mut field = item.expect("split_payload err");
|
||||
let cd = field.content_disposition();
|
||||
|
||||
if matches!(cd.get_name(), Some(name) if name == "meta") {
|
||||
// if field name is "meta", just collect those bytes in-memory and return them later
|
||||
meta = collect_meta(&mut field).await;
|
||||
} else {
|
||||
match cd.get_filename() {
|
||||
Some(filename) => {
|
||||
// if file has a file name, we stream the field contents into a temp file on
|
||||
// disk so that large uploads do not exhaust memory
|
||||
|
||||
// create file info
|
||||
let file_info = TempFile::new(filename);
|
||||
|
||||
// create file on disk from file info
|
||||
let mut file = fs::File::create(file_info.path()).await.unwrap();
|
||||
|
||||
// stream field contents to file
|
||||
while let Some(chunk) = field.next().await {
|
||||
let data = chunk.unwrap();
|
||||
file.write_all(&data).await.unwrap();
|
||||
}
|
||||
|
||||
// return file info
|
||||
temp_files.push(file_info);
|
||||
}
|
||||
|
||||
None => {
|
||||
log::warn!("field {:?} is not a file", cd.get_name());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
(meta, temp_files)
|
||||
}
|
||||
|
||||
async fn collect_meta(field: &mut Field) -> Bytes {
|
||||
let mut buf = BytesMut::new();
|
||||
|
||||
while let Some(chunk) = field.next().await {
|
||||
let chunk = chunk.expect("split_payload err chunk");
|
||||
buf.extend(chunk);
|
||||
}
|
||||
|
||||
buf.freeze()
|
||||
}
|
@ -1,2 +0,0 @@
|
||||
pub mod s3;
|
||||
pub mod upload;
|
@ -1,58 +0,0 @@
|
||||
use std::{env, fs, io::Read as _};
|
||||
|
||||
use aws_config::SdkConfig as AwsConfig;
|
||||
use aws_sdk_s3::{types::ByteStream, Client as S3Client};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Client {
|
||||
s3: S3Client,
|
||||
bucket_name: String,
|
||||
}
|
||||
|
||||
impl Client {
|
||||
// construct S3 testing client
|
||||
pub fn new(config: &AwsConfig) -> Client {
|
||||
Client {
|
||||
s3: S3Client::new(config),
|
||||
bucket_name: env::var("AWS_S3_BUCKET_NAME").unwrap(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn url(&self, key: &str) -> String {
|
||||
format!(
|
||||
"https://{}.s3.{}.amazonaws.com/{key}",
|
||||
env::var("AWS_S3_BUCKET_NAME").unwrap(),
|
||||
env::var("AWS_REGION").unwrap(),
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn put_object(&self, local_path: &str, key: &str) -> String {
|
||||
let mut file = fs::File::open(local_path).unwrap();
|
||||
|
||||
let mut contents =
|
||||
Vec::with_capacity(file.metadata().map(|md| md.len()).unwrap_or(1024) as usize);
|
||||
file.read_to_end(&mut contents).unwrap();
|
||||
|
||||
let _res = self
|
||||
.s3
|
||||
.put_object()
|
||||
.bucket(&self.bucket_name)
|
||||
.key(key)
|
||||
.body(ByteStream::from(contents))
|
||||
.send()
|
||||
.await
|
||||
.expect("Failed to put test object");
|
||||
|
||||
self.url(key)
|
||||
}
|
||||
|
||||
pub async fn delete_object(&self, key: &str) {
|
||||
self.s3
|
||||
.delete_object()
|
||||
.bucket(&self.bucket_name)
|
||||
.key(key)
|
||||
.send()
|
||||
.await
|
||||
.expect("Couldn't delete object");
|
||||
}
|
||||
}
|
@ -1,130 +0,0 @@
|
||||
use std::{convert::From, fs, io::Write};
|
||||
|
||||
use actix_multipart::{Field, Multipart};
|
||||
use actix_web::{web, web::Bytes, Error};
|
||||
use futures_util::StreamExt as _;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::utils::s3::Client;
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct UploadFile {
|
||||
pub filename: String,
|
||||
pub key: String,
|
||||
pub url: String,
|
||||
}
|
||||
|
||||
impl From<Tmpfile> for UploadFile {
|
||||
fn from(tmp_file: Tmpfile) -> Self {
|
||||
UploadFile {
|
||||
filename: tmp_file.name,
|
||||
key: tmp_file.s3_key,
|
||||
url: tmp_file.s3_url,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
1. savefile
|
||||
2. s3 upload -> upload_data
|
||||
3. deletefile
|
||||
*/
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Tmpfile {
|
||||
pub name: String,
|
||||
pub tmp_path: String,
|
||||
pub s3_key: String,
|
||||
pub s3_url: String,
|
||||
}
|
||||
|
||||
impl Tmpfile {
|
||||
fn new(filename: &str) -> Tmpfile {
|
||||
Tmpfile {
|
||||
name: filename.to_string(),
|
||||
tmp_path: format!("./tmp/{filename}"),
|
||||
s3_key: "".to_string(),
|
||||
s3_url: "".to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn s3_upload_and_tmp_remove(&mut self, client: &Client, s3_upload_key: &str) {
|
||||
self.s3_upload(client, s3_upload_key).await;
|
||||
self.tmp_remove();
|
||||
}
|
||||
|
||||
async fn s3_upload(&mut self, client: &Client, s3_upload_key: &str) {
|
||||
let key = format!("{s3_upload_key}{}", &self.name);
|
||||
self.s3_key = key.clone();
|
||||
let url = client.put_object(&self.tmp_path, &key.clone()).await;
|
||||
self.s3_url = url;
|
||||
}
|
||||
|
||||
fn tmp_remove(&self) {
|
||||
fs::remove_file(&self.tmp_path).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn split_payload(payload: &mut Multipart) -> (Bytes, Vec<Tmpfile>) {
|
||||
let mut tmp_files = vec![];
|
||||
let mut data = Bytes::new();
|
||||
|
||||
while let Some(item) = payload.next().await {
|
||||
let mut field: Field = item.expect(" split_payload err");
|
||||
let content_type = field.content_disposition();
|
||||
let name = content_type.get_name().unwrap();
|
||||
if name == "data" {
|
||||
while let Some(chunk) = field.next().await {
|
||||
data = chunk.expect(" split_payload err chunk");
|
||||
}
|
||||
} else {
|
||||
match content_type.get_filename() {
|
||||
Some(filename) => {
|
||||
let tmp_file = Tmpfile::new(&sanitize_filename::sanitize(&filename));
|
||||
let tmp_path = tmp_file.tmp_path.clone();
|
||||
let mut f = web::block(move || fs::File::create(&tmp_path))
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
while let Some(chunk) = field.next().await {
|
||||
let data = chunk.unwrap();
|
||||
f = web::block(move || f.write_all(&data).map(|_| f))
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
}
|
||||
tmp_files.push(tmp_file.clone());
|
||||
}
|
||||
None => {
|
||||
println!("file none");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
(data, tmp_files)
|
||||
}
|
||||
|
||||
pub async fn save_file(
|
||||
client: &Client,
|
||||
tmp_files: Vec<Tmpfile>,
|
||||
s3_upload_key: &str,
|
||||
) -> Result<Vec<UploadFile>, Error> {
|
||||
let mut arr: Vec<UploadFile> = Vec::with_capacity(tmp_files.len());
|
||||
|
||||
for item in tmp_files {
|
||||
let mut tmp_file: Tmpfile = item.clone();
|
||||
|
||||
tmp_file
|
||||
.s3_upload_and_tmp_remove(client, s3_upload_key)
|
||||
.await;
|
||||
|
||||
arr.push(UploadFile::from(tmp_file));
|
||||
}
|
||||
Ok(arr)
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
pub async fn delete_object(client: &Client, keys: Vec<&str>) {
|
||||
for key in keys {
|
||||
client.delete_object(key).await;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user