Class | Stomp::Client |
In: |
lib/stomp.rb
|
Parent: | Object |
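Accepts a username (default ""), password (default ""), host (default localhost), port (default 61613), and a reliable flag (default false). The first argument may instead be a URL of the form stomp://host:port or stomp://user:pass@host:port, in which case the remaining arguments are taken from the URL and reliable is set to false.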
# File lib/stomp.rb, line 269
#
# Create a client, open its connection and start the listener thread.
#
# user::     login name (default ""), or a STOMP URL in one of the forms
#            stomp://host:port or stomp://user:pass@host:port. When a URL
#            is given, pass/host/port are taken from it and reliable is
#            forced to false.
# pass::     password (default "")
# host::     broker host (default "localhost")
# port::     broker port (default 61613)
# reliable:: passed straight through to Connection.open (default false)
def initialize(user = "", pass = "", host = "localhost", port = 61613, reliable = false)
  # The login form is tried first: the host-only pattern is unanchored and
  # would otherwise claim "stomp://user:1234@host:61613" as host "user",
  # port "1234". Hosts may contain dots and hyphens (FQDNs, IP addresses).
  case user
  when /stomp:\/\/(\w+):(\w+)@([\w.\-]+):(\d+)/
    user = Regexp.last_match(1)
    pass = Regexp.last_match(2)
    host = Regexp.last_match(3)
    port = Regexp.last_match(4).to_i # Integer, like the default port
    reliable = false
  when /stomp:\/\/([\w.\-]+):(\d+)/
    host = Regexp.last_match(1)
    port = Regexp.last_match(2).to_i
    user = ""
    pass = ""
    reliable = false
  end

  @id_mutex = Mutex.new
  @ids = 1
  @connection = Connection.open user, pass, host, port, reliable
  @listeners = {}
  @receipt_listeners = {}
  @running = true
  @replay_messages_by_txn = Hash.new

  # Dispatch incoming frames to the registered callbacks until the client
  # is closed or the connection returns nil.
  @listener_thread = Thread.start do
    while @running
      message = @connection.receive
      # nil? instead of the NIL constant, which modern Ruby no longer defines.
      break if message.nil?

      listener =
        case message.command
        when 'MESSAGE' then @listeners[message.headers['destination']]
        when 'RECEIPT' then @receipt_listeners[message.headers['receipt-id']]
        end
      listener.call(message) if listener
    end
  end
end
Accepts a username (default ""), password (default ""), host (default localhost), and port (default 61613)
# File lib/stomp.rb, line 318
#
# Convenience constructor: forwards all five arguments, unchanged, to
# Client.new and returns the resulting client.
def self.open(user = "", pass = "", host = "localhost", port = 61613, reliable = false)
  Client.new(user, pass, host, port, reliable)
end
Abort a transaction by name
# File lib/stomp.rb, line 328
#
# Abort the transaction called +name+, then redeliver every message that
# was acknowledged inside it to the listener registered for that message's
# destination.
#
# name::    transaction name
# headers:: extra headers passed through to the connection (default {})
def abort(name, headers = {})
  @connection.abort name, headers

  # Remove the replay list as it is consumed: the transaction is finished,
  # so keeping it would leak the messages and replay them again on a
  # repeated abort of the same name.
  replay_list = @replay_messages_by_txn.delete(name)
  return unless replay_list

  replay_list.each do |message|
    listener = @listeners[message.headers['destination']]
    listener.call(message) if listener
  end
end
Acknowledge a message, used when a subscription has specified client acknowledgement ( connection.subscribe "/queue/a", :ack => ‘client’ )
Accepts a transaction header ( :transaction => ‘some_transaction_id’ )
# File lib/stomp.rb, line 369
#
# Acknowledge +message+ on the connection.
#
# If headers carries :transaction, the message is also remembered under
# that transaction id so it can be replayed should the transaction be
# rolled back. If a block is given, a receipt listener that yields to the
# block is registered and its result is stored in headers['receipt'].
def acknowledge(message, headers = {})
  transaction = headers[:transaction]
  # keep messages ack'd in this transaction around in case we roll back
  (@replay_messages_by_txn[transaction] ||= []) << message if transaction

  if block_given?
    on_receipt = lambda { |receipt| yield receipt }
    headers['receipt'] = register_receipt_listener(on_receipt)
  end

  @connection.ack(message.headers['message-id'], headers)
end
Begin a transaction by name
# File lib/stomp.rb, line 323
#
# Start a transaction called +name+ by delegating to the connection;
# +headers+ (default {}) are passed along unchanged.
def begin(name, headers = {})
  @connection.begin(name, headers)
end
Close out resources in use by this client
# File lib/stomp.rb, line 405
#
# Release this client's resources: disconnect the underlying connection,
# then clear the @running flag that the listener loop tests before each
# receive.
def close
  @connection.disconnect
  @running = false
end
Commit a transaction by name
# File lib/stomp.rb, line 343
#
# Commit the transaction called +name+ and forget the messages that were
# being held for replay in case it was aborted.
#
# name::    transaction name
# headers:: extra headers passed through to the connection (default {})
def commit(name, headers = {})
  # abort looks the replay list up by transaction name, so that is the key
  # that has to be cleared here; clearing only headers[:transaction] left
  # the list behind whenever commit was called without that header.
  @replay_messages_by_txn.delete(name)

  # Still honour an explicit :transaction header, as before.
  txn_id = headers[:transaction]
  @replay_messages_by_txn.delete(txn_id) if txn_id

  @connection.commit name, headers
end
Join the listener thread for this client, generally used to wait for a quit signal
# File lib/stomp.rb, line 312
#
# Block the caller until this client's listener thread finishes; returns
# whatever Thread#join returns.
def join
  listener = @listener_thread
  listener.join
end
Send message to destination
If a block is given a receipt will be requested and passed to the block on receipt
Accepts a transaction header ( :transaction => ‘some_transaction_id’ )
# File lib/stomp.rb, line 392
#
# Send +message+ to +destination+ through the connection.
#
# When a block is supplied, a receipt listener that yields to the block is
# registered and its result is stored in headers['receipt'] before the
# message is handed to the connection.
def send(destination, message, headers = {})
  if block_given?
    on_receipt = lambda { |receipt| yield receipt }
    headers['receipt'] = register_receipt_listener(on_receipt)
  end

  @connection.send(destination, message, headers)
end
Subscribe to a destination, must be passed a block which will be used as a callback listener
Accepts a transaction header ( :transaction => ‘some_transaction_id’ )
# File lib/stomp.rb, line 353
#
# Subscribe to +destination+. The block is mandatory: it is stored as the
# listener for that destination and is called with each message.
#
# Raises RuntimeError ("No listener given") when called without a block.
def subscribe(destination, headers = {})
  unless block_given?
    raise "No listener given"
  end

  # Register the callback before subscribing on the connection.
  callback = lambda { |incoming| yield incoming }
  @listeners[destination] = callback
  @connection.subscribe(destination, headers)
end
Unsubscribe from a channel
# File lib/stomp.rb, line 360
#
# Unsubscribe from +name+ on the connection and drop the listener that was
# registered for it.
#
# name::    destination previously passed to subscribe
# headers:: extra headers passed through to the connection (default {})
#
# Returns nil.
def unsubscribe(name, headers = {})
  @connection.unsubscribe name, headers
  # Delete the entry rather than storing nil, so destinations that are no
  # longer subscribed do not pile up as dead keys in @listeners.
  @listeners.delete(name)
  nil
end