diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 42e6a9d..a01a78a 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -6,9 +6,10 @@ permissions:
   contents: read
 
 env:
-  CIBW_BEFORE_BUILD: pip install setuptools oldest-supported-numpy
+  CIBW_BEFORE_BUILD: pip install setuptools oldest-supported-numpy pytest
+  CIBW_BEFORE_TEST: pip install pytest
   CIBW_BUILD_VERBOSITY: 1
-  CIBW_TEST_COMMAND: python -c "import sys, numexpr; sys.exit(0 if numexpr.test().wasSuccessful() else 1)"
+  CIBW_TEST_COMMAND: pytest --pyargs numexpr
   CIBW_TEST_SKIP: "*macosx*arm64*"
   # Building for musllinux and aarch64 takes way too much time.
   # Moreover, NumPy is not providing musllinux for x86_64 either, so it's not worth it.
@@ -16,7 +17,7 @@ env:
 
 jobs:
   build_wheels:
-    name: Build wheels on ${{ matrix.os }} for ${{ matrix.arch }} - ${{ matrix.p_ver }}
+    name: Build wheels on ${{ matrix.os }} for ${{ matrix.arch }} - ${{ matrix.cibw_build }}
     runs-on: ${{ matrix.os }}
     permissions:
       contents: write
@@ -24,12 +25,14 @@ jobs:
       CIBW_BUILD: ${{ matrix.cibw_build }}
       CIBW_ARCHS_LINUX: ${{ matrix.arch }}
       CIBW_ARCHS_MACOS: "x86_64 arm64"
+      CIBW_ENABLE: cpython-freethreading
     strategy:
+      fail-fast: false
       matrix:
         os: [ubuntu-latest, windows-latest, macos-latest]
         arch: [x86_64, aarch64]
-        cibw_build: ["cp3{10,11,12,13}-*"]
-        p_ver: ["3.10-3.13"]
+        cibw_build: ["cp3{10,11,12,13}-*", "cp313t-*"]
+        p_ver: ["3.10-3.13+3.13t"]
         exclude:
           - os: windows-latest
             arch: aarch64
@@ -53,6 +56,15 @@ jobs:
         if: ${{ matrix.arch == 'aarch64' }}
         name: Set up QEMU
 
+      - name: Setup free-threading variables
+        if: ${{ endsWith(matrix.cibw_build, 't-*') }}
+        shell: bash -l {0}
+        run: |
+          echo "CIBW_BEFORE_BUILD=pip install setuptools numpy" >> "$GITHUB_ENV"
+          echo "CIBW_BEFORE_TEST=pip install pytest pytest-run-parallel" >> "$GITHUB_ENV"
+          echo "CIBW_TEST_COMMAND=pytest --parallel-threads=4 --pyargs numexpr" >> "$GITHUB_ENV"
+          echo "ARTIFACT_LABEL=freethreaded" >> "$GITHUB_ENV"
+
       - name: Build wheels
         run: |
           python -m cibuildwheel --output-dir wheelhouse
@@ -66,6 +78,7 @@ jobs:
       - uses: actions/upload-artifact@v4
         with:
           path: ./wheelhouse/*
+          name: numexpr-${{ matrix.os }}-${{ matrix.arch }}-${{ env.ARTIFACT_LABEL || 'standard' }}
 
       - name: Upload to GitHub Release
         uses: softprops/action-gh-release@v1
diff --git a/.gitignore b/.gitignore
index 928bf15..7bf6f98 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,6 +7,7 @@ artifact/
 numexpr.egg-info/
 *.pyc
 *.swp
+*.so
 *~
 doc/_build
 site.cfg
diff --git a/README.rst b/README.rst
index 9033d51..264fd2b 100644
--- a/README.rst
+++ b/README.rst
@@ -159,6 +159,24 @@ Usage
     array([ True, False, False], dtype=bool)
 
 
+Free-threading support
+----------------------
+Starting with CPython 3.13 there is a separate distribution that disables the
+Global Interpreter Lock (GIL) altogether, which improves performance under
+multi-threaded workloads on a single interpreter, as opposed to having to
+resort to multiprocessing.
+
+While numexpr has been demonstrated to work under free-threaded CPython, some
+care is needed when combining numexpr's native parallel implementation with
+Python threads, in order to prevent oversubscription. We recommend either
+using the main CPython interpreter thread to spawn multiple C threads through
+numexpr's parallel API, or spawning multiple CPython threads that do not use
+the parallel API.
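+
+A minimal sketch of the second pattern is shown below (the ``worker`` helper and
+the variable names are purely illustrative, not part of the numexpr API); here
+numexpr's own thread pool is pinned to a single thread so that the Python
+threads do not oversubscribe the available cores::
+
+    import threading
+    import numpy as np
+    import numexpr as ne
+
+    # Keep numexpr serial inside each Python thread to avoid oversubscription.
+    ne.set_num_threads(1)
+
+    def worker(x, out, i):
+        # Each Python thread evaluates its own expression serially.
+        out[i] = ne.evaluate("2*x + 1")
+
+    chunks = [np.arange(1_000_000, dtype=np.float64) for _ in range(4)]
+    results = [None] * len(chunks)
+    threads = [threading.Thread(target=worker, args=(c, results, i))
+               for i, c in enumerate(chunks)]
+    for t in threads:
+        t.start()
+    for t in threads:
+        t.join()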
+
+For more information about free-threaded CPython, we recommend visiting the following
+`community Wiki `
+
+
 Documentation
 -------------
 
diff --git a/numexpr/interpreter.cpp b/numexpr/interpreter.cpp
index edebd71..32f6c37 100644
--- a/numexpr/interpreter.cpp
+++ b/numexpr/interpreter.cpp
@@ -556,7 +556,7 @@ stringcontains(const char *haystack_start, const char *needle_start, npy_intp ma
     size_t si = 0;
     size_t min_len = min(needle_len, haystack_len);
 
-    while (*haystack && *needle && si < min_len)
+    while (si < min_len && *haystack && *needle)
     {
         ok &= *haystack++ == *needle++;
         si++;
@@ -573,7 +573,7 @@ stringcontains(const char *haystack_start, const char *needle_start, npy_intp ma
     }
 
     /* calc haystack length */
-    while (*haystack && si < haystack_len) {
+    while (si < haystack_len && *haystack) {
         haystack++;
         si++;
     }
@@ -652,6 +652,7 @@ int vm_engine_iter_task(NpyIter *iter, npy_intp *memsteps,
 
         /* Then finish off the rest */
         if (block_size > 0) do {
+            block_size = *size_ptr;
 #define REDUCTION_INNER_LOOP
 #define BLOCK_SIZE block_size
 #include "interp_body.cpp"
@@ -698,6 +699,7 @@ vm_engine_iter_outer_reduce_task(NpyIter *iter, npy_intp *memsteps,
 
         /* Then finish off the rest */
         if (block_size > 0) do {
+            block_size = *size_ptr;
 #define BLOCK_SIZE block_size
 #define NO_OUTPUT_BUFFERING // Because it's a reduction
 #include "interp_body.cpp"
diff --git a/numexpr/module.cpp b/numexpr/module.cpp
index 66b5b77..a42042b 100644
--- a/numexpr/module.cpp
+++ b/numexpr/module.cpp
@@ -47,11 +47,14 @@ void *th_worker(void *tidptr)
     char **errmsg;
     // For output buffering if needed
     vector<char> out_buffer;
+    int init_sentinels_done = 0;
 
     while (1) {
 
         /* Sentinels have to be initialised yet */
-        gs.init_sentinels_done = 0;
+        if (tid == 0) {
+            init_sentinels_done = 0;
+        }
 
         /* Meeting point for all threads (wait for initialization) */
         pthread_mutex_lock(&gs.count_threads_mutex);
@@ -380,7 +383,7 @@ Py_set_num_threads(PyObject *self, PyObject *args)
 }
 
 static PyObject*
-Py_get_num_threads(PyObject *self, PyObject *args)
+Py_get_num_threads(PyObject *self, PyObject *args)
 {
     int n_thread;
     n_thread = gs.nthreads;
@@ -477,6 +480,10 @@ PyInit_interpreter(void)
     if (m == NULL)
         INITERROR;
 
+    #ifdef Py_GIL_DISABLED
+    PyUnstable_Module_SetGIL(m, Py_MOD_GIL_NOT_USED);
+    #endif
+
     Py_INCREF(&NumExprType);
     PyModule_AddObject(m, "NumExpr", (PyObject *)&NumExprType);
 
diff --git a/numexpr/necompiler.py b/numexpr/necompiler.py
index 98aee4c..296c41b 100644
--- a/numexpr/necompiler.py
+++ b/numexpr/necompiler.py
@@ -774,9 +774,12 @@ def getArguments(names, local_dict=None, global_dict=None, _frame_depth: int=2):
 
 
 # Dictionaries for caching variable names and compiled expressions
-_names_cache = CacheDict(256)
-_numexpr_cache = CacheDict(256)
-_numexpr_last = ContextDict()
+# _names_cache = CacheDict(256)
+_names_cache = threading.local()
+# _numexpr_cache = CacheDict(256)
+_numexpr_cache = threading.local()
+# _numexpr_last = ContextDict()
+_numexpr_last = threading.local()
 evaluate_lock = threading.Lock()
 
 def validate(ex: str,
@@ -853,6 +856,14 @@ def validate(ex: str,
 
     """
     global _numexpr_last
+    if not hasattr(_numexpr_last, 'l'):
+        _numexpr_last.l = ContextDict()
+
+    if not hasattr(_names_cache, 'c'):
+        _names_cache.c = CacheDict(256)
+
+    if not hasattr(_numexpr_cache, 'c'):
+        _numexpr_cache.c = CacheDict(256)
 
     try:
 
@@ -868,9 +879,9 @@ def validate(ex: str,
         # Get the names for this expression
         context = getContext(kwargs)
         expr_key = (ex, tuple(sorted(context.items())))
-        if expr_key not in _names_cache:
-            _names_cache[expr_key] = getExprNames(ex, context, sanitize=sanitize)
-        names, ex_uses_vml = _names_cache[expr_key]
+        if expr_key not in _names_cache.c:
+            _names_cache.c[expr_key] = getExprNames(ex, context, sanitize=sanitize)
+        names, ex_uses_vml = _names_cache.c[expr_key]
         arguments = getArguments(names, local_dict, global_dict, _frame_depth=_frame_depth)
 
         # Create a signature
@@ -880,12 +891,12 @@ def validate(ex: str,
         # Look up numexpr if possible.
         numexpr_key = expr_key + (tuple(signature),)
         try:
-            compiled_ex = _numexpr_cache[numexpr_key]
+            compiled_ex = _numexpr_cache.c[numexpr_key]
         except KeyError:
-            compiled_ex = _numexpr_cache[numexpr_key] = NumExpr(ex, signature, sanitize=sanitize, **context)
+            compiled_ex = _numexpr_cache.c[numexpr_key] = NumExpr(ex, signature, sanitize=sanitize, **context)
         kwargs = {'out': out, 'order': order, 'casting': casting,
                   'ex_uses_vml': ex_uses_vml}
-        _numexpr_last.set(ex=compiled_ex, argnames=names, kwargs=kwargs)
+        _numexpr_last.l.set(ex=compiled_ex, argnames=names, kwargs=kwargs)
     except Exception as e:
         return e
     return None
@@ -987,13 +998,15 @@ def re_evaluate(local_dict: Optional[Dict] = None,
         not set this value.
     """
     global _numexpr_last
+    if not hasattr(_numexpr_last, 'l'):
+        _numexpr_last.l = ContextDict()
 
     try:
-        compiled_ex = _numexpr_last['ex']
+        compiled_ex = _numexpr_last.l['ex']
     except KeyError:
         raise RuntimeError("A previous evaluate() execution was not found, please call `validate` or `evaluate` once before `re_evaluate`")
-    argnames = _numexpr_last['argnames']
+    argnames = _numexpr_last.l['argnames']
     args = getArguments(argnames, local_dict, global_dict, _frame_depth=_frame_depth)
-    kwargs = _numexpr_last['kwargs']
+    kwargs = _numexpr_last.l['kwargs']
     with evaluate_lock:
         return compiled_ex(*args, **kwargs)
diff --git a/numexpr/tests/test_numexpr.py b/numexpr/tests/test_numexpr.py
index 62210b4..98ae459 100644
--- a/numexpr/tests/test_numexpr.py
+++ b/numexpr/tests/test_numexpr.py
@@ -36,6 +36,13 @@ from numexpr.utils import detect_number_of_cores
 
 import unittest
+from unittest.mock import MagicMock
+
+try:
+    import pytest
+    pytest_available = True
+except ImportError:
+    pytest_available = False
 
 TestCase = unittest.TestCase
 
@@ -44,6 +51,15 @@ MAX_THREADS = 16
 
+if not pytest_available:
+    def identity(f):
+        return f
+
+    pytest = MagicMock()
+    pytest.mark = MagicMock()
+    pytest.mark.thread_unsafe = identity
+
+
 class test_numexpr(TestCase):
     """Testing with 1 thread"""
     nthreads = 1
@@ -318,6 +334,7 @@ def test_refcount(self):
         evaluate('1')
         assert sys.getrefcount(a) == 2
 
+    @pytest.mark.thread_unsafe
     def test_locals_clears_globals(self):
         # Check for issue #313, whereby clearing f_locals also clear f_globals
         # if in the top-frame. This cannot be done inside `unittest` as it is always
@@ -341,6 +358,7 @@ def test_locals_clears_globals(self):
 
 
+@pytest.mark.thread_unsafe
 class test_numexpr2(test_numexpr):
     """Testing with 2 threads"""
     nthreads = 2
@@ -512,6 +530,7 @@ def test_illegal_value(self):
         else:
             self.fail()
 
+    @pytest.mark.thread_unsafe
     def test_sanitize(self):
         with _environment('NUMEXPR_SANITIZE', '1'):
             # Forbid dunder
@@ -590,7 +609,7 @@ def test_sanitize(self):
             x = np.array(['a', 'b'], dtype=bytes)
             evaluate("x == 'b:'")
 
-
+    @pytest.mark.thread_unsafe
     def test_no_sanitize(self):
         try:  # Errors on compile() after eval()
             evaluate('import os;', sanitize=False)
@@ -677,6 +696,7 @@ def test_ex_uses_vml(self):
     if 'sparc' not in platform.machine():
         # Execution order set here so as to not use too many threads
         # during the rest of the execution. See #33 for details.
+        @pytest.mark.thread_unsafe
         def test_changing_nthreads_00_inc(self):
             a = linspace(-1, 1, 1000000)
             b = ((.25 * a + .75) * a - 1.5) * a - 2
@@ -685,6 +705,7 @@ def test_changing_nthreads_00_inc(self):
             c = evaluate("((.25*a + .75)*a - 1.5)*a - 2")
             assert_array_almost_equal(b, c)
 
+        @pytest.mark.thread_unsafe
         def test_changing_nthreads_01_dec(self):
             a = linspace(-1, 1, 1000000)
             b = ((.25 * a + .75) * a - 1.5) * a - 2
@@ -1123,6 +1144,7 @@ def _environment(key, value):
             del os.environ[key]
 
 # Test cases for the threading configuration
+@pytest.mark.thread_unsafe
 class test_threading_config(TestCase):
     def test_max_threads_unset(self):
         # Has to be done in a subprocess as `importlib.reload` doesn't let us
@@ -1306,6 +1328,7 @@ def _worker(qout=None):
 
 # Case test for subprocesses (via multiprocessing module)
 class test_subprocess(TestCase):
+    @pytest.mark.thread_unsafe
     def test_multiprocess(self):
         try:
             import multiprocessing as mp