forked from Azinuss/pgsync
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathplugin.py
More file actions
94 lines (80 loc) · 2.95 KB
/
plugin.py
File metadata and controls
94 lines (80 loc) · 2.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
"""PGSync Plugin."""
import logging
import os
from abc import ABC, abstractmethod
from importlib import import_module
from inspect import getmembers, isclass
from pkgutil import iter_modules
from typing import Generator, Optional
logger = logging.getLogger(__name__)
class Plugin(ABC):
"""Plugin base class."""
@abstractmethod
def transform(self, doc: dict, **kwargs) -> dict:
"""This must be implemented by all derived classes."""
pass
class Plugins(object):
def __init__(self, package: str, names: Optional[list] = None):
self.package: str = package
self.names: list = names or []
self.reload()
def reload(self) -> None:
"""Reloads the plugins from the available list."""
self.plugins: list = []
self._paths: list = []
logger.debug(f"Reloading plugins from package: {self.package}")
self.walk(self.package)
def walk(self, package: str) -> None:
"""Recursively walk the supplied package and fetch all plugins."""
module = import_module(package)
for _, name, ispkg in iter_modules(
module.__path__,
prefix=f"{module.__name__}.",
):
if ispkg:
continue
for _, klass in getmembers(import_module(name), isclass):
if issubclass(klass, Plugin) & (klass is not Plugin):
if klass.name not in self.names:
continue
logger.debug(
f"Plugin class: {klass.__module__}.{klass.__name__}"
)
self.plugins.append(klass())
paths: list = []
if isinstance(module.__path__, str):
paths.append(module.__path__)
else:
paths.extend([path for path in module.__path__])
for pkg_path in paths:
if pkg_path in self._paths:
continue
self._paths.append(pkg_path)
for pkg in [
path
for path in os.listdir(pkg_path)
if os.path.isdir(os.path.join(pkg_path, path))
]:
self.walk(f"{package}.{pkg}")
def transform(self, docs: list) -> Generator:
"""Applies all plugins to each doc."""
for doc in docs:
for plugin in self.plugins:
doc["_source"] = plugin.transform(
doc["_source"],
_id=doc["_id"],
_index=doc["_index"],
)
if not doc["_source"]:
yield
yield doc
def auth(self, key: str) -> Optional[str]:
"""Get an auth value from a key."""
for plugin in self.plugins:
if hasattr(plugin, "auth"):
try:
return plugin.auth(key)
except Exception as e:
logger.exception(f"Error calling auth: {e}")
return None
return None