Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,24 @@ COMMENT ON TABLE public.audit_log IS 'anon: {"mutation_name": "delete"}';
|----------|-----------|-------------|
| `string_by_mask` | `mask`, `char`, `digit`, `unique` | Template: `@`=letter, `#`=digit |

### JSON

| Mutation | Parameters | Description |
|----------|-----------|-------------|
| `json_update` | map of `key → nested mutation spec` | Partially updates a JSON object column. Each value is `{"mutation_name": ..., "mutation_kwargs": ...}`. `mutation_name: "delete"` clears the value (sets it to `""`) — the key stays. Missing keys are skipped — the mutation is not applied and the key is not added. Nested mutation output is inserted as a JSON string (or `null` when it returns `\N`). |

Example:

```sql
COMMENT ON COLUMN public.users.meta IS 'anon: [{
"mutation_name": "json_update",
"mutation_kwargs": {
"name": {"mutation_name": "first_name"},
"secret": {"mutation_name": "delete"}
}
}]';
```

## Condition Operations

| Operation | Description |
Expand Down
102 changes: 102 additions & 0 deletions src/mutator/json_update.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
use serde_json::{Map, Value};

use crate::error::{PgStageError, Result};
use crate::mutator::{resolve_mutation, MutationContext};
use crate::FastMap;

/// Partially mutates a JSON object value. `mutation_kwargs` maps JSON keys to
/// nested mutation specs: `{"mutation_name": "...", "mutation_kwargs": {...}}`.
/// The special `mutation_name: "delete"` clears the key's value (sets it to
/// an empty string) — it does NOT remove the key.
///
/// Missing keys: the mutation is skipped entirely (the key is NOT added).
/// The nested mutation receives the existing JSON value (stringified) as its
/// `current_value`; its output is inserted as a JSON string (or `null` if the
/// mutation returns the SQL null sentinel `\N`).
pub fn json_update(ctx: &mut MutationContext) -> Result<String> {
let mut root: Value = if ctx.current_value == "\\N" || ctx.current_value.is_empty() {
Value::Object(Map::new())
} else {
serde_json::from_str(ctx.current_value).map_err(|e| {
PgStageError::MutationError(format!("json_update: failed to parse value as JSON: {}", e))
})?
};

let obj = root.as_object_mut().ok_or_else(|| {
PgStageError::MutationError("json_update: top-level value is not a JSON object".to_string())
})?;

// Rebind so the iterator below borrows the map directly, leaving `ctx`
// free for split borrows of `rng` / `unique_tracker` inside the loop.
let kwargs = ctx.kwargs;

for (key, spec_val) in kwargs.iter() {
let spec_obj = spec_val.as_object().ok_or_else(|| {
PgStageError::InvalidParameter(format!(
"json_update: expected object spec for key '{}'",
key
))
})?;

let mutation_name = spec_obj
.get("mutation_name")
.and_then(|v| v.as_str())
.ok_or_else(|| {
PgStageError::InvalidParameter(format!(
"json_update: missing 'mutation_name' for key '{}'",
key
))
})?;

// Skip the mutation entirely if the key is not present in the JSON.
if !obj.contains_key(key) {
continue;
}

if mutation_name == "delete" {
obj.insert(key.clone(), Value::String(String::new()));
continue;
}

let mutation_fn = resolve_mutation(mutation_name)
.ok_or_else(|| PgStageError::UnknownMutation(mutation_name.to_string()))?;

let mut inner_kwargs: FastMap<String, Value> = FastMap::new();
if let Some(kw) = spec_obj.get("mutation_kwargs").and_then(|v| v.as_object()) {
for (k, v) in kw.iter() {
inner_kwargs.insert(k.clone(), v.clone());
}
}

let cur_value_str = match obj.get(key) {
Some(Value::String(s)) => s.clone(),
Some(v) => v.to_string(),
None => String::new(),
};

let new_value = {
let mut inner_ctx = MutationContext {
kwargs: &inner_kwargs,
current_value: &cur_value_str,
rng: &mut *ctx.rng,
unique_tracker: &mut *ctx.unique_tracker,
locale: ctx.locale,
secrets: ctx.secrets,
obfuscated_values: ctx.obfuscated_values,
};
mutation_fn(&mut inner_ctx)?
};

let json_val = if new_value == "\\N" {
Value::Null
} else {
Value::String(new_value)
};

obj.insert(key.clone(), json_val);
}

serde_json::to_string(&root).map_err(|e| {
PgStageError::MutationError(format!("json_update: failed to serialize: {}", e))
})
}
3 changes: 3 additions & 0 deletions src/mutator/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod contact;
pub mod datetime;
pub mod identity;
pub mod json_update;
pub mod locale;
pub mod mask;
pub mod names;
Expand Down Expand Up @@ -88,6 +89,8 @@ pub fn resolve_mutation(name: &str) -> Option<MutationFn> {

"string_by_mask" => mask::string_by_mask,

"json_update" => json_update::json_update,

_ => return None,
})
}
108 changes: 108 additions & 0 deletions tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,3 +448,111 @@ fn test_delete_table_pattern() {
assert!(!result.contains("log entry"));
assert!(!result.contains("COPY public.audit_log"));
}

