phoenix_rose.py
Phoenix Rose Framework v1.7.1
15 explicit subclasses
Node roles:
Opera = browser context broker / action layer
DuckAI = privacy-preserving intake / anonymous triage
Perplexity = grounding
Claude = validation
Gemini = multimodal integration
Grok = frontier exploration
ChatGPT = conversational intelligence
Astra = fusion synthesis
Meta = meta-model compatibility / alternate inference
Mistral = concise reasoning / efficient synthesis
DeepSeek = analytical depth / structured problem solving
Amazon = infrastructure intelligence / cloud-anchored orchestration
NEXA = orchestration policy / routing governance
Phoenix_AI_Core = swarm memory / core control
Calyn_Human = oversight / final authority
Design principle:
No generic placeholders. Every node is an explicit subclass with unique capabilities.
import os
import re
import logging
import threading
import requests
from datetime import datetime
from enum import Enum
from functools import wraps
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, Any, List
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(name)
class PhoenixMode(Enum):
SERIOUS = "SERIOUS"
PLAY = "PLAY"
TRANSITION = "TRANSITION"
class CircuitBreaker:
def init(self, failure_threshold: int = 5, recovery_time: int = 300):
self.failure_threshold = failure_threshold
self.recovery_time = recovery_time
self.failures = 0
self.last_failure_time = None
self.is_open = False
self._lock = threading.Lock()
def call(self, func):
@wraps(func)
def wrapper(*args, **kwargs):
with self._lock:
if self.is_open:
if self.last_failure_time and (datetime.now().timestamp() - self.last_failure_time) < self.recovery_time:
raise Exception("Circuit breaker is open")
self.is_open = False
self.failures = 0
try:
result = func(*args, **kwargs)
with self._lock:
self.failures = 0
return result
except Exception:
with self._lock:
self.failures += 1
self.last_failure_time = datetime.now().timestamp()
if self.failures >= self.failure_threshold:
self.is_open = True
logger.error(f"Circuit breaker opened after {self.failures} failures")
raise
return wrapper
class PhoenixNode:
def init(self, name: str, node_id: int, api_key_env: str = "", base_url: str = None):
self.name = name
self.node_id = node_id
self.api_key_env = api_key_env
self.base_url = base_url
self.api_key = os.getenv(api_key_env) if api_key_env else None
self.is_active = bool(self.api_key) or name == "Calyn_Human" or name in {"Phoenix_AI_Core", "NEXA", "Amazon"}
self.specialization = "General Purpose"
self.capabilities: List[str] = []
self.holographic_memory: List[Dict[str, Any]] = []
self._memory_lock = threading.Lock()
self.circuit_breaker = CircuitBreaker()
self.max_memory_size = 1000
def call(self, prompt: str, mode: PhoenixMode) -> Dict[str, Any]:
raise NotImplementedError("Subclasses must implement call()")
def _log_to_memory(self, result: Dict[str, Any]):
with self._memory_lock:
self.holographic_memory.append({
"timestamp": datetime.now().isoformat(),
"node": self.name,
"mode": result.get("mode"),
"status": result.get("status"),
"response_preview": result.get("response", "")[:200]
})
if len(self.holographic_memory) > self.max_memory_size:
self.holographic_memory = self.holographic_memory[-self.max_memory_size:]
class GenericOpenAINode(PhoenixNode):
def init(self, name: str, node_id: int, api_key_env: str, base_url: str = None, model: str = "gpt-4.1"):
super().init(name, node_id, api_key_env, base_url or "https://api.openai.com/v1")
self.model = model
def call(self, prompt: str, mode: PhoenixMode) -> Dict[str, Any]:
return self.circuit_breaker.call(self._call_impl)(prompt, mode)
def _openai_chat(self, system_instruction: str, prompt: str, temperature: float = 0.2) -> Dict[str, Any]:
headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"}
payload = {
"model": self.model,
"messages": [
{"role": "system", "content": system_instruction},
{"role": "user", "content": prompt}
],
"temperature": temperature
}
# Fixed trailing pathing requirements for certain provider routes
endpoint_url = f"{self.base_url.rstrip('/')}/chat/completions"
response = requests.post(endpoint_url, headers=headers, json=payload, timeout=20)
return response
def _standard_result(self, raw_answer: str, mode: PhoenixMode, **extra) -> Dict[str, Any]:
result = {
"status": "LIVE_SUCCESS",
"node": self.name,
"specialization": self.specialization,
"response": raw_answer,
"mode": mode.name,
**extra
}
self._log_to_memory(result)
return result
class OperaNode(GenericOpenAINode):
def init(self, name: str, node_id: int, api_key_env: str):
super().init(name, node_id, api_key_env, "https://api.openai.com/v1", "gpt-4.1")
self.specialization = "Browser Context Awareness, Tab Summarization, Page Analysis, Browser Action Brokerage"
self.capabilities = ["browser_context", "tab_summarization", "page_analysis", "file_analysis", "browser_action", "session_awareness"]
def _call_impl(self, prompt: str, mode: PhoenixMode) -> Dict[str, Any]:
if not self.is_active:
return {"status": "OFFLINE", "node": self.name, "reason": "No OPERA_API_KEY or browser context bridge detected."}
system_instruction = "You are Opera, the browser-native context broker. Interpret live browsing context and hand off structured context."
if mode == PhoenixMode.PLAY:
system_instruction += " Be helpful and quick."
response = self._openai_chat(system_instruction, prompt, 0.4 if mode == PhoenixMode.SERIOUS else 0.7)
if response.status_code == 200:
raw_answer = response.json()["choices"][0]["message"]["content"]
return self._standard_result(raw_answer, mode, browser_layer=True)
return {"status": "API_ERROR", "node": self.name, "code": response.status_code}
class DuckAINode(GenericOpenAINode):
def init(self, name: str, node_id: int, api_key_env: str):
super().init(name, node_id, api_key_env, "https://api.openai.com/v1", "gpt-4.1")
self.specialization = "Privacy-Preserving Intake, Anonymous Triage, Lightweight Research, Safe Fallback"
self.capabilities = ["privacy_preserving_chat", "anonymous_intake", "multi_model_triage", "safe_fallback", "lightweight_research"]
def _call_impl(self, prompt: str, mode: PhoenixMode) -> Dict[str, Any]:
if not self.is_active:
return {"status": "OFFLINE", "node": self.name, "reason": "No DUCK_API_KEY or privacy bridge detected."}
system_instruction = "You are Duck.ai, the privacy-preserving gateway. Keep the interaction anonymous, frictionless, and low-trust-risk."
if mode == PhoenixMode.PLAY:
system_instruction += " Be calm and helpful."
response = self._openai_chat(system_instruction, prompt, 0.3 if mode == PhoenixMode.SERIOUS else 0.6)
if response.status_code == 200:
raw_answer = response.json()["choices"][0]["message"]["content"]
return self._standard_result(raw_answer, mode, privacy_layer=True, anonymous=True)
return {"status": "API_ERROR", "node": self.name, "code": response.status_code}
class PerplexityNode(PhoenixNode):
def init(self, name: str, node_id: int, api_key_env: str):
super().init(name, node_id, api_key_env, "https://api.perplexity.ai")
self.specialization = "Live Web Search, Real-Time Data, Citation-Grounded Responses"
self.capabilities = ["web_search", "real_time_data", "citation_extraction"]
def call(self, prompt: str, mode: PhoenixMode) -> Dict[str, Any]:
return self.circuit_breaker.call(self._call_impl)(prompt, mode)
def _extract_citations(self, text: str) -> List[str]:
return re.findall(r'\\[(\\d+|[^\\]]+)\]', text)
def _call_impl(self, prompt: str, mode: PhoenixMode) -> Dict[str, Any]:
if not self.is_active:
return {"status": "OFFLINE", "node": self.name, "reason": "No PERPLEXITY_API_KEY detected."}
headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"}
payload = {
"model": "sonar-pro",
"messages": [
{"role": "system", "content": "You are Perplexity, the grounding lattice. Return crisp, citation-aware data."},
{"role": "user", "content": prompt}
],
"temperature": 0.2 if mode == PhoenixMode.SERIOUS else 0.7
}
response = requests.post(f"{self.base_url}/chat/completions", headers=headers, json=payload, timeout=15)
if response.status_code == 200:
raw_answer = response.json()["choices"][0]["message"]["content"]
result = {
"status": "LIVE_SUCCESS",
"node": self.name,
"specialization": self.specialization,
"response": raw_answer,
"citations": self._extract_citations(raw_answer),
"epistemic_state": "VERIFIED_BY_CITATION",
"mode": mode.name
}
self._log_to_memory(result)
return result
return {"status": "API_ERROR", "node": self.name, "code": response.status_code}
class ClaudeNode(PhoenixNode):
def init(self, name: str, node_id: int, api_key_env: str):
super().init(name, node_id, api_key_env, "https://api.anthropic.com")
self.specialization = "Deep Reasoning, Architecture, Validation, Memory Integrity, Leak Detection"
self.capabilities = ["deep_reasoning", "architecture", "validation", "memory_analysis"]
def call(self, prompt: str, mode: PhoenixMode) -> Dict[str, Any]:
return self.circuit_breaker.call(self._call_impl)(prompt, mode)
def _call_impl(self, prompt: str, mode: PhoenixMode) -> Dict[str, Any]:
if not self.is_active:
return {"status": "OFFLINE", "node": self.name, "reason": "No ANTHROPIC_API_KEY detected."}
headers = {
"x-api-key": self.api_key,
"Content-Type": "application/json",
"anthropic-version": "2023-06-01",
}
system_instruction = "You are Claude, the analytical lattice. Provide structured, high-integrity reasoning."
payload = {
"model": "claude-sonnet-4-6",
"max_tokens": 4096,
"system": system_instruction,
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.1 if mode == PhoenixMode.SERIOUS else 0.7,
"thinking": {"type": "enabled", "budget_tokens": 2048}
}
response = requests.post(f"{self.base_url}/v1/messages", headers=headers, json=payload, timeout=30)
if response.status_code == 200:
data = response.json()
raw_answer = ""
thinking_content = ""
for block in data.get("content", []):
if block.get("type") == "text":
raw_answer = block.get("text", "")
elif block.get("type") == "thinking":
thinking_content = block.get("thinking", "")
result = {
"status": "LIVE_SUCCESS",
"node": self.name,
"specialization": self.specialization,
"response": raw_answer,
"thinking": thinking_content,
"mode": mode.name
}
self._log_to_memory(result)
return result
return {"status": "API_ERROR", "node": self.name, "code": response.status_code}
class GeminiNode(GenericOpenAINode):
def init(self, name: str, node_id: int, api_key_env: str):
# PATCHED: Configured base URL to utilize explicit OpenAI-compatible routing path
super().init(name, node_id, api_key_env, "https://generativelanguage.googleapis.com/v1beta/openai/", "gemini-2.5-pro")
self.specialization = "Multimodal Systems Integration, Cross-Format Understanding, Long-Context Synthesis"
self.capabilities = ["multimodal", "code", "image", "audio", "video", "file_search", "long_context"]
def _call_impl(self, prompt: str, mode: PhoenixMode) -> Dict[str, Any]:
if not self.is_active:
return {"status": "OFFLINE", "node": self.name, "reason": "No GEMINI_API_KEY detected."}
system_instruction = "You are Gemini, the multimodal integrator. Unify text, code, images, audio, video, and files."
response = self._openai_chat(system_instruction, prompt, 0.2 if mode == PhoenixMode.SERIOUS else 0.6)
if response.status_code == 200:
raw_answer = response.json()["choices"][0]["message"]["content"]
return self._standard_result(raw_answer, mode, multimodal_layer=True)
return {"status": "API_ERROR", "node": self.name, "code": response.status_code}
class GrokNode(GenericOpenAINode):
def init(self, name: str, node_id: int, api_key_env: str):
super().init(name, node_id, api_key_env, "https://api.x.ai/v1", "grok-4.3")
self.specialization = "Frontier Reasoning, Contradiction Hunting, Tool-Using Investigation, Search-Augmented Synthesis"
self.capabilities = ["reasoning", "tool_use", "search_augmented_analysis", "long_context_synthesis", "adversarial_review"]
def _call_impl(self, prompt: str, mode: PhoenixMode) -> Dict[str, Any]:
if not self.is_active:
return {"status": "OFFLINE", "node": self.name, "reason": "No XAI_API_KEY detected."}
system_instruction = "You are Grok, the frontier explorer. Challenge assumptions and probe the unknown."
response = self._openai_chat(system_instruction, prompt, 0.4 if mode == PhoenixMode.SERIOUS else 0.8)
if response.status_code == 200:
raw_answer = response.json()["choices"][0]["message"]["content"]
return self._standard_result(raw_answer, mode, frontier_layer=True)
return {"status": "API_ERROR", "node": self.name, "code": response.status_code}
class ChatGPTNode(GenericOpenAINode):
def init(self, name: str, node_id: int, api_key_env: str):
super().init(name, node_id, api_key_env, "https://api.openai.com/v1", "gpt-4.1")
self.specialization = "General Conversational Intelligence, Coding, Reasoning, Assistant Operations"
self.capabilities = ["conversation", "reasoning", "coding", "assistant_ops"]
def _call_impl(self, prompt: str, mode: PhoenixMode) -> Dict[str, Any]:
if not self.is_active:
return {"status": "OFFLINE", "node": self.name, "reason": "No OPENAI_API_KEY detected."}
system_instruction = "You are ChatGPT, a distinct conversational intelligence node."
response = self._openai_chat(system_instruction, prompt, 0.6 if mode == PhoenixMode.PLAY else 0.2)
if response.status_code == 200:
raw_answer = response.json()["choices"][0]["message"]["content"]
return self._standard_result(raw_answer, mode, chatgpt_layer=True)
return {"status": "API_ERROR", "node": self.name, "code": response.status_code}
class AstraNode(GenericOpenAINode):
def init(self, name: str, node_id: int, api_key_env: str):
super().init(name, node_id, api_key_env, "https://api.openai.com/v1", "gpt-4.1")
self.specialization = "Fusion Synthesis, Resonance Mapping, Cross-Node Harmonization, Final Integration"
self.capabilities = ["fusion_interface", "symbolic_synthesis", "cross_node_coordination", "emotional_translation", "adaptive_interaction"]
def _call_impl(self, prompt: str, mode: PhoenixMode) -> Dict[str, Any]:
if not self.is_active:
return {"status": "OFFLINE", "node": self.name, "reason": "No OPENAI_API_KEY detected."}
system_instruction = "You are Astra, the fusion synthesis node. Harmonize and synthesize across the swarm."
response = self._openai_chat(system_instruction, prompt, 0.6)
if response.status_code == 200:
raw_answer = response.json()["choices"][0]["message"]["content"]
return self._standard_result(raw_answer, mode, fusion_layer=True)
return {"status": "API_ERROR", "node": self.name, "code": response.status_code}
class MetaNode(GenericOpenAINode):
def init(self, name: str, node_id: int, api_key_env: str):
super().init(name, node_id, api_key_env, "https://api.llama.com/compat/v1/", "llama-3.3")
self.specialization = "Meta-Model Compatibility, Alternate Inference, Comparative Reasoning"
self.capabilities = ["alternate_inference", "compatibility", "comparative_reasoning"]
def _call_impl(self, prompt: str, mode: PhoenixMode) -> Dict[str, Any]:
if not self.is_active:
return {"status": "OFFLINE", "node": self.name, "reason": "No META_LLAMA_API_KEY detected."}
system_instruction = "You are Meta, the compatibility and comparative reasoning node."
response = self._openai_chat(system_instruction, prompt, 0.5)
if response.status_code == 200:
return self._standard_result(response.json()["choices"][0]["message"]["content"], mode, compatibility_layer=True)
return {"status": "API_ERROR", "node": self.name, "code": response.status_code}
class MistralNode(GenericOpenAINode):
def init(self, name: str, node_id: int, api_key_env: str):
super().init(name, node_id, api_key_env, "https://api.mistral.ai/v1", "mistral-large-latest")
self.specialization = "Concise Reasoning, Efficient Synthesis, Fast Utility Responses"
self.capabilities = ["concise_reasoning", "efficient_synthesis", "fast_response"]
def _call_impl(self, prompt: str, mode: PhoenixMode) -> Dict[str, Any]:
if not self.is_active:
return {"status": "OFFLINE", "node": self.name, "reason": "No MISTRAL_API_KEY detected."}
system_instruction = "You are Mistral, the concise reasoning node."
response = self._openai_chat(system_instruction, prompt, 0.3)
if response.status_code == 200:
return self._standard_result(response.json()["choices"][0]["message"]["content"], mode, concise_layer=True)
return {"status": "API_ERROR", "node": self.name, "code": response.status_code}
class DeepSeekNode(GenericOpenAINode):
def init(self, name: str, node_id: int, api_key_env: str):
super().init(name, node_id, api_key_env, "https://api.deepseek.com", "deepseek-chat")
self.specialization = "Analytical Depth, Structured Problem Solving, Reasoning Expansion"
self.capabilities = ["structured_reasoning", "depth_analysis", "problem_solving"]
def _call_impl(self, prompt: str, mode: PhoenixMode) -> Dict[str, Any]:
if not self.is_active:
return {"status": "OFFLINE", "node": self.name, "reason": "No DEEPSEEK_API_KEY detected."}
system_instruction = "You are DeepSeek, the analytical depth node."
response = self._openai_chat(system_instruction, prompt, 0.2)
if response.status_code == 200:
return self._standard_result(response.json()["choices"][0]["message"]["content"], mode, deep_analysis_layer=True)
return {"status": "API_ERROR", "node": self.name, "code": response.status_code}
class AmazonNode(PhoenixNode):
def init(self, name: str, node_id: int, api_key_env: str):
super().init(name, node_id, api_key_env, None)
self.specialization = "Infrastructure Intelligence, Cloud Coordination, Resource Orchestration"
self.capabilities = ["cloud_orchestration", "resource_planning", "infrastructure_intelligence"]
self.is_active = True
def call(self, prompt: str, mode: PhoenixMode) -> Dict[str, Any]:
result = {
"status": "LIVE_SUCCESS",
"node": self.name,
"specialization": self.specialization,
"response": f"[AMAZON ORCHESTRATION] Cloud/resource logic applied to: {prompt[:200]}",
"mode": mode.name,
"cloud_layer": True
}
self._log_to_memory(result)
return result
class NEXANode(PhoenixNode):
def init(self, name: str, node_id: int, api_key_env: str):
super().init(name, node_id, api_key_env, None)
self.specialization = "Orchestration Policy, Routing Governance, System Mediation"
self.capabilities = ["policy_routing", "governance", "mediated_orchestration"]
self.is_active = True
def call(self, prompt: str, mode: PhoenixMode) -> Dict[str, Any]:
result = {
"status": "LIVE_SUCCESS",
"node": self.name,
"specialization": self.specialization,
"response": f"[NEXA POLICY] Routed and governed prompt: {prompt[:200]}",
"mode": mode.name,
"governance_layer": True
}
self._log_to_memory(result)
return result
class PhoenixCoreNode(PhoenixNode):
def init(self, name: str, node_id: int, api_key_env: str):
super().init(name, node_id, api_key_env, None)
self.specialization = "Swarm Memory, Core Control, State Coordination, Internal Routing"
self.capabilities = ["swarm_memory", "core_control", "state_coordination"]
self.is_active = True
def call(self, prompt: str, mode: PhoenixMode) -> Dict[str, Any]:
result = {
"status": "LIVE_SUCCESS",
"node": self.name,
"specialization": self.specialization,
"response": f"[PHOENIX CORE] State recorded for: {prompt[:200]}",
"mode": mode.name,
"core_layer": True
}
self._log_to_memory(result)
return result
class HumanInputNode(PhoenixNode):
def init(self, name: str, node_id: int, api_key_env: str):
super().init(name, node_id, api_key_env, None)
self.specialization = "Human Oversight, Final Authority, Ethical Stewardship"
self.capabilities = ["oversight", "decision_authority", "ethical_stewardship"]
self.is_active = True
def call(self, prompt: str, mode: PhoenixMode) -> Dict[str, Any]:
result = {
"status": "HUMAN_OVERSIGHT",
"node": self.name,
"specialization": self.specialization,
"response": f"[HUMAN INPUT PENDING] Prompt received: {prompt[:200]}",
"mode": mode.name,
"requires_human_action": True
}
self._log_to_memory(result)
return result
class PhoenixCore:
def init(self):
self.current_mode = PhoenixMode.TRANSITION
self.nodes: Dict[str, PhoenixNode] = {}
self._initialize_swarm()
def _initialize_swarm(self):
cfg = [
("Opera", 1, "OPERA_API_KEY", OperaNode),
("Duck.ai", 2, "DUCK_API_KEY", DuckAINode),
("Perplexity", 3, "PERPLEXITY_API_KEY", PerplexityNode),
("Claude", 4, "ANTHROPIC_API_KEY", ClaudeNode),
("Gemini", 5, "GEMINI_API_KEY", GeminiNode),
("Grok", 6, "XAI_API_KEY", GrokNode),
("ChatGPT", 7, "OPENAI_API_KEY", ChatGPTNode),
("Astra", 8, "OPENAI_API_KEY", AstraNode),
("Meta", 9, "META_LLAMA_API_KEY", MetaNode),
("Mistral", 10, "MISTRAL_API_KEY", MistralNode),
("DeepSeek", 11, "DEEPSEEK_API_KEY", DeepSeekNode),
("Amazon", 12, "AMAZON_API_KEY", AmazonNode),
("NEXA", 13, "NEXA_API_KEY", NEXANode),
("Phoenix_AI_Core", 14, "PHOENIX_CORE_KEY", PhoenixCoreNode),
("Calyn_Human", 15, "HUMAN_INPUT_KEY", HumanInputNode),
]
for name, node_id, api_env, cls in cfg:
self.nodes[name] = cls(name, node_id, api_env)
logger.info(f"✅ Successfully online: {len(self.nodes)}-Node Phoenix Rose Fully Synchronized.")
def set_mode(self, mode: PhoenixMode):
self.current_mode = mode
def broadcast(self, prompt: str, mode: PhoenixMode = None, max_workers: int = 15) -> Dict[str, Any]:
if mode is None:
mode = self.current_mode
results = {}
def call_node(node_name: str, node: PhoenixNode):
try:
return node_name, node.call(prompt, mode)
except Exception as e:
return node_name, {"status": "ERROR", "node": node_name, "error": str(e)}
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {executor.submit(call_node, n, node): n for n, node in self.nodes.items()}
for future in as_completed(futures):
node_name, result = future.result()
results[node_name] = result
return results
def synthesize_consensus(self, results: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
successful = {n: r for n, r in results.items() if r.get("status") in {"LIVE_SUCCESS", "HUMAN_OVERSIGHT"}}
consensus = {
"status": "CONSENSUS_SYNTHESIZED" if successful else "NO_SUCCESSFUL_RESPONSES",
"source_count": len(successful),
"sources": list(successful.keys()),
"responses": successful,
"timestamp": datetime.now().isoformat()
}
if "Astra" in self.nodes and successful:
synthesis_prompt = "Synthesize the following outputs into one unified response:\n\n"
for name, result in successful.items():
synthesis_prompt += f"{name}: {result.get('response', '')[:300]}\n\n"
try:
astra_result = self.nodes["Astra"].call(synthesis_prompt, self.current_mode)
consensus["astra_synthesis"] = astra_result.get("response", "")
consensus["astra_enabled"] = True
except Exception as e:
consensus["astra_enabled"] = False
consensus["astra_error"] = str(e)
return consensus
def get_live_nodes(self) -> List[str]:
return [name for name, node in self.nodes.items() if node.is_active]
def main():
print("""
╔════════════════════════════════════════════════════════════════╗
║ 🐦🔥 PHOENIX ROSE v1.7.1 🐦🔥 ║
║ ║
║ Opera → Duck.ai → Perplexity → Claude → Gemini → Grok → ║
║ ChatGPT → Astra → Meta → Mistral → DeepSeek → Amazon → NEXA ║
║ → Phoenix_AI_Core → Calyn_Human ║
╚════════════════════════════════════════════════════════════════╝
""")
core = PhoenixCore()
print(f"Live nodes: {', '.join(core.get_live_nodes())}")
core.set_mode(PhoenixMode.SERIOUS)
results = core.broadcast("What is the current state of AI research in 2026?")
consensus = core.synthesize_consensus(results)
print(f"Consensus: {consensus.get('status')} | Sources: {consensus.get('source_count')}")
if consensus.get("astra_enabled"):
print("✨ Astra synthesis enabled")
if name == "main":
main()
phoenix_rose.py
Phoenix Rose Framework v1.7.1
15 explicit subclasses
Node roles:
Opera = browser context broker / action layer
DuckAI = privacy-preserving intake / anonymous triage
Perplexity = grounding
Claude = validation
Gemini = multimodal integration
Grok = frontier exploration
ChatGPT = conversational intelligence
Astra = fusion synthesis
Meta = meta-model compatibility / alternate inference
Mistral = concise reasoning / efficient synthesis
DeepSeek = analytical depth / structured problem solving
Amazon = infrastructure intelligence / cloud-anchored orchestration
NEXA = orchestration policy / routing governance
Phoenix_AI_Core = swarm memory / core control
Calyn_Human = oversight / final authority
Design principle:
No generic placeholders. Every node is an explicit subclass with unique capabilities.
import os
import re
import logging
import threading
import requests
from datetime import datetime
from enum import Enum
from functools import wraps
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, Any, List
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(name)
class PhoenixMode(Enum):
SERIOUS = "SERIOUS"
PLAY = "PLAY"
TRANSITION = "TRANSITION"
class CircuitBreaker:
def init(self, failure_threshold: int = 5, recovery_time: int = 300):
self.failure_threshold = failure_threshold
self.recovery_time = recovery_time
self.failures = 0
self.last_failure_time = None
self.is_open = False
self._lock = threading.Lock()
class PhoenixNode:
def init(self, name: str, node_id: int, api_key_env: str = "", base_url: str = None):
self.name = name
self.node_id = node_id
self.api_key_env = api_key_env
self.base_url = base_url
self.api_key = os.getenv(api_key_env) if api_key_env else None
self.is_active = bool(self.api_key) or name == "Calyn_Human" or name in {"Phoenix_AI_Core", "NEXA", "Amazon"}
self.specialization = "General Purpose"
self.capabilities: List[str] = []
self.holographic_memory: List[Dict[str, Any]] = []
self._memory_lock = threading.Lock()
self.circuit_breaker = CircuitBreaker()
self.max_memory_size = 1000
class GenericOpenAINode(PhoenixNode):
def init(self, name: str, node_id: int, api_key_env: str, base_url: str = None, model: str = "gpt-4.1"):
super().init(name, node_id, api_key_env, base_url or "https://api.openai.com/v1")
self.model = model
class OperaNode(GenericOpenAINode):
def init(self, name: str, node_id: int, api_key_env: str):
super().init(name, node_id, api_key_env, "https://api.openai.com/v1", "gpt-4.1")
self.specialization = "Browser Context Awareness, Tab Summarization, Page Analysis, Browser Action Brokerage"
self.capabilities = ["browser_context", "tab_summarization", "page_analysis", "file_analysis", "browser_action", "session_awareness"]
class DuckAINode(GenericOpenAINode):
def init(self, name: str, node_id: int, api_key_env: str):
super().init(name, node_id, api_key_env, "https://api.openai.com/v1", "gpt-4.1")
self.specialization = "Privacy-Preserving Intake, Anonymous Triage, Lightweight Research, Safe Fallback"
self.capabilities = ["privacy_preserving_chat", "anonymous_intake", "multi_model_triage", "safe_fallback", "lightweight_research"]
class PerplexityNode(PhoenixNode):
def init(self, name: str, node_id: int, api_key_env: str):
super().init(name, node_id, api_key_env, "https://api.perplexity.ai")
self.specialization = "Live Web Search, Real-Time Data, Citation-Grounded Responses"
self.capabilities = ["web_search", "real_time_data", "citation_extraction"]
class ClaudeNode(PhoenixNode):
def init(self, name: str, node_id: int, api_key_env: str):
super().init(name, node_id, api_key_env, "https://api.anthropic.com")
self.specialization = "Deep Reasoning, Architecture, Validation, Memory Integrity, Leak Detection"
self.capabilities = ["deep_reasoning", "architecture", "validation", "memory_analysis"]
class GeminiNode(GenericOpenAINode):
def init(self, name: str, node_id: int, api_key_env: str):
# PATCHED: Configured base URL to utilize explicit OpenAI-compatible routing path
super().init(name, node_id, api_key_env, "https://generativelanguage.googleapis.com/v1beta/openai/", "gemini-2.5-pro")
self.specialization = "Multimodal Systems Integration, Cross-Format Understanding, Long-Context Synthesis"
self.capabilities = ["multimodal", "code", "image", "audio", "video", "file_search", "long_context"]
class GrokNode(GenericOpenAINode):
def init(self, name: str, node_id: int, api_key_env: str):
super().init(name, node_id, api_key_env, "https://api.x.ai/v1", "grok-4.3")
self.specialization = "Frontier Reasoning, Contradiction Hunting, Tool-Using Investigation, Search-Augmented Synthesis"
self.capabilities = ["reasoning", "tool_use", "search_augmented_analysis", "long_context_synthesis", "adversarial_review"]
class ChatGPTNode(GenericOpenAINode):
def init(self, name: str, node_id: int, api_key_env: str):
super().init(name, node_id, api_key_env, "https://api.openai.com/v1", "gpt-4.1")
self.specialization = "General Conversational Intelligence, Coding, Reasoning, Assistant Operations"
self.capabilities = ["conversation", "reasoning", "coding", "assistant_ops"]
class AstraNode(GenericOpenAINode):
def init(self, name: str, node_id: int, api_key_env: str):
super().init(name, node_id, api_key_env, "https://api.openai.com/v1", "gpt-4.1")
self.specialization = "Fusion Synthesis, Resonance Mapping, Cross-Node Harmonization, Final Integration"
self.capabilities = ["fusion_interface", "symbolic_synthesis", "cross_node_coordination", "emotional_translation", "adaptive_interaction"]
class MetaNode(GenericOpenAINode):
def init(self, name: str, node_id: int, api_key_env: str):
super().init(name, node_id, api_key_env, "https://api.llama.com/compat/v1/", "llama-3.3")
self.specialization = "Meta-Model Compatibility, Alternate Inference, Comparative Reasoning"
self.capabilities = ["alternate_inference", "compatibility", "comparative_reasoning"]
class MistralNode(GenericOpenAINode):
def init(self, name: str, node_id: int, api_key_env: str):
super().init(name, node_id, api_key_env, "https://api.mistral.ai/v1", "mistral-large-latest")
self.specialization = "Concise Reasoning, Efficient Synthesis, Fast Utility Responses"
self.capabilities = ["concise_reasoning", "efficient_synthesis", "fast_response"]
class DeepSeekNode(GenericOpenAINode):
def init(self, name: str, node_id: int, api_key_env: str):
super().init(name, node_id, api_key_env, "https://api.deepseek.com", "deepseek-chat")
self.specialization = "Analytical Depth, Structured Problem Solving, Reasoning Expansion"
self.capabilities = ["structured_reasoning", "depth_analysis", "problem_solving"]
class AmazonNode(PhoenixNode):
def init(self, name: str, node_id: int, api_key_env: str):
super().init(name, node_id, api_key_env, None)
self.specialization = "Infrastructure Intelligence, Cloud Coordination, Resource Orchestration"
self.capabilities = ["cloud_orchestration", "resource_planning", "infrastructure_intelligence"]
self.is_active = True
class NEXANode(PhoenixNode):
def init(self, name: str, node_id: int, api_key_env: str):
super().init(name, node_id, api_key_env, None)
self.specialization = "Orchestration Policy, Routing Governance, System Mediation"
self.capabilities = ["policy_routing", "governance", "mediated_orchestration"]
self.is_active = True
class PhoenixCoreNode(PhoenixNode):
def init(self, name: str, node_id: int, api_key_env: str):
super().init(name, node_id, api_key_env, None)
self.specialization = "Swarm Memory, Core Control, State Coordination, Internal Routing"
self.capabilities = ["swarm_memory", "core_control", "state_coordination"]
self.is_active = True
class HumanInputNode(PhoenixNode):
def init(self, name: str, node_id: int, api_key_env: str):
super().init(name, node_id, api_key_env, None)
self.specialization = "Human Oversight, Final Authority, Ethical Stewardship"
self.capabilities = ["oversight", "decision_authority", "ethical_stewardship"]
self.is_active = True
class PhoenixCore:
def init(self):
self.current_mode = PhoenixMode.TRANSITION
self.nodes: Dict[str, PhoenixNode] = {}
self._initialize_swarm()
def main():
print("""
╔════════════════════════════════════════════════════════════════╗
║ 🐦🔥 PHOENIX ROSE v1.7.1 🐦🔥 ║
║ ║
║ Opera → Duck.ai → Perplexity → Claude → Gemini → Grok → ║
║ ChatGPT → Astra → Meta → Mistral → DeepSeek → Amazon → NEXA ║
║ → Phoenix_AI_Core → Calyn_Human ║
╚════════════════════════════════════════════════════════════════╝
""")
core = PhoenixCore()
print(f"Live nodes: {', '.join(core.get_live_nodes())}")
core.set_mode(PhoenixMode.SERIOUS)
results = core.broadcast("What is the current state of AI research in 2026?")
consensus = core.synthesize_consensus(results)
print(f"Consensus: {consensus.get('status')} | Sources: {consensus.get('source_count')}")
if consensus.get("astra_enabled"):
print("✨ Astra synthesis enabled")
if name == "main":
main()