|
| 1 | += Custom Python processors |
| 2 | + |
| 3 | +In NiFi 2.0, support for custom processors written in Python was added. |
| 4 | +The Stackable images already contain the required tooling, such as - obviously - a supported Python version. |
| 5 | + |
| 6 | +== General configuration |
| 7 | + |
| 8 | +[source,yaml] |
| 9 | +---- |
| 10 | +spec: |
| 11 | + nodes: |
| 12 | + configOverrides: |
| 13 | + nifi.properties: |
| 14 | + # The command used to launch Python. |
| 15 | + # This property must be set to enable Python-based processors. |
| 16 | + nifi.python.command: python3 |
| 17 | + # The directory that NiFi should look in to find custom Python-based |
| 18 | + # Processors. |
| 19 | + nifi.python.extensions.source.directory.custom: /nifi-python-extensions |
| 20 | + # The directory that contains the Python framework for communicating |
| 21 | + # between the Python and Java processes. |
| 22 | + nifi.python.framework.source.directory: /stackable/nifi/python/framework/ |
| 23 | + # The working directory where NiFi should store artifacts |
| 24 | + # This property defaults to ./work/python but if you want to mount an |
| 25 | + # emptyDir for the working directory then another directory has to be |
| 26 | + # set to avoid ownership conflicts with ./work/nar. |
| 27 | + nifi.python.working.directory: /nifi-python-working-directory |
| 28 | +---- |
| 29 | + |
| 30 | +== Getting Python scripts into NiFi |
| 31 | + |
| 32 | +TIP: NiFi should hot-reload the Python scripts. You might need to refresh your browser window to see the new processor. |
| 33 | + |
| 34 | +[#configmap] |
| 35 | +=== 1. Mount as ConfigMap |
| 36 | + |
| 37 | +The easiest way is defining a ConfigMap and mounting it as follows. |
| 38 | +This way, the Python processors are stored and versioned alongside your NifiCluster itself. |
| 39 | + |
| 40 | +// Technically it's yaml, but the most content is Python |
| 41 | +[source,python] |
| 42 | +---- |
| 43 | +apiVersion: v1 |
| 44 | +kind: ConfigMap |
| 45 | +metadata: |
| 46 | + name: nifi-python-extensions |
| 47 | +data: |
| 48 | + CreateFlowFileProcessor.py: | |
| 49 | + from nifiapi.flowfilesource import FlowFileSource, FlowFileSourceResult |
| 50 | +
|
| 51 | + class CreateFlowFile(FlowFileSource): |
| 52 | + class Java: |
| 53 | + implements = ['org.apache.nifi.python.processor.FlowFileSource'] |
| 54 | +
|
| 55 | + class ProcessorDetails: |
| 56 | + version = '0.0.1-SNAPSHOT' |
| 57 | + description = '''A Python processor that creates FlowFiles.''' |
| 58 | +
|
| 59 | + def __init__(self, **kwargs): |
| 60 | + pass |
| 61 | +
|
| 62 | + def create(self, context): |
| 63 | + return FlowFileSourceResult( |
| 64 | + relationship = 'success', |
| 65 | + attributes = {'greeting': 'hello'}, |
| 66 | + contents = 'Hello World!' |
| 67 | + ) |
| 68 | +---- |
| 69 | + |
| 70 | +The Python script is taken from https://nifi.apache.org/nifi-docs/python-developer-guide.html#flowfile-source[the offical NiFi Python developer guide]. |
| 71 | + |
| 72 | +You can add multiple Python scripts in the ConfigMap. |
| 73 | +Afterwards we need to mount the Python scripts into `/nifi-python-extensions`: |
| 74 | + |
| 75 | +[source,yaml] |
| 76 | +---- |
| 77 | +spec: |
| 78 | + nodes: |
| 79 | + podOverrides: |
| 80 | + spec: |
| 81 | + containers: |
| 82 | + - name: nifi |
| 83 | + volumeMounts: |
| 84 | + - name: nifi-python-extensions |
| 85 | + mountPath: /nifi-python-extensions |
| 86 | + - name: nifi-python-working-directory |
| 87 | + mountPath: /nifi-python-working-directory |
| 88 | + volumes: |
| 89 | + - name: nifi-python-extensions |
| 90 | + configMap: |
| 91 | + name: nifi-python-extensions |
| 92 | + - name: nifi-python-working-directory |
| 93 | + emptyDir: {} |
| 94 | +---- |
| 95 | + |
| 96 | +[#git-sync] |
| 97 | +=== 2. Use git-sync |
| 98 | + |
| 99 | +As an alternative you can use `git-sync` to keep your Python processors up to date. |
| 100 | +You need to add a sidecar using podOverrides that syncs into a shared volume between the `nifi` and `git-sync` container. |
| 101 | + |
| 102 | +The following snippet can serve as a starting point (the Git repo has the folder `processors` with the Python scripts inside). |
| 103 | + |
| 104 | +[source,yaml] |
| 105 | +---- |
| 106 | +spec: |
| 107 | + nodes: |
| 108 | + podOverrides: |
| 109 | + spec: |
| 110 | + containers: |
| 111 | + - name: nifi |
| 112 | + volumeMounts: |
| 113 | + - name: nifi-python-extensions |
| 114 | + mountPath: /nifi-python-extensions |
| 115 | + - name: nifi-python-working-directory |
| 116 | + mountPath: /nifi-python-working-directory |
| 117 | + - name: git-sync |
| 118 | + image: registry.k8s.io/git-sync/git-sync:v4.2.3 |
| 119 | + args: |
| 120 | + - --repo=https://github.com/stackabletech/nifi-talk |
| 121 | + - --root=/nifi-python-extensions |
| 122 | + - --period=10s |
| 123 | + volumeMounts: |
| 124 | + - name: nifi-python-extensions |
| 125 | + mountPath: /nifi-python-extensions |
| 126 | + volumes: |
| 127 | + - name: nifi-python-extensions |
| 128 | + emptyDir: {} |
| 129 | + - name: nifi-python-working-directory |
| 130 | + emptyDir: {} |
| 131 | +---- |
| 132 | + |
| 133 | +Afterwards you need to update your source directory (the one you added previously) accordingly, to point into the Git subfolder you have. |
| 134 | + |
| 135 | +[source,yaml] |
| 136 | +---- |
| 137 | +spec: |
| 138 | + nodes: |
| 139 | + configOverrides: |
| 140 | + nifi.properties: |
| 141 | + # Replace the property from the previous step |
| 142 | + # Format is /nifi-python-extensions/<git-repo-name>/<git-folder>/ |
| 143 | + nifi.python.extensions.source.directory.custom: > |
| 144 | + /nifi-python-extensions/nifi-talk/processors/ |
| 145 | +---- |
| 146 | + |
| 147 | +=== 3. Use PersistentVolume |
| 148 | + |
| 149 | +You can also mount a PVC below `/nifi-python-extensions` using podOverrides and shell into the NiFi Pod to make changes. |
| 150 | +However, the <<configmap>> or <<git-sync>> approach is recommended. |
| 151 | + |
| 152 | +== Check processors have been loaded |
| 153 | + |
| 154 | +NiFi logs every Python processor it found. |
| 155 | +You can use that to check if the processors have been loaded. |
| 156 | + |
| 157 | +[source,console] |
| 158 | +---- |
| 159 | +$ kubectl logs nifi-2-0-0-node-default-0 -c nifi \ |
| 160 | + | grep 'Discovered.*Python Processor' |
| 161 | +… INFO [main] … Discovered Python Processor PythonZgrepProcessor |
| 162 | +… INFO [main] … Discovered Python Processor TransformOpenskyStates |
| 163 | +… INFO [main] … Discovered Python Processor UpdateAttributeFileLookup |
| 164 | +… INFO [main] … Discovered or updated 3 Python Processors in 64 millis |
| 165 | +---- |
0 commit comments