diff --git a/sentry_sdk/integrations/huey.py b/sentry_sdk/integrations/huey.py index a8d932200c..7a1ac19232 100644 --- a/sentry_sdk/integrations/huey.py +++ b/sentry_sdk/integrations/huey.py @@ -34,6 +34,13 @@ except ImportError: raise DidNotEnable("Huey is not installed") +try: + from huey.api import chord as HueyChord + from huey.api import group as HueyGroup +except ImportError: + HueyChord = None + HueyGroup = None + HUEY_CONTROL_FLOW_EXCEPTIONS = (CancelExecution, RetryTask, TaskLockedException) @@ -53,22 +60,37 @@ def patch_enqueue() -> None: @ensure_integration_enabled(HueyIntegration, old_enqueue) def _sentry_enqueue( - self: "Huey", task: "Task" + self: "Huey", item: "Union[Task, HueyGroup, HueyChord]" ) -> "Optional[Union[Result, ResultGroup]]": + if HueyChord is not None and isinstance(item, HueyChord): + span_name = "Huey Chord" + elif HueyGroup is not None and isinstance(item, HueyGroup): + span_name = "Huey Task Group" + else: + span_name = item.name + with sentry_sdk.start_span( op=OP.QUEUE_SUBMIT_HUEY, - name=task.name, + name=span_name, origin=HueyIntegration.origin, ): - if not isinstance(task, PeriodicTask): + if ( + not isinstance(item, PeriodicTask) + and not (HueyGroup is not None and isinstance(item, HueyGroup)) + and not (HueyChord is not None and isinstance(item, HueyChord)) + ): # Attach trace propagation data to task kwargs. We do # not do this for periodic tasks, as these don't # really have an originating transaction. - task.kwargs["sentry_headers"] = { + # Additionally, we do not do this for Huey groups or chords, as enqueue will + # recursively call this method for each task within the list, resulting + # in the trace propagation data being attached to each task individually + # (which we want) + item.kwargs["sentry_headers"] = { BAGGAGE_HEADER_NAME: get_baggage(), SENTRY_TRACE_HEADER_NAME: get_traceparent(), } - return old_enqueue(self, task) + return old_enqueue(self, item) Huey.enqueue = _sentry_enqueue diff --git a/tests/integrations/huey/test_huey.py b/tests/integrations/huey/test_huey.py index 7440280623..e2cc81e755 100644 --- a/tests/integrations/huey/test_huey.py +++ b/tests/integrations/huey/test_huey.py @@ -11,6 +11,12 @@ HUEY_VERSION = parse_version(HUEY_VERSION) +try: + from huey.api import chord, group +except ImportError: + chord = None + group = None + @pytest.fixture def init_huey(sentry_init): @@ -222,3 +228,114 @@ def propagated_trace_task(): (event,) = events assert event["contexts"]["trace"]["origin"] == "auto.queue.huey" + + +@pytest.mark.skipif(HUEY_VERSION < (3, 0), reason="group was added in 3.0") +def test_huey_enqueue_group(init_huey, capture_events): + huey = init_huey() + + events = capture_events() + + @huey.task() + def task1(): + pass + + @huey.task() + def task2(): + pass + + with start_transaction() as transaction: + huey.enqueue(group([task1.s(), task2.s()])) + + for _ in range(2): + task = huey.dequeue() + huey.execute(task) + + assert len(events) == 3 + + # Assert enqueue spans were successfully recorded + producer_event = events[0] + assert producer_event["type"] == "transaction" + assert producer_event["contexts"]["trace"]["trace_id"] == transaction.trace_id + assert producer_event["contexts"]["trace"]["origin"] == "manual" + + spans = producer_event["spans"] + assert len(spans) == 3 + assert spans[0]["op"] == "queue.submit.huey" + assert spans[0]["description"] == "Huey Task Group" + assert spans[1]["op"] == "queue.submit.huey" + assert spans[1]["description"] == "task1" + assert spans[2]["op"] == "queue.submit.huey" + assert spans[2]["description"] == "task2" + + # Consumer transaction assertions (one per task) + consumer_events = events[1:] + for _, (consumer_event, expected_name) in enumerate( + zip(consumer_events, ["task1", "task2"]) + ): + assert consumer_event["type"] == "transaction" + assert consumer_event["transaction"] == expected_name + assert consumer_event["transaction_info"] == {"source": "task"} + assert consumer_event["contexts"]["trace"]["op"] == "queue.task.huey" + assert consumer_event["contexts"]["trace"]["origin"] == "auto.queue.huey" + assert consumer_event["contexts"]["trace"]["status"] == "ok" + assert consumer_event["contexts"]["trace"]["trace_id"] == transaction.trace_id + assert "huey_task_id" in consumer_event["tags"] + assert consumer_event["tags"]["huey_task_retry"] is False + + +@pytest.mark.skipif(HUEY_VERSION < (3, 0), reason="chord was added in 3.0") +def test_huey_enqueue_chord(init_huey, capture_events): + huey = init_huey() + + events = capture_events() + + @huey.task() + def task1(): + pass + + @huey.task() + def task2(results): + pass + + with start_transaction() as transaction: + huey.enqueue(chord([task1.s()], task2.s())) + + for _ in range(2): + task = huey.dequeue() + huey.execute(task) + + assert len(events) == 3 + + # Enqueue spans + producer_event = events[0] + assert producer_event["contexts"]["trace"]["trace_id"] == transaction.trace_id + assert producer_event["contexts"]["trace"]["origin"] == "manual" + + spans = producer_event["spans"] + assert len(spans) == 2 + assert spans[0]["op"] == "queue.submit.huey" + assert spans[0]["description"] == "Huey Chord" + assert spans[1]["op"] == "queue.submit.huey" + assert spans[1]["description"] == "task1" + + task1_event = events[1] + # Confirm the first task enqueued the chord callback + task1_spans = task1_event["spans"] + assert len(task1_spans) == 1 + assert task1_spans[0]["op"] == "queue.submit.huey" + assert task1_spans[0]["description"] == "task2" + + consumer_events = events[1:] + for _, (consumer_event, expected_name) in enumerate( + zip(consumer_events, ["task1", "task2"]) + ): + assert consumer_event["type"] == "transaction" + assert consumer_event["transaction"] == expected_name + assert consumer_event["transaction_info"] == {"source": "task"} + assert consumer_event["contexts"]["trace"]["op"] == "queue.task.huey" + assert consumer_event["contexts"]["trace"]["origin"] == "auto.queue.huey" + assert consumer_event["contexts"]["trace"]["status"] == "ok" + assert consumer_event["contexts"]["trace"]["trace_id"] == transaction.trace_id + assert "huey_task_id" in consumer_event["tags"] + assert consumer_event["tags"]["huey_task_retry"] is False