-
Notifications
You must be signed in to change notification settings - Fork 96
/
utils.py
164 lines (141 loc) · 4.35 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import argparse
import logging
from os import getenv
from typing import Any, Iterable, List
from rich.progress import track as rich_track
class Tracker:
__enabled = False
@classmethod
def enable(cls) -> None:
cls.__enabled = True
@classmethod
def disable(cls) -> None:
cls.__enabled = False
@classmethod
def track(cls, it: Iterable, description: str, **kwargs) -> Iterable: # type: ignore[no-untyped-def]
if not cls.__enabled:
return it
description = f"{description: <32}"
return rich_track(it, description, **kwargs)
track = Tracker.track
def default(arg: Any, default_value: Any) -> Any:
return arg if arg is not None else default_value
def set_slow_config(args: argparse.Namespace) -> None:
args.concurrent_requests = default(args.concurrent_requests, 1)
args.max_retries = default(args.max_retries, 50)
args.backoff = default(args.backoff, 2)
def parse_args(args: List[str]) -> argparse.Namespace:
default_values = {"document": "query { FUZZ }"}
parser = argparse.ArgumentParser()
parser.add_argument(
"-v",
"--verbose",
default=0,
action="count",
)
parser.add_argument(
"-i",
"--input-schema",
metavar="<file>",
help="Input file containing JSON schema which will be supplemented with obtained information",
)
parser.add_argument(
"-o",
"--output",
metavar="<file>",
help="Output file containing JSON schema (default to stdout)",
)
parser.add_argument(
"-d",
"--document",
metavar="<string>",
default=default_values["document"],
help=f'Start with this document (default {default_values["document"]})',
)
parser.add_argument(
"-H",
"--header",
metavar="<header>",
dest="headers",
action="append",
default=[],
)
parser.add_argument(
"-c",
"--concurrent-requests",
metavar="<int>",
type=int,
default=None,
help="Number of concurrent requests to send to the server",
)
parser.add_argument(
"-w",
"--wordlist",
metavar="<file>",
type=argparse.FileType("r"),
help="This wordlist will be used for all brute force effots (fields, arguments and so on)",
)
parser.add_argument(
"-wv",
"--validate",
action="store_true",
help="Validate the wordlist items match name Regex",
)
parser.add_argument(
"-x",
"--proxy",
metavar="<string>",
type=str,
help="Define a proxy to use for all requests. For more info, read https://docs.aiohttp.org/en/stable/client_advanced.html?highlight=proxy",
)
parser.add_argument(
"-k",
"--no-ssl",
action="store_true",
help="Disable SSL verification",
)
parser.add_argument(
"-m",
"--max-retries",
metavar="<int>",
type=int,
help="How many retries should be made when a request fails",
)
parser.add_argument(
"-b",
"--backoff",
metavar="<int>",
type=int,
help="Exponential backoff factor. Delay will be calculated as: `0.5 * backoff**retries` seconds.",
)
parser.add_argument(
"-p",
"--profile",
choices=["slow", "fast"],
default="fast",
help="Select a speed profile. fast mod will set lot of workers to provide you quick result"
+ " but if the server as some rate limit you may want to use slow mod.",
)
parser.add_argument(
"--progress",
action="store_true",
help="Enable progress bar",
)
parser.add_argument("url")
parsed_args = parser.parse_args(args)
if parsed_args.profile == "slow":
set_slow_config(parsed_args)
if parsed_args.progress:
Tracker.enable()
return parsed_args
def setup_logger(verbosity: int) -> None:
fmt = getenv("LOG_FMT") or "%(asctime)s \t%(levelname)s\t| %(message)s"
datefmt = getenv("LOG_DATEFMT") or "%Y-%m-%d %H:%M:%S"
default_level = getenv("LOG_LEVEL") or "INFO"
level = "DEBUG" if verbosity >= 1 else default_level.upper()
logging.basicConfig(
level=level,
format=fmt,
datefmt=datefmt,
)
logging.getLogger("asyncio").setLevel(logging.ERROR)