GSoC 2023: Week 6-7

Not much going on on week 6 so I decided to take a break and cram down week 6-7 in a single article.

As you may remember, the project has two different sources from which it has to get different kind of data, but for now I’ve only talked about Postgres. The other source is VictoriaMetrics, which is a timeseries database.

There is no special driver to query VictoriaMetrics, the communication happens over HTTP. In order to talk with it I’ll need to create a dedicated HTTP client, this seems like a pretty good time to import the reqwest crate into the project, which is a widely used library to create web clients and make HTTP requests.

// Cargo.toml
// ...
reqwest = { version = "0.11", features = ["json"] }
// ...

Since the requests are going to be pretty much the same and I don’t want to create a client in every single process, or build the same requests multiple times in different parts of the codebase, I’ve created an ad-hoc client that wraps around reqwest::Client and that contains info about VictoriaMetrics and useful methods to query the instance.

#[derive(Debug, Clone)]
pub struct VictoriaMetricsClient {
    pub config: VictoriaMetricsConfig,
    pub client: reqwest::Client,
}

I can now pass this single client around in each handler and it will use the underlying connection pool provided by the reqwest::Client struct.

// src/server.rs
// ...
pub fn run(
    listener: TcpListener,
    db_pool: PgPool,
    vm_client: VictoriaMetricsClient,
    factory: ResponseFactory
) -> Result<Server, std::io::Error> {
    let vm_client = web::Data::new(vm_client);

    std::env::set_var("RUST_LOG", "actix_web=info");
    let _ = env_logger::try_init();

    let server = HttpServer::new(move || {
        App::new()
            .wrap(middleware::Logger::default())
            // ...
            .app_data(vm_client.clone())
            // ...
    })
    .listen(listener)?
    .run();

    Ok(server)
}

Notice that we don’t have to use Arc<_> in this case as it is used internally by the reqwest::Client struct:

/// An asynchronous `Client` to make Requests with.
/// ...
/// The `Client` holds a connection pool internally, so it is advised that
/// you create one and **reuse** it.
///
/// You do **not** have to wrap the `Client` in an [`Rc`] or [`Arc`] to **reuse** it,
/// because it already uses an [`Arc`] internally.
///
/// [`Rc`]: std::rc::Rc
#[derive(Clone)]
pub struct Client {
    inner: Arc<ClientRef>,
}

Atomic reference count is usually needed when you want to pass around values in different threads because they provide a thread-safe way to keep track of memory instances. I don’t have to do any extra-work in this case, if you need to have an HTTP client in your handlers you can just create one when the server starts and pass it as app data by simply cloning it .app_data(client.clone()).

The wrapper structure is pretty simple, it exposes a new method that takes in a configuration, instantitates an HTTP client that enforces HTTPS only requests and have a timeout of 5 seconds, it should be enough for the moment.

impl VictoriaMetricsClient {
    pub fn new(config: VictoriaMetricsConfig) -> Self {
        let client = reqwest::Client::builder()
            .https_only(true)
            .connect_timeout(Duration::new(5, 0))
            .build()
            .unwrap();

        Self {
            config,
            client
        }
    }
}

Next, I need a method that triggers the actual request to VM and returns data from it. VictoriaMetrics accepts query parameters to filter the data that you want to get back, I want the flexibility to choose those directly in the handler so they must be passed through the utility function. Plus, I’ll make the function return valid, deserialized data so that the response doesn’t have to be passed around to other callers.

impl VictoriaMetricsClient {
    pub async fn query(&self, params: Vec<(String, String)>) -> Result<Vec<victoriametrics::Result>, String> {
        let res = self.client.get(&self.config.url)
            .query(&params)
            .send()
            .await
            .map_err(|e| e.to_string())?;

        if res.status().as_str() != "200" {
            let err_res = res.json::<victoriametrics::ErrorResponse>()
                .await
                .unwrap();

            return Err(err_res.error);
        }

        if let Ok(response) = res.json::<victoriametrics::Response>().await {
            return Ok(response.data.result);
        }

        Err("something went wrong during VM data deserialization".into())
    }
}

Error handling is not the best at the moment because I’m just returning a String which is not really that useful if I want to act differently when certain kind of errors are returned. Keep in mind that this will most certainly change as the whole system shapes and new needs arise, for the moment it is okay to just return a simple error to the user.

I can now make use of this VM wrapper by just including it in the handler method signature

