diff --git a/.coveragerc b/.coveragerc index 658a98112..97715b18d 100644 --- a/.coveragerc +++ b/.coveragerc @@ -17,3 +17,5 @@ source=shotgun_api3 omit= shotgun_api3/lib/httplib2/* shotgun_api3/lib/six.py + shotgun_api3/lib/certify/* + shotgun_api3/lib/pyparsing.py diff --git a/azure-pipelines-templates/run-tests.yml b/azure-pipelines-templates/run-tests.yml index 8fb33a6d6..551c06989 100644 --- a/azure-pipelines-templates/run-tests.yml +++ b/azure-pipelines-templates/run-tests.yml @@ -96,7 +96,7 @@ jobs: # for example 'Windows - 2.7' - bash: | cp ./tests/example_config ./tests/config - pytest -v --cov shotgun_api3 --cov-report xml --test-run-title="${{parameters.name}}-$(python.version)" + pytest --durations=0 -v --cov shotgun_api3 --cov-report xml --test-run-title="${{parameters.name}}-$(python.version)" displayName: Running tests env: # Pass the values needed to authenticate with the Flow Production Tracking site and create some entities. diff --git a/tests/base.py b/tests/base.py index 900b95998..04e52aa72 100644 --- a/tests/base.py +++ b/tests/base.py @@ -3,6 +3,7 @@ import os import random import re +import time import unittest from . import mock @@ -26,6 +27,11 @@ def skip(f): return lambda self: None +THUMBNAIL_MAX_ATTEMPTS = 30 +THUMBNAIL_RETRY_INTERVAL = 10 +TRANSIENT_IMAGE_PATH = "images/status/transient" + + class TestBase(unittest.TestCase): '''Base class for tests. @@ -59,6 +65,14 @@ def setUpClass(cls): cur_folder = os.path.dirname(os.path.abspath(__file__)) config_path = os.path.join(cur_folder, "config") cls.config.read_config(config_path) + if cls.config.jenkins: + cls.auth_args = dict( + login=cls.config.human_login, password=cls.config.human_password + ) + else: + cls.auth_args = dict( + script_name=cls.config.script_name, api_key=cls.config.api_key + ) def setUp(self, auth_mode='ApiUser'): # When running the tests from a pull request from a client, the Shotgun @@ -90,9 +104,8 @@ def setUp(self, auth_mode='ApiUser'): # first make an instance based on script key/name so # we can generate a session token sg = api.Shotgun(self.config.server_url, - self.config.script_name, - self.config.api_key, - http_proxy=self.config.http_proxy) + http_proxy=self.config.http_proxy, + **self.auth_args) self.session_token = sg.get_session_token() # now log in using session token self.sg = api.Shotgun(self.config.server_url, @@ -234,7 +247,9 @@ def _setup_mock_data(self): class LiveTestBase(TestBase): '''Test base for tests relying on connection to server.''' - def setUp(self, auth_mode='ApiUser'): + def setUp(self, auth_mode=None): + if not auth_mode: + auth_mode = 'HumanUser' if self.config.jenkins else 'ApiUser' super(LiveTestBase, self).setUp(auth_mode) if self.sg.server_caps.version and \ self.sg.server_caps.version >= (3, 3, 0) and \ @@ -260,18 +275,10 @@ def setUpClass(cls): # When running the tests from a pull request from a client, the Shotgun # site URL won't be set, so do not attempt to connect to Shotgun. if cls.config.server_url: - if cls.config.jenkins: - sg = api.Shotgun( - cls.config.server_url, - login=cls.config.human_login, - password=cls.config.human_password - ) - else: - sg = api.Shotgun( - cls.config.server_url, - cls.config.script_name, - cls.config.api_key - ) + sg = api.Shotgun( + cls.config.server_url, + **cls.auth_args, + ) cls.sg_version = tuple(sg.info()['version'][:3]) cls._setup_db(cls.config, sg) @@ -365,6 +372,19 @@ def gen_entity(self, entity_type, **kwargs): rv = self.sg.delete(entity_type, entity["id"]) assert rv == True + def find_one_await_thumbnail(self, entity_type, filters, fields=["image"], thumbnail_field_name="image", **kwargs): + attempts = 0 + while attempts < THUMBNAIL_MAX_ATTEMPTS: + result = self.sg.find_one(entity_type, filters, fields=fields, **kwargs) + if TRANSIENT_IMAGE_PATH in result.get(thumbnail_field_name, ""): + return result + + time.sleep(THUMBNAIL_RETRY_INTERVAL) + attempts += 1 + else: + if self.config.jenkins: + self.skipTest("Jenkins test timed out waiting for thumbnail") + class HumanUserAuthLiveTestBase(LiveTestBase): ''' diff --git a/tests/test_api.py b/tests/test_api.py index 9186d4430..9fbc7a678 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -40,10 +40,6 @@ from . import base -THUMBNAIL_MAX_ATTEMPTS = 30 -THUMBNAIL_RETRY_INTERAL = 10 -TRANSIENT_IMAGE_PATH = "images/status/transient" - class TestShotgunApi(base.LiveTestBase): def setUp(self): @@ -317,9 +313,6 @@ def test_upload_to_sg(self, mock_send_form): """ Upload an attachment tests for _upload_to_sg() """ - if "localhost" in self.server_url: - self.skipTest("upload / down tests skipped for localhost") - self.sg.server_info["s3_direct_uploads_enabled"] = False mock_send_form.method.assert_called_once() mock_send_form.return_value = "1\n:123\nasd" @@ -383,11 +376,10 @@ def test_upload_thumbnail_in_create(self): data = {'image': path, 'code': 'Test Version', 'project': self.project} new_version = self.sg.create("Version", data, return_fields=['image']) - new_version = find_one_await_thumbnail( - self.sg, + new_version = self.find_one_await_thumbnail( "Version", [["id", "is", new_version["id"]]], - fields=["image", "project", "type", "id"] + fields=["image", "project", "type", "id"], ) self.assertTrue(new_version is not None) @@ -434,7 +426,9 @@ def test_upload_thumbnail_for_version(self): # check result on version version_with_thumbnail = self.sg.find_one('Version', [['id', 'is', self.version['id']]]) - version_with_thumbnail = find_one_await_thumbnail(self.sg, 'Version', [['id', 'is', self.version['id']]]) + version_with_thumbnail = self.find_one_await_thumbnail( + "Version", [["id", "is", self.version["id"]]] + ) self.assertEqual(version_with_thumbnail.get('type'), 'Version') self.assertEqual(version_with_thumbnail.get('id'), self.version['id']) @@ -461,7 +455,9 @@ def test_upload_thumbnail_for_task(self): # check result on version task_with_thumbnail = self.sg.find_one('Task', [['id', 'is', self.task['id']]]) - task_with_thumbnail = find_one_await_thumbnail(self.sg, 'Task', [['id', 'is', self.task['id']]]) + task_with_thumbnail = self.find_one_await_thumbnail( + "Task", [["id", "is", self.task["id"]]] + ) self.assertEqual(task_with_thumbnail.get('type'), 'Task') self.assertEqual(task_with_thumbnail.get('id'), self.task['id']) @@ -557,12 +553,11 @@ def test_linked_thumbnail_url(self): thumb_id = self.sg.upload_thumbnail("Project", self.version['project']['id'], path) - response_version_with_project = find_one_await_thumbnail( - self.sg, + response_version_with_project = self.find_one_await_thumbnail( "Version", [["id", "is", self.version["id"]]], fields=["id", "code", "project.Project.image"], - thumbnail_field_name="project.Project.image" + thumbnail_field_name="project.Project.image", ) if self.sg.server_caps.version and self.sg.server_caps.version >= (3, 3, 0): @@ -597,12 +592,12 @@ def share_thumbnail_retry(*args, **kwargs): # the thumbnail to finish processing. thumbnail_id = None attempts = 0 - while attempts < THUMBNAIL_MAX_ATTEMPTS and thumbnail_id is None: + while attempts < base.THUMBNAIL_MAX_ATTEMPTS and thumbnail_id is None: try: thumbnail_id = self.sg.share_thumbnail(*args, **kwargs) attempts += 1 except shotgun_api3.ShotgunError: - time.sleep(THUMBNAIL_RETRY_INTERAL) + time.sleep(base.THUMBNAIL_RETRY_INTERVAL) return thumbnail_id this_dir, _ = os.path.split(__file__) @@ -610,17 +605,15 @@ def share_thumbnail_retry(*args, **kwargs): # upload thumbnail to first entity and share it with the rest share_thumbnail_retry([self.version, self.shot], thumbnail_path=path) - response_version_thumbnail = find_one_await_thumbnail( - self.sg, + response_version_thumbnail = self.find_one_await_thumbnail( 'Version', [['id', 'is', self.version['id']]], - fields=['id', 'code', 'image'] + fields=['id', 'code', 'image'], ) - response_shot_thumbnail = find_one_await_thumbnail( - self.sg, + response_shot_thumbnail = self.find_one_await_thumbnail( 'Shot', [['id', 'is', self.shot['id']]], - fields=['id', 'code', 'image'] + fields=['id', 'code', 'image'], ) shot_url = urllib.parse.urlparse(response_shot_thumbnail.get('image')) @@ -632,23 +625,20 @@ def share_thumbnail_retry(*args, **kwargs): # share thumbnail from source entity with entities self.sg.upload_thumbnail("Version", self.version['id'], path) share_thumbnail_retry([self.asset, self.shot], source_entity=self.version) - response_version_thumbnail = find_one_await_thumbnail( - self.sg, + response_version_thumbnail = self.find_one_await_thumbnail( 'Version', [['id', 'is', self.version['id']]], - fields=['id', 'code', 'image'] + fields=['id', 'code', 'image'], ) - response_shot_thumbnail = find_one_await_thumbnail( - self.sg, + response_shot_thumbnail = self.find_one_await_thumbnail( 'Shot', [['id', 'is', self.shot['id']]], - fields=['id', 'code', 'image'] + fields=['id', 'code', 'image'], ) - response_asset_thumbnail = find_one_await_thumbnail( - self.sg, + response_asset_thumbnail = self.find_one_await_thumbnail( 'Asset', [['id', 'is', self.asset['id']]], - fields=['id', 'code', 'image'] + fields=['id', 'code', 'image'], ) shot_url = urllib.parse.urlparse(response_shot_thumbnail.get('image')) @@ -820,9 +810,8 @@ def test_summary_values(self): def test_ensure_ascii(self): '''test_ensure_ascii tests ensure_unicode flag.''' sg_ascii = shotgun_api3.Shotgun(self.config.server_url, - self.config.script_name, - self.config.api_key, - ensure_ascii=True) + ensure_ascii=True, + **self.auth_args) result = sg_ascii.find_one('Note', [['id', 'is', self.note['id']]], fields=['content']) if six.PY2: @@ -832,9 +821,8 @@ def test_ensure_ascii(self): def test_ensure_unicode(self): '''test_ensure_unicode tests ensure_unicode flag.''' sg_unicode = shotgun_api3.Shotgun(self.config.server_url, - self.config.script_name, - self.config.api_key, - ensure_ascii=False) + ensure_ascii=False, + **self.auth_args) result = sg_unicode.find_one('Note', [['id', 'is', self.note['id']]], fields=['content']) self.assertTrue(_has_unicode(result)) @@ -858,7 +846,6 @@ def test_work_schedule(self): self.assertRaises(shotgun_api3.ShotgunError, self.sg.work_schedule_read, start_date_obj, end_date_obj, project, user) - resp = self.sg.work_schedule_update('2012-01-02', False, 'Studio Holiday') expected = { 'date': '2012-01-02', @@ -907,11 +894,8 @@ def test_work_schedule(self): work_schedule['2012-01-04'] = {"reason": "USER_EXCEPTION", "working": False, "description": "Artist Holiday"} self.assertEqual(work_schedule, resp) - # For now disable tests that are erroneously failling on some sites to - # allow CI to pass until the known issue causing this is resolved. # test_preferences_read fails when preferences don't match the expected # preferences. - @base.skip("Skip test_preferences_read because preferences on test sites are mismatched.") def test_preferences_read(self): # Only run the tests on a server with the feature. if not self.sg.server_caps.version or self.sg.server_caps.version < (7, 10, 0): @@ -936,8 +920,9 @@ def test_preferences_read(self): 'format_number_fields': '1,000', 'format_time_hour_fields': '12 hour', 'hours_per_day': 8.0, - 'last_day_work_week': None, - 'support_local_storage': True + 'support_local_storage': True, + 'enable_rv_integration': True, + 'enable_shotgun_review_for_rv': False, } # Simply make sure viewmaster settings are there. These change frequently and we # don't want to have the test break because Viewmaster changed or because we didn't @@ -1010,6 +995,8 @@ def test_set_date(self): self.assertEqual(expected, actual) def test_set_date_time(self): + if self.config.jenkins: + self.skipTest("Jenkins. locked_until not updating.") entity = 'HumanUser' entity_id = self.human_user['id'] field_name = 'locked_until' @@ -1069,8 +1056,7 @@ def test_set_list(self): def test_set_multi_entity(self): sg = shotgun_api3.Shotgun(self.config.server_url, - self.config.script_name, - self.config.api_key) + **self.auth_args) keys = ['project', 'user', 'code'] data = {'project': self.project, 'user': self.human_user, @@ -1204,20 +1190,22 @@ def setUp(self): self.datetime_none = datetime.datetime(2008, 10, 13, 23, 10) def test_convert_to_utc(self): + if self.config.jenkins: + self.skipTest("Jenkins. locked_until not updating.") sg_utc = shotgun_api3.Shotgun(self.config.server_url, - self.config.script_name, - self.config.api_key, http_proxy=self.config.http_proxy, - convert_datetimes_to_utc=True) + convert_datetimes_to_utc=True, + **self.auth_args) self._assert_expected(sg_utc, self.datetime_none, self.datetime_local) self._assert_expected(sg_utc, self.datetime_local, self.datetime_local) def test_no_convert_to_utc(self): + if self.config.jenkins: + self.skipTest("Jenkins. locked_until not updating.") sg_no_utc = shotgun_api3.Shotgun(self.config.server_url, - self.config.script_name, - self.config.api_key, http_proxy=self.config.http_proxy, - convert_datetimes_to_utc=False) + convert_datetimes_to_utc=False, + **self.auth_args) self._assert_expected(sg_no_utc, self.datetime_none, self.datetime_none) self._assert_expected(sg_no_utc, self.datetime_utc, self.datetime_none) @@ -1853,6 +1841,10 @@ def test_following(self): class TestErrors(base.TestBase): + def setUp(self): + auth_mode = "HumanUser" if self.config.jenkins else "ApiUser" + super(TestErrors, self).setUp(auth_mode) + def test_bad_auth(self): '''test_bad_auth invalid script name or api key raises fault''' server_url = self.config.server_url @@ -2079,7 +2071,7 @@ def test_sanitized_auth_params(self, mock_open): # Try to upload a bogus file self.sg.upload('Note', 1234, thumbnail_path) except shotgun_api3.ShotgunError as e: - self.assertFalse(self.api_key in str(e)) + self.assertFalse(str(self.api_key) in str(e)) return # You should never get here... Otherwise some mocking failed and the @@ -2114,7 +2106,7 @@ def test_upload_missing_file(self): class TestScriptUserSudoAuth(base.LiveTestBase): def setUp(self): - super(TestScriptUserSudoAuth, self).setUp('ApiUser') + super(TestScriptUserSudoAuth, self).setUp() self.sg.update( 'HumanUser', @@ -2131,10 +2123,9 @@ def test_user_is_creator(self): return x = shotgun_api3.Shotgun(self.config.server_url, - self.config.script_name, - self.config.api_key, http_proxy=self.config.http_proxy, - sudo_as_login=self.config.human_login) + sudo_as_login=self.config.human_login, + **self.auth_args) data = { 'project': self.project, @@ -2161,6 +2152,8 @@ def test_human_user_sudo_auth_fails(self): Test 'sudo_as_login' option for HumanUser. Request fails on server because user has no permission to Sudo. """ + if self.config.jenkins: + self.skipTest("Jenkins. locked_until not updating.") if not self.sg.server_caps.version or self.sg.server_caps.version < (5, 3, 12): return @@ -2217,7 +2210,10 @@ def test_humanuser_upload_thumbnail_for_version(self): self.assertTrue(isinstance(thumb_id, int)) # check result on version - version_with_thumbnail = find_one_await_thumbnail(self.sg, 'Version', [['id', 'is', self.version['id']]]) + version_with_thumbnail = self.find_one_await_thumbnail( + "Version", + [["id", "is", self.version["id"]]], + ) self.assertEqual(version_with_thumbnail.get('type'), 'Version') self.assertEqual(version_with_thumbnail.get('id'), self.version['id']) @@ -2274,7 +2270,10 @@ def test_humanuser_upload_thumbnail_for_version(self): self.assertTrue(isinstance(thumb_id, int)) # check result on version - version_with_thumbnail = find_one_await_thumbnail(self.sg, 'Version', [['id', 'is', self.version['id']]]) + version_with_thumbnail = self.find_one_await_thumbnail( + "Version", + [["id", "is", self.version["id"]]], + ) self.assertEqual(version_with_thumbnail.get('type'), 'Version') self.assertEqual(version_with_thumbnail.get('id'), self.version['id']) @@ -2340,10 +2339,9 @@ def test_sudo_as_user(self): return sg = shotgun_api3.Shotgun(self.config.server_url, - self.config.script_name, - self.config.api_key, http_proxy=self.config.http_proxy, - sudo_as_login=self.config.human_login) + sudo_as_login=self.config.human_login, + **self.auth_args) initial = sg.find_one('Project', [['id', 'is', self.project['id']]], ['last_accessed_by_current_user']) time.sleep(1) @@ -2490,6 +2488,13 @@ def _check_note(self, data, note_id, additional_fields): note_data = self.sg.find_one("Note", [["id", "is", note_id]], list(expected_fields)) + # remove images before comparison + if ( + "created_by.HumanUser.image" in note_data + and "created_by.HumanUser.image" in data + ): + note_data.pop("created_by.HumanUser.image") + data.pop("created_by.HumanUser.image") self.assertEqual(note_data, data) def _check_reply(self, data, reply_id, additional_fields): @@ -2520,6 +2525,10 @@ def _check_attachment(self, data, attachment_id, additional_fields): [["id", "is", attachment_id]], list(expected_fields)) + # remove images before comparison + if "this_file" in attachment_data and "this_file" in data: + attachment_data["this_file"].pop("url") + data["this_file"].pop("url") self.assertEqual(attachment_data, data) # For now skip tests that are erroneously failling on some sites to @@ -2532,6 +2541,8 @@ def test_simple(self): if not self.sg.server_caps.version or self.sg.server_caps.version < (6, 2, 0): return + user_entity = "HumanUser" if self.config.jenkins else "ApiUser" + # create note note = self.sg.create("Note", {"content": "Test!", "project": self.project}) @@ -2542,21 +2553,21 @@ def test_simple(self): d = self.sg.find_one("Note", [["id", "is", note["id"]]], - ["created_by", "created_by.ApiUser.image"]) + ["created_by", f"created_by.{user_entity}.image"]) - current_thumbnail = d["created_by.ApiUser.image"] + current_thumbnail = d[f"created_by.{user_entity}.image"] if current_thumbnail is None: # upload thumbnail - self.sg.upload_thumbnail("ApiUser", + self.sg.upload_thumbnail(user_entity, d["created_by"]["id"], self._thumbnail_path) d = self.sg.find_one("Note", [["id", "is", note["id"]]], - ["created_by", "created_by.ApiUser.image"]) + ["created_by", f"created_by.{user_entity}.image"]) - current_thumbnail = d["created_by.ApiUser.image"] + current_thumbnail = d[f"created_by.{user_entity}.image"] # get thread result = self.sg.note_thread_read(note["id"]) @@ -3022,15 +3033,5 @@ def _get_path(url): return url.path -def find_one_await_thumbnail(sg, entity_type, filters, fields=["image"], thumbnail_field_name="image", **kwargs): - attempts = 0 - result = sg.find_one(entity_type, filters, fields=fields, **kwargs) - while attempts < THUMBNAIL_MAX_ATTEMPTS and TRANSIENT_IMAGE_PATH in result.get(thumbnail_field_name): - time.sleep(THUMBNAIL_RETRY_INTERAL) - result = sg.find_one(entity_type, filters, fields=fields, **kwargs) - attempts += 1 - return result - - if __name__ == '__main__': unittest.main() diff --git a/tests/test_api_long.py b/tests/test_api_long.py index 65f741cc4..0bf509b3c 100644 --- a/tests/test_api_long.py +++ b/tests/test_api_long.py @@ -96,7 +96,6 @@ def test_automated_find(self): limit = (limit % 5) + 1 page = (page % 3) + 1 - @base.skip("Skipping test due to CI failure. Too many database columns.") def test_schema(self): """Called schema functions"""