Skip to content

Commit

Permalink
working azure setting
Browse files Browse the repository at this point in the history
  • Loading branch information
nfoerster2 committed Jun 25, 2024
1 parent efd4db0 commit 90455e5
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 26 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,7 @@ testext
test/python/__pycache__/
.Rhistory
data/generated
__azurite*__.json
__blobstorage__
.venv
.vscode
3 changes: 3 additions & 0 deletions extension_config.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ duckdb_extension_load(delta
# Build the httpfs extension to test with s3/http
duckdb_extension_load(httpfs)

# Build the azure extension to test with azure
duckdb_extension_load(azure)

# Build the tpch and tpcds extension for testing/benchmarking
duckdb_extension_load(tpch)
duckdb_extension_load(tpcds)
87 changes: 61 additions & 26 deletions src/functions/delta_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <string>
#include <numeric>
#include <regex>

namespace duckdb {

Expand Down Expand Up @@ -65,11 +66,23 @@ static void visit_callback(ffi::NullableCvoid engine_context, struct ffi::Kernel
ffi::visit_scan_data(engine_data, selection_vec, engine_context, visit_callback);
}

// Looks up the value stored under `key` in a semicolon-separated
// "Key=Value;Key=Value" connection string.
//
// connectionString: the text to search, e.g.
//                   "AccountName=foo;AccountKey=bar==;BlobEndpoint=http://host/foo"
// key:              the exact, case-sensitive key name to look for, e.g. "AccountName"
//
// Returns the first non-empty value whose key equals `key`, or an empty string when
// the key is absent, has no value, or `key` itself is empty.
//
// The string is scanned by hand rather than with std::regex because:
//  * the final pair does not have to be terminated by ';',
//  * the key must match a whole segment key, so "AccountName" does not match inside
//    "SharedAccountName",
//  * characters in `key` are never interpreted as regex metacharacters.
std::string parseFromConnectionString(const std::string& connectionString, const std::string& key) {
	if (key.empty()) {
		return "";
	}

	std::string::size_type segment_start = 0;
	while (segment_start <= connectionString.size()) {
		// A segment runs up to the next ';' or, for the last one, to the end of the string.
		auto segment_end = connectionString.find(';', segment_start);
		if (segment_end == std::string::npos) {
			segment_end = connectionString.size();
		}

		// Only the FIRST '=' separates key and value; values such as base64 account keys
		// may themselves contain '=' padding.
		const auto separator = connectionString.find('=', segment_start);
		const bool has_separator = separator != std::string::npos && separator < segment_end;

		if (has_separator && separator - segment_start == key.size() &&
		    connectionString.compare(segment_start, key.size(), key) == 0) {
			const auto value_start = separator + 1;
			if (value_start < segment_end) {
				return connectionString.substr(value_start, segment_end - value_start);
			}
			// Empty value: keep scanning in case a later segment carries a usable one.
		}

		segment_start = segment_end + 1;
	}

	return "";
}

static ffi::EngineBuilder* CreateBuilder(ClientContext &context, const string &path) {
ffi::EngineBuilder* builder;

// For "regular" paths we early out with the default builder config
if (!StringUtil::StartsWith(path, "s3://") && !StringUtil::StartsWith(path, "azure://") && !StringUtil::StartsWith(path, "az://") && !StringUtil::StartsWith(path, "abfss://")) {
if (!StringUtil::StartsWith(path, "s3://") && !StringUtil::StartsWith(path, "azure://") && !StringUtil::StartsWith(path, "az://") && !StringUtil::StartsWith(path, "abfs://") && !StringUtil::StartsWith(path, "abfss://")) {
auto interface_builder_res = ffi::get_engine_builder(KernelUtils::ToDeltaString(path), DuckDBEngineError::AllocateError);
return KernelUtils::UnpackResult(interface_builder_res, "get_engine_interface_builder for path " + path);
}
Expand All @@ -87,7 +100,7 @@ static ffi::EngineBuilder* CreateBuilder(ClientContext &context, const string &p
bucket = path.substr(5, end_of_container-5);
path_in_bucket = path.substr(end_of_container);
secret_type = "s3";
} else if (StringUtil::StartsWith(path, "azure://")) {
} else if ((StringUtil::StartsWith(path, "azure://")) || (StringUtil::StartsWith(path, "abfss://"))) {
auto end_of_container = path.find('/',8);

if(end_of_container == string::npos) {
Expand All @@ -105,8 +118,8 @@ static ffi::EngineBuilder* CreateBuilder(ClientContext &context, const string &p
bucket = path.substr(5, end_of_container-5);
path_in_bucket = path.substr(end_of_container);
secret_type = "azure";
} else if (StringUtil::StartsWith(path, "abfss://")) {
auto end_of_container = path.find('/',8);
} else if (StringUtil::StartsWith(path, "abfs://")) {
auto end_of_container = path.find('/',7);

if(end_of_container == string::npos) {
throw IOException("Invalid azure url passed to delta scan: %s", path);
Expand Down Expand Up @@ -157,51 +170,73 @@ static ffi::EngineBuilder* CreateBuilder(ClientContext &context, const string &p

} else if (secret_type == "azure") {

// azure seems to be super complicated as we need to cover duckdb azure plugin and delta RS builder
// and both require different settings

auto connection_string = kv_secret.TryGetValue("connection_string").ToString();
auto account_name = kv_secret.TryGetValue("account_name").ToString();
auto endpoint = kv_secret.TryGetValue("endpoint").ToString();
auto credential_chain = kv_secret.TryGetValue("credential_chain").ToString();
auto client_id = kv_secret.TryGetValue("client_id").ToString();
auto client_secret = kv_secret.TryGetValue("client_secret").ToString();
auto tenant_id = kv_secret.TryGetValue("tenant_id").ToString();
auto certificate_path = kv_secret.TryGetValue("certificate_path").ToString();
auto http_proxy = kv_secret.TryGetValue("http_proxy").ToString();
auto proxy_user_name = kv_secret.TryGetValue("proxy_user_name").ToString();
auto proxy_password = kv_secret.TryGetValue("proxy_password").ToString();
auto chain = kv_secret.TryGetValue("chain").ToString();

if (account_name == "devstoreaccount1" || connection_string.find("devstoreaccount1") != string::npos) {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("use_emulator"), KernelUtils::ToDeltaString("true")); //needed for delta RS builder
}

if (!connection_string.empty()) {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("azure_storage_connection_string"), KernelUtils::ToDeltaString(connection_string));
if (!connection_string.empty() && connection_string != "NULL") {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("azure_storage_connection_string"), KernelUtils::ToDeltaString(connection_string)); //needed for duckdb azure plugin
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("connection_string"), KernelUtils::ToDeltaString(connection_string)); //needed for duckdb azure plugin
account_name = parseFromConnectionString(connection_string, "AccountName");
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("access_key"), KernelUtils::ToDeltaString(parseFromConnectionString(connection_string, "AccountKey"))); //needed for delta RS builder
}
if (!account_name.empty()) {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("azure_storage_account_name"), KernelUtils::ToDeltaString(account_name));
if (!account_name.empty() && account_name != "NULL") {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("azure_account_name"), KernelUtils::ToDeltaString(account_name)); //needed for duckdb azure plugin
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("account_name"), KernelUtils::ToDeltaString(account_name)); //needed for delta RS builder
}
if (!endpoint.empty()) {
if (!endpoint.empty() && endpoint != "NULL") {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("azure_endpoint"), KernelUtils::ToDeltaString(endpoint));
} else {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("azure_endpoint"), KernelUtils::ToDeltaString("https://" + account_name + ".blob.core.windows.net/")); //needed? Does that work with dfs files system?
}
if (!credential_chain.empty()) {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("azure_credential_chain"), KernelUtils::ToDeltaString(credential_chain));
if (!chain.empty() && chain != "NULL") {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("provider"), KernelUtils::ToDeltaString("credential_chain")); //needed for duckdb azure plugin

if (chain.find("cli") != std::string::npos) {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("use_azure_cli"), KernelUtils::ToDeltaString("true")); //dont know if that is the right way, but we need to tell delta RS builder to authenticate with azure cli
}

ffi::set_builder_option(builder, KernelUtils::ToDeltaString("azure_credential_chain"), KernelUtils::ToDeltaString(chain)); //needed for duckdb azure plugin, dont know if all three are necessary
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("credential_chain"), KernelUtils::ToDeltaString(chain)); //needed for duckdb azure plugin, dont know if all three are necessary
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("chain"), KernelUtils::ToDeltaString(chain)); //needed for duckdb azure plugin, dont know if all three are necessary
}
if (!client_id.empty()) {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("azure_client_id"), KernelUtils::ToDeltaString(client_id));
if (!client_id.empty() && client_id != "NULL") {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("azure_client_id"), KernelUtils::ToDeltaString(client_id)); //untested
}
if (!client_secret.empty()) {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("azure_client_secret"), KernelUtils::ToDeltaString(client_secret));
if (!client_secret.empty() && client_secret != "NULL") {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("azure_client_secret"), KernelUtils::ToDeltaString(client_secret)); //untested
}
if (!tenant_id.empty()) {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("azure_tenant_id"), KernelUtils::ToDeltaString(tenant_id));
if (!tenant_id.empty() && tenant_id != "NULL") {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("azure_tenant_id"), KernelUtils::ToDeltaString(tenant_id)); //needed for duckdb azure plugin
}
if (!certificate_path.empty()) {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("azure_client_certificate_path"), KernelUtils::ToDeltaString(certificate_path));
if (!certificate_path.empty() && certificate_path != "NULL") {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("azure_client_certificate_path"), KernelUtils::ToDeltaString(certificate_path)); //untested
}
if (!http_proxy.empty()) {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("http_proxy"), KernelUtils::ToDeltaString(http_proxy));
if (!http_proxy.empty() && http_proxy != "NULL") {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("http_proxy"), KernelUtils::ToDeltaString(http_proxy)); //untested
}
if (!proxy_user_name.empty()) {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("proxy_user_name"), KernelUtils::ToDeltaString(proxy_user_name));
if (!proxy_user_name.empty() && proxy_user_name != "NULL") {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("proxy_user_name"), KernelUtils::ToDeltaString(proxy_user_name)); //untested
}
if (!proxy_password.empty()) {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("proxy_password"), KernelUtils::ToDeltaString(proxy_password));
if (!proxy_password.empty() && proxy_password != "NULL") {
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("proxy_password"), KernelUtils::ToDeltaString(proxy_password)); //untested
}
ffi::set_builder_option(builder, KernelUtils::ToDeltaString("container_name"), KernelUtils::ToDeltaString(bucket)); // needed ?

}
return builder;
Expand Down
25 changes: 25 additions & 0 deletions test/sql/generated/azure.emulator.x
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# name: test/sql/generated/azure.emulator
# description: test delta scan on azure emulator data using secret
# group: [delta_generated]

require parquet

require httpfs

require azure

require delta

require-env GENERATED_AZURE_DATA_AVAILABLE

statement ok
CREATE SECRET azure_1 (TYPE AZURE, CONNECTION_STRING 'AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;DefaultEndpointsProtocol=http;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;TableEndpoint=http://127.0.0.1:10002/devstoreaccount1')

# Read the first 100 rows of the lineitem delta table from the Azure emulator via delta_scan
query I rowsort q1
SELECT
*
FROM
delta_scan('az://test-bucket-ceiveran/delta_testing/lineitem_sf0_01/delta_lake/')
LIMIT 100
----
3 changes: 3 additions & 0 deletions vcpkg.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
{
"dependencies": [
"azure-identity-cpp",
"azure-storage-blobs-cpp",
"azure-storage-files-datalake-cpp",
"openssl"
]
}

0 comments on commit 90455e5

Please sign in to comment.