Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 58 additions & 38 deletions stationapi/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ async fn import_gtfs_shapes(

let mut rdr = ReaderBuilder::new().from_path(&shapes_path)?;
let mut batch: Vec<(String, f64, f64, i32, Option<f64>)> = Vec::new();
let batch_size = 5000;
let batch_size = 2000;

for result in rdr.records() {
let record = result?;
Expand Down Expand Up @@ -748,7 +748,7 @@ async fn insert_shapes_batch(
dist_str
));

if (i + 1) % 1000 == 0 || i == batch.len() - 1 {
if (i + 1) % 500 == 0 || i == batch.len() - 1 {
sql.push_str(&values.join(","));
sql.push_str(" ON CONFLICT DO NOTHING");
sqlx::query(&sql).execute(&mut *conn).await?;
Expand Down Expand Up @@ -776,7 +776,7 @@ async fn import_gtfs_trips(
let mut rdr = ReaderBuilder::new().from_path(&trips_path)?;
let mut count = 0;
let mut batch: Vec<TripBatchRow> = Vec::new();
let batch_size = 5000;
let batch_size = 2000;

for result in rdr.records() {
let record = result?;
Expand Down Expand Up @@ -854,23 +854,27 @@ async fn insert_trips_batch(
return Ok(());
}

// Split into smaller chunks (500 rows) to reduce memory usage
let mut sql = String::from(
"INSERT INTO gtfs_trips (trip_id, route_id, service_id, trip_headsign, trip_short_name, direction_id, block_id, shape_id, wheelchair_accessible, bikes_allowed) VALUES ",
);
let mut values: Vec<String> = Vec::with_capacity(batch.len());
let mut values: Vec<String> = Vec::new();

for (
trip_id,
route_id,
service_id,
trip_headsign,
trip_short_name,
direction_id,
block_id,
shape_id,
wheelchair_accessible,
bikes_allowed,
) in batch
i,
(
trip_id,
route_id,
service_id,
trip_headsign,
trip_short_name,
direction_id,
block_id,
shape_id,
wheelchair_accessible,
bikes_allowed,
),
) in batch.iter().enumerate()
{
let headsign_str = trip_headsign
.as_ref()
Expand Down Expand Up @@ -911,12 +915,18 @@ async fn insert_trips_batch(
wheelchair_str,
bikes_str
));
}

sql.push_str(&values.join(","));
sql.push_str(" ON CONFLICT (trip_id) DO NOTHING");

sqlx::query(&sql).execute(&mut *conn).await?;
// Execute every 500 rows to reduce memory usage
if (i + 1) % 500 == 0 || i == batch.len() - 1 {
sql.push_str(&values.join(","));
sql.push_str(" ON CONFLICT (trip_id) DO NOTHING");
sqlx::query(&sql).execute(&mut *conn).await?;
sql = String::from(
"INSERT INTO gtfs_trips (trip_id, route_id, service_id, trip_headsign, trip_short_name, direction_id, block_id, shape_id, wheelchair_accessible, bikes_allowed) VALUES ",
);
values.clear();
}
}

Ok(())
}
Expand All @@ -937,7 +947,7 @@ async fn import_gtfs_stop_times(
let mut rdr = ReaderBuilder::new().from_path(&stop_times_path)?;
let mut count = 0;
let mut batch: Vec<StopTimeBatchRow> = Vec::new();
let batch_size = 5000;
let batch_size = 1000;

for result in rdr.records() {
let record = result?;
Expand Down Expand Up @@ -1032,23 +1042,27 @@ async fn insert_stop_times_batch(
}

// Build multi-row INSERT for better performance
// Split into smaller chunks (500 rows) to reduce memory usage
let mut sql = String::from(
"INSERT INTO gtfs_stop_times (trip_id, arrival_time, departure_time, stop_id, stop_sequence, stop_headsign, pickup_type, drop_off_type, shape_dist_traveled, timepoint) VALUES ",
);
let mut values: Vec<String> = Vec::with_capacity(batch.len());
let mut values: Vec<String> = Vec::new();

for (
trip_id,
arrival_time,
departure_time,
stop_id,
stop_sequence,
stop_headsign,
pickup_type,
drop_off_type,
shape_dist_traveled,
timepoint,
) in batch
i,
(
trip_id,
arrival_time,
departure_time,
stop_id,
stop_sequence,
stop_headsign,
pickup_type,
drop_off_type,
shape_dist_traveled,
timepoint,
),
) in batch.iter().enumerate()
{
let arrival_str = arrival_time
.as_ref()
Expand Down Expand Up @@ -1088,12 +1102,18 @@ async fn insert_stop_times_batch(
dist_str,
timepoint_str
));
}

sql.push_str(&values.join(","));
sql.push_str(" ON CONFLICT DO NOTHING");

sqlx::query(&sql).execute(&mut *conn).await?;
// Execute every 500 rows to reduce memory usage
if (i + 1) % 500 == 0 || i == batch.len() - 1 {
sql.push_str(&values.join(","));
sql.push_str(" ON CONFLICT DO NOTHING");
sqlx::query(&sql).execute(&mut *conn).await?;
sql = String::from(
"INSERT INTO gtfs_stop_times (trip_id, arrival_time, departure_time, stop_id, stop_sequence, stop_headsign, pickup_type, drop_off_type, shape_dist_traveled, timepoint) VALUES ",
);
values.clear();
}
}

Ok(())
}
Expand Down