Class: Ably::Realtime::Connection::ConnectionManager Private

Inherits:
Object
  • Object
show all
Defined in:
lib/ably/realtime/connection/connection_manager.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

ConnectionManager is responsible for all actions relating to underlying connection and transports, such as opening, closing, attempting reconnects etc. Connection state changes are performed by this class and executed from ConnectionStateMachine

This is a private class and should never be used directly by developers as the API is likely to change in future.

Constant Summary collapse

RESOLVABLE_ERROR_CODES =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

Error codes from the server that can potentially be resolved

{
  token_expired: Ably::Exceptions::TOKEN_EXPIRED_CODE
}

Instance Method Summary collapse

Constructor Details

#initialize(connection) ⇒ ConnectionManager

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of ConnectionManager.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/ably/realtime/connection/connection_manager.rb', line 19

def initialize(connection)
  @connection     = connection
  @timers         = Hash.new { |hash, key| hash[key] = [] }

  # RTN8c, RTN9c
  connection.unsafe_on(:closing, :closed, :suspended, :failed) do
    connection.reset_resume_info
  end

  connection.unsafe_once(:connecting) do
    close_connection_when_reactor_is_stopped
  end

  EventMachine.next_tick do
    # Connect once Connection object is initialised
    connection.connect if client.auto_connect && connection.can_transition_to?(:connecting)
  end
end

Instance Method Details

#close_connectionObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Send a Close Models::ProtocolMessage to the server and release the transport



192
193
194
195
196
197
198
# File 'lib/ably/realtime/connection/connection_manager.rb', line 192

def close_connection
  connection.send_protocol_message(action: Ably::Models::ProtocolMessage::ACTION.Close)

  create_timeout_timer_whilst_in_state(:closing, realtime_request_timeout) do
    force_close_connection if connection.closing?
  end
end

#connected(protocol_message) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Called whenever a new connection is made



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/ably/realtime/connection/connection_manager.rb', line 109

def connected(protocol_message)
  # ClientID validity is already checked as part of the incoming message processing
  client.auth.configure_client_id protocol_message.connection_details.client_id

  # Update the connection details and any associated defaults
  connection.set_connection_details protocol_message.connection_details

  is_connection_resume_or_recover_attempt = !connection.key.nil_or_empty? || !client.recover.nil_or_empty?

  # RTN15c7, RTN16d
  failed_resume_or_recover = !protocol_message.connection_id == connection.id && !protocol_message.error.nil?
  if is_connection_resume_or_recover_attempt and failed_resume_or_recover # RTN15c7
    connection.reset_client_msg_serial
  end
  client.disable_automatic_connection_recovery # RTN16k, explicitly setting null, so it won't be used for subsequent connection requests

  if connection.key
    if protocol_message.connection_id == connection.id
      logger.debug { "ConnectionManager: Connection resumed successfully - ID #{connection.id} and key #{connection.key}" }
      resend_pending_message_ack_queue
    else
      nack_messages_on_all_channels protocol_message.error
    end
  else
    logger.debug { "ConnectionManager: New connection created with ID #{protocol_message.connection_id} and key #{protocol_message.connection_details.connection_key}" }
  end

  connection.configure_new protocol_message.connection_id, protocol_message.connection_details.connection_key
  force_reattach_on_channels protocol_message.error # irrespective of connection success/failure, reattach channels
end

#connected_update(protocol_message) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

When connection is CONNECTED and receives an update Update the Connection details and emit an UPDATE event #RTN4h



142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/ably/realtime/connection/connection_manager.rb', line 142

def connected_update(protocol_message)
  # ClientID validity is already checked as part of the incoming message processing
  client.auth.configure_client_id protocol_message.connection_details.client_id

  # Update the connection details and any associated defaults
  connection.set_connection_details protocol_message.connection_details

  connection.configure_new protocol_message.connection_id, protocol_message.connection_details.connection_key

  state_change = Ably::Models::ConnectionStateChange.new(
    current: connection.state,
    previous: connection.state,
    event: Ably::Realtime::Connection::EVENT(:update),
    reason: protocol_message.error,
    protocol_message: protocol_message
  )
  connection.emit :update, state_change
