Skip to content

Commit 54fa051

Browse files
authored
Merge pull request #671 from DataDog/s.obregoso/fix_typosquatting
bugfix: typosquatting fix top packages format
2 parents ec3f52c + d57e667 commit 54fa051

File tree

9 files changed

+27195
-72144
lines changed

9 files changed

+27195
-72144
lines changed

guarddog/analyzer/metadata/go/typosquatting.py

Lines changed: 11 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,6 @@
1-
import json
2-
import logging
3-
import os
41
from typing import Optional
52

63
from guarddog.analyzer.metadata.typosquatting import TyposquatDetector
7-
from guarddog.utils.config import TOP_PACKAGES_CACHE_LOCATION
8-
9-
log = logging.getLogger("guarddog")
104

115

126
class GoTyposquatDetector(TyposquatDetector):
@@ -19,32 +13,21 @@ class GoTyposquatDetector(TyposquatDetector):
1913
"""
2014

2115
def _get_top_packages(self) -> set:
22-
top_packages_filename = "top_go_packages.json"
23-
24-
resources_dir = TOP_PACKAGES_CACHE_LOCATION
25-
if resources_dir is None:
26-
resources_dir = os.path.abspath(
27-
os.path.join(os.path.dirname(__file__), "..", "resources")
28-
)
29-
30-
top_packages_path = os.path.join(resources_dir, top_packages_filename)
31-
top_packages_information = self._get_top_packages_local(top_packages_path)
16+
"""
17+
Gets the top Go packages from local cache.
18+
Uses the base class implementation without network refresh.
19+
"""
20+
packages = self._get_top_packages_with_refresh(
21+
packages_filename="top_go_packages.json",
22+
popular_packages_url=None, # No URL = no auto-refresh
23+
)
3224

33-
if top_packages_information is None:
25+
if not packages:
3426
raise Exception(
35-
f"Could not retrieve top Go packages from {top_packages_path}"
27+
"Could not retrieve top Go packages from top_go_packages.json"
3628
)
3729

38-
return set(top_packages_information)
39-
40-
def _get_top_packages_local(self, path: str) -> list[dict] | None:
41-
try:
42-
with open(path, "r") as f:
43-
result = json.load(f)
44-
return result
45-
except FileNotFoundError:
46-
log.error(f"File not found: {path}")
47-
return None
30+
return packages
4831

4932
def detect(
5033
self,

guarddog/analyzer/metadata/npm/typosquatting.py

Lines changed: 24 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,6 @@
1-
import json
2-
import logging
3-
import os
4-
from datetime import datetime, timedelta
51
from typing import Optional
62

73
from guarddog.analyzer.metadata.typosquatting import TyposquatDetector
8-
from guarddog.utils.config import TOP_PACKAGES_CACHE_LOCATION
9-
import requests
10-
11-
log = logging.getLogger("guarddog")
124

135

146
class NPMTyposquatDetector(TyposquatDetector):
@@ -21,65 +13,38 @@ class NPMTyposquatDetector(TyposquatDetector):
2113
"""
2214

