import os

from databricks import sql

"""
Databricks experimentally supports data ingestion of local files via a cloud staging location.
Ingestion commands will work on DBR >12. And you must include an uploads_base_path kwarg when

To run this script:

1. Set the INGESTION_USER constant to the account email address of the authenticated user
2. Set the FILEPATH constant to the path of a file that will be uploaded (this example assumes it's a CSV file)
3. Run this file
"""
3333
34- INGESTION_USER = "user.name @example.com"
34+ INGESTION_USER = "some.user @example.com"
3535FILEPATH = "example.csv"
3636
37+ # FILEPATH can be relative to the current directory.
38+ # Resolve it into an absolute path
3739_complete_path = os .path .realpath (FILEPATH )
38- uploads_base_path = os .path .split (_complete_path )[:- 1 ]
3940
41+ if not os .path .exists (_complete_path ):
42+
43+ # It's easiest to save a file in the same directory as this script. But any path to a file will work.
44+ raise Exception (
45+ "You need to set FILEPATH in this script to a file that actually exists."
46+ )
47+
48+ # Set uploads_base_path equal to the directory that contains FILEPATH
49+ uploads_base_path = os .path .split (_complete_path )[0 ]
50+
51+ with sql .connect (
52+ server_hostname = os .getenv ("DATABRICKS_SERVER_HOSTNAME" ),
53+ http_path = os .getenv ("DATABRICKS_HTTP_PATH" ),
54+ access_token = os .getenv ("DATABRICKS_TOKEN" ),
55+ uploads_base_path = uploads_base_path ,
56+ ) as connection :
57+
58+ with connection .cursor () as cursor :
59+
60+ # Ingestion commands are executed like any other SQL.
61+ # Here's a sample PUT query. You can remove OVERWRITE at the end to avoid silently overwriting data.
62+ query = f"PUT '{ _complete_path } ' INTO 'stage://tmp/{ INGESTION_USER } /pysql_examples/demo.csv' OVERWRITE"
63+
64+ print (f"Uploading { FILEPATH } to staging location" )
65+ cursor .execute (query )
66+ print ("Upload was successful" )
67+
68+ temp_fp = os .path .realpath ("temp.csv" )
69+
70+ # Here's a sample GET query. Note that `temp_fp` must also be contained within, or descended from,
71+ # the uploads_base_path.
72+ query = (
73+ f"GET 'stage://tmp/{ INGESTION_USER } /pysql_examples/demo.csv' TO '{ temp_fp } '"
74+ )
75+
76+ print (f"Fetching from staging location into new file called temp.csv" )
77+ cursor .execute (query )
78+ print ("Download was successful" )
4079
41- with sql .connect (server_hostname = os .getenv ("DATABRICKS_SERVER_HOSTNAME" ),
42- http_path = os .getenv ("DATABRICKS_HTTP_PATH" ),
43- access_token = os .getenv ("DATABRICKS_TOKEN" ),
44- uploads_base_path = uploads_base_path ) as connection :
80+ # Here's a sample REMOVE query. It cleans up the the demo.csv created in our first query
81+ query = f"REMOVE 'stage://tmp/{ INGESTION_USER } /pysql_examples/demo.csv'"
4582
46- with connection . cursor () as cursor :
47- query = f"PUT ' { _complete_path } ' INTO 'stage://tmp/ { INGESTION_USER } /pysql_examples/demo.csv' OVERWRITE"
48- cursor . execute ( query )
83+ print ( "Removing demo.csv from staging location" )
84+ cursor . execute ( query )
85+ print ( "Remove was successful" )
0 commit comments