Interim commit.

This commit is contained in:
Elf M. Sternberg 2023-03-24 12:22:28 -07:00
parent 2aa202d05c
commit 5da8bb6b79
15 changed files with 616 additions and 72 deletions

33
Cargo.lock generated
View File

@ -155,8 +155,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e3c5919066adf22df73762e50cffcde3a758f2a848b113b586d1f86728b673b"
dependencies = [
"iana-time-zone",
"js-sys",
"num-integer",
"num-traits",
"serde",
"time",
"wasm-bindgen",
"winapi",
]
@ -487,7 +491,7 @@ checksum = "c05aeb6a22b8f62540c194aac980f2115af067bfe15a0734d7277a768d396b31"
dependencies = [
"cfg-if",
"libc",
"wasi",
"wasi 0.11.0+wasi-snapshot-preview1",
]
[[package]]
@ -825,7 +829,7 @@ checksum = "5b9d9a46eff5b4ff64b45a9e316a6d1e0bc719ef429cbec4dc630684212bfdf9"
dependencies = [
"libc",
"log",
"wasi",
"wasi 0.11.0+wasi-snapshot-preview1",
"windows-sys 0.45.0",
]
@ -1593,6 +1597,17 @@ dependencies = [
"once_cell",
]
[[package]]
name = "time"
version = "0.1.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b797afad3f312d1c66a56d11d0316f916356d11bd158fbc6ca6389ff6bf805a"
dependencies = [
"libc",
"wasi 0.10.0+wasi-snapshot-preview1",
"winapi",
]
[[package]]
name = "tinyvec"
version = "1.6.0"
@ -1843,6 +1858,10 @@ name = "uuid"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1674845326ee10d37ca60470760d4288a6f80f304007d92e5c53bab78c9cfd79"
dependencies = [
"getrandom",
"serde",
]
[[package]]
name = "valuable"
@ -1872,6 +1891,12 @@ dependencies = [
"try-lock",
]
[[package]]
name = "wasi"
version = "0.10.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f"
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"
@ -2087,12 +2112,16 @@ name = "ztp"
version = "0.1.0"
dependencies = [
"axum",
"chrono",
"config",
"hyper",
"serde",
"serde_json",
"sqlx",
"thiserror",
"tokio",
"tower",
"tracing",
"tracing-subscriber",
"uuid",
]

View File

@ -12,14 +12,18 @@ name = "ztp"
[dependencies]
axum = "0.6.11"
chrono = { version = "0.4.24", features = ["serde"] }
config = "0.13.3"
hyper = { version = "0.14.25", features = ["full"] }
serde = { version = "1.0.158", features = ["derive"] }
serde_json = "1.0.94"
sqlx = { version = "0.6.3", features = ["runtime-tokio-native-tls", "macros", "postgres", "uuid", "chrono"] }
thiserror = "1.0.40"
tokio = { version = "1.26.0", features = ["full"] }
tower = "0.4.13"
tracing = "0.1.35"
tracing-subscriber = "0.3.14"
uuid = { version = "1.3.0", features = ["v4", "serde"] }
[profile.release]
opt-level = 3

View File

