diff --git a/sdks/python/apache_beam/ml/anomaly/base.py b/sdks/python/apache_beam/ml/anomaly/base.py
index e3c6252474b..8251245f1ca 100644
--- a/sdks/python/apache_beam/ml/anomaly/base.py
+++ b/sdks/python/apache_beam/ml/anomaly/base.py
@@ -154,7 +154,7 @@ def __init__(
       threshold_criterion: Optional[ThresholdFn] = None,
       **kwargs):
     self._model_id = model_id if model_id is not None else getattr(
-        self, 'spec_type', 'unknown')
+        self, 'spec_type', lambda: "unknown")()
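+    # spec_type is injected as a classmethod by the @specifiable decorator,
+    # so it must be called; the lambda keeps the fallback value callable too.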
     self._features = features
     self._target = target
     self._threshold_criterion = threshold_criterion
@@ -200,7 +200,7 @@ def __init__(
       aggregation_strategy: Optional[AggregationFn] = None,
       **kwargs):
     if "model_id" not in kwargs or kwargs["model_id"] is None:
-      kwargs["model_id"] = getattr(self, 'spec_type', 'custom')
+      kwargs["model_id"] = getattr(self, 'spec_type', lambda: 'custom')()
 
     super().__init__(**kwargs)
 
diff --git a/sdks/python/apache_beam/ml/anomaly/detectors/offline.py b/sdks/python/apache_beam/ml/anomaly/detectors/offline.py
new file mode 100644
index 00000000000..0ecaf400b1e
--- /dev/null
+++ b/sdks/python/apache_beam/ml/anomaly/detectors/offline.py
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from typing import Any
+from typing import Dict
+from typing import Optional
+
+import apache_beam as beam
+from apache_beam.ml.anomaly.base import AnomalyDetector
+from apache_beam.ml.anomaly.specifiable import specifiable
+from apache_beam.ml.inference.base import KeyedModelHandler
+
+
+@specifiable
+class OfflineDetector(AnomalyDetector):
+  """A offline anomaly detector that uses a provided model handler for scoring.
+
+  Args:
+    keyed_model_handler: The model handler to use for inference.
+      Requires a `KeyedModelHandler[Any, beam.Row, float, Any]` instance.
+    run_inference_args: Optional arguments to pass to RunInference.
+    **kwargs: Additional keyword arguments to pass to the base
+      AnomalyDetector class.
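+
+  Example (a minimal sketch; assumes `handler` is a
+  `KeyedModelHandler[Any, beam.Row, float, Any]` built elsewhere)::
+
+    detector = OfflineDetector(keyed_model_handler=handler)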
+  """
+  def __init__(
+      self,
+      keyed_model_handler: KeyedModelHandler[Any, beam.Row, float, Any],
+      run_inference_args: Optional[Dict[str, Any]] = None,
+      **kwargs):
+    super().__init__(**kwargs)
+
+    # TODO: validate the model handler type
+    self._keyed_model_handler = keyed_model_handler
+    self._run_inference_args = run_inference_args or {}
+
+    # Always override model_identifier with the model_id from the detector.
+    self._run_inference_args["model_identifier"] = self._model_id
+
+  def learn_one(self, x: beam.Row) -> None:
+    """Not implemented since CustomDetector invokes RunInference directly."""
+    raise NotImplementedError
+
+  def score_one(self, x: beam.Row) -> Optional[float]:
+    """Not implemented since CustomDetector invokes RunInference directly."""
+    raise NotImplementedError
diff --git a/sdks/python/apache_beam/ml/anomaly/specifiable.py b/sdks/python/apache_beam/ml/anomaly/specifiable.py
index 2eeb1d0de76..6e06d2baccf 100644
--- a/sdks/python/apache_beam/ml/anomaly/specifiable.py
+++ b/sdks/python/apache_beam/ml/anomaly/specifiable.py
@@ -27,7 +27,7 @@
 import logging
 import os
 from typing import Any
-from typing import ClassVar
+from typing import Callable
 from typing import Dict
 from typing import List
 from typing import Optional
@@ -35,6 +35,7 @@
 from typing import Type
 from typing import TypeVar
 from typing import Union
+from typing import overload
 from typing import runtime_checkable
 
 from typing_extensions import Self
@@ -59,7 +60,7 @@
 #: `spec_type` when applying the `specifiable` decorator to an existing class.
 _KNOWN_SPECIFIABLE = collections.defaultdict(dict)
 
-SpecT = TypeVar('SpecT', bound='Specifiable')
+T = TypeVar('T', bound=type)
 
 
 def _class_to_subspace(cls: Type) -> str:
@@ -104,33 +105,47 @@ class Spec():
   config: Optional[Dict[str, Any]] = dataclasses.field(default_factory=dict)
 
 
-@runtime_checkable
-class Specifiable(Protocol):
-  """Protocol that a specifiable class needs to implement."""
-  #: The value of the `type` field in the object's spec for this class.
-  spec_type: ClassVar[str]
-  #: The raw keyword arguments passed to `__init__` method during object
-  #: initialization.
-  init_kwargs: dict[str, Any]
+def _from_spec_helper(v, _run_init):
+  if isinstance(v, Spec):
+    return Specifiable.from_spec(v, _run_init)
+
+  if isinstance(v, List):
+    return [_from_spec_helper(e, _run_init) for e in v]
+
+  return v
+
+
+def _to_spec_helper(v):
+  if isinstance(v, Specifiable):
+    return v.to_spec()
+
+  if isinstance(v, List):
+    return [_to_spec_helper(e) for e in v]
+
+  if inspect.isfunction(v):
+    if not hasattr(v, "spec_type"):
+      _register(v, inject_spec_type=False)
+    return Spec(type=_get_default_spec_type(v), config=None)
 
-  # a boolean to tell whether the original `__init__` method is called
-  _initialized: bool
-  # a boolean used by new_getattr to tell whether it is in the `__init__` method
-  # call
-  _in_init: bool
+  if inspect.isclass(v):
+    if not hasattr(v, "spec_type"):
+      _register(v, inject_spec_type=False)
+    return Spec(type=_get_default_spec_type(v), config=None)
 
-  @staticmethod
-  def _from_spec_helper(v, _run_init):
-    if isinstance(v, Spec):
-      return Specifiable.from_spec(v, _run_init)
+  return v
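+
+
+# A round-trip sketch (assuming `detector` is an instance of a class
+# decorated with @specifiable):
+#
+#   spec = detector.to_spec()            # Spec(type=..., config={...})
+#   clone = Specifiable.from_spec(spec)  # rebuilds an equivalent object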
 
-    if isinstance(v, List):
-      return [Specifiable._from_spec_helper(e, _run_init) for e in v]
 
-    return v
+@runtime_checkable
+class Specifiable(Protocol):
+  """Protocol that a specifiable class needs to implement."""
+  @classmethod
+  def spec_type(cls) -> str:
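+    """Return the value of the `type` field in the object's spec."""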
+    ...
 
   @classmethod
-  def from_spec(cls, spec: Spec, _run_init: bool = True) -> Union[Self, type]:
+  def from_spec(cls,
+                spec: Spec,
+                _run_init: bool = True) -> Union[Self, type[Self]]:
     """Generate a `Specifiable` subclass object based on a spec.
 
     Args:
@@ -155,35 +170,14 @@ def from_spec(cls, spec: Spec, _run_init: bool = True) -> Union[Self, type]:
       return subclass
 
     kwargs = {
-        k: Specifiable._from_spec_helper(v, _run_init)
-        for k,
-        v in spec.config.items()
+        k: _from_spec_helper(v, _run_init)
+        for k, v in spec.config.items()
     }
 
     if _run_init:
       kwargs["_run_init"] = True
     return subclass(**kwargs)
 
-  @staticmethod
-  def _to_spec_helper(v):
-    if isinstance(v, Specifiable):
-      return v.to_spec()
-
-    if isinstance(v, List):
-      return [Specifiable._to_spec_helper(e) for e in v]
-
-    if inspect.isfunction(v):
-      if not hasattr(v, "spec_type"):
-        _register(v, inject_spec_type=False)
-      return Spec(type=_get_default_spec_type(v), config=None)
-
-    if inspect.isclass(v):
-      if not hasattr(v, "spec_type"):
-        _register(v, inject_spec_type=False)
-      return Spec(type=_get_default_spec_type(v), config=None)
-
-    return v
-
   def to_spec(self) -> Spec:
     """Generate a spec from a `Specifiable` subclass object.
 
@@ -195,14 +189,19 @@ def to_spec(self) -> Spec:
           f"'{type(self).__name__}' not registered as Specifiable. "
           f"Decorate ({type(self).__name__}) with @specifiable")
 
-    args = {k: self._to_spec_helper(v) for k, v in self.init_kwargs.items()}
+    args = {k: _to_spec_helper(v) for k, v in self.init_kwargs.items()}
 
-    return Spec(type=self.__class__.spec_type, config=args)
+    return Spec(type=self.spec_type(), config=args)
 
   def run_original_init(self) -> None:
     """Invoke the original __init__ method with original keyword arguments"""
     pass
 
+  @classmethod
+  def unspecifiable(cls) -> None:
+    """Resume the class structure prior to specifiable"""
+    pass
+
 
 def _get_default_spec_type(cls):
   spec_type = cls.__name__
@@ -216,7 +215,7 @@ def _get_default_spec_type(cls):
 
 
 # Register a `Specifiable` subclass in `KNOWN_SPECIFIABLE`
-def _register(cls, spec_type=None, inject_spec_type=True) -> None:
+def _register(cls: type, spec_type=None, inject_spec_type=True) -> None:
   assert spec_type is None or inject_spec_type, \
       "need to inject spec_type to class if spec_type is not None"
   if spec_type is None:
@@ -237,7 +236,8 @@ def _register(cls, spec_type=None, inject_spec_type=True) -> None:
     _KNOWN_SPECIFIABLE[subspace][spec_type] = cls
 
   if inject_spec_type:
-    cls.spec_type = spec_type
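+    # Store the spec type under a per-class attribute name (e.g.
+    # "Foo__spec_type") so that subclasses do not inherit it and double
+    # decoration can be detected with hasattr in the decorator.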
+    setattr(cls, cls.__name__ + '__spec_type', spec_type)
 
 
 # Keep a copy of arguments that are used to call the `__init__` method when the
@@ -250,13 +250,35 @@ def _get_init_kwargs(inst, init_method, *args, **kwargs):
   return params
 
 
+@overload
 def specifiable(
-    my_cls=None,
+    my_cls: None = None,
     /,
     *,
-    spec_type=None,
-    on_demand_init=True,
-    just_in_time_init=True):
+    spec_type: Optional[str] = None,
+    on_demand_init: bool = True,
+    just_in_time_init: bool = True) -> Callable[[T], T]:
+  ...
+
+
+@overload
+def specifiable(
+    my_cls: T,
+    /,
+    *,
+    spec_type: Optional[str] = None,
+    on_demand_init: bool = True,
+    just_in_time_init: bool = True) -> T:
+  ...
+
+
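+# The two overloads above let type checkers handle both the bare decorator
+# form (@specifiable) and the parameterized form, e.g.
+# @specifiable(spec_type=...).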
+def specifiable(
+    my_cls: Optional[T] = None,
+    /,
+    *,
+    spec_type: Optional[str] = None,
+    on_demand_init: bool = True,
+    just_in_time_init: bool = True) -> Union[T, Callable[[T], T]]:
   """A decorator that turns a class into a `Specifiable` subclass by
   implementing the `Specifiable` protocol.
 
@@ -285,8 +307,8 @@ class Bar():
       original `__init__` method will be called when the first time an attribute
       is accessed.
   """
-  def _wrapper(cls):
-    def new_init(self: Specifiable, *args, **kwargs):
+  def _wrapper(cls: T) -> T:
+    def new_init(self, *args, **kwargs):
       self._initialized = False
       self._in_init = False
 
@@ -358,20 +380,40 @@ def new_getattr(self, name):
           name)
       return self.__getattribute__(name)
 
+    def spec_type_func(cls):
+      return getattr(cls, spec_type_attr_name)
+
+    def unspecifiable(cls):
+      delattr(cls, spec_type_attr_name)
+      cls.__init__ = original_init
+      if just_in_time_init:
+        delattr(cls, '__getattr__')
+      delattr(cls, 'spec_type')
+      delattr(cls, 'run_original_init')
+      delattr(cls, 'to_spec')
+      delattr(cls, 'from_spec')
+      delattr(cls, 'unspecifiable')
+
+    spec_type_attr_name = cls.__name__ + "__spec_type"
+
+    # The class is already registered; re-applying the decorator is a no-op.
+    if hasattr(cls, spec_type_attr_name):
+      return cls
+
     # start of the function body of _wrapper
     _register(cls, spec_type)
 
     class_name = cls.__name__
-    original_init = cls.__init__
-    cls.__init__ = new_init
+    original_init = cls.__init__  # type: ignore[misc]
+    cls.__init__ = new_init  # type: ignore[misc]
     if just_in_time_init:
       cls.__getattr__ = new_getattr
 
+    cls.spec_type = classmethod(spec_type_func)
     cls.run_original_init = run_original_init
     cls.to_spec = Specifiable.to_spec
-    cls._to_spec_helper = staticmethod(Specifiable._to_spec_helper)
     cls.from_spec = Specifiable.from_spec
-    cls._from_spec_helper = staticmethod(Specifiable._from_spec_helper)
+    cls.unspecifiable = classmethod(unspecifiable)
 
     return cls
     # end of the function body of _wrapper
diff --git a/sdks/python/apache_beam/ml/anomaly/specifiable_test.py b/sdks/python/apache_beam/ml/anomaly/specifiable_test.py
index 4c1a7bdaf32..4492cbbe410 100644
--- a/sdks/python/apache_beam/ml/anomaly/specifiable_test.py
+++ b/sdks/python/apache_beam/ml/anomaly/specifiable_test.py
@@ -52,7 +52,7 @@ class B():
 
     # apply the decorator function to an existing class
     A = specifiable(A)
-    self.assertEqual(A.spec_type, "A")
+    self.assertEqual(A.spec_type(), "A")
     self.assertTrue(isinstance(A(), Specifiable))
     self.assertIn("A", _KNOWN_SPECIFIABLE[_FALLBACK_SUBSPACE])
     self.assertEqual(_KNOWN_SPECIFIABLE[_FALLBACK_SUBSPACE]["A"], A)
@@ -63,13 +63,10 @@ class B():
     # Raise an error when re-registering spec_type with a different class
     self.assertRaises(ValueError, specifiable(spec_type='A'), B)
 
-    # apply the decorator function to an existing class with a different
-    # spec_type
+    # Applying the decorator function to an existing class with a different
+    # spec_type will have no effect.
     A = specifiable(spec_type="A_DUP")(A)
-    self.assertEqual(A.spec_type, "A_DUP")
-    self.assertTrue(isinstance(A(), Specifiable))
-    self.assertIn("A_DUP", _KNOWN_SPECIFIABLE[_FALLBACK_SUBSPACE])
-    self.assertEqual(_KNOWN_SPECIFIABLE[_FALLBACK_SUBSPACE]["A_DUP"], A)
+    self.assertEqual(A.spec_type(), "A")
 
   def test_decorator_in_syntactic_sugar_form(self):
     # call decorator without parameters
@@ -585,6 +582,49 @@ def apply(self, x, y):
     self.assertEqual(w_2.run_func_in_class(5, 3), 150)
 
 
+class TestUncommonUsages(unittest.TestCase):
+  def test_double_specifiable(self):
+    @specifiable
+    @specifiable
+    class ZZ():
+      def __init__(self, a):
+        self.a = a
+
+    assert issubclass(ZZ, Specifiable)
+    c = ZZ("b")
+    c.run_original_init()
+    self.assertEqual(c.a, "b")
+
+  def test_unspecifiable(self):
+    class YY():
+      def __init__(self, x):
+        self.x = x
+        assert False
+
+    YY = specifiable(YY)
+    assert issubclass(YY, Specifiable)
+    y = YY(1)
+    # __init__ is called (raising the assertion error) when an attribute is
+    # first accessed
+    self.assertRaises(AssertionError, lambda: y.x)
+
+    # unspecifiable YY
+    YY.unspecifiable()
+    # __init__ is called immediately
+    self.assertRaises(AssertionError, YY, 1)
+    self.assertFalse(hasattr(YY, 'run_original_init'))
+    self.assertFalse(hasattr(YY, 'spec_type'))
+    self.assertFalse(hasattr(YY, 'to_spec'))
+    self.assertFalse(hasattr(YY, 'from_spec'))
+    self.assertFalse(hasattr(YY, 'unspecifiable'))
+
+    # make YY specifiable again
+    YY = specifiable(YY)
+    assert issubclass(YY, Specifiable)
+    y = YY(1)
+    self.assertRaises(AssertionError, lambda: y.x)
+
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()
diff --git a/sdks/python/apache_beam/ml/anomaly/transforms.py b/sdks/python/apache_beam/ml/anomaly/transforms.py
index 08b656072ac..4830d938b53 100644
--- a/sdks/python/apache_beam/ml/anomaly/transforms.py
+++ b/sdks/python/apache_beam/ml/anomaly/transforms.py
@@ -17,9 +17,11 @@
 
 import dataclasses
 import uuid
+from typing import Any
 from typing import Callable
 from typing import Dict
 from typing import Iterable
+from typing import List
 from typing import Optional
 from typing import Tuple
 from typing import TypeVar
@@ -33,8 +35,10 @@
 from apache_beam.ml.anomaly.base import AnomalyResult
 from apache_beam.ml.anomaly.base import EnsembleAnomalyDetector
 from apache_beam.ml.anomaly.base import ThresholdFn
+from apache_beam.ml.anomaly.detectors.offline import OfflineDetector
 from apache_beam.ml.anomaly.specifiable import Spec
 from apache_beam.ml.anomaly.specifiable import Specifiable
+from apache_beam.ml.inference.base import RunInference
 from apache_beam.transforms.userstate import ReadModifyWriteStateSpec
 
 KeyT = TypeVar('KeyT')
@@ -97,9 +101,11 @@ def process(
     yield k1, (k2,
                AnomalyResult(
                    example=data,
-                   predictions=[AnomalyPrediction(
-                       model_id=self._underlying._model_id,
-                       score=self.score_and_learn(data))]))
+                   predictions=[
+                       AnomalyPrediction(
+                           model_id=self._underlying._model_id,
+                           score=self.score_and_learn(data))
+                   ]))
 
     model_state.write(self._underlying)
 
@@ -325,7 +331,8 @@ def expand(
     if self._aggregation_fn is None:
       # simply put predictions into an iterable (list)
       ret = (
-          post_gbk | beam.MapTuple(
+          post_gbk
+          | beam.MapTuple(
               lambda k,
               v: (
                   k[0],
@@ -353,7 +360,8 @@ def expand(
     # We use (original_key, temp_key) as the key for GroupByKey() so that
     # scores from multiple detectors per data point are grouped.
     ret = (
-        post_gbk | beam.MapTuple(
+        post_gbk
+        | beam.MapTuple(
             lambda k,
             v,
             agg=aggregation_fn: (
@@ -406,6 +414,76 @@ def expand(
     return ret
 
 
+class RunOfflineDetector(beam.PTransform[beam.PCollection[KeyedInputT],
+                                         beam.PCollection[KeyedOutputT]]):
+  """Runs a offline anomaly detector on a PCollection of data.
+
+  This PTransform applies an `OfflineDetector` to the input data, handling
+  custom input/output conversion and inference.
+
+  Args:
+    offline_detector: The `OfflineDetector` to run.
+  """
+  def __init__(self, offline_detector: OfflineDetector):
+    self._offline_detector = offline_detector
+
+  def unnest_and_convert(
+      self, nested: Tuple[Tuple[Any, Any], Dict[str, List]]) -> KeyedOutputT:
+    """Unnests and converts the model output to AnomalyResult.
+
+    Args:
+      nested: A tuple containing the combined key (origin key, temp key) and
+        a dictionary of input and output from RunInference.
+
+    Returns:
+      A tuple containing the original key and AnomalyResult.
+    """
+    key, value_dict = nested
+    score = value_dict['output'][0]
+    result = AnomalyResult(
+        example=value_dict['input'][0],
+        predictions=[
+            AnomalyPrediction(
+                model_id=self._offline_detector._model_id, score=score)
+        ])
+    return key[0], (key[1], result)
+
+  def expand(
+      self,
+      input: beam.PCollection[KeyedInputT]) -> beam.PCollection[KeyedOutputT]:
+    model_uuid = f"{self._offline_detector._model_id}:{uuid.uuid4().hex[:6]}"
+
+    # Call the RunInference transform with the keyed model handler.
+    run_inference = RunInference(
+        self._offline_detector._keyed_model_handler,
+        **self._offline_detector._run_inference_args)
+
+    # ((orig_key, temp_key), beam.Row)
+    rekeyed_model_input = input | "Rekey" >> beam.Map(
+        lambda x: ((x[0], x[1][0]), x[1][1]))
+
+    # ((orig_key, temp_key), float)
+    rekeyed_model_output = (
+        rekeyed_model_input
+        | f"Call RunInference ({model_uuid})" >> run_inference)
+
+    # ((orig_key, temp_key), {'input': [row], 'output': [float]})
+    rekeyed_cogbk = {
+        'input': rekeyed_model_input, 'output': rekeyed_model_output
+    } | beam.CoGroupByKey()
+
+    ret = (
+        rekeyed_cogbk |
+        "Unnest and convert model output" >> beam.Map(self.unnest_and_convert))
+
+    if self._offline_detector._threshold_criterion:
+      ret = (
+          ret | f"Run Threshold Criterion ({model_uuid})" >>
+          RunThresholdCriterion(self._offline_detector._threshold_criterion))
+
+    return ret
+
+
 class RunEnsembleDetector(beam.PTransform[beam.PCollection[KeyedInputT],
                                           beam.PCollection[KeyedOutputT]]):
   """Runs an ensemble of anomaly detectors on a PCollection of data.
@@ -432,8 +510,14 @@ def expand(
     for idx, detector in enumerate(self._ensemble_detector._sub_detectors):
       if isinstance(detector, EnsembleAnomalyDetector):
         results.append(
-            input | f"Run Ensemble Detector at index {idx} ({model_uuid})" >>
+            input
+            | f"Run Ensemble Detector at index {idx} ({model_uuid})" >>
             RunEnsembleDetector(detector))
+      elif isinstance(detector, OfflineDetector):
+        results.append(
+            input
+            | f"Run Offline Detector at index {idx} ({model_uuid})" >>
+            RunOfflineDetector(detector))
       else:
         results.append(
             input
@@ -518,6 +602,8 @@ def expand(
 
     if isinstance(self._root_detector, EnsembleAnomalyDetector):
       keyed_output = (keyed_input | RunEnsembleDetector(self._root_detector))
+    elif isinstance(self._root_detector, OfflineDetector):
+      keyed_output = (keyed_input | RunOfflineDetector(self._root_detector))
     else:
       keyed_output = (keyed_input | RunOneDetector(self._root_detector))
 
diff --git a/sdks/python/apache_beam/ml/anomaly/transforms_test.py b/sdks/python/apache_beam/ml/anomaly/transforms_test.py
index cf398728f37..b8ed7c7e5e1 100644
--- a/sdks/python/apache_beam/ml/anomaly/transforms_test.py
+++ b/sdks/python/apache_beam/ml/anomaly/transforms_test.py
@@ -17,20 +17,45 @@
 
 import logging
 import math
+import os
+import pickle
+import shutil
+import tempfile
 import unittest
+from typing import Any
+from typing import Dict
 from typing import Iterable
+from typing import Optional
+from typing import Sequence
+from typing import SupportsFloat
+from typing import Tuple
+
+import mock
+import numpy
+from sklearn.base import BaseEstimator
 
 import apache_beam as beam
 from apache_beam.ml.anomaly.aggregations import AnyVote
 from apache_beam.ml.anomaly.base import AnomalyPrediction
 from apache_beam.ml.anomaly.base import AnomalyResult
 from apache_beam.ml.anomaly.base import EnsembleAnomalyDetector
+from apache_beam.ml.anomaly.detectors.offline import OfflineDetector
 from apache_beam.ml.anomaly.detectors.zscore import ZScore
+from apache_beam.ml.anomaly.specifiable import Spec
+from apache_beam.ml.anomaly.specifiable import Specifiable
+from apache_beam.ml.anomaly.specifiable import _spec_type_to_subspace
+from apache_beam.ml.anomaly.specifiable import specifiable
 from apache_beam.ml.anomaly.thresholds import FixedThreshold
 from apache_beam.ml.anomaly.thresholds import QuantileThreshold
 from apache_beam.ml.anomaly.transforms import AnomalyDetection
 from apache_beam.ml.anomaly.transforms import _StatefulThresholdDoFn
 from apache_beam.ml.anomaly.transforms import _StatelessThresholdDoFn
+from apache_beam.ml.inference.base import KeyedModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.ml.inference.base import _PostProcessingModelHandler
+from apache_beam.ml.inference.base import _PreProcessingModelHandler
+from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
@@ -253,6 +278,178 @@ def test_multiple_sub_detectors_with_aggregation(self):
                     prediction in zip(self._input, aggregated)]))
 
 
+class FakeNumpyModel():
+  def __init__(self):
+    self.total_predict_calls = 0
+
+  def predict(self, input_vector: numpy.ndarray):
+    self.total_predict_calls += 1
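+    # e.g. input_vector [[1, 2]] yields [1 * 10 - 2] == [8]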
+    return [input_vector[0][0] * 10 - input_vector[0][1]]
+
+
+def alternate_numpy_inference_fn(
+    model: BaseEstimator,
+    batch: Sequence[numpy.ndarray],
+    inference_args: Optional[Dict[str, Any]] = None) -> Any:
+  return [0]
+
+
+def _to_keyed_numpy_array(t: Tuple[Any, beam.Row]):
+  """Converts an Apache Beam Row to a NumPy array."""
+  return t[0], numpy.array(list(t[1]))
+
+
+def _from_keyed_numpy_array(t: Tuple[Any, PredictionResult]):
+  assert isinstance(t[1].inference, SupportsFloat)
+  return t[0], float(t[1].inference)
+
+
+class TestOfflineDetector(unittest.TestCase):
+  def setUp(self):
+    global SklearnModelHandlerNumpy, KeyedModelHandler
+    global _PreProcessingModelHandler, _PostProcessingModelHandler
+    # Make the model handlers Specifiable
+    SklearnModelHandlerNumpy = specifiable(SklearnModelHandlerNumpy)
+    KeyedModelHandler = specifiable(KeyedModelHandler)
+    _PreProcessingModelHandler = specifiable(_PreProcessingModelHandler)
+    _PostProcessingModelHandler = specifiable(_PostProcessingModelHandler)
+    self.tmpdir = tempfile.mkdtemp()
+
+  def tearDown(self):
+    shutil.rmtree(self.tmpdir)
+    # Restore the model handlers to normal
+    SklearnModelHandlerNumpy.unspecifiable()
+    KeyedModelHandler.unspecifiable()
+    _PreProcessingModelHandler.unspecifiable()
+    _PostProcessingModelHandler.unspecifiable()
+
+  def test_default_inference_fn(self):
+    temp_file_name = self.tmpdir + os.sep + 'pickled_file'
+    with open(temp_file_name, 'wb') as file:
+      pickle.dump(FakeNumpyModel(), file)
+
+    keyed_model_handler = KeyedModelHandler(
+        SklearnModelHandlerNumpy(model_uri=temp_file_name)).with_preprocess_fn(
+            _to_keyed_numpy_array).with_postprocess_fn(_from_keyed_numpy_array)
+
+    detector = OfflineDetector(keyed_model_handler=keyed_model_handler)
+    detector_spec = detector.to_spec()
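+    # The handler spec nests outermost-last: with_postprocess_fn() wraps
+    # with_preprocess_fn(), which wraps the KeyedModelHandler.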
+    expected_spec = Spec(
+        type='OfflineDetector',
+        config={
+            'keyed_model_handler': Spec(
+                type='_PostProcessingModelHandler',
+                config={
+                    'base': Spec(
+                        type='_PreProcessingModelHandler',
+                        config={
+                            'base': Spec(
+                                type='KeyedModelHandler',
+                                config={
+                                    'unkeyed': Spec(
+                                        type='SklearnModelHandlerNumpy',
+                                        config={'model_uri': temp_file_name})
+                                }),
+                            'preprocess_fn': Spec(
+                                type='_to_keyed_numpy_array', config=None)
+                        }),
+                    'postprocess_fn': Spec(
+                        type='_from_keyed_numpy_array', config=None)
+                })
+        })
+    self.assertEqual(detector_spec, expected_spec)
+
+    self.assertEqual(_spec_type_to_subspace('SklearnModelHandlerNumpy'), '*')
+    self.assertEqual(_spec_type_to_subspace('_PreProcessingModelHandler'), '*')
+    self.assertEqual(_spec_type_to_subspace('_PostProcessingModelHandler'), '*')
+    self.assertEqual(_spec_type_to_subspace('_to_keyed_numpy_array'), '*')
+    self.assertEqual(_spec_type_to_subspace('_from_keyed_numpy_array'), '*')
+
+    # Make sure the spec from the detector can be used to reconstruct the same
+    # detector
+    detector_new = Specifiable.from_spec(detector_spec)
+
+    input = [
+        (1, beam.Row(x=1, y=2)),
+        (1, beam.Row(x=2, y=4)),
+        (1, beam.Row(x=3, y=6)),
+    ]
+    expected_predictions = [
+        AnomalyPrediction(
+            model_id='OfflineDetector',
+            score=8.0,
+            label=None,
+            threshold=None,
+            info='',
+            source_predictions=None),
+        AnomalyPrediction(
+            model_id='OfflineDetector',
+            score=16.0,
+            label=None,
+            threshold=None,
+            info='',
+            source_predictions=None),
+        AnomalyPrediction(
+            model_id='OfflineDetector',
+            score=24.0,
+            label=None,
+            threshold=None,
+            info='',
+            source_predictions=None),
+    ]
+    with TestPipeline() as p:
+      result = (
+          p | beam.Create(input)
+          # TODO: get rid of this conversion from BeamSchema to beam.Row.
+          | beam.Map(lambda t: (t[0], beam.Row(**t[1]._asdict())))
+          | AnomalyDetection(detector_new))
+
+      assert_that(
+          result,
+          equal_to([(
+              input[0],
+              AnomalyResult(example=input[1], predictions=[prediction]))
+                    for input,
+                    prediction in zip(input, expected_predictions)]))
+
+  def test_run_inference_args(self):
+    model_handler = SklearnModelHandlerNumpy(model_uri="unused")
+    detector = OfflineDetector(
+        keyed_model_handler=model_handler,
+        run_inference_args={"inference_args": {
+            "multiplier": 10
+        }})
+
+    p = TestPipeline()
+
+    input = [
+        (1, beam.Row(x=1, y=2)),
+        (1, beam.Row(x=2, y=4)),
+        (1, beam.Row(x=3, y=6)),
+    ]
+
+    # Patch RunInference in "apache_beam.ml.anomaly.transforms", where it is
+    # imported and called.
+    with mock.patch('apache_beam.ml.anomaly.transforms.RunInference') as mock_run_inference:  # pylint: disable=line-too-long
+      # Make the actual RunInference the side effect, so we record the call
+      # information but also create the real RunInference instance.
+      mock_run_inference.side_effect = RunInference
+      try:
+        p = TestPipeline()
+        _ = (p | beam.Create(input) | AnomalyDetection(detector))
+      except:  # pylint: disable=bare-except
+        pass
+      call_args = mock_run_inference.call_args[1]
+      self.assertEqual(
+          call_args,
+          {
+              'inference_args': {
+                  'multiplier': 10
+              },
+              'model_identifier': 'OfflineDetector'
+          })
+
+
 R = beam.Row(x=10, y=20)