pub async fn get_bandwidth(
    params: QueryFilters,
    vm: web::Data<VictoriaMetricsClient>,
    pg: web::Data<PgPool>
) {
    // ...
}

This is pretty much everything I need at the moment to query VM.

Moving on, you may recall from previous articles that each response has this structure

#[derive(Debug, Serialize, Deserialize, Default, Builder)]
pub struct SummaryResponse<R, B> {
    pub version: String,
    pub next_major_version_scheduled: Option<String>,
    pub build_version: Option<String>,
    pub relays_published: String,
    pub relays_skipped: Option<i32>,
    pub relays_truncated: i32,
    pub relays: Vec<R>,
    pub bridges_published: String,
    pub bridges_skipped: Option<i32>,
    pub bridges_truncated: i32,
    pub bridges: Vec<B>
}

Each handler will inject different kind of elements in bridges and relays, *_skipped fields will be equal to what the end-user passes as offset query param and *_trucated fields will depend on both offset and limit query params and it is the result of total_(bridges/relays) - limit - offset.

Other than that, each response will have a lot of values in common, such as version, next_major_version_scheduled etc. Those hypotethically are info that need to be queried from pg every time a new response is being built. That means that in each handler you would have to make multiple queries to the pg instance to get the same info. That does not scale well, it would be best to have a single query for each handler, in this case the one that will get the data to fill relays and bridges.

We decided that most of those values would be read from a file on the server, periodically, as data is being updated on the instance. That way we can have a sort of memory cache that is providing that data for us. I’ve decided to implement a response factory that initiates the response building process and returns a response builder that already contains the values that we were talking about above. This makes the code a lot cleaner because each handler will only need to worry about the logic to query the data that it needs to provide.

#[derive(Debug, Deserialize)]
struct GenericInfo {
    pub version: String,
    pub build_version: Option<String>,
    pub total_relays: u32,
    pub total_bridges: u32,
    pub next_major_version_scheduled: Option<String>,
    pub relays_published: String,
    pub bridges_published: String,
}

#[derive(Debug)]
pub struct ResponseFactory {
    config_path: std::path::PathBuf,
    generic_info: GenericInfo,
    generic_info_exp: u64
}

impl ResponseFactory {
    pub fn with_config(path: String) -> Result<Self> {
        let path = Path::new(&path);
        let input = std::fs::read_to_string(path)?;
        let generic_info = serde_json::from_str::<GenericInfo>(input.as_str())?;

        let future_time = SystemTime::now() + Duration::from_secs(7 * 24 * 60 * 60);
        let exp = future_time
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards")
            .as_secs();

        Ok(Self {
            config_path: path.to_path_buf(),
            generic_info,
            generic_info_exp: exp
        })
    }

    pub fn get<R, B>(&self) -> SummaryResponseBuilder<R, B> where R: Clone, B: Clone {
        let mut builder = SummaryResponseBuilder::default();
        builder.version(self.generic_info.version.clone());
        builder.build_version(self.generic_info.build_version.clone());
        builder.relays_published(self.generic_info.relays_published.clone());
        builder.bridges_published(self.generic_info.bridges_published.clone());
        builder.next_major_version_scheduled(self.generic_info.next_major_version_scheduled.clone());

        builder
    }
}

When the factory is initialized, it reads the content of the file at path and deserializes it into the GenericInfo struct that is then saved into the factory itself. It also sets up an expiration time of 7 days in the future that is going to be used to check if generic_info needs to be updated or not.

It’s not been implemented yet, but factory initialization will also spawn a thread that periodically checks if generic_info_exp (unix time) is in the past. If it is, it updates generic_info with new data that may be contained in the config file on the server.

With this change the code in each handlers shrinks by a lot and it looks way better, take the /summary endpoint for example

pub async fn get_summary(
    params: QueryFilters,
    factory: web::Data<ResponseFactory>,
    pg: web::Data<PgPool>
) -> Result<HttpResponse, Error> {
    let mut response = factory.get::<RelaySummary, BridgeSummary>();
    // ...
    match params.r#type.unwrap_or(ParametersType::Relay) {
        ParametersType::Relay => {
            let relays = metrics::relay_summary(&pg, &params)
                .await
                .map_err(ErrorInternalServerError)?;
                // ...
            response.relays(relays);
        },
        ParametersType::Bridge => {
            let bridges = metrics::bridges_summary(&pg, &params)
                .await
                .map_err(ErrorInternalServerError)?;
                // ...
            response.relays(bridges);
        }
    }

    let summary = response.build()
        .map_err(ErrorInternalServerError)?;

    Ok(HttpResponse::Ok().json(summary))
}

