feat(ingest): Add Source from Vertica (#4555)
Co-authored-by: Ravindra Lanka <[email protected]>
eburairu and rslanka authored May 26, 2022
1 parent c2bb029 commit 2911e1e
Showing 15 changed files with 681 additions and 2 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -47,7 +47,10 @@ MANIFEST
**/spark-lineage/**/derby.log
**/spark-lineage/**/hive/
**/spark-lineage/**/out*.csv/

#VS Code
.vscode
.devcontainer

#spark smoke test
smoke-test/spark-smoke-test/docker/workspace/
27 changes: 27 additions & 0 deletions metadata-ingestion/docs/sources/vertica/README.md
@@ -0,0 +1,27 @@
## Integration Details

<!-- Plain-language description of what this integration is meant to do. -->
<!-- Include details about where metadata is extracted from (ie. logs, source API, manifest, etc.) -->

This plugin extracts metadata for tables and views from Vertica.

This plugin is in beta and has only been tested against sample data on a Vertica database.

### Concept Mapping

<!-- This should be a manual mapping of concepts from the source to the DataHub Metadata Model -->
<!-- Authors should provide as much context as possible about how this mapping was generated, including assumptions made, known shortcuts, & any other caveats -->

This ingestion source maps the following Source System Concepts to DataHub Concepts:

<!-- Remove all unnecessary/irrelevant DataHub Concepts -->

| Source Concept | DataHub Concept | Notes |
| -------------- | ------------------------------------------------------------------ | ----- |
| `Vertica` | [Data Platform](../../metamodel/entities/dataPlatform.md) | |
| Table | [Dataset](../../metamodel/entities/dataset.md) | |
| View | [Dataset](../../metamodel/entities/dataset.md) | |

## Metadata Ingestion Quickstart

For context on getting started with ingestion, check out our [metadata ingestion guide](../../../../metadata-ingestion/README.md).
7 changes: 7 additions & 0 deletions metadata-ingestion/docs/sources/vertica/vertica.md
@@ -0,0 +1,7 @@
### Prerequisites

In order to ingest metadata from Vertica, you will need:

- Python Version 3.6+
- Vertica Server version 10.1.1-0 or above. It may also work with older versions.
- Vertica credentials (username/password); a sample read-only setup is sketched below.
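
If you want a dedicated, least-privilege account for ingestion, the sketch below shows one way to set it up. It is illustrative only: the `datahub` user, its password, and the `public` schema are placeholders, and the grants you actually need depend on which schemas you plan to ingest.

```sql
-- Illustrative only: a read-only user for ingestion (names and password are placeholders)
CREATE USER datahub IDENTIFIED BY 'change-me';
GRANT USAGE ON SCHEMA public TO datahub;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO datahub;
```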
13 changes: 13 additions & 0 deletions metadata-ingestion/docs/sources/vertica/vertica_recipe.yml
@@ -0,0 +1,13 @@
source:
  type: vertica
  config:
    # Coordinates
    host_port: localhost:5433
    database: DATABASE_NAME

    # Credentials
    username: "${VERTICA_USER}"
    password: "${VERTICA_PASSWORD}"

sink:
  # sink configs
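
Assuming the recipe above is saved as `vertica_recipe.yml` (the filename is arbitrary), one way to run it is to export the referenced environment variables and invoke the CLI:

```bash
# Credentials are read from the environment variables referenced in the recipe
export VERTICA_USER=dbadmin
export VERTICA_PASSWORD='<password>'
datahub ingest -c vertica_recipe.yml
```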
16 changes: 16 additions & 0 deletions metadata-ingestion/examples/recipes/vertica_to_datahub.yml
@@ -0,0 +1,16 @@
# see https://datahubproject.io/docs/metadata-ingestion/source_docs/vertica for complete documentation
# TODO: create vertica doc page
source:
  type: "vertica"
  config:
    host_port: localhost:5433
    database: database_name
    username: username
    password: password
    include_views: true

# see https://datahubproject.io/docs/metadata-ingestion/sink_docs/datahub for complete documentation
sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"
4 changes: 4 additions & 0 deletions metadata-ingestion/setup.py
@@ -254,6 +254,7 @@ def get_long_description():
"starburst-trino-usage": sql_common | usage_common | trino,
"nifi": {"requests", "packaging"},
"powerbi": {"orderedset"} | microsoft_common,
"vertica": sql_common | {"sqlalchemy-vertica[vertica-python]==0.0.5"},
}

all_exclude_plugins: Set[str] = {
@@ -337,6 +338,7 @@ def get_long_description():
"hive",
"starburst-trino-usage",
"powerbi",
"vertica",
# airflow is added below
]
for dependency in plugins[plugin]
@@ -401,6 +403,7 @@ def get_long_description():
"snowflake",
"redash",
"kafka-connect",
"vertica",
]
for dependency in plugins[plugin]
),
@@ -468,6 +471,7 @@ def get_long_description():
"starburst-trino-usage = datahub.ingestion.source.usage.starburst_trino_usage:TrinoUsageSource",
"nifi = datahub.ingestion.source.nifi:NifiSource",
"powerbi = datahub.ingestion.source.powerbi:PowerBiDashboardSource",
"vertica = datahub.ingestion.source.sql.vertica:VerticaSource",
"presto-on-hive = datahub.ingestion.source.sql.presto_on_hive:PrestoOnHiveSource",
"pulsar = datahub.ingestion.source.pulsar:PulsarSource",
],
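With the `vertica` extra registered in `setup.py` (pulling in `sqlalchemy-vertica[vertica-python]==0.0.5` on top of the shared SQL dependencies), the plugin should be installable like the other source extras; for example:

```bash
# Installs the CLI plus the new Vertica source dependencies added in this commit
pip install 'acryl-datahub[vertica]'
```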
@@ -136,16 +136,15 @@ def _platform_alchemy_uri_tester_gen(
        _platform_alchemy_uri_tester_gen("postgres", "postgresql"),
        _platform_alchemy_uri_tester_gen("snowflake"),
        _platform_alchemy_uri_tester_gen("trino"),
        _platform_alchemy_uri_tester_gen("vertica"),
    ]
)


def get_platform_from_sqlalchemy_uri(sqlalchemy_uri: str) -> str:

    for platform, tester in PLATFORM_TO_SQLALCHEMY_URI_TESTER_MAP.items():
        if tester(sqlalchemy_uri):
            return platform

    return "external"


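Adding a `vertica` tester means SQLAlchemy URIs with a `vertica+vertica_python` scheme (the default scheme configured by `VerticaConfig` further down) should now resolve to the `vertica` platform rather than falling back to `"external"`. The snippet below is a simplified, self-contained re-implementation of that mapping pattern for illustration only; it is not the module being patched here, and the prefix-matching behaviour is an assumption based on this hunk.

```python
from typing import Callable, Dict, Optional


def _uri_tester(platform: str, prefix: Optional[str] = None) -> Callable[[str], bool]:
    # Assumed behaviour: a tester simply checks the URI scheme prefix
    return lambda uri: uri.startswith(prefix or platform)


TESTERS: Dict[str, Callable[[str], bool]] = {
    "postgres": _uri_tester("postgres", "postgresql"),
    "vertica": _uri_tester("vertica"),
}


def platform_for(uri: str) -> str:
    for platform, tester in TESTERS.items():
        if tester(uri):
            return platform
    return "external"


assert platform_for("vertica+vertica_python://dbadmin:pwd@localhost:5433/mydb") == "vertica"
```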
243 changes: 243 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/sql/vertica.py
@@ -0,0 +1,243 @@
import re
from textwrap import dedent
from typing import Any, Dict

import pydantic
from pydantic.class_validators import validator
from sqlalchemy import sql, util
from sqlalchemy.sql import sqltypes
from sqlalchemy.sql.sqltypes import TIME, TIMESTAMP, String
from sqlalchemy_vertica.base import VerticaDialect

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
    SourceCapability,
    SupportStatus,
    capability,
    config_class,
    platform_name,
    support_status,
)
from datahub.ingestion.source.sql.sql_common import (
    BasicSQLAlchemyConfig,
    SQLAlchemySource,
)
from datahub.utilities import config_clean


class UUID(String):
    """The SQL UUID type."""

    __visit_name__ = "UUID"


def TIMESTAMP_WITH_TIMEZONE(*args, **kwargs):
    kwargs["timezone"] = True
    return TIMESTAMP(*args, **kwargs)


def TIME_WITH_TIMEZONE(*args, **kwargs):
    kwargs["timezone"] = True
    return TIME(*args, **kwargs)


def get_view_definition(self, connection, view_name, schema=None, **kw):
    if schema is not None:
        schema_condition = "lower(table_schema) = '%(schema)s'" % {
            "schema": schema.lower()
        }
    else:
        schema_condition = "1"

    view_def = connection.scalar(
        sql.text(
            dedent(
                """
                SELECT VIEW_DEFINITION
                FROM V_CATALOG.VIEWS
                WHERE table_name='%(view_name)s' AND %(schema_condition)s
                """
                % {"view_name": view_name, "schema_condition": schema_condition}
            )
        )
    )

    return view_def


def get_columns(self, connection, table_name, schema=None, **kw):
    if schema is not None:
        schema_condition = "lower(table_schema) = '%(schema)s'" % {
            "schema": schema.lower()
        }
    else:
        schema_condition = "1"

    sql_pk_column = sql.text(
        dedent(
            """
            SELECT column_name
            FROM v_catalog.primary_keys
            WHERE table_name = '%(table_name)s'
            AND constraint_type = 'p'
            AND %(schema_condition)s
            """
            % {"table_name": table_name, "schema_condition": schema_condition}
        )
    )
    primary_key_columns = tuple(row[0] for row in connection.execute(sql_pk_column))

    sql_get_column = sql.text(
        dedent(
            """
            SELECT column_name, data_type, column_default, is_nullable
            FROM v_catalog.columns
            WHERE table_name = '%(table_name)s'
            AND %(schema_condition)s
            UNION ALL
            SELECT column_name, data_type, '' as column_default, true as is_nullable
            FROM v_catalog.view_columns
            WHERE lower(table_name) = '%(table_name)s'
            AND %(schema_condition)s
            """
            % {"table_name": table_name, "schema_condition": schema_condition}
        )
    )

    columns = []
    for row in list(connection.execute(sql_get_column)):
        name = row.column_name
        primary_key = name in primary_key_columns
        type = row.data_type.lower()
        default = row.column_default
        nullable = row.is_nullable

        column_info = self._get_column_info(name, type, default, nullable, schema)
        column_info.update({"primary_key": primary_key})
        columns.append(column_info)

    return columns


def _get_column_info(  # noqa: C901
    self, name, data_type, default, is_nullable, schema=None
):

    attype: str = re.sub(r"\(.*\)", "", data_type)

    charlen = re.search(r"\(([\d,]+)\)", data_type)
    if charlen:
        charlen = charlen.group(1)  # type: ignore
    args = re.search(r"\((.*)\)", data_type)
    if args and args.group(1):
        args = tuple(re.split(r"\s*,\s*", args.group(1)))  # type: ignore
    else:
        args = ()  # type: ignore
    kwargs: Dict[str, Any] = {}

    if attype == "numeric":
        if charlen:
            prec, scale = charlen.split(",")  # type: ignore
            args = (int(prec), int(scale))  # type: ignore
        else:
            args = ()  # type: ignore
    elif attype == "integer":
        args = ()  # type: ignore
    elif attype in ("timestamptz", "timetz"):
        kwargs["timezone"] = True
        if charlen:
            kwargs["precision"] = int(charlen)  # type: ignore
        args = ()  # type: ignore
    elif attype in ("timestamp", "time"):
        kwargs["timezone"] = False
        if charlen:
            kwargs["precision"] = int(charlen)  # type: ignore
        args = ()  # type: ignore
    elif attype.startswith("interval"):
        field_match = re.match(r"interval (.+)", attype, re.I)
        if charlen:
            kwargs["precision"] = int(charlen)  # type: ignore
        if field_match:
            kwargs["fields"] = field_match.group(1)  # type: ignore
        attype = "interval"
        args = ()  # type: ignore
    elif attype == "date":
        args = ()  # type: ignore
    elif charlen:
        args = (int(charlen),)  # type: ignore

    while True:
        if attype.upper() in self.ischema_names:
            coltype = self.ischema_names[attype.upper()]
            break
        else:
            coltype = None
            break

    self.ischema_names["UUID"] = UUID
    self.ischema_names["TIMESTAMPTZ"] = TIMESTAMP_WITH_TIMEZONE
    self.ischema_names["TIMETZ"] = TIME_WITH_TIMEZONE

    if coltype:
        coltype = coltype(*args, **kwargs)
    else:
        util.warn("Did not recognize type '%s' of column '%s'" % (attype, name))
        coltype = sqltypes.NULLTYPE
    # adjust the default value
    autoincrement = False
    if default is not None:
        match = re.search(r"""(nextval\(')([^']+)('.*$)""", default)
        if match is not None:
            if issubclass(coltype._type_affinity, sqltypes.Integer):
                autoincrement = True
            # the default is related to a Sequence
            sch = schema
            if "." not in match.group(2) and sch is not None:
                # unconditionally quote the schema name. this could
                # later be enhanced to obey quoting rules /
                # "quote schema"
                default = (
                    match.group(1)
                    + ('"%s"' % sch)
                    + "."
                    + match.group(2)
                    + match.group(3)
                )

    column_info = dict(
        name=name,
        type=coltype,
        nullable=is_nullable,
        default=default,
        autoincrement=autoincrement,
    )
    return column_info


VerticaDialect.get_view_definition = get_view_definition
# VerticaDialect.get_columns = get_columns
VerticaDialect._get_column_info = _get_column_info
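# The assignments above monkey-patch the third-party VerticaDialect: view definitions
# are read from V_CATALOG.VIEWS and UUID / TIMESTAMPTZ / TIMETZ columns are mapped to
# the helper types defined earlier; the get_columns override is left commented out.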


class VerticaConfig(BasicSQLAlchemyConfig):
    # defaults
    scheme: str = pydantic.Field(default="vertica+vertica_python")

    @validator("host_port")
    def clean_host_port(cls, v):
        return config_clean.remove_protocol(v)


@platform_name("Vertica")
@config_class(VerticaConfig)
@support_status(SupportStatus.TESTING)
@capability(SourceCapability.PLATFORM_INSTANCE, "Enabled by default")
@capability(SourceCapability.DOMAINS, "Supported via the `domain` config field")
class VerticaSource(SQLAlchemySource):
    def __init__(self, config: VerticaConfig, ctx: PipelineContext) -> None:
        super().__init__(config, ctx, "vertica")

    @classmethod
    def create(cls, config_dict: Dict, ctx: PipelineContext) -> "VerticaSource":
        config = VerticaConfig.parse_obj(config_dict)
        return cls(config, ctx)
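
Beyond YAML recipes, the new source can also be exercised programmatically through DataHub's standard ingestion `Pipeline` API. The sketch below shows the idea; the connection details and sink address are placeholders, not values from this commit.

```python
# Minimal programmatic run of the Vertica source; all connection values are placeholders.
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "vertica",
            "config": {
                "host_port": "localhost:5433",
                "database": "mydb",
                "username": "dbadmin",
                "password": "vertica",
                "include_views": True,
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()
```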
25 changes: 25 additions & 0 deletions metadata-ingestion/tests/integration/vertica/docker-compose.yml
@@ -0,0 +1,25 @@
version: '3.9'
services:
  vertica:
    image: vertica/vertica-ce:10.1.1-0
    container_name: vertica_ce
    platform: linux/amd64
    environment:
      - APP_DB_USER=dbadmin
      - APP_DB_PASSWORD=vertica
      - VERTICA_MEMDEBUG=2
    ports:
      - 5433:5433
      - 5444:5444
    deploy:
      mode: global
    volumes:
      - type: volume
        source: vertica-data
        target: /data
volumes:
  vertica-data:
networks:
  default:
    external:
      name: datahub_network
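
Assuming the external `datahub_network` already exists (the DataHub quickstart creates it), the integration-test instance can be brought up roughly as follows; the `vsql` path is the standard location inside Vertica images, and the first startup can take a few minutes:

```bash
# Start the Vertica CE container defined above
docker-compose -f metadata-ingestion/tests/integration/vertica/docker-compose.yml up -d

# Optional sanity check once the database has finished initializing
docker exec vertica_ce /opt/vertica/bin/vsql -U dbadmin -c "SELECT version();"
```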