diff --git a/stationapi/src/import.rs b/stationapi/src/import.rs index 23b9a2ed..550beef8 100644 --- a/stationapi/src/import.rs +++ b/stationapi/src/import.rs @@ -687,7 +687,7 @@ async fn import_gtfs_shapes( let mut rdr = ReaderBuilder::new().from_path(&shapes_path)?; let mut batch: Vec<(String, f64, f64, i32, Option)> = Vec::new(); - let batch_size = 5000; + let batch_size = 2000; for result in rdr.records() { let record = result?; @@ -748,7 +748,7 @@ async fn insert_shapes_batch( dist_str )); - if (i + 1) % 1000 == 0 || i == batch.len() - 1 { + if (i + 1) % 500 == 0 || i == batch.len() - 1 { sql.push_str(&values.join(",")); sql.push_str(" ON CONFLICT DO NOTHING"); sqlx::query(&sql).execute(&mut *conn).await?; @@ -776,7 +776,7 @@ async fn import_gtfs_trips( let mut rdr = ReaderBuilder::new().from_path(&trips_path)?; let mut count = 0; let mut batch: Vec = Vec::new(); - let batch_size = 5000; + let batch_size = 2000; for result in rdr.records() { let record = result?; @@ -854,23 +854,27 @@ async fn insert_trips_batch( return Ok(()); } + // Split into smaller chunks (500 rows) to reduce memory usage let mut sql = String::from( "INSERT INTO gtfs_trips (trip_id, route_id, service_id, trip_headsign, trip_short_name, direction_id, block_id, shape_id, wheelchair_accessible, bikes_allowed) VALUES ", ); - let mut values: Vec = Vec::with_capacity(batch.len()); + let mut values: Vec = Vec::new(); for ( - trip_id, - route_id, - service_id, - trip_headsign, - trip_short_name, - direction_id, - block_id, - shape_id, - wheelchair_accessible, - bikes_allowed, - ) in batch + i, + ( + trip_id, + route_id, + service_id, + trip_headsign, + trip_short_name, + direction_id, + block_id, + shape_id, + wheelchair_accessible, + bikes_allowed, + ), + ) in batch.iter().enumerate() { let headsign_str = trip_headsign .as_ref() @@ -911,12 +915,18 @@ async fn insert_trips_batch( wheelchair_str, bikes_str )); - } - - sql.push_str(&values.join(",")); - sql.push_str(" ON CONFLICT (trip_id) DO NOTHING"); - sqlx::query(&sql).execute(&mut *conn).await?; + // Execute every 500 rows to reduce memory usage + if (i + 1) % 500 == 0 || i == batch.len() - 1 { + sql.push_str(&values.join(",")); + sql.push_str(" ON CONFLICT (trip_id) DO NOTHING"); + sqlx::query(&sql).execute(&mut *conn).await?; + sql = String::from( + "INSERT INTO gtfs_trips (trip_id, route_id, service_id, trip_headsign, trip_short_name, direction_id, block_id, shape_id, wheelchair_accessible, bikes_allowed) VALUES ", + ); + values.clear(); + } + } Ok(()) } @@ -937,7 +947,7 @@ async fn import_gtfs_stop_times( let mut rdr = ReaderBuilder::new().from_path(&stop_times_path)?; let mut count = 0; let mut batch: Vec = Vec::new(); - let batch_size = 5000; + let batch_size = 1000; for result in rdr.records() { let record = result?; @@ -1032,23 +1042,27 @@ async fn insert_stop_times_batch( } // Build multi-row INSERT for better performance + // Split into smaller chunks (500 rows) to reduce memory usage let mut sql = String::from( "INSERT INTO gtfs_stop_times (trip_id, arrival_time, departure_time, stop_id, stop_sequence, stop_headsign, pickup_type, drop_off_type, shape_dist_traveled, timepoint) VALUES ", ); - let mut values: Vec = Vec::with_capacity(batch.len()); + let mut values: Vec = Vec::new(); for ( - trip_id, - arrival_time, - departure_time, - stop_id, - stop_sequence, - stop_headsign, - pickup_type, - drop_off_type, - shape_dist_traveled, - timepoint, - ) in batch + i, + ( + trip_id, + arrival_time, + departure_time, + stop_id, + stop_sequence, + stop_headsign, + pickup_type, + drop_off_type, + shape_dist_traveled, + timepoint, + ), + ) in batch.iter().enumerate() { let arrival_str = arrival_time .as_ref() @@ -1088,12 +1102,18 @@ async fn insert_stop_times_batch( dist_str, timepoint_str )); - } - - sql.push_str(&values.join(",")); - sql.push_str(" ON CONFLICT DO NOTHING"); - sqlx::query(&sql).execute(&mut *conn).await?; + // Execute every 500 rows to reduce memory usage + if (i + 1) % 500 == 0 || i == batch.len() - 1 { + sql.push_str(&values.join(",")); + sql.push_str(" ON CONFLICT DO NOTHING"); + sqlx::query(&sql).execute(&mut *conn).await?; + sql = String::from( + "INSERT INTO gtfs_stop_times (trip_id, arrival_time, departure_time, stop_id, stop_sequence, stop_headsign, pickup_type, drop_off_type, shape_dist_traveled, timepoint) VALUES ", + ); + values.clear(); + } + } Ok(()) }