-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdatabase_integration.py
More file actions
102 lines (84 loc) · 3.98 KB
/
Copy pathdatabase_integration.py
File metadata and controls
102 lines (84 loc) · 3.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import os
from collections.abc import Mapping

import cx_Oracle
from pymongo import MongoClient
from pymongo import UpdateOne
from tqdm import tqdm
# Author: Ruslana Kruk
# This script copies data from MongoDB into Oracle.
# It upserts (MERGEs) unprocessed MongoDB documents (_scrape_status == 0) into an
# Oracle table, then marks those documents as processed by setting _scrape_status to 1.
# Connection settings. Each value is taken from the environment variable named
# in the call below and falls back to the placeholder string when that variable
# is unset, so real credentials never need to be committed to source control.
oracle_host = os.environ.get("ORACLE_HOST", "enter_oracle_host")
oracle_port = os.environ.get("ORACLE_PORT", "enter_oracle_port")
oracle_service_name = os.environ.get("ORACLE_SERVICE_NAME", "enter_oracle_service_name")
oracle_user = os.environ.get("ORACLE_USER", "enter_oracle_user")
oracle_password = os.environ.get("ORACLE_PASSWORD", "enter_oracle_password")
mongo_db_name = os.environ.get("MONGO_DB_NAME", "enter_mongo_db_name")
mongo_collection_name = os.environ.get("MONGO_COLLECTION_NAME", "enter_mongo_collection_name")
# Oracle connect descriptor built from the settings above.
dsn = cx_Oracle.makedsn(host=oracle_host, port=oracle_port, service_name=oracle_service_name)
# Update MongoDB document status in bulk
def update_mongo_status_bulk(document_ids, collection):
bulk_operations = [UpdateOne({"_id": doc["_id"]}, {"$set": {"_scrape_status": 1}}) for doc in document_ids]
collection.bulk_write(bulk_operations)
def insert_batch_to_oracle(table_name, field_mapping, batch, cursor, *, key_column="ID"):
    """Upsert a batch of MongoDB documents into an Oracle table and commit.

    Each document is turned into a row of bind values with ``process_field``.
    All rows are then sent in one ``executemany`` call of a ``MERGE``
    statement keyed on *key_column*. Rows whose key already exists are
    updated and the others are inserted.

    Args:
        table_name: Destination table. It is interpolated into the SQL text,
            so it must come from trusted configuration, never from user input.
        field_mapping: ``{source_path: (dest_column, oracle_type)}``. Column
            names are also interpolated into SQL and must be trusted.
        batch: Iterable of MongoDB documents.
        cursor: cx_Oracle cursor. Its connection is committed on success and
            rolled back on a database error.
        key_column: Destination column used to match existing rows. It is
            compared case-insensitively against the mapped destination
            columns and defaults to ``"ID"``.

    Raises:
        ValueError: If *key_column* is not one of the mapped destination columns.
        Exception: Any error from ``executemany``/``commit``, re-raised after
            the connection has been rolled back.
    """
    columns = [dest for dest, _ in field_mapping.values()]
    key_upper = key_column.upper()
    if not any(col.upper() == key_upper for col in columns):
        raise ValueError(f"key column {key_column!r} is not among the mapped columns {columns!r}")

    batch_data = [
        {dest: process_field(doc, src, data_type) for src, (dest, data_type) in field_mapping.items()}
        for doc in batch
    ]
    if not batch_data:
        return None

    select_list = ", ".join(f":{col} AS {col}" for col in columns)
    # Oracle forbids updating a column referenced in the ON clause (ORA-38104),
    # so the key is excluded from the SET list.
    update_cols = [col for col in columns if col.upper() != key_upper]
    matched_clause = ""
    if update_cols:
        set_list = ", ".join(f"dst.{col} = src.{col}" for col in update_cols)
        matched_clause = f"WHEN MATCHED THEN UPDATE SET {set_list}"
    # With only the key mapped, the WHEN MATCHED clause is omitted (an empty
    # SET list would be invalid SQL). INSERT reuses src.* instead of binding
    # every value a second time.
    merge_statement = f"""
        MERGE INTO {table_name} dst
        USING (
            SELECT {select_list}
            FROM dual
        ) src
        ON (dst.{key_column} = src.{key_column})
        {matched_clause}
        WHEN NOT MATCHED THEN
            INSERT ({", ".join(columns)})
            VALUES ({", ".join(f"src.{col}" for col in columns)})
    """
    try:
        cursor.executemany(merge_statement, batch_data)
        cursor.connection.commit()
    except Exception:
        cursor.connection.rollback()
        raise
    return None
