Data
This chapter prepares the data used in the examples in this book. You do not have to fully understand these code blocks at this point to run them, but they are commented. The rest of the examples in this book assume you have run all of these code blocks. The SQL and S3 bucket sections of this chapter can be skipped if you do not want to install these dependencies (you will then also have to skip the databases and cloud chapters of the book).
This book uses the Public microdata teaching sample, England and Wales: Census 2021, a 1% sample of individual records from Census 2021 for teaching of statistics and social sciences. It can be downloaded here. This CSV contains non-aggregated data for a wide variety of variables collected from the UK population. The codeset (i.e. the values for the variables in the dataset) can be downloaded from here and the user guide from here.
caution
The goal of this book is to show the power of data analysis using Rust, not to analyze the UK Census data. Some examples will use this data in a way that does not produce valid results (e.g. incorrect population, unweighted statistics, longitudinal analysis). No results in this book should be interpreted as being valid.
Extracting
A compressed version of the UK Census is available in this crate here. It can also be downloaded here. If you downloaded your own version of the UK Census, place it under ./data/raw, call it census.csv and skip this code. Make sure to also create the other sub-folders for future data storage locations.
You can run this script using cargo run -r --example 1_2_1_extract.
use std::{
fs::File,
io::{Read, Write},
};
// Create directories used throughout the book
let _ = std::fs::remove_dir_all("./data");
std::fs::create_dir("./data").unwrap();
std::fs::create_dir("./data/raw").unwrap();
std::fs::create_dir("./data/codeset").unwrap();
std::fs::create_dir("./data/csv").unwrap();
std::fs::create_dir("./data/parquet").unwrap();
std::fs::create_dir("./data/large").unwrap();
std::fs::create_dir("./data/temp_data").unwrap();
std::fs::create_dir("./data/minio").unwrap();
std::fs::create_dir("./data/output").unwrap();
// Open ZIP file
let zip_file = File::open("./zip/data.zip").unwrap();
// Initiate CSV buffer
let mut csv_buf: Vec<u8> = Vec::new();
// Find census.csv in Zip
let mut archive = zip::ZipArchive::new(zip_file).unwrap();
let _ = archive
.by_name("census.csv")
.unwrap()
.read_to_end(&mut csv_buf)
.unwrap();
// Write `census.csv` file
let mut file = std::fs::File::create("./data/raw/census.csv").unwrap();
file.write_all(&csv_buf).unwrap();
// Clear buffer
csv_buf.clear();
// Find codeset.csv in Zip
let _ = archive
.by_name("codeset.csv")
.unwrap()
.read_to_end(&mut csv_buf)
.unwrap();
// Write `codeset.csv` file
let mut file = std::fs::File::create("./data/codeset/codeset.csv").unwrap();
file.write_all(&csv_buf).unwrap();
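The script above uses create_dir, which fails when a folder already exists, which is why it deletes ./data first. If you placed your own census.csv under ./data/raw and only need the remaining sub-folders, a non-destructive alternative is create_dir_all. The following is a minimal sketch using only the standard library, not one of the book's examples; the function name create_layout and the scratch folder used in main are hypothetical.

```rust
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

/// Sub-folders used throughout the book (same names as in the script above).
const SUB_DIRS: [&str; 8] = [
    "raw", "codeset", "csv", "parquet", "large", "temp_data", "minio", "output",
];

/// Create `root` and every sub-folder, returning the paths that now exist.
/// Unlike `create_dir`, `create_dir_all` succeeds when a folder is already there,
/// so existing files are left untouched.
fn create_layout(root: &Path) -> io::Result<Vec<PathBuf>> {
    let mut created = Vec::with_capacity(SUB_DIRS.len());
    for name in SUB_DIRS.iter() {
        let dir = root.join(name);
        fs::create_dir_all(&dir)?;
        created.push(dir);
    }
    Ok(created)
}

fn main() -> io::Result<()> {
    // Hypothetical scratch location so the sketch never touches a real ./data folder.
    let root = std::env::temp_dir().join("census_layout_sketch");
    let dirs = create_layout(&root)?;
    println!("{} folders ready under {}", dirs.len(), root.display());
    Ok(())
}
```

Calling create_layout(Path::new("./data")) would prepare the same folders as the script without removing anything first.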
Rename
This code will rename the long variable names in the UK Census to shorter names, for easier display and coding in this book. It will also rename the variables in the codeset to match. You can run this code with cargo run -r --example 1_2_2_rename.
use polars::prelude::*;
// --- Data File ---
// Read CSV
let lf = LazyCsvReader::new(PlPath::from_string("./data/raw/census.csv".to_string()))
.with_infer_schema_length(Some(10_000)) // Default 100, missing = String
.with_has_header(true)
.finish()
.unwrap();
// Rename columns
let lf = lf.select([
col("resident_id_m").alias("id"),
col("approx_social_grade").alias("social"),
col("country_of_birth_3a").alias("birth"),
col("economic_activity_status_10m").alias("econ"),
col("ethnic_group_tb_6a").alias("ethnic"),
col("health_in_general").alias("health"),
col("hh_families_type_6a").alias("fam_type"),
col("hours_per_week_worked").alias("hours_worked"),
col("in_full_time_education").alias("education"),
col("industry_10a").alias("industry"),
col("iol22cd").alias("london"),
col("legal_partnership_status_6a").alias("mar_stat"),
col("occupation_10a").alias("occupation"),
col("region"),
col("religion_tb").alias("religion"),
col("residence_type"),
col("resident_age_7d").alias("age_group"),
col("sex"),
col("usual_short_student").alias("keep_type"),
]);
// Write output to CSV
let mut df = lf.collect().unwrap();
let mut file = std::fs::File::create("./data/raw/census.csv").unwrap();
CsvWriter::new(&mut file).finish(&mut df).unwrap();
// --- Codeset ---
// Read CSV
let lf = LazyCsvReader::new(PlPath::from_string(
"./data/codeset/codeset.csv".to_string(),
))
.with_infer_schema_length(Some(10_000)) // Default 100, missing = String
.with_has_header(true)
.finish()
.unwrap();
// Rename variables
let lf = lf.with_column(col("variable").replace_strict(
lit(Series::from_iter(vec![
"resident_id_m",
"approx_social_grade",
"country_of_birth_3a",
"economic_activity_status_10m",
"ethnic_group_tb_6a",
"health_in_general",
"hh_families_type_6a",
"hours_per_week_worked",
"in_full_time_education",
"industry_10a",
"iol22cd",
"legal_partnership_status_6a",
"occupation_10a",
"region",
"religion_tb",
"residence_type",
"resident_age_7d",
"sex",
"usual_short_student",
])),
lit(Series::from_iter(vec![
"id",
"social",
"birth",
"econ",
"ethnic",
"health",
"fam_type",
"hours_worked",
"education",
"industry",
"london",
"mar_stat",
"occupation",
"region",
"religion",
"residence_type",
"age_group",
"sex",
"keep_type",
])),
None,
Some(DataType::String),
));
// Write output to CSV
let mut df = lf.collect().unwrap();
let mut file = std::fs::File::create("./data/codeset/codeset.csv").unwrap();
CsvWriter::new(&mut file).finish(&mut df).unwrap();
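The codeset rename above relies on a strict replacement: with no default supplied, every value in the variable column is expected to appear in the list of old names. The idea can be sketched without Polars using a plain HashMap. This is an illustration of the lookup logic only, under the assumption that an unmapped value should be an error; the helper names rename_map and replace_strict below are hypothetical and only a subset of the pairs is shown.

```rust
use std::collections::HashMap;

/// Old-to-new variable names (a subset of the pairs used in the script above).
fn rename_map() -> HashMap<&'static str, &'static str> {
    HashMap::from([
        ("resident_id_m", "id"),
        ("approx_social_grade", "social"),
        ("resident_age_7d", "age_group"),
        ("region", "region"),
        ("sex", "sex"),
    ])
}

/// Strict replacement: every input must have a mapping, otherwise an error
/// naming the first value without one is returned.
fn replace_strict(
    values: &[&str],
    map: &HashMap<&'static str, &'static str>,
) -> Result<Vec<&'static str>, String> {
    values
        .iter()
        .map(|v| {
            map.get(*v)
                .copied()
                .ok_or_else(|| format!("no mapping for `{v}`"))
        })
        .collect()
}

fn main() {
    let map = rename_map();
    // Names that keep their spelling (e.g. `sex`) still need an entry.
    println!("{:?}", replace_strict(&["sex", "resident_age_7d"], &map));
    // A value outside the mapping is reported instead of silently kept.
    println!("{:?}", replace_strict(&["not_a_variable"], &map));
}
```

This is why region, residence_type and sex appear in both lists of the script even though their names do not change.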
Synthetic data
The UK Census is useful for demographic information, but does not contain any continuous values (e.g. yearly income) or any survey weights. These are useful variables when learning how to code in Polars (for means, medians, etc.). The code below creates a yearly income variable that is randomly set to between £10,000 and £100,000 (upper bound excluded). It also creates a weight variable that has the (very rough) goal of re-creating the 100% sample from each 1% file, by providing random weights between 75 and 125 (upper bound excluded) for each record. The weight variable is dropped when the large file is built later in this chapter. You can run this code with cargo run -r --example 1_2_3_synthetic.
use polars::prelude::*;
use rand::prelude::*;
use rand_chacha::ChaCha8Rng;
// Read CSV (into memory)
let mut df = LazyCsvReader::new(PlPath::from_string("./data/raw/census.csv".to_string()))
.with_infer_schema_length(Some(10_000)) // Default 100, missing = String
.with_has_header(true)
.finish()
.unwrap()
.collect()
.unwrap();
// Start up seeded RNG
let mut rng = ChaCha8Rng::seed_from_u64(1);
// Create random income vector of correct size
let random_income: Vec<i64> = (0..df.height())
.map(|_| rng.random_range(10_000..100_000))
.collect();
// Create random weight vector of correct size
let random_weight: Vec<i64> = (0..df.height())
.map(|_| rng.random_range(75..125))
.collect();
// Make them "Series" as required by Polars
let income = Series::new("income".into(), random_income);
let weight = Series::new("weight".into(), random_weight);
// Add them as columns to the data
let df = df.with_column(income).unwrap().with_column(weight).unwrap();
// If economically non-active, put income as `Null`
let mut df = df
.clone()
.lazy()
.with_column(
when(col("econ").is_in(
lit(Series::from_iter(vec![-8, 5, 6, 7, 8, 9])).implode(),
false,
))
.then(Null {}.lit())
.otherwise(col("income"))
.alias("income"),
)
.collect()
.unwrap();
// Write output to CSV
let mut file = std::fs::File::create("./data/raw/census.csv").unwrap();
CsvWriter::new(&mut file).finish(&mut df).unwrap();
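To see how the two synthetic variables are meant to work together, here is a minimal standard-library sketch of a weighted mean that skips missing incomes, which is the situation created above for economically inactive records. It is not taken from the book's examples; the function name weighted_mean and the sample values are made up for illustration.

```rust
/// Weighted mean of `income`, skipping records whose income is missing (`None`).
/// Returns `None` when no record has an income or the usable weights sum to zero.
fn weighted_mean(income: &[Option<i64>], weight: &[i64]) -> Option<f64> {
    let mut weighted_sum = 0.0;
    let mut weight_total = 0.0;
    for (inc, w) in income.iter().zip(weight) {
        if let Some(value) = inc {
            weighted_sum += (*value as f64) * (*w as f64);
            weight_total += *w as f64;
        }
    }
    if weight_total > 0.0 {
        Some(weighted_sum / weight_total)
    } else {
        None
    }
}

fn main() {
    // Made-up records, not census values: the second one has no income.
    let income: [Option<i64>; 3] = [Some(20_000), None, Some(50_000)];
    let weight: [i64; 3] = [100, 90, 100];
    println!("{:?}", weighted_mean(&income, &weight));
}
```

The weight of a record with a missing income is left out of the denominator, so missing values do not pull the mean toward zero.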
Expand
This code will copy the CSV 100 times to pseudo-convert the 1% sample into a 100% sample. Each copy also gets a variable called "chunk" holding the copy number, from 0 to 99. This script will create approximately 4 GB of CSV data. You can run this code with cargo run -r --example 1_2_4_expand.
use polars::prelude::*;
// Read CSV
let lf = LazyCsvReader::new(PlPath::from_string("./data/raw/census.csv".to_string()))
.with_infer_schema_length(Some(10_000)) // Default 100, missing = String
.with_has_header(true)
.finish()
.unwrap();
// Create 100x 1% sample
for chunk in 0..100 {
// Write csv
let mut file = std::fs::File::create(format!("./data/csv/census_{chunk}.csv")).unwrap();
let mut df = lf
.clone()
.with_column(lit(chunk).alias("chunk"))
.collect()
.unwrap();
CsvWriter::new(&mut file).finish(&mut df).unwrap();
}
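What the loop above does to each copy can be sketched on plain CSV text: the header gains a chunk column and every record gains the same chunk number. This is only an illustration with the standard library, assuming no field contains an embedded newline; the function name add_chunk_column and the sample rows are hypothetical, and the book's script uses Polars for this.

```rust
/// Append a `chunk` column to CSV text whose first line is the header.
/// Assumes no field contains an embedded newline.
fn add_chunk_column(csv: &str, chunk: u32) -> String {
    let mut out = String::with_capacity(csv.len() + 16);
    for (i, line) in csv.lines().enumerate() {
        if i == 0 {
            // Header line: add the new column name.
            out.push_str(line);
            out.push_str(",chunk\n");
        } else {
            // Data line: every record in this copy gets the same chunk number.
            out.push_str(&format!("{line},{chunk}\n"));
        }
    }
    out
}

fn main() {
    // Made-up two-record sample, not census values.
    let sample = "id,sex\n1,2\n3,1\n";
    for chunk in 0..3 {
        println!("census_{chunk}.csv:\n{}", add_chunk_column(sample, chunk));
    }
}
```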
Parquet
This section will convert each CSV into an individual Parquet file. It will create approximately 500 MB of Parquet files from the 4 GB of CSV files. You can run this code with cargo run -r --example 1_2_5_parquet.
use polars::prelude::*;
// Get all files in path
let paths = std::fs::read_dir("./data/csv").unwrap();
// For each file, save as Parquet
for path in paths {
let path_csv = path.unwrap().path();
let file_name = std::path::Path::new(&path_csv)
.file_stem()
.unwrap()
.to_str()
.unwrap();
let path_parquet = format!("./data/parquet/{file_name}.parquet");
let path_csv_string = path_csv.into_os_string().into_string().unwrap();
// Read CSV
let mut df = LazyCsvReader::new(PlPath::from_string(path_csv_string.clone()))
.with_infer_schema_length(Some(10_000)) // Default 100, missing = String
.with_has_header(true)
.finish()
.unwrap()
.collect() // Can't collect in finish below
.unwrap();
// Write Parquet
let mut file = std::fs::File::create(path_parquet).unwrap();
ParquetWriter::new(&mut file).finish(&mut df).unwrap();
}
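The path handling in the loop above (take the file stem of each CSV and reuse it for the Parquet file) can be isolated in a small helper. The sketch below uses only the standard library and adds one assumption that the script does not make: files without a .csv extension are skipped. The function name parquet_path_for is hypothetical.

```rust
use std::ffi::OsStr;
use std::path::{Path, PathBuf};

/// Map `<dir>/<name>.csv` to `<parquet_dir>/<name>.parquet`.
/// Returns `None` for paths that are not `.csv` files or have no file stem.
fn parquet_path_for(csv_path: &Path, parquet_dir: &Path) -> Option<PathBuf> {
    if csv_path.extension() != Some(OsStr::new("csv")) {
        return None;
    }
    // Build "<stem>.parquet" without going through a lossy string conversion.
    let mut name = csv_path.file_stem()?.to_os_string();
    name.push(".parquet");
    Some(parquet_dir.join(name))
}

fn main() {
    let out = parquet_path_for(
        Path::new("./data/csv/census_7.csv"),
        Path::new("./data/parquet"),
    );
    println!("{:?}", out);
}
```

Returning an Option instead of unwrapping means a stray file in ./data/csv can be skipped rather than stopping the whole conversion.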
Large file
This section will create a large CSV file and a large Parquet file. This will become a "larger-than-memory" dataset. At no point will all the data be in memory at the same time. It will use approximately 2 GB of RAM. You can run this script using cargo run -r --example 1_2_6_large.
use polars::prelude::*;
use std::fs::File;
// Get all files in path
let paths = std::fs::read_dir("./data/parquet").unwrap();
let mut lf_vec = vec![];
for path in paths {
let parquet = path.unwrap().path().into_os_string().into_string().unwrap();
let args = ScanArgsParquet::default();
let lf = LazyFrame::scan_parquet(PlPath::from_string(parquet), args.clone()).unwrap();
lf_vec.push(lf);
}
// Create one big file
let union_args = UnionArgs::default();
let lf = concat(lf_vec, union_args).unwrap();
// Drop weight for the large file
let lf = lf.select([all().exclude_cols(["weight"]).as_expr()]);
// Get regions
let regions = lf
.clone()
.select([col("region").unique()])
.collect()
.unwrap()
.column("region")
.unwrap()
.str()
.unwrap()
.drop_nulls()
.into_no_null_iter()
.map(|s| s.to_string())
.collect::<Vec<String>>();
// Get age_group
let ages = lf
.clone()
.select([col("age_group").unique()])
.collect()
.unwrap()
.column("age_group")
.unwrap()
.i64()
.unwrap()
.to_vec_null_aware()
.left()
.unwrap();
// Ready write large parquet file by batch
let file = File::create("./data/large/census.parquet").unwrap();
let schema: Arc<Schema> = lf.clone().collect_schema().unwrap();
let mut pq_writer: polars::io::parquet::write::BatchedWriter<File> = ParquetWriter::new(file)
.set_parallel(true)
.batched(&schema)
.unwrap();
// Ready write large csv file by batch
let file = File::create("./data/large/census.csv").unwrap();
let schema: Arc<Schema> = lf.clone().collect_schema().unwrap();
let mut csv_writer: polars::io::csv::write::BatchedWriter<File> =
CsvWriter::new(file).batched(&schema).unwrap();
// By region
for region in regions {
// By age group
for age in ages.clone() {
// Collect chunk in memory
let mut chunk_df = lf
.clone()
.filter(col("region").eq(lit(region.clone())))
.filter(col("age_group").eq(lit(age)))
.collect()
.unwrap();
// Write Partitioned Parquet (by region and age_group) - unstable according to the docs
write_partitioned_dataset(
&mut chunk_df,
PlPath::from_str("./data/large/partitioned/").as_ref(),
vec!["region".into(), "age_group".into()],
&ParquetWriteOptions::default(),
None,
4294967296,
)
.unwrap();
// Write chunk to a large parquet file and large csv file
pq_writer.write_batch(&chunk_df).unwrap();
csv_writer.write_batch(&chunk_df).unwrap();
}
}
// Finalize the writing
pq_writer.finish().unwrap();
csv_writer.finish().unwrap();
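The batching strategy above (loop over every region and age group, collect only the matching rows, write them, move on) can be illustrated without Polars. The sketch below is a simplified model, not the book's code: the Row struct, the function name batches_by_region_and_age and the sample values are hypothetical, and skipping empty combinations is a choice made here. It shows that each row ends up in exactly one batch as long as neither key is missing.

```rust
use std::collections::BTreeSet;

/// A pared-down record: only an id and the two partition keys.
#[derive(Debug, Clone, PartialEq)]
struct Row {
    id: u32,
    region: String,
    age_group: i64,
}

/// Split `rows` into one batch per (region, age_group) pair, in sorted key order.
/// Only one batch needs to be held at a time by a caller that writes as it goes.
fn batches_by_region_and_age(rows: &[Row]) -> Vec<Vec<Row>> {
    let regions: BTreeSet<&str> = rows.iter().map(|r| r.region.as_str()).collect();
    let ages: BTreeSet<i64> = rows.iter().map(|r| r.age_group).collect();
    let mut batches = Vec::new();
    for region in &regions {
        for age in &ages {
            // Same idea as the two `filter` calls in the script above.
            let batch: Vec<Row> = rows
                .iter()
                .filter(|r| r.region == *region && r.age_group == *age)
                .cloned()
                .collect();
            if !batch.is_empty() {
                batches.push(batch);
            }
        }
    }
    batches
}

fn main() {
    // Illustrative values only.
    let rows = vec![
        Row { id: 1, region: "E12000001".to_string(), age_group: 1 },
        Row { id: 2, region: "W92000004".to_string(), age_group: 1 },
        Row { id: 3, region: "E12000001".to_string(), age_group: 2 },
    ];
    for batch in batches_by_region_and_age(&rows) {
        println!("{} / {}: {} row(s)", batch[0].region, batch[0].age_group, batch.len());
    }
}
```

Smaller batches lower peak memory at the cost of scanning the source once per combination, which is the trade-off the script makes.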
SQL (optional)
This example will create a PostgreSQL server, into which the Census data will be loaded. Since this is just a test server, we will keep all the default configurations. To set it up, follow one of these guides: Windows, Linux (Ubuntu, Arch Linux) and macOS.
The following example, using Arch Linux, will show the general process:
- Install PostgreSQL using pacman -S postgresql.
- Initialize a database, using the postgres user: sudo -u postgres initdb -D /var/lib/postgres/data
- Enable and start the systemctl service: sudo systemctl enable postgresql.service and sudo systemctl start postgresql.service
Once set up, you can use Rust to load the data into the database. You can run this script using cargo run -r --example 1_2_7_sql.
use polars::prelude::*;
use std::io::{Read, Write};
// Connect to postgresql
let mut client =
postgres::Client::connect("host=localhost user=postgres", postgres::NoTls).unwrap();
// Drop table if exists
let _ = client.batch_execute("drop TABLE census;");
// Get all variable names and types using Polars;
let args = ScanArgsParquet::default();
let mut lf =
LazyFrame::scan_parquet(PlPath::from_str("./data/parquet/census_0.parquet"), args).unwrap();
let cols: Vec<String> = lf
.collect_schema()
.unwrap()
.iter_names()
.map(|c| c.to_owned().to_string())
.collect();
let types: Vec<String> = lf
.collect_schema()
.unwrap()
.iter_fields()
.map(|f| f.dtype.to_string())
.collect();
// Create table string
let mut ct_string = String::new();
ct_string.push_str("CREATE TABLE census (");
for i in 0..cols.len() {
ct_string.push('"');
ct_string.push_str(&cols[i]);
ct_string.push('"');
if types[i] == "i64" {
ct_string.push_str(" int,");
} else if types[i] == "str" {
ct_string.push_str(" VARCHAR(25),");
} else {
panic!("You have an unaccounted for type: {}.", types[i])
}
}
ct_string.pop();
ct_string.push_str(");");
client.batch_execute(&ct_string).unwrap();
// Get all files in path
let paths = std::fs::read_dir("./data/csv").unwrap();
// For each file, send it to postgresql
for path in paths {
let csv = path.unwrap().path();
let mut f = std::fs::File::open(csv.clone()).unwrap();
let metadata = std::fs::metadata(csv).unwrap();
let mut buffer = vec![0; metadata.len() as usize];
f.read_exact(&mut buffer).unwrap();
let mut writer = client.copy_in("COPY census FROM STDIN CSV HEADER").unwrap();
writer.write_all(&buffer).unwrap();
writer.finish().unwrap();
}
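The table-creation step above maps each Polars type name to a SQL column type and builds the statement by hand. The same mapping can be written as a small function that returns an error for an unknown type instead of stopping the program. This is a sketch with the standard library only; the function name create_table_sql is hypothetical, the two columns in main are just examples, and the output separates columns with a comma and a space, which differs cosmetically from the script.

```rust
/// Build a `CREATE TABLE` statement from (column name, Polars dtype name) pairs.
/// Only the two dtypes handled by the script are mapped; anything else is an error.
fn create_table_sql(table: &str, columns: &[(&str, &str)]) -> Result<String, String> {
    let mut defs = Vec::with_capacity(columns.len());
    for (name, dtype) in columns {
        let sql_type = match *dtype {
            "i64" => "int",
            "str" => "VARCHAR(25)",
            other => return Err(format!("unaccounted for type: {other}")),
        };
        // Quote the column name, as the script does.
        defs.push(format!("\"{name}\" {sql_type}"));
    }
    Ok(format!("CREATE TABLE {table} ({});", defs.join(", ")))
}

fn main() {
    let columns = [("region", "str"), ("income", "i64")];
    match create_table_sql("census", &columns) {
        Ok(sql) => println!("{sql}"),
        Err(e) => eprintln!("{e}"),
    }
}
```

Joining the column definitions avoids having to pop the trailing comma before closing the parenthesis.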
S3 bucket (optional)
Install MinIO and the minio-client. Since this is just for testing, do not change any of the default configuration.
Start the minio server and point it to the ./data/minio folder with minio server ./data/minio.
The following code creates a bucket called census and loads the ./data/large/census.csv CSV file, the ./data/large/census.parquet Parquet file and the partitioned Parquet folder ./data/large/partitioned/ with Rust. Run this script using cargo run -r --example 1_2_8_minio.
note
Because the multi-part upload S3 code is long, it was omitted from the book. You can find the code on GitHub. It can be run with cargo run -r --example 1_2_8_minio.
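For readers curious about what a multi-part upload involves, the core bookkeeping is splitting a large file into consecutive parts. The sketch below shows only that step with the standard library and is not the omitted code from GitHub; the function name part_ranges is hypothetical. It assumes the S3 rule that every part except the last must be at least 5 MiB, which an S3-compatible server is expected to follow.

```rust
/// Smallest part size S3 accepts for every part except the last (5 MiB).
const MIN_PART_SIZE: u64 = 5 * 1024 * 1024;

/// Split `file_len` bytes into consecutive `(offset, length)` parts of
/// `part_size` bytes; the final part holds whatever remains.
fn part_ranges(file_len: u64, part_size: u64) -> Vec<(u64, u64)> {
    assert!(part_size >= MIN_PART_SIZE, "part size below the 5 MiB minimum");
    let mut parts = Vec::new();
    let mut offset: u64 = 0;
    while offset < file_len {
        let len = part_size.min(file_len - offset);
        parts.push((offset, len));
        offset += len;
    }
    parts
}

fn main() {
    // Hypothetical 12 MiB file split into 5 MiB parts.
    let mib = 1024 * 1024;
    for (number, (offset, len)) in part_ranges(12 * mib, 5 * mib).iter().enumerate() {
        // S3 part numbers start at 1.
        println!("part {}: offset {offset}, {len} bytes", number + 1);
    }
}
```

Each range would then be read from the file and sent as one part, with the upload completed once all parts are acknowledged.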