fn run_json_update(rules_json: &str, row_json: &str) -> String {
let input = format!(
"COMMENT ON COLUMN public.users.meta IS 'anon: [{}]';\nCOPY public.users (id, meta) FROM stdin;\n1\t{}\n\\.\n",
rules_json, row_json,
);
let mut output = Vec::new();
let mut handler = PlainHandler::new(make_processor());
handler.process(Cursor::new(b""), &mut output, input.as_bytes()).unwrap();
let result = String::from_utf8(output).unwrap();
let data_line = result
.lines()
.find(|l| l.starts_with("1\t"))
.expect("data row not found in output");
data_line.splitn(2, '\t').nth(1).unwrap().to_string()
}

#[test]
fn test_plain_mutation_json_update_replace_key() {
let meta = run_json_update(
r#"{"mutation_name": "json_update", "mutation_kwargs": {"key2": {"mutation_name": "fixed_value", "mutation_kwargs": {"value": "REPLACED"}}}}"#,
r#"{"key1":"foo","key2":"bar","key3":123}"#,
);
assert!(meta.contains(r#""key1":"foo""#), "got: {}", meta);
assert!(meta.contains(r#""key2":"REPLACED""#), "got: {}", meta);
assert!(meta.contains(r#""key3":123"#), "got: {}", meta);
assert!(!meta.contains(r#""bar""#), "got: {}", meta);
}

#[test]
fn test_plain_mutation_json_update_delete_clears_value_keeps_key() {
// "delete" on an existing key sets its value to "" but keeps the key.
let meta = run_json_update(
r#"{"mutation_name": "json_update", "mutation_kwargs": {"key1": {"mutation_name": "delete"}, "key3": {"mutation_name": "delete"}}}"#,
r#"{"key1":"foo","key2":"bar","key3":123}"#,
);
assert!(meta.contains(r#""key1":"""#), "got: {}", meta);
assert!(meta.contains(r#""key2":"bar""#), "got: {}", meta);
assert!(meta.contains(r#""key3":"""#), "got: {}", meta);
assert!(!meta.contains(r#""foo""#), "got: {}", meta);
assert!(!meta.contains(r#"123"#), "got: {}", meta);
}

#[test]
fn test_plain_mutation_json_update_missing_key_normal_skipped() {
// Normal mutation on a missing key is skipped — the key is NOT added.
let meta = run_json_update(
r#"{"mutation_name": "json_update", "mutation_kwargs": {"new_key": {"mutation_name": "fixed_value", "mutation_kwargs": {"value": "NEW"}}}}"#,
r#"{"key1":"foo"}"#,
);
assert!(meta.contains(r#""key1":"foo""#), "got: {}", meta);
assert!(!meta.contains("new_key"), "got: {}", meta);
assert!(!meta.contains(r#""NEW""#), "got: {}", meta);
}

#[test]
fn test_plain_mutation_json_update_missing_key_delete_is_noop() {
// "delete" on a missing key is a no-op — the key is NOT added.
let meta = run_json_update(
r#"{"mutation_name": "json_update", "mutation_kwargs": {"absent": {"mutation_name": "delete"}}}"#,
r#"{"key1":"foo"}"#,
);
assert!(meta.contains(r#""key1":"foo""#), "got: {}", meta);
assert!(!meta.contains("absent"), "got: {}", meta);
}

#[test]
fn test_plain_mutation_json_update_nested_first_name_preserves_untouched_keys() {
let meta = run_json_update(
r#"{"mutation_name": "json_update", "mutation_kwargs": {"name": {"mutation_name": "first_name"}}}"#,
r#"{"name":"OriginalName","age":30}"#,
);
assert!(!meta.contains("OriginalName"), "got: {}", meta);
// age unchanged (still numeric), name replaced with some non-empty string
assert!(meta.contains(r#""age":30"#), "got: {}", meta);
assert!(meta.contains(r#""name":""#), "got: {}", meta);
assert!(!meta.contains(r#""name":"""#), "name should not be empty: {}", meta);
}

#[test]
fn test_plain_mutation_json_update_mixed_replace_delete_and_missing() {
// "keep" exists → replaced; "clear" exists → cleared; "absent" missing → skipped.
let meta = run_json_update(
r#"{"mutation_name": "json_update", "mutation_kwargs": {
"keep": {"mutation_name": "fixed_value", "mutation_kwargs": {"value": "X"}},
"clear": {"mutation_name": "delete"},
"absent": {"mutation_name": "fixed_value", "mutation_kwargs": {"value": "Y"}}
}}"#,
r#"{"keep":"old","clear":"data","other":42}"#,
);
assert!(meta.contains(r#""keep":"X""#), "got: {}", meta);
assert!(meta.contains(r#""clear":"""#), "got: {}", meta);
assert!(meta.contains(r#""other":42"#), "got: {}", meta);
assert!(!meta.contains("absent"), "got: {}", meta);
assert!(!meta.contains(r#""Y""#), "got: {}", meta);
assert!(!meta.contains(r#""old""#), "got: {}", meta);
assert!(!meta.contains(r#""data""#), "got: {}", meta);
}

#[test]
fn test_plain_mutation_json_update_empty_object_skips_all() {
// Nothing to mutate — missing keys are skipped, object stays empty.
let meta = run_json_update(
r#"{"mutation_name": "json_update", "mutation_kwargs": {"anything": {"mutation_name": "fixed_value", "mutation_kwargs": {"value": "hello"}}}}"#,
r#"{}"#,
);
assert_eq!(meta, "{}", "got: {}", meta);
}
Loading