2315
def _get_top_packages(self) -> set:
24-
25-
popular_packages_url = (
26-
"https://github.com/LeoDog896/npm-rank/releases/download/latest/raw.json"
16+
"""
17+
Gets the top 8000 most popular NPM packages.
18+
Uses the base class implementation with NPM-specific parameters.
19+
"""
20+
return self._get_top_packages_with_refresh(
21+
packages_filename="top_npm_packages.json",
22+
popular_packages_url="https://github.com/LeoDog896/npm-rank/releases/download/latest/raw.json",
23+
refresh_days=30,
2724
)
2825

29-
top_packages_filename = "top_npm_packages.json"
26+
def _extract_package_names(self, data: dict | list | None) -> list | None:
27+
"""
28+
Extract package names from NPM data structure.
3029
31-
resources_dir = TOP_PACKAGES_CACHE_LOCATION
32-
if resources_dir is None:
33-
resources_dir = os.path.abspath(
34-
os.path.join(os.path.dirname(__file__), "..", "resources")
35-
)
30+
Network response format: [{"name": "package-name", ...}, ...]
31+
Local file format: ["package-name", "package-name", ...]
3632
37-
top_packages_path = os.path.join(resources_dir, top_packages_filename)
38-
top_packages_information = self._get_top_packages_local(top_packages_path)
39-
40-
if self._file_is_expired(top_packages_path, days=30):
41-
new_information = self._get_top_packages_network(popular_packages_url)
42-
if new_information is not None:
43-
top_packages_information = new_information
44-
45-
with open(top_packages_path, "w+") as f:
46-
json.dump(new_information, f, ensure_ascii=False, indent=4)
47-
48-
if top_packages_information is None:
49-
return set()
50-
return set(top_packages_information)
51-
52-
def _file_is_expired(self, path: str, days: int) -> bool:
53-
try:
54-
update_time = datetime.fromtimestamp(os.path.getmtime(path))
55-
return datetime.now() - update_time > timedelta(days=days)
56-
except FileNotFoundError:
57-
return True
58-
59-
def _get_top_packages_local(self, path: str) -> list[dict] | None:
60-
try:
61-
with open(path, "r") as f:
62-
result = json.load(f)
63-
return result
64-
except FileNotFoundError:
65-
log.error(f"File not found: {path}")
33+
This method handles both formats; only the network response format is truncated to the top 8000 packages (a local list of names is returned as-is).
34+
"""
35+
if data is None:
6636
return None
6737

68-
def _get_top_packages_network(self, url: str) -> list[dict] | None:
69-
try:
70-
response = requests.get(url)
71-
response.raise_for_status()
38+
# If data is already a list of strings (local file format)
39+
if isinstance(data, list) and len(data) > 0:
40+
if isinstance(data[0], str):
41+
return data
7242

73-
response_data = response.json()
74-
result = list([i["name"] for i in response_data[0:8000]])
43+
# If data is list of dicts (network response format)
44+
if isinstance(data[0], dict) and "name" in data[0]:
45+
return [item["name"] for item in data[0:8000]]
7546

76-
return result
77-
except json.JSONDecodeError:
78-
log.error(f'Couldn`t convert to json: "{response.text}"')
79-
return None
80-
except requests.exceptions.RequestException as e:
81-
log.error(f"Network error: {e}")
82-
return None
47+
return None
8348

8449
def detect(
8550
self,

guarddog/analyzer/metadata/pypi/typosquatting.py

Lines changed: 20 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,9 @@
1-
import json
21
import logging
3-
import os
4-
from datetime import datetime, timedelta
52
from typing import Optional
63

7-
import requests
84
import packaging.utils
95

106
from guarddog.analyzer.metadata.typosquatting import TyposquatDetector
11-
from guarddog.utils.config import TOP_PACKAGES_CACHE_LOCATION
127

138
log = logging.getLogger("guarddog")
149

@@ -25,87 +20,35 @@ class PypiTyposquatDetector(TyposquatDetector):
2520

2621
def _get_top_packages(self) -> set:
2722
"""
28-
Gets the package information of the top 5000 most downloaded PyPI packages
29-
30-
Returns:
31-
set: set of package data in the format:
32-
{
33-
...
34-
{
35-
download_count: ...
36-
project: <package-name>
37-
}
38-
...
39-
}
23+
Gets the package information of the top 5000 most downloaded PyPI packages.
24+
Uses the base class implementation with PyPI-specific parameters.
4025
"""
41-
42-
popular_packages_url = (
43-
"https://hugovk.github.io/top-pypi-packages/top-pypi-packages.min.json"
26+
packages = self._get_top_packages_with_refresh(
27+
packages_filename="top_pypi_packages.json",
28+
popular_packages_url="https://hugovk.github.io/top-pypi-packages/top-pypi-packages.min.json",
29+
refresh_days=30,
4430
)
4531

46-
top_packages_filename = "top_pypi_packages.json"
47-
resources_dir = TOP_PACKAGES_CACHE_LOCATION
48-
if resources_dir is None:
49-
resources_dir = os.path.abspath(
50-
os.path.join(os.path.dirname(__file__), "..", "resources")
51-
)
52-
53-
top_packages_path = os.path.join(resources_dir, top_packages_filename)
54-
top_packages_information = self._get_top_packages_local(top_packages_path)
55-
56-
if self._file_is_expired(top_packages_path, days=30):
57-
new_information = self._get_top_packages_network(popular_packages_url)
58-
if new_information is not None:
59-
top_packages_information = new_information
60-
61-
with open(top_packages_path, "w+") as f:
62-
json.dump(new_information, f, ensure_ascii=False, indent=4)
63-
64-
if top_packages_information is None:
65-
return set()
66-
return set(map(self.get_safe_name, top_packages_information))
32+
# Apply canonicalization to PyPI package names
33+
return set(map(self._canonicalize_name, packages))
6734

68-
@staticmethod
69-
def get_safe_name(package):
70-
return packaging.utils.canonicalize_name(package["project"])
71-
72-
def _file_is_expired(self, path: str, days: int) -> bool:
73-
try:
74-
update_time = datetime.fromtimestamp(os.path.getmtime(path))
75-
return datetime.now() - update_time > timedelta(days=days)
76-
except FileNotFoundError:
77-
return True
78-
79-
def _get_top_packages_local(self, path: str) -> list[dict] | None:
80-
try:
81-
with open(path, "r") as f:
82-
result = json.load(f)
83-
return self.extract_information(result)
84-
except FileNotFoundError:
85-
log.error(f"File not found: {path}")
35+
def _extract_package_names(self, data: dict | list | None) -> list | None:
36+
"""
37+
Extract package names from PyPI data structure.
38+
PyPI data has format: {"rows": [{"project": "name", "download_count": ...}, ...]}
39+
"""
40+
if data is None:
8641
return None
8742

88-
def _get_top_packages_network(self, url: str) -> list[dict] | None:
89-
try:
90-
response = requests.get(url)
91-
response.raise_for_status()
92-
93-
response_data = response.json()
94-
result = response_data
43+
if isinstance(data, dict) and "rows" in data:
44+
return [row["project"] for row in data["rows"]]
9545

96-
return self.extract_information(result)
97-
except json.JSONDecodeError:
98-
log.error(f'Couldn`t convert to json: "{response.text}"')
99-
return None
100-
except requests.exceptions.RequestException as e:
101-
log.error(f"Network error: {e}")
102-
return None
46+
return None
10347

10448
@staticmethod
105-
def extract_information(data: dict | None) -> list[dict] | None:
106-
if data is not None:
107-
return data.get("rows")
108-
return None
49+
def _canonicalize_name(package_name: str) -> str:
50+
"""Canonicalize PyPI package names according to PEP 503."""
51+
return packaging.utils.canonicalize_name(package_name)
10952

11053
def detect(
11154
self,

0 commit comments

Comments
 (0)