end

#connection_opening_failed(error) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Called by the transport when a connection attempt fails



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/ably/realtime/connection/connection_manager.rb', line 86

def connection_opening_failed(error)
  if error.kind_of?(Ably::Exceptions::BaseAblyException)
    # Authentication errors that indicate the authentication failure is terminal should move to the failed state
    if ([401, 403].include?(error.status) && !RESOLVABLE_ERROR_CODES.fetch(:token_expired).include?(error.code)) ||
       (error.code == Ably::Exceptions::Codes::INVALID_CLIENT_ID)
      connection.transition_state_machine :failed, reason: error
      return
    end
  end

  logger.warn { "ConnectionManager: Connection to #{connection.current_host}:#{connection.port} failed; #{error.message}" }
  next_state = get_next_retry_state_info

  if connection.state == next_state.fetch(:state)
    logger.error { "ConnectionManager: Skipping next retry state after connection opening failed as already in state #{next_state}\n#{caller[0..20].join("\n")}" }
  else
    connection.transition_state_machine next_state.fetch(:state), retry_in: next_state.fetch(:pause), reason: Ably::Exceptions::ConnectionError.new("Connection failed: #{error.message}", nil, Ably::Exceptions::Codes::CONNECTION_FAILED, error)
  end
end

#destroy_transportObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Ensures the underlying transport has been disconnected and all event emitter callbacks removed



164
165
166
167
168
169
170
# File 'lib/ably/realtime/connection/connection_manager.rb', line 164

def destroy_transport
  if transport
    unsubscribe_from_transport_events transport
    transport.close_connection
    connection.release_websocket_transport
  end
end

#detach_active_channelsObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



314
315
316
317
318
319
320
# File 'lib/ably/realtime/connection/connection_manager.rb', line 314

def detach_active_channels
  channels.select do |channel|
    channel.attached? || channel.attaching? || channel.detaching?
  end.each do |channel|
    channel.transition_state_machine! :detaching # will always move to detached immediately if connection is closed
  end
end

#error_received_from_server(error) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

ProtocolMessage Error received from server. Some error states can be resolved by the client library.



268
269
270
271
272
273
274
275
276
277
# File 'lib/ably/realtime/connection/connection_manager.rb', line 268

def error_received_from_server(error)
  case error.code
  when RESOLVABLE_ERROR_CODES.fetch(:token_expired)
    next_state = get_next_retry_state_info(1)
    connection.transition_state_machine next_state.fetch(:state), retry_in: next_state.fetch(:pause), reason: error
  else
    logger.error { "ConnectionManager: Error #{error.class.name} code #{error.code} received from server '#{error.message}', transitioning to failed state" }
    connection.transition_state_machine :failed, reason: error
  end
end

#fail(error) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Connection has failed



211
212
213
214
215
216
217
218
# File 'lib/ably/realtime/connection/connection_manager.rb', line 211

def fail(error)
  connection.logger.fatal { "ConnectionManager: Connection failed - #{error}" }
  destroy_transport
  channels.each do |channel|
    next if channel.detached? || channel.initialized?
    channel.transition_state_machine :failed, reason: error if channel.can_transition_to?(:failed)
  end
end

#fail_active_channels(error) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



323
324
325
326
327
328
329
# File 'lib/ably/realtime/connection/connection_manager.rb', line 323

def fail_active_channels(error)
  channels.select do |channel|
    channel.attached? || channel.attaching? || channel.detaching? || channel.suspended?
  end.each do |channel|
    channel.transition_state_machine! :failed, reason: error
  end
end

#force_close_connectionObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Close the underlying transport immediately and set the connection state to closed



203
204
205
206
# File 'lib/ably/realtime/connection/connection_manager.rb', line 203

