Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions lib/splitclient-rb/engine/sync_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,8 @@ class SyncManager
SYNC_MODE_STREAMING = 0
SYNC_MODE_POLLING = 1

def initialize(config,
synchronizer,
telemetry_runtime_producer,
telemetry_synchronizer,
status_manager,
sse_handler,
push_manager,
status_queue)
def initialize(config, synchronizer, telemetry_runtime_producer, telemetry_synchronizer,
status_manager, sse_handler, push_manager, status_queue)
@config = config
@synchronizer = synchronizer
@telemetry_runtime_producer = telemetry_runtime_producer
Expand All @@ -23,6 +17,7 @@ def initialize(config,
@push_manager = push_manager
@status_queue = status_queue
@sse_connected = Concurrent::AtomicBoolean.new(false)
@back_off = Engine::BackOff.new(5, 3)
end

def start
Expand Down Expand Up @@ -119,7 +114,7 @@ def process_forced_stop
end

def process_disconnect(reconnect)
unless @sse_connected.value
unless @sse_connected.value || reconnect
@config.logger.debug('Streaming already disconnected.') if @config.debug_enabled
return
end
Expand All @@ -130,6 +125,9 @@ def process_disconnect(reconnect)
record_telemetry(Telemetry::Domain::Constants::SYNC_MODE, SYNC_MODE_POLLING)

if reconnect
wait_interval = @back_off.interval
@config.logger.debug("Retrying streaming connection in: #{wait_interval} seconds")
sleep(wait_interval)
@push_manager.stop_sse
@synchronizer.sync_all
@push_manager.start_sse
Expand All @@ -155,6 +153,7 @@ def incoming_push_status_handler

case status
when Constants::PUSH_CONNECTED
@back_off.reset
process_connected
when Constants::PUSH_RETRYABLE_ERROR
process_disconnect(true)
Expand Down
2 changes: 1 addition & 1 deletion lib/splitclient-rb/sse/event_source/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@
end
end

def connect_stream(latch)

Check failure on line 88 in lib/splitclient-rb/sse/event_source/client.rb

View check run for this annotation

SonarQube Pull Requests / SonarQube Code Analysis

Refactor this method to reduce its Cognitive Complexity from 51 to the 15 allowed.

[S3776] Cognitive Complexity of functions should not be too high See more on https://sonar.harness.io/project/issues?id=ruby-client&pullRequest=622&issues=85059f17-909e-4393-9693-c5d9653d65cc&open=85059f17-909e-4393-9693-c5d9653d65cc
return Constants::PUSH_NONRETRYABLE_ERROR unless socket_write(latch)
return Constants::PUSH_RETRYABLE_ERROR unless socket_write(latch)
while connected? || @first_event.value
begin
if IO.select([@socket], nil, nil, @read_timeout)
Expand Down
38 changes: 35 additions & 3 deletions spec/integrations/push_client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,38 @@
end
end

it 'Retry streaming connection' do
mock_splits_request(splits, -1)
mock_splits_request(splits2, 1_585_948_850_109)
mock_splits_request(splits3, 1_585_948_850_110)
mock_segment_changes('segment3', segment3, '-1')

mock_server do |server|
server.setup_response('/') do |_, res|
send_content(res, event_split_kill_must_not_fetch, 500)
end

stub_request(:get, auth_service_url + "?s=1.3").to_return(status: 200, body: auth_body_response)

streaming_service_url = server.base_uri
factory = SplitIoClient::SplitFactory.new(
'test_api_key',
streaming_enabled: true,
streaming_service_url: streaming_service_url,
auth_service_url: auth_service_url
)

client = factory.client
client.block_until_ready(1)
server.setup_response('/') do |_, res|
send_content(res, event_split_update_must_fetch)
end
sleep(2)
expect(client.get_treatment('admin', 'push_test')).to eq('after_fetch')
client.destroy
end
end

it 'processing split update event without fetch' do
mock_splits_request(splits, -1)
mock_splits_request(splits2, 1_585_948_850_109)
Expand Down Expand Up @@ -587,14 +619,14 @@
expect(client.get_treatment('admin', 'push_test')).to eq('after_fetch')
client.destroy
end
end
end
end

private

def send_content(res, content)
def send_content(res, content, status=200)
res.content_type = 'text/event-stream'
res.status = 200
res.status = status
res.chunked = true
rd, wr = IO.pipe
wr.write(content)
Expand Down
24 changes: 24 additions & 0 deletions spec/sse/event_source/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,30 @@
stop_workers
end
end

it 'test retry with first event error' do
log2 = StringIO.new
config2 = SplitIoClient::SplitConfig.new(logger: Logger.new(log2), debug_enabled: true)

mock_server do |server|
server.setup_response('/') do |_, res|
send_stream_content(res, event_occupancy, 500)
end
start_workers

sse_client2 = subject.new(config2, api_token, telemetry_runtime_producer, event_parser, notification_manager_keeper, notification_processor, push_status_queue)

sse_client2.instance_variable_set(:@uri, URI(server.base_uri))
latch = Concurrent::CountDownLatch.new(1)

thr2 = Thread.new do
res = sse_client2.send(:connect_stream, latch)
expect(res).to eq(SplitIoClient::Constants::PUSH_RETRYABLE_ERROR)
end

stop_workers
end
end
end

private
Expand Down
Loading