Class: Ably::Realtime::Channel::ChannelManager Private

Inherits:
Object
Extended by:
Forwardable
Defined in:
lib/ably/realtime/channel/channel_manager.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

ChannelManager is responsible for all actions relating to channel state: attaching, detaching or failure. Channel state changes are performed by this class and executed from ChannelStateMachine.

This is a private class and should never be used directly by developers as the API is likely to change in future.


Constructor Details

#initialize(channel, connection) ⇒ ChannelManager

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of ChannelManager.



# File 'lib/ably/realtime/channel/channel_manager.rb', line 12

def initialize(channel, connection)
  @channel    = channel
  @connection = connection
end

Instance Method Details

#attach ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Commence attachment



# File 'lib/ably/realtime/channel/channel_manager.rb', line 18

def attach
  if can_transition_to?(:attached)
    connect_if_connection_initialized
    send_attach_protocol_message if connection.state?(:connected) # RTL4i
  end
end

#attached(attached_protocol_message) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Channel is attached; notify presence if a sync is expected.



# File 'lib/ably/realtime/channel/channel_manager.rb', line 35

def attached(attached_protocol_message)
  # If no attached ProtocolMessage then this attached request was triggered by the client
  # library, such as returning to attached when detach has failed
  if attached_protocol_message
    channel.presence.manager.on_attach attached_protocol_message.has_presence_flag?
    channel.properties.set_attach_serial(attached_protocol_message.channel_serial)
    channel.options.set_modes_from_flags(attached_protocol_message.flags)
    channel.options.set_params(attached_protocol_message.params)
  end
end

#detach(error, previous_state) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Commence detachment



# File 'lib/ably/realtime/channel/channel_manager.rb', line 26

def detach(error, previous_state)
  if connection.closed? || connection.connecting? || connection.suspended?
    channel.transition_state_machine :detached, reason: error
  elsif can_transition_to?(:detached)
    send_detach_protocol_message previous_state
  end
end

#detached_received(reason) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Handle DETACHED messages; see #RTL13 for server-initiated detaches.



# File 'lib/ably/realtime/channel/channel_manager.rb', line 83

def detached_received(reason)
  case channel.state.to_sym
  when :detaching
    channel.transition_state_machine :detached, reason: reason
  when :attached, :suspended
    channel.transition_state_machine :attaching, reason: reason
  else
    logger.debug { "ChannelManager: DETACHED ProtocolMessage received, but no action to take as not DETACHING, ATTACHED OR SUSPENDED" }
  end
end
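
The branching above can be read as a small decision table. The sketch below restates it as a pure function so the three outcomes are easy to compare. `next_state_on_detached` is a hypothetical helper written only for illustration; it is not part of the library and it does not touch the state machine.

```ruby
# Hypothetical helper, not part of ably-ruby: maps the channel state at the
# moment a DETACHED message arrives to the state the channel moves to next.
# Returns nil when the message is ignored.
def next_state_on_detached(current_state)
  case current_state.to_sym
  when :detaching
    :detached    # the client asked to detach, so the detach completes
  when :attached, :suspended
    :attaching   # server-initiated detach: attempt to reattach
  else
    nil          # any other state: no transition, only a debug log entry
  end
end

p next_state_on_detached(:detaching)  # => :detached
p next_state_on_detached("attached")  # => :attaching
p next_state_on_detached(:failed)     # => nil
```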

#drop_pending_queue_from_ack(ack_protocol_message) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/ably/realtime/channel/channel_manager.rb', line 146

def drop_pending_queue_from_ack(ack_protocol_message)
  message_serial_up_to = ack_protocol_message.message_serial + ack_protocol_message.count - 1
  connection.__pending_message_ack_queue__.drop_while do |protocol_message|
    if protocol_message.message_serial <= message_serial_up_to
      yield protocol_message
      true
    end
  end
end
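
An ACK covers a contiguous run of serials: `message_serial` is the first serial and `count` is how many messages are acknowledged. The sketch below works through that arithmetic with stand-in objects. `FakeProtocolMessage` and `acked_and_remaining` are hypothetical names used only for illustration. The sketch also relies on the standard behaviour of `Array#drop_while`, which returns a new array and leaves the receiver unchanged.

```ruby
# Stand-in for a protocol message; only the serial matters for this sketch.
FakeProtocolMessage = Struct.new(:message_serial)

# Hypothetical helper: splits a pending queue into the messages covered by an
# ACK (first serial plus count) and the messages still waiting for one.
def acked_and_remaining(queue, ack_serial, ack_count)
  serial_up_to = ack_serial + ack_count - 1 # last serial covered by the ACK
  acked = []
  remaining = queue.drop_while do |message|
    covered = message.message_serial <= serial_up_to
    acked << message if covered
    covered
  end
  [acked, remaining]
end

queue = (5..9).map { |serial| FakeProtocolMessage.new(serial) }
acked, remaining = acked_and_remaining(queue, 5, 3) # covers serials 5, 6 and 7

p acked.map(&:message_serial)     # => [5, 6, 7]
p remaining.map(&:message_serial) # => [8, 9]
p queue.length                    # => 5, drop_while did not modify the queue
```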

#duplicate_attached_received(protocol_message) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/ably/realtime/channel/channel_manager.rb', line 60

def duplicate_attached_received(protocol_message)
  logger.debug { "Server initiated ATTACHED message received for channel '#{channel.name}' with state #{channel.state}" }
  if protocol_message.error
    channel.set_channel_error_reason protocol_message.error
    log_channel_error protocol_message.error
  end

  channel.properties.set_attach_serial(protocol_message.channel_serial)
  channel.options.set_modes_from_flags(protocol_message.flags)

  unless protocol_message.has_channel_resumed_flag?
    channel.emit :update, Ably::Models::ChannelStateChange.new(
      current: channel.state,
      previous: channel.state,
      event: Ably::Realtime::Channel::EVENT(:update),
      reason: protocol_message.error,
      resumed: false,
    )
    channel.presence.manager.on_attach protocol_message.has_presence_flag?
  end
end

#fail_messages_awaiting_ack(error, options = {}) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

When continuity on the connection is interrupted or the channel becomes suspended (implying loss of continuity), all messages that were published but are still awaiting an ACK from Ably should be failed with a NACK.

Parameters:

  • options (Hash) (defaults to: {})

Options Hash (options):

  • :immediately (Boolean)


# File 'lib/ably/realtime/channel/channel_manager.rb', line 98

def fail_messages_awaiting_ack(error, options = {})
  immediately = options[:immediately] || false

  fail_proc = lambda do
    error = Ably::Exceptions::MessageDeliveryFailed.new("Continuity of connection was lost so published messages awaiting ACK have failed") unless error
    fail_messages_in_queue connection.__pending_message_ack_queue__, error
  end

  # Allow a short time for other queued operations to complete before failing all messages
  if immediately
    fail_proc.call
  else
    EventMachine.add_timer(0.1) { fail_proc.call }
  end
end
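
The `:immediately` option only changes when the failure runs, not what it does. The sketch below isolates that choice so it can be run without an event loop. `RecordingScheduler` and `fail_now_or_later` are hypothetical stand-ins written for this example; the method above uses `EventMachine.add_timer` rather than anything shown here.

```ruby
# Hypothetical stand-in for a timer API: records deferred work instead of
# scheduling it, so the example runs without an event loop.
class RecordingScheduler
  attr_reader :pending

  def initialize
    @pending = []
  end

  # Stores the delay and the block; nothing runs until run_all is called.
  def add_timer(delay, &block)
    @pending << [delay, block]
  end

  # Runs every recorded block in the order it was added.
  def run_all
    @pending.shift.last.call until @pending.empty?
  end
end

# Hypothetical helper: run the failure block now, or hand it to the scheduler
# with a short delay so other queued operations can complete first.
def fail_now_or_later(scheduler, immediately:, &fail_proc)
  if immediately
    fail_proc.call
  else
    scheduler.add_timer(0.1, &fail_proc)
  end
end

scheduler = RecordingScheduler.new
log = []
fail_now_or_later(scheduler, immediately: true)  { log << :immediate }
fail_now_or_later(scheduler, immediately: false) { log << :deferred }

p log # => [:immediate]
scheduler.run_all
p log # => [:immediate, :deferred]
```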

#fail_messages_in_queue(queue, error) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/ably/realtime/channel/channel_manager.rb', line 122

def fail_messages_in_queue(queue, error)
  queue.delete_if do |protocol_message|
    if protocol_message.action.match_any?(:presence, :message)
      if protocol_message.channel == channel.name
        nack_messages protocol_message, error
        true
      end
    end
  end
end
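
The method above leans on `Array#delete_if`, which removes an element in place whenever the block returns a truthy value; when the inner `if` does not match, the block returns `nil` and the element stays. The sketch below shows the same filter on plain data. `FakeQueued` and `remove_for_channel` are hypothetical names for illustration, and collecting the removed entries stands in for the NACK step.

```ruby
# Stand-in for a queued protocol message: just an action and a channel name.
FakeQueued = Struct.new(:action, :channel)

# Hypothetical helper: removes presence and message entries for one channel
# from the queue (mutating it) and returns the entries that were removed.
def remove_for_channel(queue, channel_name)
  removed = []
  queue.delete_if do |message|
    if %i[presence message].include?(message.action) && message.channel == channel_name
      removed << message
      true # truthy result tells delete_if to drop this entry
    end
    # a non-matching entry falls through with nil and is kept
  end
  removed
end

queue = [
  FakeQueued.new(:message, "chat"),
  FakeQueued.new(:attach, "chat"),
  FakeQueued.new(:presence, "news"),
  FakeQueued.new(:presence, "chat"),
]
removed = remove_for_channel(queue, "chat")

p removed.length                          # => 2
p queue.map { |m| [m.action, m.channel] } # => [[:attach, "chat"], [:presence, "news"]]
```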

#fail_queued_messages(error) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

When a channel becomes suspended or failed, all queued messages should be failed immediately, as messages are not queued in either of those states.



# File 'lib/ably/realtime/channel/channel_manager.rb', line 117

def fail_queued_messages(error)
  error = Ably::Exceptions::MessageDeliveryFailed.new("Queued messages on channel '#{channel.name}' in state '#{channel.state}' will never be delivered") unless error
  fail_messages_in_queue connection.__outgoing_message_queue__, error
end

#log_channel_error(error) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

An error has occurred on the channel



# File 'lib/ably/realtime/channel/channel_manager.rb', line 47

def log_channel_error(error)
  logger.error { "ChannelManager: Channel '#{channel.name}' error: #{error}" }
end

#nack_message(message, error, protocol_message = nil) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/ably/realtime/channel/channel_manager.rb', line 141

def nack_message(message, error, protocol_message = nil)
  logger.debug { "Calling NACK failure callbacks for #{message.class.name} - #{message.to_json} #{"protocol message: #{protocol_message}" if protocol_message}" }
  message.fail error
end

#nack_messages(protocol_message, error) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/ably/realtime/channel/channel_manager.rb', line 133

def nack_messages(protocol_message, error)
  (protocol_message.messages + protocol_message.presence).each do |message|
    nack_message message, error, protocol_message
  end
  logger.debug { "Calling NACK failure callbacks for #{protocol_message.class.name} - #{protocol_message.to_json}" }
  protocol_message.fail error
end

#notify_state_change ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

RTL13c



# File 'lib/ably/realtime/channel/channel_manager.rb', line 171

def notify_state_change
  @pending_state_change_timer.cancel if @pending_state_change_timer
  @pending_state_change_timer = nil
end

#request_reattach(reason = nil) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Request channel to be reattached by sending an attach protocol message

Parameters:

  • reason (defaults to: nil)



# File 'lib/ably/realtime/channel/channel_manager.rb', line 53

def request_reattach(reason = nil)
  channel.set_channel_error_reason(reason) if reason
  channel.transition_state_machine! :attaching, reason: reason unless channel.attaching?
  send_attach_protocol_message
  logger.debug { "Explicit channel reattach request sent to Ably due to #{reason}" }
end

#start_attach_from_suspended_timer ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

If the connection is still connected and the channel is still suspended after channel_retry_timeout has passed, attempt to reattach automatically; see #RTL13b.



# File 'lib/ably/realtime/channel/channel_manager.rb', line 158

def start_attach_from_suspended_timer
  cancel_attach_from_suspended_timer
  if connection.connected?
    channel.unsafe_once { |event| cancel_attach_from_suspended_timer unless event == :update }
    connection.unsafe_once { |event| cancel_attach_from_suspended_timer unless event == :update }

    @attach_from_suspended_timer = EventMachine::Timer.new(channel_retry_timeout) do
      channel.transition_state_machine! :attaching
    end
  end
end