Release 1.9.0 (#35)
* Allow for whitespace in secret variables regex (#22)

* k8s operator - deployment

* refactor

* change api to rest

* fix tests

* update changelog

* fix pre-commit hooks

* add tests

* update pyyaml

* STREAM-381 fixed version of python-dateutil (#27)

* STREAM-381 fixed version of python-dateutil (required by flink 1.16.0)

* STREAM-384 (#29)

* jupyter convert cmd

* STREAM-386 deployment mode (#31)

* flink_execute (#33)

* flink_execute

* pre-commit hooks - black

* fix imports in template

---------

Co-authored-by: Anna Urbala <[email protected]>

* don't add shell commands to the converted notebook (#34)

* don't add shell commands to the converted notebook

* changelog

---------

Co-authored-by: Anna Urbala <[email protected]>

* FIX #35 - Bump version and CHANGELOG for release 1.9.0

---------

Co-authored-by: Andrzej Swatowski <[email protected]>
Co-authored-by: Anna Urbala <[email protected]>
Co-authored-by: niladrem <[email protected]>
Co-authored-by: Maciej Maciejko <[email protected]>
Co-authored-by: Anna Naroska <[email protected]>
Co-authored-by: Maciej Maciejko <[email protected]>
Co-authored-by: Maciej Maciejko <[email protected]>
Co-authored-by: github-actions <[email protected]>
9 people authored Mar 1, 2023
1 parent d1c2569 commit c15ef00
Showing 8 changed files with 165 additions and 73 deletions.

16 changes: 13 additions & 3 deletions CHANGELOG.md

@@ -2,6 +2,16 @@
 
 ## [Unreleased]
 
+## [1.9.0] - 2023-02-28
+
+### Added
+
+- Handle `%%flink_execute`
+
+### Fixed
+
+- Don't add shell commands to the converted notebooks
+
 ## [1.8.1] - 2023-02-02
 
 ### Fixed
@@ -85,11 +95,11 @@
 
 - First release
 
-[Unreleased]: https://github.com/getindata/streaming-cli/compare/1.8.1...HEAD
+[Unreleased]: https://github.com/getindata/streaming-cli/compare/1.9.0...HEAD
 
-[1.8.1]: https://github.com/getindata/streaming-cli/compare/1.8.0...1.8.1
+[1.9.0]: https://github.com/getindata/streaming-cli/compare/1.8.0...1.9.0
 
-[1.7.1]: https://github.com/getindata/streaming-cli/compare/1.7.0...1.7.1
+[1.8.0]: https://github.com/getindata/streaming-cli/compare/1.7.1...1.8.0
 
 [1.7.0]: https://github.com/getindata/streaming-cli/compare/1.6.0...1.7.0
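
For the `Added` entry above: `%%flink_execute` is a cell magic whose body is copied verbatim into the generated job script. A cell along these lines (reconstructed from the expected output of the new test further down, not taken from the repository's notebook fixtures) now converts cleanly:

    %%flink_execute
    execution_output = stream_env.from_collection(
        collection=[(1, 'aaa'), (2, 'bb'), (3, 'cccc')],
        type_info=Types.ROW([Types.INT(), Types.STRING()])
    )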

2 changes: 1 addition & 1 deletion setup.cfg

@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 1.8.1
+current_version = 1.9.0
 
 [flake8]
 exclude = .git,__pycache__,build,dist,*.yml

2 changes: 1 addition & 1 deletion setup.py

@@ -5,7 +5,7 @@
 from setuptools import find_packages, setup
 from setuptools.command.install import install
 
-__version__ = "1.8.1"
+__version__ = "1.9.0"
 
 with open("README.md", "r") as fh:
     long_description = fh.read()

5 changes: 4 additions & 1 deletion streamingcli/jupyter/notebook_converter.py

@@ -145,7 +145,7 @@ def _get_notebook_entry(
     ) -> Sequence[NotebookEntry]:
         if cell.source.startswith("%"):
             return self._handle_magic_cell(cell, notebook_dir)
-        elif not cell.source.startswith("##"):
+        elif not cell.source.startswith("##") and not cell.source.startswith("!"):
             return [Code(value=cell.source)]
         else:
             return []
@@ -159,6 +159,9 @@ def _handle_magic_cell(
             if NotebookConverter._skip_statement(sql_statement):
                 return []
             return self._convert_sql_statement_to_python_instructions(sql_statement)
+        if source.startswith("%%flink_execute"):
+            code = "\n".join(source.split("\n")[1:])
+            return [Code(value=code)]
         if source.startswith("%flink_execute_sql_file"):
            return self._get_statements_from_file(source, notebook_dir)
        if source.startswith("%flink_register_function"):
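
Two behavior changes land in this file: cells starting with `!` (shell commands) are no longer emitted into the generated script, and a `%%flink_execute` magic keeps everything after the magic line as plain code. A minimal standalone sketch of those two rules, flattening the converter's dispatch (the real code routes `%`-prefixed cells through `_handle_magic_cell`; `cell_source` values are hypothetical examples):

    def convert_cell(cell_source: str) -> list:
        if cell_source.startswith("%%flink_execute"):
            # Keep everything after the magic line verbatim.
            return ["\n".join(cell_source.split("\n")[1:])]
        if cell_source.startswith("##") or cell_source.startswith("!"):
            # Hidden cells and shell commands are dropped from the output.
            return []
        return [cell_source]

    assert convert_cell("!pip install pyflink") == []
    assert convert_cell("%%flink_execute\nx = 1") == ["x = 1"]
    assert convert_cell("x = 2") == ["x = 2"]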

15 changes: 7 additions & 8 deletions streamingcli/project/templates/flink_app.py.template

@@ -1,19 +1,18 @@
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.table import StreamTableEnvironment, DataTypes
-from pyflink.table.udf import udf
+from pyflink.table import StreamTableEnvironment
 
-env = StreamExecutionEnvironment.get_execution_environment()
-env.set_parallelism(1)
-t_env = StreamTableEnvironment.create(env)
+stream_env = StreamExecutionEnvironment.get_execution_environment()
+stream_env.set_parallelism(1)
+table_env = StreamTableEnvironment.create(stream_env)
 {% for entry in notebook_entries %}
 {% if entry.type == 'SQL' %}
-t_env.execute_sql(f"""{{entry.value}}""")
+table_env.execute_sql(f"""{{entry.value}}""")
 {% endif %}
 {% if entry.type == 'REGISTER_UDF' %}
-t_env.create_temporary_function("{{entry.function_name}}", {{entry.object_name}})
+table_env.create_temporary_function("{{entry.function_name}}", {{entry.object_name}})
 {% endif %}
 {% if entry.type == 'REGISTER_JAVA_UDF' %}
-t_env.create_java_temporary_function("{{entry.function_name}}", "{{entry.object_name}}")
+table_env.create_java_temporary_function("{{entry.function_name}}", "{{entry.object_name}}")
 {% endif %}
 {% if entry.type == 'CODE' %}
 {{entry.value}}
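
The template renames `env`/`t_env` to the clearer `stream_env`/`table_env` and drops the unused `DataTypes` and `udf` imports. A minimal rendering sketch (assuming Jinja2 as the engine, which the `{% %}`/`{{ }}` syntax above suggests but the diff does not confirm; the entries are hypothetical dicts standing in for the converter's notebook entry objects):

    from jinja2 import Template

    template = Template(
        'table_env = StreamTableEnvironment.create(stream_env)\n'
        '{% for entry in notebook_entries %}'
        '{% if entry.type == "SQL" %}'
        'table_env.execute_sql(f"""{{entry.value}}""")\n'
        '{% endif %}'
        '{% endfor %}'
    )

    print(template.render(notebook_entries=[{"type": "SQL", "value": "SELECT 1"}]))
    # table_env = StreamTableEnvironment.create(stream_env)
    # table_env.execute_sql(f"""SELECT 1""")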

138 changes: 79 additions & 59 deletions tests/streamingcli/jupyter/test_notebook_converter.py

@@ -16,13 +16,13 @@ def test_converter(self):
             converted_notebook.content
             == '''import sys
 from pyflink.table import DataTypes
-from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.table import StreamTableEnvironment, DataTypes
-from pyflink.table.udf import udf
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import StreamTableEnvironment
 
-env = StreamExecutionEnvironment.get_execution_environment()
-env.set_parallelism(1)
-t_env = StreamTableEnvironment.create(env)
+stream_env = StreamExecutionEnvironment.get_execution_environment()
+stream_env.set_parallelism(1)
+table_env = StreamTableEnvironment.create(stream_env)
 
 maximum_number_of_rows = 10
@@ -32,7 +32,7 @@
 number_of_rows = 10
 
-t_env.execute_sql(f"""CREATE TABLE datagen (
+table_env.execute_sql(f"""CREATE TABLE datagen (
 id INT
 ) WITH (
     'connector' = 'datagen',
@@ -47,7 +47,7 @@ def filter_print(condition, message):
     return condition
 
-t_env.create_temporary_function("filter_print", filter_print)
+table_env.create_temporary_function("filter_print", filter_print)
 '''
         )
@@ -59,25 +59,26 @@ def test_notebook_with_remote_java_udf_conversion(self):
         assert (
             converted_notebook.content
             == '''from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.table import StreamTableEnvironment, DataTypes
-from pyflink.table.udf import udf
+from pyflink.table import StreamTableEnvironment
 
-env = StreamExecutionEnvironment.get_execution_environment()
-env.set_parallelism(1)
-t_env = StreamTableEnvironment.create(env)
+stream_env = StreamExecutionEnvironment.get_execution_environment()
+stream_env.set_parallelism(1)
+table_env = StreamTableEnvironment.create(stream_env)
 
 maximum_number_of_rows = 10
 
 some_text_variable = "some_text_value"
 
-t_env.create_java_temporary_function("remote_trace", "com.getindata.TraceUDF")
+table_env.create_java_temporary_function(
+    "remote_trace", "com.getindata.TraceUDF")
 
-t_env.create_java_temporary_function("other_function", "com.getindata.Other")
+table_env.create_java_temporary_function(
+    "other_function", "com.getindata.Other")
 
-t_env.execute_sql(f"""CREATE TABLE datagen (
+table_env.execute_sql(f"""CREATE TABLE datagen (
 id INT
 ) WITH (
     'connector' = 'datagen',
@@ -94,18 +95,18 @@ def test_notebook_with_local_java_udf_conversion(self):
         assert (
             converted_notebook.content
             == '''from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.table import StreamTableEnvironment, DataTypes
-from pyflink.table.udf import udf
+from pyflink.table import StreamTableEnvironment
 
-env = StreamExecutionEnvironment.get_execution_environment()
-env.set_parallelism(1)
-t_env = StreamTableEnvironment.create(env)
+stream_env = StreamExecutionEnvironment.get_execution_environment()
+stream_env.set_parallelism(1)
+table_env = StreamTableEnvironment.create(stream_env)
 
-t_env.create_java_temporary_function("local_trace", "com.getindata.TraceUDF")
+table_env.create_java_temporary_function(
+    "local_trace", "com.getindata.TraceUDF")
 
-t_env.execute_sql(f"""CREATE TABLE datagen (
+table_env.execute_sql(f"""CREATE TABLE datagen (
 id INT
 ) WITH (
     'connector' = 'datagen',
@@ -127,27 +128,26 @@ def test_notebook_with_flink_execute_sql_file(self):
         assert (
             converted_notebook.content
             == '''from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.table import StreamTableEnvironment, DataTypes
-from pyflink.table.udf import udf
+from pyflink.table import StreamTableEnvironment
 
-env = StreamExecutionEnvironment.get_execution_environment()
-env.set_parallelism(1)
-t_env = StreamTableEnvironment.create(env)
+stream_env = StreamExecutionEnvironment.get_execution_environment()
+stream_env.set_parallelism(1)
+table_env = StreamTableEnvironment.create(stream_env)
 
-t_env.execute_sql(f"""CREATE TABLE datagen (
+table_env.execute_sql(f"""CREATE TABLE datagen (
 id INT
 ) WITH (
     'connector' = 'datagen',
     'number-of-rows' = '{number_of_rows}'
 )""")
 
-t_env.execute_sql(
+table_env.execute_sql(
     f"""select * from datagen WHERE remote_trace(true, 'TRACE_ME', id)""")
 
-t_env.execute_sql(f"""select * from other""")
+table_env.execute_sql(f"""select * from other""")
 '''
         )
@@ -159,15 +159,14 @@ def test_notebook_with_show_and_describe(self):
         assert (
             converted_notebook.content
             == '''from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.table import StreamTableEnvironment, DataTypes
-from pyflink.table.udf import udf
+from pyflink.table import StreamTableEnvironment
 
-env = StreamExecutionEnvironment.get_execution_environment()
-env.set_parallelism(1)
-t_env = StreamTableEnvironment.create(env)
+stream_env = StreamExecutionEnvironment.get_execution_environment()
+stream_env.set_parallelism(1)
+table_env = StreamTableEnvironment.create(stream_env)
 
-t_env.execute_sql(f"""CREATE TABLE datagen (
+table_env.execute_sql(f"""CREATE TABLE datagen (
 id INT
 ) WITH (
     'connector' = 'datagen',
@@ -185,12 +184,11 @@ def test_notebook_hidden_to_env_conversion(self):
             converted_notebook.content
             == '''import os
 
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.table import StreamTableEnvironment, DataTypes
-from pyflink.table.udf import udf
+from pyflink.table import StreamTableEnvironment
 
-env = StreamExecutionEnvironment.get_execution_environment()
-env.set_parallelism(1)
-t_env = StreamTableEnvironment.create(env)
+stream_env = StreamExecutionEnvironment.get_execution_environment()
+stream_env.set_parallelism(1)
+table_env = StreamTableEnvironment.create(stream_env)
 
 mysql_table_name = 'datagen'
@@ -199,7 +197,7 @@ def test_notebook_hidden_to_env_conversion(self):
 __env_var_0__MY_ENV_VARIABLE = os.environ["MY_ENV_VARIABLE"]
 
-t_env.execute_sql(f"""CREATE TABLE datagen (
+table_env.execute_sql(f"""CREATE TABLE datagen (
 id INT
 ) WITH (
     'connector' = 'datagen',
@@ -213,7 +211,7 @@ def test_notebook_hidden_to_env_conversion(self):
 __env_var_2__MYSQL_PASSWORD = os.environ["MYSQL_PASSWORD"]
 
-t_env.execute_sql(f"""CREATE TABLE mysql (
+table_env.execute_sql(f"""CREATE TABLE mysql (
 id INT
 ) WITH (
     'connector' = 'jdbc',
@@ -224,7 +222,7 @@ def test_notebook_hidden_to_env_conversion(self):
 )""")
 
-t_env.execute_sql(f"""INSERT INTO mysql (SELECT * FROM datagen)""")
+table_env.execute_sql(f"""INSERT INTO mysql (SELECT * FROM datagen)""")
 '''
         )
@@ -236,12 +234,11 @@ def test_notebook_load_secret(self):
         assert (
            converted_notebook.content
            == '''from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.table import StreamTableEnvironment, DataTypes
-from pyflink.table.udf import udf
+from pyflink.table import StreamTableEnvironment
 
-env = StreamExecutionEnvironment.get_execution_environment()
-env.set_parallelism(1)
-t_env = StreamTableEnvironment.create(env)
+stream_env = StreamExecutionEnvironment.get_execution_environment()
+stream_env.set_parallelism(1)
+table_env = StreamTableEnvironment.create(stream_env)
 
 with open("tests/streamingcli/resources/jupyter/secret.txt", "r") as secret_file:
@@ -251,7 +248,7 @@ def test_notebook_load_secret(self):
 kafka_topic = 'example_topic'
 
-t_env.execute_sql(f"""CREATE TABLE kafka (
+table_env.execute_sql(f"""CREATE TABLE kafka (
 id INT
 ) WITH (
     'connector' = 'kafka',
@@ -266,7 +263,7 @@ def test_notebook_load_secret(self):
 )""")
 
-t_env.execute_sql(f"""CREATE TABLE mysql (
+table_env.execute_sql(f"""CREATE TABLE mysql (
 id INT
 ) WITH (
     'connector' = 'jdbc',
@@ -277,7 +274,7 @@ def test_notebook_load_secret(self):
 )""")
 
-t_env.execute_sql(f"""INSERT INTO mysql (SELECT * FROM kafka)""")
+table_env.execute_sql(f"""INSERT INTO mysql (SELECT * FROM kafka)""")
 '''
        )
@@ -292,13 +289,13 @@ def test_notebook_load_config_secrets(self):
             converted_notebook.content
             == '''import sys
 from pyflink.table import DataTypes
-from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.table import StreamTableEnvironment, DataTypes
-from pyflink.table.udf import udf
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import StreamTableEnvironment
 
-env = StreamExecutionEnvironment.get_execution_environment()
-env.set_parallelism(1)
-t_env = StreamTableEnvironment.create(env)
+stream_env = StreamExecutionEnvironment.get_execution_environment()
+stream_env.set_parallelism(1)
+table_env = StreamTableEnvironment.create(stream_env)
 
 with open("/var/secrets/secret.txt", "r") as secret_file:
@@ -312,7 +309,7 @@ def test_notebook_load_config_secrets(self):
 number_of_rows = 10
 
-t_env.execute_sql(f"""CREATE TABLE datagen (
+table_env.execute_sql(f"""CREATE TABLE datagen (
 id INT
 ) WITH (
     'connector' = 'datagen',
@@ -327,6 +324,29 @@ def filter_print(condition, message):
     return condition
 
-t_env.create_temporary_function("filter_print", filter_print)
+table_env.create_temporary_function("filter_print", filter_print)
 '''
         )
 
+    def test_notebook_with_flink_code(self):
+        # given
+        file_path = "tests/streamingcli/resources/jupyter/notebook7.ipynb"
+        # expect
+        converted_notebook = convert_notebook(file_path)
+        assert (
+            converted_notebook.content
+            == """from pyflink.common.typeinfo import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import StreamTableEnvironment
+
+stream_env = StreamExecutionEnvironment.get_execution_environment()
+stream_env.set_parallelism(1)
+table_env = StreamTableEnvironment.create(stream_env)
+
+execution_output = stream_env.from_collection(
+    collection=[(1, 'aaa'), (2, 'bb'), (3, 'cccc')],
+    type_info=Types.ROW([Types.INT(), Types.STRING()])
+)
+"""
+        )
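
The new `test_notebook_with_flink_code` exercises the `%%flink_execute` path end to end. For orientation, a hedged usage sketch of the converter as these tests drive it (the import path is inferred from the streamingcli/jupyter/notebook_converter.py file shown above; the output file name is hypothetical):

    from streamingcli.jupyter.notebook_converter import convert_notebook

    converted = convert_notebook("tests/streamingcli/resources/jupyter/notebook7.ipynb")
    with open("flink_app.py", "w") as output_file:
        output_file.write(converted.content)  # the generated PyFlink job script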