The Streaming Jupyter Integrations project includes a set of magics for interactively running Flink SQL jobs in Jupyter Notebooks.

In order to use these magics, you must install our pip package along with `jupyterlab-lsp`:

```bash
python3 -m pip install jupyterlab-lsp streaming-jupyter-integrations
```

Register the magics in a running IPython kernel in the first cell:
```python
%load_ext streaming_jupyter_integrations.magics
```

Then you need to decide which execution mode and execution target to use:

```python
%flink_connect --execution-mode [mode] --execution-target [target]
```

By default, the streaming execution mode and the local execution target are used:

```python
%flink_connect
```

Currently, Flink supports two execution modes: batch and streaming. Please see the Flink documentation for more details.
In order to specify the execution mode, add the `--execution-mode` parameter, for instance:

```python
%flink_connect --execution-mode batch
```

Streaming Jupyter Integrations supports 3 execution targets:
- `local`
- `remote`
- `yarn-session`
Running Flink in local mode will start a MiniCluster in a local JVM with parallelism 1.
In order to run Flink locally, use:
```python
%flink_connect --execution-target local
```

Alternatively, since the execution target is local by default, simply use:

```python
%flink_connect
```

You can specify the port of the local JobManager (8099 by default). This is especially useful if you run multiple notebooks in a single JupyterLab:
```python
%flink_connect --execution-target local --local-port 8123
```

Running Flink in remote mode will connect to an existing Flink session cluster. Besides setting `--execution-target` to `remote`, you also need to specify `--remote-hostname` and `--remote-port`, pointing to the Flink JobManager's REST API address.
```python
%flink_connect \
    --execution-target remote \
    --remote-hostname example.com \
    --remote-port 8888
```

Running Flink in yarn-session mode will connect to an existing Flink session cluster running on YARN. You may specify
the hostname and port of the YARN Resource Manager (`--resource-manager-hostname` and `--resource-manager-port`).
If the Resource Manager address is not provided, it is assumed that the notebook runs on the same node as the Resource Manager.
You can also specify the YARN application id (`--yarn-application-id`) to which the notebook will connect.
If `--yarn-application-id` is not specified and there is exactly one YARN application running on the cluster, the notebook will try to connect to it. Otherwise, it will fail.
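The application-selection rule described above can be sketched as follows. This is a simplified illustration, not the library's actual code; the function name and error message are hypothetical:

```python
def pick_yarn_application(running_apps, yarn_application_id=None):
    """Hypothetical sketch of how the target YARN application is chosen."""
    # An explicitly provided --yarn-application-id always wins.
    if yarn_application_id is not None:
        return yarn_application_id
    # Otherwise, connect automatically only if exactly one application runs.
    if len(running_apps) == 1:
        return running_apps[0]
    raise RuntimeError(
        f"Cannot choose a YARN application automatically: "
        f"{len(running_apps)} applications are running"
    )
```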
Connecting to a remote Flink session cluster running on a remote YARN cluster:
```python
%flink_connect \
    --execution-target yarn-session \
    --resource-manager-hostname example.com \
    --resource-manager-port 8888 \
    --yarn-application-id application_1666172784500_0001
```

Connecting to a Flink session cluster running on a YARN cluster:
```python
%flink_connect \
    --execution-target yarn-session \
    --yarn-application-id application_1666172784500_0001
```

Connecting to a Flink session cluster running on a dedicated YARN cluster:
```python
%flink_connect --execution-target yarn-session
```

Magics allow for dynamic variable substitution in Flink SQL cells:

```python
my_variable = 1
```

```sql
SELECT * FROM some_table WHERE product_id = {my_variable}
```

Moreover, you can mark sensitive variables, like passwords, so that they are read from environment variables or user input every time the cell is run:
```sql
CREATE TABLE MyUserTable (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydatabase',
  'table-name' = 'users',
  'username' = '${my_username}',
  'password' = '${my_password}'
);
```

The `%%flink_execute` command allows using the Python DataStream API and Table API. There are two handles exposed, one for each API: `stream_env` and `table_env`, respectively.
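Returning briefly to variable substitution: the two styles shown earlier, `{variable}` resolved from the notebook's namespace and `${variable}` resolved from the environment or user input, can be sketched roughly like this. This is a simplified, hypothetical illustration; the magics' actual implementation may differ:

```python
import os
import re


def substitute_variables(sql, namespace):
    """Hypothetical sketch of variable substitution in a SQL cell."""
    # {name}: plain variables resolved from the user's namespace.
    sql = re.sub(r"(?<!\$)\{(\w+)\}",
                 lambda m: str(namespace[m.group(1)]), sql)
    # ${name}: sensitive variables resolved from environment variables
    # (the real magics fall back to interactive user input).
    sql = re.sub(r"\$\{(\w+)\}",
                 lambda m: os.environ[m.group(1)], sql)
    return sql
```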
Table API example:
```python
%%flink_execute
query = """
    SELECT user_id, COUNT(*)
    FROM orders
    GROUP BY user_id
"""
execution_output = table_env.execute_sql(query)
```

When the Table API is used, the final result has to be assigned to the `execution_output` variable.
DataStream API example:
```python
%%flink_execute
from pyflink.common.typeinfo import Types

execution_output = stream_env.from_collection(
    collection=[(1, 'aaa'), (2, 'bb'), (3, 'cccc')],
    type_info=Types.ROW([Types.INT(), Types.STRING()])
)
```

When the DataStream API is used, the final result also has to be assigned to the `execution_output` variable. Please note that the pipeline does not end with `.execute()`; the execution is triggered by the Jupyter magics under the hood.
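Conceptually, the magic inspects the cell's namespace after running it and triggers execution itself. A hypothetical sketch of that behaviour (not the library's actual code):

```python
def run_pipeline(user_namespace):
    """Hypothetical sketch of what the cell magic does after the cell runs."""
    output = user_namespace.get("execution_output")
    if output is None:
        raise RuntimeError("The cell must assign its result to execution_output")
    # For DataStream pipelines the job still has to be started explicitly,
    # which is why user code does not call .execute() itself.
    execute = getattr(output, "execute", None)
    if callable(execute):
        return execute()
    return output
```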
Note: You will need NodeJS to build the extension package.

The `jlpm` command is JupyterLab's pinned version of `yarn` that is installed with JupyterLab. You may use `yarn` or `npm` in lieu of `jlpm` below. In order to use `jlpm`, you have to have `jupyterlab` installed (e.g., by `brew install jupyterlab`, if you use Homebrew as your package manager).
```bash
# Clone the repo to your local environment
# Change directory to the flink_sql_lsp_extension directory
# Install package in development mode
pip install -e .
# Link your development version of the extension with JupyterLab
jupyter labextension develop . --overwrite
# Rebuild extension Typescript source after making changes
jlpm build
```

The project uses pre-commit hooks to ensure code quality, mostly by linting. To use them, install `pre-commit` and then run:

```bash
pre-commit install --install-hooks
```

From that moment on, it will lint the files you have modified on every commit attempt.
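For reference, a minimal `.pre-commit-config.yaml` looks like the fragment below. This is an illustrative example using the standard `pre-commit-hooks` repository, not necessarily this project's actual configuration:

```yaml
repos:
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.4.0
    hooks:
      - id: trailing-whitespace
      - id: end-of-file-fixer
```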