I’m not really 100% okay with the solution though. Right now I’m passing the factory directly as web::Data<_> into each handler but I would prefer to actually offload this logic into a separate component that is not directly visible in the handler, maybe a middleware? My goal would be to only return bridges and relays in the handler but I haven’t found a compelling solution for this yet.

Lastly, this week I re-designed the overall logic that I’m using to make queries against Postgres.

Initially, my approach was to implement FromRow for each response struct that I had to return. This caused a lot of head scratching because the database structure is in the making and we’re adjusting it as we see fit, therefore a lot of types do not match exactly what the response should return or won’t match in the future. To make that work I had to create a lot of FromRow implementations for each request, but this approach did not feel right or idiomatic.

I opted for a different approach which involves more code that sits between the database and the server logic. I’ve created a *Row type, for each query, that matches exactly the types and names of the columns queried.

Let’s take summary as an example. The BridgeSummaryRow struct matches exactly the type and shape of the query that I’m going to make against the Postgres instance.

#[derive(Debug, sqlx::FromRow)]
pub struct BridgeSummaryRow {
    pub nickname: String,
    pub fingerprint: String,
    pub running: Option<bool>
}

pub async fn bridges_summary(pg: &PgPool, filters: &QueryFilters) -> Result<Vec<BridgeSummaryRow>, String> {
    sqlx::query_as!(
        BridgeSummaryRow,
        r#"SELECT nickname, fingerprint, running
        FROM server_status
        WHERE is_bridge = true
        "#
    )
    .fetch_all(pg)
    .await
    .map_err(|e| e.to_string())
}

And here are the corresponding columns in the table

CREATE TABLE IF NOT EXISTS server_status(
  -- ...
  nickname      TEXT     NOT NULL,
  fingerprint   TEXT     NOT NULL,
  running       BOOLEAN,
  -- ...
);

The first thing that I get from this is that I don’t have to write a lot of FromRow code because we have a 1:1 match between struct and query row. The other nice thing that we get is that, for now, we can still make use of sqlx macros to statically check that queries are indeed correct and match the expected returned type.

But that’s not it, I still need to adapt these *Row structs to their corresponding responses. To do that, I’ve implemented the From trait for each response type in which I make all the transformations that I need.

Take a look, now instead of this

impl sqlx::FromRow<'_, PgRow> for RelaySummary {
    fn from_row(row: &PgRow) -> Result<Self, sqlx::Error> {
        let n = row.try_get("nickname")?;
        let f = row.try_get("fingerprint")?;
        let a = row.try_get("or_addresses")?;
        let r = row.try_get("running")?;

        let a: Vec<_> = value.or_addresses
            .split(',')
            .map(String::from)
            .collect();

        Ok(Self::from(n,f,a,r))
    }
}

I have this

impl From<RelaySummaryRow> for RelaySummary {
    fn from(value: RelaySummaryRow) -> Self {
        let or_addresses: Vec<_> = value.or_addresses
            .split(',')
            .map(String::from)
            .collect();

        Self {
            n: value.nickname,
            f: value.fingerprint,
            a: or_addresses,
            r: value.running.unwrap_or(false)
        }
    }
}

I find this to be a better approach for multiple reasons:

  1. Decoupling of database data types and response types

  2. Data transformation from database rows to responses is easier to test and mock

  3. Less error prone and cleaner since I am working with types instead of extracting columns one by one

  4. FromRow can fail, From can’t

I know I know, RelaySummaryRow is the one that can fail now, but I'm pretty sure that it won't since I'm leveraging sqlx static checks to make queries exact and correct.

The cons of this whole approach, of course, is that I need to implement a Row type for each query that I want to make and this is a pretty verbose process that introduced ~400 lines of code, but I guess that’s the price to pay.

I would say I’ve made a pretty good progress in these past weeks, /summary and /details seem to work okay without query parameters. Next up is the /bandwidth endpoint which is the handler that acts as a proxy for VictoriaMetrics. The plan is to also push for more tests and stabilize what’s working right now, but I’ll talk about that in next week’s blog. See you!