30 changes: 19 additions & 11 deletions CHANGELOG.md
@@ -2,7 +2,12 @@

### Changes

- `hq event-log` command renamed to `hq journal`
* `hq event-log` command renamed to `hq journal`

### New features

* Added `hq journal prune` for pruning the journal file.
* Added `hq journal flush` for forcing the server to flush the journal (see the sketch below).
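
A minimal sketch of the new subcommands (both assume a running server that was started with `--journal`):

```console
# Drop events of completed jobs and disconnected workers from the journal
$ hq journal prune

# Force the server to write any buffered events to the journal file
$ hq journal flush
```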

## v0.20.0

@@ -11,13 +16,16 @@
* It is now possible to dynamically submit new tasks into an existing job (we call this concept "Open jobs").
See [Open jobs documentation](https://it4innovations.github.io/hyperqueue/stable/jobs/openjobs/)

* Worker streaming. Before, you could stream task stderr/stdout to the server over the network using the `--log` parameter of `hq submit`.
  This approach had various issues and was not scalable. Therefore, we have replaced this functionality with worker streaming,
  where the streaming of task output to a set of files on disk is performed by workers instead.
  This new streaming approach creates more files than the original solution (where there was always one file per job),
  but the number of files stays small and independent of the number of executed tasks.
  The new architecture also allows parallel I/O writing and storing of multiple job streams in one stream handle.
  You can enable worker streaming with the `--stream` parameter of `hq submit`; see the example after this list, and check out the documentation for more information.

* Optimization of journal size

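For illustration, a hedged example of enabling worker streaming (the directory path, array range, and task command are made up):

```console
# Task stdout/stderr is written by the workers into stream files
# under the given directory
$ hq submit --stream=/scratch/streams --array=1-1000 ./compute.sh
```
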
@@ -607,7 +615,7 @@ would pass `OMP_NUM_THREADS=4` to the executed `<program>`.
is `hq submit`, which is now a shortcut for `hq job submit`. Here is a table of changed commands:

| **Previous command** | **New command** |
|----------------------|--------------------|
| `hq jobs` | `hq job list` |
| `hq job` | `hq job info` |
| `hq resubmit` | `hq job resubmit` |
@@ -723,18 +731,18 @@ would pass `OMP_NUM_THREADS=4` to the executed `<program>`.

* Generic resource management has been added. You can find out more in
the [documentation](https://it4innovations.github.io/hyperqueue/stable/jobs/gresources/).
* HyperQueue can now automatically detect how many Nvidia GPUs are present on a worker node.
* You can now submit a task array where each task will receive one element of a JSON array using
`hq submit --from-json`. You can find out more in
the [documentation](https://it4innovations.github.io/hyperqueue/stable/jobs/arrays/#json-array).

### Changes

* There have been a few slight CLI changes:
    * `hq worker list` no longer has `--offline` and `--online` flags. It now displays only running
      workers by default. If you also want to show offline workers, use the `--all` flag.
    * `hq alloc add` no longer has a required `--queue/--partition` option. The PBS queue/Slurm partition
      should now be passed as a trailing argument after `--`: `hq alloc add pbs -- -qqprod`.
* Server subdirectories generated for each run of the HyperQueue server are now named with a numeric ID instead of
a date.
* The documentation has been [rewritten](https://it4innovations.github.io/hyperqueue).
@@ -805,4 +813,4 @@ would pass `OMP_NUM_THREADS=4` to the executed `<program>`.

* Job arrays
* Cpu management
* --stdout/--stderr configuration in submit
12 changes: 7 additions & 5 deletions crates/hyperqueue/src/client/commands/job.rs
@@ -317,11 +317,13 @@ pub async fn forget_job(
) -> anyhow::Result<()> {
let JobForgetOpts { selector, filter } = opts;

let response = rpc_call!(session.connection(), FromClientMessage::ForgetJob(ForgetJobRequest {
selector,
filter: filter.into_iter().map(|s| s.into_status()).collect()
}), ToClientMessage::ForgetJobResponse(r) => r)
.await?;
let message = FromClientMessage::ForgetJob(ForgetJobRequest {
selector,
filter: filter.into_iter().map(|s| s.into_status()).collect(),
});
let response =
rpc_call!(session.connection(), message, ToClientMessage::ForgetJobResponse(r) => r)
.await?;

let mut message = format!(
"{} {} were forgotten",
33 changes: 32 additions & 1 deletion crates/hyperqueue/src/client/commands/journal/mod.rs
@@ -3,8 +3,9 @@ mod output;
use crate::client::commands::journal::output::format_event;
use crate::client::globalsettings::GlobalSettings;
use crate::common::utils::str::pluralize;
use crate::rpc_call;
use crate::server::bootstrap::get_client_session;
use crate::server::event::log::JournalReader;
use crate::server::event::journal::JournalReader;
use crate::transfer::messages::{FromClientMessage, ToClientMessage};
use anyhow::anyhow;
use clap::{Parser, ValueHint};
@@ -26,6 +27,12 @@ enum JournalCommand

/// Live stream events from the server.
Stream,

/// Connect to the server and remove completed tasks and non-active workers from the journal
Prune,

/// Connect to the server and force it to flush the journal
Flush,
}

#[derive(Parser)]
@@ -40,6 +47,8 @@ pub async fn command_journal(gsettings: &GlobalSettings, opts: JournalOpts) -> a
match opts.command {
JournalCommand::Export(opts) => export_json(opts),
JournalCommand::Stream => stream_json(gsettings).await,
JournalCommand::Prune => prune_journal(gsettings).await,
JournalCommand::Flush => flush_journal(gsettings).await,
}
}

@@ -97,3 +106,25 @@ The file might have been incomplete."
stdout.flush()?;
Ok(())
}

async fn prune_journal(gsettings: &GlobalSettings) -> anyhow::Result<()> {
let mut session = get_client_session(gsettings.server_directory()).await?;
rpc_call!(
session.connection(),
FromClientMessage::PruneJournal,
ToClientMessage::Finished
)
.await?;
Ok(())
}

async fn flush_journal(gsettings: &GlobalSettings) -> anyhow::Result<()> {
let mut session = get_client_session(gsettings.server_directory()).await?;
rpc_call!(
session.connection(),
FromClientMessage::FlushJournal,
ToClientMessage::Finished
)
.await?;
Ok(())
}
5 changes: 5 additions & 0 deletions crates/hyperqueue/src/client/commands/server.rs
@@ -89,6 +89,10 @@ struct ServerStartOpts {
#[arg(long)]
journal: Option<PathBuf>,

/// Configure how often the journal should be written, default: 30s
#[arg(long, value_parser = parse_human_time, default_value = "30s")]
journal_flush_period: Duration,

/// Path to access file that is used for configuration of secret keys and ports
#[arg(long)]
access_file: Option<PathBuf>,
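
A hedged usage sketch of the new option (the flag name follows clap's kebab-case derivation of the `journal_flush_period` field; the journal path is illustrative):

```console
# Keep a journal at the given path and flush it every 10 seconds
# instead of the default 30s
$ hq server start --journal=/tmp/hq.journal --journal-flush-period=10s
```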
@@ -140,6 +144,7 @@ async fn start_server(gsettings: &GlobalSettings, opts: ServerStartOpts) -> anyh
client_port,
worker_port,
journal_path: opts.journal,
journal_flush_period: opts.journal_flush_period,
worker_secret_key: access_file.as_ref().map(|a| a.worker_key().clone()),
client_secret_key: access_file.as_ref().map(|a| a.client_key().clone()),
server_uid: access_file.as_ref().map(|a| a.server_uid().to_string()),
10 changes: 7 additions & 3 deletions crates/hyperqueue/src/server/bootstrap.rs
@@ -16,8 +16,8 @@ use crate::common::serverdir::{
};
use crate::server::autoalloc::{create_autoalloc_service, QueueId};
use crate::server::backend::Backend;
use crate::server::event::log::start_event_streaming;
use crate::server::event::log::JournalWriter;
use crate::server::event::journal::start_event_streaming;
use crate::server::event::journal::JournalWriter;
use crate::server::event::streamer::EventStreamer;
use crate::server::restore::StateRestorer;
use crate::server::state::StateRef;
@@ -46,6 +46,7 @@ pub struct ServerConfig {
pub client_port: Option<u16>,
pub worker_port: Option<u16>,
pub journal_path: Option<PathBuf>,
pub journal_flush_period: Duration,
pub worker_secret_key: Option<Arc<SecretKey>>,
pub client_secret_key: Option<Arc<SecretKey>>,
pub server_uid: Option<String>,
@@ -292,7 +293,8 @@ async fn prepare_event_management(
)
})?;

let (tx, stream_fut) = start_event_streaming(writer, log_path);
let (tx, stream_fut) =
start_event_streaming(writer, log_path, server_cfg.journal_flush_period);
let streamer = EventStreamer::new(Some(tx));
streamer.on_server_start(server_uid);
(streamer, Box::pin(stream_fut))
@@ -400,6 +402,7 @@ mod tests {
use std::future::Future;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use tempfile::TempDir;

pub async fn init_test_server(
@@ -417,6 +420,7 @@
client_port: None,
worker_port: None,
journal_path: None,
journal_flush_period: Duration::from_secs(30),
worker_secret_key: None,
client_secret_key: None,
server_uid: None,
44 changes: 43 additions & 1 deletion crates/hyperqueue/src/server/client/mod.rs
@@ -8,7 +8,7 @@ use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{mpsc, Notify};

use tako::gateway::{CancelTasks, FromGatewayMessage, StopWorkerRequest, ToGatewayMessage};
use tako::TaskGroup;
use tako::{Set, TaskGroup};

use crate::client::status::{job_status, Status};
use crate::common::serverdir::ServerDir;
@@ -181,6 +181,15 @@
FromClientMessage::ServerInfo => {
ToClientMessage::ServerInfo(state_ref.get().server_info().clone())
}
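// Both journal requests below block until the journal thread confirms
// completion before replying with `Finished`.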
FromClientMessage::PruneJournal => {
handle_prune_journal(&state_ref, senders).await
}
FromClientMessage::FlushJournal => {
if let Some(callback) = senders.events.flush_journal() {
let _ = callback.await;
};
ToClientMessage::Finished
}
};
if let Err(error) = tx.send(response).await {
log::error!("Cannot reply to client: {error:?}");
@@ -203,6 +212,39 @@
}
}

async fn handle_prune_journal(state_ref: &StateRef, senders: &Senders) -> ToClientMessage {
log::debug!("Client asked for journal prunning");
let (live_jobs, live_workers) = {
let state = state_ref.get();
let live_jobs: Set<_> = state
.jobs()
.filter_map(|job| {
if job.is_terminated() {
None
} else {
Some(job.job_id)
}
})
.collect();
let live_workers = state
.get_workers()
.values()
.filter_map(|worker| {
if worker.is_running() {
Some(worker.worker_id())
} else {
None
}
})
.collect();
(live_jobs, live_workers)
};
if let Some(receiver) = senders.events.prune_journal(live_jobs, live_workers) {
let _ = receiver.await;
}
ToClientMessage::Finished
}

/// Waits until all jobs matched by the `selector` are finished (either by completing successfully,
/// failing or being canceled).
async fn handle_wait_for_jobs_message(
crates/hyperqueue/src/server/event/journal/mod.rs
@@ -1,3 +1,4 @@
mod prune;
mod read;
mod stream;
mod write;
41 changes: 41 additions & 0 deletions crates/hyperqueue/src/server/event/journal/prune.rs
@@ -0,0 +1,41 @@
use crate::server::event::journal::{JournalReader, JournalWriter};
use crate::server::event::payload::EventPayload;
use crate::JobId;
use tako::{Set, WorkerId};

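/// Copy events from `reader` to `writer`, retaining only events that are
/// still relevant: events of jobs in `live_job_ids`, events of workers in
/// `live_worker_ids`, and all allocation and server lifecycle events.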
pub(crate) fn prune_journal(
reader: &mut JournalReader,
writer: &mut JournalWriter,
live_job_ids: &Set<JobId>,
live_worker_ids: &Set<WorkerId>,
) -> crate::Result<()> {
for event in reader {
let event = event?;
let retain = match &event.payload {
EventPayload::WorkerConnected(worker_id, _)
| EventPayload::WorkerLost(worker_id, _) => live_worker_ids.contains(worker_id),
EventPayload::WorkerOverviewReceived(overview) => {
live_worker_ids.contains(&overview.id)
}
EventPayload::Submit { job_id, .. }
| EventPayload::JobCompleted(job_id)
| EventPayload::JobOpen(job_id, _)
| EventPayload::JobClose(job_id)
| EventPayload::TaskStarted { job_id, .. }
| EventPayload::TaskFinished { job_id, .. }
| EventPayload::TaskFailed { job_id, .. }
| EventPayload::TaskCanceled { job_id, .. } => live_job_ids.contains(job_id),
EventPayload::AllocationQueueCreated(_, _)
| EventPayload::AllocationQueueRemoved(_)
| EventPayload::AllocationQueued { .. }
| EventPayload::AllocationStarted(_, _)
| EventPayload::AllocationFinished(_, _)
| EventPayload::ServerStart { .. }
| EventPayload::ServerStop => true,
};
if retain {
writer.store(event)?;
}
}
Ok(())
}
crates/hyperqueue/src/server/event/journal/read.rs
@@ -1,5 +1,5 @@
use crate::common::serialization::SerializationConfig;
use crate::server::event::log::HQ_JOURNAL_HEADER;
use crate::server::event::journal::HQ_JOURNAL_HEADER;
use crate::server::event::{Event, EventSerializationConfig};
use crate::HQ_VERSION;
use anyhow::{anyhow, bail};
@@ -79,7 +79,7 @@ impl Iterator for &mut JournalReader {

#[cfg(test)]
mod tests {
use crate::server::event::log::{JournalReader, JournalWriter};
use crate::server::event::journal::{JournalReader, JournalWriter};
use crate::server::event::payload::EventPayload;
use crate::server::event::Event;
use chrono::Utc;