In-Database Logistic Regression with R

by Roland Stevenson

Roland Stevenson is a data scientist and consultant who may be reached on LinkedIn.

In a previous article we illustrated how to calculate xgboost model predictions in-database. This was referenced and incorporated into tidypredict. After learning more about what the tidypredict team is up to, I discovered another tidyverse package called modeldb that fits models in-database. It currently supports linear regression and k-means clustering, so I thought I would provide an example of how to do in-database logistic regression.

Rather than focusing on the details of logistic regression, we will focus more on how we can use R and some carefully written SQL statements to iteratively minimize a cost function. We will also use the condusco R package, which allows us to iterate through the results of a query easily.

A Simple Logistic Regression Example

Let’s start with a simple logistic regression example. We’ll simulate an outcome \(y\) based on the fact that \(Pr(y=1) = \frac{e^{\beta x}}{1+e^{\beta x}}\). Here \(\beta\) is a vector containing the coefficients we will later be estimating (including an intercept term). In the example below, our \(x\) values are uniform random values between -1 and 1.

set.seed(1)

# the number of samples
n <- 1000

# uniform random on (-1,1)
x1 <- 2*runif(n)-1
x2 <- 2*runif(n)-1
x <- cbind(1, x1, x2)

# our betas
beta <- c(-1, -3.0, 5.0)

probs <- exp(beta %*% t(x))/(1+exp(beta %*% t(x)))

y <- rbinom(n,1,probs)

sim <- data.frame(id = seq_len(n), y = y, x1 = x1, x2 = x2)

mylogit <- glm(y ~ x1 + x2, data = sim, family = "binomial")
summary(mylogit)
mylogit$coefficients

As expected, the coefficients of our logistic model successfully approximate the parameters in our beta vector.

In-database Logistic Regression

Now, let’s see if we can find a way to calculate these same coefficients in-database. In this example, we’re going to use Google BigQuery as our database, and we’ll use condusco’s run_pipeline_gbq function to iteratively run the functions we define later on. To do this, we’ll need to take care of some initial housekeeping:

library(bigrquery)
library(whisker)
library(condusco)

# Uncomment and define your own config
# config <- list(
#   project = '<YOUR GBQ PROJECT>',
#   dataset = '<YOUR GBQ DATASET>',
#   table_prefix = '<A TABLE_PREFIX TO USE>'
# )

# a simple whisker.render helper function for our use-case
wr <- function(s, params=config){whisker.render(s,params)}

# put the simulated data in GBQ
insert_upload_job(
  project = wr('{{{project}}}'),
  dataset = wr('{{{dataset}}}'),
  table = "logreg_sim",
  values = sim,
  write_disposition = "WRITE_TRUNCATE"
)

Now, we’ll create the pipelines to do the logistic regression. Please note that the code below is quite verbose. While all of it is needed for the code to work, we’ll just focus on understanding how a couple of steps work. Once we understand one step, the rest is pretty easy. Feel free to skip ahead.

First, we create a pipeline that does two things:

  • creates a main table containing all of our global settings
  • calls another pipeline (log_reg_stack) with the global settings as inputs

Importantly, note that all of the parameters (e.g., {{{project}}}) are dynamically swapped out in the query below by the wr function, using the values in the params variable. So this pipeline dynamically creates a query based on the parameters passed to it. We will call this pipeline later to run the process.
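The substitution itself can be tried locally without touching the database. The following is a minimal sketch using whisker.render directly; the settings values are hypothetical placeholders, not the ones used in this article. Triple braces insert the value as-is, whereas double braces HTML-escape it, which is why the queries here use triple braces.

```r
library(whisker)

# Hypothetical settings, for illustration only
params <- list(
  project = "my-project",
  dataset = "my_dataset",
  table_prefix = "indb_logreg_001"
)

# Triple braces are replaced verbatim with the matching entry in params
template <- "SELECT * FROM {{{dataset}}}.{{{table_prefix}}}_settings"
rendered <- whisker.render(template, params)
print(rendered)

# Double braces HTML-escape the value; triple braces do not
raw     <- whisker.render("{{{cond}}}", list(cond = "x1 > 0"))
escaped <- whisker.render("{{cond}}", list(cond = "x1 > 0"))
print(c(raw = raw, escaped = escaped))
```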

#
# Pipeline: log_reg
#
log_reg <- function(params){
  
  print ("log_reg")

  query <- '
    CREATE OR REPLACE TABLE {{{dataset}}}.{{{table_prefix}}}_settings
    AS 
    SELECT
      "{{{project}}}" AS project,
      "{{{dataset}}}" AS dataset,
      "{{{data_table}}}" AS data_table,
      {{{max_steps}}} AS max_steps,
      {{{error_tol}}} AS error_tol,
      {{{learning_rate}}} AS learning_rate,
      "{{{id_column}}}"   AS id_column,
      "{{{label_column}}}" AS label_column,
      "{{{fieldnames}}}" AS fieldnames,
      "{{{constant_id}}}" AS constant_id,
      "{{{table_prefix}}}" AS table_prefix
  '
  
  query_exec(
    project = wr('{{{project}}}', params),
    query = wr(query, params),
    use_legacy_sql = FALSE
  )
  
  # Now run the log_reg_stack pipeline and pass the settings to it
  invocation_query <- '
    SELECT *
    FROM {{{dataset}}}.{{{table_prefix}}}_settings
  '
  run_pipeline_gbq(
    log_reg_stack,
    wr(invocation_query, params),
    wr('{{{project}}}', params),
    use_legacy_sql = FALSE
  )
}

The above pipeline calls another pipeline, log_reg_stack, which is defined below. log_reg_stack creates a table with the field names that we will use in the logistic regression and then runs log_reg_stack_field on each of the field names. Note that the invocation_query below contains a query that results in one or more rows containing a field name. run_pipeline_gbq takes the results and iterates over them, calling log_reg_stack_field on each one. Finally, it creates the _labels table and calls log_reg_setup, passing it the results of the global settings query.
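The row-by-row iteration described above can be pictured with a small local sketch. The run_pipeline_local function below is a hypothetical stand-in written for illustration, not condusco's implementation: it takes a data frame in place of a query result and calls the pipeline once per row, passing that row as a named list.

```r
# Hypothetical local stand-in for the iteration idea (not condusco's code)
run_pipeline_local <- function(pipeline, parameters) {
  lapply(seq_len(nrow(parameters)), function(i) {
    # Each row becomes a named list, playing the role of `params`
    pipeline(as.list(parameters[i, , drop = FALSE]))
  })
}

# Two rows, similar in shape to what the fieldname/settings CROSS JOIN returns
parameters <- data.frame(
  fieldname = c("x1", "x2"),
  data_table = "my_dataset.logreg_sim",  # assumed table name
  stringsAsFactors = FALSE
)

# A toy pipeline that only reports what it would do
describe_field <- function(params) {
  sprintf("stack %s from %s", params$fieldname, params$data_table)
}

results <- run_pipeline_local(describe_field, parameters)
print(unlist(results))
```

Because every row carries the global settings alongside the field name, each invocation has everything it needs without any shared state in R.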

#
# Pipeline: stack variables
#
log_reg_stack <- function(params){
  
  print ("log_reg_stack")
  
  # Table: _fieldnames 
  query <- "
    CREATE OR REPLACE TABLE {{{dataset}}}.{{{table_prefix}}}_fieldnames
    AS
    SELECT TRIM(fieldname) AS fieldname
    FROM (
      SELECT split(fieldnames,',') AS fieldname
      FROM (
          SELECT '{{{fieldnames}}}' AS fieldnames
      )
    ), UNNEST(fieldname) as fieldname
    GROUP BY 1
  "
  
  query_exec(
    project = wr('{{{project}}}', params),
    query = wr(query, params),
    use_legacy_sql = FALSE
  )
  
  # Run _stack_field
  query <- "
    DROP TABLE IF EXISTS {{{dataset}}}.{{{table_prefix}}}_stacked
  "
  
  tryCatch({
    query_exec(
      project = wr('{{{project}}}', params),
      query = wr(query, params),
      use_legacy_sql = FALSE
    )},
    error = function(e){
      print(e)
  })
    
  invocation_query <- "
    SELECT
      a.fieldname AS fieldname,  
      b.*
    FROM (  
      SELECT fieldname  
      FROM {{{dataset}}}.{{{table_prefix}}}_fieldnames  
      GROUP BY fieldname
    ) a  
    CROSS JOIN (  
      SELECT *  
        FROM {{{dataset}}}.{{{table_prefix}}}_settings
    ) b
  "
  run_pipeline_gbq(
    log_reg_stack_field,
    wr(invocation_query, params),
    wr('{{{project}}}', params),
    use_legacy_sql = FALSE
  )
  
  # Table: _labels
  query <- "
    CREATE OR REPLACE TABLE {{{dataset}}}.{{{table_prefix}}}_labels
    AS
    SELECT
      {{{id_column}}} AS id,
      {{{label_column}}} AS label
    FROM {{{data_table}}}
  "
  
  query_exec(
    project = wr('{{{project}}}', params),
    query = wr(query, params),
    use_legacy_sql = FALSE
  )
  
  
  # Run _setup
  invocation_query <- "
    SELECT *  
      FROM {{{dataset}}}.{{{table_prefix}}}_settings
  "
  run_pipeline_gbq(
    log_reg_setup,
    wr(invocation_query, params),
    wr('{{{project}}}', params),
    use_legacy_sql = FALSE
  )
  
}

The log_reg_stack_field and log_reg_setup pipelines are not particularly interesting. They do the groundwork needed to allow the log_reg_loop pipeline to iterate. The _stacked table contains the feature names and their values, the _feature_stats table contains each feature’s mean and standard deviation, and the _features_stacked_vni table contains the normalized values used later. Finally, the _fit_params table contains the values of the fit parameters that will be updated as we iteratively minimize the cost function in the loop. The log_reg_setup pipeline ends by calling log_reg_loop, passing it the results of the global settings query.

log_reg_stack_field <- function(params){
  
  print ("log_reg_stack_field")

  destination_table <- '{{{dataset}}}.{{{table_prefix}}}_stacked'

  query <- "
    SELECT {{{id_column}}} AS id,
      LTRIM('{{{fieldname}}}') AS feature_name,
      CAST({{{fieldname}}} AS FLOAT64) AS vi
    FROM {{{data_table}}}
  "
  
  query_exec(
    project = wr('{{{project}}}', params),
    query = wr(query, params),
    destination_table = wr(destination_table, params),
    use_legacy_sql = FALSE,
    write_disposition = 'WRITE_APPEND',
    create_disposition = 'CREATE_IF_NEEDED'
  )
  
}


log_reg_setup <- function(params){
  
  print ("log_reg_setup")
  
  query <- "
    CREATE OR REPLACE TABLE {{{dataset}}}.{{{table_prefix}}}_feature_stats
    AS
    SELECT feature_name,
      AVG(vi) AS mean,
      STDDEV(vi) AS stddev
    FROM {{{dataset}}}.{{{table_prefix}}}_stacked
    GROUP BY feature_name
  "
  
  query_exec(
    project = wr('{{{project}}}', params),
    query = wr(query, params),
    use_legacy_sql = FALSE
  )
  
  query <- "
    CREATE OR REPLACE TABLE {{{dataset}}}.{{{table_prefix}}}_features_stacked_vni
    AS
    SELECT
      a.id AS id,
      a.feature_name AS feature_name,
      CASE
        WHEN b.stddev > 0.0 THEN (vi - b.mean) / b.stddev
        ELSE vi - b.mean
      END AS vni
    FROM {{{dataset}}}.{{{table_prefix}}}_stacked a
    JOIN {{{dataset}}}.{{{table_prefix}}}_feature_stats b
      ON a.feature_name = b.feature_name
  "
  
  query_exec(
    project = wr('{{{project}}}', params),
    query = wr(query, params),
    use_legacy_sql = FALSE
  )
  
  query <- "
    INSERT INTO {{{dataset}}}.{{{table_prefix}}}_features_stacked_vni (id, feature_name, vni)     
    SELECT
      id,
      '{{{constant_id}}}' as feature_name,
      1.0 as vni
    FROM {{{dataset}}}.{{{table_prefix}}}_stacked
    GROUP BY 1,2,3
  "
  
  query_exec(
    project = wr('{{{project}}}', params),
    query = wr(query, params),
    use_legacy_sql = FALSE
  )
  
  query <- "
    CREATE OR REPLACE TABLE {{{dataset}}}.{{{table_prefix}}}_fit_params
    AS
    SELECT
      step,
      param_id,
      param_value,
      cost,
      stop,
      message
    FROM (
      SELECT 1 as step,
      feature_name as param_id,
      0.0 as param_value,
      1e6 as cost,
      false as stop,
      '' as message
      FROM {{{dataset}}}.{{{table_prefix}}}_stacked
      GROUP BY param_id
    ) UNION ALL (
      SELECT 1 as step,
      '{{{constant_id}}}' as param_id,
      0.0 as param_value,
      1e6 as cost,
      false as stop,
      '' as message
    )
  "
  
  query_exec(
    project = wr('{{{project}}}', params),
    query = wr(query, params),
    use_legacy_sql = FALSE
  )
  
  # Run _loop
  invocation_query <- "
    SELECT *  
      FROM {{{dataset}}}.{{{table_prefix}}}_settings
  "
  run_pipeline_gbq(
    log_reg_loop,
    wr(invocation_query, params),
    wr('{{{project}}}', params),
    use_legacy_sql = FALSE
  )
}
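To make the table layout concrete, here is a rough base R sketch of what the stacking and normalization steps produce, using a tiny made-up data set. The variable names mirror the table suffixes, but the code is only an illustration of the idea and runs entirely in local memory.

```r
set.seed(1)
n <- 5
wide <- data.frame(id = seq_len(n), x1 = 2 * runif(n) - 1, x2 = 2 * runif(n) - 1)

# _stacked: one row per (id, feature_name) pair
stacked <- rbind(
  data.frame(id = wide$id, feature_name = "x1", vi = wide$x1),
  data.frame(id = wide$id, feature_name = "x2", vi = wide$x2)
)

# _feature_stats: per-feature mean and sample standard deviation
means <- tapply(stacked$vi, stacked$feature_name, mean)
sds   <- tapply(stacked$vi, stacked$feature_name, sd)
feature_stats <- data.frame(
  feature_name = names(means),
  mean = as.numeric(means),
  stddev = as.numeric(sds)
)

# _features_stacked_vni: normalized values, plus a constant feature equal to 1
vni_tbl <- merge(stacked, feature_stats, by = "feature_name")
vni_tbl$vni <- ifelse(
  vni_tbl$stddev > 0,
  (vni_tbl$vi - vni_tbl$mean) / vni_tbl$stddev,
  vni_tbl$vi - vni_tbl$mean
)
constant_rows <- data.frame(id = wide$id, feature_name = "CONSTANT", vni = 1.0)
features_stacked_vni <- rbind(vni_tbl[, c("id", "feature_name", "vni")], constant_rows)

print(features_stacked_vni)
```

After normalization each real feature has mean 0 and standard deviation 1, which puts all gradient components on a comparable scale in the loop.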

Next, we’ll create a loop pipeline that iteratively calculates the cost function and appends the updated parameter values to the _fit_params table as a new step.

#
# Pipeline: loop
#
log_reg_loop <- function(params){
  
  print ("log_reg_loop")
  
  query <- "
    CREATE OR REPLACE TABLE {{{dataset}}}.{{{table_prefix}}}_x_dot_beta_i
    AS 
    SELECT
      a.id AS id,
      SUM(a.vni * b.param_value) AS x_dot_beta_i
    FROM {{{dataset}}}.{{{table_prefix}}}_features_stacked_vni a  
    RIGHT JOIN (
      SELECT param_id, param_value    
      FROM {{{dataset}}}.{{{table_prefix}}}_fit_params    
      WHERE STEP = (SELECT max(step) FROM {{{dataset}}}.{{{table_prefix}}}_fit_params)
    ) b
    ON a.feature_name = b.param_id
    GROUP BY 1
  "
  
  query_exec(
    project = wr('{{{project}}}', params),
    query = wr(query, params),
    use_legacy_sql = FALSE
  )
  
  query <- '
  INSERT INTO {{{dataset}}}.{{{table_prefix}}}_fit_params (step, param_id, param_value, cost, stop, message)
  SELECT  
    b.step + 1 as step,  
    b.param_id as param_id,  
    b.param_value - {{{learning_rate}}} * err as param_value,
    -1.0 * a.cost as cost,
    CASE
      WHEN ( abs((b.cost-(-1.0*a.cost))/b.cost) < {{{error_tol}}} ) OR (step+1 > {{{max_steps}}})  
        THEN true
      ELSE false  
      END AS stop,  
    CONCAT( "cost: ", CAST(abs((b.cost-(-1.0*a.cost))/b.cost) AS STRING), " error_tol: ", CAST({{{error_tol}}} AS STRING)) as message  
  FROM (  
    SELECT  
      param_id,  
      avg(err) as err,  
      avg(cost) as cost  
    FROM (  
      SELECT  
        a.id,
        param_id,
        (1.0/(1.0 + EXP(-1.0 * (c.x_dot_beta_i))) - CAST(label AS FLOAT64)) * vni as err,
        CAST(label AS FLOAT64) * LOG( 1.0/(1.0 + EXP(-1.0 * (c.x_dot_beta_i))) )   
          + (1.0-CAST(label AS FLOAT64))*(log(1.0 - (1.0/(1.0 + EXP(-1.0 * (c.x_dot_beta_i))))))  as cost
      FROM (  
        SELECT a.id as id,  
        b.param_id as param_id,
        a.vni as vni,
        b.param_value as param_value
        FROM {{{dataset}}}.{{{table_prefix}}}_features_stacked_vni a  
        JOIN (
          SELECT param_id, param_value    
          FROM {{{dataset}}}.{{{table_prefix}}}_fit_params    
          WHERE STEP = (SELECT max(step) FROM {{{dataset}}}.{{{table_prefix}}}_fit_params)
        ) b
        ON a.feature_name = b.param_id
        GROUP BY 1,2,3,4
      ) a
      JOIN {{{dataset}}}.{{{table_prefix}}}_labels b  
      ON a.id = b.id
      JOIN {{{dataset}}}.{{{table_prefix}}}_x_dot_beta_i c
      ON a.id = c.id
    )  
    GROUP BY param_id
  ) a
  JOIN (
    SELECT *
    FROM {{{dataset}}}.{{{table_prefix}}}_fit_params
    WHERE STEP = (SELECT max(step) FROM {{{dataset}}}.{{{table_prefix}}}_fit_params)
  ) b
  ON a.param_id = b.param_id
  '
  
  query_exec(
    project = wr('{{{project}}}', params),
    query = wr(query, params),
    use_legacy_sql = FALSE
  )
  
  
  # Loop or stop
  query <- "
      SELECT stop  AS stop
      FROM (
        SELECT *
          FROM {{{dataset}}}.{{{table_prefix}}}_fit_params
        ORDER BY step DESC
        LIMIT 1
      )
  "
  
  res <- query_exec(
    wr(query, params),
    wr('{{{project}}}', params),
    use_legacy_sql = FALSE
  )
  
  if(res$stop == FALSE){
    print("stop == FALSE")
    invocation_query <- '
      SELECT *
      FROM {{{dataset}}}.{{{table_prefix}}}_settings
    '
    run_pipeline_gbq(
      log_reg_loop,
      wr(invocation_query,  params),
      wr('{{{project}}}', params),
      use_legacy_sql = FALSE
    )
  }
  else {
    print("stop == TRUE")
    invocation_query <- '
      SELECT *
      FROM {{{dataset}}}.{{{table_prefix}}}_settings
    '
    run_pipeline_gbq(
      log_reg_done,
      wr(invocation_query,  params),
      wr('{{{project}}}', params),
      use_legacy_sql = FALSE
    )
  }
  
}
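The arithmetic carried out by the loop is ordinary batch gradient descent on the average negative log-likelihood. As a sanity check, the same update can be sketched in a few lines of base R on normalized features. This is an in-memory illustration under the same default settings as the run further down (learning rate 6.0, at most 25 steps, relative tolerance 1e-6); the function name gradient_descent is made up for this example.

```r
set.seed(1)
n  <- 1000
x1 <- 2 * runif(n) - 1
x2 <- 2 * runif(n) - 1
y  <- rbinom(n, 1, plogis(-1 - 3 * x1 + 5 * x2))

# Normalized design matrix with a leading constant column
X <- cbind(CONSTANT = 1, x1 = as.numeric(scale(x1)), x2 = as.numeric(scale(x2)))

gradient_descent <- function(X, y, learning_rate = 6.0, max_steps = 25, error_tol = 1e-6) {
  beta <- setNames(rep(0, ncol(X)), colnames(X))
  prev_cost <- 1e6  # same large starting cost as the _fit_params table
  for (step in seq_len(max_steps)) {
    p    <- as.numeric(plogis(X %*% beta))                 # predicted probabilities
    cost <- -mean(y * log(p) + (1 - y) * log(1 - p))       # average negative log-likelihood
    grad <- colMeans((p - y) * X)                          # average gradient per parameter
    beta <- beta - learning_rate * grad                    # parameter update
    if (abs((prev_cost - cost) / prev_cost) < error_tol) break
    prev_cost <- cost
  }
  # `cost` is the cost evaluated just before the final update
  list(beta = beta, cost = cost, steps = step)
}

fit <- gradient_descent(X, y)
print(fit)
```

The returned parameters are in normalized coordinates, so they are not directly comparable to the glm coefficients until they are rescaled.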

And finally, a log_reg_done pipeline that outputs the results:

#
# Pipeline: done
#
log_reg_done <- function(params){
  
  print ("log_reg_done")
  
  # Display results in norm'd coords
  query <- '
    SELECT "normalized coords parameters" as message,
      step,  
      param_id,  
      param_value 
    FROM {{{dataset}}}.{{{table_prefix}}}_fit_params
    WHERE step = (SELECT max(step) from {{{dataset}}}.{{{table_prefix}}}_fit_params)
  '
  
  res <- query_exec(
    wr(query, params),
    wr('{{{project}}}', params),
    use_legacy_sql = FALSE
  )
  
  print(res)
  
  # Display results in original coords
  query <- "
    CREATE OR REPLACE TABLE {{{dataset}}}.{{{table_prefix}}}_model_params_stacked
    AS 
    SELECT
      param_id,
      param_value_rescaled
    FROM (
      SELECT
        a.param_id AS param_id,
        a.param_value + b.constant_offset AS param_value_rescaled
      FROM (
        SELECT
          step,
          param_id,
          param_value
        FROM {{{dataset}}}.{{{table_prefix}}}_fit_params
        WHERE step = (SELECT max(step) from {{{dataset}}}.{{{table_prefix}}}_fit_params)
        AND param_id = '{{{constant_id}}}'
      ) a
      JOIN (
        SELECT
          step,
          '{{{constant_id}}}' as param_id,
          sum(-1.0*param_value*mean/stddev) as constant_offset
        FROM {{{dataset}}}.{{{table_prefix}}}_fit_params a
        JOIN {{{dataset}}}.{{{table_prefix}}}_feature_stats b
          ON a.param_id = b.feature_name
        WHERE step = (SELECT max(step) FROM {{{dataset}}}.{{{table_prefix}}}_fit_params)
        GROUP BY 1,2
      ) b
      ON a.param_id = b.param_id
    ) UNION ALL (
      SELECT
        param_id,
        param_value/stddev as param_value_rescaled
      FROM {{{dataset}}}.{{{table_prefix}}}_fit_params a
      JOIN {{{dataset}}}.{{{table_prefix}}}_feature_stats b
      ON a.param_id = b.feature_name
      WHERE step = (SELECT max(step) FROM {{{dataset}}}.{{{table_prefix}}}_fit_params)
      GROUP BY 1,2
    )
  "
  
  res <- query_exec(
    wr(query, params),
    wr('{{{project}}}', params),
    use_legacy_sql = FALSE
  )
  
  print(res)
  
  
  # transpose the _model_params_stacked table
  invocation_query <- '
    SELECT
      a.list,
      b.*
    FROM (
      SELECT CONCAT("[", STRING_AGG(CONCAT("{\\"val\\": \\"",TRIM(fieldname), "\\"}")), "]") AS list
      FROM {{{dataset}}}.{{{table_prefix}}}_fieldnames
    ) a
    CROSS JOIN (
      SELECT *
        FROM {{{dataset}}}.{{{table_prefix}}}_settings
    ) b
  '
  
  run_pipeline_gbq(
    log_reg_model_params,
    wr(invocation_query, params),
    wr('{{{project}}}', params),
    use_legacy_sql = FALSE
  )
  
  print("DONE")
  
}
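The rescaling from normalized to original coordinates divides each feature coefficient by that feature's standard deviation and shifts the constant by the sum of coefficient × mean / stddev. A quick local check of that algebra, using glm in place of the in-database fit, might look like the sketch below: a model fitted on normalized features and then rescaled should match a model fitted on the raw features.

```r
set.seed(1)
n  <- 1000
x1 <- 2 * runif(n) - 1
x2 <- 2 * runif(n) - 1
y  <- rbinom(n, 1, plogis(-1 - 3 * x1 + 5 * x2))

means <- c(x1 = mean(x1), x2 = mean(x2))
sds   <- c(x1 = sd(x1), x2 = sd(x2))

# Normalized features
z1 <- (x1 - means[["x1"]]) / sds[["x1"]]
z2 <- (x2 - means[["x2"]]) / sds[["x2"]]

# Fit in normalized coordinates
b <- coef(glm(y ~ z1 + z2, family = "binomial"))

# Rescale: constant + sum(-b * mean / stddev), and b / stddev for each feature
rescaled <- c(
  CONSTANT = b[["(Intercept)"]] - sum(b[c("z1", "z2")] * means / sds),
  x1 = b[["z1"]] / sds[["x1"]],
  x2 = b[["z2"]] / sds[["x2"]]
)

# Fit in original coordinates for comparison
raw <- coef(glm(y ~ x1 + x2, family = "binomial"))

print(rbind(rescaled = rescaled, raw = unname(raw)))
```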

Our last pipeline, called at the end of the above pipeline, will transpose the stacked model params. In other words, it will output the parameters of the model in separate columns:

log_reg_model_params <- function(params){
  
  query <- "
    CREATE OR REPLACE TABLE {{{dataset}}}.{{{table_prefix}}}_model_params
    AS 
    SELECT
    {{#list}}
      MAX(CASE WHEN param_id='{{val}}' THEN param_value_rescaled END ) AS {{val}},
    {{/list}}
    MAX(CASE WHEN param_id='{{constant_id}}' THEN param_value_rescaled END ) AS {{constant_id}}
    FROM {{{dataset}}}.{{{table_prefix}}}_model_params_stacked
  ;"

  res <- query_exec(
    wr(query, params),
    wr('{{{project}}}', params),
    use_legacy_sql = FALSE
  )
  
  print(res)
}

Running the pipeline

We are now ready to run the log_reg pipeline. We’ll set up the invocation query with all of our global parameters. These will be stored in the _settings table and then, after stacking and setup, the pipeline will iterate through the loop to calculate the logistic regression coefficients.

# Run the log_reg pipeline with the following params (2D test)
invocation_query <- '
  SELECT
  "{{{project}}}" as project,
  "{{{dataset}}}" as dataset,
  "{{{table_prefix}}}" as table_prefix,
  "{{{dataset}}}.logreg_sim" as data_table,        
  "25"  as max_steps,
  "1e-6" as error_tol,
  "6.0"  as learning_rate,
  "id"   as id_column,
  "y"  as label_column,
  "x1, x2"  as fieldnames,
  "CONSTANT" as constant_id
'

cat(wr(invocation_query, config))

query_exec(wr(invocation_query, config), project=config$project, use_legacy_sql = FALSE)

run_pipeline_gbq(
  log_reg,
  wr(invocation_query, config),
  project = wr('{{{project}}}', config),
  use_legacy_sql = FALSE
)

After running the above, we should be able to query the table that holds the fitted parameters:

  query <- "
    SELECT *
    FROM {{{dataset}}}.{{{table_prefix}}}_model_params
  ;"

  query_exec(
    wr(query),
    wr('{{{project}}}'),
    use_legacy_sql = FALSE
  )

As expected, these results are pretty close to our original beta values.

Please keep in mind that this is not ready to be released into the wild. Further improvements include modifications to deal with categorical variables, output describing whether a logistic fit is statistically significant for a particular parameter, and options for controlling step-sizes. But it does show the concept of how an iterative process like logistic regression can be done while using the database to maintain state.

Prediction

Now that we have fit the logistic regression model and the model is stored in the database, we can predict values using the model. We just need a prediction pipeline:

#
# Pipeline: predict
#
log_reg_predict <- function(params){
  
  query <- '
  SELECT
    1/(1+exp(-1.0*(CONSTANT + {{#list}}a.{{val}}*b.{{val}} + {{/list}}0))) as probability
  FROM {{{dataset}}}.{{{table_prefix}}}_model_params a
  CROSS JOIN {{{data_table}}} b
  ORDER BY {{{id_column}}}
  '
  
  res <- query_exec(
    wr(query, params),
    wr('{{{project}}}', params),
    use_legacy_sql = FALSE
  )
  
}

Note that the above uses whisker to calculate the dot product \(x\beta\) by expanding a JSON-formatted array of field names through the {{#list}}a.{{val}}*b.{{val}} + {{/list}} section. The invocation query below builds that array from the _fieldnames table. Now let’s run the predictions:

# Run the prediction pipeline with the following params
invocation_query <- '
  SELECT
    "{{{project}}}" as project,
    "{{{dataset}}}" as dataset,
    "{{{table_prefix}}}" as table_prefix,
    "{{{dataset}}}.logreg_sim" as data_table,
    "id" as id_column,
    CONCAT("[", STRING_AGG(CONCAT("{\\"val\\": \\"",TRIM(fieldname), "\\"}")), "]") AS list
  FROM {{{dataset}}}.{{{table_prefix}}}_fieldnames
'

predictions <- run_pipeline_gbq(
  log_reg_predict,
  wr(invocation_query, config),
  project = wr('{{{project}}}', config),
  use_legacy_sql = FALSE
)
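To see what the {{#list}} section expands to, a template of the same shape can be rendered locally. This sketch assumes the JSON array of field names has already been parsed into an R list of records (one entry per field, each with a val element) before it reaches whisker; the list is built by hand here.

```r
library(whisker)

# Field list in the shape the {{#list}} section iterates over (built by hand)
params <- list(list = list(list(val = "x1"), list(val = "x2")))

# One "a.<field>*b.<field> + " term is emitted per entry; the trailing 0 closes the sum
template <- "CONSTANT + {{#list}}a.{{val}}*b.{{val}} + {{/list}}0"
expression_sql <- whisker.render(template, params)
print(expression_sql)
```

Generating the expression from the field list means the prediction query does not have to be rewritten when features are added or removed.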

Let’s compare the predicted probabilities with the true probabilities used in the simulation:

# inspect first 5 true probs vs. predicted probabilities
probs[1:5]
predictions[[1]]$probability[1:5]

# mean relative error between true probs and predicted probabilities 
mean((abs(probs-predictions[[1]]$probability))/probs)

Our in-database logistic regression model predicts the true probabilities with a mean relative error of about 7%.

Next steps

We have shown how to train and store a logistic regression model in a database. We can then predict outcomes given features that are also stored in the database without having to move data back and forth to a prediction server. In this particular example, it would likely be much faster to move the data to a computer and run the predictions there. However, certain use cases exist where in-database modeling could be an avenue for consideration. Further, since logistic models are fundamental to many types of tree and forest predictors, in-database logistic regression would be a necessary step in developing in-database tree methods. It remains to be seen if this approach can be easily translated into the tidyverse modeldb package.