@ -0,0 +1,324 @@
+++
title = "Databases: Connecting and Using"
date = 2023-03-20T17:38:12Z
weight = 4
+++
## Chapter 3.8: Databases
First, we're gonna expand the configuration we defined previously. The one
thing we will require is the password, although eventually I'm going to make
that a command-line option using Clap.
``` rust
#[derive(serde::Deserialize)]
#[serde(default)]
pub struct DatabaseSettings {
pub username: String,
pub password: String,
pub host: String,
pub port: u16,
pub database: String,
}
#[derive(serde::Deserialize)]
#[serde(default)]
pub struct Settings {
pub database: DatabaseSettings,
pub port: u16,
}
impl Default for DatabaseSettings {
fn default() -> Self {
DatabaseSettings {
username: "newsletter".to_string(),
password: "".to_string(),
host: "localhost".to_string(),
port: 5432,
database: "newsletter".to_string(),
}
}
}
impl Default for Settings {
fn default() -> Self {
Settings {
port: 3001,
database: DatabaseSettings::default(),
}
}
}
```
## Cleanup on Aisle Three
The code's beginning to get a bit messsy. So let's re-arrange. I already have
the configuration handler in its own file, but let's clean up the routes. I'm
going to take the functions I've already defined and put them in a subfolder,
`src/routes`, and then for each one I'll make a new file. For example:
``` rust
use axum::{http::StatusCode, response::IntoResponse};
pub(crate) async fn health_check() -> impl IntoResponse {
(StatusCode::OK, ())
}
```
Just a note: *ignore* your IDE's "advice" to remove the unneeded `async`; Axum
will not compile this module correctly, `IntoResponse` requires that it be an
async function. I don't know why rust-analyzer failed to pick that up.
Now make a `src/routes.rs` (yes, the same name as the folder). You can activate
and de-activate routes at will:
``` rust
use axum::{
routing::{get, post},
Router,
};
mod greet;
use greet::greet;
mod health_check;
use health_check::health_check;
mod index;
use index::index;
mod subscribe;
use subscribe::subscribe;
pub(crate) fn app() -> Router {
Router::new()
.route("/", get(index))
.route("/subscriptions", post(subscribe))
.route("/:name", get(greet))
.route("/health_check", get(health_check))
}
```
You don't need the `use` clauses, you could just say `index::index`, but I kinda
like the way this looks, even with the repetition. It makes it clear what I'm
doing and what I'm looking at. This is Rust 2021, where the presence of a file
and a folder with the same name indicate a module with submodules. In prior
editions of rust, instead of `routes.rs`, we would have used `routes/mod.rs`.
I'm not entirely sure which I like better.
Now that that's done, in our `lib.rs`, we use the `routes`:
``` rust
use std::net::SocketAddr;
mod configuration;
use configuration::get_configuration;
mod routes;
use routes::app;
pub async fn run() {
let configuration = get_configuration().unwrap();
let addr = SocketAddr::from(([127, 0, 0, 1], configuration.port));
tracing::info!("listening on {}", addr);
axum::Server::bind(&addr)
.serve(app().into_make_service())
.await
.unwrap()
}
```
All of the Axum imports have been removed, since they're part of the routes.
Nice, clean and clear code. At this point, even `cargo clippy` is happy.
### The Database String
Connecting to a database these days involves a URL. Let's make it easy to
generate one. Now, we could implement the trait `std::string::ToString`, or
`Display`, but that would conflict with any debugging information. Instead,
let's define a new method in `configuration.rs`:
``` rust
impl DatabaseSettings {
pub fn url(&self) -> String {
if self.password.len() == 0 {
format!(
"postgres://{}@{}:{}/{}",
self.username, self.host, self.port, self.database
)
} else {
format!(
"postgres://{}:{}@{}:{}/{}",
self.username, self.password, self.host, self.port, self.database
)
}
}
}
```
It is possible (unwise, but possible) to connect to a Postgres database with no
password. In a dynamic language this would have been less wordy, but I can't
complain about Rust being a little extra verbose in exchange for being a lot
more precise.
### Providing the database to the application
Since we're using Postgres, in `lib.rs` we'll add `use
sqlx::postgres::PgPoolOptions;`, which will allow us to create a pool of workers
and limit how many there are. In the `run()` function, we're going to extract
the `app()` definition and use it:
``` rust
use axum::Extension;
use sqlx::postgres::PgPoolOptions;
// in: fn run()
let pool = PgPoolOptions::new()
.max_connections(50)
.connect(&configuration.database.url())
.await
.expect("could not connect to database_url");
let routes = app().layer(Extension(pool));
tracing::info!("listening on {}", addr);
axum::Server::bind(&addr)
.serve(routes.into_make_service())
.await
.unwrap()
```
Now we have a configuration that is runnable. If you run it as-is, it will fail
because, if you set up your Postgres database correctly, there is no password.
Create a `ztp.config.yaml` file (or JSON, or TOML, or whatever floats your boat)
and establish the password:
``` yaml
database:
password: redacted
```
At one point, I had this file named only `ztp.config`, and unfortunately it
crashed. Adding `#[derive(Debug)]` to my configuration structs helped show that
the password wasn't getting set, but it took me a few minutes to realize that
I'd forgotten the extension so `Config` didn't know how to read the file. Naming
it `ztp.config.yaml` fixed it, and this illustrates just how useful TOML can be
in circumstances like this.
### Pulling in a subscription
This turned out to be a bit of a beast, and I'm incredibly indebted to Carlos
Armando Marcano Vargas and [his blog](https://github.com/carlosm27/blog) where
he documented much of what I needed to make this work with Axum.
In the migrations file, we specified that the primary key of our subscriptions
table would be a UUID, and that we were going to record when the subscription
happened. To do this, first we have to make sure that we have the `Uuid` and
`Chrono` crates, and that they're all hooked up for proper serialization:
``` toml
chrono = { version = "0.4.24", features = ["serde"] }
uuid = { version = "1.3.0", features = ["v4", "serde"] }
sqlx = { version = "0.6.3", features = ["runtime-tokio-native-tls", "macros",
"postgres", "uuid", "chrono"] }
```
Both of the new creates need `serde` as a feature and we'll be using `UUID4` for
our primary key.
We'll also need a full subscription object to store, and then a
way to generate it from a new subscription:
```
#[derive(serde::Deserialize)]
pub(crate) struct NewSubscription {
pub email: String,
pub name: String,
}
#[derive(serde::Deserialize, serde::Serialize, sqlx::FromRow)]
struct Subscription {
pub id: Uuid,
pub email: String,
pub name: String,
pub subscribed_at: DateTime<Utc>,
}
impl From<&NewSubscription> for Subscription {
fn from(s: &NewSubscription) -> Self {
Subscription {
id: Uuid::new_v4(),
email: s.email.clone(),
name: s.name.clone(),
subscribed_at: Utc::now(),
}
}
}
```
I've renamed "Subscription" to "NewSubscription" to distinguish it from the
thing we're going to reliably keep in our database, which has all the fields.
And now we're going to re-write the `subscribe()` function to actually talk to
the database. All of those parens around the `payload` are necessary to help
Rust understand what we want to borrow. You'll note that it's `payload.0`, not
just `payload`; Form objects can have multiple instances of their content.
```
pub(crate) async fn subscribe(Extension(pool): Extension<PgPool>, payload: Form<NewSubscription>) -> impl IntoResponse {
let sql = "INSERT INTO subscriptions (id, email, name, subscribed_at) VALUES ($1, $2, $3, $4);".to_string();
let subscription: Subscription = (&(payload.0)).into();
let _ = sqlx::query(&sql)
.bind(subscription.id)
.bind(subscription.email)
.bind(subscription.name)
.bind(subscription.subscribed_at)
.execute(&pool)
.await
.expect("Failed for some reason?");
(StatusCode::OK, ())
}
```
Alternatively, I could have written:
``` rust
let subscription = Subscription::from(&(payload.0));
```
Either way works.
### Better error handling.
That unused `.expect()` has Tokio panicking if I give it a duplicate email
address, as one of the constraints in our original migration reads:
``` sql
email text not null unique,
```
We want to catch and handle that error. We also want to fix something that's
been sitting in the "TODO" list for awhile: make sure that when the subscription
form isn't fully filled out, we return a straight 400. That's actually the
easier part: we make the payload optional and return our default `BAD_REQUEST`
if it's not there:
```
pub(crate) async fn subscribe(
Extension(pool): Extension<PgPool>,
payload: Option<Form<NewSubscription>>,
) -> impl IntoResponse {
if let Some(payload) = payload {
...
(StatusCode::OK, ())
} else {
(StatusCode::BAD_REQUEST, ())
}
```
So, handling the errors is going to be interesting. Going above and beyond,
it's time to use `thiserror`.

View File

@ -1 +1,3 @@
edition = "2021"
max_width = 120
attr_fn_like_width = 120

View File

@ -1,7 +1,7 @@
use config::Config;
/*
#[derive(serde::Deserialize)]
#[derive(serde::Deserialize, Debug)]
#[serde(default)]
pub struct DatabaseSettings {
pub username: String,
pub password: String,
@ -10,23 +10,53 @@ pub struct DatabaseSettings {
pub database: String,
}
*/
#[derive(serde::Deserialize)]
impl DatabaseSettings {
pub fn url(&self) -> String {
if self.password.len() == 0 {
format!(
"postgres://{}@{}:{}/{}",
self.username, self.host, self.port, self.database
)
} else {
format!(
"postgres://{}:{}@{}:{}/{}",
self.username, self.password, self.host, self.port, self.database
)
}
}
}
#[derive(serde::Deserialize, Debug)]
#[serde(default)]
pub struct Settings {
// pub database: DatabaseSettings,
pub database: DatabaseSettings,
pub port: u16,
}
impl Default for DatabaseSettings {
fn default() -> Self {
DatabaseSettings {
username: "newsletter".to_string(),
password: "".to_string(),
host: "localhost".to_string(),
port: 5432,
database: "newsletter".to_string(),
}
}
}
impl Default for Settings {
fn default() -> Self {
Settings { port: 3001 }
Settings {
port: 3001,
database: DatabaseSettings::default(),
}
}
}
pub(crate) fn get_configuration() -> Result<Settings, config::ConfigError> {
pub fn get_configuration() -> Result<Settings, config::ConfigError> {
Config::builder()
.add_source(config::File::with_name("./ztd.config").required(false))
.add_source(config::File::with_name("./ztp.config").required(false))
.build()?
.try_deserialize()
}

24
src/errors.rs Normal file
View File

@ -0,0 +1,24 @@
use axum::{http::StatusCode, response::IntoResponse, Json};
use serde_json::json;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum ZTPError {
#[error("Form data was incomplete")]
FormIncomplete,
#[error("Email Address Already Subscribed")]
DuplicateEmail,
#[error("Unknown error")]
Unknown,
}
impl IntoResponse for ZTPError {
fn into_response(self) -> axum::response::Response {
let (status, error_message) = match self {
Self::FormIncomplete => (StatusCode::BAD_REQUEST, self.to_string()),
Self::DuplicateEmail => (StatusCode::BAD_REQUEST, self.to_string()),
Self::Unknown => (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()),
};
(status, Json(json!({ "error": error_message }))).into_response()
}
}

View File

@ -1,42 +1,30 @@
use axum::{
extract::Path,
http::StatusCode,
response::IntoResponse,
routing::{get, post},
Router,
};
use axum::{Extension, Router};
use std::net::SocketAddr;
pub mod configuration;
use configuration::{get_configuration, Settings};
mod errors;
mod routes;
use routes::routes;
use sqlx::postgres::PgPoolOptions;
mod configuration;
mod user;
use configuration::get_configuration;
use user::{index, subscribe};
pub async fn app(configuration: &Settings) -> Router {
let pool = PgPoolOptions::new()
.max_connections(50)
.connect(&configuration.database.url())
.await
.expect("could not connect to database_url");
async fn greet(Path(name): Path<String>) -> impl IntoResponse {
let greeting = String::from("He's dead, ") + name.as_str();
let greeting = greeting + &String::from("!\n");
(StatusCode::OK, greeting)
}
async fn health_check() -> impl IntoResponse {
(StatusCode::OK, ())
}
pub fn app() -> Router {
Router::new()
.route("/", get(index))
.route("/subscriptions", post(subscribe))
.route("/:name", get(greet))
.route("/health_check", get(health_check))
routes().layer(Extension(pool))
}
pub async fn run() {
let configuration = get_configuration().unwrap();
let addr = SocketAddr::from(([127, 0, 0, 1], configuration.port));
let routes = app(&configuration).await;
tracing::info!("listening on {}", addr);
axum::Server::bind(&addr)
.serve(app().into_make_service())
.serve(routes.into_make_service())
.await
.unwrap()
}
@ -44,21 +32,19 @@ pub async fn run() {
#[cfg(test)]
mod tests {
use super::*;
use axum::{
body::Body,
http::{Request, StatusCode},
};
use axum::{body::Body, http::Request};
use std::net::{SocketAddr, TcpListener};
#[tokio::test]
async fn the_real_deal() {
let configuration = get_configuration().unwrap();
let listener = TcpListener::bind("127.0.0.1:0".parse::<SocketAddr>().unwrap()).unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
axum::Server::from_tcp(listener)
.unwrap()
.serve(app().into_make_service())
.serve(app(&configuration).await.into_make_service())
.await
.unwrap();
});

22
src/routes.rs Normal file
View File

@ -0,0 +1,22 @@
use axum::{
routing::{get, post},
Router,
};
mod greet;
mod health_check;
mod index;
mod subscribe;
use greet::greet;
use health_check::health_check;
use index::index;
use subscribe::subscribe;
pub(crate) fn routes() -> Router {
Router::new()
.route("/", get(index))
.route("/subscriptions", post(subscribe))
.route("/:name", get(greet))
.route("/health_check", get(health_check))
}

7
src/routes/greet.rs Normal file
View File

@ -0,0 +1,7 @@
use axum::{extract::Path, http::StatusCode, response::IntoResponse};
pub(crate) async fn greet(Path(name): Path<String>) -> impl IntoResponse {
let greeting = String::from("He's dead, ") + name.as_str();
let greeting = greeting + &String::from("!\n");
(StatusCode::OK, greeting)
}

View File

@ -0,0 +1,5 @@
use axum::{http::StatusCode, response::IntoResponse};
pub(crate) async fn health_check() -> impl IntoResponse {
(StatusCode::OK, ())
}

13
src/routes/index.rs Normal file
View File

@ -0,0 +1,13 @@
use axum::{http::StatusCode, response::IntoResponse, Form};
#[derive(serde::Deserialize)]
pub(crate) struct IndexData {
username: String,
}
pub(crate) async fn index(payload: Option<Form<IndexData>>) -> impl IntoResponse {
let username = payload.map_or("World".to_string(), move |index| -> String {
String::from(&(index.username))
});
(StatusCode::OK, format!("Hello, {}!\n", &username))
}

51
src/routes/subscribe.rs Normal file
View File

@ -0,0 +1,51 @@
use crate::errors::ZTPError;
use axum::{http::StatusCode, Extension, Form};
use chrono::{DateTime, Utc};
use sqlx::types::Uuid;
use sqlx::PgPool;
#[derive(serde::Deserialize)]
pub(crate) struct NewSubscription {
pub email: String,
pub name: String,
}
#[derive(serde::Deserialize, serde::Serialize, sqlx::FromRow)]
struct Subscription {
pub id: Uuid,
pub email: String,
pub name: String,
pub subscribed_at: DateTime<Utc>,
}
impl From<&NewSubscription> for Subscription {
fn from(s: &NewSubscription) -> Self {
Subscription {
id: Uuid::new_v4(),
email: s.email.clone(),
name: s.name.clone(),
subscribed_at: Utc::now(),
}
}
}
pub(crate) async fn subscribe(
Extension(pool): Extension<PgPool>,
payload: Option<Form<NewSubscription>>,
) -> Result<(StatusCode, ()), ZTPError> {
if let Some(payload) = payload {
let sql = "INSERT INTO subscriptions (id, email, name, subscribed_at) VALUES ($1, $2, $3, $4);".to_string();
let subscription: Subscription = (&(payload.0)).into();
sqlx::query(&sql)
.bind(subscription.id)
.bind(subscription.email)
.bind(subscription.name)
.bind(subscription.subscribed_at)
.execute(&pool)
.await
.map_or(Ok((StatusCode::OK, ())), |_| Err(ZTPError::DuplicateEmail))
} else {
Err(ZTPError::FormIncomplete)
}
}

View File

@ -1,7 +1,4 @@
use axum::{
body::Body,
http::{Request, StatusCode},
};
use axum::{body::Body, http::Request};
use std::net::{SocketAddr, TcpListener};
use tokio::task::JoinHandle;
use ztp::*;
@ -9,13 +6,14 @@ use ztp::*;
type NullHandle = JoinHandle<()>;
async fn spawn_server() -> (SocketAddr, NullHandle) {
let configuration = ztp::configuration::get_configuration().unwrap();
let listener = TcpListener::bind("127.0.0.1:0".parse::<SocketAddr>().unwrap()).unwrap();
let addr = listener.local_addr().unwrap();
let handle: NullHandle = tokio::spawn(async move {
axum::Server::from_tcp(listener)
.unwrap()
.serve(app().into_make_service())
.serve(app(&configuration).await.into_make_service())
.await
.unwrap();
});
@ -41,27 +39,6 @@ async fn test_for_hello_world() {
assert_eq!(&body[..], b"Hello, World!\n");
}
#[tokio::test]
async fn valid_subscription() {
let (addr, _server_handle) = spawn_server().await;
let body = "name=le%20guin&email=ursula_le_guin%40gmail.com";
let response = hyper::Client::new()
.request(
Request::builder()
.method("POST")
.header("content-type", "application/x-www-form-urlencoded")
.uri(format!("http://{}/subscriptions", addr))
.body(Body::from(body))
.unwrap(),
)
.await
.expect("Failed to execute request.");
// Assert
assert_eq!(200, response.status().as_u16());
}
#[tokio::test]
async fn subscribe_returns_400_on_missing_data() {
let (addr, _server_handle) = spawn_server().await;
@ -85,9 +62,8 @@ async fn subscribe_returns_400_on_missing_data() {
.await
.expect("Failed to execute request.");
// TODO This should be 400 "Bad Request"
assert_eq!(
415,
400,
response.status().as_u16(),
// Additional customised error message on test failure
"The API did not fail with 400 Bad Request when the payload was {}.",

View File

@ -0,0 +1,68 @@
use axum::{body::Body, http::Request};
use sqlx::postgres::PgConnection;
use sqlx::Connection;
use std::net::{SocketAddr, TcpListener};
use tokio::task::JoinHandle;
use ztp::*;
type NullHandle = JoinHandle<()>;
async fn spawn_server() -> (SocketAddr, NullHandle) {
let configuration = ztp::configuration::get_configuration().unwrap();
let listener = TcpListener::bind("127.0.0.1:0".parse::<SocketAddr>().unwrap()).unwrap();
let addr = listener.local_addr().unwrap();
let handle: NullHandle = tokio::spawn(async move {
axum::Server::from_tcp(listener)
.unwrap()
.serve(app(&configuration).await.into_make_service())
.await
.unwrap();
});
(addr, handle)
}
#[tokio::test]
async fn valid_subscription() {
let (addr, _server_handle) = spawn_server().await;
let body = "name=le%20guin&email=ursula_le_guin%40gmail.com";
let configuration = ztp::configuration::get_configuration().unwrap();
let mut connection = PgConnection::connect(&configuration.database.url())
.await
.expect("Failed to connect to Postgres.");
sqlx::query!("DELETE FROM subscriptions")
.execute(&mut connection)
.await
.expect("Failed to clear out the subscriptions table");
let response = hyper::Client::new()
.request(
Request::builder()
.method("POST")
.header("content-type", "application/x-www-form-urlencoded")
.uri(format!("http://{}/subscriptions", addr))
.body(Body::from(body))
.unwrap(),
)
.await
.expect("Failed to execute request.");
// Assert
assert_eq!(200, response.status().as_u16());
let saved = sqlx::query!("SELECT email, name FROM subscriptions",)
.fetch_one(&mut connection)
.await
.expect("Failed to fetch saved subscription.");
assert_eq!(saved.email, "ursula_le_guin@gmail.com");
assert_eq!(saved.name, "le guin");
sqlx::query!("DELETE FROM subscriptions")
.execute(&mut connection)
.await
.expect("Failed to clear out the subscriptions table");
}

3
ztp.config.yaml Normal file
View File

@ -0,0 +1,3 @@
database:
password: readthenews