feat(csv-to-json): use multipart/form-data encoding

This commit is contained in:
M. George Hansen 2022-05-08 23:37:43 -07:00
parent 8b9d382573
commit 991b82d750
Signed by: mgeorgehansen
SSH key fingerprint: SHA256:JlIGiQLPyQ2RHTH3a2oVlb20Xkh9Glr8DUF4YTXHJxM
4 changed files with 201 additions and 39 deletions

40
csv-to-json/Cargo.lock generated
View file

@ -183,6 +183,7 @@ dependencies = [
"csv-async",
"futures",
"hyper",
"multer",
"predicates",
"pretty_assertions",
"serde",
@ -226,6 +227,15 @@ version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457"
[[package]]
name = "encoding_rs"
version = "0.8.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9852635589dc9f9ea1b6fe9f05b50ef208c85c834a562f0c6abb1c475736ec2b"
dependencies = [
"cfg-if",
]
[[package]]
name = "float-cmp"
version = "0.9.0"
@ -517,6 +527,12 @@ version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
[[package]]
name = "mime"
version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"
[[package]]
name = "mio"
version = "0.8.2"
@ -540,6 +556,24 @@ dependencies = [
"winapi",
]
[[package]]
name = "multer"
version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f8f35e687561d5c1667590911e6698a8cb714a134a7505718a182e7bc9d3836"
dependencies = [
"bytes",
"encoding_rs",
"futures-util",
"http",
"httparse",
"log",
"memchr",
"mime",
"spin",
"version_check",
]
[[package]]
name = "normalize-line-endings"
version = "0.3.0"
@ -838,6 +872,12 @@ dependencies = [
"winapi",
]
[[package]]
name = "spin"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c530c2b0d0bf8b69304b39fe2001993e267461948b890cd037d8ad4293fa1a0d"
[[package]]
name = "strsim"
version = "0.10.0"

View file

@ -17,6 +17,7 @@ anyhow = { version = "1" }
clap = { version = "3.1", features = ["derive"] }
url = { version = "2.2" }
serde_urlencoded = { version = "0.7" }
multer = { version = "2.0" }
[dev-dependencies]
assert_cmd = { version = "2.0" }

View file

@ -18,14 +18,14 @@ cargo run --release
## Usage
By default, running csv-to-json will run a server listening on port 3000. This can be overridden with the `-p {port number}` option:
By default, running csv-to-json will run a server listening on port 8000. This can be overridden with the `-p {port number}` option:
```sh
$> csv-to-json
listening on 127.0.0.1:3000
listening on 127.0.0.1:8000
```
To parse a csv into JSON, simply make a POST request to the root path where the server is listening (all other request types and paths will return a 404 NOT FOUND response).
To parse a csv into JSON, simply make a multipart/form-data POST request to the root path where the server is listening (all other request types and paths will return a 404 NOT FOUND response).
For example, given a CSV file `fakebirds.csv` containing the following records:
@ -35,10 +35,10 @@ date,lat,lng,number of "birds"
2022-04-07,33.756503,-118.141727,8
```
Assuming the csv-to-json server is running at localhost:3000, POSTing this CSV data would result in the following response:
Assuming the csv-to-json server is running at localhost:8000, POSTing this CSV data would result in the following response:
```sh
$> curl --data-binary @fakebirds.csv localhost:3000
$> curl -F file=@fakebirds.csv localhost:8000
[{"date":"2022-04-06","lat":"33.759108","lng":"-118.143132","number of \"birds\"":"12"},{"date":"2022-04-07","lat":"33.756503","lng":"-118.141727","number of \"birds\"":"8"}]
```
@ -51,7 +51,7 @@ By default, csv-to-json assumes that your CSV file is comma-delimited `,`, uses
Provide a `delimiter=` query parameter with a URL-encoded, single character to change which character is treated as a field delimiter. For example, to parse tab-delimited CSV you can specify `delimiter=%09` (`%09` is the URL-encoded escape for the tab character):
```sh
$> curl --data-binary $'tab\tfields\n1\t2' 'localhost:3000?delimiter=%09'
$> curl -F file=$'tab\tfields\n1\t2' 'localhost:8000?delimiter=%09'
[{"fields":"2","tab":"1"}]
```
@ -60,29 +60,30 @@ $> curl --data-binary $'tab\tfields\n1\t2' 'localhost:3000?delimiter=%09'
Provide a `quote=` query parameter with a URL-encoded, single character to change which character is treated as a field quote. For example, to parse CSVs that use the single quote `'` to quote fields you can specify `quote=%27` (`%27` is the URL-encoded escape for the single quote `'` character):
```sh
$> curl --data-binary $'field1,field2,field3\n\'1\',2,\'3\'' 'localhost:3000?quote=%27'
$> curl -F file=$'field1,field2,field3\n\'1\',2,\'3\'' 'localhost:8000?quote=%27'
[{"field1":"1","field2":"2","field3":"3"}]
```
## Core Design Decisions
- I chose `hyper` over other higher-abstraction web frameworks because:
a) it has a strong concurrency model with `tokio`;
b) it has native streaming response support;
c) it has strong community support and is used in many high-priofile libraries; and
d) I don't need complex routing logic, middleware or context management and wanted to keep things simple.
- I decided upon a streaming processing pipline because I know from experience loading large files into memory wouldn't be a scalable solution with multiple concurrent API consumers - if I had to read the entire CSV into memory a single consumer could easily exhaust the memory resources on the server without some kind of limiting. I had to relearn rust's Stream protocol and struggled with writing clean abstractions but eventually ended up with a solution that only loads in the CSV headers and the current record that it's processing at any one moment.
- I didn't want to roll my own CSV parser because I knew that CSVs come in all sorts of different formats and quoting strategies that make parsing tricky (Excel STILL doesn't support newlines in quoted fields, grumble grumble...). I evaluated the excellent `csv` library but quickly determined it wasn't going to work in an async/streaming context, then tried to do field-level deserialization with `csv_core` but quickly ran into an edge case where the parser would ignore a field at the end of a record if it wasn't newline terminated. I settled on `csv_async` which seems to work great and serializes records instead of fields. A field-level CSV deserializer would be interesting to experiment with though, I wonder if it would be a performance improvement or not.
- Similarlly, for serializing JSON I decided that I didn't want to write quoting and escaping logic for serialized object fields. So I decided to leverage `serde_json` to do the heavy lifting for me and do the object serialization. The only tricky bit was figuring out how to share a buffer to serialize into so that I didn't have to yield individual array separators from the stream, which would have been inefficient.
- I chose `hyper` over other higher-abstraction web frameworks because:
a) it has a strong concurrency model with `tokio`;
b) it has native streaming response support;
c) it has strong community support and is used in many high-profile libraries; and
d) I don't need complex routing logic, middleware or context management and wanted to keep things simple.
- I decided upon a streaming processing pipeline because I know from experience loading large files into memory wouldn't be a scalable solution with multiple concurrent API consumers - if I had to read the entire CSV into memory a single consumer could easily exhaust the memory resources on the server without some kind of limiting. I had to relearn rust's Stream protocol and struggled with writing clean abstractions but eventually ended up with a solution that only loads in the CSV headers and the current record that it's processing at any one moment.
- I didn't want to roll my own CSV parser because I knew that CSVs come in all sorts of different formats and quoting strategies that make parsing tricky (Excel STILL doesn't support newlines in quoted fields, grumble grumble...). I evaluated the excellent `csv` library but quickly determined it wasn't going to work in an async/streaming context, then tried to do field-level deserialization with `csv_core` but quickly ran into an edge case where the parser would ignore a field at the end of a record if it wasn't newline terminated. I settled on `csv_async` which seems to work great and serializes records instead of fields. A field-level CSV deserializer would be interesting to experiment with though, I wonder if it would be a performance improvement or not.
- Similarly, for serializing JSON I decided that I didn't want to write quoting and escaping logic for serialized object fields. So I decided to leverage `serde_json` to do the heavy lifting for me and do the object serialization. The only tricky bit was figuring out how to share a buffer to serialize into so that I didn't have to yield individual array separators from the stream, which would have been inefficient.
- I found pretty quickly that it was going to be difficult to support streaming file submission and download in the browser in an SPA like React using regular POST requests, so I decided to switch to `multipart/form-data` instead so that I could leverage the browser's native form file fields.
## Current Limitations
- CSV are always parsed assuming the first record contains the column headers. If a CSV doesn't have headers, this will result in _strange_ results. Don't do it.
- There is currently no type inference or conversion of CSV fields to JSON types. All CSV fields are interpreted and output as strings.
- The current CSV parser, `csv_async`, does not place any limits upon the size of records that it tries to read. This means that there is a potential denial of service attack vector where malicious users could POST a CSV with a very large line of valid UTF-8 string data that could cause the server to exhaust it's memory resources. We'd have to either use a different CSV parser or patch csv-async to resolve this issue (perhaps by providing a `max_record_size` option to AsyncReaderBuilder).
- Errors from malformed CSVs (e.g. missing fields in a particular record) currently result in the response stream being terminated, with no in-band way of giving the user information about the cause of the error. There are a few potential solutions, such as utilizing custom tailers in the streaming response to encode error messages, but these all require the client code to know to look for them or have some other out-of-band error mechanism.
- All CSV input is assumed to be UTF-8 encoded. We could potentially support other encodings by transcoding them before processing with a query parameter or request header, but this is a dubious proposition since UTF-8 is widely adopted as the default encoding of the web and users are unlikely to know what obscure charset their 20-year-old CSV files are in anyway.
- The `Content-Type` and `Accept` headers are currently ignored, since the endpoint only accepts UTF-8 encoded CSV data and only outputs JSON data. It might be useful to reject requests that specify a `Content-Type` other than `text/csv` or an `Accept` header other than `application/json`.
- CSV are always parsed assuming the first record contains the column headers. If a CSV doesn't have headers, this will result in _strange_ results. Don't do it.
- There is currently no type inference or conversion of CSV fields to JSON types. All CSV fields are interpreted and output as strings.
- The current CSV parser, `csv_async`, does not place any limits upon the size of records that it tries to read. This means that there is a potential denial of service attack vector where malicious users could POST a CSV with a very large line of valid UTF-8 string data that could cause the server to exhaust its memory resources. We'd have to either use a different CSV parser or patch csv-async to resolve this issue (perhaps by providing a `max_record_size` option to AsyncReaderBuilder).
- Errors from malformed CSVs (e.g. missing fields in a particular record) currently result in the response stream being terminated, with no in-band way of giving the user information about the cause of the error. There are a few potential solutions, such as utilizing custom trailers in the streaming response to encode error messages, but these all require the client code to know to look for them or have some other out-of-band error mechanism.
- All CSV input is assumed to be UTF-8 encoded. We could potentially support other encodings by transcoding them before processing with a query parameter or request header, but this is a dubious proposition since UTF-8 is widely adopted as the default encoding of the web and users are unlikely to know what obscure charset their 20-year-old CSV files are in anyway.
- The `Content-Type` and `Accept` headers are currently ignored. It might be useful to reject requests that specify a `Content-Type` other than `multipart/form-data` or an `Accept` header other than `application/json`.
## Development and Testing

View file

@ -1,14 +1,53 @@
use anyhow::{Context, Result};
use anyhow::{anyhow, Context, Result};
use async_stream::try_stream;
use bytes::Bytes;
use clap::Parser;
use futures::{pin_mut, Stream, TryStreamExt};
use hyper::header::{CONTENT_DISPOSITION, CONTENT_TYPE};
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use multer::Multipart;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::convert::Infallible;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::str::FromStr;
/// Replaces the extension of `path` with `extension` and returns the result as a `String`.
///
/// # Errors
///
/// Returns an error when `path` has no file name component (for example when it is empty or
/// ends in `..`), because there is nothing to attach an extension to, or when the resulting
/// path cannot be represented as UTF-8.
fn replace_file_extension(path: &str, extension: &str) -> Result<String> {
    let mut path = PathBuf::from_str(path)?;
    // `set_extension` returns `false` and leaves the path untouched when there is no file name.
    // Surface that as an error instead of silently handing back the unchanged input, so callers
    // can fall back to a sensible default name.
    if !path.set_extension(extension) {
        return Err(anyhow!("path has no file name to set an extension on"));
    }
    path.to_str()
        .map(str::to_owned)
        .ok_or_else(|| anyhow!("unable to convert path to String"))
}
/// Opens `body` as a multipart/form-data payload delimited by `boundary` and hands back the
/// first field it contains.
///
/// On success the result pairs the field's file name (falling back to `"download.csv"` when the
/// part does not declare one) with a stream of the field's chunks. `None` is returned both when
/// the payload contains no field and when reading the first field fails.
async fn read_multipart(
    body: Body,
    boundary: String,
) -> Option<(String, impl Stream<Item = multer::Result<Bytes>>)> {
    // FIXME: nothing bounds how much field data we are willing to read, which is a possible DOS
    //   attack vector. multer provides a constraints API to help mitigate this risk:
    //   https://github.com/rousan/multer-rs.
    let mut multipart = Multipart::new(body, boundary);

    // KLUDGE: a result type with an error we can match on might be better here, that way callers
    //   could tell "there was no multipart field when we were expecting one" apart from "there
    //   was an error reading the multipart field". For now both cases collapse into `None`.
    let mut field = match multipart.next_field().await {
        Ok(Some(first)) => first,
        Ok(None) | Err(_) => return None,
    };

    // FIXME: the file name comes straight from the multipart POST request and is passed through
    //   unchanged, which opens possible attack vectors. May want to do some sanitizing.
    let file_name = match field.file_name() {
        Some(name) => name.to_string(),
        None => "download.csv".to_string(),
    };

    Some((
        file_name,
        try_stream! {
            while let Some(chunk) = field.chunk().await? {
                yield chunk;
            }
        },
    ))
}
const fn default_delimiter() -> char {
','
@ -45,11 +84,12 @@ fn parse_csv_records<S, B>(
input: S,
) -> impl Stream<Item = csv_async::Result<CsvRecord>>
where
S: Stream<Item = std::io::Result<B>> + Unpin + Send,
S: Stream<Item = std::io::Result<B>> + Send,
B: AsRef<[u8]> + Send,
{
let CsvParseOptions { delimiter, quote } = options;
try_stream! {
pin_mut!(input);
let deserializer = csv_async::AsyncReaderBuilder::new()
.delimiter(delimiter as u8)
.quote(quote as u8)
@ -111,9 +151,36 @@ async fn convert_csv(req: Request<Body>) -> Result<Response<Body>, hyper::http::
}
};
let boundary = req
.headers()
.get(hyper::header::CONTENT_TYPE)
.and_then(|ct| ct.to_str().ok())
.and_then(|ct| multer::parse_boundary(ct).ok());
let boundary = match boundary {
Some(boundary) => boundary,
None => {
return Ok(Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(Body::from(
r#"{"error":"missing boundary in multipart content type"}"#,
))
.unwrap())
}
};
let (file_name, csv_file) = match read_multipart(req.into_body(), boundary).await {
Some(res) => res,
None => {
return Ok(Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(Body::from(
r#"{"error":"missing required multipart file field"}"#,
))
.unwrap())
}
};
let csv_records = parse_csv_records(
csv_parse_options,
req.into_body()
csv_file
// KLUDGE: csv_async currently requires errors to be std::io::Error since it assumes it's reading from
// an io device directly. We're just mapping all errors as std::io::ErrorKind::Other for now, but
// we could be more finely detailed if it turns out csv_async handles some std::io::Error variants
@ -125,12 +192,26 @@ async fn convert_csv(req: Request<Body>) -> Result<Response<Body>, hyper::http::
// with requests.
eprintln!("error during CSV conversion: {:?}", error);
});
let download_file_name = replace_file_extension(&file_name, "json")
.ok()
.unwrap_or("download.csv".to_string());
Response::builder()
.header("content-type", "application/json")
// NOTE: according to https://github.com/eligrey/FileSaver.js/wiki/Saving-a-remote-file it is better to
// use octet-stream over the actual mime type when trying to stream data so that browsers don't
// try to render the result, but instead force a file-save dialog.
.header(CONTENT_TYPE, "application/octet-stream; charset=utf-8")
.header(
CONTENT_DISPOSITION,
format!(
r#"attachment; filename="{0}"; filename*="{0}""#,
download_file_name
),
)
.body(Body::wrap_stream(response))
}
async fn route_request(req: Request<Body>) -> Result<Response<Body>, hyper::http::Error> {
println!("got request: {:?}", &req);
match (req.method(), req.uri().path()) {
(&Method::POST, "/") => convert_csv(req).await,
_ => Response::builder()
@ -142,7 +223,7 @@ async fn route_request(req: Request<Body>) -> Result<Response<Body>, hyper::http
#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
struct Args {
#[clap(short, long, default_value_t = 3000)]
#[clap(short, long, default_value_t = 8000)]
port: u16,
}
@ -167,8 +248,27 @@ async fn main() {
mod tests {
use super::*;
use futures::TryStreamExt;
use hyper::header::HeaderValue;
use pretty_assertions::assert_eq;
const BOUNDARY: &str = "X-BOUNDARY";
/// Finishes `request` as a multipart/form-data request carrying `data` as a single file part.
///
/// The part is named `field` and declares the file name `example.csv`. The `Content-Type`
/// header advertises `BOUNDARY` as the multipart boundary.
///
/// # Panics
///
/// Panics if the request builder fails to produce a request (e.g. it was given an invalid URI).
fn build_multipart_request(
    request: hyper::http::request::Builder,
    data: &str,
) -> Request<Body> {
    request
        .header(
            CONTENT_TYPE,
            format!("multipart/form-data; boundary={}", BOUNDARY),
        )
        // The body must end with the close delimiter (`--BOUNDARY--`); a bare `--BOUNDARY` only
        // separates parts and would leave the payload unterminated.
        .body(Body::from(format!(
            "--{0}\r\nContent-Disposition: form-data; name=\"field\"; filename=\"example.csv\"\r\n\r\n{1}\r\n--{0}--\r\n",
            BOUNDARY, data
        )))
        .expect("multipart test request should be valid")
}
async fn read_to_string(body: Body) -> String {
body.try_fold(String::new(), |output, bytes| async move {
let parsed = std::str::from_utf8(&bytes).unwrap();
@ -180,7 +280,7 @@ mod tests {
#[tokio::test]
async fn empty_csv() -> Result<()> {
let req = Request::builder().body(Body::empty())?;
let req = build_multipart_request(Request::builder(), "");
let res = convert_csv(req).await?;
assert_eq!(res.status(), StatusCode::OK);
let res_body = read_to_string(res.into_body()).await;
@ -190,7 +290,7 @@ mod tests {
#[tokio::test]
async fn returns_nothing_when_only_headers() -> Result<()> {
let req = Request::builder().body(Body::from("field1,field2,field3"))?;
let req = build_multipart_request(Request::builder(), "field1,field2,field3");
let res = convert_csv(req).await?;
assert_eq!(res.status(), StatusCode::OK);
let res_body = read_to_string(res.into_body()).await;
@ -200,7 +300,7 @@ mod tests {
#[tokio::test]
async fn returns_single_record_for_single_line() -> Result<()> {
let req = Request::builder().body(Body::from("field1,field2,field3\n1,2,3"))?;
let req = build_multipart_request(Request::builder(), "field1,field2,field3\n1,2,3");
let res = convert_csv(req).await?;
assert_eq!(res.status(), StatusCode::OK);
let res_body = read_to_string(res.into_body()).await;
@ -210,7 +310,7 @@ mod tests {
#[tokio::test]
async fn returns_multiple_records_for_multiple_lines() -> Result<()> {
let req = Request::builder().body(Body::from("field1,field2,field3\n1,2,3\n4,5,6"))?;
let req = build_multipart_request(Request::builder(), "field1,field2,field3\n1,2,3\n4,5,6");
let res = convert_csv(req).await?;
assert_eq!(res.status(), StatusCode::OK);
let res_body = read_to_string(res.into_body()).await;
@ -223,7 +323,8 @@ mod tests {
#[tokio::test]
async fn can_parse_quoted_fields() -> Result<()> {
let req = Request::builder().body(Body::from("\"field1\",field2,field3\n1,\"2\",3"))?;
let req =
build_multipart_request(Request::builder(), "\"field1\",field2,field3\n1,\"2\",3");
let res = convert_csv(req).await?;
assert_eq!(res.status(), StatusCode::OK);
let res_body = read_to_string(res.into_body()).await;
@ -233,8 +334,10 @@ mod tests {
#[tokio::test]
async fn can_parse_newslines_in_quoted_fields() -> Result<()> {
let req =
Request::builder().body(Body::from("\"field1\",field2,field3\n1,\"2 &\n 3\",4"))?;
let req = build_multipart_request(
Request::builder(),
"\"field1\",field2,field3\n1,\"2 &\n 3\",4",
);
let res = convert_csv(req).await?;
assert_eq!(res.status(), StatusCode::OK);
let res_body = read_to_string(res.into_body()).await;
@ -248,9 +351,10 @@ mod tests {
#[tokio::test]
async fn can_change_delimiter_with_query_param() -> Result<()> {
let req = Request::builder()
.uri("/?delimiter=%09")
.body(Body::from("field1\tfield2\tfield3\n1\t2\t3"))?;
let req = build_multipart_request(
Request::builder().uri("/?delimiter=%09"),
"field1\tfield2\tfield3\n1\t2\t3",
);
let res = convert_csv(req).await?;
assert_eq!(res.status(), StatusCode::OK);
let res_body = read_to_string(res.into_body()).await;
@ -261,13 +365,29 @@ mod tests {
#[tokio::test]
async fn can_change_quote_char_with_query_param() -> Result<()> {
let req = Request::builder()
.uri("/?quote=%27")
.body(Body::from("field1,'field2','field3'\n1,'2',3"))?;
let req = build_multipart_request(
Request::builder().uri("/?quote=%27"),
"field1,'field2','field3'\n1,'2',3",
);
let res = convert_csv(req).await?;
assert_eq!(res.status(), StatusCode::OK);
let res_body = read_to_string(res.into_body()).await;
assert_eq!(&res_body, r#"[{"field1":"1","field2":"2","field3":"3"}]"#);
Ok(())
}
/// Converting an uploaded `example.csv` must label the download as `example.json` in the
/// `Content-Disposition` response header.
#[tokio::test]
async fn responds_with_content_disposition_header() -> Result<()> {
    let request = build_multipart_request(Request::builder(), "field1,field2,field3\n1,2,3");
    let response = convert_csv(request).await?;

    let expected = HeaderValue::from_static(
        r#"attachment; filename="example.json"; filename*="example.json""#,
    );
    let actual = response.headers().get("content-disposition");
    assert_eq!(actual, Some(&expected));

    Ok(())
}
}