Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 50 additions & 1 deletion cycleops/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import requests
import sec
import typer
import websockets
from requests.models import Response
from websockets.legacy.client import WebSocketClientProtocol

from .auth import CycleopsAuthentication
from .exceptions import APIError, AuthenticationError
Expand Down Expand Up @@ -151,7 +153,7 @@ def deploy(self, setup_id: int) -> Optional[Dict[str, Any]]:
description: str = f"Deploying setup: {setup_id}"
type: str = "Deployment"

jobs_client = JobClient(cycleops_client)
jobs_client: JobClient = JobClient(cycleops_client)
return jobs_client.create(description=description, type=type, setup=setup_id)


Expand Down Expand Up @@ -266,3 +268,50 @@ def delete(self, hostgroup_id: int) -> Optional[Dict[str, Any]]:


cycleops_client: Client = Client()


class WebSocketClient:
"""
A client for interacting with Cycleops websockets to request and listen for job logs.
"""

def __init__(self, job_id: str):
self.url: str = "wss://cloud.cycleops.io/ansible-worker-ws/ws/ansible-output"
self.job_id: str = job_id
self._jwt: Optional[str] = None
self._job: Optional[Dict[str, Any]] = None

@property
def jwt(self):
if not self._jwt:
self._jwt = self.authenticate()
return self._jwt

@property
def job(self):
if not self._job:
self._job = self.get_job()
return self._job

def authenticate(self) -> Optional[str]:
token: str = cycleops_client._request("POST", f"identity/token")
return token["access_token"]

def get_job(self) -> Optional[Dict[str, Any]]:
job_client: JobClient = JobClient(cycleops_client)
job: Optional[Dict[str, Any]] = job_client.retrieve(self.job_id)

return job

async def get_job_logs(self, websocket: WebSocketClientProtocol) -> None:
message: str = f"id={self.job_id}|jwt={self.jwt}|account={self.job['account']}"
await websocket.send(message)

async def listen(self, websocket: WebSocketClientProtocol) -> None:
while message := await websocket.recv():
print(f"{message}\n")

async def run(self) -> None:
async with websockets.connect(self.url) as websocket:
await self.get_job_logs(websocket)
await self.listen(websocket)
62 changes: 39 additions & 23 deletions cycleops/setups.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import asyncio
import time
from typing import Any, Dict, List, Optional

import typer
import websockets
from rich import print

from .client import JobClient, SetupClient, cycleops_client
from .client import JobClient, SetupClient, WebSocketClient, cycleops_client
from .exceptions import NotFound
from .utils import display_error_message, display_success_message

Expand Down Expand Up @@ -180,33 +182,37 @@ def deploy(

try:
setup = get_setup(setup_identifier)

job = setup_client.deploy(setup["id"])
report_queued = print if wait else display_success_message
report_queued(f"Setup {setup['id']} has been queued for deployment")

while wait:
match status := job["status"]:
case "Initialized":
print(f"Setup {setup['id']} has been initialized")
case "Deploying":
print(f"Setup {setup['id']} is being deployed")
case "Deployed":
display_success_message(
f"Setup {setup['id']} has been deployed successfully"
)
break
case "Failed":
display_error_message(job)
raise Exception(f"Setup {setup['id']} could not be deployed")
case _:
print(f"Setup {setup['id']} is in status {status}")
time.sleep(3)
job = job_client.retrieve(job["id"])
except Exception as error:
display_error_message(error)
raise typer.Abort()

deployment_scheduled_message = (
f"Setup {setup_identifier} has been queued for deployment"
)

if not wait:
display_success_message(deployment_scheduled_message)
return

print(f"{deployment_scheduled_message}\n")

try:
display_job_logs(job["id"])
except websockets.exceptions.ConnectionClosed:
job = job_client.retrieve(job["id"])

match job["status"]:
case "Deployed":
display_success_message(
f"Setup {setup_identifier} has been deployed successfully"
)
case "Failed":
display_error_message(f"Setup {setup_identifier} could not be deployed")
case _:
print(f"Setup {setup_identifier} is in status {job['status']}")
return


def get_setup(setup_identifier: str) -> Optional[Dict[str, Any]]:
"""
Expand All @@ -221,3 +227,13 @@ def get_setup(setup_identifier: str) -> Optional[Dict[str, Any]]:
setup = setup_client.retrieve(setup_identifier)

return setup


def display_job_logs(job_id: str) -> None:
"""
Displays the deployements logs of the specified job
"""

websocket_client = WebSocketClient(job_id)

asyncio.get_event_loop().run_until_complete(websocket_client.run())
Loading