feat: initial commit of csv-to-json service

M. George Hansen 2022-05-08 21:03:52 -07:00
commit 8b9d382573
Signed by: mgeorgehansen
SSH key fingerprint: SHA256:JlIGiQLPyQ2RHTH3a2oVlb20Xkh9Glr8DUF4YTXHJxM
6 changed files with 1521 additions and 0 deletions

1
csv-to-json/.gitignore vendored Normal file

@@ -0,0 +1 @@
/target

1119
csv-to-json/Cargo.lock generated Normal file

File diff suppressed because it is too large

24
csv-to-json/Cargo.toml Normal file

@@ -0,0 +1,24 @@
[package]
name = "csv-to-json"
version = "0.1.0"
edition = "2021"
license = "UNLICENSED"
[dependencies]
hyper = { version = "0.14", features = ["full"] }
tokio = { version = "1", features = ["full"] }
csv-async = { version = "1.2" }
futures = { version = "0.3" }
async-stream = { version = "0.3" }
serde = { version = "1", features = ["derive"] }
serde_json = { version = "1" }
bytes = { version = "1.1" }
anyhow = { version = "1" }
clap = { version = "3.1", features = ["derive"] }
url = { version = "2.2" }
serde_urlencoded = { version = "0.7" }
[dev-dependencies]
assert_cmd = { version = "2.0" }
predicates = { version = "2.1" }
pretty_assertions = { version = "1.2" }

93
csv-to-json/README.md Normal file

@@ -0,0 +1,93 @@
# csv-to-json
A web service to convert CSV files into a JSON array of objects, where each object is a record of header names to field values.
## Installing
csv-to-json does not have a published binary, but you can easily build it from source and install it using cargo and a recent stable, 2021-edition Rust compiler:
```sh
cargo install --path .
```
Alternatively, you can run it from source using cargo:
```sh
cargo run --release
```
## Usage
By default, running csv-to-json will run a server listening on port 3000. This can be overridden with the `-p {port number}` option:
```sh
$> csv-to-json
listening on 127.0.0.1:3000
```
To convert a CSV into JSON, make a POST request to the root path (`/`) where the server is listening. All other request methods and paths return a 404 NOT FOUND response.
For example, given a CSV file `fakebirds.csv` containing the following records:
```csv
date,lat,lng,number of "birds"
2022-04-06,33.759108,-118.143132,12
2022-04-07,33.756503,-118.141727,8
```
Assuming the csv-to-json server is running at localhost:3000, POSTing this CSV data would result in the following response:
```sh
$> curl --data-binary @fakebirds.csv localhost:3000
[{"date":"2022-04-06","lat":"33.759108","lng":"-118.143132","number of \"birds\"":"12"},{"date":"2022-04-07","lat":"33.756503","lng":"-118.141727","number of \"birds\"":"8"}]
```
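The mapping from header names to field values can be sketched with the standard library alone. This is only an illustration: `record_to_object` is a hypothetical helper, and the service itself relies on `csv-async` and `serde` for this step. Because the service stores each record in a `BTreeMap`, keys come out sorted by header name rather than in column order, which the sketch reproduces:

```rust
use std::collections::BTreeMap;

/// Pairs each header with the field in the same column of one record.
/// Hypothetical helper for illustration only; it is not part of the service.
fn record_to_object(headers: &[&str], fields: &[&str]) -> BTreeMap<String, String> {
    headers
        .iter()
        .zip(fields.iter())
        // A BTreeMap keeps its keys sorted, so iteration order follows the
        // header names and not the original column order.
        .map(|(header, field)| (header.to_string(), field.to_string()))
        .collect()
}
```

Calling `record_to_object(&["tab", "fields"], &["1", "2"])` yields a map whose keys iterate as `fields`, then `tab`.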
## Supporting Different CSV Formats
By default, csv-to-json assumes that your CSV file is comma-delimited `,`, uses quotation marks `"` to quote fields, and uses any style of newline (`\r`, `\n`, or `\r\n`) to terminate records. csv-to-json provides some flexibility in parsing via the following query parameters:
### Delimiter
Provide a `delimiter=` query parameter with a URL-encoded, single character to change which character is treated as a field delimiter. For example, to parse tab-delimited CSV you can specify `delimiter=%09` (`%09` is the URL-encoded escape for the tab character):
```sh
$> curl --data-binary $'tab\tfields\n1\t2' 'localhost:3000?delimiter=%09'
[{"fields":"2","tab":"1"}]
```
### Quote
Provide a `quote=` query parameter with a URL-encoded, single character to change which character is treated as a field quote. For example, to parse CSVs that use the single quote `'` to quote fields you can specify `quote=%27` (`%27` is the URL-encoded escape for the single quote `'` character):
```sh
$> curl --data-binary $'field1,field2,field3\n\'1\',2,\'3\'' 'localhost:3000?quote=%27'
[{"field1":"1","field2":"2","field3":"3"}]
```
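Both parameters end up as a single byte handed to the CSV parser (the service casts the parsed `char` with `as u8`), so only ASCII characters are meaningful. The following is a minimal, standard-library-only sketch of what decoding such a value involves. The function name `decode_single_ascii` is an assumption made for illustration; the service itself delegates query parsing to `serde_urlencoded`.

```rust
/// Decodes a query-parameter value such as "%09" or ";" into one ASCII byte.
/// Hypothetical helper for illustration; returns None for anything that is
/// not exactly one ASCII character after decoding.
fn decode_single_ascii(value: &str) -> Option<u8> {
    let byte = match value.as_bytes() {
        // Percent-escape: '%' followed by two hexadecimal digits.
        [b'%', hi, lo] => {
            let hi = (*hi as char).to_digit(16)?;
            let lo = (*lo as char).to_digit(16)?;
            (hi * 16 + lo) as u8
        }
        // A literal, unescaped single-byte character.
        [single] => *single,
        // Empty input, multiple characters, or a multi-byte character.
        _ => return None,
    };
    // Bytes above 0x7F are not ASCII and cannot serve as a delimiter here.
    if byte.is_ascii() {
        Some(byte)
    } else {
        None
    }
}
```

With this sketch, `decode_single_ascii("%09")` gives the tab byte, while `decode_single_ascii("ab")` and `decode_single_ascii("%FF")` are rejected.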
## Core Design Decisions
- I chose `hyper` over other higher-abstraction web frameworks because:
a) it has a strong concurrency model with `tokio`;
b) it has native streaming response support;
c) it has strong community support and is used in many high-profile libraries; and
d) I don't need complex routing logic, middleware or context management and wanted to keep things simple.
- I decided upon a streaming processing pipeline because I know from experience that loading large files into memory doesn't scale with multiple concurrent API consumers: if I had to read the entire CSV into memory, a single consumer could easily exhaust the server's memory without some kind of limiting. I had to relearn Rust's `Stream` protocol and struggled with writing clean abstractions, but eventually ended up with a solution that only holds the CSV headers and the record currently being processed in memory at any one moment.
- I didn't want to roll my own CSV parser because I knew that CSVs come in all sorts of different formats and quoting strategies that make parsing tricky (Excel STILL doesn't support newlines in quoted fields, grumble grumble...). I evaluated the excellent `csv` library but quickly determined it wasn't going to work in an async/streaming context, then tried to do field-level deserialization with `csv_core` but quickly ran into an edge case where the parser would ignore a field at the end of a record if it wasn't newline terminated. I settled on `csv_async`, which seems to work great and deserializes records instead of fields. A field-level CSV deserializer would be interesting to experiment with, though; I wonder whether it would be a performance improvement.
- Similarly, for serializing JSON I decided that I didn't want to write quoting and escaping logic for serialized object fields, so I leveraged `serde_json` to do the heavy lifting of object serialization. The only tricky bit was figuring out how to share a buffer to serialize into so that I didn't have to yield individual array separators from the stream, which would have been inefficient.
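The chunking scheme behind the JSON output can be illustrated with a synchronous, standard-library-only sketch. It assumes the values are already serialized to JSON strings, and `json_array_chunks` is a hypothetical name; the real handler does the same thing over an async `Stream` with `serde_json`. The first chunk carries `[` plus the first value, each later chunk carries a leading `,` plus one value, and a final chunk closes the array:

```rust
/// Turns an iterator of already-serialized JSON values into the chunks of a
/// single JSON array. Hypothetical, synchronous stand-in for the async stream.
fn json_array_chunks<I>(values: I) -> impl Iterator<Item = String>
where
    I: IntoIterator<Item = String>,
{
    let mut values = values.into_iter();
    // The first value needs no leading separator, so it shares a chunk with
    // the opening bracket. Note that it is pulled eagerly in this sketch.
    let mut first = String::from("[");
    if let Some(value) = values.next() {
        first.push_str(&value);
    }
    std::iter::once(first)
        // Every subsequent value is prefixed with the array separator.
        .chain(values.map(|value| format!(",{}", value)))
        // A final chunk closes the array, even when there were no values.
        .chain(std::iter::once(String::from("]")))
}
```

Concatenating the chunks always gives a well-formed array: an empty input produces `[]`, and only one value is held in memory per chunk.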
## Current Limitations
- CSVs are always parsed assuming the first record contains the column headers. If a CSV doesn't have headers, its first data record is consumed as the header names, which produces _strange_ results. Don't do it.
- There is currently no type inference or conversion of CSV fields to JSON types. All CSV fields are interpreted and output as strings.
- The current CSV parser, `csv_async`, does not place any limits upon the size of records that it tries to read. This means that there is a potential denial of service attack vector where malicious users could POST a CSV with a very large line of valid UTF-8 string data that could cause the server to exhaust its memory resources. We'd have to either use a different CSV parser or patch csv-async to resolve this issue (perhaps by providing a `max_record_size` option to `AsyncReaderBuilder`).
- Errors from malformed CSVs (e.g. missing fields in a particular record) currently result in the response stream being terminated, with no in-band way of giving the user information about the cause of the error. There are a few potential solutions, such as utilizing custom trailers in the streaming response to encode error messages, but these all require the client code to know to look for them or have some other out-of-band error mechanism.
- All CSV input is assumed to be UTF-8 encoded. We could potentially support other encodings by transcoding them before processing with a query parameter or request header, but this is a dubious proposition since UTF-8 is widely adopted as the default encoding of the web and users are unlikely to know what obscure charset their 20-year-old CSV files are in anyway.
- The `Content-Type` and `Accept` headers are currently ignored, since the endpoint only accepts UTF-8 encoded CSV data and only outputs JSON data. It might be useful to reject requests that specify a `Content-Type` other than `text/csv` or an `Accept` header other than `application/json`.
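To make the record-size limitation more concrete, here is a rough sketch of the kind of guard a `max_record_size` option might apply. It is synchronous, uses only the standard library, and treats every newline as a record boundary, so it ignores newlines inside quoted fields. `read_record_limited` is a hypothetical function and not an existing `csv-async` API.

```rust
use std::io::{self, BufRead, Read};

/// Reads one newline-terminated record, failing once it exceeds `max_len` bytes.
/// Hypothetical helper: returns Ok(None) at end of input.
fn read_record_limited<R: BufRead>(
    reader: &mut R,
    max_len: usize,
) -> io::Result<Option<Vec<u8>>> {
    let mut buf = Vec::new();
    // Allow one extra byte so that a record of exactly `max_len` bytes plus
    // its newline still fits, while anything longer is detected.
    let read = reader
        .by_ref()
        .take(max_len as u64 + 1)
        .read_until(b'\n', &mut buf)?;
    if read == 0 {
        return Ok(None);
    }
    if buf.last() == Some(&b'\n') {
        buf.pop();
    }
    if buf.len() > max_len {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "record exceeds maximum size",
        ));
    }
    Ok(Some(buf))
}
```

The point of the sketch is that at most `max_len + 1` bytes are ever buffered for a record, so an oversized line is rejected before it can grow without bound.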
## Development and Testing
To run unit tests of the CSV conversion request handler, run:
```sh
$> cargo test
```

273
csv-to-json/src/main.rs Normal file

@@ -0,0 +1,273 @@
use anyhow::{Context, Result};
use async_stream::try_stream;
use bytes::Bytes;
use clap::Parser;
use futures::{pin_mut, Stream, TryStreamExt};
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::convert::Infallible;
use std::net::SocketAddr;
const fn default_delimiter() -> char {
','
}
const fn default_quote() -> char {
'"'
}
/// Options taken from the URL query string to customize CSV parsing behavior.
#[derive(Deserialize)]
#[serde(rename_all = "kebab-case")]
struct CsvParseOptions {
#[serde(default = "default_delimiter")]
delimiter: char,
#[serde(default = "default_quote")]
quote: char,
}
/// Representation of a single record or line in a CSV. Fields are named according to the headers
/// in the original CSV.
#[derive(Debug, Deserialize, Serialize)]
struct CsvRecord(
// NOTE: Using a BTreeMap so that fields are always emitted in a deterministic order (sorted by
// header name, which is not necessarily the column order of the CSV). This makes testing a lot
// easier since the output is predictable.
BTreeMap<String, String>,
);
/// Stream producer that takes a stream of input bytes and attempts to deserialize them as CsvRecords.
/// This assumes that the input stream represents UTF-8 encoded string data, and will produce errors
/// if input data is not properly UTF-8 encoded.
fn parse_csv_records<S, B>(
options: CsvParseOptions,
input: S,
) -> impl Stream<Item = csv_async::Result<CsvRecord>>
where
S: Stream<Item = std::io::Result<B>> + Unpin + Send,
B: AsRef<[u8]> + Send,
{
let CsvParseOptions { delimiter, quote } = options;
try_stream! {
let deserializer = csv_async::AsyncReaderBuilder::new()
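// NOTE: `as u8` truncates non-ASCII characters, so only ASCII delimiters and quotes behave as intended.
.delimiter(delimiter as u8)
.quote(quote as u8)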
.flexible(true)
.create_deserializer(input.into_async_read());
let records = deserializer.into_deserialize::<CsvRecord>();
for await record in records {
yield record?;
}
}
}
/// Stream producer that takes a stream of serde::Serialize values and serializes them to a
/// JSON array in a UTF-8-encoded, chunked binary format.
fn serialize_json_seq<S, T, E>(values: S) -> impl Stream<Item = Result<Bytes>>
where
S: Stream<Item = Result<T, E>>,
T: Serialize,
E: std::error::Error + Send + Sync + 'static,
{
try_stream! {
// To give downstream consumers the most opportunity for optimization we'll have a single bytes buffer
// and periodically flush that buffer and yield its contents to the stream. This is *probably* much
// better than yielding individual `,` and `[` characters.
let mut buffer = Vec::with_capacity(1024);
buffer.push(b'[');
// The first value won't need a leading array element separator "," so we treat it specially.
pin_mut!(values);
if let Some(first_value) = values.try_next().await.context("failed to read from input stream")? {
serde_json::to_writer(&mut buffer, &first_value).context("failed to serialize value")?;
}
yield Bytes::copy_from_slice(&buffer);
buffer.clear();
// For all subsequent values, we have to emit a leading "," to separate each value in the JSON array.
for await value in values {
let value = value.context("failed to read from input stream")?;
buffer.push(b',');
serde_json::to_writer(&mut buffer, &value).context("failed to serialize value")?;
yield Bytes::copy_from_slice(&buffer);
buffer.clear();
}
// Emit the closing bracket to finish the JSON array.
yield Bytes::from_static(b"]");
}
}
async fn convert_csv(req: Request<Body>) -> Result<Response<Body>, hyper::http::Error> {
let csv_parse_options = match serde_urlencoded::from_str::<CsvParseOptions>(
req.uri().query().unwrap_or_default(),
) {
Ok(options) => options,
Err(error) => {
return Response::builder()
.status(StatusCode::BAD_REQUEST)
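// Build the error body with serde_json so that the error text is properly escaped.
.body(serde_json::json!({ "error": format!("invalid query parameters: {}", error) }).to_string().into())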
}
};
let csv_records = parse_csv_records(
csv_parse_options,
req.into_body()
// KLUDGE: csv_async currently requires errors to be std::io::Error since it assumes it's reading from
// an io device directly. We're just mapping all errors as std::io::ErrorKind::Other for now, but
// we could be more finely detailed if it turns out csv_async handles some std::io::Error variants
// specially.
.map_err(|error| std::io::Error::new(std::io::ErrorKind::Other, error)),
);
let response = serialize_json_seq(csv_records).inspect_err(|error| {
// TODO: look for some trace header and log that with errors for more easily tracing errors and associate them
// with requests.
eprintln!("error during CSV conversion: {:?}", error);
});
Response::builder()
.header("content-type", "application/json")
.body(Body::wrap_stream(response))
}
async fn route_request(req: Request<Body>) -> Result<Response<Body>, hyper::http::Error> {
match (req.method(), req.uri().path()) {
(&Method::POST, "/") => convert_csv(req).await,
_ => Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::empty()),
}
}
#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
struct Args {
#[clap(short, long, default_value_t = 3000)]
port: u16,
}
#[tokio::main]
async fn main() {
let args = Args::parse();
let addr = SocketAddr::from(([127, 0, 0, 1], args.port));
let csv_service =
make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(route_request)) });
let server = Server::bind(&addr).serve(csv_service);
println!("listening on {}", server.local_addr());
if let Err(e) = server.await {
eprintln!("server error: {}", e);
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::TryStreamExt;
use pretty_assertions::assert_eq;
async fn read_to_string(body: Body) -> String {
body.try_fold(String::new(), |output, bytes| async move {
let parsed = std::str::from_utf8(&bytes).unwrap();
Ok(output + parsed)
})
.await
.unwrap()
}
#[tokio::test]
async fn empty_csv() -> Result<()> {
let req = Request::builder().body(Body::empty())?;
let res = convert_csv(req).await?;
assert_eq!(res.status(), StatusCode::OK);
let res_body = read_to_string(res.into_body()).await;
assert_eq!(&res_body, "[]");
Ok(())
}
#[tokio::test]
async fn returns_nothing_when_only_headers() -> Result<()> {
let req = Request::builder().body(Body::from("field1,field2,field3"))?;
let res = convert_csv(req).await?;
assert_eq!(res.status(), StatusCode::OK);
let res_body = read_to_string(res.into_body()).await;
assert_eq!(&res_body, "[]");
Ok(())
}
#[tokio::test]
async fn returns_single_record_for_single_line() -> Result<()> {
let req = Request::builder().body(Body::from("field1,field2,field3\n1,2,3"))?;
let res = convert_csv(req).await?;
assert_eq!(res.status(), StatusCode::OK);
let res_body = read_to_string(res.into_body()).await;
assert_eq!(&res_body, r#"[{"field1":"1","field2":"2","field3":"3"}]"#);
Ok(())
}
#[tokio::test]
async fn returns_multiple_records_for_multiple_lines() -> Result<()> {
let req = Request::builder().body(Body::from("field1,field2,field3\n1,2,3\n4,5,6"))?;
let res = convert_csv(req).await?;
assert_eq!(res.status(), StatusCode::OK);
let res_body = read_to_string(res.into_body()).await;
assert_eq!(
&res_body,
r#"[{"field1":"1","field2":"2","field3":"3"},{"field1":"4","field2":"5","field3":"6"}]"#
);
Ok(())
}
#[tokio::test]
async fn can_parse_quoted_fields() -> Result<()> {
let req = Request::builder().body(Body::from("\"field1\",field2,field3\n1,\"2\",3"))?;
let res = convert_csv(req).await?;
assert_eq!(res.status(), StatusCode::OK);
let res_body = read_to_string(res.into_body()).await;
assert_eq!(&res_body, r#"[{"field1":"1","field2":"2","field3":"3"}]"#);
Ok(())
}
#[tokio::test]
async fn can_parse_newlines_in_quoted_fields() -> Result<()> {
let req =
Request::builder().body(Body::from("\"field1\",field2,field3\n1,\"2 &\n 3\",4"))?;
let res = convert_csv(req).await?;
assert_eq!(res.status(), StatusCode::OK);
let res_body = read_to_string(res.into_body()).await;
assert_eq!(
&res_body,
r#"[{"field1":"1","field2":"2 &\n 3","field3":"4"}]"#
);
Ok(())
}
#[tokio::test]
async fn can_change_delimiter_with_query_param() -> Result<()> {
let req = Request::builder()
.uri("/?delimiter=%09")
.body(Body::from("field1\tfield2\tfield3\n1\t2\t3"))?;
let res = convert_csv(req).await?;
assert_eq!(res.status(), StatusCode::OK);
let res_body = read_to_string(res.into_body()).await;
assert_eq!(&res_body, r#"[{"field1":"1","field2":"2","field3":"3"}]"#);
Ok(())
}
#[tokio::test]
async fn can_change_quote_char_with_query_param() -> Result<()> {
let req = Request::builder()
.uri("/?quote=%27")
.body(Body::from("field1,'field2','field3'\n1,'2',3"))?;
let res = convert_csv(req).await?;
assert_eq!(res.status(), StatusCode::OK);
let res_body = read_to_string(res.into_body()).await;
assert_eq!(&res_body, r#"[{"field1":"1","field2":"2","field3":"3"}]"#);
Ok(())
}
}

11
web-client/package.json Normal file

@@ -0,0 +1,11 @@
{
"name": "web-client",
"version": "0.1.0",
"description": "A web client for csv-to-json.",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "M. George Hansen",
"license": "UNLICENSED"
}