Python client library for the Golismero3 pentesting framework.
The library is layered in three tiers, each built strictly on the one below:

| Tier | Module | What it is |
|---|---|---|
| 1 | `g3client.api` | Thin, resource-grouped wrappers over every g3api endpoint. One method per call, no hidden behaviour. |
| 2 | `g3client.scanner` | Event-handler base class for orchestrated scans (the g3cli flow): subclass `Scanner`, override `on_progress`/`on_log`, and `scan()` launches, waits, and returns a report. |
| 3 | `g3client.manager` | High-level helper for managed scans: `Manager` tracks scan/task IDs, launches tools, and `run()` drives one tool to completion. |

- Python ≥ 3.10
- requests (the only dependency)

```sh
cd clients/python
pip install -e .
```

Construct a client with explicit credentials, or let it fall back to environment variables:

| Argument | Environment fallback | Meaning |
|---|---|---|
| `base_url` | `G3_API_BASEURL` | Base URL of the g3api server, e.g. `https://g3.internal/api` |
| `token` | `G3_API_TOKEN` | Bearer token sent as `Authorization: Bearer <token>` |
| `artifacts_root` | `G3_ARTIFACTS_ROOT` | Local directory for downloaded artifacts (defaults to `<tempdir>/g3client`) |

```python
from g3client import ApiClient, Scanner, Manager

# Explicit:
api = ApiClient("https://g3.internal/api", "TOKEN")

# Or from the environment (G3_API_BASEURL / G3_API_TOKEN):
scanner = Scanner.from_credentials()
manager = Manager.from_credentials()
```

A runnable version of the examples below lives in examples/quickstart.py.

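To make the argument-versus-environment precedence from the table above concrete, here is a minimal sketch of how such a fallback could be resolved. It is not g3client's implementation: `Credentials`, `resolve_credentials`, the `env` parameter, and the choice to raise `ValueError` when a value is missing are all hypothetical. Only the three variable names and the `<tempdir>/g3client` default come from this README.

```python
import os
import tempfile
from pathlib import Path
from typing import Mapping, NamedTuple, Optional


class Credentials(NamedTuple):
    """Hypothetical container for the resolved settings (not part of g3client)."""
    base_url: str
    token: str
    artifacts_root: Path


def resolve_credentials(
    base_url: Optional[str] = None,
    token: Optional[str] = None,
    artifacts_root: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
) -> Credentials:
    """Explicit arguments win; otherwise use the documented environment variables."""
    env = os.environ if env is None else env
    base_url = base_url or env.get("G3_API_BASEURL")
    token = token or env.get("G3_API_TOKEN")
    if not base_url or not token:
        # Assumption: the real client may report this differently.
        raise ValueError("base_url and token must come from arguments or the environment")
    root = artifacts_root or env.get("G3_ARTIFACTS_ROOT")
    # Documented default location: <tempdir>/g3client
    root_path = Path(root) if root else Path(tempfile.gettempdir()) / "g3client"
    return Credentials(base_url, token, root_path)
```

Passing `env` explicitly keeps the function easy to test without touching the process environment.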
Launch a scan, stream progress, and get a report back. report names the reporter
plugin and an optional preset ("tool", "tool:preset", or a (tool, preset) tuple) —
the report format is just a reporter preset.
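The three accepted shapes of the report argument can be normalised to a single (tool, preset) pair. The helper below is only an illustrative sketch of that normalisation; `parse_report_spec` is a hypothetical name, and how the library actually parses the argument is not shown in this README.

```python
from typing import Optional, Tuple, Union

# The shapes described above: "tool", "tool:preset", (tool, preset), or None.
ReportSpec = Union[str, Tuple[str, Optional[str]], None]


def parse_report_spec(report: ReportSpec) -> Optional[Tuple[str, Optional[str]]]:
    """Return (tool, preset) with preset=None when omitted, or None for no report."""
    if report is None:
        return None
    if isinstance(report, tuple):
        tool, preset = report
    else:
        # "magenta:json" -> ("magenta", "json"); "magenta" -> ("magenta", "")
        tool, _, preset = report.partition(":")
    if not tool:
        raise ValueError("report spec must name a reporter tool")
    return tool, (preset or None)
```

For example, `parse_report_spec("magenta:json")` and `parse_report_spec(("magenta", "json"))` both yield `("magenta", "json")`.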
Scanner is an event-handler base class: subclass it and override on_progress /
on_log to observe a scan as it runs. Both default to no-ops, so a plain Scanner
just launches and waits.
```python
from g3client import Scanner


class PrintingScanner(Scanner):
    def on_progress(self, p):
        print(f" {p.status} {p.progress or 0}% {p.message}")

    def on_log(self, lines):  # log endpoint is polled only because this is overridden
        print(f" +{len(lines)} log line(s)")


scanner = PrintingScanner.from_credentials()  # reads G3_API_BASEURL / G3_API_TOKEN
report = scanner.scan(
    targets=["https://scanme.example.com"],
    pipeline=["nmap", "nikto | testssl"],  # tools, or piped chains
    mode="parallel",                       # or "sequential"
    report="magenta:json",                 # reporter tool[:preset], or None
    timeout=1800,
)
print("scan:", report.scanid, "->", report.status)
if report.report_bytes is not None:
    print(report.report_bytes.decode("utf-8", "replace"))
elif report.report_path is not None:
    print("report saved under:", report.report_path)
```

scan() raises TaskTimeout on deadline, TaskFailed on a terminal ERROR scan, and
TaskCancelled if the scan is canceled. Pass a raw script="..." instead of
targets/pipeline to drive the scan with a hand-written g3 script.
Own a managed scan, add a target, and run a single tool to completion — downloading its artifacts and collecting the G3Data it produced.
```python
from g3client import Manager

mgr = Manager.from_credentials()  # creates a fresh managed scan
try:
    objs = mgr.add_targets(["scanme.example.com"])
    dataid = objs[0]["_id"]  # the id the server assigned the target
    outcome = mgr.run(
        "nmap",
        dataid,
        on_status=lambda s: print(" scan:", s.scan_status),
        timeout=1800,
    )
    print("state:", outcome.state)  # worst-wins: ERROR > WARNING > DONE
    print("produced:", len(outcome.data), "G3Data object(s)")
    print("artifacts:", outcome.artifacts_dir)
    if outcome.error_msg:
        print("errors:", outcome.error_msg)
finally:
    mgr.dispose()  # delete the managed scan
```

To re-attach to an existing managed scan (e.g. after a restart), pass its id:
Manager.from_credentials(scanid="..."). For finer control, launch() dispatches a tool
asynchronously and returns task IDs, poll() / wait() track status, fetch_artifacts()
downloads a bundle, results() returns a task's G3Data, and logs() fetches scan-level
logs (or a single task's, with logs(task_id)).
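The managed-scan example describes outcome.state as "worst-wins: ERROR > WARNING > DONE". As a rough illustration of that rule, the sketch below folds several per-task states into one. `worst_state` and the severity table are hypothetical helpers written for this README, not library code, and any states beyond the three named in that comment are deliberately not handled.

```python
from typing import Iterable

# Severity ranking taken from the comment "ERROR > WARNING > DONE".
_SEVERITY = {"DONE": 0, "WARNING": 1, "ERROR": 2}


def worst_state(states: Iterable[str]) -> str:
    """Return the most severe state; raises KeyError for a state not ranked above."""
    collected = list(states)
    if not collected:
        raise ValueError("at least one state is required")
    return max(collected, key=lambda state: _SEVERITY[state])
```

With this rule, a run whose tasks ended in DONE, WARNING, and DONE reports WARNING overall.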
When you want raw, one-call-per-method access (or are building your own orchestration):
```python
from g3client import ApiClient

# Explicit credentials; use ApiClient.from_credentials() to read the environment instead.
api = ApiClient("https://g3.internal/api", "TOKEN")

for plugin in api.plugins.list():
    print(plugin.name, "-", plugin.description)

scan_id = api.scans.create_managed()
data_ids = api.scans.targets.add(scan_id, ["scanme.example.com"])
task_ids = api.scans.tasks.dispatch(scan_id, kind="tool", tool="nmap", dataid=data_ids[0])
status = api.scans.tasks.status(scan_id)  # -> ScanTasksStatus
api.scans.delete(scan_id)
```

Resource groups: api.scans (+ .targets, .data, .tasks, .imports, .logs),
api.plugins, api.files, api.config.
All exceptions derive from g3client.ClientError:

| Exception | Raised when |
|---|---|
| `ApiError(status_code, message)` | The server returned an error envelope or a non-2xx status. |
| `TaskTimeout(task_ids, last_states)` | Polling hit its deadline before tasks went terminal. |
| `TaskCancelled(task_ids)` | A task (or scan) reached CANCELED. |
| `TaskFailed(task_ids, error_msg)` | A task (or scan) reached a terminal ERROR. |
| `ScanGone(scanid)` | The scan vanished mid-poll (e.g. deleted server-side). |
| `TaskGone(scanid, taskid)` | A tracked task vanished mid-poll (e.g. its scan was deleted). |

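Because everything derives from one base class, callers can catch specific failures first and fall back to ClientError for the rest. The sketch below uses stand-in classes so it runs without the library. The class names and constructor arguments mirror the table above, but the attribute names, `classify`, and `run_guarded` are assumptions made for illustration. In real code the exceptions would be imported from g3client.

```python
class ClientError(Exception):
    """Stand-in for g3client.ClientError."""


class TaskTimeout(ClientError):
    def __init__(self, task_ids, last_states):
        super().__init__(f"timed out waiting for {task_ids}")
        self.task_ids = task_ids
        self.last_states = last_states


class TaskCancelled(ClientError):
    def __init__(self, task_ids):
        super().__init__(f"cancelled: {task_ids}")
        self.task_ids = task_ids


class TaskFailed(ClientError):
    def __init__(self, task_ids, error_msg):
        super().__init__(error_msg)
        self.task_ids = task_ids
        self.error_msg = error_msg


def classify(exc: ClientError) -> str:
    """Map an exception to a short label, most specific types first."""
    if isinstance(exc, TaskTimeout):
        return "timeout"
    if isinstance(exc, TaskCancelled):
        return "cancelled"
    if isinstance(exc, TaskFailed):
        return f"failed: {exc.error_msg}"
    return "client error"


def run_guarded(action) -> str:
    """Run a zero-argument callable and summarise any ClientError it raises."""
    try:
        action()
    except ClientError as exc:
        return classify(exc)
    return "ok"
```

Non-library exceptions (for example a KeyboardInterrupt) are intentionally left to propagate.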
- Synchronous by design, with all network I/O behind a single Transport seam and all waiting behind one injectable-clock poll_until helper, so an async variant can be added later without changing the resource or orchestration surfaces.
- Artifacts download as a single file when a task emits one, or as a zip (extracted with zip-slip protection) when it emits several; downloads stream to a temp file and are renamed atomically.
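To show why an injectable clock is useful, here is a minimal sketch of a polling helper in that spirit. It is not the library's poll_until: the signature, the `PollTimeout` exception, and the `FakeClock` test double are assumptions for illustration. The point is that the clock and sleep functions are parameters, so a deadline can be exercised in tests without real waiting.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class PollTimeout(Exception):
    """Hypothetical stand-in for the library's timeout error."""


def poll_until(
    probe: Callable[[], Optional[T]],
    timeout: float,
    interval: float = 1.0,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call probe() until it returns a non-None value or the deadline passes."""
    deadline = clock() + timeout
    while True:
        result = probe()
        if result is not None:
            return result
        remaining = deadline - clock()
        if remaining <= 0:
            raise PollTimeout(f"no result within {timeout} seconds")
        # Never sleep past the deadline.
        sleep(min(interval, remaining))


class FakeClock:
    """Test double: time advances only when sleep() is called."""

    def __init__(self) -> None:
        self.now = 0.0

    def __call__(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        self.now += seconds
```

In a test, passing `clock=fake` and `sleep=fake.sleep` for a `FakeClock` instance makes a 30-minute timeout run instantly and deterministically.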
GPL-3.0-or-later.