Class: Ably::Realtime::Channel

Inherits:
Object
Extended by:
Modules::Enum, Forwardable
Includes:
Modules::AsyncWrapper, Modules::Conversions, Modules::EventEmitter, Modules::EventMachineHelpers, Modules::MessageEmitter, Modules::StateEmitter, Modules::UsesStateMachine, Publisher
Defined in:
lib/ably/realtime/channel.rb,
lib/ably/realtime/channel/publisher.rb,
lib/ably/realtime/channel/push_channel.rb,
lib/ably/realtime/channel/channel_manager.rb,
lib/ably/realtime/channel/channel_properties.rb,
lib/ably/realtime/channel/channel_state_machine.rb

Overview

Enables messages to be published and subscribed to. Also enables historic messages to be retrieved and provides access to the Presence object of a channel.

Channels will always be in one of the following states:

initialized: 0
attaching:   1
attached:    2
detaching:   3
detached:    4
suspended:   5
failed:      6

Note that the states are available as Enum-like constants:

Channel::STATE.Initialized  The channel has been initialized but no attach has yet been attempted.
Channel::STATE.Attaching    An attach has been initiated by sending a request to Ably.
                            This is a transient state, followed by a transition to ATTACHED, SUSPENDED, or FAILED.
Channel::STATE.Attached     The attach has succeeded. In the ATTACHED state a client may publish and subscribe to messages, or be present on the channel.
Channel::STATE.Detaching    A detach has been initiated on an ATTACHED channel by sending a request to Ably.
                            This is a transient state, followed either by a transition to DETACHED or FAILED.
Channel::STATE.Detached     The channel, having previously been ATTACHED, has been detached by the user.
Channel::STATE.Suspended    The channel, having previously been ATTACHED, has lost continuity, usually due to
                            the client being disconnected from Ably for longer than two minutes. It will automatically attempt to reattach as soon as connectivity is restored.
Channel::STATE.Failed       An indefinite failure condition. This state is entered if a channel error
                            has been received from the Ably service, such as an attempt to attach without the necessary access rights.
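
The relationship between the state names and their numeric values can be sketched in plain Ruby, assuming the values follow the declaration order of the STATE constant. This sketch does not use the library's ruby_enum helper; CHANNEL_STATES, FOLLOWERS and state_index are hypothetical names introduced only for illustration.

```ruby
# Hypothetical stand-in for the STATE enum: plain symbols in declaration order.
CHANNEL_STATES = %i[
  initialized attaching attached detaching detached suspended failed
].freeze

# Transient states and the states the overview says may follow them.
FOLLOWERS = {
  attaching: %i[attached suspended failed],
  detaching: %i[detached failed]
}.freeze

# Returns the position of a state name, or nil if the name is unknown.
def state_index(name)
  CHANNEL_STATES.index(name.to_s.downcase.to_sym)
end
```

With this ordering, state_index(:suspended) sits between detached and failed.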

Defined Under Namespace

Modules: Publisher Classes: ChannelManager, ChannelProperties, ChannelStateMachine, PushChannel

Constant Summary

STATE =

The current Ably::Realtime::Channel::STATE of the channel.

The permitted states for this channel

Specification:

  • RTL2b

ruby_enum('STATE',
  :initialized,
  :attaching,
  :attached,
  :detaching,
  :detached,
  :suspended,
  :failed
)
EVENT =

Describes the events emitted by an Ably::Rest::Channel or Ably::Realtime::Channel object. An event is either an UPDATE or an Ably::Realtime::Channel::STATE.

The permitted channel events that are emitted for this channel

Specification:

  • RTL2g

ruby_enum('EVENT',
  STATE.to_sym_arr + [:update]
)
MAX_PROTOCOL_MESSAGE_BATCH_SIZE =

Max number of messages to bundle in a single ProtocolMessage

50

Instance Attribute Summary

Attributes included from Modules::UsesStateMachine

#previous_state, #state_history

Instance Method Summary

Methods included from Modules::UsesStateMachine

#synchronize_state_with_statemachine, #transition_state_machine, #transition_state_machine!

Methods included from Modules::StateEmitter

#once_or_if, #once_state_changed, #state, #state=, #state?, #unsafe_once_or_if, #unsafe_once_state_changed

Methods included from Modules::MessageEmitter

#emit_message

Methods included from Modules::EventEmitter

#emit, #off, #on, #once, #unsafe_off, #unsafe_on, #unsafe_once

Constructor Details

#initialize(client, name, channel_options = {}) ⇒ Channel

Initialize a new Channel object

Parameters:



# File 'lib/ably/realtime/channel.rb', line 127

def initialize(client, name, channel_options = {})
  name = ensure_utf_8(:name, name)

  @options       = Ably::Models::ChannelOptions(channel_options)
  @client        = client
  @name          = name
  @queue         = []

  @state_machine = ChannelStateMachine.new(self)
  @state         = STATE(state_machine.current_state)
  @manager       = ChannelManager.new(self, client.connection)
  @push          = PushChannel.new(self)
  @properties    = ChannelProperties.new(self)
  @attach_resume = false

  setup_event_handlers
  setup_presence
end

Instance Attribute Details

#attach_resume ⇒ Boolean (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Flag that specifies whether the channel is resuming an attachment (reattach) or doing a 'clean attach' (RTL4j1).

Returns:

  • (Boolean)


# File 'lib/ably/realtime/channel.rb', line 114

def attach_resume
  @attach_resume
end

#client ⇒ Ably::Realtime::Client (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Ably::Realtime::Client associated with this channel



# File 'lib/ably/realtime/channel.rb', line 79

def client
  @client
end

#error_reason ⇒ Ably::Models::ErrorInfo, Ably::Exceptions::BaseAblyException (readonly)

A Models::ErrorInfo object describing the last error that occurred on the channel, if any.



# File 'lib/ably/realtime/channel.rb', line 104

def error_reason
  @error_reason
end

#manager ⇒ Ably::Realtime::Channel::ChannelManager (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

The Channel manager responsible for attaching, detaching and handling failures for this channel



# File 'lib/ably/realtime/channel.rb', line 109

def manager
  @manager
end

#name ⇒ String (readonly)

The channel name.

Returns:

  • (String)


# File 'lib/ably/realtime/channel.rb', line 83

def name
  @name
end

#options ⇒ Hash (readonly)

Channel options configured for this channel, see #initialize for channel_options

Returns:

  • (Hash)


# File 'lib/ably/realtime/channel.rb', line 92

def options
  @options
end

#properties ⇒ Ably::Realtime::Channel::ChannelProperties (readonly)

A ChannelProperties object.

Returns:

Specification:

  • CP1, RTL15



# File 'lib/ably/realtime/channel.rb', line 99

def properties
  @properties
end

#push ⇒ Ably::Realtime::Channel::PushChannel (readonly)

A PushChannel object.



# File 'lib/ably/realtime/channel.rb', line 88

def push
  @push
end

Instance Method Details

#__incoming_msgbus__ ⇒ Ably::Util::PubSub

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns the client library's internal incoming message bus for this channel.

Returns:



# File 'lib/ably/realtime/channel.rb', line 352

def __incoming_msgbus__
  @__incoming_msgbus__ ||= Ably::Util::PubSub.new(
    coerce_into: lambda { |event| Ably::Models::ProtocolMessage::ACTION(event) }
  )
end

#attach {|Ably::Realtime::Channel| ... } ⇒ Ably::Util::SafeDeferrable

Attach to this channel, ensuring the channel is created in the Ably system and all messages published on the channel are received by any channel listeners registered using #subscribe. Any resulting channel state change will be emitted to any listeners registered using the Modules::EventEmitter#on or Modules::EventEmitter#once methods. A callback may optionally be passed to this call to be notified of success or failure of the operation. As a convenience, attach() is called implicitly when #subscribe is called for the channel, or when Presence#enter or Presence#subscribe is called on the Presence object for this channel.

Yields:

Returns:

Specification:

  • RTL4d



# File 'lib/ably/realtime/channel.rb', line 257

def attach(&success_block)
  if connection.closing? || connection.closed? || connection.suspended? || connection.failed?
    error = Ably::Exceptions::InvalidStateChange.new("Cannot ATTACH channel when the connection is in a closed, suspended or failed state. Connection state: #{connection.state}")
    return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
  end

  if !attached?
    if detaching?
      # Let the pending operation complete (#RTL4h)
      once_state_changed { transition_state_machine :attaching if can_transition_to?(:attaching) }
    else
      transition_state_machine :attaching if can_transition_to?(:attaching)
    end
  end

  deferrable_for_state_change_to(STATE.Attached, &success_block)
end

#attach_resume! ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/ably/realtime/channel.rb', line 389

def attach_resume!
  @attach_resume = true
end

#clear_error_reason ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/ably/realtime/channel.rb', line 378

def clear_error_reason
  @error_reason = nil
end

#detach {|Ably::Realtime::Channel| ... } ⇒ Ably::Util::SafeDeferrable

Detach from this channel. Any resulting channel state change is emitted to any listeners registered using the Modules::EventEmitter#on or Modules::EventEmitter#once methods. A callback may optionally be passed in to this call to be notified of success or failure of the operation. Once all clients globally have detached from the channel, the channel will be released in the Ably service within two minutes.

Yields:

Returns:

Specification:

  • RTL5e



# File 'lib/ably/realtime/channel.rb', line 285

def detach(&success_block)
  if initialized?
    success_block.call if block_given?
    return Ably::Util::SafeDeferrable.new_and_succeed_immediately(logger)
  end

  if failed? || connection.closing? || connection.failed?
    return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, exception_for_state_change_to(:detaching))
  end

  if !detached?
    if attaching?
      # Let the pending operation complete (#RTL5i)
      once_state_changed { transition_state_machine :detaching if can_transition_to?(:detaching) }
    elsif can_transition_to?(:detaching)
      transition_state_machine :detaching
    else
      transition_state_machine! :detached
    end
  end

  deferrable_for_state_change_to(STATE.Detached, &success_block)
end
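
The branching in #detach can be easier to follow as a decision table. The sketch below restates it as a pure function over plain symbols; detach_action and its return values are hypothetical names, and the can_detach flag stands in for the state machine's can_transition_to?(:detaching) check. It is an illustration of the control flow shown above, not the library's implementation.

```ruby
# Hypothetical restatement of the #detach branches as a pure function.
# channel_state and connection_state are plain symbols; can_detach stands in
# for the state machine's can_transition_to?(:detaching) answer.
def detach_action(channel_state, connection_state, can_detach: true)
  # A channel that never attached has nothing to detach from.
  return :succeed_immediately if channel_state == :initialized

  # A failed channel, or a closing/failed connection, cannot detach.
  if channel_state == :failed || %i[closing failed].include?(connection_state)
    return :fail_immediately
  end

  return :already_detached if channel_state == :detached

  # Let a pending attach finish before detaching.
  return :detach_after_state_change if channel_state == :attaching

  can_detach ? :transition_to_detaching : :force_detached
end
```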

#history(options = {}) {|Ably::Models::PaginatedResult<Ably::Models::Message>| ... } ⇒ Ably::Util::SafeDeferrable

Retrieves a Models::PaginatedResult object, containing an array of historical Models::Message objects for the channel. If the channel is configured to persist messages, then messages can be retrieved from history for up to 72 hours in the past. If not, messages can only be retrieved from history for up to two minutes in the past.

Parameters:

  • options (Hash) (defaults to: {})

    a customizable set of options

Options Hash (options):

  • :until_attach (Boolean)

    When true, the history request will be limited to messages published before this channel was attached. The channel must be attached.

Yields:

Returns:

Specification:

  • RSL2a



# File 'lib/ably/realtime/channel.rb', line 334

def history(options = {}, &callback)
  # RTL10b
  if options.delete(:until_attach)
    unless attached?
      error = Ably::Exceptions::InvalidRequest.new('option :until_attach is invalid as the channel is not attached' )
      return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
    end
    options[:from_serial] = properties.attach_serial
  end

  async_wrap(callback) do
    rest_channel.history(options.merge(async_blocking_operations: true))
  end
end
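
The :until_attach handling above amounts to a small option rewrite: the flag is removed and replaced by a :from_serial value taken from the channel properties. A minimal sketch of that rewrite follows, assuming plain values in place of the channel; history_options, attached: and attach_serial: are hypothetical names. Unlike the method above, the sketch copies the options hash rather than mutating the caller's, and raises instead of returning a failed deferrable.

```ruby
# Hypothetical stand-in for the :until_attach translation.
# attached:      whether the channel is currently attached
# attach_serial: the serial recorded when the channel attached
def history_options(options, attached:, attach_serial:)
  opts = options.dup # work on a copy so the caller's hash is untouched
  if opts.delete(:until_attach)
    unless attached
      raise ArgumentError, 'option :until_attach is invalid as the channel is not attached'
    end
    opts[:from_serial] = attach_serial
  end
  opts
end
```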

#logger ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Used by Modules::StateEmitter to debug state changes



# File 'lib/ably/realtime/channel.rb', line 384

def logger
  client.logger
end

#need_reattach? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/ably/realtime/channel.rb', line 402

def need_reattach?
  !!(attaching? || attached?) && !!(options.modes || options.params)
end
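
The predicate above combines two conditions: the channel is attaching or attached, and some modes or params are set. The sketch below restates it with plain arguments so the truth table can be checked in isolation; reattach_needed? is a hypothetical name, and the keyword arguments stand in for options.modes and options.params.

```ruby
# Hypothetical restatement of #need_reattach? using plain values.
# Note that in Ruby an empty Array or Hash is truthy, so only nil (or false)
# counts as "not set" here.
def reattach_needed?(state, modes: nil, params: nil)
  %i[attaching attached].include?(state) && !!(modes || params)
end
```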

#presence ⇒ Ably::Realtime::Presence

A Presence object.

Returns:

Specification:

  • RTL9



# File 'lib/ably/realtime/channel.rb', line 315

def presence
  @presence
end

#publish(name, data = nil, attributes = {}) {|Ably::Models::Message, Array<Ably::Models::Message>| ... } ⇒ Ably::Util::SafeDeferrable

Publish a message to the channel. A callback may optionally be passed in to this call to be notified of success or failure of the operation. When publish is called with this client library, it won't attempt to implicitly attach to the channel.

Examples:

# Publish a single message form
channel.publish 'click', { x: 1, y: 2 }

# Publish a single message with single Hash form
message = { name: 'click', data: { x: 1, y: 2 } }
channel.publish message

# Publish an array of message Hashes form
messages = [
  { name: 'click', data: { x: 1, y: 2 } },
  { name: 'click', data: { x: 2, y: 3 } }
]
channel.publish messages

# Publish an array of Ably::Models::Message objects form
messages = [
  Ably::Models::Message(name: 'click', data: { x: 1, y: 2 }),
  Ably::Models::Message(name: 'click', data: { x: 2, y: 3 })
]
channel.publish messages

# Publish a single Ably::Models::Message object form
message = Ably::Models::Message(name: 'click', data: { x: 1, y: 2 })
channel.publish message

channel.publish('click', 'body') do |message|
  puts "#{message.name} event received with #{message.data}"
end

channel.publish('click', 'body').errback do |error, message|
  puts "#{message.name} was not received, error #{error.message}"
end

Parameters:

  • name (String, Array<Ably::Models::Message|Hash>, nil)

    The event name of the message to publish, or an Array of [Ably::Models::Message] objects or [Hash] objects with :name and :data pairs

  • data (String, ByteArray, nil) (defaults to: nil)

    The message payload, unless an Array of [Ably::Models::Message] objects is passed in the first argument

  • attributes (Hash, nil) (defaults to: {})

    Optional additional message attributes such as :client_id or :connection_id, applied when name attribute is nil or a string
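
Because the first argument accepts several shapes, it can help to picture them being reduced to one common form. The following sketch, which handles only strings and Hashes and is not the library's build_messages, turns each accepted shape into an array of Hashes; normalize_publish_args is a hypothetical name introduced for illustration.

```ruby
# Hypothetical helper: reduces the accepted argument shapes to an Array of Hashes.
# Only String/nil names, a single Hash, and an Array of Hashes are handled here.
def normalize_publish_args(name, data = nil, attributes = {})
  case name
  when Array then name.map(&:to_h)  # array form: data and attributes are ignored
  when Hash  then [name]            # single Hash form
  else
    # name/data form: extra attributes apply only in this branch
    [{ name: name, data: data }.merge(attributes || {})]
  end
end
```

For instance, normalize_publish_args('click', 'body', { client_id: 'bob' }) yields a one-element array whose Hash carries :name, :data and :client_id.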

Yields:

Returns:

Specification:

  • RTL6i



# File 'lib/ably/realtime/channel.rb', line 193

def publish(name, data = nil, attributes = {}, &success_block)
  if suspended? || failed?
    error = Ably::Exceptions::ChannelInactive.new("Cannot publish messages on a channel in state #{state}")
    return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
  end

  if !connection.can_publish_messages?
    error = Ably::Exceptions::MessageQueueingDisabled.new("Message cannot be published. Client is not allowed to queue messages when connection is in state #{connection.state}")
    return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
  end

  messages = build_messages(name, data, attributes) # (RSL1a, RSL1b)

  if messages.length > Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE
    error = Ably::Exceptions::InvalidRequest.new("It is not possible to publish more than #{Realtime::Connection::MAX_PROTOCOL_MESSAGE_BATCH_SIZE} messages with a single publish request.")
    return Ably::Util::SafeDeferrable.new_and_fail_immediately(logger, error)
  end

  enqueue_messages_on_connection(client, messages, channel_name, options).tap do |deferrable|
    deferrable.callback(&success_block) if block_given?
  end
end
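
Since a single publish call is rejected when it carries more messages than the batch limit, a caller with a larger backlog would need to split it first. The sketch below shows one way to do that with Enumerable#each_slice; MAX_BATCH is assumed to mirror the MAX_PROTOCOL_MESSAGE_BATCH_SIZE value of 50 documented on this page, and batches_for is a hypothetical helper.

```ruby
# Assumed to mirror MAX_PROTOCOL_MESSAGE_BATCH_SIZE as documented on this page.
MAX_BATCH = 50

# Splits an array of messages into arrays of at most MAX_BATCH items each.
def batches_for(messages)
  messages.each_slice(MAX_BATCH).to_a
end

# Build 120 illustrative message Hashes and split them.
messages = (1..120).map { |i| { name: 'click', data: { n: i } } }
batches  = batches_for(messages)
```

Each element of batches could then be passed to channel.publish in turn.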

#reset_attach_resume! ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/ably/realtime/channel.rb', line 394

def reset_attach_resume!
  @attach_resume = false
end

#set_channel_error_reason(error) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



# File 'lib/ably/realtime/channel.rb', line 373

def set_channel_error_reason(error)
  @error_reason = error
end

#set_options(channel_options) ⇒ Ably::Models::ChannelOptions Also known as: options=

Sets the Models::ChannelOptions for the channel. An optional callback may be provided to notify of the success or failure of the operation.

Parameters:

Returns:

Specification:

  • RTL16



# File 'lib/ably/realtime/channel.rb', line 365

def set_options(channel_options)
  @options = Ably::Models::ChannelOptions(channel_options)
  # RTL4i
  manager.request_reattach if (need_reattach? and connection.state?(:connected))
end

#subscribe(*names) {|Ably::Models::Message| ... } ⇒ void

This method returns an undefined value.

Registers a listener for messages on this channel. The caller supplies a listener function, which is called each time one or more messages arrive on the channel. A callback may optionally be passed to this call to be notified of success or failure of the channel #attach operation.

Parameters:

  • names (String)

    The event name of the message to subscribe to if provided. Defaults to all events.

Yields:

Specification:

  • RTL7a



# File 'lib/ably/realtime/channel.rb', line 227

def subscribe(*names, &callback)
  attach unless attached? || attaching?
  super
end

#unsubscribe(*names, &callback) ⇒ void

This method returns an undefined value.

Deregisters the given listener for the specified event name(s). This removes an earlier event-specific subscription.

Parameters:

  • names (String)

    The event name of the message to unsubscribe from if provided. Defaults to all events.

Specification:

  • RTL8a



# File 'lib/ably/realtime/channel.rb', line 240

def unsubscribe(*names, &callback)
  super
end