Skip to content

Commit

Permalink
chore: add linting, pre-commit and github actions (dbt-labs#24)
Browse files Browse the repository at this point in the history
* chore: add linting, pre-commit and github actions
Closes dbt-labs#8

* chore: add autoflake and pyupgrade, improve makefile

* chore improve README.md on contributing
  • Loading branch information
Jrmyy authored Nov 16, 2022
1 parent b97fec1 commit e53b3b6
Show file tree
Hide file tree
Showing 20 changed files with 281 additions and 165 deletions.
8 changes: 0 additions & 8 deletions .flake8

This file was deleted.

16 changes: 16 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
name: ci

on:
pull_request:
push:

jobs:
pre-commit:
name: 'Pre-commit checks'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v3
with:
python-version: '3.x'
- uses: pre-commit/[email protected]
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,4 @@ cython_debug/
.idea/

# Project specific
test.py
test.py
61 changes: 61 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.3.0
hooks:
# Identify invalid files
- id: check-ast
- id: check-yaml
- id: check-json
- id: check-toml

# git checks
- id: check-merge-conflict
- id: check-added-large-files
- id: detect-private-key
- id: check-case-conflict

# Python checks
- id: check-docstring-first
- id: debug-statements
- id: requirements-txt-fixer
- id: fix-byte-order-marker

# General quality checks
- id: mixed-line-ending
- id: trailing-whitespace
args: [ --markdown-linebreak-ext=md ]
- id: end-of-file-fixer

- repo: https://github.com/PyCQA/autoflake
rev: v1.7.7
hooks:
- id: autoflake

- repo: https://github.com/asottile/pyupgrade
rev: v3.2.2
hooks:
- id: pyupgrade
args:
- '--py37-plus'

- repo: https://github.com/pycqa/isort
rev: 5.10.1
hooks:
- id: isort
name: isort
args:
- '.'

- repo: https://github.com/psf/black
rev: 22.10.0
hooks:
- id: black

- repo: https://github.com/pycqa/flake8
rev: 5.0.4
hooks:
- id: flake8
additional_dependencies:
- "Flake8-pyproject~=1.1"
args:
- '.'
29 changes: 25 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,10 +1,31 @@
include dev.env
export

install_deps:
pip install -r dev_requirements.txt
CHANGED_FILES := $(shell git ls-files --modified --other --exclude-standard)
CHANGED_FILES_IN_BRANCH := $(shell git diff --name-only $(shell git merge-base origin/main HEAD))

.PHONY : install_deps setup pre-commit pre-commit-in-branch pre-commit-all run_tests help

install_deps: ## Install all dependencies.
pip install -r dev-requirements.txt
pip install -r requirements.txt
pip install -e .

run_tests:
pytest test/integration/athena.dbtspec
setup: ## Install all dependencies and setup pre-commit
make install_deps
pre-commit install

pre-commit: ## check modified and added files (compared to last commit!) with pre-commit.
pre-commit run --files $(CHANGED_FILES)

pre-commit-in-branch: ## check changed since origin/main files with pre-commit.
pre-commit run --files $(CHANGED_FILES_IN_BRANCH)

pre-commit-all: ## Check all files in working directory with pre-commit.
pre-commit run --all-files

run_tests: ## Run tests.
pytest test/integration/athena.dbtspec

help: ## Show this help.
@egrep -h '\s##\s' $(MAKEFILE_LIST) | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m %-30s\033[0m %s\n", $$1, $$2}'
25 changes: 19 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
[![Imports: isort](https://img.shields.io/badge/%20imports-isort-%231674b1?style=flat&labelColor=ef8336)](https://pycqa.github.io/isort/)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)

# dbt-athena

* Supports dbt version `1.3.*`
Expand Down Expand Up @@ -98,7 +101,7 @@ _Additional information_
* The compression type to use for any storage format that allows compression to be specified. To see which options are available, check out [CREATE TABLE AS][create-table-as]
* `field_delimiter` (`default=none`)
* Custom field delimiter, for when format is set to `TEXTFILE`

More information: [CREATE TABLE AS][create-table-as]

[run_started_at]: https://docs.getdbt.com/reference/dbt-jinja-functions/run_started_at
Expand All @@ -119,7 +122,7 @@ The following features of dbt are not implemented on Athena:

* Quoting is not currently supported
* If you need to quote your sources, escape the quote characters in your source definitions:

```yaml
version: 2
Expand All @@ -136,15 +139,25 @@ The following features of dbt are not implemented on Athena:
* **Only** supports Athena engine 2
* [Changing Athena Engine Versions][engine-change]

### Running tests
### Contributing

First, install the adapter and its dependencies using `make` (see [Makefile](Makefile)):
This connector works with Python from 3.7 to 3.10.

#### Getting started
In order to start developing on this adapter clone the repo and run this make command (see [Makefile](Makefile)) :

```bash
make install_deps
make setup
```

Next, configure the environment variables in [dev.env](dev.env) to match your Athena development environment. Finally, run the tests using `make`:
It will :
1. Install all dependencies.
2. Install pre-commit hooks.

Next, configure the environment variables in [dev.env](dev.env) to match your Athena development environment.

#### Running tests
You can run the tests using `make`:

```bash
make run_tests
Expand Down
13 changes: 3 additions & 10 deletions dbt/adapters/athena/__init__.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,7 @@
from dbt.adapters.athena.connections import AthenaConnectionManager
from dbt.adapters.base import AdapterPlugin

from dbt.adapters.athena.connections import AthenaCredentials
from dbt.adapters.athena.impl import AthenaAdapter
import dbt.adapters.athena.query_headers

from dbt.adapters.base import AdapterPlugin
from dbt.include import athena


Plugin = AdapterPlugin(
adapter=AthenaAdapter,
credentials=AthenaCredentials,
include_path=athena.PACKAGE_PATH
)
Plugin = AdapterPlugin(adapter=AthenaAdapter, credentials=AthenaCredentials, include_path=athena.PACKAGE_PATH)
83 changes: 38 additions & 45 deletions dbt/adapters/athena/connections.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,30 @@
from typing import ContextManager, Tuple, Optional, List, Dict, Any
from dataclasses import dataclass
from concurrent.futures.thread import ThreadPoolExecutor
from contextlib import contextmanager
from copy import deepcopy
from dataclasses import dataclass
from decimal import Decimal
from concurrent.futures.thread import ThreadPoolExecutor

from pyathena.connection import Connection as AthenaConnection
from pyathena.result_set import AthenaResultSet
from pyathena.model import AthenaQueryExecution
from pyathena.cursor import Cursor
from pyathena.error import ProgrammingError, OperationalError
from pyathena.formatter import Formatter
from pyathena.util import RetryConfig

# noinspection PyProtectedMember
from pyathena.formatter import _DEFAULT_FORMATTERS, _escape_hive, _escape_presto
from typing import Any, ContextManager, Dict, List, Optional, Tuple

import tenacity
from dbt.adapters.base import Credentials
from dbt.contracts.connection import Connection, AdapterResponse, ConnectionState
from dbt.adapters.sql import SQLConnectionManager
from dbt.exceptions import RuntimeException, FailedToConnectException
from dbt.contracts.connection import AdapterResponse, Connection, ConnectionState
from dbt.events import AdapterLogger
from dbt.exceptions import FailedToConnectException, RuntimeException
from pyathena.connection import Connection as AthenaConnection
from pyathena.cursor import Cursor
from pyathena.error import OperationalError, ProgrammingError

import tenacity
# noinspection PyProtectedMember
from pyathena.formatter import (
_DEFAULT_FORMATTERS,
Formatter,
_escape_hive,
_escape_presto,
)
from pyathena.model import AthenaQueryExecution
from pyathena.result_set import AthenaResultSet
from pyathena.util import RetryConfig
from tenacity.retry import retry_if_exception
from tenacity.stop import stop_after_attempt
from tenacity.wait import wait_exponential
Expand Down Expand Up @@ -53,13 +55,21 @@ def unique_field(self):
return self.host

def _connection_keys(self) -> Tuple[str, ...]:
return "s3_staging_dir", "work_group", "region_name", "database", "schema", "poll_interval", \
"aws_profile_name", "endpoing_url"
return (
"s3_staging_dir",
"work_group",
"region_name",
"database",
"schema",
"poll_interval",
"aws_profile_name",
"endpoing_url",
)


class AthenaCursor(Cursor):
def __init__(self, **kwargs):
super(AthenaCursor, self).__init__(**kwargs)
super().__init__(**kwargs)
self._executor = ThreadPoolExecutor()

def _collect_result_set(self, query_id: str) -> AthenaResultSet:
Expand Down Expand Up @@ -91,9 +101,7 @@ def inner():
cache_size=cache_size,
cache_expiration_time=cache_expiration_time,
)
query_execution = self._executor.submit(
self._collect_result_set, query_id
).result()
query_execution = self._executor.submit(self._collect_result_set, query_id).result()
if query_execution.state == AthenaQueryExecution.STATE_SUCCEEDED:
self.result_set = self._result_set_class(
self._connection,
Expand Down Expand Up @@ -128,7 +136,7 @@ def exception_handler(self, sql: str) -> ContextManager:
try:
yield
except Exception as e:
logger.debug("Error running SQL: {}", sql)
logger.debug(f"Error running SQL: {sql}")
raise RuntimeException(str(e)) from e

@classmethod
Expand Down Expand Up @@ -172,16 +180,8 @@ def open(cls, connection: Connection) -> Connection:

@classmethod
def get_response(cls, cursor) -> AdapterResponse:
if cursor.state == AthenaQueryExecution.STATE_SUCCEEDED:
code = "OK"
else:
code = "ERROR"

return AdapterResponse(
_message="{} {}".format(code, cursor.rowcount),
rows_affected=cursor.rowcount,
code=code
)
code = "OK" if cursor.state == AthenaQueryExecution.STATE_SUCCEEDED else "ERROR"
return AdapterResponse(_message=f"{code} {cursor.rowcount}", rows_affected=cursor.rowcount, code=code)

def cancel(self, connection: Connection):
connection.handle.cancel()
Expand All @@ -201,13 +201,9 @@ def commit(self):

class AthenaParameterFormatter(Formatter):
def __init__(self) -> None:
super(AthenaParameterFormatter, self).__init__(
mappings=deepcopy(_DEFAULT_FORMATTERS), default=None
)
super().__init__(mappings=deepcopy(_DEFAULT_FORMATTERS), default=None)

def format(
self, operation: str, parameters: Optional[List[str]] = None
) -> str:
def format(self, operation: str, parameters: Optional[List[str]] = None) -> str:
if not operation or not operation.strip():
raise ProgrammingError("Query is none or empty.")
operation = operation.strip()
Expand All @@ -232,11 +228,8 @@ def format(

func = self.get(v)
if not func:
raise TypeError("{0} is not defined formatter.".format(type(v)))
raise TypeError(f"{type(v)} is not defined formatter.")
kwargs.append(func(self, escaper, v))
else:
raise ProgrammingError(
"Unsupported parameter "
+ "(Support for list only): {0}".format(parameters)
)
raise ProgrammingError(f"Unsupported parameter (Support for list only): {parameters}")
return (operation % tuple(kwargs)).strip() if kwargs is not None else operation.strip()
Loading

0 comments on commit e53b3b6

Please sign in to comment.