From b54774fa83de21f773ddc780c637e4d3bf289190 Mon Sep 17 00:00:00 2001 From: Ada Bohm Date: Fri, 15 Nov 2024 18:15:48 +0100 Subject: [PATCH 1/8] Journal prunning --- crates/hyperqueue/src/client/commands/job.rs | 14 ++++--- .../src/client/commands/journal/mod.rs | 18 +++++++- .../hyperqueue/src/client/commands/server.rs | 2 +- crates/hyperqueue/src/server/bootstrap.rs | 6 +-- crates/hyperqueue/src/server/client/mod.rs | 40 +++++++++++++++++- .../src/server/event/{log => journal}/mod.rs | 3 +- .../src/server/event/journal/prune.rs | 41 +++++++++++++++++++ .../src/server/event/{log => journal}/read.rs | 4 +- .../server/event/{log => journal}/stream.rs | 41 +++++++++++++++---- .../server/event/{log => journal}/write.rs | 4 +- crates/hyperqueue/src/server/event/mod.rs | 4 +- .../hyperqueue/src/server/event/streamer.rs | 32 +++++++++++++-- crates/hyperqueue/src/server/restore.rs | 4 +- crates/hyperqueue/src/transfer/messages.rs | 4 +- tests/{test_restore.py => test_journal.py} | 14 +++++++ 15 files changed, 197 insertions(+), 34 deletions(-) rename crates/hyperqueue/src/server/event/{log => journal}/mod.rs (75%) create mode 100644 crates/hyperqueue/src/server/event/journal/prune.rs rename crates/hyperqueue/src/server/event/{log => journal}/read.rs (99%) rename crates/hyperqueue/src/server/event/{log => journal}/stream.rs (68%) rename crates/hyperqueue/src/server/event/{log => journal}/write.rs (96%) rename tests/{test_restore.py => test_journal.py} (95%) diff --git a/crates/hyperqueue/src/client/commands/job.rs b/crates/hyperqueue/src/client/commands/job.rs index b4d077d62..4c34133a5 100644 --- a/crates/hyperqueue/src/client/commands/job.rs +++ b/crates/hyperqueue/src/client/commands/job.rs @@ -317,11 +317,13 @@ pub async fn forget_job( ) -> anyhow::Result<()> { let JobForgetOpts { selector, filter } = opts; - let response = rpc_call!(session.connection(), FromClientMessage::ForgetJob(ForgetJobRequest { - selector, - filter: filter.into_iter().map(|s| s.into_status()).collect() - }), ToClientMessage::ForgetJobResponse(r) => r) - .await?; + let message = FromClientMessage::ForgetJob(ForgetJobRequest { + selector, + filter: filter.into_iter().map(|s| s.into_status()).collect(), + }); + let response = + rpc_call!(session.connection(), message, ToClientMessage::ForgetJobResponse(r) => r) + .await?; let mut message = format!( "{} {} were forgotten", @@ -337,4 +339,4 @@ pub async fn forget_job( log::info!("{message}"); Ok(()) -} +} \ No newline at end of file diff --git a/crates/hyperqueue/src/client/commands/journal/mod.rs b/crates/hyperqueue/src/client/commands/journal/mod.rs index ae78aba21..826bedd9f 100644 --- a/crates/hyperqueue/src/client/commands/journal/mod.rs +++ b/crates/hyperqueue/src/client/commands/journal/mod.rs @@ -3,8 +3,9 @@ mod output; use crate::client::commands::journal::output::format_event; use crate::client::globalsettings::GlobalSettings; use crate::common::utils::str::pluralize; +use crate::rpc_call; use crate::server::bootstrap::get_client_session; -use crate::server::event::log::JournalReader; +use crate::server::event::journal::JournalReader; use crate::transfer::messages::{FromClientMessage, ToClientMessage}; use anyhow::anyhow; use clap::{Parser, ValueHint}; @@ -26,6 +27,9 @@ enum JournalCommand { /// Live stream events from the server. Stream, + + /// Connect to a server and remove completed tasks and non-active workers from journal + Prune, } #[derive(Parser)] @@ -40,6 +44,7 @@ pub async fn command_journal(gsettings: &GlobalSettings, opts: JournalOpts) -> a match opts.command { JournalCommand::Export(opts) => export_json(opts), JournalCommand::Stream => stream_json(gsettings).await, + JournalCommand::Prune => prune_journal(gsettings).await, } } @@ -97,3 +102,14 @@ The file might have been incomplete." stdout.flush()?; Ok(()) } + +async fn prune_journal(gsettings: &GlobalSettings) -> anyhow::Result<()> { + let mut session = get_client_session(gsettings.server_directory()).await?; + rpc_call!( + session.connection(), + FromClientMessage::PruneJournal, + ToClientMessage::Finished + ) + .await?; + Ok(()) +} \ No newline at end of file diff --git a/crates/hyperqueue/src/client/commands/server.rs b/crates/hyperqueue/src/client/commands/server.rs index f906a34c1..0a8e5142b 100644 --- a/crates/hyperqueue/src/client/commands/server.rs +++ b/crates/hyperqueue/src/client/commands/server.rs @@ -211,4 +211,4 @@ fn command_server_generate_access( store_access_record(&worker_record, path)?; } Ok(()) -} +} \ No newline at end of file diff --git a/crates/hyperqueue/src/server/bootstrap.rs b/crates/hyperqueue/src/server/bootstrap.rs index 03bc87722..a36de4380 100644 --- a/crates/hyperqueue/src/server/bootstrap.rs +++ b/crates/hyperqueue/src/server/bootstrap.rs @@ -16,8 +16,8 @@ use crate::common::serverdir::{ }; use crate::server::autoalloc::{create_autoalloc_service, QueueId}; use crate::server::backend::Backend; -use crate::server::event::log::start_event_streaming; -use crate::server::event::log::JournalWriter; +use crate::server::event::journal::start_event_streaming; +use crate::server::event::journal::JournalWriter; use crate::server::event::streamer::EventStreamer; use crate::server::restore::StateRestorer; use crate::server::state::StateRef; @@ -504,4 +504,4 @@ mod tests { notify.notify_one(); fut.await.unwrap(); } -} +} \ No newline at end of file diff --git a/crates/hyperqueue/src/server/client/mod.rs b/crates/hyperqueue/src/server/client/mod.rs index 0d5407aa5..b9d179339 100644 --- a/crates/hyperqueue/src/server/client/mod.rs +++ b/crates/hyperqueue/src/server/client/mod.rs @@ -8,7 +8,7 @@ use tokio::net::{TcpListener, TcpStream}; use tokio::sync::{mpsc, Notify}; use tako::gateway::{CancelTasks, FromGatewayMessage, StopWorkerRequest, ToGatewayMessage}; -use tako::TaskGroup; +use tako::{Set, TaskGroup}; use crate::client::status::{job_status, Status}; use crate::common::serverdir::ServerDir; @@ -181,6 +181,9 @@ pub async fn client_rpc_loop< FromClientMessage::ServerInfo => { ToClientMessage::ServerInfo(state_ref.get().server_info().clone()) } + FromClientMessage::PruneJournal => { + handle_prune_journal(&state_ref, senders).await + } }; if let Err(error) = tx.send(response).await { log::error!("Cannot reply to client: {error:?}"); @@ -203,6 +206,39 @@ pub async fn client_rpc_loop< } } +async fn handle_prune_journal(state_ref: &StateRef, senders: &Senders) -> ToClientMessage { + log::debug!("Client asked for journal prunning"); + let (live_jobs, live_workers) = { + let state = state_ref.get(); + let live_jobs: Set<_> = state + .jobs() + .filter_map(|job| { + if job.is_terminated() { + None + } else { + Some(job.job_id) + } + }) + .collect(); + let live_workers = state + .get_workers() + .values() + .filter_map(|worker| { + if worker.is_running() { + Some(worker.worker_id()) + } else { + None + } + }) + .collect(); + (live_jobs, live_workers) + }; + if let Some(receiver) = senders.events.prune_journal(live_jobs, live_workers) { + let _ = receiver.await; + } + ToClientMessage::Finished +} + /// Waits until all jobs matched by the `selector` are finished (either by completing successfully, /// failing or being canceled). async fn handle_wait_for_jobs_message( @@ -532,4 +568,4 @@ async fn handle_worker_info(state_ref: &StateRef, worker_id: WorkerId) -> ToClie let state = state_ref.get(); ToClientMessage::WorkerInfoResponse(state.get_worker(worker_id).map(|w| w.make_info())) -} +} \ No newline at end of file diff --git a/crates/hyperqueue/src/server/event/log/mod.rs b/crates/hyperqueue/src/server/event/journal/mod.rs similarity index 75% rename from crates/hyperqueue/src/server/event/log/mod.rs rename to crates/hyperqueue/src/server/event/journal/mod.rs index 03cda5a8a..b9dbc70cb 100644 --- a/crates/hyperqueue/src/server/event/log/mod.rs +++ b/crates/hyperqueue/src/server/event/journal/mod.rs @@ -1,9 +1,10 @@ mod read; mod stream; +mod prune; mod write; pub use read::JournalReader; pub use stream::{start_event_streaming, EventStreamMessage, EventStreamSender}; pub use write::JournalWriter; -const HQ_JOURNAL_HEADER: &[u8] = b"hqjl0001"; +const HQ_JOURNAL_HEADER: &[u8] = b"hqjl0001"; \ No newline at end of file diff --git a/crates/hyperqueue/src/server/event/journal/prune.rs b/crates/hyperqueue/src/server/event/journal/prune.rs new file mode 100644 index 000000000..f330bb2cc --- /dev/null +++ b/crates/hyperqueue/src/server/event/journal/prune.rs @@ -0,0 +1,41 @@ +use crate::server::event::journal::{JournalReader, JournalWriter}; +use crate::server::event::payload::EventPayload; +use crate::JobId; +use tako::{Set, WorkerId}; + +pub(crate) fn prune_journal( + reader: &mut JournalReader, + writer: &mut JournalWriter, + live_job_ids: &Set, + live_worker_ids: &Set, +) -> crate::Result<()> { + for event in reader { + let event = event?; + let retain = match &event.payload { + EventPayload::WorkerConnected(worker_id, _) + | EventPayload::WorkerLost(worker_id, _) => live_worker_ids.contains(worker_id), + EventPayload::WorkerOverviewReceived(overview) => { + live_worker_ids.contains(&overview.id) + } + EventPayload::Submit { job_id, .. } + | EventPayload::JobCompleted(job_id) + | EventPayload::JobOpen(job_id, _) + | EventPayload::JobClose(job_id) + | EventPayload::TaskStarted { job_id, .. } + | EventPayload::TaskFinished { job_id, .. } + | EventPayload::TaskFailed { job_id, .. } + | EventPayload::TaskCanceled { job_id, .. } => live_job_ids.contains(job_id), + EventPayload::AllocationQueueCreated(_, _) + | EventPayload::AllocationQueueRemoved(_) + | EventPayload::AllocationQueued { .. } + | EventPayload::AllocationStarted(_, _) + | EventPayload::AllocationFinished(_, _) + | EventPayload::ServerStart { .. } + | EventPayload::ServerStop => true, + }; + if retain { + writer.store(event)?; + } + } + Ok(()) +} \ No newline at end of file diff --git a/crates/hyperqueue/src/server/event/log/read.rs b/crates/hyperqueue/src/server/event/journal/read.rs similarity index 99% rename from crates/hyperqueue/src/server/event/log/read.rs rename to crates/hyperqueue/src/server/event/journal/read.rs index 225d80a1f..06e6b20f9 100644 --- a/crates/hyperqueue/src/server/event/log/read.rs +++ b/crates/hyperqueue/src/server/event/journal/read.rs @@ -1,5 +1,5 @@ use crate::common::serialization::SerializationConfig; -use crate::server::event::log::HQ_JOURNAL_HEADER; +use crate::server::event::journal::HQ_JOURNAL_HEADER; use crate::server::event::{Event, EventSerializationConfig}; use crate::HQ_VERSION; use anyhow::{anyhow, bail}; @@ -237,4 +237,4 @@ mod tests { EventPayload::AllocationFinished(0, _) )); } -} +} \ No newline at end of file diff --git a/crates/hyperqueue/src/server/event/log/stream.rs b/crates/hyperqueue/src/server/event/journal/stream.rs similarity index 68% rename from crates/hyperqueue/src/server/event/log/stream.rs rename to crates/hyperqueue/src/server/event/journal/stream.rs index 8b413382c..27c53dd25 100644 --- a/crates/hyperqueue/src/server/event/log/stream.rs +++ b/crates/hyperqueue/src/server/event/journal/stream.rs @@ -1,16 +1,26 @@ use crate::common::utils::str::pluralize; -use crate::server::event::log::write::JournalWriter; -use crate::server::event::log::JournalReader; +use crate::server::event::journal::prune::prune_journal; +use crate::server::event::journal::write::JournalWriter; +use crate::server::event::journal::JournalReader; use crate::server::event::payload::EventPayload; use crate::server::event::Event; +use crate::JobId; +use std::ffi::OsString; +use std::fs::{remove_file, rename}; use std::future::Future; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::time::Duration; +use tako::{Set, WorkerId}; use tokio::sync::mpsc; pub enum EventStreamMessage { Event(Event), RegisterListener(mpsc::UnboundedSender), + PruneJournal { + live_jobs: Set, + live_workers: Set, + callback: tokio::sync::oneshot::Sender<()>, + }, } pub type EventStreamSender = mpsc::UnboundedSender; @@ -56,7 +66,7 @@ const FLUSH_PERIOD: Duration = Duration::from_secs(30); async fn streaming_process( mut writer: JournalWriter, mut receiver: EventStreamReceiver, - log_path: &Path, + journal_path: &Path, ) -> anyhow::Result<()> { let mut flush_fut = tokio::time::interval(FLUSH_PERIOD); let mut events = 0; @@ -83,11 +93,28 @@ async fn streaming_process( And while this read is performed, we cannot allow modification of the file, so the writing to the file has to be paused anyway */ writer.flush()?; - let mut reader = JournalReader::open(log_path)?; + let mut reader = JournalReader::open(journal_path)?; for event in &mut reader { tx.send(event?).unwrap() } - } + }, + Some(EventStreamMessage::PruneJournal { live_jobs, live_workers, callback }) => { + writer.flush()?; + let mut tmp_path: OsString = journal_path.into(); + tmp_path.push(".tmp"); + let tmp_path: PathBuf = tmp_path.into(); + { + let mut reader = JournalReader::open(journal_path)?; + let mut writer = JournalWriter::create_or_append(&tmp_path, None)?; + if let Err(e) = prune_journal(&mut reader, &mut writer, &live_jobs, &live_workers) { + remove_file(&tmp_path)?; + return Err(e.into()) + } + } + rename(&tmp_path, &journal_path)?; + writer = JournalWriter::create_or_append(journal_path, None)?; + let _ = callback.send(()); + }, None => break } } @@ -101,4 +128,4 @@ async fn streaming_process( ); Ok(()) -} +} \ No newline at end of file diff --git a/crates/hyperqueue/src/server/event/log/write.rs b/crates/hyperqueue/src/server/event/journal/write.rs similarity index 96% rename from crates/hyperqueue/src/server/event/log/write.rs rename to crates/hyperqueue/src/server/event/journal/write.rs index 25c4b6367..f881a9a3e 100644 --- a/crates/hyperqueue/src/server/event/log/write.rs +++ b/crates/hyperqueue/src/server/event/journal/write.rs @@ -1,5 +1,5 @@ use crate::common::serialization::SerializationConfig; -use crate::server::event::log::HQ_JOURNAL_HEADER; +use crate::server::event::journal::HQ_JOURNAL_HEADER; use crate::server::event::{Event, EventSerializationConfig}; use crate::HQ_VERSION; use bincode::Options; @@ -53,4 +53,4 @@ impl JournalWriter { self.file.flush()?; Ok(()) } -} +} \ No newline at end of file diff --git a/crates/hyperqueue/src/server/event/mod.rs b/crates/hyperqueue/src/server/event/mod.rs index 9c5bd397f..1c13f2a0a 100644 --- a/crates/hyperqueue/src/server/event/mod.rs +++ b/crates/hyperqueue/src/server/event/mod.rs @@ -1,4 +1,4 @@ -pub mod log; +pub mod journal; pub mod payload; pub mod streamer; @@ -17,4 +17,4 @@ pub struct Event { #[serde(with = "ts_milliseconds")] pub time: DateTime, pub payload: EventPayload, -} +} \ No newline at end of file diff --git a/crates/hyperqueue/src/server/event/streamer.rs b/crates/hyperqueue/src/server/event/streamer.rs index c06cc4e6a..2efa693d2 100644 --- a/crates/hyperqueue/src/server/event/streamer.rs +++ b/crates/hyperqueue/src/server/event/streamer.rs @@ -1,6 +1,6 @@ use crate::common::serialization::Serialized; use crate::server::autoalloc::{AllocationId, QueueId}; -use crate::server::event::log::{EventStreamMessage, EventStreamSender}; +use crate::server::event::journal::{EventStreamMessage, EventStreamSender}; use crate::server::event::payload::EventPayload; use crate::server::event::Event; use crate::transfer::messages::{AllocationQueueParams, JobDescription, SubmitRequest}; @@ -9,8 +9,8 @@ use chrono::Utc; use smallvec::SmallVec; use tako::gateway::LostWorkerReason; use tako::worker::{WorkerConfiguration, WorkerOverview}; -use tako::{InstanceId, WrappedRcRefCell}; -use tokio::sync::mpsc; +use tako::{InstanceId, Set, WrappedRcRefCell}; +use tokio::sync::{mpsc, oneshot}; struct Inner { storage_sender: Option, @@ -203,4 +203,28 @@ impl EventStreamer { .unwrap(); inner.client_listeners.remove(p); } -} + + pub fn prune_journal( + &self, + live_jobs: Set, + live_workers: Set, + ) -> Option> { + let inner = self.inner.get(); + if let Some(ref streamer) = inner.storage_sender { + let (sender, receiver) = oneshot::channel(); + if streamer + .send(EventStreamMessage::PruneJournal { + live_jobs, + live_workers, + callback: sender, + }) + .is_err() + { + log::error!("Event streaming queue has been closed."); + } + Some(receiver) + } else { + None + } + } +} \ No newline at end of file diff --git a/crates/hyperqueue/src/server/restore.rs b/crates/hyperqueue/src/server/restore.rs index 48f8fe336..13f83aeb5 100644 --- a/crates/hyperqueue/src/server/restore.rs +++ b/crates/hyperqueue/src/server/restore.rs @@ -1,7 +1,7 @@ use crate::common::error::HqError; use crate::server::autoalloc::QueueId; use crate::server::client::{submit_job_desc, validate_submit}; -use crate::server::event::log::JournalReader; +use crate::server::event::journal::JournalReader; use crate::server::event::payload::EventPayload; use crate::server::job::{Job, JobTaskState, StartedTaskData}; use crate::server::state::State; @@ -314,4 +314,4 @@ impl StateRestorer { } Ok(()) } -} +} \ No newline at end of file diff --git a/crates/hyperqueue/src/transfer/messages.rs b/crates/hyperqueue/src/transfer/messages.rs index 1e1c75a55..3e3b4ac2c 100644 --- a/crates/hyperqueue/src/transfer/messages.rs +++ b/crates/hyperqueue/src/transfer/messages.rs @@ -42,6 +42,7 @@ pub enum FromClientMessage { // it will no longer reacts to any other client messages // and client will only receive ToClientMessage::Event StreamEvents, + PruneJournal, } #[derive(Serialize, Deserialize, Debug, Clone)] @@ -341,6 +342,7 @@ pub enum ToClientMessage { Error(String), ServerInfo(ServerInfo), Event(Event), + Finished, // Generic response, now used only for Server pruning } #[derive(Serialize, Deserialize, Debug)] @@ -479,4 +481,4 @@ pub struct WaitForJobsResponse { pub failed: u32, pub canceled: u32, pub invalid: u32, -} +} \ No newline at end of file diff --git a/tests/test_restore.py b/tests/test_journal.py similarity index 95% rename from tests/test_restore.py rename to tests/test_journal.py index 15fafb077..0612fe88f 100644 --- a/tests/test_restore.py +++ b/tests/test_journal.py @@ -295,3 +295,17 @@ def test_restore_streaming(hq_env: HqEnv, tmp_path): assert int(hq_env.command(["output-log", stream_path, "cat", "1", "stdout"])) > 0 table = hq_env.command(["output-log", stream_path, "summary"], as_table=True) table.check_row_value("Superseded streams", "1") + + +def test_prune_journal(hq_env: HqEnv, tmp_path): + journal_path = os.path.join(tmp_path, "my.journal") + stream_path = os.path.join(tmp_path, "stream") + os.mkdir(stream_path) + hq_env.start_server(args=["--journal", journal_path]) + + hq_env.start_workers(2) + + hq_env.command(["job", "open"]) + hq_env.command(["job", "submit", "--", "sleep", "0"]) + hq_env.command(["job", "submit", "--cpus=2", "--", "sleep", "0"]) + wait_for_job_state(hq_env, 2, "FINISHED") \ No newline at end of file From f8fa7f074247959dd185f7001a15c1ab76d777b6 Mon Sep 17 00:00:00 2001 From: Ada Bohm Date: Sat, 16 Nov 2024 09:20:03 +0100 Subject: [PATCH 2/8] Added "hq server start --journal-flush-period" --- crates/hyperqueue/src/client/commands/server.rs | 5 +++++ crates/hyperqueue/src/server/bootstrap.rs | 4 +++- crates/hyperqueue/src/server/event/journal/stream.rs | 8 ++++---- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/crates/hyperqueue/src/client/commands/server.rs b/crates/hyperqueue/src/client/commands/server.rs index 0a8e5142b..5a05e6187 100644 --- a/crates/hyperqueue/src/client/commands/server.rs +++ b/crates/hyperqueue/src/client/commands/server.rs @@ -89,6 +89,10 @@ struct ServerStartOpts { #[arg(long)] journal: Option, + #[arg(long, value_parser = parse_human_time, default_value = "30s")] + /// Configure how often should be the journal written, default: 30s + journal_flush_period: Duration, + /// Path to access file that is used for configuration of secret keys and ports #[arg(long)] access_file: Option, @@ -140,6 +144,7 @@ async fn start_server(gsettings: &GlobalSettings, opts: ServerStartOpts) -> anyh client_port, worker_port, journal_path: opts.journal, + journal_flush_period: opts.journal_flush_period, worker_secret_key: access_file.as_ref().map(|a| a.worker_key().clone()), client_secret_key: access_file.as_ref().map(|a| a.client_key().clone()), server_uid: access_file.as_ref().map(|a| a.server_uid().to_string()), diff --git a/crates/hyperqueue/src/server/bootstrap.rs b/crates/hyperqueue/src/server/bootstrap.rs index a36de4380..4969f6704 100644 --- a/crates/hyperqueue/src/server/bootstrap.rs +++ b/crates/hyperqueue/src/server/bootstrap.rs @@ -46,6 +46,7 @@ pub struct ServerConfig { pub client_port: Option, pub worker_port: Option, pub journal_path: Option, + pub journal_flush_period: Duration, pub worker_secret_key: Option>, pub client_secret_key: Option>, pub server_uid: Option, @@ -292,7 +293,8 @@ async fn prepare_event_management( ) })?; - let (tx, stream_fut) = start_event_streaming(writer, log_path); + let (tx, stream_fut) = + start_event_streaming(writer, log_path, server_cfg.journal_flush_period); let streamer = EventStreamer::new(Some(tx)); streamer.on_server_start(server_uid); (streamer, Box::pin(stream_fut)) diff --git a/crates/hyperqueue/src/server/event/journal/stream.rs b/crates/hyperqueue/src/server/event/journal/stream.rs index 27c53dd25..790cf0b0c 100644 --- a/crates/hyperqueue/src/server/event/journal/stream.rs +++ b/crates/hyperqueue/src/server/event/journal/stream.rs @@ -38,11 +38,12 @@ fn create_event_stream_queue() -> (EventStreamSender, EventStreamReceiver) { pub fn start_event_streaming( writer: JournalWriter, log_path: &Path, + flush_period: Duration, ) -> (EventStreamSender, impl Future) { let (tx, rx) = create_event_stream_queue(); let log_path = log_path.to_path_buf(); let handle = std::thread::spawn(move || { - let process = streaming_process(writer, rx, &log_path); + let process = streaming_process(writer, rx, &log_path, flush_period); let runtime = tokio::runtime::Builder::new_current_thread() .enable_all() @@ -61,14 +62,13 @@ pub fn start_event_streaming( (tx, end_fut) } -const FLUSH_PERIOD: Duration = Duration::from_secs(30); - async fn streaming_process( mut writer: JournalWriter, mut receiver: EventStreamReceiver, journal_path: &Path, + flush_period: Duration, ) -> anyhow::Result<()> { - let mut flush_fut = tokio::time::interval(FLUSH_PERIOD); + let mut flush_fut = tokio::time::interval(flush_period); let mut events = 0; loop { tokio::select! { From 94e206f25135e62e8400bc9f71688637bb9bf2a5 Mon Sep 17 00:00:00 2001 From: Ada Bohm Date: Sat, 16 Nov 2024 09:33:26 +0100 Subject: [PATCH 3/8] "hq journal flush" added --- .../src/client/commands/journal/mod.rs | 15 +++++++++++++++ crates/hyperqueue/src/server/client/mod.rs | 6 ++++++ .../src/server/event/journal/stream.rs | 7 ++++++- crates/hyperqueue/src/server/event/streamer.rs | 16 ++++++++++++++++ crates/hyperqueue/src/transfer/messages.rs | 3 ++- 5 files changed, 45 insertions(+), 2 deletions(-) diff --git a/crates/hyperqueue/src/client/commands/journal/mod.rs b/crates/hyperqueue/src/client/commands/journal/mod.rs index 826bedd9f..2c630dc0c 100644 --- a/crates/hyperqueue/src/client/commands/journal/mod.rs +++ b/crates/hyperqueue/src/client/commands/journal/mod.rs @@ -30,6 +30,9 @@ enum JournalCommand { /// Connect to a server and remove completed tasks and non-active workers from journal Prune, + + /// Connect to a server and forces to flush a journal + Flush, } #[derive(Parser)] @@ -45,6 +48,7 @@ pub async fn command_journal(gsettings: &GlobalSettings, opts: JournalOpts) -> a JournalCommand::Export(opts) => export_json(opts), JournalCommand::Stream => stream_json(gsettings).await, JournalCommand::Prune => prune_journal(gsettings).await, + JournalCommand::Flush => flush_journal(gsettings).await, } } @@ -112,4 +116,15 @@ async fn prune_journal(gsettings: &GlobalSettings) -> anyhow::Result<()> { ) .await?; Ok(()) +} + +async fn flush_journal(gsettings: &GlobalSettings) -> anyhow::Result<()> { + let mut session = get_client_session(gsettings.server_directory()).await?; + rpc_call!( + session.connection(), + FromClientMessage::FlushJournal, + ToClientMessage::Finished + ) + .await?; + Ok(()) } \ No newline at end of file diff --git a/crates/hyperqueue/src/server/client/mod.rs b/crates/hyperqueue/src/server/client/mod.rs index b9d179339..b60ba72e5 100644 --- a/crates/hyperqueue/src/server/client/mod.rs +++ b/crates/hyperqueue/src/server/client/mod.rs @@ -184,6 +184,12 @@ pub async fn client_rpc_loop< FromClientMessage::PruneJournal => { handle_prune_journal(&state_ref, senders).await } + FromClientMessage::FlushJournal => { + if let Some(callback) = senders.events.flush_journal() { + let _ = callback.await; + }; + ToClientMessage::Finished + } }; if let Err(error) = tx.send(response).await { log::error!("Cannot reply to client: {error:?}"); diff --git a/crates/hyperqueue/src/server/event/journal/stream.rs b/crates/hyperqueue/src/server/event/journal/stream.rs index 790cf0b0c..26e56fc66 100644 --- a/crates/hyperqueue/src/server/event/journal/stream.rs +++ b/crates/hyperqueue/src/server/event/journal/stream.rs @@ -17,10 +17,11 @@ pub enum EventStreamMessage { Event(Event), RegisterListener(mpsc::UnboundedSender), PruneJournal { + callback: tokio::sync::oneshot::Sender<()>, live_jobs: Set, live_workers: Set, - callback: tokio::sync::oneshot::Sender<()>, }, + FlushJournal(tokio::sync::oneshot::Sender<()>), } pub type EventStreamSender = mpsc::UnboundedSender; @@ -115,6 +116,10 @@ async fn streaming_process( writer = JournalWriter::create_or_append(journal_path, None)?; let _ = callback.send(()); }, + Some(EventStreamMessage::FlushJournal(callback)) => { + writer.flush()?; + let _ = callback.send(()); + }, None => break } } diff --git a/crates/hyperqueue/src/server/event/streamer.rs b/crates/hyperqueue/src/server/event/streamer.rs index 2efa693d2..ebc1d4d05 100644 --- a/crates/hyperqueue/src/server/event/streamer.rs +++ b/crates/hyperqueue/src/server/event/streamer.rs @@ -204,6 +204,22 @@ impl EventStreamer { inner.client_listeners.remove(p); } + pub fn flush_journal(&self) -> Option> { + let inner = self.inner.get(); + if let Some(ref streamer) = inner.storage_sender { + let (sender, receiver) = oneshot::channel(); + if streamer + .send(EventStreamMessage::FlushJournal(sender)) + .is_err() + { + log::error!("Event streaming queue has been closed."); + } + Some(receiver) + } else { + None + } + } + pub fn prune_journal( &self, live_jobs: Set, diff --git a/crates/hyperqueue/src/transfer/messages.rs b/crates/hyperqueue/src/transfer/messages.rs index 3e3b4ac2c..7f9553c1c 100644 --- a/crates/hyperqueue/src/transfer/messages.rs +++ b/crates/hyperqueue/src/transfer/messages.rs @@ -43,6 +43,7 @@ pub enum FromClientMessage { // and client will only receive ToClientMessage::Event StreamEvents, PruneJournal, + FlushJournal, } #[derive(Serialize, Deserialize, Debug, Clone)] @@ -342,7 +343,7 @@ pub enum ToClientMessage { Error(String), ServerInfo(ServerInfo), Event(Event), - Finished, // Generic response, now used only for Server pruning + Finished, // Generic response, now used only for journal pruning/flushing } #[derive(Serialize, Deserialize, Debug)] From 896a26d4c20ee8bde07ae849953c71f7cf8b6968 Mon Sep 17 00:00:00 2001 From: Ada Bohm Date: Sat, 16 Nov 2024 10:02:56 +0100 Subject: [PATCH 4/8] Tests & Formatting & Clippy --- .../src/server/event/journal/stream.rs | 2 +- tests/test_events.py | 12 ++++-- tests/test_journal.py | 43 +++++++++++++++++-- 3 files changed, 48 insertions(+), 9 deletions(-) diff --git a/crates/hyperqueue/src/server/event/journal/stream.rs b/crates/hyperqueue/src/server/event/journal/stream.rs index 26e56fc66..0a5ed73aa 100644 --- a/crates/hyperqueue/src/server/event/journal/stream.rs +++ b/crates/hyperqueue/src/server/event/journal/stream.rs @@ -112,7 +112,7 @@ async fn streaming_process( return Err(e.into()) } } - rename(&tmp_path, &journal_path)?; + rename(&tmp_path, journal_path)?; writer = JournalWriter::create_or_append(journal_path, None)?; let _ = callback.send(()); }, diff --git a/tests/test_events.py b/tests/test_events.py index de0703cd5..bce32bfc0 100644 --- a/tests/test_events.py +++ b/tests/test_events.py @@ -167,7 +167,8 @@ def body(): "PCI Bus": "FOOBAR2" } } -print(json.dumps(data))""", +print(json.dumps(data)) + """, ): hq_env.start_worker(args=["--overview-interval", "10ms", "--resource", "gpus/amd=[0]"]) wait_for_worker_state(hq_env, 1, "RUNNING") @@ -201,14 +202,17 @@ def find_events(events, type: str) -> List: def get_events(hq_env: HqEnv, callback): - log_path = "events.log" - process = hq_env.start_server(args=["--journal", log_path]) + journal_path = "events.log" + process = hq_env.start_server(args=["--journal", journal_path]) callback() hq_env.command(["server", "stop"]) process.wait(timeout=5) hq_env.processes.clear() + return read_events(hq_env, journal_path) - output = hq_env.command(["journal", "export", log_path], ignore_stderr=True) + +def read_events(hq_env: HqEnv, journal_path: str): + output = hq_env.command(["journal", "export", journal_path], ignore_stderr=True) events = [] for line in output.splitlines(keepends=False): events.append(json.loads(line)) diff --git a/tests/test_journal.py b/tests/test_journal.py index 0612fe88f..5734ea5aa 100644 --- a/tests/test_journal.py +++ b/tests/test_journal.py @@ -1,5 +1,6 @@ import time +from .test_events import read_events from .utils.cmd import python from .autoalloc.mock.mock import MockJobManager from .autoalloc.mock.slurm import SlurmManager, adapt_slurm @@ -297,10 +298,22 @@ def test_restore_streaming(hq_env: HqEnv, tmp_path): table.check_row_value("Superseded streams", "1") -def test_prune_journal(hq_env: HqEnv, tmp_path): +def test_flush_and_prune_journal(hq_env: HqEnv, tmp_path): + def collect_ids(): + job_ids = set() + worker_ids = set() + for event in events: + event = event["event"] + t = event["type"] + if "task" in t: + job_ids.add(event["job"]) + if "job" in t: + job_ids.add(event.get("job_id") or event["job"]) + if "worker" in t: + worker_ids.add(event.get("id")) + return job_ids, worker_ids + journal_path = os.path.join(tmp_path, "my.journal") - stream_path = os.path.join(tmp_path, "stream") - os.mkdir(stream_path) hq_env.start_server(args=["--journal", journal_path]) hq_env.start_workers(2) @@ -308,4 +321,26 @@ def test_prune_journal(hq_env: HqEnv, tmp_path): hq_env.command(["job", "open"]) hq_env.command(["job", "submit", "--", "sleep", "0"]) hq_env.command(["job", "submit", "--cpus=2", "--", "sleep", "0"]) - wait_for_job_state(hq_env, 2, "FINISHED") \ No newline at end of file + wait_for_job_state(hq_env, 2, "FINISHED") + hq_env.command(["worker", "stop", "1"]) + + hq_env.command(["journal", "flush"]) + + size1 = os.stat(journal_path).st_size + + events = read_events(hq_env, journal_path) + + j_ids, w_ids = collect_ids() + assert j_ids == {1, 2, 3} + assert w_ids == {1, 2} + + hq_env.command(["journal", "prune"]) + + size2 = os.stat(journal_path).st_size + assert size1 > size2 + + events = read_events(hq_env, journal_path) + + j_ids, w_ids = collect_ids() + assert j_ids == {1, 3} + assert w_ids == {2} From d02683657deb36eae6b1fa868d93962fcc47eb4b Mon Sep 17 00:00:00 2001 From: Ada Bohm Date: Sat, 16 Nov 2024 10:15:38 +0100 Subject: [PATCH 5/8] Docs updated --- CHANGELOG.md | 30 +++++---- docs/deployment/server.md | 20 +++++- docs/jobs/jobs.md | 126 +++++++++++++++++++++++--------------- 3 files changed, 114 insertions(+), 62 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3391dc717..bb5352bfb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,12 @@ ### Changes -- `hq event-log` command renamed to `hq journal` +* `hq event-log` command renamed to `hq journal` + +### New features + +* Added `hq journal prune` for pruning journal file. +* Added `hq journal flush` for forcing server to flush the journal. ## v0.20.0 @@ -11,13 +16,16 @@ * It is now possible to dynamically submit new tasks into an existing job (we call this concept "Open jobs"). See [Open jobs documentation](https://it4innovations.github.io/hyperqueue/stable/jobs/openjobs/) -* Worker streaming. Before, you could stream task stderr/stdout to the server over the network using the `--log` parameter of `hq submit`. - This approach had various issues and was not scalable. Therefore, we have replaced this functionality with worker streaming, +* Worker streaming. Before, you could stream task stderr/stdout to the server over the network using the `--log` + parameter of `hq submit`. + This approach had various issues and was not scalable. Therefore, we have replaced this functionality with worker + streaming, where the streaming of task output to a set of files on disk is performed by workers instead. This new streaming approach creates more files than original solution (where it was always one file per job), but the number of files stays small and independent on the number of executed tasks. The new architecture also allows parallel I/O writing and storing of multiple job streams in one stream handle. - You can use worker streaming using the `--stream` parameter of `hq submit`. Check out the documentation for more information. + You can use worker streaming using the `--stream` parameter of `hq submit`. Check out the documentation for more + information. * Optimization of journal size @@ -607,7 +615,7 @@ would pass `OMP_NUM_THREADS=4` to the executed ``. is `hq submit`, which is now a shortcut for `hq job submit`. Here is a table of changed commands: | **Previous command** | **New command** | - |----------------------|--------------------| + |----------------------|--------------------| | `hq jobs` | `hq job list` | | `hq job` | `hq job info` | | `hq resubmit` | `hq job resubmit` | @@ -723,7 +731,7 @@ would pass `OMP_NUM_THREADS=4` to the executed ``. * Generic resource management has been added. You can find out more in the [documentation](https://it4innovations.github.io/hyperqueue/stable/jobs/gresources/). - * HyperQueue can now automatically detect how many Nvidia GPUs are present on a worker node. + * HyperQueue can now automatically detect how many Nvidia GPUs are present on a worker node. * You can now submit a task array where each task will receive one element of a JSON array using `hq submit --from-json`. You can find out more in the [documentation](https://it4innovations.github.io/hyperqueue/stable/jobs/arrays/#json-array). @@ -731,10 +739,10 @@ would pass `OMP_NUM_THREADS=4` to the executed ``. ### Changes * There have been a few slight CLI changes: - * `hq worker list` no longer has `--offline` and `--online` flags. It will now display only running - workers by default. If you want to show also offline workers, use the `--all` flag. - * `hq alloc add` no longer has a required `--queue/--partition` option. The PBS queue/Slurm partition - should now be passed as a trailing argument after `--`: `hq alloc add pbs -- -qqprod`. + * `hq worker list` no longer has `--offline` and `--online` flags. It will now display only running + workers by default. If you want to show also offline workers, use the `--all` flag. + * `hq alloc add` no longer has a required `--queue/--partition` option. The PBS queue/Slurm partition + should now be passed as a trailing argument after `--`: `hq alloc add pbs -- -qqprod`. * Server subdirectories generated for each run of the HyperQueue server are now named with a numeric ID instead of a date. * The documentation has been [rewritten](https://it4innovations.github.io/hyperqueue). @@ -805,4 +813,4 @@ would pass `OMP_NUM_THREADS=4` to the executed ``. * Job arrays * Cpu management -* --stdout/--stderr configuration in submit +* --stdout/--stderr configuration in submit \ No newline at end of file diff --git a/docs/deployment/server.md b/docs/deployment/server.md index a1ee84a5b..992f5ec15 100644 --- a/docs/deployment/server.md +++ b/docs/deployment/server.md @@ -76,13 +76,15 @@ or using a terminal multiplexer like [tmux](https://en.wikipedia.org/wiki/Tmux). ## Resuming stopped/crashed server -The server supports resilience, which allows it to restore its state after it is stopped or if it crashes. To enable resilience, you can tell the server to log events into a *journal* file, using the `--journal` flag: +The server supports resilience, which allows it to restore its state after it is stopped or if it crashes. To enable +resilience, you can tell the server to log events into a *journal* file, using the `--journal` flag: ```bash $ hq server start --journal /path/to/journal ``` -If the server is stopped or it crashes, and you use the same command to start the server (using the same journal file path), it will continue from the last point: +If the server is stopped or it crashes, and you use the same command to start the server (using the same journal file +path), it will continue from the last point: ```bash $ hq server start --journal /path/to/journal @@ -99,6 +101,7 @@ have to be connected to the server after it restarts. after resuming the server, the task will be not be computed after a server restart. ### Exporting journal events + If you'd like to programmatically analyze events that are stored in the journal file, you can export them to JSON using the following command: @@ -110,6 +113,7 @@ The events will be read from the provided journal and printed to `stdout` encode event per line (this corresponds to line-delimited JSON, i.e. [NDJSON](http://ndjson.org/)). You can also directly stream events in real-time from the server using the following command: + ```bash $ hq journal stream ``` @@ -119,6 +123,16 @@ $ hq journal stream The JSON format of the journal events and their definition is currently unstable and can change with a new HyperQueue version. +### Pruning journal + +Command `hq journal prune` removes all completed jobs and disconnected workers from the journal file. + +### Flushing journal + +Command `hq journal flush` will force the server to flush the journal. +It is mainly for the testing purpose or if you are going to `hq journal export` on +a live journal (however, it is usually better to use `hq journal stream`). + ## Stopping server You can stop a running server with the following command: @@ -127,4 +141,4 @@ You can stop a running server with the following command: $ hq server stop ``` -When a server is stopped, all running jobs and connected workers will be immediately stopped. +When a server is stopped, all running jobs and connected workers will be immediately stopped. \ No newline at end of file diff --git a/docs/jobs/jobs.md b/docs/jobs/jobs.md index 27ea1e6ff..6e6679e26 100644 --- a/docs/jobs/jobs.md +++ b/docs/jobs/jobs.md @@ -10,6 +10,7 @@ graph). Jobs are units of computation management - you can submit, query or canc [Task arrays](arrays.md) to find out how to create jobs with multiple tasks. ## Identification numbers + Each job is identified by a positive integer that is assigned by the HyperQueue server when the job is submitted. We refer to it as `Job id`. @@ -20,6 +21,7 @@ have the same task id. In simple jobs, task id is always set to `0`. ## Submitting jobs + To submit a simple job that will execute some executable with the provided arguments, use the `hq submit` command: ```bash @@ -49,6 +51,7 @@ command. There are many parameters that you can set for the executed program, they are listed below. ### Name + Each job has an assigned name. It has only an informative character for the user. By default, the name is derived from the job's program name. You can also set the job name explicitly with the `--name` option: @@ -57,6 +60,7 @@ $ hq submit --name= ... ``` ### Working directory + By default, the working directory of the job will be set to the directory from which the job was submitted. You can change this using the `--cwd` option: @@ -73,6 +77,7 @@ $ hq submit --cwd= ... You can use [placeholders](#placeholders) in the working directory path. ### Output + By default, each job will produce two files containing the standard output and standard error output, respectively. The default paths of these files are @@ -85,14 +90,14 @@ You can change these paths with the `--stdout` and `--stderr` options. You can a files completely by setting the value to `none`: === "Change output paths" - ```bash - $ hq submit --stdout=out.txt --stderr=err.txt ... - ``` +```bash +$ hq submit --stdout=out.txt --stderr=err.txt ... +``` === "Disable `stdout`" - ```bash - $ hq submit --stdout=none ... - ``` +```bash +$ hq submit --stdout=none ... +``` !!! warning @@ -101,6 +106,7 @@ files completely by setting the value to `none`: [working directory](#working-directory) of the job. If you want to change that, use the `%{CWD}` [placeholder](#placeholders). ### Environment variables + You can set environment variables which will be passed to the provided command when the job is executed using the `--env =` option. Multiple environment variables can be passed if you repeat the option. @@ -118,37 +124,41 @@ Each executed task will also automatically receive the following environment var | `HQ_RESOURCE_...` | A set of variables related to allocated [resources](resources.md) | ### Time management -You can specify two time-related parameters when submitting a job. They will be applied to each task of the submitted job. + +You can specify two time-related parameters when submitting a job. They will be applied to each task of the submitted +job. - **Time Limit** is the maximal running time of a task. If it is reached, the task will be terminated, and it will transition into the `Failed` [state](#task-state). This setting has no impact on scheduling. - This can serve as a sanity check to make sure that some task will not run indefinitely. You can set it with the - `--time-limit` option[^2]: + This can serve as a sanity check to make sure that some task will not run indefinitely. You can set it with the + `--time-limit` option[^2]: ```bash $ hq submit --time-limit= ... ``` - !!! note - + !!! note + Time limit is counted separately for each task. If you set a time limit of `3 minutes` and create two tasks, where each will run for two minutes, the time limit will not be hit. -- **Time Request** is the minimal remaining [lifetime](../deployment/worker.md#time-limit) that a worker must have in order - to start executing the task. Workers that do not have enough remaining lifetime will not be considered for running this +- **Time Request** is the minimal remaining [lifetime](../deployment/worker.md#time-limit) that a worker must have in + order + to start executing the task. Workers that do not have enough remaining lifetime will not be considered for running + this task. - Time requests are only used during scheduling, where the server decides which worker should execute which task. - Once a task is scheduled and starts executing on a worker, the time request value will not have any effect. + Time requests are only used during scheduling, where the server decides which worker should execute which task. + Once a task is scheduled and starts executing on a worker, the time request value will not have any effect. - You can set the time request using the `--time-request` option[^2]: + You can set the time request using the `--time-request` option[^2]: ```bash $ hq submit --time-request= ... ``` - !!! note + !!! note Workers with an unknown remaining lifetime will be able to execute any task, disregarding its time request. @@ -157,7 +167,8 @@ You can specify two time-related parameters when submitting a job. They will be Here is an example situation where time limit and time request can be used: Let's assume that we have a collection of tasks where the vast majority of tasks usually finish within `10` minutes, but -some of them run for (at most) `30` minutes. We do not know in advance which tasks will be "slow". In this case we may want +some of them run for (at most) `30` minutes. We do not know in advance which tasks will be "slow". In this case we may +want to set the *time limit* to `35` minutes to protect us against an error (deadlock, endless loop, etc.). However, since we know that each task will usually take *at least* `10` minutes to execute, we don't want to start @@ -165,6 +176,7 @@ executing it on a worker if we know that the worker will definitely terminate in cause unnecessary lost computational resources. Therefore, we can set the *time request* to `10` minutes. ### Priority + You can modify the order in which tasks are executed using **Priority**. Priority can be any 32b *signed* integer. A lower number signifies lower priority, e.g. when task `A` with priority `5` and task `B` with priority `3` are scheduled to the same worker and only one of them may be executed, then `A` will be executed first. @@ -178,6 +190,7 @@ $hq submit --priority= If no priority is specified, then each task will have priority `0`. ### Placeholders + You can use special variables when setting certain job parameters ([working directory](#working-directory), [output](#output) paths, [log](streaming.md#redirecting-output-to-the-log) path). These variables, called **Placeholders**, will be replaced by job or task-specific information before the job is executed. @@ -195,7 +208,8 @@ You can use the following placeholders: | `%{CWD}` | Working directory of the task. | `stdout`, `stderr` | | `%{SERVER_UID}` | Unique server ID. | `stdout`, `stderr`, `cwd`, `log` | -`SERVER_UID` is a random string that is unique for each new server execution (each `hq server start` gets a separate value). +`SERVER_UID` is a random string that is unique for each new server execution (each `hq server start` gets a separate +value). As an example, if you wanted to include the [Instance ID](failure.md#task-restart) in the `stdout` path (to distinguish the individual outputs of restarted tasks), you can use placeholders like this: @@ -205,6 +219,7 @@ $ hq submit --stdout '%{CWD}/job-%{JOB_ID}/%{TASK_ID}-%{INSTANCE_ID}.stdout' ... ``` ## State + At any moment in time, each task and job has a specific *state* that represents what is currently happening to it. You can query the state of a job with the following command[^1]: @@ -215,6 +230,7 @@ $ hq job info [^1]: You can use various [shortcuts](../cli/shortcuts.md#id-selector) to select multiple jobs at once. ### Task state + Each task starts in the `Waiting` state and can end up in one of the terminal states: `Finished`, `Failed` or `Canceled`. @@ -241,6 +257,7 @@ Finished Failed Canceled If a task is in the `Finished`, `Failed` or `Canceled` state, it is `completed`. ### Job state + The state of a job is derived from the states of its individual tasks. The state is determined by the first rule that matches from the following list of rules: @@ -251,9 +268,10 @@ matches from the following list of rules: 5. If all tasks are finished and job is open (see [Open Jobs](openjobs.md)), then job state is `Opened`. 5. Remaining case: all tasks are `Finished` and job is closed, then job state is `Finished`. - ## Cancelling jobs -You can prematurely terminate a submitted job that haven't been completed yet by *cancelling* it using the `hq job cancel` + +You can prematurely terminate a submitted job that haven't been completed yet by *cancelling* it using +the `hq job cancel` command[^1]: ```bash @@ -263,6 +281,7 @@ $ hq job cancel Cancelling a job will cancel all of its tasks that are not yet completed. ## Forgetting jobs + If you want to completely forget a job, and thus free up its associated memory, you can do that using the `hq job forget` command[^1]: @@ -280,7 +299,11 @@ $ hq job forget all --status finished,canceled However, only jobs that are completed, i.e. that have been finished successfully, failed or have been canceled, can be forgotten. If you want to forget a waiting or a running job, [cancel](#cancelling-jobs) it first. +Note that if you are using a journal, forgetting only free the memory of the server but the tasks remains +in journal, run `hq journal prune` to remove completed jobs and workers from journal file. + ## Waiting for jobs + There are three ways of waiting until a job completes: - **Submit and wait** You can use the `--wait` flag when submitting a job. This will cause the submission command to @@ -290,11 +313,12 @@ There are three ways of waiting until a job completes: $ hq submit --wait ... ``` - !!! tip + !!! tip This method can be used for benchmarking the job duration. -- **Wait command** There is a separate `hq job wait` command that can be used to wait until an existing job completes[^1]: +- **Wait command** There is a separate `hq job wait` command that can be used to wait until an existing job + completes[^1]: ```bash $ hq job wait @@ -303,14 +327,14 @@ There are three ways of waiting until a job completes: - **Interactive wait** If you want to interactively observe the status of a job (which is useful especially if it has [multiple tasks](arrays.md)), you can use the `hq job progress` command: - === "Submit and observe" - ```bash - $ hq submit --progress ... - ``` - === "Observe an existing job[^1]" - ```bash - $ hq job progress - ``` + === "Submit and observe" + ```bash + $ hq submit --progress ... + ``` + === "Observe an existing job[^1]" + ```bash + $ hq job progress + ``` ## Attaching standard input @@ -318,7 +342,7 @@ When ``--stdin`` flag is used, HQ captures standard input and attaches it to eac When a task is started then the attached data is written into the standard input of the task. This can be used to submitting scripts without creating file. -The following command will capture stdin and executes it in Bash +The following command will capture stdin and executes it in Bash ```bash $ hq submit --stdin bash @@ -334,7 +358,7 @@ when the task is completed (for any reason). ## Providing own error message -A task may pass its own error message into the HyperQueue. +A task may pass its own error message into the HyperQueue. HyperQueue provides a filename via environment variable ``HQ_ERROR_FILENAME``, if a task creates this file and terminates with a non-zero return code, then the content of this file is taken as an error message. @@ -347,8 +371,10 @@ If the message is longer than 2KiB, then it is truncated to 2KiB. If task terminates with zero return code, then the error file is ignored. ## Automatic file cleanup + If you create a lot of tasks and do not use [output streaming](streaming.md), a lot of `stdout`/`stderr` -files can be created on the disk. In certain cases, you might not be interested in the contents of these files, especially +files can be created on the disk. In certain cases, you might not be interested in the contents of these files, +especially if the task has finished successfully, and you instead want to remove them as soon as they are not needed. For that, you can use a file cleanup mode when specifying `stdout` and/or `stderr` to choose what should happen with @@ -356,33 +382,36 @@ the file when its task finishes. The mode is specified as a name following a col Currently, one cleanup mode is implemented: - Remove the file if the task has [finished](#task-state) successfully: + ```bash $ hq submit --stdout="out.txt:rm-if-finished" /my-program ``` + The file will not be deleted if the task fails or is cancelled. !!! Note - If you want to use the default `stdout`/`stderr` file path (and you don't want to look it up), you can also specify - just the cleanup mode without the file path: - ```bash - $ hq submit --stdout=":rm-if-finished" /my-program - ``` +If you want to use the default `stdout`/`stderr` file path (and you don't want to look it up), you can also specify +just the cleanup mode without the file path: +```bash +$ hq submit --stdout=":rm-if-finished" /my-program +``` ## Useful job commands + Here is a list of useful job commands: ### Display job table === "List queued and running jobs" - ```bash - $ hq job list - ``` +```bash +$ hq job list +``` === "List all jobs" - ```bash - $ hq job list --all - ``` +```bash +$ hq job list --all +``` === "List jobs by status" - You can display only jobs having the selected [states](#job-state) by using the `--filter` flag: +You can display only jobs having the selected [states](#job-state) by using the `--filter` flag: ```bash $ hq job list --filter running,waiting @@ -397,6 +426,7 @@ Here is a list of useful job commands: - `canceled` ### Display a summary table of all jobs + ```commandline $ hq job summary ``` @@ -422,7 +452,7 @@ $ hq job cat [--tasks ] ## Crashing limit When a worker is lost then all running tasks on the worker are suspicious that they may cause the crash of the -worker. HyperQueue server remembers how many times were a task running while a worker is lost. If the count +worker. HyperQueue server remembers how many times were a task running while a worker is lost. If the count reaches the limit, then the task is set to the failed state. By default, this limit is `5` but it can be changed as follows: @@ -430,4 +460,4 @@ By default, this limit is `5` but it can be changed as follows: $ hq submit --crash-limit= ... ``` -If the limit is set to 0, then the limit is disabled. +If the limit is set to 0, then the limit is disabled. \ No newline at end of file From eee6e852e8944286c792b72b1e458219da57c755 Mon Sep 17 00:00:00 2001 From: Ada Bohm Date: Sat, 16 Nov 2024 10:21:19 +0100 Subject: [PATCH 6/8] Rust tests and py binding updated --- crates/hyperqueue/src/client/commands/job.rs | 2 +- crates/hyperqueue/src/client/commands/journal/mod.rs | 2 +- crates/hyperqueue/src/client/commands/server.rs | 2 +- crates/hyperqueue/src/server/bootstrap.rs | 4 +++- crates/hyperqueue/src/server/client/mod.rs | 2 +- crates/hyperqueue/src/server/event/journal/mod.rs | 4 ++-- crates/hyperqueue/src/server/event/journal/prune.rs | 2 +- crates/hyperqueue/src/server/event/journal/read.rs | 4 ++-- crates/hyperqueue/src/server/event/journal/stream.rs | 2 +- crates/hyperqueue/src/server/event/journal/write.rs | 2 +- crates/hyperqueue/src/server/event/mod.rs | 2 +- crates/hyperqueue/src/server/event/streamer.rs | 2 +- crates/hyperqueue/src/server/restore.rs | 2 +- crates/hyperqueue/src/transfer/messages.rs | 2 +- crates/pyhq/src/cluster/server.rs | 2 ++ 15 files changed, 20 insertions(+), 16 deletions(-) diff --git a/crates/hyperqueue/src/client/commands/job.rs b/crates/hyperqueue/src/client/commands/job.rs index 4c34133a5..deaf5214e 100644 --- a/crates/hyperqueue/src/client/commands/job.rs +++ b/crates/hyperqueue/src/client/commands/job.rs @@ -339,4 +339,4 @@ pub async fn forget_job( log::info!("{message}"); Ok(()) -} \ No newline at end of file +} diff --git a/crates/hyperqueue/src/client/commands/journal/mod.rs b/crates/hyperqueue/src/client/commands/journal/mod.rs index 2c630dc0c..2ae7939fe 100644 --- a/crates/hyperqueue/src/client/commands/journal/mod.rs +++ b/crates/hyperqueue/src/client/commands/journal/mod.rs @@ -127,4 +127,4 @@ async fn flush_journal(gsettings: &GlobalSettings) -> anyhow::Result<()> { ) .await?; Ok(()) -} \ No newline at end of file +} diff --git a/crates/hyperqueue/src/client/commands/server.rs b/crates/hyperqueue/src/client/commands/server.rs index 5a05e6187..7a0281035 100644 --- a/crates/hyperqueue/src/client/commands/server.rs +++ b/crates/hyperqueue/src/client/commands/server.rs @@ -216,4 +216,4 @@ fn command_server_generate_access( store_access_record(&worker_record, path)?; } Ok(()) -} \ No newline at end of file +} diff --git a/crates/hyperqueue/src/server/bootstrap.rs b/crates/hyperqueue/src/server/bootstrap.rs index 4969f6704..0dd9f56d8 100644 --- a/crates/hyperqueue/src/server/bootstrap.rs +++ b/crates/hyperqueue/src/server/bootstrap.rs @@ -402,6 +402,7 @@ mod tests { use std::future::Future; use std::path::Path; use std::sync::Arc; + use std::time::Duration; use tempfile::TempDir; pub async fn init_test_server( @@ -419,6 +420,7 @@ mod tests { client_port: None, worker_port: None, journal_path: None, + journal_flush_period: Duration::from_secs(30), worker_secret_key: None, client_secret_key: None, server_uid: None, @@ -506,4 +508,4 @@ mod tests { notify.notify_one(); fut.await.unwrap(); } -} \ No newline at end of file +} diff --git a/crates/hyperqueue/src/server/client/mod.rs b/crates/hyperqueue/src/server/client/mod.rs index b60ba72e5..c03b64056 100644 --- a/crates/hyperqueue/src/server/client/mod.rs +++ b/crates/hyperqueue/src/server/client/mod.rs @@ -574,4 +574,4 @@ async fn handle_worker_info(state_ref: &StateRef, worker_id: WorkerId) -> ToClie let state = state_ref.get(); ToClientMessage::WorkerInfoResponse(state.get_worker(worker_id).map(|w| w.make_info())) -} \ No newline at end of file +} diff --git a/crates/hyperqueue/src/server/event/journal/mod.rs b/crates/hyperqueue/src/server/event/journal/mod.rs index b9dbc70cb..832147dee 100644 --- a/crates/hyperqueue/src/server/event/journal/mod.rs +++ b/crates/hyperqueue/src/server/event/journal/mod.rs @@ -1,10 +1,10 @@ +mod prune; mod read; mod stream; -mod prune; mod write; pub use read::JournalReader; pub use stream::{start_event_streaming, EventStreamMessage, EventStreamSender}; pub use write::JournalWriter; -const HQ_JOURNAL_HEADER: &[u8] = b"hqjl0001"; \ No newline at end of file +const HQ_JOURNAL_HEADER: &[u8] = b"hqjl0001"; diff --git a/crates/hyperqueue/src/server/event/journal/prune.rs b/crates/hyperqueue/src/server/event/journal/prune.rs index f330bb2cc..e1cea2a31 100644 --- a/crates/hyperqueue/src/server/event/journal/prune.rs +++ b/crates/hyperqueue/src/server/event/journal/prune.rs @@ -38,4 +38,4 @@ pub(crate) fn prune_journal( } } Ok(()) -} \ No newline at end of file +} diff --git a/crates/hyperqueue/src/server/event/journal/read.rs b/crates/hyperqueue/src/server/event/journal/read.rs index 06e6b20f9..21b426c98 100644 --- a/crates/hyperqueue/src/server/event/journal/read.rs +++ b/crates/hyperqueue/src/server/event/journal/read.rs @@ -79,7 +79,7 @@ impl Iterator for &mut JournalReader { #[cfg(test)] mod tests { - use crate::server::event::log::{JournalReader, JournalWriter}; + use crate::server::event::journal::{JournalReader, JournalWriter}; use crate::server::event::payload::EventPayload; use crate::server::event::Event; use chrono::Utc; @@ -237,4 +237,4 @@ mod tests { EventPayload::AllocationFinished(0, _) )); } -} \ No newline at end of file +} diff --git a/crates/hyperqueue/src/server/event/journal/stream.rs b/crates/hyperqueue/src/server/event/journal/stream.rs index 0a5ed73aa..2f97e008a 100644 --- a/crates/hyperqueue/src/server/event/journal/stream.rs +++ b/crates/hyperqueue/src/server/event/journal/stream.rs @@ -133,4 +133,4 @@ async fn streaming_process( ); Ok(()) -} \ No newline at end of file +} diff --git a/crates/hyperqueue/src/server/event/journal/write.rs b/crates/hyperqueue/src/server/event/journal/write.rs index f881a9a3e..86b736920 100644 --- a/crates/hyperqueue/src/server/event/journal/write.rs +++ b/crates/hyperqueue/src/server/event/journal/write.rs @@ -53,4 +53,4 @@ impl JournalWriter { self.file.flush()?; Ok(()) } -} \ No newline at end of file +} diff --git a/crates/hyperqueue/src/server/event/mod.rs b/crates/hyperqueue/src/server/event/mod.rs index 1c13f2a0a..1bc0d75b5 100644 --- a/crates/hyperqueue/src/server/event/mod.rs +++ b/crates/hyperqueue/src/server/event/mod.rs @@ -17,4 +17,4 @@ pub struct Event { #[serde(with = "ts_milliseconds")] pub time: DateTime, pub payload: EventPayload, -} \ No newline at end of file +} diff --git a/crates/hyperqueue/src/server/event/streamer.rs b/crates/hyperqueue/src/server/event/streamer.rs index ebc1d4d05..1cb7103ba 100644 --- a/crates/hyperqueue/src/server/event/streamer.rs +++ b/crates/hyperqueue/src/server/event/streamer.rs @@ -243,4 +243,4 @@ impl EventStreamer { None } } -} \ No newline at end of file +} diff --git a/crates/hyperqueue/src/server/restore.rs b/crates/hyperqueue/src/server/restore.rs index 13f83aeb5..bf31b9901 100644 --- a/crates/hyperqueue/src/server/restore.rs +++ b/crates/hyperqueue/src/server/restore.rs @@ -314,4 +314,4 @@ impl StateRestorer { } Ok(()) } -} \ No newline at end of file +} diff --git a/crates/hyperqueue/src/transfer/messages.rs b/crates/hyperqueue/src/transfer/messages.rs index 7f9553c1c..5b2758e8d 100644 --- a/crates/hyperqueue/src/transfer/messages.rs +++ b/crates/hyperqueue/src/transfer/messages.rs @@ -482,4 +482,4 @@ pub struct WaitForJobsResponse { pub failed: u32, pub canceled: u32, pub invalid: u32, -} \ No newline at end of file +} diff --git a/crates/pyhq/src/cluster/server.rs b/crates/pyhq/src/cluster/server.rs index 31db29019..d983cd105 100644 --- a/crates/pyhq/src/cluster/server.rs +++ b/crates/pyhq/src/cluster/server.rs @@ -5,6 +5,7 @@ use hyperqueue::server::bootstrap::{initialize_server, ServerConfig}; use std::path::PathBuf; use std::sync::Arc; use std::thread::JoinHandle; +use std::time::Duration; use tokio::sync::Notify; use tokio::task::LocalSet; @@ -38,6 +39,7 @@ impl RunningServer { client_secret_key: None, worker_secret_key: None, server_uid: None, + journal_flush_period: Duration::from_secs(30), }; let main_future = async move { From b069a6cfd0b96e3e3012d5dc81f5b37b7e602613 Mon Sep 17 00:00:00 2001 From: Ada Bohm Date: Mon, 18 Nov 2024 22:17:50 +0100 Subject: [PATCH 7/8] JournalWriter::create added --- .../src/server/event/journal/stream.rs | 2 +- .../src/server/event/journal/write.rs | 18 +++++++++++++++--- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/crates/hyperqueue/src/server/event/journal/stream.rs b/crates/hyperqueue/src/server/event/journal/stream.rs index 2f97e008a..220ddb5bf 100644 --- a/crates/hyperqueue/src/server/event/journal/stream.rs +++ b/crates/hyperqueue/src/server/event/journal/stream.rs @@ -106,7 +106,7 @@ async fn streaming_process( let tmp_path: PathBuf = tmp_path.into(); { let mut reader = JournalReader::open(journal_path)?; - let mut writer = JournalWriter::create_or_append(&tmp_path, None)?; + let mut writer = JournalWriter::create(&tmp_path)?; if let Err(e) = prune_journal(&mut reader, &mut writer, &live_jobs, &live_workers) { remove_file(&tmp_path)?; return Err(e.into()) diff --git a/crates/hyperqueue/src/server/event/journal/write.rs b/crates/hyperqueue/src/server/event/journal/write.rs index 86b736920..caea59a30 100644 --- a/crates/hyperqueue/src/server/event/journal/write.rs +++ b/crates/hyperqueue/src/server/event/journal/write.rs @@ -31,14 +31,26 @@ impl JournalWriter { let mut file = BufWriter::new(raw_file); if position == 0 && file.stream_position()? == 0 { - file.write_all(HQ_JOURNAL_HEADER)?; - EventSerializationConfig::config().serialize_into(&mut file, HQ_VERSION)?; - file.flush()?; + Self::write_header(&mut file)?; }; Ok(Self { file }) } + pub fn create(path: &Path) -> anyhow::Result { + let raw_file = File::create(path)?; + let mut file = BufWriter::new(raw_file); + Self::write_header(&mut file)?; + Ok(Self { file }) + } + + fn write_header(mut file: &mut BufWriter) -> anyhow::Result<()> { + file.write_all(HQ_JOURNAL_HEADER)?; + EventSerializationConfig::config().serialize_into(&mut file, HQ_VERSION)?; + file.flush()?; + Ok(()) + } + pub fn store(&mut self, event: Event) -> anyhow::Result<()> { EventSerializationConfig::config().serialize_into(&mut self.file, &event)?; Ok(()) From ef5958f056f91baceea4343194f4e670288c2e3d Mon Sep 17 00:00:00 2001 From: Ada Bohm Date: Mon, 18 Nov 2024 22:26:36 +0100 Subject: [PATCH 8/8] Droping writer while pruning --- crates/hyperqueue/src/server/event/journal/stream.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/hyperqueue/src/server/event/journal/stream.rs b/crates/hyperqueue/src/server/event/journal/stream.rs index 220ddb5bf..7ed3924c1 100644 --- a/crates/hyperqueue/src/server/event/journal/stream.rs +++ b/crates/hyperqueue/src/server/event/journal/stream.rs @@ -101,6 +101,7 @@ async fn streaming_process( }, Some(EventStreamMessage::PruneJournal { live_jobs, live_workers, callback }) => { writer.flush()?; + drop(writer); let mut tmp_path: OsString = journal_path.into(); tmp_path.push(".tmp"); let tmp_path: PathBuf = tmp_path.into();