Skip to content

Commit 469f35f

Browse files
author
Jesse Whitehouse
committed
Add samples of GET and REMOVE
Signed-off-by: Jesse Whitehouse <[email protected]>
1 parent 4824b68 commit 469f35f

File tree

1 file changed

+48
-11
lines changed

1 file changed

+48
-11
lines changed

examples/staging_ingestion.py

Lines changed: 48 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
from databricks import sql
22
import os
33

4-
54
"""
65
Databricks experimentally supports data ingestion of local files via a cloud staging location.
76
Ingestion commands will work on DBR >12. And you must include an uploads_base_path kwarg when
@@ -28,21 +27,59 @@
2827
To run this script:
2928
3029
1. Set the INGESTION_USER constant to the account email address of the authenticated user
31-
2. Set the FILEPATH constant to the path of a file that will be uploaded
30+
2. Set the FILEPATH constant to the path of a file that will be uploaded (this example assumes its a CSV file)
31+
3. Run this file
3232
"""
3333

34-
INGESTION_USER = "user.name@example.com"
34+
INGESTION_USER = "some.user@example.com"
3535
FILEPATH = "example.csv"
3636

37+
# FILEPATH can be relative to the current directory.
38+
# Resolve it into an absolute path
3739
_complete_path = os.path.realpath(FILEPATH)
38-
uploads_base_path = os.path.split(_complete_path)[:-1]
3940

41+
if not os.path.exists(_complete_path):
42+
43+
# It's easiest to save a file in the same directory as this script. But any path to a file will work.
44+
raise Exception(
45+
"You need to set FILEPATH in this script to a file that actually exists."
46+
)
47+
48+
# Set uploads_base_path equal to the directory that contains FILEPATH
49+
uploads_base_path = os.path.split(_complete_path)[0]
50+
51+
with sql.connect(
52+
server_hostname=os.getenv("DATABRICKS_SERVER_HOSTNAME"),
53+
http_path=os.getenv("DATABRICKS_HTTP_PATH"),
54+
access_token=os.getenv("DATABRICKS_TOKEN"),
55+
uploads_base_path=uploads_base_path,
56+
) as connection:
57+
58+
with connection.cursor() as cursor:
59+
60+
# Ingestion commands are executed like any other SQL.
61+
# Here's a sample PUT query. You can remove OVERWRITE at the end to avoid silently overwriting data.
62+
query = f"PUT '{_complete_path}' INTO 'stage://tmp/{INGESTION_USER}/pysql_examples/demo.csv' OVERWRITE"
63+
64+
print(f"Uploading {FILEPATH} to staging location")
65+
cursor.execute(query)
66+
print("Upload was successful")
67+
68+
temp_fp = os.path.realpath("temp.csv")
69+
70+
# Here's a sample GET query. Note that `temp_fp` must also be contained within, or descended from,
71+
# the uploads_base_path.
72+
query = (
73+
f"GET 'stage://tmp/{INGESTION_USER}/pysql_examples/demo.csv' TO '{temp_fp}'"
74+
)
75+
76+
print(f"Fetching from staging location into new file called temp.csv")
77+
cursor.execute(query)
78+
print("Download was successful")
4079

41-
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
42-
http_path = os.getenv("DATABRICKS_HTTP_PATH"),
43-
access_token = os.getenv("DATABRICKS_TOKEN"),
44-
uploads_base_path = uploads_base_path) as connection:
80+
# Here's a sample REMOVE query. It cleans up the the demo.csv created in our first query
81+
query = f"REMOVE 'stage://tmp/{INGESTION_USER}/pysql_examples/demo.csv'"
4582

46-
with connection.cursor() as cursor:
47-
query = f"PUT '{_complete_path}' INTO 'stage://tmp/{INGESTION_USER}/pysql_examples/demo.csv' OVERWRITE"
48-
cursor.execute(query)
83+
print("Removing demo.csv from staging location")
84+
cursor.execute(query)
85+
print("Remove was successful")

0 commit comments

Comments
 (0)