Skip to content

Commit

Permalink
Fix UseFunctionLevelImports rule (#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
feluelle authored Jun 30, 2022
1 parent 99c3ed4 commit f441e96
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 31 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -150,3 +150,6 @@ cython_debug/
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

# Example dags for testing
example_dags/
77 changes: 46 additions & 31 deletions airflint/rules/use_function_level_imports.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,42 +18,57 @@ def match(self, node: ast.AST) -> Action:
# do the refactorings one by one and finally remove
# all unused imports at the end.
assert isinstance(node, ast.FunctionDef)
# Skip functions using dag decorator
# as they are being called by the DAG Parser as well.
assert not any(
isinstance(identifier, ast.Name)
and isinstance(identifier.ctx, ast.Load)
and identifier.id == "dag"
for expr in node.decorator_list
for identifier in ast.walk(expr)
)

inlined_imports = []
for identifier in ast.walk(node):
# Find all used identifiers inside this function.
if not (
isinstance(identifier, ast.Name)
and isinstance(identifier.ctx, ast.Load)
):
continue

# And try to resolve their scope. Each scope has its own parent
# (unless it is a module), so we are simply going back until we
# find the definition for the selected identifier.
scope = self.context["scope"].resolve(identifier)
while not (definitions := scope.definitions.get(identifier.id, [])):
scope = scope.parent
if scope is None:
# There might be some magic, so let's not
# forget the chance of running out of scopes.
break

# If any of the definitions (there might be multiple of them)
# we found matches an import, we'll filter them out.
imports = [
definition
for definition in definitions
if isinstance(definition, (ast.Import, ast.ImportFrom))
]
# And we'll ensure this import is originating from the global scope.
if imports and scope.scope_type is ScopeType.GLOBAL:
inlined_imports.append(imports[0])
# We only walk through function body - not decorators
# as they are not within the function scope.
for stmt in node.body:
for identifier in ast.walk(stmt):
# Find all used identifiers inside this function.
if not (
isinstance(identifier, ast.Name)
and isinstance(identifier.ctx, ast.Load)
):
continue

# And try to resolve their scope. Each scope has its own parent
# (unless it is a module), so we are simply going back until we
# find the definition for the selected identifier.
scope = self.context["scope"].resolve(identifier)
while not (definitions := scope.definitions.get(identifier.id, [])):
scope = scope.parent
if scope is None:
# There might be some magic, so let's not
# forget the chance of running out of scopes.
break

# If any of the definitions (there might be multiple of them)
# we found matches an import, we'll filter them out.
imports = [
definition
for definition in definitions
if isinstance(definition, (ast.Import, ast.ImportFrom))
]
# And we'll ensure this import is originating from the global scope.
if imports and scope.scope_type is ScopeType.GLOBAL:
inlined_imports.append(imports[0])

# We only want unique imports i.e. no duplicates
unique_inlined_imports = list(dict.fromkeys(inlined_imports))

# We want this rule to only run if there is at least 1 inlined import.
assert len(inlined_imports) >= 1
assert len(unique_inlined_imports) >= 1

# We'll select the first statement, which will act like an anchor to us
# when we are inserting.
first_stmt = node.body[0]
return NewStatementsAction(first_stmt, inlined_imports)
return NewStatementsAction(first_stmt, unique_inlined_imports)

0 comments on commit f441e96

Please sign in to comment.