diff --git a/Dockerfile b/Dockerfile index 252b32794..dfba2f2b5 100644 --- a/Dockerfile +++ b/Dockerfile @@ -43,12 +43,14 @@ RUN echo "mongodb-org hold" | dpkg --set-selections \ RUN wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh -O ~/miniconda.sh \ && bash ~/miniconda.sh -b -p /miniconda-latest +# Setup Cron +COPY ./bin/ee2_cronjobs /etc/cron.d/ee2_cronjobs + # Need to change startup scripts to match this in MAKEFILE ENV PATH=/miniconda-latest/bin:$PATH RUN pip install --upgrade pip && python -V - - COPY ./requirements.txt /kb/module/requirements.txt + RUN pip install -r /kb/module/requirements.txt RUN adduser --disabled-password --gecos '' -shell /bin/bash kbase # ----------------------------------------- @@ -63,7 +65,10 @@ WORKDIR /kb/module/scripts RUN chmod +x download_runner.sh && ./download_runner.sh WORKDIR /kb/module/ + +# Set deploy.cfg location ENV KB_DEPLOYMENT_CONFIG=/kb/module/deploy.cfg +ENV PATH=/kb/module:$PATH ENTRYPOINT [ "./scripts/entrypoint.sh" ] CMD [ ] diff --git a/README.md b/README.md index f7762d853..bd3f0d54c 100644 --- a/README.md +++ b/README.md @@ -133,6 +133,24 @@ Note that the representation of this data in the catalog API is idiosyncratic - CSV data are split by commas into parts. EE2 will detect JSON entries and reconsitute them before deserialization. + +# CronJobs/Reaper Scripts + +* Notifications are sent to the #ee_notifications slack channel + +### PurgeBadJobs +* Cronjobs are copied into the image by the Dockerfile; the cron service is started by entrypoint.sh +* The cronjobs are configured in /etc/cron.d/ee2_cronjobs +* You can monitor them by reading the logs in /root/cron-purge.log + +### PurgeHeldJobs +* This is a daemon launched by entrypoint.sh +* It is not a cronjob because there is no easy way to seek through the HTCondor EXECUTE log, and reading through it takes a while + +#### Horizontal Scaling +* These scripts will have to be rethought if ee2 is horizontally scaled and we do not want multiple copies running. 
+ + # Help Contact @Tianhao-Gu, @bio_boris, @briehl diff --git a/bin/PurgeBadJobs.py b/bin/PurgeBadJobs.py new file mode 100644 index 000000000..d2a182c84 --- /dev/null +++ b/bin/PurgeBadJobs.py @@ -0,0 +1,127 @@ +#!/usr/bin/env python3 +# Script to purge jobs that have been queued for too long, or stuck in the created state for too long + +import logging +import os +from configparser import ConfigParser +from datetime import datetime, timedelta, timezone +from time import sleep + +import pymongo +from bson import ObjectId + +from lib.execution_engine2.db.models.models import TerminatedCode, Status +from lib.execution_engine2.utils.SlackUtils import SlackClient +from lib.installed_clients.execution_engine2Client import execution_engine2 + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + +config = ConfigParser() +config.read(os.environ["KB_DEPLOYMENT_CONFIG"]) +ee2_endpoint = config.get(section="execution_engine2", option="ee2-url") +slack_token = config.get(section="execution_engine2", option="slack-token") + +ee2 = execution_engine2(url=ee2_endpoint, token=os.environ["EE2_ADMIN_SERVICE_TOKEN"]) +slack_client = SlackClient( + slack_token, channel="#ee_notifications", debug=True, endpoint=ee2_endpoint +) +db_client = pymongo.MongoClient( + host=config.get(section="execution_engine2", option="mongo-host"), + port=int(config.get(section="execution_engine2", option="mongo-port")), + username=config.get(section="execution_engine2", option="mongo-user"), + password=config.get(section="execution_engine2", option="mongo-password"), + authSource=config.get(section="execution_engine2", option="mongo-database"), + authMechanism=config.get(section="execution_engine2", option="mongo-authmechanism"), + serverSelectionTimeoutMS=1000, +) +ee2_db = db_client.get_database( + config.get(section="execution_engine2", option="mongo-database") +) +ee2_jobs_collection = ee2_db.get_collection( + config.get(section="execution_engine2", option="mongo-jobs-collection") 
+) + +CREATED_MINUTES_AGO = 5 +QUEUE_THRESHOLD_DAYS = 14 + + +def cancel(record): + job_id = str(record["_id"]) + scheduler_id = record.get("scheduler_id") + cjp = { + "as_admin": True, + "job_id": job_id, + "terminated_code": TerminatedCode.terminated_by_automation.value, + } + print("About to cancel ee2 job", cjp) + ee2.cancel_job(params=cjp) + slack_client.cancel_job_message( + job_id=job_id, + scheduler_id=scheduler_id, + termination_code=TerminatedCode.terminated_by_automation.value, + ) + # Avoid rate limit of 1 msg per second + sleep(1) + + +def cancel_jobs_stuck_in_queue(): + """ + Cancel jobs that have sat in the queued state for more than QUEUE_THRESHOLD_DAYS (plus one day of slack) + Update a completed Job as necessary to test this out: + ee2.update_job_status({'job_id': '601af2afeeb773acaf9de80d', 'as_admin': True, 'status': 'queued'}) + :return: + """ + queue_threshold_days = QUEUE_THRESHOLD_DAYS + before_days = ( + datetime.today() - timedelta(days=queue_threshold_days + 1) + ).timestamp() + print({"status": "queued", "queued": {"$lt": before_days}}) + stuck_jobs = ee2_jobs_collection.find( + {"status": Status.queued.value, "queued": {"$lt": before_days}} + ) + print( + f"Found {stuck_jobs.count()} jobs that were stuck in the {Status.queued.value} state over {queue_threshold_days} days" + ) + for record in stuck_jobs: + queued_time = record["queued"] + now = datetime.now(timezone.utc).timestamp() + elapsed = now - queued_time + print("queued days=", elapsed / 86400) + cancel(record) + + +def cancel_created(): + """ + For jobs that are not batch jobs, and have been in the created state for more than 5 minutes, uh oh, spaghettio, time to go + """ + + five_mins_ago = ObjectId.from_datetime( + datetime.now(timezone.utc) - timedelta(minutes=CREATED_MINUTES_AGO) + ) + stuck_jobs = ee2_jobs_collection.find( + {"status": "created", "_id": {"$lt": five_mins_ago}, "batch_job": {"$ne": True}} + ) + print( + f"Found {stuck_jobs.count()} jobs that were stuck in the {Status.created.value} state for over {CREATED_MINUTES_AGO} mins" + ) + for record 
in stuck_jobs: + cancel(record) + + +def clean_retried_jobs(): + """Clean up jobs that couldn't finish the retry lifecycle""" + # TODO + + +def purge(): + cancel_jobs_stuck_in_queue() + cancel_created() + + +if __name__ == "__main__": + try: + purge() + except Exception as e: + slack_client.ee2_reaper_failure(endpoint=ee2_endpoint, e=e) + raise e diff --git a/bin/PurgeHeldJobs.py b/bin/PurgeHeldJobs.py index 294a7623b..b8e037646 100644 --- a/bin/PurgeHeldJobs.py +++ b/bin/PurgeHeldJobs.py @@ -1,7 +1,6 @@ #!/usr/bin/env python3 import logging import os -import sys import time from configparser import ConfigParser from datetime import datetime, timedelta @@ -9,31 +8,20 @@ import htcondor -# I wish a knew a better way to do this -sys.path.append(".") - from lib.execution_engine2.utils.SlackUtils import SlackClient from lib.installed_clients.execution_engine2Client import execution_engine2 -from lib.execution_engine2.utils.Condor import Condor logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) config = ConfigParser() -config_filepath = os.environ["KB_DEPLOYMENT_CONFIG"] - -# Condor -condor = Condor(config_filepath=config_filepath) -# EE2 - -cfg = condor.config -ee2_endpoint = cfg.get(section="execution_engine2", option="ee2-url") - +config.read(os.environ["KB_DEPLOYMENT_CONFIG"]) +ee2_endpoint = config.get(section="execution_engine2", option="ee2-url") +slack_token = config.get(section="execution_engine2", option="slack-token") ee2 = execution_engine2(url=ee2_endpoint, token=os.environ["EE2_ADMIN_SERVICE_TOKEN"]) -# Slack -slack_token = cfg.get(section="execution_engine2", option="slack-token") -# TODO change this channel -slack_client = SlackClient(slack_token, channel="#ee_notifications", debug=True) +slack_client = SlackClient( + slack_token, channel="#ee_notifications", debug=True, endpoint=ee2_endpoint +) def read_events(path): @@ -172,4 +160,4 @@ def handle_hold_event(event): ) time.sleep(5) except Exception as e: - 
slack_client.ee2_reaper_failure(endpoint=ee2_endpoint) + slack_client.ee2_reaper_failure(endpoint=ee2_endpoint, e=e) diff --git a/bin/cron_vars b/bin/cron_vars new file mode 100644 index 000000000..d7b9cec77 --- /dev/null +++ b/bin/cron_vars @@ -0,0 +1,2 @@ +EE2_ADMIN_SERVICE_TOKEN=$EE2_ADMIN_SERVICE_TOKEN +KB_DEPLOYMENT_CONFIG=$KB_DEPLOYMENT_CONFIG \ No newline at end of file diff --git a/bin/ee2_cronjobs b/bin/ee2_cronjobs new file mode 100644 index 000000000..036231a15 --- /dev/null +++ b/bin/ee2_cronjobs @@ -0,0 +1,6 @@ +SHELL=/bin/bash +BASH_ENV=/etc/environment +# Check the cron-purge.log for issues why the script isn't running, such as missing `EE2_ADMIN_SERVICE_TOKEN` + +# m h dom mon dow user command + * * * * * root . /etc/environment; /miniconda-latest/bin/python3 /kb/module/bin/PurgeBadJobs.py >> /root/cron-purge.log 2>&1 diff --git a/build/templates/deploy.docker.cfg.templ b/build/templates/deploy.docker.cfg.templ index 849e88219..a12b338cb 100644 --- a/build/templates/deploy.docker.cfg.templ +++ b/build/templates/deploy.docker.cfg.templ @@ -27,6 +27,7 @@ mongo-database = ee2 mongo-user = travis mongo-password = travis mongo-authmechanism = DEFAULT +mongo-retry-rewrites = False start-local-mongo = 0 diff --git a/deploy.cfg b/deploy.cfg index 8a685860c..9618cb902 100644 --- a/deploy.cfg +++ b/deploy.cfg @@ -25,6 +25,7 @@ mongo-database = {{ default .Env.mongodb_database "ee2" }} mongo-user = {{ default .Env.mongodb_user "" }} mongo-password = {{ default .Env.mongodb_pwd "" }} mongo-authmechanism = {{ default .Env.mongodb_auth_mechanism "DEFAULT" }} +mongo-retry-rewrites = {{ default .Env.mongodb_retry_rewrites "False" }} start-local-mongo = {{ default .Env.start_local_mongo "0" }} mongo-collection = legacy diff --git a/lib/execution_engine2/db/MongoUtil.py b/lib/execution_engine2/db/MongoUtil.py index 0845cd84b..349b066bc 100644 --- a/lib/execution_engine2/db/MongoUtil.py +++ b/lib/execution_engine2/db/MongoUtil.py @@ -5,7 +5,6 @@ from contextlib import 
contextmanager from datetime import datetime from typing import Dict, List - from bson.objectid import ObjectId from mongoengine import connect, connection from pymongo import MongoClient, UpdateOne @@ -16,6 +15,8 @@ RecordNotFoundException, InvalidStatusTransitionException, ) + +from lib.execution_engine2.utils.arg_processing import parse_bool from execution_engine2.sdk.EE2Runjob import JobIdPair @@ -27,6 +28,7 @@ def __init__(self, config: Dict): self.mongo_database = config["mongo-database"] self.mongo_user = config["mongo-user"] self.mongo_pass = config["mongo-password"] + self.retry_rewrites = parse_bool(config["mongo-retry-rewrites"]) self.mongo_authmechanism = config["mongo-authmechanism"] self.mongo_collection = None self._start_local_service() @@ -42,6 +44,7 @@ def _get_pymongo_client(self): password=self.mongo_pass, authSource=self.mongo_database, authMechanism=self.mongo_authmechanism, + retryWrites=self.retry_rewrites, ) def _get_mongoengine_client(self) -> connection: @@ -53,7 +56,9 @@ def _get_mongoengine_client(self) -> connection: password=self.mongo_pass, authentication_source=self.mongo_database, authentication_mechanism=self.mongo_authmechanism, + retryWrites=self.retry_rewrites, ) + # This MongoDB deployment does not support retryable writes def _start_local_service(self): try: diff --git a/lib/execution_engine2/utils/SlackUtils.py b/lib/execution_engine2/utils/SlackUtils.py index 836c41dc2..5a8c13fa8 100644 --- a/lib/execution_engine2/utils/SlackUtils.py +++ b/lib/execution_engine2/utils/SlackUtils.py @@ -26,8 +26,8 @@ def held_job_message(self, held_job): message = f"Held Job Stats {held_job}" self.safe_chat_post_message(channel=self.channel, text=message) - def ee2_reaper_failure(self, endpoint="Unknown EE2 URL", job_id="Unknown"): - message = f"EE2 Held Job reaper failed for {endpoint} (job {job_id}). 
Please check it out" + def ee2_reaper_failure(self, endpoint="Unknown EE2 URL", job_id="Unknown", e=None): + message = f"EE2 Held Job reaper failed for {endpoint} (job {job_id}), {e}. Please check it out" self.safe_chat_post_message(channel=self.channel, text=message) def ee2_reaper_success( @@ -55,7 +55,7 @@ def cancel_job_message(self, job_id, scheduler_id, termination_code): if self.debug is False: return - message = f"scheduler_id:{scheduler_id} job_id:{job_id} has been canceled due to {termination_code} ({self.endpoint})" + message = f"scheduler_id:`{scheduler_id}` job_id:`{job_id}` has been canceled due to `{termination_code}` ({self.endpoint})" self.safe_chat_post_message(channel=self.channel, text=message) def finish_job_message(self, job_id, scheduler_id, finish_status, error_code=None): diff --git a/scripts/entrypoint.sh b/scripts/entrypoint.sh index 642ca69f3..2d5fbbab2 100755 --- a/scripts/entrypoint.sh +++ b/scripts/entrypoint.sh @@ -2,25 +2,29 @@ cp ./deploy.cfg ./work/config.properties -#condor_shared=condor_shared - - -if [ $# -eq 0 ] ; then +if [ $# -eq 0 ]; then useradd kbase - if [ "${POOL_PASSWORD}" ] ; then - /usr/sbin/condor_store_cred -p "${POOL_PASSWORD}" -f /etc/condor/password - chown kbase:kbase /etc/condor/password + if [ "${POOL_PASSWORD}" ]; then + /usr/sbin/condor_store_cred -p "${POOL_PASSWORD}" -f /etc/condor/password + chown kbase:kbase /etc/condor/password fi chown kbase /etc/condor/password + + # Copy downloaded JobRunner to a shared volume mount cp -rf /runner/JobRunner.tgz /condor_shared cp -rf ./scripts/execute_runner.sh /condor_shared + # Give permissions to transfer logs into here mkdir /condor_shared/runner_logs && chown kbase /condor_shared/runner_logs mkdir /condor_shared/cluster_logs && chown kbase /condor_shared/cluster_logs + # Render the bin/cron_vars template with the current ENV and save it to /etc/environment for cron + envsubst < /kb/module/bin/cron_vars > /etc/environment + chmod a+rw /etc/environment + service cron start sh ./scripts/start_server.sh -elif [ "${1}" = "test" 
] ; then +elif [ "${1}" = "test" ]; then echo "Run Tests" make test diff --git a/test/deploy.cfg b/test/deploy.cfg index 0bfb9bc55..fa520f56c 100644 --- a/test/deploy.cfg +++ b/test/deploy.cfg @@ -28,6 +28,7 @@ mongo-database = ee2 mongo-user = travis mongo-password = travis mongo-authmechanism = DEFAULT +mongo-retry-rewrites = False # mongo-in-docker-compose = mini_kb_ci-mongo_1 # mongo-in-docker-compose = condor_mongo_1