This document describes how to import metadata from a third-party source into Dataplex by running a managed connectivity pipeline in Workflows.
To set up a managed connectivity pipeline, you build a connector for your data source. Then, you run the pipeline in Workflows. The pipeline extracts metadata from your data source and then imports the metadata into Dataplex. If necessary, the pipeline also creates Dataplex Catalog entry groups in your Google Cloud project.
For more information about managed connectivity, see Managed connectivity overview.
Before you begin
Before you import metadata, complete the tasks in this section.
Build a connector
A connector extracts metadata from your data source and generates a metadata import file that Dataplex can import. The connector is a container image stored in Artifact Registry that can be run on Dataproc Serverless.
Build a custom connector that extracts metadata from your third-party source.
For an example connector that you can use as a reference template to build your own connector, see Develop a custom connector for metadata import.
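After you build the connector image, make it available to Dataproc Serverless by hosting it in Artifact Registry. The following is a minimal sketch of that step, assuming Docker is installed locally; CONNECTOR_IMAGE is a hypothetical image name, and REPOSITORY, REPOSITORY_LOCATION, and PROJECT_ID match the placeholders used later in this document.

# Allow Docker to authenticate to Artifact Registry in your repository's region.
gcloud auth configure-docker REPOSITORY_LOCATION-docker.pkg.dev

# Build the connector image and push it to your Artifact Registry repository.
docker build -t REPOSITORY_LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/CONNECTOR_IMAGE:latest .
docker push REPOSITORY_LOCATION-docker.pkg.dev/PROJECT_ID/REPOSITORY/CONNECTOR_IMAGE:latest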
Configure Google Cloud resources
- Enable the Workflows, Dataproc, Cloud Storage, Dataplex, Secret Manager, Artifact Registry, and Cloud Scheduler APIs. If you don't plan to run the pipeline on a schedule, you don't need to enable the Cloud Scheduler API.
- Create secrets in Secret Manager to store the credentials for your third-party data source.
- Configure your Virtual Private Cloud (VPC) network to run Dataproc Serverless for Spark workloads.
- Create a Cloud Storage bucket to store the metadata import files.
- Create the following Dataplex Catalog resources:
  - Create custom aspect types for the entries that you want to import.
  - Create custom entry types for the entries that you want to import.
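The following is a minimal sketch of how you might create some of these resources with the gcloud CLI. The secret ID, bucket name, and credentials file shown here are illustrative placeholders; creating the aspect types, entry types, and VPC configuration isn't shown.

# Enable the required APIs (omit cloudscheduler.googleapis.com if you won't schedule the pipeline).
gcloud services enable workflows.googleapis.com dataproc.googleapis.com \
    storage.googleapis.com dataplex.googleapis.com secretmanager.googleapis.com \
    artifactregistry.googleapis.com cloudscheduler.googleapis.com

# Store the data source credentials in Secret Manager; credentials.json is a placeholder file.
gcloud secrets create SECRET_ID --data-file=credentials.json

# Create a Cloud Storage bucket for the metadata import files.
gcloud storage buckets create gs://BUCKET_ID --location=LOCATION_ID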
Required roles
A service account represents the identity of a workflow and determines what permissions the workflow has and which Google Cloud resources it can access. You need a service account for Workflows (to run the pipeline) and for Dataproc Serverless (to run the connector).
You can use the Compute Engine default service account (PROJECT_NUMBER-compute@developer.gserviceaccount.com), or create your own service account (or accounts) to run the managed connectivity pipeline.
Console
In the Google Cloud console, go to the IAM page.
Select the project that you want to import metadata into.
Click Grant Access, and then enter the service account's email address. Assign the following roles to the service account:
- Logs Writer
- Dataplex Entry Group Owner
- Dataplex Metadata Job Owner
- Dataplex Catalog Editor
- Dataproc Editor
- Dataproc Worker
- Secret Manager Secret Accessor - on the secret that stores the credentials for your data source
- Storage Object User - on the Cloud Storage bucket
- Artifact Registry Reader - on the Artifact Registry repository that contains the connector image
- Service Account User - if you use different service accounts, grant the service account running Workflows this role on the service account running the Dataproc Serverless batch jobs
- Workflows Invoker - if you want to schedule the pipeline
Save your changes.
gcloud
Grant roles to the service account. Run the following commands:
gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SERVICE_ACCOUNT_ID" \
    --role=roles/logging.logWriter

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SERVICE_ACCOUNT_ID" \
    --role=roles/dataplex.entryGroupOwner

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SERVICE_ACCOUNT_ID" \
    --role=roles/dataplex.metadataJobOwner

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SERVICE_ACCOUNT_ID" \
    --role=roles/dataplex.catalogEditor

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SERVICE_ACCOUNT_ID" \
    --role=roles/dataproc.editor

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SERVICE_ACCOUNT_ID" \
    --role=roles/dataproc.worker
Replace the following:

- PROJECT_ID: the name of the target Google Cloud project to import the metadata into.
- SERVICE_ACCOUNT_ID: the email address of the service account, in the format SA_NAME@PROJECT_ID.iam.gserviceaccount.com.
Grant the service account the following roles on the resource level:
gcloud secrets add-iam-policy-binding SECRET_ID \
    --member="serviceAccount:SERVICE_ACCOUNT_ID" \
    --role=roles/secretmanager.secretAccessor

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SERVICE_ACCOUNT_ID" \
    --role=roles/storage.objectUser \
    --condition="expression=resource.name.startsWith('projects/_/buckets/BUCKET_ID'),title=bucket-access"

gcloud artifacts repositories add-iam-policy-binding REPOSITORY \
    --location=REPOSITORY_LOCATION \
    --member="serviceAccount:SERVICE_ACCOUNT_ID" \
    --role=roles/artifactregistry.reader
Replace the following:

- SECRET_ID: the ID of the secret that stores the credentials for your data source. It uses the format projects/PROJECT_ID/secrets/SECRET_ID.
- BUCKET_ID: the name of the Cloud Storage bucket.
- REPOSITORY: the Artifact Registry repository that contains the connector image.
- REPOSITORY_LOCATION: the Google Cloud location where the repository is hosted.
Grant the service account running Workflows the roles/iam.serviceAccountUser role on the service account running the Dataproc Serverless batch jobs. You must grant this role even if you use the same service account for both Workflows and Dataproc Serverless.

gcloud iam service-accounts add-iam-policy-binding \
    SERVICE_ACCOUNT_ID \
    --member="serviceAccount:SERVICE_ACCOUNT_ID" \
    --role=roles/iam.serviceAccountUser
If you use different service accounts, the positional argument is the service account that runs the Dataproc Serverless batch jobs, and the value of the --member flag is the service account that runs Workflows.

If you want to schedule the pipeline, grant the service account the following role:
gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SERVICE_ACCOUNT_ID" \
    --role=roles/workflows.invoker
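Optionally, you can confirm the project-level grants by listing the roles that are bound to the service account, for example:

# List the project-level roles granted to the service account.
gcloud projects get-iam-policy PROJECT_ID \
    --flatten="bindings[].members" \
    --filter="bindings.members:serviceAccount:SERVICE_ACCOUNT_ID" \
    --format="table(bindings.role)"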
Import metadata
To import metadata, create and then execute a workflow that runs the managed connectivity pipeline. Optionally, you can also create a schedule for running the pipeline.
Console
Create the workflow. Provide the following information:
- Service account: the service account that you configured in the Required roles section of this document.
- Encryption: select Google-managed encryption key.
- Define workflow: provide the following definition file:
To run the pipeline on demand, execute the workflow.
Provide the following runtime arguments:
Replace the following:
- PROJECT_ID: the name of the target Google Cloud project to import the metadata into.
- LOCATION_ID: the target Google Cloud location where the Dataproc Serverless and metadata import jobs will run, and metadata will be imported into.
- ENTRY_GROUP_ID: the ID of the entry group to import metadata into. The entry group ID can contain lowercase letters, numbers, and hyphens. The full resource name of this entry group is projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.
- CREATE_ENTRY_GROUP_BOOLEAN: if you want the pipeline to create the entry group if it doesn't already exist in your project, set this value to true.
- BUCKET_ID: the name of the Cloud Storage bucket to store the metadata import file that is generated by the connector. Each workflow execution creates a new folder.
- SERVICE_ACCOUNT_ID: the service account that you configured in the Required roles section of this document. The service account runs the connector in Dataproc Serverless.
- ADDITIONAL_CONNECTOR_ARGUMENTS: a list of additional arguments to pass to the connector. For examples, see Develop a custom connector for metadata import. Enclose each argument in double quotation marks, and separate the arguments with commas.
- CONTAINER_IMAGE: the custom container image of the connector hosted in Artifact Registry.
- ENTRY_TYPES: a list of entry types that are in scope for import, in the format projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. The LOCATION_ID must be either the same Google Cloud location that you import metadata into, or global.
- ASPECT_TYPES: a list of aspect types that are in scope for import, in the format projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. The LOCATION_ID must be either the same Google Cloud location that you import metadata into, or global.
- Optional: For the NETWORK_TAGS argument, provide a list of network tags.
- Optional: For the NETWORK_URI argument, provide the URI of the VPC network that connects to the data source. If you provide a network, omit the subnetwork argument.
- Optional: For the SUBNETWORK_URI argument, provide the URI of the subnetwork that connects to the data source. If you provide a subnet, omit the network argument.
Depending on the amount of metadata that you import, the pipeline might take several minutes or longer to run. For more information about how to view the progress, see Access workflow execution results.
After the pipeline has finished running, you can search for the imported metadata in Dataplex Catalog.
- Optional: If you want to run the pipeline on a schedule, create a schedule by using Cloud Scheduler. Provide the following information:
  - Frequency: a unix-cron expression that defines the schedule to run the pipeline.
  - Workflow argument: the runtime arguments for the connector, as described in the previous step.
  - Service account: the service account that Cloud Scheduler uses to trigger the workflow.
gcloud
Save the following workflow definition as a YAML file:
Define Bash variables, create the workflow, and optionally create a schedule for running the pipeline:
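The exact commands depend on your workflow definition, but a minimal sketch of this step might look like the following. The variable values and the Cloud Scheduler job name are illustrative placeholders, and the scheduler job calls the Workflow Executions API with the escaped argument string described below; replace the escaped {\"key\": \"value\"} example with your actual WORKFLOW_ARGUMENTS.

# Define Bash variables (example values; replace with your own).
PROJECT_ID="my-project"
LOCATION_ID="us-central1"
SERVICE_ACCOUNT_ID="my-service-account@my-project.iam.gserviceaccount.com"
WORKFLOW_NAME="managed-connectivity-workflow"
WORKFLOW_DEFINITION_FILE="workflow.yaml"
CRON_SCHEDULE_EXPRESSION="0 0 * * *"

# Create the workflow from the definition file.
gcloud workflows deploy "$WORKFLOW_NAME" \
    --project="$PROJECT_ID" \
    --location="$LOCATION_ID" \
    --source="$WORKFLOW_DEFINITION_FILE" \
    --service-account="$SERVICE_ACCOUNT_ID"

# Optional: create a Cloud Scheduler job that triggers the workflow on a schedule.
gcloud scheduler jobs create http "$WORKFLOW_NAME-scheduler" \
    --project="$PROJECT_ID" \
    --location="$LOCATION_ID" \
    --schedule="$CRON_SCHEDULE_EXPRESSION" \
    --http-method=POST \
    --uri="https://workflowexecutions.googleapis.com/v1/projects/$PROJECT_ID/locations/$LOCATION_ID/workflows/$WORKFLOW_NAME/executions" \
    --message-body="{\"argument\": \"{\\\"key\\\": \\\"value\\\"}\"}" \
    --oauth-service-account-email="$SERVICE_ACCOUNT_ID"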
Replace the following:

- PROJECT_ID: the name of the target Google Cloud project to import the metadata into.
- LOCATION_ID: the target Google Cloud location where the Dataproc Serverless and metadata import jobs will run, and metadata will be imported into.
- SERVICE_ACCOUNT_ID: the service account that you configured in the Required roles section of this document.
- WORKFLOW_DEFINITION_FILE: the path to the workflow definition YAML file.
- WORKFLOW_NAME: the name of the workflow.
- WORKFLOW_ARGUMENTS: the runtime arguments to pass to the connector. The arguments are in JSON format. For Cloud Scheduler, the double quotation marks inside the quoted string are escaped using backslashes (\). For example: --message-body="{\"argument\": \"{\\\"key\\\": \\\"value\\\"}\"}".

  Replace the following:
  - ENTRY_GROUP_ID: the ID of the entry group to import metadata into. The entry group ID can contain lowercase letters, numbers, and hyphens. The full resource name of this entry group is projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.
  - CREATE_ENTRY_GROUP_BOOLEAN: if you want the pipeline to create the entry group if it doesn't already exist in your project, set this value to true.
  - BUCKET_ID: the name of the Cloud Storage bucket to store the metadata import file that is generated by the connector. Each workflow execution creates a new folder.
  - ADDITIONAL_CONNECTOR_ARGUMENTS: a list of additional arguments to pass to the connector. For examples, see Develop a custom connector for metadata import.
  - CONTAINER_IMAGE: the custom container image of the connector hosted in Artifact Registry.
  - ENTRY_TYPES: a list of entry types that are in scope for import, in the format projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. The LOCATION_ID must be either the same Google Cloud location that you import metadata into, or global.
  - ASPECT_TYPES: a list of aspect types that are in scope for import, in the format projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. The LOCATION_ID must be either the same Google Cloud location that you import metadata into, or global.
  - Optional: For the NETWORK_TAGS argument, provide a list of network tags.
  - Optional: For the NETWORK_URI argument, provide the URI of the VPC network that connects to the data source. If you provide a network, omit the subnetwork argument.
  - Optional: For the SUBNETWORK_URI argument, provide the URI of the subnetwork that connects to the data source. If you provide a subnet, omit the network argument.
- CRON_SCHEDULE_EXPRESSION: a cron expression that defines the schedule to run the pipeline. For example, to run the schedule at midnight every day, use the expression 0 0 * * *.
To run the pipeline on demand, execute the workflow:
The workflow arguments are in JSON format, but not escaped.
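For example, a minimal on-demand execution with the gcloud CLI might look like the following, using the placeholders described above:

# Execute the workflow and pass the runtime arguments as unescaped JSON.
gcloud workflows run WORKFLOW_NAME \
    --project=PROJECT_ID \
    --location=LOCATION_ID \
    --data='WORKFLOW_ARGUMENTS'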
Depending on the amount of metadata that you import, the workflow might take several minutes or longer to run. For more information about how to view the progress, see Access workflow execution results.
After the pipeline has finished running, you can search for the imported metadata in Dataplex Catalog.
Terraform
Clone the cloud-dataplex repository. The repository includes the following Terraform files:
- main.tf: defines the Google Cloud resources to create.
- variables.tf: declares the variables.
- byo-connector.tfvars: defines the variables for your managed connectivity pipeline.
Edit the .tfvars file to replace the placeholders with the information for your connector.

Replace the following:
- PROJECT_ID: the name of the target Google Cloud project to import the metadata into.
- LOCATION_ID: the target Google Cloud location where the Dataproc Serverless and metadata import jobs will run, and metadata will be imported into.
- SERVICE_ACCOUNT_ID: the service account that you configured in the Required roles section of this document.
- CRON_SCHEDULE_EXPRESSION: a cron expression that defines the schedule to run the pipeline. For example, to run the schedule at midnight every day, use the expression 0 0 * * *.
- ENTRY_GROUP_ID: the ID of the entry group to import metadata into. The entry group ID can contain lowercase letters, numbers, and hyphens. The full resource name of this entry group is projects/PROJECT_ID/locations/LOCATION_ID/entryGroups/ENTRY_GROUP_ID.
- CREATE_ENTRY_GROUP_BOOLEAN: if you want the pipeline to create the entry group if it doesn't already exist in your project, set this value to true.
- BUCKET_ID: the name of the Cloud Storage bucket to store the metadata import file that is generated by the connector. Each workflow execution creates a new folder.
- ADDITIONAL_CONNECTOR_ARGUMENTS: a list of additional arguments to pass to the connector. For examples, see Develop a custom connector for metadata import. Enclose each argument in double quotation marks, and separate the arguments with commas.
- CONTAINER_IMAGE: the custom container image of the connector hosted in Artifact Registry.
- ENTRY_TYPES: a list of entry types that are in scope for import, in the format projects/PROJECT_ID/locations/LOCATION_ID/entryTypes/ENTRY_TYPE_ID. The LOCATION_ID must be either the same Google Cloud location that you import metadata into, or global.
- ASPECT_TYPES: a list of aspect types that are in scope for import, in the format projects/PROJECT_ID/locations/LOCATION_ID/aspectTypes/ASPECT_TYPE_ID. The LOCATION_ID must be either the same Google Cloud location that you import metadata into, or global.
- Optional: For the NETWORK_TAGS argument, provide a list of network tags.
- Optional: For the NETWORK_URI argument, provide the URI of the VPC network that connects to the data source. If you provide a network, omit the subnetwork argument.
- Optional: For the SUBNETWORK_URI argument, provide the URI of the subnetwork that connects to the data source. If you provide a subnet, omit the network argument.
Initialize Terraform:

terraform init
Validate Terraform with your .tfvars file:

terraform plan --var-file=CONNECTOR_VARIABLES_FILE.tfvars
Replace CONNECTOR_VARIABLES_FILE with the name of your variable definitions file.

Deploy Terraform with your .tfvars file:

terraform apply --var-file=CONNECTOR_VARIABLES_FILE.tfvars
Terraform creates a workflow and a Cloud Scheduler job in the specified project. Workflows runs the pipeline on the schedule that you specify.
Depending on the amount of metadata that you import, the workflow might take several minutes or longer to run. For more information about how to view the progress, see Access workflow execution results.
After the pipeline has finished running, you can search for the imported metadata in Dataplex Catalog.
View job logs
Use Cloud Logging to view logs for a managed connectivity pipeline. The log payload includes a link to the logs for the Dataproc Serverless batch job and the metadata import job, as relevant. For more information, see View workflow logs.
Troubleshooting
Use the following troubleshooting suggestions:
- Configure the metadata import job to use debug-level logging instead of info-level logging.
- Review the logs for the Dataproc Serverless batch job (for connector runs) and the metadata import job. For more information, see Query Dataproc Serverless for Spark logs and Query metadata job logs.
- If an entry can't be imported using the pipeline and the error message doesn't provide enough information, try creating a custom entry with the same details, in a test entry group. For more information, see Create a custom entry.
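For example, you might query the Dataproc Serverless batch job logs mentioned above with the Logging CLI. The resource type used in this filter (cloud_dataproc_batch) is an assumption about how these logs are labeled; adjust the filter if your logs use a different resource type.

# Read recent logs from Dataproc Serverless batch jobs (resource type assumed).
gcloud logging read 'resource.type="cloud_dataproc_batch"' \
    --project=PROJECT_ID \
    --limit=50 \
    --format="table(timestamp, severity, textPayload)"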