-
Notifications
You must be signed in to change notification settings - Fork 3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Support for SQL databases (MySQL + MS-SQL) (#2)
* adding sql source + mysql * adding sql support * MSSQL support, basic integration test * file sink and pipeline context
- Loading branch information
1 parent
faf472a
commit 1ddbdee
Showing
25 changed files
with
301 additions
and
75 deletions.
There are no files selected for viewing
1 change: 1 addition & 0 deletions
1
metadata-ingestion/integration_test/sql_server/create_metadata.sh
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
docker exec -it testsqlserver /opt/mssql-tools/bin/sqlcmd -S localhost -U sa -P 'test!Password' -d master -i /setup/setup.sql |
11 changes: 11 additions & 0 deletions
11
metadata-ingestion/integration_test/sql_server/mssql_to_console.yaml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
--- | ||
source: | ||
type: mssql | ||
mssql: | ||
username: sa | ||
password: test!Password | ||
database: DemoData | ||
|
||
sink: | ||
type: file | ||
run_id: test |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
rm -rf output | ||
gometa-ingest -c mssql_to_console.yaml 2>&1 | ||
|
10 changes: 10 additions & 0 deletions
10
metadata-ingestion/integration_test/sql_server/run_test.sh
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
echo "Stopping container in case it is already running" | ||
source stop_sql_server.sh | ||
echo "Starting container" | ||
source start_sql_server.sh | ||
echo "Loading data / metadata" | ||
source create_metadata.sh | ||
echo "Running ingestion" | ||
source run_ingest.sh | ||
echo "Stopping container" | ||
source stop_sql_server.sh |
10 changes: 10 additions & 0 deletions
10
metadata-ingestion/integration_test/sql_server/setup/setup.sql
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
CREATE DATABASE DemoData; | ||
GO | ||
USE DemoData; | ||
GO | ||
CREATE TABLE Products (ID int, ProductName nvarchar(max)); | ||
GO | ||
CREATE SCHEMA Foo; | ||
GO | ||
CREATE TABLE Foo.Items (ID int, ItemName nvarchar(max)); | ||
GO |
12 changes: 12 additions & 0 deletions
12
metadata-ingestion/integration_test/sql_server/start_sql_server.sh
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
dir=`pwd` | ||
CONTAINER_NAME="testsqlserver" | ||
docker run -d --rm --name=${CONTAINER_NAME} -e 'ACCEPT_EULA=Y' -e 'SA_PASSWORD=test!Password' -p 1433:1433 -v $dir/setup:/setup mcr.microsoft.com/mssql/server:latest | ||
ready="SQL Server is now ready for client connections." | ||
until docker logs $CONTAINER_NAME | grep -q "$ready"; | ||
do | ||
echo "sleeping for 5 seconds" | ||
sleep 5 | ||
done | ||
echo "SQL Server started" | ||
|
||
|
1 change: 1 addition & 0 deletions
1
metadata-ingestion/integration_test/sql_server/stop_sql_server.sh
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
docker stop testsqlserver |
10 changes: 10 additions & 0 deletions
10
metadata-ingestion/integration_test/sql_server/wait_sql_server.sh
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
CONTAINER_NAME="testsqlserver" | ||
ready="SQL Server is now ready for client connections." | ||
until docker logs $CONTAINER_NAME | grep "$ready"; | ||
do | ||
echo "sleeping for 5 seconds" | ||
sleep 5 | ||
done | ||
|
||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
--- | ||
source: | ||
type: mssql | ||
mssql: | ||
username: sa | ||
password: test!Password | ||
database: DemoData | ||
|
||
sink: | ||
type: console |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
--- | ||
source: | ||
type: "mysql" | ||
mysql: | ||
username: datahub | ||
password: datahub | ||
|
||
sink: | ||
type: "file" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,11 +1,24 @@ | ||
from dataclasses import dataclass | ||
from typing import TypeVar, Generic, Optional | ||
from abc import abstractmethod, ABCMeta | ||
|
||
T = TypeVar('T') | ||
|
||
@dataclass | ||
class RecordEnvelope(Generic[T]): | ||
record: T | ||
metadata: Optional[dict] | ||
|
||
|
||
|
||
|
||
@dataclass | ||
class WorkUnit(metaclass=ABCMeta): | ||
id: str | ||
|
||
@abstractmethod | ||
def get_metadata(self) -> dict: | ||
pass | ||
|
||
@dataclass | ||
class PipelineContext: | ||
run_id: str | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
15 changes: 15 additions & 0 deletions
15
metadata-ingestion/src/gometa/ingestion/extractor/generic.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
from gometa.ingestion.api.source import Extractor, WorkUnit | ||
from gometa.ingestion.api import RecordEnvelope | ||
|
||
class WorkUnitMCEExtractor(Extractor): | ||
"""An extractor that simply returns MCE-s inside workunits back as records""" | ||
|
||
def configure(self, workunit: WorkUnit): | ||
self.workunit = workunit | ||
|
||
def get_records(self) -> RecordEnvelope: | ||
yield RecordEnvelope(self.workunit.mce, {}) | ||
|
||
def close(self): | ||
pass | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
from gometa.ingestion.api.sink import Sink, WriteCallback | ||
from gometa.ingestion.api.common import RecordEnvelope, PipelineContext, WorkUnit | ||
from pydantic import BaseModel | ||
import os | ||
import pathlib | ||
|
||
class FileSinkConfig(BaseModel): | ||
output_dir:str = "output" | ||
file_name:str = "file.out" | ||
|
||
class FileSink(Sink): | ||
|
||
def __init__(self): | ||
self.config = None | ||
|
||
def configure(self, config_dict, ctx: PipelineContext, workunit: WorkUnit): | ||
self.config = FileSinkConfig.parse_obj(config_dict) | ||
self.id = workunit.id | ||
p = pathlib.Path(f'{self.config.output_dir}/{ctx.run_id}/{self.id}') | ||
p.mkdir(parents=True) | ||
fpath = p / self.config.file_name | ||
self.file = fpath.open('w') | ||
return self | ||
|
||
|
||
def write_record_async(self, record_envelope: RecordEnvelope, write_callback: WriteCallback): | ||
record_string = str(record_envelope.record) | ||
metadata = record_envelope.metadata | ||
out_line=f'{{"record": {record_string}, "metadata": {metadata}}}\n' | ||
self.file.write(out_line) | ||
if write_callback: | ||
write_callback.on_success(record_envelope, {}) | ||
|
||
def close(self): | ||
if self.file: | ||
self.file.close() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
from pydantic import BaseModel | ||
from typing import Optional | ||
from .sql_common import SQLAlchemyConfig, get_sql_workunits | ||
from gometa.ingestion.api.source import Source | ||
|
||
class SQLServerConfig(SQLAlchemyConfig): | ||
#defaults | ||
host_port = "localhost:1433" | ||
scheme = "mssql+pytds" | ||
|
||
class SQLServerSource(Source): | ||
def configure(self, config_dict): | ||
self.config = SQLServerConfig.parse_obj(config_dict) | ||
|
||
def get_workunits(self): | ||
return get_sql_workunits(self.config, "mssql") | ||
|
||
def close(self): | ||
pass |
Oops, something went wrong.