GSoC 2023: Week 6-7
Not much was going on in week 6, so I decided to take a break and cram weeks 6 and 7 into a single article.
As you may remember, the project has two different sources from which it has to get different kinds of data, but so far I’ve only talked about Postgres. The other source is VictoriaMetrics, which is a time-series database.
There is no special driver to query VictoriaMetrics: the communication happens over HTTP. In order to talk to it I’ll need a dedicated HTTP client, so this seems like a pretty good time to import the reqwest crate into the project, a widely used library for building web clients and making HTTP requests.
# Cargo.toml
# ...
reqwest = { version = "0.11", features = ["json"] }
# ...
Since the requests are going to be pretty much the same and I don’t want to create a client in every single process, or build the same requests multiple times in different parts of the codebase, I’ve created an ad-hoc client that wraps reqwest::Client and contains information about the VictoriaMetrics instance along with useful methods to query it.
#[derive(Debug, Clone)]
pub struct VictoriaMetricsClient {
    pub config: VictoriaMetricsConfig,
    pub client: reqwest::Client,
}
I can now pass this single client to each handler and it will use the underlying connection pool provided by the reqwest::Client struct.
// src/server.rs
// ...
pub fn run(
    listener: TcpListener,
    db_pool: PgPool,
    vm_client: VictoriaMetricsClient,
    factory: ResponseFactory,
) -> Result<Server, std::io::Error> {
    let vm_client = web::Data::new(vm_client);

    std::env::set_var("RUST_LOG", "actix_web=info");
    let _ = env_logger::try_init();

    let server = HttpServer::new(move || {
        App::new()
            .wrap(middleware::Logger::default())
            // ...
            .app_data(vm_client.clone())
            // ...
    })
    .listen(listener)?
    .run();

    Ok(server)
}
Notice that we don’t have to use Arc<_> in this case as it is used internally by the reqwest::Client struct:
/// An asynchronous `Client` to make Requests with.
/// ...
/// The `Client` holds a connection pool internally, so it is advised that
/// you create one and **reuse** it.
///
/// You do **not** have to wrap the `Client` in an [`Rc`] or [`Arc`] to **reuse** it,
/// because it already uses an [`Arc`] internally.
///
/// [`Rc`]: std::rc::Rc
#[derive(Clone)]
pub struct Client {
    inner: Arc<ClientRef>,
}
An atomic reference count is usually needed when you want to pass values around between threads, because it provides a thread-safe way to keep track of how many references to an instance exist. I don’t have to do any extra work in this case: if you need an HTTP client in your handlers, you can just create one when the server starts and pass it as app data by simply cloning it: .app_data(client.clone()).
The wrapper structure is pretty simple: it exposes a new method that takes in a configuration and instantiates an HTTP client that enforces HTTPS-only requests and has a connect timeout of 5 seconds, which should be enough for the moment.
impl VictoriaMetricsClient {
    pub fn new(config: VictoriaMetricsConfig) -> Self {
        let client = reqwest::Client::builder()
            .https_only(true)
            .connect_timeout(Duration::new(5, 0))
            .build()
            .unwrap();

        Self {
            config,
            client,
        }
    }
}
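For completeness, this is roughly how the client gets wired up at startup. This is just a sketch: the config value is made up, but the url field is the same one used by the query method below, and run is the server entry point shown earlier.

// Hypothetical startup wiring; the URL is only an example.
let vm_config = VictoriaMetricsConfig {
    url: "https://vm.example.org/api/v1/query".to_string(),
};
let vm_client = VictoriaMetricsClient::new(vm_config);

let server = run(listener, db_pool, vm_client, factory)?;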
Next, I need a method that triggers the actual request to VM and returns data from it. VictoriaMetrics accepts query parameters to filter the data you want to get back; I want the flexibility to choose those directly in the handler, so they must be passed through the utility function. Plus, I’ll make the function return valid, deserialized data so that the raw response doesn’t have to be passed around to other callers.
impl VictoriaMetricsClient {
    pub async fn query(&self, params: Vec<(String, String)>) -> Result<Vec<victoriametrics::Result>, String> {
        let res = self.client.get(&self.config.url)
            .query(&params)
            .send()
            .await
            .map_err(|e| e.to_string())?;

        if res.status().as_str() != "200" {
            let err_res = res.json::<victoriametrics::ErrorResponse>()
                .await
                .unwrap();
            return Err(err_res.error);
        }

        if let Ok(response) = res.json::<victoriametrics::Response>().await {
            return Ok(response.data.result);
        }

        Err("something went wrong during VM data deserialization".into())
    }
}
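Calling it from a handler then looks something like this. The "query" parameter and the expression are just an example of what you might send to VictoriaMetrics’ query API, not something taken from the actual codebase:

// Hypothetical call site: build the query params in the handler
// and let the wrapper deal with the HTTP layer.
let params = vec![
    ("query".to_string(), "sum(rate(bytes_total[5m]))".to_string()),
];
let results = vm_client.query(params).await?;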
Error handling is not the best at the moment because I’m just returning a String, which is not really that useful if I want to act differently when certain kinds of errors are returned. Keep in mind that this will most certainly change as the whole system takes shape and new needs arise; for the moment it is okay to just return a simple error to the user.
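Just to give an idea of the direction this could take (none of this exists in the codebase yet), a dedicated error type could have one variant per failure path of the query method above, so that callers can react differently to each:

// Hypothetical error type; the variant names are illustrative only.
#[derive(Debug)]
pub enum VmQueryError {
    // The HTTP request itself failed (connection refused, timeout, ...).
    Transport(reqwest::Error),
    // VictoriaMetrics answered with a non-200 status and an error body.
    Upstream(String),
    // The body could not be deserialized into the expected response type.
    Deserialization(String),
}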
I can now make use of this VM wrapper by just including it in the handler method signature:
pub async fn get_bandwidth(
    params: QueryFilters,
    vm: web::Data<VictoriaMetricsClient>,
    pg: web::Data<PgPool>,
) {
    // ...
}
This is pretty much everything I need at the moment to query VM.
Moving on, you may recall from previous articles that each response has this structure:
#[derive(Debug, Serialize, Deserialize, Default, Builder)]
pub struct SummaryResponse<R, B> {
    pub version: String,
    pub next_major_version_scheduled: Option<String>,
    pub build_version: Option<String>,
    pub relays_published: String,
    pub relays_skipped: Option<i32>,
    pub relays_truncated: i32,
    pub relays: Vec<R>,
    pub bridges_published: String,
    pub bridges_skipped: Option<i32>,
    pub bridges_truncated: i32,
    pub bridges: Vec<B>,
}
Each handler will inject different kinds of elements into bridges and relays. The *_skipped fields will be equal to what the end user passes as the offset query param, while the *_truncated fields depend on both the offset and limit query params and are the result of total_(bridges/relays) - limit - offset.
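As a quick sanity check of that formula, here is a tiny (hypothetical) helper with a worked example: with 100 total relays, limit=50 and offset=10, relays_truncated ends up being 40.

// Hypothetical helper mirroring the formula above.
fn truncated(total: i32, limit: i32, offset: i32) -> i32 {
    total - limit - offset
}

assert_eq!(truncated(100, 50, 10), 40);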
Other than that, each response will have a lot of values in common, such as version, next_major_version_scheduled, etc. Hypothetically, that is information that would need to be queried from pg every time a new response is built, which means that in each handler you would have to make multiple queries to the pg instance to get the same info. That does not scale well; it would be best to have a single query per handler, in this case the one that gets the data to fill relays and bridges.
We decided that most of those values would be read from a file on the server, periodically, as data is updated on the instance. That way we get a sort of in-memory cache that provides that data for us. I’ve decided to implement a response factory that initiates the response building process and returns a response builder that already contains the values we were talking about above. This makes the code a lot cleaner because each handler only needs to worry about the logic to query the data that it has to provide.
#[derive(Debug, Deserialize)]
struct GenericInfo {
    pub version: String,
    pub build_version: Option<String>,
    pub total_relays: u32,
    pub total_bridges: u32,
    pub next_major_version_scheduled: Option<String>,
    pub relays_published: String,
    pub bridges_published: String,
}

#[derive(Debug)]
pub struct ResponseFactory {
    config_path: std::path::PathBuf,
    generic_info: GenericInfo,
    generic_info_exp: u64,
}
impl ResponseFactory {
    pub fn with_config(path: String) -> Result<Self> {
        let path = Path::new(&path);
        let input = std::fs::read_to_string(path)?;
        let generic_info = serde_json::from_str::<GenericInfo>(input.as_str())?;

        let future_time = SystemTime::now() + Duration::from_secs(7 * 24 * 60 * 60);
        let exp = future_time
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards")
            .as_secs();

        Ok(Self {
            config_path: path.to_path_buf(),
            generic_info,
            generic_info_exp: exp,
        })
    }

    pub fn get<R, B>(&self) -> SummaryResponseBuilder<R, B>
    where
        R: Clone,
        B: Clone,
    {
        let mut builder = SummaryResponseBuilder::default();
        builder.version(self.generic_info.version.clone());
        builder.build_version(self.generic_info.build_version.clone());
        builder.relays_published(self.generic_info.relays_published.clone());
        builder.bridges_published(self.generic_info.bridges_published.clone());
        builder.next_major_version_scheduled(self.generic_info.next_major_version_scheduled.clone());

        builder
    }
}
When the factory is initialized, it reads the content of the file at path and deserializes it into the GenericInfo struct, which is then saved into the factory itself. It also sets up an expiration time 7 days in the future that is going to be used to check whether generic_info needs to be updated or not.
It’s not been implemented yet, but factory initialization will also spawn a thread that periodically checks whether generic_info_exp (Unix time) is in the past. If it is, it updates generic_info with new data that may be contained in the config file on the server.
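A minimal sketch of what that refresh task could look like, assuming the factory is shared behind an Arc<RwLock<_>> and that a (hypothetical) reload method re-reads the config file and resets the expiration:

// Sketch only: `reload` does not exist yet, and the locking strategy
// is an assumption, not what the project actually does.
use std::sync::{Arc, RwLock};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

fn spawn_refresh_task(factory: Arc<RwLock<ResponseFactory>>) {
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_secs(60 * 60));
        loop {
            interval.tick().await;

            let now = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .expect("Time went backwards")
                .as_secs();

            // The lock guards are dropped at the end of each statement,
            // so they are never held across an await point.
            let expired = factory.read().unwrap().generic_info_exp < now;
            if expired {
                let _ = factory.write().unwrap().reload();
            }
        }
    });
}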
With this change the code in each handler shrinks by a lot and looks way better. Take the /summary endpoint, for example:
pub async fn get_summary(
    params: QueryFilters,
    factory: web::Data<ResponseFactory>,
    pg: web::Data<PgPool>,
) -> Result<HttpResponse, Error> {
    let mut response = factory.get::<RelaySummary, BridgeSummary>();
    // ...
    match params.r#type.unwrap_or(ParametersType::Relay) {
        ParametersType::Relay => {
            let relays = metrics::relay_summary(&pg, &params)
                .await
                .map_err(ErrorInternalServerError)?;
            // ...
            response.relays(relays);
        },
        ParametersType::Bridge => {
            let bridges = metrics::bridges_summary(&pg, &params)
                .await
                .map_err(ErrorInternalServerError)?;
            // ...
            response.bridges(bridges);
        }
    }

    let summary = response.build()
        .map_err(ErrorInternalServerError)?;

    Ok(HttpResponse::Ok().json(summary))
}
I’m not really 100% okay with the solution, though. Right now I’m passing the factory directly as web::Data<_> into each handler, but I would prefer to offload this logic into a separate component that is not directly visible in the handler, maybe a middleware? My goal would be to only return bridges and relays in the handler, but I haven’t found a compelling solution for this yet.
Lastly, this week I re-designed the overall logic that I’m using to make queries against Postgres.
Initially, my approach was to implement FromRow for each response struct that I had to return. This caused a lot of head-scratching because the database structure is still in the making and we’re adjusting it as we see fit, so a lot of column types do not match exactly what the response should return, or won’t match in the future. To make that work I had to create a FromRow implementation for each request, and this approach did not feel right or idiomatic.
I opted for a different approach, which involves more code that sits between the database and the server logic: I’ve created a *Row type, for each query, that matches exactly the types and names of the columns queried.
Let’s take summary as an example. The BridgeSummaryRow struct matches exactly the type and shape of the query that I’m going to make against the Postgres instance.
#[derive(Debug, sqlx::FromRow)]
pub struct BridgeSummaryRow {
    pub nickname: String,
    pub fingerprint: String,
    pub running: Option<bool>,
}

pub async fn bridges_summary(pg: &PgPool, filters: &QueryFilters) -> Result<Vec<BridgeSummaryRow>, String> {
    sqlx::query_as!(
        BridgeSummaryRow,
        r#"SELECT nickname, fingerprint, running
           FROM server_status
           WHERE is_bridge = true
        "#
    )
    .fetch_all(pg)
    .await
    .map_err(|e| e.to_string())
}
And here are the corresponding columns in the table:
CREATE TABLE IF NOT EXISTS server_status(
    -- ...
    nickname TEXT NOT NULL,
    fingerprint TEXT NOT NULL,
    running BOOLEAN,
    -- ...
);
The first thing that I get from this is that I don’t have to write a lot of FromRow code, because we have a 1:1 match between struct and query row. The other nice thing is that, for now, we can still make use of sqlx macros to statically check that queries are indeed correct and match the expected return type.
But that’s not it: I still need to adapt these *Row structs to their corresponding responses. To do that, I’ve implemented the From trait for each response type, in which I make all the transformations that I need.
Take a look: now instead of this
impl sqlx::FromRow<'_, PgRow> for RelaySummary {
    fn from_row(row: &PgRow) -> Result<Self, sqlx::Error> {
        let n = row.try_get("nickname")?;
        let f = row.try_get("fingerprint")?;
        let a: String = row.try_get("or_addresses")?;
        let r: Option<bool> = row.try_get("running")?;

        let a: Vec<_> = a
            .split(',')
            .map(String::from)
            .collect();

        Ok(Self { n, f, a, r: r.unwrap_or(false) })
    }
}
I have this
impl From<RelaySummaryRow> for RelaySummary {
    fn from(value: RelaySummaryRow) -> Self {
        let or_addresses: Vec<_> = value.or_addresses
            .split(',')
            .map(String::from)
            .collect();

        Self {
            n: value.nickname,
            f: value.fingerprint,
            a: or_addresses,
            r: value.running.unwrap_or(false),
        }
    }
}
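With the From implementation in place, turning query rows into response items inside a handler becomes a one-liner. A small sketch, assuming relay_summary returns the Vec of *Row values described above:

// Map database rows into response items via the From impl.
let rows = metrics::relay_summary(&pg, &params)
    .await
    .map_err(ErrorInternalServerError)?;
let relays: Vec<RelaySummary> = rows.into_iter().map(RelaySummary::from).collect();

response.relays(relays);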
I find this to be a better approach for multiple reasons:

- Decoupling of database data types and response types
- Data transformation from database rows to responses is easier to test and mock
- Less error-prone and cleaner, since I am working with types instead of extracting columns one by one
- FromRow can fail, From can’t
I know, I know: RelaySummaryRow is the one that can fail now, but I’m pretty sure it won’t, since I’m leveraging sqlx’s static checks to make queries exact and correct.
The con of this whole approach, of course, is that I need to implement a Row type for each query that I want to make, and this is a pretty verbose process that introduced ~400 lines of code, but I guess that’s the price to pay.
I would say I’ve made pretty good progress in these past weeks: /summary and /details seem to work okay without query parameters. Next up is the /bandwidth endpoint, which is the handler that acts as a proxy for VictoriaMetrics. The plan is to also push for more tests and stabilize what’s working right now, but I’ll talk about that in next week’s blog. See you!