Files of check-in [775400ee76] in directory assets/tclrmq1.4.5 [history]
tclrmq
Pure TCL RabbitMQ Library implementing AMQP 0.9.1
This library is completely asynchronous and makes no blocking calls. It relies on TclOO and requires Tcl 8.6, but has no other dependencies (other than a RabbitMQ server).
About
Developed for use within FlightAware (https://flightaware.com).
The package directory contains a Makefile for installing globally. By
default the Makefile installs to /usr/local/lib
, so this will need editing
if an alternative directory is required.
Basic Usage
There are two primary classes required for using the library.
1) Connection
The Connection class is used for initiating initial communication with the RabbitMQ server. It also relies on a subsidiary Login class, which is used for specifying username, password, vhost and authentication mechanism. Out of the box, this library only supports the PLAIN SASL mechanism. It can be easily extended to support an additional mechanism if required.
```tcl package require rmq
Arguments: -user -pass -vhost
All optional and shown with their defaults
set login [Login new -user "guest" -pass "guest" -vhost "/"]
Pass the login object created above to the Connection
constructor
-host and -port are shown with their default values
set conn [Connection new -host localhost -port 5672 -login $login]
Set a callback for when the connection is ready to use
which will be passed the connection object
$conn onConnected rmqconnready proc rmqconnready {conn} { puts "Connection ready!" $conn connectionClose }
Set a callback for when the connection is closed
$conn onClosed rmqconnclosed proc rmqconnclosed {conn} { puts "Connection closed!" }
Initiate the connection handshake and enter the event loop
$conn connect vwait die ```
2) Channel
The Channel class is where most of the action happens. The vast majority of AMQP methods refer to a specific channel. After the Connection object has gone through the opening handshake and calls its onOpen callback, a Channel object can be created by passing the Connection object to the Channel class' constructor.
```tcl
Assume the following proc has been set as the Connection object's
onOpen callback
proc rmqconnready {conn} { # Create a channel object # If no channel number is specified, the # next available will be chosen set chan [Channel new $conn]
# Do something with the channel, like
# declare an exchange
set flags [list $::rmq::EXCHANGE_DURABLE]
$chan exchangeDeclare "test" "direct" $flags
} ```
Callbacks
Using this library for anything useful requires setting callbacks for the AMQP methods needed in the client application. Most callbacks will be set on Channel objects, but the Connection object supports a few as well.
All callbacks are passed the object they were set on as the first parameter. Depending on the AMQP method or object event, additional parameters are provided as appropriate.
Connection Callbacks
Connection objects allow for the setting of the following callbacks:
1) onConnected: called when the AMQP connection handshake finishes and is passed the Connection object
2) onBlocked: called when the RabbitMQ server has blocked connections due to resource limitations. Callback is passed the Connection object, a boolean for whether the connection is blocked or not and a textual reason
3) onClosed: called when the connection is closed and is passed the Connection object
and a dict containing the reply code, any reply text, the class ID and the method ID.
The corresponding dictionary keys are replyCode
, replyText
, classID
and
methodID
respectively. An alias method onClose is also provided.
3) onError: called when an error code has been sent to the Connection and is passed the error code and any accompanying data in the frame
4) onFailedReconnect: called when all reconnection attempts have been exhausted
```tcl package require rmq
Arguments: username password vhost
set login [Login new -user "guest" -pass "guest" -vhost "/"]
Pass the login object created above to the Connection
constructor
set conn [Connection new -host localhost -port 5672 -login $login]
$conn onConnected rmqconnected $conn onClosed rmqclosed $conn onError rmqconnectionerror
proc rmq_connected {rmqConn} { # do useful things }
proc rmq_closed {rmqConn closeD} { # do other useful things }
proc rmq_error {rmqConn frameType frameData} { # do even more useful things } ```
Channel Callbacks
Channel objects have a few specific callbacks that can be set along with a more general callback mechanism for the majority of AMQP method calls.
Specific Callbacks
The specific callbacks provided for Channel objects mirror those available for Connection objects. They are:
1) onOpen: called when the channel is open and ready to use, i.e., when the Channel.Open-Ok method is received from the RabbitMQ server and is passed the same arguments as the onConnected callback for Connection objects
2) onClose: called when the channel has been fully closed, i.e., when the Channel.Close-Ok method is received from the RabbitMQ server and is passed the Channel object and the same dictionary passed to the onClosed callback for Connection objects
3) onError: called when the channel receives an error, i.e., a frame is received for the given channel but contains an AMQP error code and is passed the same arguments as the onError callback for Connection objects
General Callback Mechanism
Other than the above callbacks, a Channel object can be supplied a callback for every method that can be sent in response to an AMQP method by using the on method of Channel objects.
These callbacks are passed the Channel object they were set on unless otherwise specified in the full method documentation found below.
When specifying the name of the AMQP method the callback will be invoked on, start with a lowercase letter and use camel case. All AMQP methods documented in the RabbitMQ 0-9-1 extended specification are available.
```tcl
Asumming a channel object by name rmqChan exists
$rmqChan on exchangeDeclareOk exchangedeclared $rmqChan on queueDeclareOk queuedeclared $rmqChan on queueBindOk queue_bound
$rmqChan exchangeDeclare "thebestexchange" "fanout" vwait exchangeDelcared
$rmqChan queueDeclare "thebestqueue" vwait queueDeclared
$rmqChan queueBind "thebestqueue" "thebestexchange" "thebestrouting_key"
proc exchange_delcared {rmqChan} { set ::exchangeDeclared 1 }
proc queue_declared {rmqChan} { set ::queueDeclared 1 }
proc queue_bound {rmqChan} { set ::queueBound 1 } ```
The Exception of Consuming
When consuming messages from a queue using either Basic.Consume or Basic.Get, the process of setting a callback and the data passed into the callback differs from every other case.
For consuming, the Channel object methods basicConsume and basicGet take the name of the callback invoked for each message delivered and then their arguments. The callbacks get passed in the Channel object, a dictionary of method data, a dictionary of frame data, and the data from the queue.
```tcl
Assuming a channel object by name rmqChan exists
$rmqChan basicConsume consumecallback "thebest_queue"
proc consume_callback {rmqChan methodD frameD data} { # Can inspect the consumer tag and dispatch on it switch [dict get $methodD consumerTag] { # useful things }
# Can get the delivery tag to ack the message
$rmqChan basicAck [dict get $methodD deliveryTag]
# Frame data includes things like the data body size
# and is likely less immediately useful but it is
# passed in because it might be necessary for a given
# application
} ```
Consuming From Multiple Queues
For a given channel, multiple queues can be consumed from and each queue can be given its own callback proc by passing in (or allowing the server to generate) a distinct consumerTag for each invocation of basicConsume. Otherwise, dispatching based on the method or frame metadata allows a single callback proc to customize the handling of messages from different queues. When the client application is not constrained in its use of channels, instantiating multiple Channel objects is a straight-forward way for one consumer to concurrently pull data from more than one queue.
Method Data
The dictionary of method data passed as the second argument to consumer callbacks contains the following items:
consumerTag
The string consumer tag, either specified at the time basicConsume is called, or auto-generated by the server.
deliveryTag
Integer numbering for the message being consumed. This is used for the basicAck or basicNack methods.
redelivered
Boolean integer.
exchange
Name of the exchange the message came from.
routingKey
Routing key used for delivery of the message.
Frame Data
The dictionary of frame data passed as the third argument to consumer callbacks contains the following items:
classID
AMQP defined integer for the class used for delivering the message.
bodySize
Size in bytes for the data consumed from the queue.
properties
Dictionary of AMQP Basic method properties, e.g., correlation-id, timestamp or content-type.
Special Arguments
Flags
For AMQP methods like queueDeclare or exchangeDeclare which take flags, these are passed in as a list of
constants. All supported flags are mentioned in the documentation below detailing each Channel method.
Within the source, supported flag constants are found in constants.tcl.
Properties / Headers
For AMQP class methods which take properties and/or headers, e.g., basicConsume, basicPublish, or exchangeDeclare, the properties and headers are passed in as a Tcl dict. The library takes care of encoding them properly.
Library Documentation
All methods defined for Connection, Login, and Channel classes are detailed below. Only includes methods that are part of the public interface for each object. Any additional methods found in the source are meant to be called internally.
Connection Class
Class for connecting to a RabbitMQ server.
constructor
The constructor takes the following arguments (all optional):
-host
Defaults to localhost
-port
Defaults to 5672
-tls
Either 0 or 1, but defaults to 0. Controls whether to connect to the RabbitMQ server using TLS. To set TLS options, e.g., if using a client cert, call the tlsOptions method before invoking connect.
-login
Login object. Defaults to calling the Login constructor with no arguments.
-frameMax
Maximum frame size in bytes. Defaults to the value offered by the RabbitMQ server in Connection.Tune.
-maxChannels
Maximum number of channels available for this connection. Defaults to no imposed limit, which is essentially 65,535.
-locale
Defaults to en_US.
-heartbeatSecs
Interval in seconds for sending out heartbeat frames. A value of 0 means no heartbeats will be sent.
-blockedConnections
Either 0 or 1, but defaults to 1. Controls whether to use this RabbitMQ extension.
-cancelNotifications
Either 0 or 1, but deafults to 1. Controls whether to use this RabbitMQ extension.
-maxTimeout
Integer seconds to wait before timing out the connection attempt to the server. Defaults to 3.
-autoReconnect
Either 0 or 1, but defaults to 1. Controls whether the library attempts to reconnect to the RabbitMQ server when the initial call to Connection.connect fails or an established socket connection is closed by the server or by network conditions.
-maxBackoff
Integer number of seconds past which exponential backoff, which is the reconnection strategy employed, will not go. Defaults to 64 seconds.
-maxReconnects
Integer number of reconnects to attempt before giving up. Defaults to 5. A value of 0 means infinite reconnects. To disable retries, pass -autoReconnect as 0.
-debug
Either 0 or 1, but defaults to 0. Controls whether or not debug statements are passed to
-logCommand
detailing the operations of the library.-logCommand
If the
-debug
option is true, the value of this argument will be passed debugging statements detailing the operations of the library. The specified-logCommand
must take a string argument containing a single debugging statement. Defaults toputs stderr
.
attemptReconnect
Takes no arguments. Using the -maxBackoff and -maxReconnects constructor arguments, attempts to reconnect to the server. If this cannot be done, and an onFailedReconnect callback has been set, it is invoked.
closeConnection
Takes an optional boolean argument controlling whether the onClose callback is invoked (defaults to true). Closes the connection and, if specified, calls any callback set with onClose. This is not meant to be called externally as it does not uses the AMQP protocol for closing the channel. Instead, connectionClose should be used in client applications.
connect
Takes no arguments. Actually initiates a socket connection with the RabbitMQ server. If the connection fails the onClose callback is invoked. Two timeouts can potentially occur in this method: one during the TCP handshake and one during the AMQP handshake. In both cases, the -maxTimeout variable is used. Returns 1 if a connection is fully established, or 0 otherwise.
connected?
Takes no arguments. Returns 0 or 1 depending on whether the socket connection to the server has been established and an AMQP handshake completed. It is only true once both those conditions have been satisfied. In the event that a connection fails, the getSocket
method can be used to obtain and query the socket channel and determine whether the problem is network or protocol based.
getSocket
Takes no arguments. Returns the socket object for communicating with the server. This allows for more fine-grained inspection and tuning if so desired.
onBlocked
Takes the name of a callback proc which will be used for blocked connection notifications. Blocked connection notifications are always requested by this library, but the setting of a callback is optional. The callback takes the Connection object, a boolean for whether the connection is blocked (this callback is also used when the connection is no longer blocked), and a textual reason why.
onClose
Takes the name of a callback proc which will be called when the connection is closed. This includes a failed connection to the RabbitMQ server when first calling connect and a disconnection after establishing communication with the RabbitMQ server. The callback takes the Connection object and a dictionary with the keys specified in the documentation to the onClosed callback.
onClosed
Alias for onClose method.
onConnected
Takes the name of a callback proc which will be used when the AMQP handshake is finished. When this callback is invoked, the Connection object is ready to create channels and perform useful work.
onError
Takes the name of a callback proc used when an error is reported by the RabbitMQ server on the connection level. The callback proc takes the Connection object, a frame type and any extra data included in the frame.
onFailedReconnect
Takes the name of a callback proc used when the maximum number of connection attempts have been made without sucess. The callback proc takes the Connection object.
removeCallbacks
Takes an optional boolean channelsToo, which defaults to 0. Unsets all callbacks for the Connection object. If channelsToo is 1, also unsets callbacks on all of its channels.
reconnecting?
Takes no argument. Returns 0 or 1 depending on whether the Connection is in the process of attempting a reconnect.
resetRetries
Takes no arguments. Sets the count of connection retries back to 0. Useful in cases where -autoReconnect is true and more fine-grained control of the retry loop is desired. Internally the retry count is reset to 0 when the AMQP handshake completes.
tlsOptions
Used to setup the parameters for an SSL / TLS connection to the RabbitMQ server.
Supports all arguments supported by the Tcl tls package's ::tls::import::
command
as specified in the Tcl TLS documentation.
If a TLS connection is desired, this method needs to be called before connect.
Login Class
constructor
The constructor takes the following arguments (all optional):
-user
Username to login with. Defaults to guest
-pass
Password to login with. Defaults to guest
-mechanism
Authentication mechanism to use. Defaults to PLAIN
-vhost
Virtual host to login to. Defaults to /
saslResponse
Takes no arguments. This method needs to overridden if an alternative mechanism is desired.
Channel Class
Most of the methods made available by this library come from the Channel class. It implements the majority of the AMQP methods.
constructor
Takes the following arguments:
connectionObj
The Connection object to open a channel for. This is the only required argument.
channelNum
The channel number to open. Optional. If not specified, the next available number starting from 1 will be used. Passing in an empty string or 0 is equivalent to not providing this argument, i.e., the class will pick the next available channel number for the Connection object provided.
shouldOpen
A boolean argument that defaults to 1. If set to 1 the channel will open after it is created. If not, the channelOpen method must be called manually before anything can be done with the Channel object.
active?
Takes no arguments and returns 1 if the channel is active, i.e., it has been opened successfully, and 0 otherwise.
closeChannel
Not meant to be called externally. Instead, this method is used internally by the library to properly set the Channel object's state before and after calling the onClose callback.
closeConnection
Takes an optional boolean argument, callCloseCB, which defaults to 1. Closes the associated Connection object and if callCloseCB is true, any callback set with the Connection object's onClose method is invoked, otherwise it is ignored.
closing?
Takes no arguments and returns 1 if the Channel is in the process of closing and 0 otherwise.
getChannelNum
Takes no arguments, and returns the channel number.
getConnection
Takes no arguments, and returns the Connection object passed into the constructor.
open?
Alias for active?.
on
Takes an AMQP method name in camel case, starting with a lower case letter and the name of a callback proc for the method. To unset a callback, set its callback proc to the empty string or use removeCallback.
onClose
Takes the name of a callback proc to be called when the channel is closed. The callback takes the Channel object and a dictionary of data, which is specified in the section about onClose callbacks.
onClosed
Alias for onClose.
onError
Takes the name of a callback proc invoked when an error occurs on this particular Channel object. The error callback is passed the Channel object, a numeric error code as returned from the server, and any additional data passed back. Errors occur on a channel when the server returns an unexpected response but not when a disconnection occurs or the channel is closed forcefully by the server.
onOpen
Takes the name of a callback proc to be called when the channel successfully opens. Once it is open, AMQP methods can be called. The callback takes the Channel object.
onOpened
Alias for onOpen.
reconnecting?
Takes no arguments. Returns 1 if Connection is in the process of attempting a reconnect and 0 otherwise.
removeCallback
Takes the name of an AMQP method as defined on a Channel object.
removeCallbacks
Takes no arguments. Sets all callbacks to the empty string, effectively removing them.
setCallback
Takes the name of an AMQP method as defined on a Channel object (or for the on Channel method). The preferred method to use is on, but this is alternative method for setting a callback. To unset a callback, set its callback proc to the empty string or use removeCallback.
Channel AMQP Methods
The following methods are defined on Channel objects and implement the methods and classes detailed in the AMQP specification.
Channel Methods
channelClose
Takes the following arguments:
replyCode
Numeric reply code for closing the channel as specified in the AMQP specification.
replyText
Textual description of the reply code.
classID
AMQP class ID number.
methodID
AMQP method ID number.
To place a callback for the closing of a channel, use the onClose or onClosed method. The callback takes the Channel object and a dictionary of data with key names matching the arguments listed above.
channelOpen
Takes no arguments.
To place a callback for the opening of a channel use the onOpen method. The callback takes only the Channel object.
Exchange Methods
exchangeBind
Takes the following arguments:
dst
Destination exchange name.
src
Source exchange name.
rKey
Routing key for the exchange binding.
noWait
Boolean integer, which defaults to 0.
eArgs
Exchange binding arguments (optional). Passed in as a dict. Defaults to an empty dict.
To set a callback for exchange to exchange bindings use the on method with exchangeBindOk as the first argument. Callback only takes the Channel object.
exchangeDeclare
Takes the following arguments:
eName
Exchange name.
eType
Exchange type: direct, fanout, header, topic
eFlags
Optional flags. Flags supported (all in the ::rmq namespace):
- EXCHANGE_PASSIVE
- EXCHANGE_DURABLE
- EXCHANGEAUTODELETE
- EXCHANGE_INTERNAL
- EXCHANGENOWAIT
eArgs
Optional dict of exchange declare arguments.
To set a callback on an exchange declaration, use the on method with exchangeDeclareOk as the first argument. Callback only takes the Channel object.
exchangeDelete
Takes the following arguments:
eName
Exchange name to delete.
inUse
Optional boolean argument defaults to 0. If set to 1, will not delete an exchange with bindings on it.
noWait
Optional boolean argument defaults to 0.
To set a callback on the exchange deletion, use the on method with exchangeDeleteOk as the first argument. Callback only takes the Channel object.
exchangeUnbind
Takes the same arguments as exchangeBind, with the same callback data.
Queue Methods
queueBind
Takes the following arguments:
qName
Queue name.
eName
Exchange name.
rKey
Routing key (optional). Defaults to the empty string.
noWait
Boolean integer (optional). Defaults to 0.
qArgs
Queue binding arguments (optional). Needs to be passed in as a dict. Defaults to an empty dict.
To set a callback on a queue binding, use the on method with queueBindOk as the first argument. Callback only takes the Channel object.
queueDeclare
Takes the following arguments:
qName
Queue name.
qFlags
Optional list of queue declare flags. Supports the following flag constants (in the ::rmq namespace):
- QUEUE_PASSIVE
- QUEUE_DURABLE
- QUEUE_EXCLUSIVE
- QUEUEAUTODELETE
- QUEUEDECLARENO_WAIT
qArgs
Optional dictionary of queue declare arguments. Allows for setting features like TTLs, max lengths or a single consumer policy.
To set a callback on a queue declare, use the on method with queueDeclareOk as the first argument. Callback takes the Channel object, the queue name (especially important for exclusive queues), message count, number of consumers on the queue.
queueDelete
Takes the following arguments:
qName
Queue name.
flags
Optional list of flags. Supported flags (in the ::rmq namespace):
- QUEUEIFUNUSED
- QUEUEIFEMPTY
- QUEUEDELETENO_WAIT
To set a callback on a queue delete, use the on method with queueDeleteOk as the first argument. Callback takes the Channel object and a message count from the delete queue.
queuePurge
Takes the following arguments:
qName
Queue name.
noWait
Optional boolean argument. Defaults to 0.
To set a callback on a queue purge, use the on method with queuePurgeOk as the first argument. Callback takes the Channel object and a message count from the purged queue.
queueUnbind
Takes the following arguments:
qName
Queue name.
eName
Exchange name.
rKey
Routing key.
qArgs
Optional queue arguments. Passed in as a dict.
To set a callback on a queue unbinding, use the on method with queueUnbindOk as the first argument. Callback takes only the Channel object.
Basic Methods
basicAck
Takes the following arguments:
deliveryTag
Delivery tag being acknowledged.
multiple
Optional boolean, defaults to 0. If set to 1, all messages up to and including the deliveryTag argument's value.
Setting a callback on this method using the on method is for publisher confirms. The callback takes the Channel object, a delivery tag and a multiple boolean.
basicCancel
Takes the following arguments:
cTag
Consumer tag.
noWait
Optional boolean argument. Defaults to 0.
To set a callback on a basic cancel, use the on method with basicCancelOk as the first argument. Callback takes the Channel object and the consumer tag that was canceled.
basicConsume
Takes the following arguments:
callback
Name of a callback to use for consuming messages. The callback takes the Channel object, a dict of method data, a dict of frame data and the data from the queue.
qName
Queue name to consume from.
cTag
Optional consumer tag.
cFlags
Optional list of flags. Supported flags (all in the ::rmq namespace):
- CONSUMENOLOCAL
- CONSUMENOACK
- CONSUME_EXCLUSIVE
- CONSUMENOWAIT
cArgs
Optional arguments to control consuming. Passed in as a dict. Supports all arguments specified for the basic class.
Callback is set directly from this method.
basicGet
Takes the following arguments:
callback
Name of a callback proc using the same arguments as that for basicConsume.
qName
Queue name to get a message from
noWait
Optional boolean. Defaults to 0.
Like with basicConsume the callback for this method is set directly from the method call.
basicNack
Takes the following arguments:
deliveryTag
Delivery tag for message being nack'ed.
nackFlags
Optional list of flags. Supports the following (in the ::rmq namespace):
- NACK_MULTIPLE
- NACK_REQUEUE
Setting a callback on this method using the on method is for publisher confirms. The callback takes the Channel object, a delivery tag and a multiple boolean.
basicQos
Takes the following arguments:
prefetchCount
Integer prefetch count, i.e., the number of unacknowledged messages that can be delivered to a consumer at one time.
globalQos
Optional boolean which defaults to 0. If set to 1, the prefecth count is set globally for all consumers on the channel.
To set a callback on a basic QOS call, use the on method with basicQosOk as the first argument. Callback takes only the Channel object.
basicPublish
Takes the following arguments:
data
The data to publish to the queue.
eName
Exchange name.
rKey
Routing key.
pFlags
Optional list of flags. Supports the following flags (in the ::rmq namespace):
- PUBLISH_MANDATORY
- PUBLISH_IMMEDIATE
No callback can be set on this directly. For publisher confirms use the on method with basicAck as the first argument. That callback takes the Channel object, the delivery tag and a boolean for whether the ack is for multiple messages.
basicRecover
Same as basicRecoverAsync.
Confirm Methods
confirmSelect
Takes the following arguments:
noWait
Optional boolean argument, defaults to 0.
To set a callback on a confirm select call, use the on method with confirmSelectOk as the first argument. Callback takes the Channel object.
basicRecoverAsync
Takes the following arguments:
reQueue
Boolean argument. If 0, the message will be redelivered to the original recipient. If 1, an alternate recipient can get the redelivery.
To set a callback on a basic recover, use the on method with basicRecoverOk as the first argument. Callback takes the Channel object.
basicReject
Takes the following arguments:
deliveryTag
Delivery tag of message being rejected by the client.
reQueue
Optional boolean argument, defaults to 0. If set to 1, the rejected message will be requeued.
basicReturn
This method is not to be called directly, but to use a callback to handle returned messages, use the on method with basicReturn as the first argument. The callback takes the same arguments as the basicConsume callback.
TX Methods
txSelect
Takes no arguments.
To set a callback on a transaction select call, use the on method with txSelectOk as the first argument. Callback takes the Channel object.
txCommit
Takes no arguments.
To set a callback on a transaction commit call, use the on method with txCommitOk as the first argument. Callback takes the Channel object.
txRollback
Takes no arguments.
To set a callback on a transaction commit call, use the on method with txRollbackOk as the first argument. Callback takes the Channel object.