Skip to content

Commit 42cd6d0

Browse files
Add files via upload
1 parent c272a32 commit 42cd6d0

File tree

1 file changed

+171
-0
lines changed

1 file changed

+171
-0
lines changed
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
{
2+
"cells": [
3+
{
4+
"cell_type": "markdown",
5+
"metadata": {},
6+
"source": [
7+
"# Output RDBMS Alchemy"
8+
]
9+
},
10+
{
11+
"cell_type": "markdown",
12+
"metadata": {},
13+
"source": [
14+
"This component loads data from a CSV file into a given table of any RDBMS supported by SQLAlchemy. Parameters like host, database, user, password and table name need to be set.\n",
15+
"\n",
16+
"Currently only append mode is supported (via pandas `DataFrame.to_sql` with `if_exists='append'`). Optionally, you can flush (truncate) the table before loading.\n",
17+
"\n",
18+
"The current implementation loads all data into main memory (via pandas) first. Better implementations can be found below, PRs welcome!\n",
19+
"\n",
20+
"https://hakibenita.com/fast-load-data-python-postgresql"
21+
]
22+
},
23+
{
24+
"cell_type": "code",
25+
"execution_count": null,
26+
"metadata": {},
27+
"outputs": [],
28+
"source": [
29+
"!pip install sqlalchemy==1.4.23 pandas==1.3.1 psycopg2-binary==2.9.1"
30+
]
31+
},
32+
{
33+
"cell_type": "code",
34+
"execution_count": null,
35+
"metadata": {},
36+
"outputs": [],
37+
"source": [
38+
"import os\n",
39+
"import pandas as pd\n",
40+
"import psycopg2 \n",
41+
"import re\n",
42+
"from sqlalchemy import create_engine\n",
43+
"import sys"
44+
]
45+
},
46+
{
47+
"cell_type": "code",
48+
"execution_count": null,
49+
"metadata": {},
50+
"outputs": [],
51+
"source": [
52+
"# data to load (expects CSV file with header)\n",
53+
"data_csv = os.environ.get('data_csv', 'data.csv')\n",
54+
"\n",
55+
"# type of database server (sqlalchemy dialect), e.g. postgresql\n",
56+
"db_type = os.environ.get('db_type','postgresql')\n",
57+
"\n",
58+
"# hostname of database server\n",
59+
"host = os.environ.get('host')\n",
60+
"\n",
61+
"# database name\n",
62+
"database = os.environ.get('database')\n",
63+
"\n",
64+
"# db user\n",
65+
"user = os.environ.get('user')\n",
66+
"\n",
67+
"# db password\n",
68+
"password = os.environ.get('password')\n",
69+
"\n",
70+
"# db port\n",
71+
"port = int(os.environ.get('port', 5432))\n",
72+
"\n",
73+
"# schema name\n",
74+
"schema = os.environ.get('schema')\n",
75+
"\n",
76+
"# table name\n",
77+
"table = os.environ.get('table')\n",
78+
"\n",
79+
"# truncate table before insert (NOTE: any non-empty value, even 'False' or '0', enables truncation; leave unset to disable)\n",
80+
"truncate = bool(os.environ.get('truncate', False))\n",
81+
"\n",
82+
"# temporary data storage for local execution\n",
83+
"data_dir = os.environ.get('data_dir', '../../data/')"
84+
]
85+
},
86+
{
87+
"cell_type": "code",
88+
"execution_count": null,
89+
"metadata": {},
90+
"outputs": [],
91+
"source": [
92+
"parameters = list(\n",
93+
" map(\n",
94+
" lambda s: re.sub('$', '\"', s),\n",
95+
" map(\n",
96+
" lambda s: s.replace('=', '=\"'),\n",
97+
" filter(\n",
98+
" lambda s: s.find('=') > -1 and bool(re.match('[A-Za-z0-9_]*=[.\\/A-Za-z0-9]*', s)),\n",
99+
" sys.argv\n",
100+
" )\n",
101+
" )\n",
102+
" )\n",
103+
")\n",
104+
"\n",
105+
"for parameter in parameters:\n",
106+
" logging.warning('Parameter: '+parameter) \n",
107+
" exec(parameter)\n",
108+
" \n",
109+
"truncate = bool(truncate)\n",
110+
"port = int(port)"
111+
]
112+
},
113+
{
114+
"cell_type": "code",
115+
"execution_count": null,
116+
"metadata": {},
117+
"outputs": [],
118+
"source": [
119+
"from sqlalchemy import create_engine\n",
120+
"from sqlalchemy.orm import sessionmaker\n",
121+
"\n",
122+
"engine = create_engine(f'{db_type}://{user}:{password}@{host}:{port}/{database}')"
123+
]
124+
},
125+
{
126+
"cell_type": "code",
127+
"execution_count": null,
128+
"metadata": {},
129+
"outputs": [],
130+
"source": [
131+
"if truncate:\n",
132+
" with engine.connect() as con:\n",
133+
" con.execution_options(autocommit=True).execute(f'TRUNCATE TABLE {schema}.{table};')"
134+
]
135+
},
136+
{
137+
"cell_type": "code",
138+
"execution_count": null,
139+
"metadata": {},
140+
"outputs": [],
141+
"source": [
142+
"Session = sessionmaker(bind=engine) \n",
143+
"\n",
144+
"with Session() as session:\n",
145+
" df = pd.read_csv(data_dir + data_csv) \n",
146+
" df.to_sql(table, con=engine, if_exists='append',index=False)"
147+
]
148+
}
149+
],
150+
"metadata": {
151+
"kernelspec": {
152+
"display_name": "Python 3",
153+
"language": "python",
154+
"name": "python3"
155+
},
156+
"language_info": {
157+
"codemirror_mode": {
158+
"name": "ipython",
159+
"version": 3
160+
},
161+
"file_extension": ".py",
162+
"mimetype": "text/x-python",
163+
"name": "python",
164+
"nbconvert_exporter": "python",
165+
"pygments_lexer": "ipython3",
166+
"version": "3.8.6"
167+
}
168+
},
169+
"nbformat": 4,
170+
"nbformat_minor": 4
171+
}

0 commit comments

Comments
 (0)