Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
540 changes: 540 additions & 0 deletions examples/sdk_examples/device_management/account_lock_device.py

Large diffs are not rendered by default.

540 changes: 540 additions & 0 deletions examples/sdk_examples/device_management/account_unlock_device.py

Large diffs are not rendered by default.

540 changes: 540 additions & 0 deletions examples/sdk_examples/device_management/lock_device.py

Large diffs are not rendered by default.

540 changes: 540 additions & 0 deletions examples/sdk_examples/device_management/unlock_device.py

Large diffs are not rendered by default.

139 changes: 123 additions & 16 deletions keepercli-package/src/keepercli/commands/device_management.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import argparse
from datetime import datetime
from tkinter.constants import S
from typing import List, Optional
from typing import Callable, List, Optional

from keepersdk.authentication import device_management

Expand All @@ -24,6 +23,20 @@ def _sdk_error(exc: Exception) -> base.CommandError:
return base.CommandError(str(exc))


def _run_device_action_command(
context: KeeperParams,
device_identifiers: List[str],
action_fn: Callable,
success_message: str,
) -> None:
base.require_login(context)
try:
for name in action_fn(context.auth, device_identifiers):
logger.info(success_message, name)
except ValueError as e:
raise _sdk_error(e) from e


class DeviceListCommand(base.ArgparseCommand):
def __init__(self):
parser = argparse.ArgumentParser(
Expand Down Expand Up @@ -115,13 +128,12 @@ def add_arguments_to_parser(parser: argparse.ArgumentParser):
parser.exit = base.ArgparseCommand.suppress_exit

def execute(self, context: KeeperParams, **kwargs):
base.require_login(context)
device_identifiers = kwargs.get('devices') or []
try:
for name in device_management.remove_user_devices(context.auth, device_identifiers):
logger.info("Device '%s' successfully removed", name)
except ValueError as e:
raise _sdk_error(e) from e
_run_device_action_command(
context,
kwargs.get('devices') or [],
device_management.remove_user_devices,
"Device '%s' successfully removed",
)


class DeviceLogoutCommand(base.ArgparseCommand):
Expand All @@ -140,10 +152,105 @@ def add_arguments_to_parser(parser: argparse.ArgumentParser):
parser.exit = base.ArgparseCommand.suppress_exit

def execute(self, context: KeeperParams, **kwargs):
base.require_login(context)
device_identifiers = kwargs.get('devices') or []
try:
for name in device_management.logout_user_devices(context.auth, device_identifiers):
logger.info("Device '%s' successfully logged out", name)
except ValueError as e:
raise _sdk_error(e) from e
_run_device_action_command(
context,
kwargs.get('devices') or [],
device_management.logout_user_devices,
"Device '%s' successfully logged out",
)


class DeviceLockCommand(base.ArgparseCommand):
def __init__(self):
parser = argparse.ArgumentParser(
prog='device-lock',
description='Lock one or more devices for all users (logs out all users on those devices)',
)
DeviceLockCommand.add_arguments_to_parser(parser)
super().__init__(parser)

@staticmethod
def add_arguments_to_parser(parser: argparse.ArgumentParser):
parser.add_argument('devices', nargs='+', help='Device ID (from device-list) or device name substring')
parser.error = base.ArgparseCommand.raise_parse_exception
parser.exit = base.ArgparseCommand.suppress_exit

def execute(self, context: KeeperParams, **kwargs):
_run_device_action_command(
context,
kwargs.get('devices') or [],
device_management.lock_user_devices,
"Device '%s' successfully locked",
)


class DeviceUnlockCommand(base.ArgparseCommand):
def __init__(self):
parser = argparse.ArgumentParser(
prog='device-unlock',
description='Unlock one or more devices for the current user',
)
DeviceUnlockCommand.add_arguments_to_parser(parser)
super().__init__(parser)

@staticmethod
def add_arguments_to_parser(parser: argparse.ArgumentParser):
parser.add_argument('devices', nargs='+', help='Device ID (from device-list) or device name substring')
parser.error = base.ArgparseCommand.raise_parse_exception
parser.exit = base.ArgparseCommand.suppress_exit

def execute(self, context: KeeperParams, **kwargs):
_run_device_action_command(
context,
kwargs.get('devices') or [],
device_management.unlock_user_devices,
"Device '%s' successfully unlocked",
)


class DeviceAccountLockCommand(base.ArgparseCommand):
def __init__(self):
parser = argparse.ArgumentParser(
prog='device-account-lock',
description='Lock one or more devices for the current user only (logs out if logged in)',
)
DeviceAccountLockCommand.add_arguments_to_parser(parser)
super().__init__(parser)

@staticmethod
def add_arguments_to_parser(parser: argparse.ArgumentParser):
parser.add_argument('devices', nargs='+', help='Device ID (from device-list) or device name substring')
parser.error = base.ArgparseCommand.raise_parse_exception
parser.exit = base.ArgparseCommand.suppress_exit

def execute(self, context: KeeperParams, **kwargs):
_run_device_action_command(
context,
kwargs.get('devices') or [],
device_management.account_lock_user_devices,
"Device '%s' successfully account locked",
)


class DeviceAccountUnlockCommand(base.ArgparseCommand):
def __init__(self):
parser = argparse.ArgumentParser(
prog='device-account-unlock',
description='Unlock one or more devices for the current user',
)
DeviceAccountUnlockCommand.add_arguments_to_parser(parser)
super().__init__(parser)

@staticmethod
def add_arguments_to_parser(parser: argparse.ArgumentParser):
parser.add_argument('devices', nargs='+', help='Device ID (from device-list) or device name substring')
parser.error = base.ArgparseCommand.raise_parse_exception
parser.exit = base.ArgparseCommand.suppress_exit

def execute(self, context: KeeperParams, **kwargs):
_run_device_action_command(
context,
kwargs.get('devices') or [],
device_management.account_unlock_user_devices,
"Device '%s' successfully account unlocked",
)
4 changes: 4 additions & 0 deletions keepercli-package/src/keepercli/register_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ def register_commands(commands: base.CliCommands, scopes: Optional[base.CommandS
commands.register_command('device-rename', device_management.DeviceRenameCommand(), base.CommandScope.Account)
commands.register_command('device-remove', device_management.DeviceRemoveCommand(), base.CommandScope.Account)
commands.register_command('device-logout', device_management.DeviceLogoutCommand(), base.CommandScope.Account)
commands.register_command('device-lock', device_management.DeviceLockCommand(), base.CommandScope.Account)
commands.register_command('device-unlock', device_management.DeviceUnlockCommand(), base.CommandScope.Account)
commands.register_command('device-account-lock', device_management.DeviceAccountLockCommand(), base.CommandScope.Account)
commands.register_command('device-account-unlock', device_management.DeviceAccountUnlockCommand(), base.CommandScope.Account)
commands.register_command('whoami', account_commands.WhoamiCommand(), base.CommandScope.Account)
commands.register_command('reset-password', account_commands.ResetPasswordCommand(), base.CommandScope.Account)
commands.register_command('2fa', two_fa.TwoFaCommand(), base.CommandScope.Account)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# |_|\_\___\___| .__/\___|_|
# |_|
#
# Keeper SDK for Python — user device management (list, rename, logout, remove).
# Keeper SDK for Python — user device management (list, rename, logout, remove, lock, unlock).
#

import re
Expand Down Expand Up @@ -124,6 +124,74 @@ def remove_user_devices(
return _execute_device_action(auth, device_identifiers, DeviceManagement_pb2.DA_REMOVE)


def lock_user_devices(
auth: keeper_auth.KeeperAuth,
device_identifiers: List[str],
) -> List[str]:
"""
Lock one or more devices for all users (and linked devices). Logs out all users.

Returns:
Names of devices successfully locked.

Raises:
ValueError: validation, not found, or API failure.
"""
return _execute_device_action(auth, device_identifiers, DeviceManagement_pb2.DA_LOCK)


def unlock_user_devices(
auth: keeper_auth.KeeperAuth,
device_identifiers: List[str],
) -> List[str]:
"""
Unlock one or more devices (and linked devices) for the calling user.

Returns:
Names of devices successfully unlocked.

Raises:
ValueError: validation, not found, or API failure.
"""
return _execute_device_action(auth, device_identifiers, DeviceManagement_pb2.DA_UNLOCK)


def account_lock_user_devices(
auth: keeper_auth.KeeperAuth,
device_identifiers: List[str],
) -> List[str]:
"""
Lock one or more devices for the current user only (logs out if logged in).

Returns:
Names of devices successfully account-locked.

Raises:
ValueError: validation, not found, or API failure.
"""
return _execute_device_action(
auth, device_identifiers, DeviceManagement_pb2.DA_DEVICE_ACCOUNT_LOCK
)


def account_unlock_user_devices(
auth: keeper_auth.KeeperAuth,
device_identifiers: List[str],
) -> List[str]:
"""
Unlock one or more devices for the current user.

Returns:
Names of devices successfully account-unlocked.

Raises:
ValueError: validation, not found, or API failure.
"""
return _execute_device_action(
auth, device_identifiers, DeviceManagement_pb2.DA_DEVICE_ACCOUNT_UNLOCK
)


def _fetch_devices(auth: keeper_auth.KeeperAuth) -> List[DeviceManagement_pb2.Device]:
rs = auth.execute_auth_rest(
rest_endpoint=URL_DEVICE_USER_LIST,
Expand Down
42 changes: 42 additions & 0 deletions keepersdk-package/unit_tests/test_device_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,48 @@ def test_remove_user_devices(self):
DeviceManagement_pb2.DA_REMOVE,
)

def test_lock_user_devices(self):
auth = MagicMock()
list_rs = DeviceManagement_pb2.DeviceUserResponse()
g = list_rs.deviceGroups.add()
g.devices.append(_device('Workstation', 75))

action_rs = DeviceManagement_pb2.DeviceActionResponse()
ar = action_rs.deviceActionResult.add()
ar.deviceActionStatus = DeviceManagement_pb2.SUCCESS
ar.encryptedDeviceToken.append(b'\x01\x02')

auth.execute_auth_rest.side_effect = [list_rs, action_rs]

names = device_management.lock_user_devices(auth, ['Workstation'])
self.assertEqual(names, ['Workstation'])
request = auth.execute_auth_rest.call_args_list[1].kwargs.get('request')
self.assertEqual(
request.deviceAction[0].deviceActionType,
DeviceManagement_pb2.DA_LOCK,
)

def test_account_unlock_user_devices(self):
auth = MagicMock()
list_rs = DeviceManagement_pb2.DeviceUserResponse()
g = list_rs.deviceGroups.add()
g.devices.append(_device('Tablet', 10))

action_rs = DeviceManagement_pb2.DeviceActionResponse()
ar = action_rs.deviceActionResult.add()
ar.deviceActionStatus = DeviceManagement_pb2.SUCCESS
ar.encryptedDeviceToken.append(b'\x01\x02')

auth.execute_auth_rest.side_effect = [list_rs, action_rs]

names = device_management.account_unlock_user_devices(auth, ['1'])
self.assertEqual(names, ['Tablet'])
request = auth.execute_auth_rest.call_args_list[1].kwargs.get('request')
self.assertEqual(
request.deviceAction[0].deviceActionType,
DeviceManagement_pb2.DA_DEVICE_ACCOUNT_UNLOCK,
)


if __name__ == '__main__':
unittest.main()