diff --git a/lib/splitclient-rb/engine/sync_manager.rb b/lib/splitclient-rb/engine/sync_manager.rb index 8bb78b4b..167981cc 100644 --- a/lib/splitclient-rb/engine/sync_manager.rb +++ b/lib/splitclient-rb/engine/sync_manager.rb @@ -6,14 +6,8 @@ class SyncManager SYNC_MODE_STREAMING = 0 SYNC_MODE_POLLING = 1 - def initialize(config, - synchronizer, - telemetry_runtime_producer, - telemetry_synchronizer, - status_manager, - sse_handler, - push_manager, - status_queue) + def initialize(config, synchronizer, telemetry_runtime_producer, telemetry_synchronizer, + status_manager, sse_handler, push_manager, status_queue) @config = config @synchronizer = synchronizer @telemetry_runtime_producer = telemetry_runtime_producer @@ -23,6 +17,7 @@ def initialize(config, @push_manager = push_manager @status_queue = status_queue @sse_connected = Concurrent::AtomicBoolean.new(false) + @back_off = Engine::BackOff.new(5, 3) end def start @@ -119,7 +114,7 @@ def process_forced_stop end def process_disconnect(reconnect) - unless @sse_connected.value + unless @sse_connected.value || reconnect @config.logger.debug('Streaming already disconnected.') if @config.debug_enabled return end @@ -130,6 +125,9 @@ def process_disconnect(reconnect) record_telemetry(Telemetry::Domain::Constants::SYNC_MODE, SYNC_MODE_POLLING) if reconnect + wait_interval = @back_off.interval + @config.logger.debug("Retrying streaming connection in: #{wait_interval} seconds") + sleep(wait_interval) @push_manager.stop_sse @synchronizer.sync_all @push_manager.start_sse @@ -155,6 +153,7 @@ def incoming_push_status_handler case status when Constants::PUSH_CONNECTED + @back_off.reset process_connected when Constants::PUSH_RETRYABLE_ERROR process_disconnect(true) diff --git a/lib/splitclient-rb/sse/event_source/client.rb b/lib/splitclient-rb/sse/event_source/client.rb index 5190ad6e..92dc7255 100644 --- a/lib/splitclient-rb/sse/event_source/client.rb +++ b/lib/splitclient-rb/sse/event_source/client.rb @@ -86,7 +86,7 @@ def connect_thread(latch) end def connect_stream(latch) - return Constants::PUSH_NONRETRYABLE_ERROR unless socket_write(latch) + return Constants::PUSH_RETRYABLE_ERROR unless socket_write(latch) while connected? || @first_event.value begin if IO.select([@socket], nil, nil, @read_timeout) diff --git a/spec/integrations/push_client_spec.rb b/spec/integrations/push_client_spec.rb index 9420861f..ca3a317f 100644 --- a/spec/integrations/push_client_spec.rb +++ b/spec/integrations/push_client_spec.rb @@ -84,6 +84,38 @@ end end + it 'Retry streaming connection' do + mock_splits_request(splits, -1) + mock_splits_request(splits2, 1_585_948_850_109) + mock_splits_request(splits3, 1_585_948_850_110) + mock_segment_changes('segment3', segment3, '-1') + + mock_server do |server| + server.setup_response('/') do |_, res| + send_content(res, event_split_kill_must_not_fetch, 500) + end + + stub_request(:get, auth_service_url + "?s=1.3").to_return(status: 200, body: auth_body_response) + + streaming_service_url = server.base_uri + factory = SplitIoClient::SplitFactory.new( + 'test_api_key', + streaming_enabled: true, + streaming_service_url: streaming_service_url, + auth_service_url: auth_service_url + ) + + client = factory.client + client.block_until_ready(1) + server.setup_response('/') do |_, res| + send_content(res, event_split_update_must_fetch) + end + sleep(2) + expect(client.get_treatment('admin', 'push_test')).to eq('after_fetch') + client.destroy + end + end + it 'processing split update event without fetch' do mock_splits_request(splits, -1) mock_splits_request(splits2, 1_585_948_850_109) @@ -587,14 +619,14 @@ expect(client.get_treatment('admin', 'push_test')).to eq('after_fetch') client.destroy end - end + end end private - def send_content(res, content) + def send_content(res, content, status=200) res.content_type = 'text/event-stream' - res.status = 200 + res.status = status res.chunked = true rd, wr = IO.pipe wr.write(content) diff --git a/spec/sse/event_source/client_spec.rb b/spec/sse/event_source/client_spec.rb index ce4cccdd..e2665fa4 100644 --- a/spec/sse/event_source/client_spec.rb +++ b/spec/sse/event_source/client_spec.rb @@ -357,6 +357,30 @@ stop_workers end end + + it 'test retry with first event error' do + log2 = StringIO.new + config2 = SplitIoClient::SplitConfig.new(logger: Logger.new(log2), debug_enabled: true) + + mock_server do |server| + server.setup_response('/') do |_, res| + send_stream_content(res, event_occupancy, 500) + end + start_workers + + sse_client2 = subject.new(config2, api_token, telemetry_runtime_producer, event_parser, notification_manager_keeper, notification_processor, push_status_queue) + + sse_client2.instance_variable_set(:@uri, URI(server.base_uri)) + latch = Concurrent::CountDownLatch.new(1) + + thr2 = Thread.new do + res = sse_client2.send(:connect_stream, latch) + expect(res).to eq(SplitIoClient::Constants::PUSH_RETRYABLE_ERROR) + end + + stop_workers + end + end end private