def force_close_connection
  destroy_transport
  connection.transition_state_machine :closed
end

#nack_messages_on_all_channels(error) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

When continuity on a connection is lost all messages whether queued or awaiting an ACK must be NACK'd as we now have a new connection



342
343
344
345
346
347
# File 'lib/ably/realtime/connection/connection_manager.rb', line 342

def nack_messages_on_all_channels(error)
  channels.each do |channel|
    channel.manager.fail_messages_awaiting_ack error, immediately: true
    channel.manager.fail_queued_messages error
  end
end

#reconnect_transportObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Reconnect the WebsocketTransport if possible, otherwise set up a new transport



181
182
183
184
185
186
187
# File 'lib/ably/realtime/connection/connection_manager.rb', line 181

def reconnect_transport
  if !transport || transport.disconnected?
    setup_transport
  else
    transport.reconnect connection.current_host, connection.port
  end
end

#reintialize_failed_chanelsObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



332
333
334
335
336
337
338
# File 'lib/ably/realtime/connection/connection_manager.rb', line 332

def reintialize_failed_chanels
  channels.select do |channel|
    channel.failed?
  end.each do |channel|
    channel.transition_state_machine :initialized
  end
end

#release_and_establish_new_transportObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



173
174
175
176
# File 'lib/ably/realtime/connection/connection_manager.rb', line 173

def release_and_establish_new_transport
  destroy_transport
  setup_transport
end

#resend_pending_message_ack_queueObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Any message sent before an ACK/NACK was received on the previous transport need to be resent to the Ably service so that a subsequent ACK/NACK is received. It is up to Ably to ensure that duplicate messages are not retransmitted on the channel based on the message serial numbers



294
295
296
297
298
299
300
301
302
# File 'lib/ably/realtime/connection/connection_manager.rb', line 294

def resend_pending_message_ack_queue
  connection.__pending_message_ack_queue__.delete_if do |protocol_message|
    if protocol_message.ack_required?
      connection.__outgoing_message_queue__ << protocol_message
      connection.__outgoing_protocol_msgbus__.publish :protocol_message
      true
    end
  end
end

#reset_liveness_timerObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Liveness timer ensures a connection that has not heard from Ably in heartbeat_interval is moved to the disconnected state automatically



351
352
353
354
355
356
357
358
359
360
# File 'lib/ably/realtime/connection/connection_manager.rb', line 351

def reset_liveness_timer
  @liveness_timer.cancel if @liveness_timer
  @liveness_timer = EventMachine::Timer.new(connection.heartbeat_interval + 0.1) do
    if connection.connected? && (connection.time_since_connection_confirmed_alive? >= connection.heartbeat_interval)
      msg = "No activity seen from realtime in #{connection.heartbeat_interval}; assuming connection has dropped";
      error = Ably::Exceptions::ConnectionTimeout.new(msg, Ably::Exceptions::Codes::DISCONNECTED, 408)
      connection.transition_state_machine! :disconnected, reason: error
    end
  end
end

#respond_to_transport_disconnected_when_connecting(error) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

When a connection is disconnected whilst connecting, attempt reconnect and/or set state to :suspended or :failed



223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
# File 'lib/ably/realtime/connection/connection_manager.rb', line 223

def respond_to_transport_disconnected_when_connecting(error)
  return unless connection.disconnected? || connection.suspended? # do nothing if state has changed through an explicit request
  return if currently_renewing_token? # do not always reattempt connection or change state as client may be re-authorising

  if error.kind_of?(Ably::Models::ErrorInfo)
    if RESOLVABLE_ERROR_CODES.fetch(:token_expired).include?(error.code)
      next_state = get_next_retry_state_info(1)
      logger.debug { "ConnectionManager: Transport disconnected because of token expiry, pausing #{next_state.fetch(:pause)}s before reattempting to connect" }
      EventMachine.add_timer(next_state.fetch(:pause)) { renew_token_and_reconnect error }
      return
    end
  end

  if connection.state == :suspended
    return if connection_retry_for(:suspended)
  elsif connection.state == :disconnected
    return if connection_retry_for(:disconnected)
  end

  # Fallback if no other criteria met
  connection.transition_state_machine :failed, reason: error
