From 4a36ec3c48e1dfc5e9b0889052c48796c046659f Mon Sep 17 00:00:00 2001 From: Manan Tyagi Date: Fri, 12 Jun 2026 13:36:28 +0530 Subject: [PATCH] Add device lock/unlock commands and examples --- .../device_management/account_lock_device.py | 540 ++++++++++++++++++ .../account_unlock_device.py | 540 ++++++++++++++++++ .../device_management/lock_device.py | 540 ++++++++++++++++++ .../device_management/unlock_device.py | 540 ++++++++++++++++++ .../keepercli/commands/device_management.py | 139 ++++- .../src/keepercli/register_commands.py | 4 + .../authentication/device_management.py | 70 ++- .../unit_tests/test_device_management.py | 42 ++ 8 files changed, 2398 insertions(+), 17 deletions(-) create mode 100644 examples/sdk_examples/device_management/account_lock_device.py create mode 100644 examples/sdk_examples/device_management/account_unlock_device.py create mode 100644 examples/sdk_examples/device_management/lock_device.py create mode 100644 examples/sdk_examples/device_management/unlock_device.py diff --git a/examples/sdk_examples/device_management/account_lock_device.py b/examples/sdk_examples/device_management/account_lock_device.py new file mode 100644 index 0000000..f4d113b --- /dev/null +++ b/examples/sdk_examples/device_management/account_lock_device.py @@ -0,0 +1,540 @@ +import getpass +import sqlite3 +import json +import logging +from typing import Dict, Optional + +import fido2 +import webbrowser + +from keepersdk import errors, utils +from keepersdk.authentication import ( + configuration, + endpoint, + keeper_auth, + login_auth, +) +from keepersdk.authentication.yubikey import ( + IKeeperUserInteraction, + yubikey_authenticate, +) +from keepersdk.constants import KEEPER_PUBLIC_HOSTS +from keepersdk.vault import sqlite_storage, vault_online, ksm_management + +try: + import pyperclip +except ImportError: + pyperclip = None + +logger = utils.get_logger() +logger.setLevel(logging.INFO) +if not logger.handlers: + _handler = logging.StreamHandler() + _handler.setLevel(logging.INFO) + _handler.setFormatter( + logging.Formatter("%(asctime)s - %(levelname)s - %(name)s - %(message)s") + ) + logger.addHandler(_handler) + + +class FidoCliInteraction(fido2.client.UserInteraction, IKeeperUserInteraction): + def output_text(self, text: str) -> None: + print(text) + + def prompt_up(self) -> None: + print( + "\nTouch the flashing Security key to authenticate or " + "press Ctrl-C to resume with the primary two factor authentication..." + ) + + def request_pin(self, permissions, rd_id): + return getpass.getpass("Enter Security Key PIN: ") + + def request_uv(self, permissions, rd_id): + print("User Verification required.") + return True + + +# Two-factor duration codes (used by LoginFlow) +_TWO_FACTOR_DURATION_CODES: Dict[login_auth.TwoFactorDuration, str] = { + login_auth.TwoFactorDuration.EveryLogin: "login", + login_auth.TwoFactorDuration.Every12Hours: "12_hours", + login_auth.TwoFactorDuration.EveryDay: "24_hours", + login_auth.TwoFactorDuration.Every30Days: "30_days", + login_auth.TwoFactorDuration.Forever: "forever", +} + + +class LoginFlow: + """ + Handles the full login process: server selection, username, password, + device approval, 2FA, SSO data key, and SSO token. + """ + + def __init__(self) -> None: + self._config = configuration.JsonConfigurationStorage() + self._logged_in_with_persistent = True + self._endpoint: Optional[endpoint.KeeperEndpoint] = None + + @property + def endpoint(self) -> Optional[endpoint.KeeperEndpoint]: + return self._endpoint + + @property + def logged_in_with_persistent(self) -> bool: + """True if login succeeded by resuming an existing persistent session (no step loop).""" + return self._logged_in_with_persistent + + def run(self) -> Optional[keeper_auth.KeeperAuth]: + """ + Run the login flow. + + Returns: + Authenticated Keeper context, or None if login fails. + """ + server = self._ensure_server() + keeper_endpoint = endpoint.KeeperEndpoint(self._config, server) + self._endpoint = keeper_endpoint + login_auth_context = login_auth.LoginAuth(keeper_endpoint) + + username = self._config.get().last_login or input("Enter username: ") + login_auth_context.resume_session = True + login_auth_context.login(username) + + while not login_auth_context.login_step.is_final(): + step = login_auth_context.login_step + if isinstance(step, login_auth.LoginStepDeviceApproval): + self._handle_device_approval(step) + elif isinstance(step, login_auth.LoginStepTwoFactor): + self._handle_two_factor(step) + elif isinstance(step, login_auth.LoginStepPassword): + self._handle_password(step) + elif isinstance(step, login_auth.LoginStepSsoToken): + self._handle_sso_token(step) + elif isinstance(step, login_auth.LoginStepSsoDataKey): + self._handle_sso_data_key(step) + elif isinstance(step, login_auth.LoginStepError): + print(f"Login error: ({step.code}) {step.message}") + return None + else: + raise NotImplementedError( + f"Unsupported login step type: {type(step).__name__}" + ) + self._logged_in_with_persistent = False + + if self._logged_in_with_persistent: + print("Successfully logged in with persistent login") + + if isinstance(login_auth_context.login_step, login_auth.LoginStepConnected): + return login_auth_context.login_step.take_keeper_auth() + + return None + + def _ensure_server(self) -> str: + if not self._config.get().last_server: + print("Available server options:") + for region, host in KEEPER_PUBLIC_HOSTS.items(): + print(f" {region}: {host}") + server = ( + input("Enter server (default: keepersecurity.com): ").strip() + or "keepersecurity.com" + ) + self._config.get().last_server = server + else: + server = self._config.get().last_server + return server + + def _handle_device_approval( + self, step: login_auth.LoginStepDeviceApproval + ) -> None: + """Device approval: same options as keepercli verify_device (email, keeper push, 2FA, resume).""" + menu = [ + ("email_send", "to send email"), + ("email_code=", "to validate verification code sent via email"), + ("keeper_push", "to send Keeper Push notification"), + ("2fa_send", "to send 2FA code"), + ("2fa_code=", "to validate a code provided by 2FA application"), + ("", "to resume"), + ] + lines = ["Approve by selecting a method below"] + lines.extend(f" {cmd} {desc}" for cmd, desc in menu) + print("\n".join(lines)) + + selection = input("Type your selection or to resume: ").strip() + if selection is None: + return + if selection in ("email_send", "es"): + step.send_push(channel=login_auth.DeviceApprovalChannel.Email) + print("An email with instructions has been sent. Press when approved.") + elif selection.startswith("email_code="): + code = selection[len("email_code=") :] + step.send_code(channel=login_auth.DeviceApprovalChannel.Email, code=code) + print("Successfully verified email code.") + elif selection in ("keeper_push", "kp"): + step.send_push(channel=login_auth.DeviceApprovalChannel.KeeperPush) + print( + "Successfully made a push notification to the approved device. " + "Press when approved." + ) + elif selection in ("2fa_send", "2fs"): + step.send_push(channel=login_auth.DeviceApprovalChannel.TwoFactor) + print("2FA code was sent.") + elif selection.startswith("2fa_code="): + code = selection[len("2fa_code=") :] + step.send_code(channel=login_auth.DeviceApprovalChannel.TwoFactor, code=code) + print("Successfully verified 2FA code.") + else: + step.resume() + + def _handle_password(self, step: login_auth.LoginStepPassword) -> None: + """Password step: prompt for password and retry on auth_failed (aligned with keepercli handle_verify_password).""" + print(f"\nEnter password for {step.username}") + while True: + password = getpass.getpass("Password: ") + if not password: + raise KeyboardInterrupt() + try: + step.verify_password(password) + break + except errors.KeeperApiError as kae: + print( + "Invalid email or password combination, please re-enter." + if kae.result_code == "auth_failed" + else kae.message + ) + + def _handle_two_factor(self, step: login_auth.LoginStepTwoFactor) -> None: + channels = [ + x + for x in step.get_channels() + if x.channel_type != login_auth.TwoFactorChannel.Other + ] + menu = [] + for i, channel in enumerate(channels): + desc = self._two_factor_channel_desc(channel.channel_type) + menu.append( + ( + str(i + 1), + f"{desc} {channel.channel_name} {channel.phone}", + ) + ) + menu.append(("q", "Quit authentication attempt and return to Commander prompt.")) + + lines = ["", "This account requires 2FA Authentication"] + lines.extend(f" {a}. {t}" for a, t in menu) + print("\n".join(lines)) + + while True: + selection = input("Selection: ") + if selection is None: + return + if selection in ("q", "Q"): + raise KeyboardInterrupt() + try: + assert selection.isnumeric() + idx = 1 if not selection else int(selection) + assert 1 <= idx <= len(channels) + channel = channels[idx - 1] + desc = self._two_factor_channel_desc(channel.channel_type) + print(f"Selected {idx}. {desc}") + except AssertionError: + print( + "Invalid entry, additional factors of authentication shown " + "may be configured if not currently enabled." + ) + continue + + if channel.channel_type in ( + login_auth.TwoFactorChannel.TextMessage, + login_auth.TwoFactorChannel.KeeperDNA, + login_auth.TwoFactorChannel.DuoSecurity, + ): + action = next( + ( + x + for x in step.get_channel_push_actions(channel.channel_uid) + if x + in ( + login_auth.TwoFactorPushAction.TextMessage, + login_auth.TwoFactorPushAction.KeeperDna, + ) + ), + None, + ) + if action: + step.send_push(channel.channel_uid, action) + + if channel.channel_type == login_auth.TwoFactorChannel.SecurityKey: + try: + challenge = json.loads(channel.challenge) + signature = yubikey_authenticate(challenge, FidoCliInteraction()) + if signature: + print("Verified Security Key.") + step.send_code(channel.channel_uid, signature) + return + except Exception as e: + logger.error(e) + continue + + # 2FA code path + step.duration = min(step.duration, channel.max_expiration) + available_dura = sorted( + x for x in _TWO_FACTOR_DURATION_CODES if x <= channel.max_expiration + ) + available_codes = [ + _TWO_FACTOR_DURATION_CODES.get(x) or "login" for x in available_dura + ] + + while True: + mfa_desc = self._two_factor_duration_desc(step.duration) + prompt_exp = ( + f"\n2FA Code Duration: {mfa_desc}.\n" + f"To change duration: 2fa_duration={'|'.join(available_codes)}" + ) + print(prompt_exp) + + selection = input("\nEnter 2FA Code or Duration: ") + if not selection: + return + if selection in available_codes: + step.duration = self._two_factor_code_to_duration(selection) + elif selection.startswith("2fa_duration="): + code = selection[len("2fa_duration=") :] + if code in available_codes: + step.duration = self._two_factor_code_to_duration(code) + else: + print(f"Invalid 2FA duration: {code}") + else: + try: + step.send_code(channel.channel_uid, selection) + print("Successfully verified 2FA Code.") + return + except errors.KeeperApiError as kae: + print(f"Invalid 2FA code: ({kae.result_code}) {kae.message}") + + def _handle_sso_data_key( + self, step: login_auth.LoginStepSsoDataKey + ) -> None: + menu = [ + ("1", "Keeper Push. Send a push notification to your device."), + ("2", "Admin Approval. Request your admin to approve this device."), + ("r", "Resume SSO authentication after device is approved."), + ("q", "Quit SSO authentication attempt and return to Commander prompt."), + ] + lines = ["Approve this device by selecting a method below:"] + lines.extend(f" {cmd:>3}. {text}" for cmd, text in menu) + print("\n".join(lines)) + + while True: + answer = input("Selection: ") + if answer is None: + return + if answer == "q": + raise KeyboardInterrupt() + if answer == "r": + step.resume() + break + if answer in ("1", "2"): + step.request_data_key( + login_auth.DataKeyShareChannel.KeeperPush + if answer == "1" + else login_auth.DataKeyShareChannel.AdminApproval + ) + else: + print(f'Action "{answer}" is not supported.') + + def _handle_sso_token(self, step: login_auth.LoginStepSsoToken) -> None: + menu = [ + ("a", "SSO User with a Master Password."), + ] + if pyperclip: + menu.append(("c", "Copy SSO Login URL to clipboard.")) + else: + menu.append(("u", "Show SSO Login URL.")) + try: + wb = webbrowser.get() + menu.append(("o", "Navigate to SSO Login URL with the default web browser.")) + except Exception: + wb = None + if pyperclip: + menu.append(("p", "Paste SSO Token from clipboard.")) + menu.append(("t", "Enter SSO Token manually.")) + menu.append(("q", "Quit SSO authentication attempt and return to Commander prompt.")) + + lines = [ + "", + "SSO Login URL:", + step.sso_login_url, + "Navigate to SSO Login URL with your browser and complete authentication.", + "Copy a returned SSO Token into clipboard." + + (" Paste that token into Commander." if pyperclip else " Then use option 't' to enter the token manually."), + 'NOTE: To copy SSO Token please click "Copy authentication token" ' + 'button on "SSO Connect" page.', + "", + ] + lines.extend(f" {a:>3}. {t}" for a, t in menu) + print("\n".join(lines)) + + while True: + token = input("Selection: ") + if token == "q": + raise KeyboardInterrupt() + if token == "a": + step.login_with_password() + return + if token == "c": + token = None + if pyperclip: + try: + pyperclip.copy(step.sso_login_url) + print("SSO Login URL is copied to clipboard.") + except Exception: + print("Failed to copy SSO Login URL to clipboard.") + else: + print("Clipboard not available (install pyperclip).") + elif token == "u": + token = None + if not pyperclip: + print("\nSSO Login URL:", step.sso_login_url, "\n") + else: + print("Unsupported menu option (use 'c' to copy URL).") + elif token == "o": + token = None + if wb: + try: + wb.open_new_tab(step.sso_login_url) + except Exception: + print("Failed to open web browser.") + elif token == "p": + if pyperclip: + try: + token = pyperclip.paste() + except Exception: + token = "" + print("Failed to paste from clipboard") + else: + token = None + print("Clipboard not available (use 't' to enter token manually).") + elif token == "t": + token = getpass.getpass("Enter SSO Token: ").strip() + else: + if len(token) < 10: + print(f"Unsupported menu option: {token}") + continue + + if token: + try: + step.set_sso_token(token) + break + except errors.KeeperApiError as kae: + print(f"SSO Login error: ({kae.result_code}) {kae.message}") + + @staticmethod + def _two_factor_channel_desc( + channel_type: login_auth.TwoFactorChannel, + ) -> str: + return { + login_auth.TwoFactorChannel.Authenticator: "TOTP (Google and Microsoft Authenticator)", + login_auth.TwoFactorChannel.TextMessage: "Send SMS Code", + login_auth.TwoFactorChannel.DuoSecurity: "DUO", + login_auth.TwoFactorChannel.RSASecurID: "RSA SecurID", + login_auth.TwoFactorChannel.SecurityKey: "WebAuthN (FIDO2 Security Key)", + login_auth.TwoFactorChannel.KeeperDNA: "Keeper DNA (Watch)", + login_auth.TwoFactorChannel.Backup: "Backup Code", + }.get(channel_type, "Not Supported") + + @staticmethod + def _two_factor_duration_desc( + duration: login_auth.TwoFactorDuration, + ) -> str: + return { + login_auth.TwoFactorDuration.EveryLogin: "Require Every Login", + login_auth.TwoFactorDuration.Forever: "Save on this Device Forever", + login_auth.TwoFactorDuration.Every12Hours: "Ask Every 12 hours", + login_auth.TwoFactorDuration.EveryDay: "Ask Every 24 hours", + login_auth.TwoFactorDuration.Every30Days: "Ask Every 30 days", + }.get(duration, "Require Every Login") + + @staticmethod + def _two_factor_code_to_duration( + text: str, + ) -> login_auth.TwoFactorDuration: + for dura, code in _TWO_FACTOR_DURATION_CODES.items(): + if code == text: + return dura + return login_auth.TwoFactorDuration.EveryLogin + + +def enable_persistent_login(keeper_auth_context: keeper_auth.KeeperAuth) -> None: + """ + Enable persistent login and register data key for device. + Sets persistent_login to on and logout_timer to 30 days. + """ + keeper_auth.set_user_setting(keeper_auth_context, 'persistent_login', '1') + keeper_auth.register_data_key_for_device(keeper_auth_context) + mins_per_day = 60 * 24 + timeout_in_minutes = mins_per_day * 30 # 30 days + keeper_auth.set_user_setting(keeper_auth_context, 'logout_timer', str(timeout_in_minutes)) + print("Persistent login turned on successfully and device registered") + + +def login(): + """ + Handle the login process including server selection, authentication, + and multi-factor authentication steps (device approval, password, 2FA + with channel selection and Security Key, SSO data key, SSO token). + + Returns: + tuple: (keeper_auth_context, keeper_endpoint) on success, or (None, None) if login fails. + """ + flow = LoginFlow() + keeper_auth_context = flow.run() + if keeper_auth_context and not flow.logged_in_with_persistent: + enable_persistent_login(keeper_auth_context) + keeper_endpoint = flow.endpoint if keeper_auth_context else None + return keeper_auth_context, keeper_endpoint + +from keepersdk.authentication import device_management + + +def print_devices_table(devices): + if not devices: + print('\nNo devices found.') + return + print(f'\nUser Devices ({len(devices)} found)') + print('=' * 100) + print(f"{'ID':<4} {'Name':<24} {'Client Type':<14} {'Login Status':<14} {'Last Accessed':<20}") + print('-' * 100) + for d in devices: + last = d.last_accessed.strftime('%Y-%m-%d %H:%M:%S') if d.last_accessed else 'N/A' + print( + f"{d.list_index:<4} {d.name[:23]:<24} " + f"{d.client_type[:13]:<14} {d.login_status[:13]:<14} {last:<20}" + ) + print('-' * 100) + + +def main(): + keeper_auth_context, _ = login() + if not keeper_auth_context: + return + + # Fill in your values here. + device_identifiers = [''] + + try: + print(f'Account-locking {len(device_identifiers)} device(s)...') + for name in device_management.account_lock_user_devices( + keeper_auth_context, device_identifiers + ): + print(f"Device '{name}' successfully account locked") + print('\nUpdated device list:') + print_devices_table(device_management.list_user_devices(keeper_auth_context)) + except Exception as e: + print(f'Error account-locking devices: {e}') + finally: + keeper_auth_context.close() + + +if __name__ == '__main__': + main() diff --git a/examples/sdk_examples/device_management/account_unlock_device.py b/examples/sdk_examples/device_management/account_unlock_device.py new file mode 100644 index 0000000..4c16f87 --- /dev/null +++ b/examples/sdk_examples/device_management/account_unlock_device.py @@ -0,0 +1,540 @@ +import getpass +import sqlite3 +import json +import logging +from typing import Dict, Optional + +import fido2 +import webbrowser + +from keepersdk import errors, utils +from keepersdk.authentication import ( + configuration, + endpoint, + keeper_auth, + login_auth, +) +from keepersdk.authentication.yubikey import ( + IKeeperUserInteraction, + yubikey_authenticate, +) +from keepersdk.constants import KEEPER_PUBLIC_HOSTS +from keepersdk.vault import sqlite_storage, vault_online, ksm_management + +try: + import pyperclip +except ImportError: + pyperclip = None + +logger = utils.get_logger() +logger.setLevel(logging.INFO) +if not logger.handlers: + _handler = logging.StreamHandler() + _handler.setLevel(logging.INFO) + _handler.setFormatter( + logging.Formatter("%(asctime)s - %(levelname)s - %(name)s - %(message)s") + ) + logger.addHandler(_handler) + + +class FidoCliInteraction(fido2.client.UserInteraction, IKeeperUserInteraction): + def output_text(self, text: str) -> None: + print(text) + + def prompt_up(self) -> None: + print( + "\nTouch the flashing Security key to authenticate or " + "press Ctrl-C to resume with the primary two factor authentication..." + ) + + def request_pin(self, permissions, rd_id): + return getpass.getpass("Enter Security Key PIN: ") + + def request_uv(self, permissions, rd_id): + print("User Verification required.") + return True + + +# Two-factor duration codes (used by LoginFlow) +_TWO_FACTOR_DURATION_CODES: Dict[login_auth.TwoFactorDuration, str] = { + login_auth.TwoFactorDuration.EveryLogin: "login", + login_auth.TwoFactorDuration.Every12Hours: "12_hours", + login_auth.TwoFactorDuration.EveryDay: "24_hours", + login_auth.TwoFactorDuration.Every30Days: "30_days", + login_auth.TwoFactorDuration.Forever: "forever", +} + + +class LoginFlow: + """ + Handles the full login process: server selection, username, password, + device approval, 2FA, SSO data key, and SSO token. + """ + + def __init__(self) -> None: + self._config = configuration.JsonConfigurationStorage() + self._logged_in_with_persistent = True + self._endpoint: Optional[endpoint.KeeperEndpoint] = None + + @property + def endpoint(self) -> Optional[endpoint.KeeperEndpoint]: + return self._endpoint + + @property + def logged_in_with_persistent(self) -> bool: + """True if login succeeded by resuming an existing persistent session (no step loop).""" + return self._logged_in_with_persistent + + def run(self) -> Optional[keeper_auth.KeeperAuth]: + """ + Run the login flow. + + Returns: + Authenticated Keeper context, or None if login fails. + """ + server = self._ensure_server() + keeper_endpoint = endpoint.KeeperEndpoint(self._config, server) + self._endpoint = keeper_endpoint + login_auth_context = login_auth.LoginAuth(keeper_endpoint) + + username = self._config.get().last_login or input("Enter username: ") + login_auth_context.resume_session = True + login_auth_context.login(username) + + while not login_auth_context.login_step.is_final(): + step = login_auth_context.login_step + if isinstance(step, login_auth.LoginStepDeviceApproval): + self._handle_device_approval(step) + elif isinstance(step, login_auth.LoginStepTwoFactor): + self._handle_two_factor(step) + elif isinstance(step, login_auth.LoginStepPassword): + self._handle_password(step) + elif isinstance(step, login_auth.LoginStepSsoToken): + self._handle_sso_token(step) + elif isinstance(step, login_auth.LoginStepSsoDataKey): + self._handle_sso_data_key(step) + elif isinstance(step, login_auth.LoginStepError): + print(f"Login error: ({step.code}) {step.message}") + return None + else: + raise NotImplementedError( + f"Unsupported login step type: {type(step).__name__}" + ) + self._logged_in_with_persistent = False + + if self._logged_in_with_persistent: + print("Successfully logged in with persistent login") + + if isinstance(login_auth_context.login_step, login_auth.LoginStepConnected): + return login_auth_context.login_step.take_keeper_auth() + + return None + + def _ensure_server(self) -> str: + if not self._config.get().last_server: + print("Available server options:") + for region, host in KEEPER_PUBLIC_HOSTS.items(): + print(f" {region}: {host}") + server = ( + input("Enter server (default: keepersecurity.com): ").strip() + or "keepersecurity.com" + ) + self._config.get().last_server = server + else: + server = self._config.get().last_server + return server + + def _handle_device_approval( + self, step: login_auth.LoginStepDeviceApproval + ) -> None: + """Device approval: same options as keepercli verify_device (email, keeper push, 2FA, resume).""" + menu = [ + ("email_send", "to send email"), + ("email_code=", "to validate verification code sent via email"), + ("keeper_push", "to send Keeper Push notification"), + ("2fa_send", "to send 2FA code"), + ("2fa_code=", "to validate a code provided by 2FA application"), + ("", "to resume"), + ] + lines = ["Approve by selecting a method below"] + lines.extend(f" {cmd} {desc}" for cmd, desc in menu) + print("\n".join(lines)) + + selection = input("Type your selection or to resume: ").strip() + if selection is None: + return + if selection in ("email_send", "es"): + step.send_push(channel=login_auth.DeviceApprovalChannel.Email) + print("An email with instructions has been sent. Press when approved.") + elif selection.startswith("email_code="): + code = selection[len("email_code=") :] + step.send_code(channel=login_auth.DeviceApprovalChannel.Email, code=code) + print("Successfully verified email code.") + elif selection in ("keeper_push", "kp"): + step.send_push(channel=login_auth.DeviceApprovalChannel.KeeperPush) + print( + "Successfully made a push notification to the approved device. " + "Press when approved." + ) + elif selection in ("2fa_send", "2fs"): + step.send_push(channel=login_auth.DeviceApprovalChannel.TwoFactor) + print("2FA code was sent.") + elif selection.startswith("2fa_code="): + code = selection[len("2fa_code=") :] + step.send_code(channel=login_auth.DeviceApprovalChannel.TwoFactor, code=code) + print("Successfully verified 2FA code.") + else: + step.resume() + + def _handle_password(self, step: login_auth.LoginStepPassword) -> None: + """Password step: prompt for password and retry on auth_failed (aligned with keepercli handle_verify_password).""" + print(f"\nEnter password for {step.username}") + while True: + password = getpass.getpass("Password: ") + if not password: + raise KeyboardInterrupt() + try: + step.verify_password(password) + break + except errors.KeeperApiError as kae: + print( + "Invalid email or password combination, please re-enter." + if kae.result_code == "auth_failed" + else kae.message + ) + + def _handle_two_factor(self, step: login_auth.LoginStepTwoFactor) -> None: + channels = [ + x + for x in step.get_channels() + if x.channel_type != login_auth.TwoFactorChannel.Other + ] + menu = [] + for i, channel in enumerate(channels): + desc = self._two_factor_channel_desc(channel.channel_type) + menu.append( + ( + str(i + 1), + f"{desc} {channel.channel_name} {channel.phone}", + ) + ) + menu.append(("q", "Quit authentication attempt and return to Commander prompt.")) + + lines = ["", "This account requires 2FA Authentication"] + lines.extend(f" {a}. {t}" for a, t in menu) + print("\n".join(lines)) + + while True: + selection = input("Selection: ") + if selection is None: + return + if selection in ("q", "Q"): + raise KeyboardInterrupt() + try: + assert selection.isnumeric() + idx = 1 if not selection else int(selection) + assert 1 <= idx <= len(channels) + channel = channels[idx - 1] + desc = self._two_factor_channel_desc(channel.channel_type) + print(f"Selected {idx}. {desc}") + except AssertionError: + print( + "Invalid entry, additional factors of authentication shown " + "may be configured if not currently enabled." + ) + continue + + if channel.channel_type in ( + login_auth.TwoFactorChannel.TextMessage, + login_auth.TwoFactorChannel.KeeperDNA, + login_auth.TwoFactorChannel.DuoSecurity, + ): + action = next( + ( + x + for x in step.get_channel_push_actions(channel.channel_uid) + if x + in ( + login_auth.TwoFactorPushAction.TextMessage, + login_auth.TwoFactorPushAction.KeeperDna, + ) + ), + None, + ) + if action: + step.send_push(channel.channel_uid, action) + + if channel.channel_type == login_auth.TwoFactorChannel.SecurityKey: + try: + challenge = json.loads(channel.challenge) + signature = yubikey_authenticate(challenge, FidoCliInteraction()) + if signature: + print("Verified Security Key.") + step.send_code(channel.channel_uid, signature) + return + except Exception as e: + logger.error(e) + continue + + # 2FA code path + step.duration = min(step.duration, channel.max_expiration) + available_dura = sorted( + x for x in _TWO_FACTOR_DURATION_CODES if x <= channel.max_expiration + ) + available_codes = [ + _TWO_FACTOR_DURATION_CODES.get(x) or "login" for x in available_dura + ] + + while True: + mfa_desc = self._two_factor_duration_desc(step.duration) + prompt_exp = ( + f"\n2FA Code Duration: {mfa_desc}.\n" + f"To change duration: 2fa_duration={'|'.join(available_codes)}" + ) + print(prompt_exp) + + selection = input("\nEnter 2FA Code or Duration: ") + if not selection: + return + if selection in available_codes: + step.duration = self._two_factor_code_to_duration(selection) + elif selection.startswith("2fa_duration="): + code = selection[len("2fa_duration=") :] + if code in available_codes: + step.duration = self._two_factor_code_to_duration(code) + else: + print(f"Invalid 2FA duration: {code}") + else: + try: + step.send_code(channel.channel_uid, selection) + print("Successfully verified 2FA Code.") + return + except errors.KeeperApiError as kae: + print(f"Invalid 2FA code: ({kae.result_code}) {kae.message}") + + def _handle_sso_data_key( + self, step: login_auth.LoginStepSsoDataKey + ) -> None: + menu = [ + ("1", "Keeper Push. Send a push notification to your device."), + ("2", "Admin Approval. Request your admin to approve this device."), + ("r", "Resume SSO authentication after device is approved."), + ("q", "Quit SSO authentication attempt and return to Commander prompt."), + ] + lines = ["Approve this device by selecting a method below:"] + lines.extend(f" {cmd:>3}. {text}" for cmd, text in menu) + print("\n".join(lines)) + + while True: + answer = input("Selection: ") + if answer is None: + return + if answer == "q": + raise KeyboardInterrupt() + if answer == "r": + step.resume() + break + if answer in ("1", "2"): + step.request_data_key( + login_auth.DataKeyShareChannel.KeeperPush + if answer == "1" + else login_auth.DataKeyShareChannel.AdminApproval + ) + else: + print(f'Action "{answer}" is not supported.') + + def _handle_sso_token(self, step: login_auth.LoginStepSsoToken) -> None: + menu = [ + ("a", "SSO User with a Master Password."), + ] + if pyperclip: + menu.append(("c", "Copy SSO Login URL to clipboard.")) + else: + menu.append(("u", "Show SSO Login URL.")) + try: + wb = webbrowser.get() + menu.append(("o", "Navigate to SSO Login URL with the default web browser.")) + except Exception: + wb = None + if pyperclip: + menu.append(("p", "Paste SSO Token from clipboard.")) + menu.append(("t", "Enter SSO Token manually.")) + menu.append(("q", "Quit SSO authentication attempt and return to Commander prompt.")) + + lines = [ + "", + "SSO Login URL:", + step.sso_login_url, + "Navigate to SSO Login URL with your browser and complete authentication.", + "Copy a returned SSO Token into clipboard." + + (" Paste that token into Commander." if pyperclip else " Then use option 't' to enter the token manually."), + 'NOTE: To copy SSO Token please click "Copy authentication token" ' + 'button on "SSO Connect" page.', + "", + ] + lines.extend(f" {a:>3}. {t}" for a, t in menu) + print("\n".join(lines)) + + while True: + token = input("Selection: ") + if token == "q": + raise KeyboardInterrupt() + if token == "a": + step.login_with_password() + return + if token == "c": + token = None + if pyperclip: + try: + pyperclip.copy(step.sso_login_url) + print("SSO Login URL is copied to clipboard.") + except Exception: + print("Failed to copy SSO Login URL to clipboard.") + else: + print("Clipboard not available (install pyperclip).") + elif token == "u": + token = None + if not pyperclip: + print("\nSSO Login URL:", step.sso_login_url, "\n") + else: + print("Unsupported menu option (use 'c' to copy URL).") + elif token == "o": + token = None + if wb: + try: + wb.open_new_tab(step.sso_login_url) + except Exception: + print("Failed to open web browser.") + elif token == "p": + if pyperclip: + try: + token = pyperclip.paste() + except Exception: + token = "" + print("Failed to paste from clipboard") + else: + token = None + print("Clipboard not available (use 't' to enter token manually).") + elif token == "t": + token = getpass.getpass("Enter SSO Token: ").strip() + else: + if len(token) < 10: + print(f"Unsupported menu option: {token}") + continue + + if token: + try: + step.set_sso_token(token) + break + except errors.KeeperApiError as kae: + print(f"SSO Login error: ({kae.result_code}) {kae.message}") + + @staticmethod + def _two_factor_channel_desc( + channel_type: login_auth.TwoFactorChannel, + ) -> str: + return { + login_auth.TwoFactorChannel.Authenticator: "TOTP (Google and Microsoft Authenticator)", + login_auth.TwoFactorChannel.TextMessage: "Send SMS Code", + login_auth.TwoFactorChannel.DuoSecurity: "DUO", + login_auth.TwoFactorChannel.RSASecurID: "RSA SecurID", + login_auth.TwoFactorChannel.SecurityKey: "WebAuthN (FIDO2 Security Key)", + login_auth.TwoFactorChannel.KeeperDNA: "Keeper DNA (Watch)", + login_auth.TwoFactorChannel.Backup: "Backup Code", + }.get(channel_type, "Not Supported") + + @staticmethod + def _two_factor_duration_desc( + duration: login_auth.TwoFactorDuration, + ) -> str: + return { + login_auth.TwoFactorDuration.EveryLogin: "Require Every Login", + login_auth.TwoFactorDuration.Forever: "Save on this Device Forever", + login_auth.TwoFactorDuration.Every12Hours: "Ask Every 12 hours", + login_auth.TwoFactorDuration.EveryDay: "Ask Every 24 hours", + login_auth.TwoFactorDuration.Every30Days: "Ask Every 30 days", + }.get(duration, "Require Every Login") + + @staticmethod + def _two_factor_code_to_duration( + text: str, + ) -> login_auth.TwoFactorDuration: + for dura, code in _TWO_FACTOR_DURATION_CODES.items(): + if code == text: + return dura + return login_auth.TwoFactorDuration.EveryLogin + + +def enable_persistent_login(keeper_auth_context: keeper_auth.KeeperAuth) -> None: + """ + Enable persistent login and register data key for device. + Sets persistent_login to on and logout_timer to 30 days. + """ + keeper_auth.set_user_setting(keeper_auth_context, 'persistent_login', '1') + keeper_auth.register_data_key_for_device(keeper_auth_context) + mins_per_day = 60 * 24 + timeout_in_minutes = mins_per_day * 30 # 30 days + keeper_auth.set_user_setting(keeper_auth_context, 'logout_timer', str(timeout_in_minutes)) + print("Persistent login turned on successfully and device registered") + + +def login(): + """ + Handle the login process including server selection, authentication, + and multi-factor authentication steps (device approval, password, 2FA + with channel selection and Security Key, SSO data key, SSO token). + + Returns: + tuple: (keeper_auth_context, keeper_endpoint) on success, or (None, None) if login fails. + """ + flow = LoginFlow() + keeper_auth_context = flow.run() + if keeper_auth_context and not flow.logged_in_with_persistent: + enable_persistent_login(keeper_auth_context) + keeper_endpoint = flow.endpoint if keeper_auth_context else None + return keeper_auth_context, keeper_endpoint + +from keepersdk.authentication import device_management + + +def print_devices_table(devices): + if not devices: + print('\nNo devices found.') + return + print(f'\nUser Devices ({len(devices)} found)') + print('=' * 100) + print(f"{'ID':<4} {'Name':<24} {'Client Type':<14} {'Login Status':<14} {'Last Accessed':<20}") + print('-' * 100) + for d in devices: + last = d.last_accessed.strftime('%Y-%m-%d %H:%M:%S') if d.last_accessed else 'N/A' + print( + f"{d.list_index:<4} {d.name[:23]:<24} " + f"{d.client_type[:13]:<14} {d.login_status[:13]:<14} {last:<20}" + ) + print('-' * 100) + + +def main(): + keeper_auth_context, _ = login() + if not keeper_auth_context: + return + + # Fill in your values here. + device_identifiers = [''] + + try: + print(f'Account-unlocking {len(device_identifiers)} device(s)...') + for name in device_management.account_unlock_user_devices( + keeper_auth_context, device_identifiers + ): + print(f"Device '{name}' successfully account unlocked") + print('\nUpdated device list:') + print_devices_table(device_management.list_user_devices(keeper_auth_context)) + except Exception as e: + print(f'Error account-unlocking devices: {e}') + finally: + keeper_auth_context.close() + + +if __name__ == '__main__': + main() diff --git a/examples/sdk_examples/device_management/lock_device.py b/examples/sdk_examples/device_management/lock_device.py new file mode 100644 index 0000000..16f2267 --- /dev/null +++ b/examples/sdk_examples/device_management/lock_device.py @@ -0,0 +1,540 @@ +import getpass +import sqlite3 +import json +import logging +from typing import Dict, Optional + +import fido2 +import webbrowser + +from keepersdk import errors, utils +from keepersdk.authentication import ( + configuration, + endpoint, + keeper_auth, + login_auth, +) +from keepersdk.authentication.yubikey import ( + IKeeperUserInteraction, + yubikey_authenticate, +) +from keepersdk.constants import KEEPER_PUBLIC_HOSTS +from keepersdk.vault import sqlite_storage, vault_online, ksm_management + +try: + import pyperclip +except ImportError: + pyperclip = None + +logger = utils.get_logger() +logger.setLevel(logging.INFO) +if not logger.handlers: + _handler = logging.StreamHandler() + _handler.setLevel(logging.INFO) + _handler.setFormatter( + logging.Formatter("%(asctime)s - %(levelname)s - %(name)s - %(message)s") + ) + logger.addHandler(_handler) + + +class FidoCliInteraction(fido2.client.UserInteraction, IKeeperUserInteraction): + def output_text(self, text: str) -> None: + print(text) + + def prompt_up(self) -> None: + print( + "\nTouch the flashing Security key to authenticate or " + "press Ctrl-C to resume with the primary two factor authentication..." + ) + + def request_pin(self, permissions, rd_id): + return getpass.getpass("Enter Security Key PIN: ") + + def request_uv(self, permissions, rd_id): + print("User Verification required.") + return True + + +# Two-factor duration codes (used by LoginFlow) +_TWO_FACTOR_DURATION_CODES: Dict[login_auth.TwoFactorDuration, str] = { + login_auth.TwoFactorDuration.EveryLogin: "login", + login_auth.TwoFactorDuration.Every12Hours: "12_hours", + login_auth.TwoFactorDuration.EveryDay: "24_hours", + login_auth.TwoFactorDuration.Every30Days: "30_days", + login_auth.TwoFactorDuration.Forever: "forever", +} + + +class LoginFlow: + """ + Handles the full login process: server selection, username, password, + device approval, 2FA, SSO data key, and SSO token. + """ + + def __init__(self) -> None: + self._config = configuration.JsonConfigurationStorage() + self._logged_in_with_persistent = True + self._endpoint: Optional[endpoint.KeeperEndpoint] = None + + @property + def endpoint(self) -> Optional[endpoint.KeeperEndpoint]: + return self._endpoint + + @property + def logged_in_with_persistent(self) -> bool: + """True if login succeeded by resuming an existing persistent session (no step loop).""" + return self._logged_in_with_persistent + + def run(self) -> Optional[keeper_auth.KeeperAuth]: + """ + Run the login flow. + + Returns: + Authenticated Keeper context, or None if login fails. + """ + server = self._ensure_server() + keeper_endpoint = endpoint.KeeperEndpoint(self._config, server) + self._endpoint = keeper_endpoint + login_auth_context = login_auth.LoginAuth(keeper_endpoint) + + username = self._config.get().last_login or input("Enter username: ") + login_auth_context.resume_session = True + login_auth_context.login(username) + + while not login_auth_context.login_step.is_final(): + step = login_auth_context.login_step + if isinstance(step, login_auth.LoginStepDeviceApproval): + self._handle_device_approval(step) + elif isinstance(step, login_auth.LoginStepTwoFactor): + self._handle_two_factor(step) + elif isinstance(step, login_auth.LoginStepPassword): + self._handle_password(step) + elif isinstance(step, login_auth.LoginStepSsoToken): + self._handle_sso_token(step) + elif isinstance(step, login_auth.LoginStepSsoDataKey): + self._handle_sso_data_key(step) + elif isinstance(step, login_auth.LoginStepError): + print(f"Login error: ({step.code}) {step.message}") + return None + else: + raise NotImplementedError( + f"Unsupported login step type: {type(step).__name__}" + ) + self._logged_in_with_persistent = False + + if self._logged_in_with_persistent: + print("Successfully logged in with persistent login") + + if isinstance(login_auth_context.login_step, login_auth.LoginStepConnected): + return login_auth_context.login_step.take_keeper_auth() + + return None + + def _ensure_server(self) -> str: + if not self._config.get().last_server: + print("Available server options:") + for region, host in KEEPER_PUBLIC_HOSTS.items(): + print(f" {region}: {host}") + server = ( + input("Enter server (default: keepersecurity.com): ").strip() + or "keepersecurity.com" + ) + self._config.get().last_server = server + else: + server = self._config.get().last_server + return server + + def _handle_device_approval( + self, step: login_auth.LoginStepDeviceApproval + ) -> None: + """Device approval: same options as keepercli verify_device (email, keeper push, 2FA, resume).""" + menu = [ + ("email_send", "to send email"), + ("email_code=", "to validate verification code sent via email"), + ("keeper_push", "to send Keeper Push notification"), + ("2fa_send", "to send 2FA code"), + ("2fa_code=", "to validate a code provided by 2FA application"), + ("", "to resume"), + ] + lines = ["Approve by selecting a method below"] + lines.extend(f" {cmd} {desc}" for cmd, desc in menu) + print("\n".join(lines)) + + selection = input("Type your selection or to resume: ").strip() + if selection is None: + return + if selection in ("email_send", "es"): + step.send_push(channel=login_auth.DeviceApprovalChannel.Email) + print("An email with instructions has been sent. Press when approved.") + elif selection.startswith("email_code="): + code = selection[len("email_code=") :] + step.send_code(channel=login_auth.DeviceApprovalChannel.Email, code=code) + print("Successfully verified email code.") + elif selection in ("keeper_push", "kp"): + step.send_push(channel=login_auth.DeviceApprovalChannel.KeeperPush) + print( + "Successfully made a push notification to the approved device. " + "Press when approved." + ) + elif selection in ("2fa_send", "2fs"): + step.send_push(channel=login_auth.DeviceApprovalChannel.TwoFactor) + print("2FA code was sent.") + elif selection.startswith("2fa_code="): + code = selection[len("2fa_code=") :] + step.send_code(channel=login_auth.DeviceApprovalChannel.TwoFactor, code=code) + print("Successfully verified 2FA code.") + else: + step.resume() + + def _handle_password(self, step: login_auth.LoginStepPassword) -> None: + """Password step: prompt for password and retry on auth_failed (aligned with keepercli handle_verify_password).""" + print(f"\nEnter password for {step.username}") + while True: + password = getpass.getpass("Password: ") + if not password: + raise KeyboardInterrupt() + try: + step.verify_password(password) + break + except errors.KeeperApiError as kae: + print( + "Invalid email or password combination, please re-enter." + if kae.result_code == "auth_failed" + else kae.message + ) + + def _handle_two_factor(self, step: login_auth.LoginStepTwoFactor) -> None: + channels = [ + x + for x in step.get_channels() + if x.channel_type != login_auth.TwoFactorChannel.Other + ] + menu = [] + for i, channel in enumerate(channels): + desc = self._two_factor_channel_desc(channel.channel_type) + menu.append( + ( + str(i + 1), + f"{desc} {channel.channel_name} {channel.phone}", + ) + ) + menu.append(("q", "Quit authentication attempt and return to Commander prompt.")) + + lines = ["", "This account requires 2FA Authentication"] + lines.extend(f" {a}. {t}" for a, t in menu) + print("\n".join(lines)) + + while True: + selection = input("Selection: ") + if selection is None: + return + if selection in ("q", "Q"): + raise KeyboardInterrupt() + try: + assert selection.isnumeric() + idx = 1 if not selection else int(selection) + assert 1 <= idx <= len(channels) + channel = channels[idx - 1] + desc = self._two_factor_channel_desc(channel.channel_type) + print(f"Selected {idx}. {desc}") + except AssertionError: + print( + "Invalid entry, additional factors of authentication shown " + "may be configured if not currently enabled." + ) + continue + + if channel.channel_type in ( + login_auth.TwoFactorChannel.TextMessage, + login_auth.TwoFactorChannel.KeeperDNA, + login_auth.TwoFactorChannel.DuoSecurity, + ): + action = next( + ( + x + for x in step.get_channel_push_actions(channel.channel_uid) + if x + in ( + login_auth.TwoFactorPushAction.TextMessage, + login_auth.TwoFactorPushAction.KeeperDna, + ) + ), + None, + ) + if action: + step.send_push(channel.channel_uid, action) + + if channel.channel_type == login_auth.TwoFactorChannel.SecurityKey: + try: + challenge = json.loads(channel.challenge) + signature = yubikey_authenticate(challenge, FidoCliInteraction()) + if signature: + print("Verified Security Key.") + step.send_code(channel.channel_uid, signature) + return + except Exception as e: + logger.error(e) + continue + + # 2FA code path + step.duration = min(step.duration, channel.max_expiration) + available_dura = sorted( + x for x in _TWO_FACTOR_DURATION_CODES if x <= channel.max_expiration + ) + available_codes = [ + _TWO_FACTOR_DURATION_CODES.get(x) or "login" for x in available_dura + ] + + while True: + mfa_desc = self._two_factor_duration_desc(step.duration) + prompt_exp = ( + f"\n2FA Code Duration: {mfa_desc}.\n" + f"To change duration: 2fa_duration={'|'.join(available_codes)}" + ) + print(prompt_exp) + + selection = input("\nEnter 2FA Code or Duration: ") + if not selection: + return + if selection in available_codes: + step.duration = self._two_factor_code_to_duration(selection) + elif selection.startswith("2fa_duration="): + code = selection[len("2fa_duration=") :] + if code in available_codes: + step.duration = self._two_factor_code_to_duration(code) + else: + print(f"Invalid 2FA duration: {code}") + else: + try: + step.send_code(channel.channel_uid, selection) + print("Successfully verified 2FA Code.") + return + except errors.KeeperApiError as kae: + print(f"Invalid 2FA code: ({kae.result_code}) {kae.message}") + + def _handle_sso_data_key( + self, step: login_auth.LoginStepSsoDataKey + ) -> None: + menu = [ + ("1", "Keeper Push. Send a push notification to your device."), + ("2", "Admin Approval. Request your admin to approve this device."), + ("r", "Resume SSO authentication after device is approved."), + ("q", "Quit SSO authentication attempt and return to Commander prompt."), + ] + lines = ["Approve this device by selecting a method below:"] + lines.extend(f" {cmd:>3}. {text}" for cmd, text in menu) + print("\n".join(lines)) + + while True: + answer = input("Selection: ") + if answer is None: + return + if answer == "q": + raise KeyboardInterrupt() + if answer == "r": + step.resume() + break + if answer in ("1", "2"): + step.request_data_key( + login_auth.DataKeyShareChannel.KeeperPush + if answer == "1" + else login_auth.DataKeyShareChannel.AdminApproval + ) + else: + print(f'Action "{answer}" is not supported.') + + def _handle_sso_token(self, step: login_auth.LoginStepSsoToken) -> None: + menu = [ + ("a", "SSO User with a Master Password."), + ] + if pyperclip: + menu.append(("c", "Copy SSO Login URL to clipboard.")) + else: + menu.append(("u", "Show SSO Login URL.")) + try: + wb = webbrowser.get() + menu.append(("o", "Navigate to SSO Login URL with the default web browser.")) + except Exception: + wb = None + if pyperclip: + menu.append(("p", "Paste SSO Token from clipboard.")) + menu.append(("t", "Enter SSO Token manually.")) + menu.append(("q", "Quit SSO authentication attempt and return to Commander prompt.")) + + lines = [ + "", + "SSO Login URL:", + step.sso_login_url, + "Navigate to SSO Login URL with your browser and complete authentication.", + "Copy a returned SSO Token into clipboard." + + (" Paste that token into Commander." if pyperclip else " Then use option 't' to enter the token manually."), + 'NOTE: To copy SSO Token please click "Copy authentication token" ' + 'button on "SSO Connect" page.', + "", + ] + lines.extend(f" {a:>3}. {t}" for a, t in menu) + print("\n".join(lines)) + + while True: + token = input("Selection: ") + if token == "q": + raise KeyboardInterrupt() + if token == "a": + step.login_with_password() + return + if token == "c": + token = None + if pyperclip: + try: + pyperclip.copy(step.sso_login_url) + print("SSO Login URL is copied to clipboard.") + except Exception: + print("Failed to copy SSO Login URL to clipboard.") + else: + print("Clipboard not available (install pyperclip).") + elif token == "u": + token = None + if not pyperclip: + print("\nSSO Login URL:", step.sso_login_url, "\n") + else: + print("Unsupported menu option (use 'c' to copy URL).") + elif token == "o": + token = None + if wb: + try: + wb.open_new_tab(step.sso_login_url) + except Exception: + print("Failed to open web browser.") + elif token == "p": + if pyperclip: + try: + token = pyperclip.paste() + except Exception: + token = "" + print("Failed to paste from clipboard") + else: + token = None + print("Clipboard not available (use 't' to enter token manually).") + elif token == "t": + token = getpass.getpass("Enter SSO Token: ").strip() + else: + if len(token) < 10: + print(f"Unsupported menu option: {token}") + continue + + if token: + try: + step.set_sso_token(token) + break + except errors.KeeperApiError as kae: + print(f"SSO Login error: ({kae.result_code}) {kae.message}") + + @staticmethod + def _two_factor_channel_desc( + channel_type: login_auth.TwoFactorChannel, + ) -> str: + return { + login_auth.TwoFactorChannel.Authenticator: "TOTP (Google and Microsoft Authenticator)", + login_auth.TwoFactorChannel.TextMessage: "Send SMS Code", + login_auth.TwoFactorChannel.DuoSecurity: "DUO", + login_auth.TwoFactorChannel.RSASecurID: "RSA SecurID", + login_auth.TwoFactorChannel.SecurityKey: "WebAuthN (FIDO2 Security Key)", + login_auth.TwoFactorChannel.KeeperDNA: "Keeper DNA (Watch)", + login_auth.TwoFactorChannel.Backup: "Backup Code", + }.get(channel_type, "Not Supported") + + @staticmethod + def _two_factor_duration_desc( + duration: login_auth.TwoFactorDuration, + ) -> str: + return { + login_auth.TwoFactorDuration.EveryLogin: "Require Every Login", + login_auth.TwoFactorDuration.Forever: "Save on this Device Forever", + login_auth.TwoFactorDuration.Every12Hours: "Ask Every 12 hours", + login_auth.TwoFactorDuration.EveryDay: "Ask Every 24 hours", + login_auth.TwoFactorDuration.Every30Days: "Ask Every 30 days", + }.get(duration, "Require Every Login") + + @staticmethod + def _two_factor_code_to_duration( + text: str, + ) -> login_auth.TwoFactorDuration: + for dura, code in _TWO_FACTOR_DURATION_CODES.items(): + if code == text: + return dura + return login_auth.TwoFactorDuration.EveryLogin + + +def enable_persistent_login(keeper_auth_context: keeper_auth.KeeperAuth) -> None: + """ + Enable persistent login and register data key for device. + Sets persistent_login to on and logout_timer to 30 days. + """ + keeper_auth.set_user_setting(keeper_auth_context, 'persistent_login', '1') + keeper_auth.register_data_key_for_device(keeper_auth_context) + mins_per_day = 60 * 24 + timeout_in_minutes = mins_per_day * 30 # 30 days + keeper_auth.set_user_setting(keeper_auth_context, 'logout_timer', str(timeout_in_minutes)) + print("Persistent login turned on successfully and device registered") + + +def login(): + """ + Handle the login process including server selection, authentication, + and multi-factor authentication steps (device approval, password, 2FA + with channel selection and Security Key, SSO data key, SSO token). + + Returns: + tuple: (keeper_auth_context, keeper_endpoint) on success, or (None, None) if login fails. + """ + flow = LoginFlow() + keeper_auth_context = flow.run() + if keeper_auth_context and not flow.logged_in_with_persistent: + enable_persistent_login(keeper_auth_context) + keeper_endpoint = flow.endpoint if keeper_auth_context else None + return keeper_auth_context, keeper_endpoint + +from keepersdk.authentication import device_management + + +def print_devices_table(devices): + if not devices: + print('\nNo devices found.') + return + print(f'\nUser Devices ({len(devices)} found)') + print('=' * 100) + print(f"{'ID':<4} {'Name':<24} {'Client Type':<14} {'Login Status':<14} {'Last Accessed':<20}") + print('-' * 100) + for d in devices: + last = d.last_accessed.strftime('%Y-%m-%d %H:%M:%S') if d.last_accessed else 'N/A' + print( + f"{d.list_index:<4} {d.name[:23]:<24} " + f"{d.client_type[:13]:<14} {d.login_status[:13]:<14} {last:<20}" + ) + print('-' * 100) + + +def main(): + keeper_auth_context, _ = login() + if not keeper_auth_context: + return + + # Fill in your values here. + device_identifiers = [''] + + try: + print(f'Locking {len(device_identifiers)} device(s)...') + for name in device_management.lock_user_devices( + keeper_auth_context, device_identifiers + ): + print(f"Device '{name}' successfully locked") + print('\nUpdated device list:') + print_devices_table(device_management.list_user_devices(keeper_auth_context)) + except Exception as e: + print(f'Error locking devices: {e}') + finally: + keeper_auth_context.close() + + +if __name__ == '__main__': + main() diff --git a/examples/sdk_examples/device_management/unlock_device.py b/examples/sdk_examples/device_management/unlock_device.py new file mode 100644 index 0000000..6f9e2cf --- /dev/null +++ b/examples/sdk_examples/device_management/unlock_device.py @@ -0,0 +1,540 @@ +import getpass +import sqlite3 +import json +import logging +from typing import Dict, Optional + +import fido2 +import webbrowser + +from keepersdk import errors, utils +from keepersdk.authentication import ( + configuration, + endpoint, + keeper_auth, + login_auth, +) +from keepersdk.authentication.yubikey import ( + IKeeperUserInteraction, + yubikey_authenticate, +) +from keepersdk.constants import KEEPER_PUBLIC_HOSTS +from keepersdk.vault import sqlite_storage, vault_online, ksm_management + +try: + import pyperclip +except ImportError: + pyperclip = None + +logger = utils.get_logger() +logger.setLevel(logging.INFO) +if not logger.handlers: + _handler = logging.StreamHandler() + _handler.setLevel(logging.INFO) + _handler.setFormatter( + logging.Formatter("%(asctime)s - %(levelname)s - %(name)s - %(message)s") + ) + logger.addHandler(_handler) + + +class FidoCliInteraction(fido2.client.UserInteraction, IKeeperUserInteraction): + def output_text(self, text: str) -> None: + print(text) + + def prompt_up(self) -> None: + print( + "\nTouch the flashing Security key to authenticate or " + "press Ctrl-C to resume with the primary two factor authentication..." + ) + + def request_pin(self, permissions, rd_id): + return getpass.getpass("Enter Security Key PIN: ") + + def request_uv(self, permissions, rd_id): + print("User Verification required.") + return True + + +# Two-factor duration codes (used by LoginFlow) +_TWO_FACTOR_DURATION_CODES: Dict[login_auth.TwoFactorDuration, str] = { + login_auth.TwoFactorDuration.EveryLogin: "login", + login_auth.TwoFactorDuration.Every12Hours: "12_hours", + login_auth.TwoFactorDuration.EveryDay: "24_hours", + login_auth.TwoFactorDuration.Every30Days: "30_days", + login_auth.TwoFactorDuration.Forever: "forever", +} + + +class LoginFlow: + """ + Handles the full login process: server selection, username, password, + device approval, 2FA, SSO data key, and SSO token. + """ + + def __init__(self) -> None: + self._config = configuration.JsonConfigurationStorage() + self._logged_in_with_persistent = True + self._endpoint: Optional[endpoint.KeeperEndpoint] = None + + @property + def endpoint(self) -> Optional[endpoint.KeeperEndpoint]: + return self._endpoint + + @property + def logged_in_with_persistent(self) -> bool: + """True if login succeeded by resuming an existing persistent session (no step loop).""" + return self._logged_in_with_persistent + + def run(self) -> Optional[keeper_auth.KeeperAuth]: + """ + Run the login flow. + + Returns: + Authenticated Keeper context, or None if login fails. + """ + server = self._ensure_server() + keeper_endpoint = endpoint.KeeperEndpoint(self._config, server) + self._endpoint = keeper_endpoint + login_auth_context = login_auth.LoginAuth(keeper_endpoint) + + username = self._config.get().last_login or input("Enter username: ") + login_auth_context.resume_session = True + login_auth_context.login(username) + + while not login_auth_context.login_step.is_final(): + step = login_auth_context.login_step + if isinstance(step, login_auth.LoginStepDeviceApproval): + self._handle_device_approval(step) + elif isinstance(step, login_auth.LoginStepTwoFactor): + self._handle_two_factor(step) + elif isinstance(step, login_auth.LoginStepPassword): + self._handle_password(step) + elif isinstance(step, login_auth.LoginStepSsoToken): + self._handle_sso_token(step) + elif isinstance(step, login_auth.LoginStepSsoDataKey): + self._handle_sso_data_key(step) + elif isinstance(step, login_auth.LoginStepError): + print(f"Login error: ({step.code}) {step.message}") + return None + else: + raise NotImplementedError( + f"Unsupported login step type: {type(step).__name__}" + ) + self._logged_in_with_persistent = False + + if self._logged_in_with_persistent: + print("Successfully logged in with persistent login") + + if isinstance(login_auth_context.login_step, login_auth.LoginStepConnected): + return login_auth_context.login_step.take_keeper_auth() + + return None + + def _ensure_server(self) -> str: + if not self._config.get().last_server: + print("Available server options:") + for region, host in KEEPER_PUBLIC_HOSTS.items(): + print(f" {region}: {host}") + server = ( + input("Enter server (default: keepersecurity.com): ").strip() + or "keepersecurity.com" + ) + self._config.get().last_server = server + else: + server = self._config.get().last_server + return server + + def _handle_device_approval( + self, step: login_auth.LoginStepDeviceApproval + ) -> None: + """Device approval: same options as keepercli verify_device (email, keeper push, 2FA, resume).""" + menu = [ + ("email_send", "to send email"), + ("email_code=", "to validate verification code sent via email"), + ("keeper_push", "to send Keeper Push notification"), + ("2fa_send", "to send 2FA code"), + ("2fa_code=", "to validate a code provided by 2FA application"), + ("", "to resume"), + ] + lines = ["Approve by selecting a method below"] + lines.extend(f" {cmd} {desc}" for cmd, desc in menu) + print("\n".join(lines)) + + selection = input("Type your selection or to resume: ").strip() + if selection is None: + return + if selection in ("email_send", "es"): + step.send_push(channel=login_auth.DeviceApprovalChannel.Email) + print("An email with instructions has been sent. Press when approved.") + elif selection.startswith("email_code="): + code = selection[len("email_code=") :] + step.send_code(channel=login_auth.DeviceApprovalChannel.Email, code=code) + print("Successfully verified email code.") + elif selection in ("keeper_push", "kp"): + step.send_push(channel=login_auth.DeviceApprovalChannel.KeeperPush) + print( + "Successfully made a push notification to the approved device. " + "Press when approved." + ) + elif selection in ("2fa_send", "2fs"): + step.send_push(channel=login_auth.DeviceApprovalChannel.TwoFactor) + print("2FA code was sent.") + elif selection.startswith("2fa_code="): + code = selection[len("2fa_code=") :] + step.send_code(channel=login_auth.DeviceApprovalChannel.TwoFactor, code=code) + print("Successfully verified 2FA code.") + else: + step.resume() + + def _handle_password(self, step: login_auth.LoginStepPassword) -> None: + """Password step: prompt for password and retry on auth_failed (aligned with keepercli handle_verify_password).""" + print(f"\nEnter password for {step.username}") + while True: + password = getpass.getpass("Password: ") + if not password: + raise KeyboardInterrupt() + try: + step.verify_password(password) + break + except errors.KeeperApiError as kae: + print( + "Invalid email or password combination, please re-enter." + if kae.result_code == "auth_failed" + else kae.message + ) + + def _handle_two_factor(self, step: login_auth.LoginStepTwoFactor) -> None: + channels = [ + x + for x in step.get_channels() + if x.channel_type != login_auth.TwoFactorChannel.Other + ] + menu = [] + for i, channel in enumerate(channels): + desc = self._two_factor_channel_desc(channel.channel_type) + menu.append( + ( + str(i + 1), + f"{desc} {channel.channel_name} {channel.phone}", + ) + ) + menu.append(("q", "Quit authentication attempt and return to Commander prompt.")) + + lines = ["", "This account requires 2FA Authentication"] + lines.extend(f" {a}. {t}" for a, t in menu) + print("\n".join(lines)) + + while True: + selection = input("Selection: ") + if selection is None: + return + if selection in ("q", "Q"): + raise KeyboardInterrupt() + try: + assert selection.isnumeric() + idx = 1 if not selection else int(selection) + assert 1 <= idx <= len(channels) + channel = channels[idx - 1] + desc = self._two_factor_channel_desc(channel.channel_type) + print(f"Selected {idx}. {desc}") + except AssertionError: + print( + "Invalid entry, additional factors of authentication shown " + "may be configured if not currently enabled." + ) + continue + + if channel.channel_type in ( + login_auth.TwoFactorChannel.TextMessage, + login_auth.TwoFactorChannel.KeeperDNA, + login_auth.TwoFactorChannel.DuoSecurity, + ): + action = next( + ( + x + for x in step.get_channel_push_actions(channel.channel_uid) + if x + in ( + login_auth.TwoFactorPushAction.TextMessage, + login_auth.TwoFactorPushAction.KeeperDna, + ) + ), + None, + ) + if action: + step.send_push(channel.channel_uid, action) + + if channel.channel_type == login_auth.TwoFactorChannel.SecurityKey: + try: + challenge = json.loads(channel.challenge) + signature = yubikey_authenticate(challenge, FidoCliInteraction()) + if signature: + print("Verified Security Key.") + step.send_code(channel.channel_uid, signature) + return + except Exception as e: + logger.error(e) + continue + + # 2FA code path + step.duration = min(step.duration, channel.max_expiration) + available_dura = sorted( + x for x in _TWO_FACTOR_DURATION_CODES if x <= channel.max_expiration + ) + available_codes = [ + _TWO_FACTOR_DURATION_CODES.get(x) or "login" for x in available_dura + ] + + while True: + mfa_desc = self._two_factor_duration_desc(step.duration) + prompt_exp = ( + f"\n2FA Code Duration: {mfa_desc}.\n" + f"To change duration: 2fa_duration={'|'.join(available_codes)}" + ) + print(prompt_exp) + + selection = input("\nEnter 2FA Code or Duration: ") + if not selection: + return + if selection in available_codes: + step.duration = self._two_factor_code_to_duration(selection) + elif selection.startswith("2fa_duration="): + code = selection[len("2fa_duration=") :] + if code in available_codes: + step.duration = self._two_factor_code_to_duration(code) + else: + print(f"Invalid 2FA duration: {code}") + else: + try: + step.send_code(channel.channel_uid, selection) + print("Successfully verified 2FA Code.") + return + except errors.KeeperApiError as kae: + print(f"Invalid 2FA code: ({kae.result_code}) {kae.message}") + + def _handle_sso_data_key( + self, step: login_auth.LoginStepSsoDataKey + ) -> None: + menu = [ + ("1", "Keeper Push. Send a push notification to your device."), + ("2", "Admin Approval. Request your admin to approve this device."), + ("r", "Resume SSO authentication after device is approved."), + ("q", "Quit SSO authentication attempt and return to Commander prompt."), + ] + lines = ["Approve this device by selecting a method below:"] + lines.extend(f" {cmd:>3}. {text}" for cmd, text in menu) + print("\n".join(lines)) + + while True: + answer = input("Selection: ") + if answer is None: + return + if answer == "q": + raise KeyboardInterrupt() + if answer == "r": + step.resume() + break + if answer in ("1", "2"): + step.request_data_key( + login_auth.DataKeyShareChannel.KeeperPush + if answer == "1" + else login_auth.DataKeyShareChannel.AdminApproval + ) + else: + print(f'Action "{answer}" is not supported.') + + def _handle_sso_token(self, step: login_auth.LoginStepSsoToken) -> None: + menu = [ + ("a", "SSO User with a Master Password."), + ] + if pyperclip: + menu.append(("c", "Copy SSO Login URL to clipboard.")) + else: + menu.append(("u", "Show SSO Login URL.")) + try: + wb = webbrowser.get() + menu.append(("o", "Navigate to SSO Login URL with the default web browser.")) + except Exception: + wb = None + if pyperclip: + menu.append(("p", "Paste SSO Token from clipboard.")) + menu.append(("t", "Enter SSO Token manually.")) + menu.append(("q", "Quit SSO authentication attempt and return to Commander prompt.")) + + lines = [ + "", + "SSO Login URL:", + step.sso_login_url, + "Navigate to SSO Login URL with your browser and complete authentication.", + "Copy a returned SSO Token into clipboard." + + (" Paste that token into Commander." if pyperclip else " Then use option 't' to enter the token manually."), + 'NOTE: To copy SSO Token please click "Copy authentication token" ' + 'button on "SSO Connect" page.', + "", + ] + lines.extend(f" {a:>3}. {t}" for a, t in menu) + print("\n".join(lines)) + + while True: + token = input("Selection: ") + if token == "q": + raise KeyboardInterrupt() + if token == "a": + step.login_with_password() + return + if token == "c": + token = None + if pyperclip: + try: + pyperclip.copy(step.sso_login_url) + print("SSO Login URL is copied to clipboard.") + except Exception: + print("Failed to copy SSO Login URL to clipboard.") + else: + print("Clipboard not available (install pyperclip).") + elif token == "u": + token = None + if not pyperclip: + print("\nSSO Login URL:", step.sso_login_url, "\n") + else: + print("Unsupported menu option (use 'c' to copy URL).") + elif token == "o": + token = None + if wb: + try: + wb.open_new_tab(step.sso_login_url) + except Exception: + print("Failed to open web browser.") + elif token == "p": + if pyperclip: + try: + token = pyperclip.paste() + except Exception: + token = "" + print("Failed to paste from clipboard") + else: + token = None + print("Clipboard not available (use 't' to enter token manually).") + elif token == "t": + token = getpass.getpass("Enter SSO Token: ").strip() + else: + if len(token) < 10: + print(f"Unsupported menu option: {token}") + continue + + if token: + try: + step.set_sso_token(token) + break + except errors.KeeperApiError as kae: + print(f"SSO Login error: ({kae.result_code}) {kae.message}") + + @staticmethod + def _two_factor_channel_desc( + channel_type: login_auth.TwoFactorChannel, + ) -> str: + return { + login_auth.TwoFactorChannel.Authenticator: "TOTP (Google and Microsoft Authenticator)", + login_auth.TwoFactorChannel.TextMessage: "Send SMS Code", + login_auth.TwoFactorChannel.DuoSecurity: "DUO", + login_auth.TwoFactorChannel.RSASecurID: "RSA SecurID", + login_auth.TwoFactorChannel.SecurityKey: "WebAuthN (FIDO2 Security Key)", + login_auth.TwoFactorChannel.KeeperDNA: "Keeper DNA (Watch)", + login_auth.TwoFactorChannel.Backup: "Backup Code", + }.get(channel_type, "Not Supported") + + @staticmethod + def _two_factor_duration_desc( + duration: login_auth.TwoFactorDuration, + ) -> str: + return { + login_auth.TwoFactorDuration.EveryLogin: "Require Every Login", + login_auth.TwoFactorDuration.Forever: "Save on this Device Forever", + login_auth.TwoFactorDuration.Every12Hours: "Ask Every 12 hours", + login_auth.TwoFactorDuration.EveryDay: "Ask Every 24 hours", + login_auth.TwoFactorDuration.Every30Days: "Ask Every 30 days", + }.get(duration, "Require Every Login") + + @staticmethod + def _two_factor_code_to_duration( + text: str, + ) -> login_auth.TwoFactorDuration: + for dura, code in _TWO_FACTOR_DURATION_CODES.items(): + if code == text: + return dura + return login_auth.TwoFactorDuration.EveryLogin + + +def enable_persistent_login(keeper_auth_context: keeper_auth.KeeperAuth) -> None: + """ + Enable persistent login and register data key for device. + Sets persistent_login to on and logout_timer to 30 days. + """ + keeper_auth.set_user_setting(keeper_auth_context, 'persistent_login', '1') + keeper_auth.register_data_key_for_device(keeper_auth_context) + mins_per_day = 60 * 24 + timeout_in_minutes = mins_per_day * 30 # 30 days + keeper_auth.set_user_setting(keeper_auth_context, 'logout_timer', str(timeout_in_minutes)) + print("Persistent login turned on successfully and device registered") + + +def login(): + """ + Handle the login process including server selection, authentication, + and multi-factor authentication steps (device approval, password, 2FA + with channel selection and Security Key, SSO data key, SSO token). + + Returns: + tuple: (keeper_auth_context, keeper_endpoint) on success, or (None, None) if login fails. + """ + flow = LoginFlow() + keeper_auth_context = flow.run() + if keeper_auth_context and not flow.logged_in_with_persistent: + enable_persistent_login(keeper_auth_context) + keeper_endpoint = flow.endpoint if keeper_auth_context else None + return keeper_auth_context, keeper_endpoint + +from keepersdk.authentication import device_management + + +def print_devices_table(devices): + if not devices: + print('\nNo devices found.') + return + print(f'\nUser Devices ({len(devices)} found)') + print('=' * 100) + print(f"{'ID':<4} {'Name':<24} {'Client Type':<14} {'Login Status':<14} {'Last Accessed':<20}") + print('-' * 100) + for d in devices: + last = d.last_accessed.strftime('%Y-%m-%d %H:%M:%S') if d.last_accessed else 'N/A' + print( + f"{d.list_index:<4} {d.name[:23]:<24} " + f"{d.client_type[:13]:<14} {d.login_status[:13]:<14} {last:<20}" + ) + print('-' * 100) + + +def main(): + keeper_auth_context, _ = login() + if not keeper_auth_context: + return + + # Fill in your values here. + device_identifiers = [''] + + try: + print(f'Unlocking {len(device_identifiers)} device(s)...') + for name in device_management.unlock_user_devices( + keeper_auth_context, device_identifiers + ): + print(f"Device '{name}' successfully unlocked") + print('\nUpdated device list:') + print_devices_table(device_management.list_user_devices(keeper_auth_context)) + except Exception as e: + print(f'Error unlocking devices: {e}') + finally: + keeper_auth_context.close() + + +if __name__ == '__main__': + main() diff --git a/keepercli-package/src/keepercli/commands/device_management.py b/keepercli-package/src/keepercli/commands/device_management.py index b19e5d3..deb6d7f 100644 --- a/keepercli-package/src/keepercli/commands/device_management.py +++ b/keepercli-package/src/keepercli/commands/device_management.py @@ -1,7 +1,6 @@ import argparse from datetime import datetime -from tkinter.constants import S -from typing import List, Optional +from typing import Callable, List, Optional from keepersdk.authentication import device_management @@ -24,6 +23,20 @@ def _sdk_error(exc: Exception) -> base.CommandError: return base.CommandError(str(exc)) +def _run_device_action_command( + context: KeeperParams, + device_identifiers: List[str], + action_fn: Callable, + success_message: str, +) -> None: + base.require_login(context) + try: + for name in action_fn(context.auth, device_identifiers): + logger.info(success_message, name) + except ValueError as e: + raise _sdk_error(e) from e + + class DeviceListCommand(base.ArgparseCommand): def __init__(self): parser = argparse.ArgumentParser( @@ -115,13 +128,12 @@ def add_arguments_to_parser(parser: argparse.ArgumentParser): parser.exit = base.ArgparseCommand.suppress_exit def execute(self, context: KeeperParams, **kwargs): - base.require_login(context) - device_identifiers = kwargs.get('devices') or [] - try: - for name in device_management.remove_user_devices(context.auth, device_identifiers): - logger.info("Device '%s' successfully removed", name) - except ValueError as e: - raise _sdk_error(e) from e + _run_device_action_command( + context, + kwargs.get('devices') or [], + device_management.remove_user_devices, + "Device '%s' successfully removed", + ) class DeviceLogoutCommand(base.ArgparseCommand): @@ -140,10 +152,105 @@ def add_arguments_to_parser(parser: argparse.ArgumentParser): parser.exit = base.ArgparseCommand.suppress_exit def execute(self, context: KeeperParams, **kwargs): - base.require_login(context) - device_identifiers = kwargs.get('devices') or [] - try: - for name in device_management.logout_user_devices(context.auth, device_identifiers): - logger.info("Device '%s' successfully logged out", name) - except ValueError as e: - raise _sdk_error(e) from e + _run_device_action_command( + context, + kwargs.get('devices') or [], + device_management.logout_user_devices, + "Device '%s' successfully logged out", + ) + + +class DeviceLockCommand(base.ArgparseCommand): + def __init__(self): + parser = argparse.ArgumentParser( + prog='device-lock', + description='Lock one or more devices for all users (logs out all users on those devices)', + ) + DeviceLockCommand.add_arguments_to_parser(parser) + super().__init__(parser) + + @staticmethod + def add_arguments_to_parser(parser: argparse.ArgumentParser): + parser.add_argument('devices', nargs='+', help='Device ID (from device-list) or device name substring') + parser.error = base.ArgparseCommand.raise_parse_exception + parser.exit = base.ArgparseCommand.suppress_exit + + def execute(self, context: KeeperParams, **kwargs): + _run_device_action_command( + context, + kwargs.get('devices') or [], + device_management.lock_user_devices, + "Device '%s' successfully locked", + ) + + +class DeviceUnlockCommand(base.ArgparseCommand): + def __init__(self): + parser = argparse.ArgumentParser( + prog='device-unlock', + description='Unlock one or more devices for the current user', + ) + DeviceUnlockCommand.add_arguments_to_parser(parser) + super().__init__(parser) + + @staticmethod + def add_arguments_to_parser(parser: argparse.ArgumentParser): + parser.add_argument('devices', nargs='+', help='Device ID (from device-list) or device name substring') + parser.error = base.ArgparseCommand.raise_parse_exception + parser.exit = base.ArgparseCommand.suppress_exit + + def execute(self, context: KeeperParams, **kwargs): + _run_device_action_command( + context, + kwargs.get('devices') or [], + device_management.unlock_user_devices, + "Device '%s' successfully unlocked", + ) + + +class DeviceAccountLockCommand(base.ArgparseCommand): + def __init__(self): + parser = argparse.ArgumentParser( + prog='device-account-lock', + description='Lock one or more devices for the current user only (logs out if logged in)', + ) + DeviceAccountLockCommand.add_arguments_to_parser(parser) + super().__init__(parser) + + @staticmethod + def add_arguments_to_parser(parser: argparse.ArgumentParser): + parser.add_argument('devices', nargs='+', help='Device ID (from device-list) or device name substring') + parser.error = base.ArgparseCommand.raise_parse_exception + parser.exit = base.ArgparseCommand.suppress_exit + + def execute(self, context: KeeperParams, **kwargs): + _run_device_action_command( + context, + kwargs.get('devices') or [], + device_management.account_lock_user_devices, + "Device '%s' successfully account locked", + ) + + +class DeviceAccountUnlockCommand(base.ArgparseCommand): + def __init__(self): + parser = argparse.ArgumentParser( + prog='device-account-unlock', + description='Unlock one or more devices for the current user', + ) + DeviceAccountUnlockCommand.add_arguments_to_parser(parser) + super().__init__(parser) + + @staticmethod + def add_arguments_to_parser(parser: argparse.ArgumentParser): + parser.add_argument('devices', nargs='+', help='Device ID (from device-list) or device name substring') + parser.error = base.ArgparseCommand.raise_parse_exception + parser.exit = base.ArgparseCommand.suppress_exit + + def execute(self, context: KeeperParams, **kwargs): + _run_device_action_command( + context, + kwargs.get('devices') or [], + device_management.account_unlock_user_devices, + "Device '%s' successfully account unlocked", + ) diff --git a/keepercli-package/src/keepercli/register_commands.py b/keepercli-package/src/keepercli/register_commands.py index 20c4323..b217925 100644 --- a/keepercli-package/src/keepercli/register_commands.py +++ b/keepercli-package/src/keepercli/register_commands.py @@ -27,6 +27,10 @@ def register_commands(commands: base.CliCommands, scopes: Optional[base.CommandS commands.register_command('device-rename', device_management.DeviceRenameCommand(), base.CommandScope.Account) commands.register_command('device-remove', device_management.DeviceRemoveCommand(), base.CommandScope.Account) commands.register_command('device-logout', device_management.DeviceLogoutCommand(), base.CommandScope.Account) + commands.register_command('device-lock', device_management.DeviceLockCommand(), base.CommandScope.Account) + commands.register_command('device-unlock', device_management.DeviceUnlockCommand(), base.CommandScope.Account) + commands.register_command('device-account-lock', device_management.DeviceAccountLockCommand(), base.CommandScope.Account) + commands.register_command('device-account-unlock', device_management.DeviceAccountUnlockCommand(), base.CommandScope.Account) commands.register_command('whoami', account_commands.WhoamiCommand(), base.CommandScope.Account) commands.register_command('reset-password', account_commands.ResetPasswordCommand(), base.CommandScope.Account) commands.register_command('2fa', two_fa.TwoFaCommand(), base.CommandScope.Account) diff --git a/keepersdk-package/src/keepersdk/authentication/device_management.py b/keepersdk-package/src/keepersdk/authentication/device_management.py index a829597..be14910 100644 --- a/keepersdk-package/src/keepersdk/authentication/device_management.py +++ b/keepersdk-package/src/keepersdk/authentication/device_management.py @@ -4,7 +4,7 @@ # |_|\_\___\___| .__/\___|_| # |_| # -# Keeper SDK for Python — user device management (list, rename, logout, remove). +# Keeper SDK for Python — user device management (list, rename, logout, remove, lock, unlock). # import re @@ -124,6 +124,74 @@ def remove_user_devices( return _execute_device_action(auth, device_identifiers, DeviceManagement_pb2.DA_REMOVE) +def lock_user_devices( + auth: keeper_auth.KeeperAuth, + device_identifiers: List[str], +) -> List[str]: + """ + Lock one or more devices for all users (and linked devices). Logs out all users. + + Returns: + Names of devices successfully locked. + + Raises: + ValueError: validation, not found, or API failure. + """ + return _execute_device_action(auth, device_identifiers, DeviceManagement_pb2.DA_LOCK) + + +def unlock_user_devices( + auth: keeper_auth.KeeperAuth, + device_identifiers: List[str], +) -> List[str]: + """ + Unlock one or more devices (and linked devices) for the calling user. + + Returns: + Names of devices successfully unlocked. + + Raises: + ValueError: validation, not found, or API failure. + """ + return _execute_device_action(auth, device_identifiers, DeviceManagement_pb2.DA_UNLOCK) + + +def account_lock_user_devices( + auth: keeper_auth.KeeperAuth, + device_identifiers: List[str], +) -> List[str]: + """ + Lock one or more devices for the current user only (logs out if logged in). + + Returns: + Names of devices successfully account-locked. + + Raises: + ValueError: validation, not found, or API failure. + """ + return _execute_device_action( + auth, device_identifiers, DeviceManagement_pb2.DA_DEVICE_ACCOUNT_LOCK + ) + + +def account_unlock_user_devices( + auth: keeper_auth.KeeperAuth, + device_identifiers: List[str], +) -> List[str]: + """ + Unlock one or more devices for the current user. + + Returns: + Names of devices successfully account-unlocked. + + Raises: + ValueError: validation, not found, or API failure. + """ + return _execute_device_action( + auth, device_identifiers, DeviceManagement_pb2.DA_DEVICE_ACCOUNT_UNLOCK + ) + + def _fetch_devices(auth: keeper_auth.KeeperAuth) -> List[DeviceManagement_pb2.Device]: rs = auth.execute_auth_rest( rest_endpoint=URL_DEVICE_USER_LIST, diff --git a/keepersdk-package/unit_tests/test_device_management.py b/keepersdk-package/unit_tests/test_device_management.py index 3e87724..a204b98 100644 --- a/keepersdk-package/unit_tests/test_device_management.py +++ b/keepersdk-package/unit_tests/test_device_management.py @@ -75,6 +75,48 @@ def test_remove_user_devices(self): DeviceManagement_pb2.DA_REMOVE, ) + def test_lock_user_devices(self): + auth = MagicMock() + list_rs = DeviceManagement_pb2.DeviceUserResponse() + g = list_rs.deviceGroups.add() + g.devices.append(_device('Workstation', 75)) + + action_rs = DeviceManagement_pb2.DeviceActionResponse() + ar = action_rs.deviceActionResult.add() + ar.deviceActionStatus = DeviceManagement_pb2.SUCCESS + ar.encryptedDeviceToken.append(b'\x01\x02') + + auth.execute_auth_rest.side_effect = [list_rs, action_rs] + + names = device_management.lock_user_devices(auth, ['Workstation']) + self.assertEqual(names, ['Workstation']) + request = auth.execute_auth_rest.call_args_list[1].kwargs.get('request') + self.assertEqual( + request.deviceAction[0].deviceActionType, + DeviceManagement_pb2.DA_LOCK, + ) + + def test_account_unlock_user_devices(self): + auth = MagicMock() + list_rs = DeviceManagement_pb2.DeviceUserResponse() + g = list_rs.deviceGroups.add() + g.devices.append(_device('Tablet', 10)) + + action_rs = DeviceManagement_pb2.DeviceActionResponse() + ar = action_rs.deviceActionResult.add() + ar.deviceActionStatus = DeviceManagement_pb2.SUCCESS + ar.encryptedDeviceToken.append(b'\x01\x02') + + auth.execute_auth_rest.side_effect = [list_rs, action_rs] + + names = device_management.account_unlock_user_devices(auth, ['1']) + self.assertEqual(names, ['Tablet']) + request = auth.execute_auth_rest.call_args_list[1].kwargs.get('request') + self.assertEqual( + request.deviceAction[0].deviceActionType, + DeviceManagement_pb2.DA_DEVICE_ACCOUNT_UNLOCK, + ) + if __name__ == '__main__': unittest.main()