diff --git a/README.rst b/README.rst index 8c20f0037e..53d08eca3c 100644 --- a/README.rst +++ b/README.rst @@ -182,12 +182,14 @@ Some more companies are using Luigi but haven't had a chance yet to write about * `Hopper `_ * `VOYAGE GROUP/Zucks `_ * `Textpert `_ +* `Tracktics `_ * `Whizar `_ * `xtream `__ * `Skyscanner `_ * `Jodel `_ * `Mekar `_ * `M3 `_ +* `Assist Digital `_ We're more than happy to have your company added here. Just send a PR on GitHub. diff --git a/doc/central_scheduler.rst b/doc/central_scheduler.rst index 161bf144c2..418242305d 100644 --- a/doc/central_scheduler.rst +++ b/doc/central_scheduler.rst @@ -78,20 +78,20 @@ The task history has the following pages: .. figure:: history.png :alt: Recent history screenshot -* ``/history/by_id/:id`` +* ``/history/by_id/{id}`` detailed information about a run, including: parameter values, the host on which it ran, and timing information. Example screenshot: .. figure:: history_by_id.png :alt: By id screenshot -* ``/history/by_name/:name`` - a listing of all runs of a task with the given task name. +* ``/history/by_name/{name}`` + a listing of all runs of a task with the given task ``{name}``. Example screenshot: .. figure:: history_by_name.png :alt: By name screenshot -* ``/history/by_params/:name?data=params`` - a listing of all runs of a given task restricted to runs with param values matching the given data. - The data is a json blob describing the parameters, - e.g. ``{"foo": "bar"}`` looks for a task with ``foo=bar``. +* ``/history/by_params/{name}?data=params`` + a listing of all runs of the task ``{name}`` restricted to runs whose parameter values match the given ``params``. + ``params`` is a JSON blob describing the parameters, + e.g. ``data={"foo": "bar"}`` looks for a task with ``foo=bar``. 
diff --git a/luigi/contrib/external_program.py b/luigi/contrib/external_program.py index 68a03d08ef..d5dcf26d77 100644 --- a/luigi/contrib/external_program.py +++ b/luigi/contrib/external_program.py @@ -192,6 +192,7 @@ def _track_url_by_pattern(): self.build_tracking_url(match.group(1)) ) else: + file_to_write.flush() sleep(time_to_sleep) track_proc = Process(target=_track_url_by_pattern) diff --git a/luigi/contrib/gcs.py b/luigi/contrib/gcs.py index 3a7d5f67a6..4c1b37fb14 100644 --- a/luigi/contrib/gcs.py +++ b/luigi/contrib/gcs.py @@ -26,12 +26,21 @@ from urllib.parse import urlsplit from io import BytesIO +from tenacity import retry +from tenacity import retry_if_exception +from tenacity import retry_if_exception_type +from tenacity import wait_exponential +from tenacity import stop_after_attempt +from tenacity import after_log from luigi.contrib import gcp import luigi.target from luigi.format import FileWrapper logger = logging.getLogger('luigi-interface') +# Retry when following errors happened +RETRYABLE_ERRORS = None + try: import httplib2 @@ -42,12 +51,8 @@ logger.warning("Loading GCS module without the python packages googleapiclient & google-auth. \ This will crash at runtime if GCS functionality is used.") else: - # Retry transport and file IO errors. RETRYABLE_ERRORS = (httplib2.HttpLib2Error, IOError) -# Number of times to retry failed downloads. -NUM_RETRIES = 5 - # Number of bytes to send/receive in each request. CHUNKSIZE = 10 * 1024 * 1024 @@ -64,6 +69,18 @@ GCS_BATCH_URI = 'https://storage.googleapis.com/batch/storage/v1' +# Retry configurations. 
For more details, see https://tenacity.readthedocs.io/en/latest/ +def is_error_5xx(err): + return isinstance(err, errors.HttpError) and err.resp.status >= 500 + + +gcs_retry = retry(retry=(retry_if_exception(is_error_5xx) | retry_if_exception_type(RETRYABLE_ERRORS)), + wait=wait_exponential(multiplier=1, min=1, max=10), + stop=stop_after_attempt(5), + reraise=True, + after=after_log(logger, logging.WARNING)) + + def _wait_for_consistency(checker): """Eventual consistency: wait until GCS reports something is true. @@ -133,6 +150,7 @@ def _is_root(self, key): def _add_path_delimiter(self, key): return key if key[-1:] == '/' else key + '/' + @gcs_retry def _obj_exists(self, bucket, obj): try: self.client.objects().get(bucket=bucket, object=obj).execute() @@ -157,6 +175,7 @@ def _list_iter(self, bucket, prefix): response = request.execute() + @gcs_retry def _do_put(self, media, dest_path): bucket, obj = self._path_to_bucket_and_key(dest_path) @@ -165,28 +184,10 @@ def _do_put(self, media, dest_path): return request.execute() response = None - attempts = 0 while response is None: - error = None - try: - status, response = request.next_chunk() - if status: - logger.debug('Upload progress: %.2f%%', 100 * status.progress()) - except errors.HttpError as err: - error = err - if err.resp.status < 500: - raise - logger.warning('Caught error while uploading', exc_info=True) - except RETRYABLE_ERRORS as err: - logger.warning('Caught error while uploading', exc_info=True) - error = err - - if error: - attempts += 1 - if attempts >= NUM_RETRIES: - raise error - else: - attempts = 0 + status, response = request.next_chunk() + if status: + logger.debug('Upload progress: %.2f%%', 100 * status.progress()) _wait_for_consistency(lambda: self._obj_exists(bucket, obj)) return response @@ -380,6 +381,7 @@ def list_wildcard(self, wildcard_path): len(it) >= len(path + '/' + wildcard_parts[0]) + len(wildcard_parts[1]): yield it + @gcs_retry def download(self, path, chunksize=None, 
chunk_callback=lambda _: False): """Downloads the object contents to local file system. @@ -400,29 +402,11 @@ def download(self, path, chunksize=None, chunk_callback=lambda _: False): request = self.client.objects().get_media(bucket=bucket, object=obj) downloader = http.MediaIoBaseDownload(fp, request, chunksize=chunksize) - attempts = 0 done = False while not done: - error = None - try: - _, done = downloader.next_chunk() - if chunk_callback(fp): - done = True - except errors.HttpError as err: - error = err - if err.resp.status < 500: - raise - logger.warning('Error downloading file, retrying', exc_info=True) - except RETRYABLE_ERRORS as err: - logger.warning('Error downloading file, retrying', exc_info=True) - error = err - - if error: - attempts += 1 - if attempts >= NUM_RETRIES: - raise error - else: - attempts = 0 + _, done = downloader.next_chunk() + if chunk_callback(fp): + done = True return return_fp diff --git a/luigi/tools/deps_tree.py b/luigi/tools/deps_tree.py index 27a00313e3..ea3391bb54 100755 --- a/luigi/tools/deps_tree.py +++ b/luigi/tools/deps_tree.py @@ -9,18 +9,18 @@ $ luigi-deps-tree --module foo_complex examples.Foo ... 
└─--[Foo-{} (PENDING)] - |--[Bar-{'num': '0'} (PENDING)] - | |--[Bar-{'num': '4'} (PENDING)] - | └─--[Bar-{'num': '5'} (PENDING)] - |--[Bar-{'num': '1'} (PENDING)] - └─--[Bar-{'num': '2'} (PENDING)] - └─--[Bar-{'num': '6'} (PENDING)] - |--[Bar-{'num': '7'} (PENDING)] - | |--[Bar-{'num': '9'} (PENDING)] - | └─--[Bar-{'num': '10'} (PENDING)] - | └─--[Bar-{'num': '11'} (PENDING)] - └─--[Bar-{'num': '8'} (PENDING)] - └─--[Bar-{'num': '12'} (PENDING)] + |---[Bar-{'num': '0'} (PENDING)] + | |---[Bar-{'num': '4'} (PENDING)] + | └─--[Bar-{'num': '5'} (PENDING)] + |---[Bar-{'num': '1'} (PENDING)] + └─--[Bar-{'num': '2'} (PENDING)] + └─--[Bar-{'num': '6'} (PENDING)] + |---[Bar-{'num': '7'} (PENDING)] + | |---[Bar-{'num': '9'} (PENDING)] + | └─--[Bar-{'num': '10'} (PENDING)] + | └─--[Bar-{'num': '11'} (PENDING)] + └─--[Bar-{'num': '8'} (PENDING)] + └─--[Bar-{'num': '12'} (PENDING)] """ from luigi.task import flatten @@ -52,10 +52,10 @@ def print_tree(task, indent='', last=True): result = '\n' + indent if(last): result += '└─--' - indent += ' ' + indent += ' ' else: - result += '|--' - indent += '| ' + result += '|---' + indent += '| ' result += '[{0}-{1} ({2})]'.format(name, params, is_complete) children = flatten(task.requires()) for index, child in enumerate(children): diff --git a/setup.py b/setup.py index 5bdcc84afc..e305d2aeda 100644 --- a/setup.py +++ b/setup.py @@ -37,7 +37,7 @@ def get_static_files(path): with open('README.rst') as fobj: long_description = "\n\n" + readme_note + "\n\n" + fobj.read() -install_requires = ['python-dateutil>=2.7.5,<3'] +install_requires = ['python-dateutil>=2.7.5,<3', 'tenacity>=6.3.0,<7'] # Can't use python-daemon>=2.2.0 if on windows # See https://pagure.io/python-daemon/issue/18 diff --git a/test/contrib/gcs_test.py b/test/contrib/gcs_test.py index 1fa19a7f35..5d0cbd4c36 100644 --- a/test/contrib/gcs_test.py +++ b/test/contrib/gcs_test.py @@ -30,6 +30,7 @@ import os import tempfile import unittest +from unittest import mock from 
luigi.contrib import gcs from target_test import FileSystemTargetTestMixin @@ -143,7 +144,7 @@ def test_listdir(self): def test_put_file(self): with tempfile.NamedTemporaryFile() as fp: - lorem = 'Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt\n' + lorem = b'Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt\n' # Larger file than chunk size, fails with incorrect progress set up big = lorem * 41943 fp.write(big) @@ -196,3 +197,26 @@ def test_close_twice(self): assert src.closed src.close() assert src.closed + + +class RetryTest(unittest.TestCase): + def test_success_with_retryable_error(self): + m = mock.MagicMock(side_effect=[IOError, IOError, 'test_func_output']) + + @gcs.gcs_retry + def mock_func(): + return m() + + actual = mock_func() + expected = 'test_func_output' + self.assertEqual(expected, actual) + + def test_fail_with_retry_limit_exceed(self): + m = mock.MagicMock(side_effect=[IOError, IOError, IOError, IOError, IOError]) + + @gcs.gcs_retry + def mock_func(): + return m() + + with self.assertRaises(IOError): + mock_func()