end

#respond_to_transport_disconnected_whilst_connected(error) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

When a connection is disconnected after connecting, attempt reconnect and/or set state to :suspended or :failed



249
250
251
252
253
254
255
256
257
258
259
260
261
262
# File 'lib/ably/realtime/connection/connection_manager.rb', line 249

def respond_to_transport_disconnected_whilst_connected(error)
  unless connection.disconnected? || connection.suspended?
    logger.warn { "ConnectionManager: Connection #{"to #{connection.transport.url}" if connection.transport} was disconnected unexpectedly" }
  else
    logger.debug { "ConnectionManager: Transport disconnected whilst connection in #{connection.state} state" }
  end

  if error.kind_of?(Ably::Models::ErrorInfo) && !RESOLVABLE_ERROR_CODES.fetch(:token_expired).include?(error.code)
    logger.error { "ConnectionManager: Error in Disconnected ProtocolMessage received from the server - #{error}" }
  end

  destroy_transport
  respond_to_transport_disconnected_when_connecting error
end

#retry_count_for_state(state) ⇒ Integer

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Number of consecutive attempts for provided state

Returns:

  • (Integer)


284
285
286
# File 'lib/ably/realtime/connection/connection_manager.rb', line 284

def retry_count_for_state(state)
  retries_for_state(state, ignore_states: [:connecting]).count
end

#setup_transport {|Ably::Realtime::Connection::WebsocketTransport| ... } ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Creates and sets up a new WebsocketTransport available on attribute #transport

Yields:



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/ably/realtime/connection/connection_manager.rb', line 42

def setup_transport
  if transport && !transport.ready_for_release?
    raise RuntimeError, 'Existing WebsocketTransport is connected, and must be closed first'
  end

  unless client.auth.authentication_security_requirements_met?
    connection.transition_state_machine :failed, reason: Ably::Exceptions::InsecureRequest.new('Cannot use Basic Auth over non-TLS connections', 401, Ably::Exceptions::Codes::INVALID_USE_OF_BASIC_AUTH_OVER_NONTLS_TRANSPORT)
    return
  end

  logger.debug { 'ConnectionManager: Opening a websocket transport connection' }

  # The socket attempt can fail at the same time as a timer firing so ensure
  #   only one outcome is processed from this setup attempt
  setup_attempt_status = {}
  setup_failed = lambda do
    return true if setup_attempt_status[:failed]
    setup_attempt_status[:failed] = true
    false
  end

  connection.create_websocket_transport.tap do |socket_deferrable|
    socket_deferrable.callback do |websocket_transport|
      subscribe_to_transport_events websocket_transport
      yield websocket_transport if block_given?
    end
    socket_deferrable.errback do |error|
      next if setup_failed.call
      connection_opening_failed error
    end
  end

  # The connection request timeout must be marginally higher than the REST request timeout to ensure
  #   any HTTP auth request failure due to timeout triggers before the connection timer kicks in
  logger.debug { "ConnectionManager: Setting up automatic connection timeout timer for #{realtime_request_timeout}s" }
  create_timeout_timer_whilst_in_state(:connecting, realtime_request_timeout) do
    next if setup_failed.call
    connection_opening_failed Ably::Exceptions::ConnectionTimeout.new("Connection to Ably timed out after #{realtime_request_timeout}s", nil, Ably::Exceptions::Codes::CONNECTION_TIMED_OUT)
  end
end

#suspend_active_channels(error) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



305
306
307
308
309
310
311
# File 'lib/ably/realtime/connection/connection_manager.rb', line 305

def suspend_active_channels(error)
  channels.select do |channel|
    channel.attached? || channel.attaching? || channel.detaching?
  end.each do |channel|
    channel.transition_state_machine! :suspended, reason: error
  end
end