Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 20 additions & 8 deletions livekit-api/livekit/api/_failover.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,28 @@

FAILOVER_MAX_ATTEMPTS = 3
FAILOVER_BACKOFF_BASE = 0.2 # seconds


def failover_attempts(enabled: bool, host: Optional[str], force: bool = False) -> int:
# Below this per-request timeout (seconds) a retry is unlikely to help and many
# clients would retry in lockstep across regions, so a short request gets a
# single attempt (thundering-herd guard).
MIN_FAILOVER_TIMEOUT = 5.0


def failover_attempts(
enabled: bool,
host: Optional[str],
force: bool = False,
timeout: Optional[float] = None,
) -> int:
"""Total request attempts for a host; 1 means no failover. Failover only
engages when enabled and the host is a LiveKit Cloud domain. ``force``
bypasses the cloud-host check and is for internal testing only.
engages when enabled, the host is a LiveKit Cloud domain, and the request
timeout is long enough to retry. ``force`` bypasses the cloud-host check and
is for internal testing only.
"""
if enabled and (force or (host is not None and is_cloud(host))):
return FAILOVER_MAX_ATTEMPTS
return 1
if not (enabled and (force or (host is not None and is_cloud(host)))):
return 1
if timeout is not None and 0 < timeout < MIN_FAILOVER_TIMEOUT:
return 1
return FAILOVER_MAX_ATTEMPTS


def is_cloud(host: str) -> bool:
Expand Down
4 changes: 2 additions & 2 deletions livekit-api/livekit/api/livekit_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def __init__(
url: LiveKit server URL (read from `LIVEKIT_URL` environment variable if not provided)
api_key: API key (read from `LIVEKIT_API_KEY` environment variable if not provided)
api_secret: API secret (read from `LIVEKIT_API_SECRET` environment variable if not provided)
timeout: Request timeout (default: 60 seconds)
timeout: Request timeout (default: 10 seconds)
session: aiohttp.ClientSession instance to use for requests, if not provided, a new one will be created
"""
url = url or os.getenv("LIVEKIT_URL")
Expand All @@ -57,7 +57,7 @@ def __init__(
if not self._session:
self._custom_session = False
if not timeout:
timeout = aiohttp.ClientTimeout(total=60)
timeout = aiohttp.ClientTimeout(total=10)
self._session = aiohttp.ClientSession(timeout=timeout)
Comment thread
davidzhao marked this conversation as resolved.

self._room = RoomService(self._session, url, api_key, api_secret, failover)
Expand Down
55 changes: 43 additions & 12 deletions livekit-api/livekit/api/sip_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import aiohttp
import warnings
from typing import Optional
from typing import Optional, Union

from livekit.protocol.models import ListUpdate
from livekit.protocol.sip import (
Expand Down Expand Up @@ -40,6 +40,30 @@
SVC = "SIP"
"""@private"""

# Calls that dial a phone (CreateSIPParticipant with wait_until_answered,
# TransferSIPParticipant) take longer than a normal request.
SIP_DIAL_TIMEOUT = 30.0
"""@private"""
Comment thread
davidzhao marked this conversation as resolved.

# A dialing request must outlast the ringing window, or it would abort before
# the call can be answered. Keep the request timeout at least this many seconds
# above the request's ringing_timeout.
RINGING_TIMEOUT_MARGIN = 2.0
"""@private"""


def _dial_timeout(
user_timeout: Optional[float],
request: Union[CreateSIPParticipantRequest, TransferSIPParticipantRequest],
) -> float:
"""Request timeout (seconds) for a phone-dialing call: the user-supplied
value (or the dial default) raised, when needed, to stay at least
RINGING_TIMEOUT_MARGIN above the request's ringing_timeout."""
effective = user_timeout if user_timeout else SIP_DIAL_TIMEOUT
if request.HasField("ringing_timeout"):
effective = max(effective, request.ringing_timeout.seconds + RINGING_TIMEOUT_MARGIN)
return effective


class SipService(Service):
"""Client for LiveKit SIP Service API
Expand Down Expand Up @@ -781,17 +805,13 @@ async def create_sip_participant(
SIPError: If the SIP operation fails
"""
client_timeout: Optional[aiohttp.ClientTimeout] = None
if timeout:
# obay user specified timeout
if create.wait_until_answered:
# Dialing a phone and waiting for an answer takes longer than a
# normal call, and the request must outlast ringing.
client_timeout = aiohttp.ClientTimeout(total=_dial_timeout(timeout, create))
elif timeout:
# obey user specified timeout
client_timeout = aiohttp.ClientTimeout(total=timeout)
elif create.wait_until_answered:
# ensure default timeout isn't too short when using sync mode
if (
self._client._session.timeout
and self._client._session.timeout.total
and self._client._session.timeout.total < 20
):
client_timeout = aiohttp.ClientTimeout(total=20)

if trunk_id:
create.sip_trunk_id = trunk_id
Expand All @@ -809,16 +829,26 @@ async def create_sip_participant(
)

async def transfer_sip_participant(
self, transfer: TransferSIPParticipantRequest
self,
transfer: TransferSIPParticipantRequest,
*,
timeout: Optional[float] = None,
) -> SIPParticipantInfo:
"""Transfer a SIP participant to a different room.

Args:
transfer: Request containing transfer details
timeout: Optional request timeout in seconds. Transferring dials a
phone, which takes longer than normal, so it defaults to a
longer timeout when unset.

Returns:
Updated SIP participant information
"""
# Transferring a call dials a phone, which takes longer than a normal
# call, so use a longer default unless the user specified a timeout, and
# keep the request alive past ringing so the destination can answer.
client_timeout = aiohttp.ClientTimeout(total=_dial_timeout(timeout, transfer))
return await self._client.request(
SVC,
"TransferSIPParticipant",
Expand All @@ -831,6 +861,7 @@ async def transfer_sip_participant(
sip=SIPGrants(call=True),
),
SIPParticipantInfo,
timeout=client_timeout,
)

def _admin_headers(self) -> dict[str, str]:
Expand Down
9 changes: 8 additions & 1 deletion livekit-api/livekit/api/twirp_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,15 @@ async def request(
headers["Content-Type"] = "application/protobuf"
serialized_data = data.SerializeToString()

# The effective per-attempt timeout is the per-call override, or the
# session default; used to gate failover for short requests.
effective_timeout = timeout.total if timeout else None
if effective_timeout is None and self._session.timeout is not None:
effective_timeout = self._session.timeout.total
host = urlparse(self._origin).hostname
max_attempts = failover_attempts(self._failover, host, self._failover_force)
max_attempts = failover_attempts(
self._failover, host, self._failover_force, effective_timeout
)
attempted = {host_key(self._origin)}
region_origins: Optional[List[str]] = None
current_origin = self._origin
Expand Down
Loading