Support for SQL databases (MySQL + MS-SQL) (#2)

* adding sql source + mysql

* adding sql support

* MSSQL support, basic integration test

* file sink and pipeline context
shirshanka committed Feb 16, 2021
1 parent faf472a commit 1ddbdee
Showing 25 changed files with 301 additions and 75 deletions.
1 change: 1 addition & 0 deletions metadata-ingestion/integration_test/sql_server/create_metadata.sh
@@ -0,0 +1 @@
docker exec -it testsqlserver /opt/mssql-tools/bin/sqlcmd -S localhost -U sa -P 'test!Password' -d master -i /setup/setup.sql
11 changes: 11 additions & 0 deletions metadata-ingestion/integration_test/sql_server/mssql_to_console.yaml
@@ -0,0 +1,11 @@
---
source:
  type: mssql
  mssql:
    username: sa
    password: test!Password
    database: DemoData

sink:
  type: file
run_id: test
3 changes: 3 additions & 0 deletions metadata-ingestion/integration_test/sql_server/run_ingest.sh
@@ -0,0 +1,3 @@
rm -rf output
gometa-ingest -c mssql_to_console.yaml 2>&1

10 changes: 10 additions & 0 deletions metadata-ingestion/integration_test/sql_server/run_test.sh
@@ -0,0 +1,10 @@
echo "Stopping container in case it is already running"
source stop_sql_server.sh
echo "Starting container"
source start_sql_server.sh
echo "Loading data / metadata"
source create_metadata.sh
echo "Running ingestion"
source run_ingest.sh
echo "Stopping container"
source stop_sql_server.sh
10 changes: 10 additions & 0 deletions metadata-ingestion/integration_test/sql_server/setup/setup.sql
@@ -0,0 +1,10 @@
CREATE DATABASE DemoData;
GO
USE DemoData;
GO
CREATE TABLE Products (ID int, ProductName nvarchar(max));
GO
CREATE SCHEMA Foo;
GO
CREATE TABLE Foo.Items (ID int, ItemName nvarchar(max));
GO
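
As a quick sanity check that the seeded tables are visible through the same SQLAlchemy driver the new source uses, something like the following should work once the container is up (a sketch, not part of this commit; the connection URL mirrors the sa/test!Password credentials and the mssql+pytds scheme used elsewhere in this commit):

# Sanity-check sketch (not part of this commit): list the tables seeded by
# setup.sql via the same pytds driver that SQLServerSource uses.
from sqlalchemy import create_engine, inspect

engine = create_engine("mssql+pytds://sa:test!Password@localhost:1433/DemoData")
inspector = inspect(engine)
for schema in inspector.get_schema_names():
    for table in inspector.get_table_names(schema=schema):
        print(f"{schema}.{table}")  # expect dbo.Products and Foo.Items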
12 changes: 12 additions & 0 deletions metadata-ingestion/integration_test/sql_server/start_sql_server.sh
@@ -0,0 +1,12 @@
dir=`pwd`
CONTAINER_NAME="testsqlserver"
docker run -d --rm --name=${CONTAINER_NAME} -e 'ACCEPT_EULA=Y' -e 'SA_PASSWORD=test!Password' -p 1433:1433 -v $dir/setup:/setup mcr.microsoft.com/mssql/server:latest
ready="SQL Server is now ready for client connections."
until docker logs $CONTAINER_NAME | grep -q "$ready";
do
  echo "sleeping for 5 seconds"
  sleep 5
done
echo "SQL Server started"


1 change: 1 addition & 0 deletions metadata-ingestion/integration_test/sql_server/stop_sql_server.sh
@@ -0,0 +1 @@
docker stop testsqlserver
10 changes: 10 additions & 0 deletions metadata-ingestion/integration_test/sql_server/wait_sql_server.sh
@@ -0,0 +1,10 @@
CONTAINER_NAME="testsqlserver"
ready="SQL Server is now ready for client connections."
until docker logs $CONTAINER_NAME | grep "$ready";
do
  echo "sleeping for 5 seconds"
  sleep 5
done



10 changes: 10 additions & 0 deletions metadata-ingestion/recipes/mssql_to_datahub.yaml
@@ -0,0 +1,10 @@
---
source:
  type: mssql
  mssql:
    username: sa
    password: test!Password
    database: DemoData

sink:
  type: console
9 changes: 9 additions & 0 deletions metadata-ingestion/recipes/mysql_to_datahub.yaml
@@ -0,0 +1,9 @@
---
source:
  type: "mysql"
  mysql:
    username: datahub
    password: datahub

sink:
  type: "file"
4 changes: 4 additions & 0 deletions metadata-ingestion/setup.py
@@ -71,5 +71,9 @@ def get_long_description():
        "requests>=2.25.1",
        "fastavro>=1.3.0",  # TODO: Do we need both avro-s?
        "avro-python3==1.8.2",
        "sqlalchemy>=1.3.23",  # Required for SQL sources
        "pymysql>=1.0.2",  # Driver for MySQL
        "sqlalchemy-pytds>=0.3",  # Driver for MS-SQL
    ],
)
17 changes: 15 additions & 2 deletions metadata-ingestion/src/gometa/ingestion/api/common.py
@@ -1,11 +1,24 @@
from dataclasses import dataclass
from typing import TypeVar, Generic, Optional
from abc import abstractmethod, ABCMeta

T = TypeVar('T')

@dataclass
class RecordEnvelope(Generic[T]):
    record: T
    metadata: Optional[dict]


@dataclass
class WorkUnit(metaclass=ABCMeta):
    id: str

    @abstractmethod
    def get_metadata(self) -> dict:
        pass


@dataclass
class PipelineContext:
    run_id: str
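
WorkUnit is now an abstract dataclass carrying an id, so each source supplies a concrete subclass; a minimal illustration (SqlWorkUnit and its table field are hypothetical names, not part of this commit):

# Illustrative only: a concrete WorkUnit as a source might define it.
from dataclasses import dataclass
from gometa.ingestion.api.common import RecordEnvelope, WorkUnit

@dataclass
class SqlWorkUnit(WorkUnit):  # hypothetical subclass
    table: str

    def get_metadata(self) -> dict:
        return {"table": self.table}

wu = SqlWorkUnit(id="mssql-DemoData.Foo.Items", table="Foo.Items")
envelope = RecordEnvelope(record={"dataset": wu.table}, metadata=wu.get_metadata())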

4 changes: 2 additions & 2 deletions metadata-ingestion/src/gometa/ingestion/api/sink.py
@@ -1,6 +1,6 @@
from abc import abstractmethod, ABCMeta

from gometa.ingestion.api.common import RecordEnvelope
from gometa.ingestion.api.common import RecordEnvelope, WorkUnit, PipelineContext


class WriteCallback:
@@ -25,7 +25,7 @@ def on_failure(self, re, fe, fm):
class Sink(metaclass=ABCMeta):
    """All Sinks must inherit this base class"""
    @abstractmethod
    def configure(self, config_dict: dict):
    def configure(self, config_dict: dict, ctx: PipelineContext, workunit: WorkUnit):
        pass

    @abstractmethod
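
Sinks now receive the pipeline context and the current workunit at configure time; a minimal sink honouring the widened contract might look like this (illustrative sketch, not a class from this commit):

# Illustrative sketch of the new Sink contract; not part of this commit.
from gometa.ingestion.api.sink import Sink, WriteCallback
from gometa.ingestion.api.common import RecordEnvelope, WorkUnit, PipelineContext

class NullSink(Sink):
    """Discards records, tagging callbacks with run/workunit identity."""

    def configure(self, config_dict: dict, ctx: PipelineContext, workunit: WorkUnit):
        self.run_id = ctx.run_id
        self.workunit_id = workunit.id
        return self

    def write_record_async(self, record_envelope: RecordEnvelope, write_callback: WriteCallback):
        if write_callback:
            write_callback.on_success(record_envelope, {"run_id": self.run_id, "workunit": self.workunit_id})

    def close(self):
        pass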
9 changes: 2 additions & 7 deletions metadata-ingestion/src/gometa/ingestion/api/source.py
@@ -1,11 +1,6 @@
from abc import abstractmethod, ABCMeta
from .closeable import Closeable
from .common import RecordEnvelope

class WorkUnit(metaclass=ABCMeta):
    @abstractmethod
    def get_metadata(self) -> dict:
        pass
from .common import *


class Extractor(Closeable, metaclass=ABCMeta):
@@ -20,7 +15,7 @@ def get_records(self) -> RecordEnvelope:
class Source(Closeable, metaclass=ABCMeta):

    @abstractmethod
    def configure(self, config_dict: dict):
    def configure(self, config_dict: dict, ctx: PipelineContext):
        pass

    @abstractmethod
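
Source.configure gains the PipelineContext as well (though, as the pipeline diff below shows, Pipeline.configure still calls it with only the per-source config dict at this point); a minimal conforming source, for illustration:

# Illustrative sketch of the new Source signature; not a class from this commit.
from gometa.ingestion.api.source import Source
from gometa.ingestion.api.common import PipelineContext

class EmptySource(Source):
    def configure(self, config_dict: dict, ctx: PipelineContext):
        self.config = config_dict
        self.run_id = ctx.run_id
        return self

    def get_workunits(self):
        # a real source yields concrete WorkUnit subclasses here
        return iter(())

    def close(self):
        pass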
15 changes: 15 additions & 0 deletions metadata-ingestion/src/gometa/ingestion/extractor/generic.py
@@ -0,0 +1,15 @@
from gometa.ingestion.api.source import Extractor, WorkUnit
from gometa.ingestion.api import RecordEnvelope

class WorkUnitMCEExtractor(Extractor):
    """An extractor that simply returns MCE-s inside workunits back as records"""

    def configure(self, workunit: WorkUnit):
        self.workunit = workunit

    def get_records(self) -> RecordEnvelope:
        yield RecordEnvelope(self.workunit.mce, {})

    def close(self):
        pass
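
Note that get_records is a generator and that the workunit is duck-typed: it only needs an mce attribute. Rough usage (MceWorkUnit is a hypothetical stand-in, not part of this commit):

# Rough usage sketch; MceWorkUnit is hypothetical, not part of this commit.
from dataclasses import dataclass
from gometa.ingestion.api.common import WorkUnit
from gometa.ingestion.extractor.generic import WorkUnitMCEExtractor

@dataclass
class MceWorkUnit(WorkUnit):
    mce: dict  # stand-in for a MetadataChangeEvent

    def get_metadata(self) -> dict:
        return {}

extractor = WorkUnitMCEExtractor()
extractor.configure(MceWorkUnit(id="wu-0", mce={"proposedSnapshot": {}}))
for envelope in extractor.get_records():
    print(envelope.record)  # -> {'proposedSnapshot': {}}
extractor.close()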

46 changes: 34 additions & 12 deletions metadata-ingestion/src/gometa/ingestion/run/pipeline.py
@@ -2,21 +2,24 @@
from dataclasses import dataclass, field
from gometa.configuration.common import DynamicTypedConfig, DynamicFactory
from gometa.ingestion.api.source import Source, Extractor
from gometa.ingestion.api.common import PipelineContext
from gometa.ingestion.api.sink import Sink, NoopWriteCallback, WriteCallback
from typing import Optional
import importlib
import time
import logging
logging.basicConfig(format='%(name)s [%(levelname)s] %(message)s', datefmt='%d-%b-%y %H:%M:%S')
logger = logging.getLogger(__name__)


class SourceConfig(DynamicTypedConfig):
    extractor: str
    extractor: Optional[str] = "gometa.ingestion.extractor.generic.WorkUnitMCEExtractor"


class PipelineConfig(BaseModel):
    source: SourceConfig
    sink: DynamicTypedConfig
    run_id: str = str(int(time.time()) * 1000)


class LoggingCallback(WriteCallback):
@@ -25,46 +28,65 @@ def on_success(self, record_envelope, success_meta):

    def on_failure(self, record_envelope, exception, failure_meta):
        logger.exception(f'failed to write {record_envelope.record} with {failure_meta}')


@dataclass
class Pipeline:
    source: Optional[Source] = None
    extractor: Optional[Extractor] = None
    sink: Optional[Sink] = None
    source_class_mapping: Optional[dict] = field(default_factory=lambda: {
        "mssql": "gometa.ingestion.source.mssql.SQLServerSource",
        "mysql": "gometa.ingestion.source.mysql.MySQLSource",
        "kafka": "gometa.ingestion.source.kafka.KafkaSource",
        "ldap": "gometa.ingestion.source.ldap.LdapSource",
    })
    sink_class_mapping: Optional[dict] = field(default_factory=lambda: {
        "kafka": "gometa.ingestion.sink.kafka.KafkaSink",
        "datahub": "gometa.ingestion.sink.datahub.DataHubSink",
        "console": "gometa.ingestion.sink.console.ConsoleSink",
        "file": "gometa.ingestion.sink.file.FileSink",
    })

    def get_class_from_name(self, class_string):
        module_name, class_name = class_string.rsplit(".", 1)
        MyClass = getattr(importlib.import_module(module_name), class_name)
        return MyClass()

    def configure(self, config_dict):
        self.source_factory = DynamicFactory()
        self.config = PipelineConfig.parse_obj(config_dict)
        source_type = self.config.source.type
        source_class = self.source_class_mapping[source_type]
        try:
            source_class = self.source_class_mapping[source_type]
        except KeyError:
            logger.exception(f'Did not find a registered source class for {source_type}')
            raise ValueError("Failed to configure source")
        self.source = self.get_class_from_name(source_class)
        self.source.configure(self.config.dict().get("source", {}).get(source_type, {}))
        sink_type = self.config.sink.type
        sink_class = self.sink_class_mapping[sink_type]
        self.sink = self.get_class_from_name(sink_class)
        self.sink.configure(self.config.dict().get("sink", {"type": "datahub"}).get(sink_type, {}))
        self.sink_class = self.sink_class_mapping[sink_type]
        self.sink_config = self.config.dict().get("sink", {"type": "datahub"}).get(sink_type, {})

        # Ensure that sink and extractor can be constructed, even though we only use them later
        extractor = self.get_class_from_name(self.config.source.extractor)
        sink = self.get_class_from_name(self.sink_class)

        self.ctx = PipelineContext(run_id=self.config.run_id)
        return self

    def run(self):
        callback = LoggingCallback()
        for w in self.source.get_workunits():
            extractor = self.get_class_from_name(self.config.source.extractor).configure(w)
        for wu in self.source.get_workunits():
            extractor = self.get_class_from_name(self.config.source.extractor)
            extractor.configure(wu)
            sink = self.get_class_from_name(self.sink_class)
            logger.warn(f"Configuring sink with workunit {wu.id}")
            sink.configure(self.sink_config, self.ctx, wu)
            for record_envelope in extractor.get_records():
                self.sink.write_record_async(record_envelope, callback)
                sink.write_record_async(record_envelope, callback)
            extractor.close()
        self.sink.close()
        sink.close()
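
End to end, a recipe dict equivalent to the mssql_to_console.yaml above is all it takes to drive a run; a usage sketch (assuming a reachable SQL Server, as in the integration test):

# Usage sketch mirroring the integration-test recipe; assumes the SQL Server
# container from start_sql_server.sh is reachable on localhost:1433.
from gometa.ingestion.run.pipeline import Pipeline

config = {
    "source": {
        "type": "mssql",
        "mssql": {"username": "sa", "password": "test!Password", "database": "DemoData"},
    },
    "sink": {"type": "file"},
    "run_id": "test",
}

Pipeline().configure(config).run()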

10 changes: 8 additions & 2 deletions metadata-ingestion/src/gometa/ingestion/sink/console.py
@@ -1,18 +1,24 @@
from gometa.ingestion.api.sink import Sink, WriteCallback
from gometa.ingestion.api.common import RecordEnvelope
import logging

logger = logging.getLogger(__name__)

class ConsoleSink(Sink):

    def __init__(self):
        self.config = None

    def configure(self, config_dict={}):
    def configure(self, config_dict, ctx, workunit):
        self.config = config_dict
        self.id = workunit.id
        self.run_id = ctx.run_id
        return self

    def write_record_async(self, record_envelope: RecordEnvelope, write_callback: WriteCallback):
        print(record_envelope)
        logger.info(f'{self.run_id}:{self.id}:{record_envelope}')
        if write_callback:
            write_callback.on_success(record_envelope, {})
36 changes: 36 additions & 0 deletions metadata-ingestion/src/gometa/ingestion/sink/file.py
@@ -0,0 +1,36 @@
from gometa.ingestion.api.sink import Sink, WriteCallback
from gometa.ingestion.api.common import RecordEnvelope, PipelineContext, WorkUnit
from pydantic import BaseModel
import os
import pathlib

class FileSinkConfig(BaseModel):
    output_dir: str = "output"
    file_name: str = "file.out"

class FileSink(Sink):

    def __init__(self):
        self.config = None

    def configure(self, config_dict, ctx: PipelineContext, workunit: WorkUnit):
        self.config = FileSinkConfig.parse_obj(config_dict)
        self.id = workunit.id
        p = pathlib.Path(f'{self.config.output_dir}/{ctx.run_id}/{self.id}')
        p.mkdir(parents=True)
        fpath = p / self.config.file_name
        self.file = fpath.open('w')
        return self

    def write_record_async(self, record_envelope: RecordEnvelope, write_callback: WriteCallback):
        record_string = str(record_envelope.record)
        metadata = record_envelope.metadata
        out_line = f'{{"record": {record_string}, "metadata": {metadata}}}\n'
        self.file.write(out_line)
        if write_callback:
            write_callback.on_success(record_envelope, {})

    def close(self):
        if self.file:
            self.file.close()
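
Each record becomes one line under output/<run_id>/<workunit-id>/file.out. Two things worth noting: the line is JSON-shaped but built from Python str() renderings (so dict reprs use single quotes), and mkdir(parents=True) without exist_ok raises FileExistsError on a rerun, which is why run_ingest.sh starts with rm -rf output. A usage sketch (DummyWorkUnit is hypothetical, not part of this commit):

# Usage sketch; DummyWorkUnit is hypothetical, not part of this commit.
from dataclasses import dataclass
from gometa.ingestion.api.common import PipelineContext, RecordEnvelope, WorkUnit
from gometa.ingestion.sink.file import FileSink

@dataclass
class DummyWorkUnit(WorkUnit):
    def get_metadata(self) -> dict:
        return {}

sink = FileSink().configure({}, PipelineContext(run_id="test"), DummyWorkUnit(id="wu-0"))
sink.write_record_async(RecordEnvelope(record={"a": 1}, metadata={}), None)
sink.close()
# output/test/wu-0/file.out now contains: {"record": {'a': 1}, "metadata": {}}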
2 changes: 1 addition & 1 deletion metadata-ingestion/src/gometa/ingestion/source/kafka.py
@@ -36,7 +36,7 @@ def get_workunits(self):
        if re.fullmatch(self.topic_pattern, t):
            # TODO: topics config should support allow and deny patterns
            if not t.startswith("_"):
                yield KafkaWorkUnit(config=KafkaSourceConfig(connection=self.source_config.connection, topic=t))
                yield KafkaWorkUnit(id=f'kafka-{t}', config=KafkaSourceConfig(connection=self.source_config.connection, topic=t))


    def close(self):
19 changes: 19 additions & 0 deletions metadata-ingestion/src/gometa/ingestion/source/mssql.py
@@ -0,0 +1,19 @@
from pydantic import BaseModel
from typing import Optional
from .sql_common import SQLAlchemyConfig, get_sql_workunits
from gometa.ingestion.api.source import Source

class SQLServerConfig(SQLAlchemyConfig):
    # defaults
    host_port = "localhost:1433"
    scheme = "mssql+pytds"

class SQLServerSource(Source):
    def configure(self, config_dict):
        self.config = SQLServerConfig.parse_obj(config_dict)

    def get_workunits(self):
        return get_sql_workunits(self.config, "mssql")

    def close(self):
        pass
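
The shared sql_common module is among the 25 changed files but not expanded in this view, so as orientation only: get_sql_workunits can be pictured as reflecting tables through a SQLAlchemy inspector and wrapping each one in a workunit. A rough sketch under that assumption, not the committed code:

# Assumed shape of sql_common.get_sql_workunits; the real file is not
# expanded in this diff, so treat this as a sketch, not the committed code.
from dataclasses import dataclass
from pydantic import BaseModel
from sqlalchemy import create_engine, inspect
from gometa.ingestion.api.common import WorkUnit

class SQLAlchemyConfig(BaseModel):
    username: str
    password: str
    host_port: str
    database: str = ""
    scheme: str

    def get_sql_alchemy_url(self):
        return f"{self.scheme}://{self.username}:{self.password}@{self.host_port}/{self.database}"

@dataclass
class SqlWorkUnit(WorkUnit):
    mce: dict  # stand-in for a MetadataChangeEvent

def get_sql_workunits(config: SQLAlchemyConfig, platform: str):
    engine = create_engine(config.get_sql_alchemy_url())
    inspector = inspect(engine)
    for schema in inspector.get_schema_names():
        for table in inspector.get_table_names(schema=schema):
            columns = inspector.get_columns(table, schema=schema)
            mce = {"platform": platform, "dataset": f"{schema}.{table}", "columns": columns}
            yield SqlWorkUnit(id=f"{platform}-{schema}.{table}", mce=mce)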