Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions src/infuse_iot/rpc_wrappers/security_state.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#!/usr/bin/env python3

import base64
import ctypes
import random

Expand Down Expand Up @@ -50,10 +51,12 @@ class security_state(InfuseRpcCommand, defs.security_state):
@classmethod
def add_parser(cls, parser):
parser.add_argument("--pem", type=ValidFile, help="Cloud .pem file for identity validation")
parser.add_argument("--base64", action="store_true", help="Print results as base64")

def __init__(self, args):
self.challenge = random.randbytes(16)
self.pem = args.pem
self.base64 = args.base64

def auth_level(self):
return Auth.NETWORK
Expand Down Expand Up @@ -87,22 +90,29 @@ def handle_response(self, return_code, response):
print(f"Failed to query current time ({errno.strerror(-return_code)})")
return

def print_bytes(ctypes_array) -> str:
b = bytes(ctypes_array)
if self.base64:
return base64.b64encode(b).decode("utf-8")
return b.hex()

print("Challenge:")
print(f"\t Challenge Bytes: {bytes(self.challenge).hex()}")
print(f"\t Challenge Bytes: {print_bytes(self.challenge)}")

# Decrypt identity information
print("Security State:")
print(f"\tDevice Public Key: {bytes(response.device_public_key).hex()}")
print(f"\t Cloud Public Key: {bytes(response.cloud_public_key).hex()}")
print(f"\tDevice Public Key: {print_bytes(response.device_public_key)}")
print(f"\t Cloud Public Key: {print_bytes(response.cloud_public_key)}")
print(f"\t Network: 0x{response.network_id:06x}")
if self.pem is None:
print("\t Identity: Cannot validate")
print(f"\t Raw: {bytes(response.challenge_response).hex()}")
print(f"\t Type: {response.challenge_response_type}")
print(f"\t Raw: {print_bytes(response.challenge_response)}")
else:
challenge_rsp = self._decrypt_response(response)

valid_response = bytes(challenge_rsp.challenge) == self.challenge

print(f"\t Response: {'Valid' if valid_response else 'Invalid'}")
print(f"\t Device ID: {challenge_rsp.device_id:016x}")
print(f"\t Identity Secret: {bytes(challenge_rsp.identity).hex()}")
print(f"\t Identity Secret: {print_bytes(challenge_rsp.identity)}")
4 changes: 2 additions & 2 deletions src/infuse_iot/socket_comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
from infuse_iot.tdf import TDF


def default_multicast_address():
return ("224.1.1.1", 8751)
def default_multicast_address(port: int = 8751) -> tuple[str, int]:
return ("224.1.1.1", port)


class ClientNotification:
Expand Down
4 changes: 3 additions & 1 deletion src/infuse_iot/tools/gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,7 @@ def add_parser(cls, parser):
)
parser.add_argument("--baud", type=int, default=115200, help="Baudrate for serial port")
parser.add_argument("--root", type=ValidFile, help="Root identity certificate to use instead of cloud")
parser.add_argument("--server-port", type=int, help="Alternate multicast port to use")

def __init__(self, args: argparse.Namespace):
self.port: SerialLike
Expand All @@ -544,7 +545,8 @@ def __init__(self, args: argparse.Namespace):
if args.display_only:
self.server = None
else:
self.server = LocalServer(default_multicast_address())
addr = default_multicast_address(args.server_port) if args.server_port else default_multicast_address()
self.server = LocalServer(addr)
self.rpc_server = LocalRpcServer(self.ddb)
self._common = CommonThreadState(self.server, self.port, self.ddb, self.rpc_server)
self.log = args.log
Expand Down
Loading