def process_field(document, field_path, data_type):
    """Extract a possibly nested value from a MongoDB document for an Oracle column.

    *field_path* is a dot-separated path. Each segment may carry one or more
    list indices, e.g. ``"a.b[0].c"`` or ``"matrix[1][0]"``.

    Args:
        document: Source document (a mapping).
        field_path: Path to the value, in the format described above.
        data_type: Oracle type of the destination column, e.g.
            ``"VARCHAR2(4000)"`` or ``"NUMBER"``. It is compared
            case-insensitively and may be ``None``.

    Returns:
        ``None`` when any step of the path is missing, is not the expected
        container type (mapping for keys, list for indices), or indexes past
        the end of a list. Otherwise the value is returned: converted with
        ``str`` and cut to 4000 characters for ``VARCHAR2`` columns, and
        unchanged for other types.

    Raises:
        ValueError: If a list index in *field_path* is not an integer.
    """
    value = document
    for segment in field_path.split("."):
        key, *raw_indices = segment.split("[")
        if not isinstance(value, Mapping):
            return None
        value = value.get(key)
        for raw_index in raw_indices:
            if not isinstance(value, list):
                return None
            try:
                value = value[int(raw_index.rstrip("]"))]
            except IndexError:
                return None
    if value is None:
        return None
    if data_type and data_type.strip().upper().startswith("VARCHAR2"):
        # NOTE(review): this limits *characters*. A VARCHAR2(4000 BYTE) column
        # may still reject multi-byte UTF-8 text of this length — confirm the
        # column's length semantics.
        return str(value)[:4000]
    return value
def _flush_batch(batch, table_name, field_mapping, oracle_cursor, mongo_collection, pbar):
    """Upsert *batch* into Oracle, mark it processed in MongoDB, and advance *pbar*."""
    # Oracle first: insert_batch_to_oracle commits before MongoDB is touched.
    # If the MongoDB update then fails, the documents keep _scrape_status == 0
    # and are merged again on the next run, which the MERGE upsert tolerates.
    insert_batch_to_oracle(table_name, field_mapping, batch, oracle_cursor)
    update_mongo_status_bulk(batch, mongo_collection)
    pbar.update(len(batch))


def insert_collections_to_oracle(collection_name, table_name, field_mapping, batch_size, *, mongo_uri=None):
    """Copy unprocessed MongoDB documents into an Oracle table in batches.

    Documents with ``_scrape_status == 0`` are read from the MongoDB
    collection and upserted into *table_name* via ``insert_batch_to_oracle``.
    They are then marked ``_scrape_status = 1`` via ``update_mongo_status_bulk``.

    Args:
        collection_name: Source collection in the ``mongo_db_name`` database.
            When empty, the module-level ``mongo_collection_name`` is used.
        table_name: Destination Oracle table.
        field_mapping: ``{source_path: (dest_column, oracle_type)}``, passed
            through to ``insert_batch_to_oracle``.
        batch_size: Number of documents per Oracle/MongoDB round trip (>= 1).
        mongo_uri: MongoDB connection string. When ``None``, the ``MONGO_URI``
            environment variable is used. If that is also unset, pymongo's
            default host is used.

    Raises:
        ValueError: If *batch_size* is less than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be at least 1, got {batch_size!r}")

    # Set before connecting; the Oracle client reads NLS_LANG when it
    # initialises, so text is exchanged as UTF-8.
    os.environ["NLS_LANG"] = "AMERICAN_AMERICA.AL32UTF8"

    if mongo_uri is None:
        mongo_uri = os.environ.get("MONGO_URI")
    source_name = collection_name or mongo_collection_name
    query = {"_scrape_status": 0}

    with cx_Oracle.connect(user=oracle_user, password=oracle_password, dsn=dsn) as oracle_connection, \
            oracle_connection.cursor() as oracle_cursor:
        # MongoClient's first argument is a host/URI, not a database name.
        mongo_client = MongoClient(mongo_uri)
        try:
            mongo_collection = mongo_client[mongo_db_name][source_name]
            total_documents = mongo_collection.count_documents(query)
            with tqdm(total=total_documents, desc=f"Inserting {source_name} into {table_name}") as pbar:
                batch = []
                for doc in mongo_collection.find(query):
                    batch.append(doc)
                    if len(batch) >= batch_size:
                        _flush_batch(batch, table_name, field_mapping, oracle_cursor, mongo_collection, pbar)
                        batch = []
                if batch:
                    _flush_batch(batch, table_name, field_mapping, oracle_cursor, mongo_collection, pbar)
        finally:
            mongo_client.close()