Class Stomp::Client
In: lib/stomp.rb
Parent: Object

Typical Stomp client class. Uses a listener thread to receive frames from the server; any thread can send.

All receives happen in that one listener thread, so keep the work done in listener callbacks short if you expect a high message volume.
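One way to keep the listener thread free is to have the callback only enqueue the message and let a separate worker thread do the slow work. The sketch below is a minimal illustration of that idea, not part of the library: BackgroundProcessor and its methods are hypothetical names, and plain strings stand in for received messages.

```ruby
# Hypothetical helper, not part of the stomp library.
class BackgroundProcessor
  def initialize(&handler)
    @queue = Queue.new
    @handler = handler
    @worker = Thread.new do
      # Process messages one at a time until the stop marker arrives.
      while (msg = @queue.pop) != :stop
        @handler.call(msg)
      end
    end
  end

  # Cheap enough to call from the listener thread: it only enqueues.
  def enqueue(msg)
    @queue << msg
  end

  # Ask the worker to finish the backlog, then wait for it.
  def shutdown
    @queue << :stop
    @worker.join
  end
end

seen = []
processor = BackgroundProcessor.new { |msg| seen << msg.upcase }
# With a real client the hand-off would look roughly like:
#   client.subscribe("/queue/a") { |msg| processor.enqueue(msg) }
%w[a b c].each { |body| processor.enqueue(body) }
processor.shutdown
```

The trade-off is that messages are processed after the callback has returned, so any acknowledgement logic has to move into the worker as well.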

Methods

abort   acknowledge   begin   close   commit   join   new   open   open?   register_receipt_listener   send   subscribe   unsubscribe  

Public Class methods

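Accepts a username (default ""), password (default ""), host (default localhost), port (default 61613), and a reliable flag (default false). The first argument may instead be a URL of the form stomp://host:port or stomp://user:pass@host:port, in which case the remaining arguments are ignored.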

[Source]

     # File lib/stomp.rb, line 263
     def initialize user="", pass="", host="localhost", port=61613, reliable=false
       # The first argument may be a URL instead of a username.
       if user =~ /stomp:\/\/(\w+):(\d+)/
         user = ""
         pass = ""
         host = $1
         port = $2
         reliable = false
       elsif user =~ /stomp:\/\/(\w+):(\w+)@(\w+):(\d+)/
         user = $1
         pass = $2
         host = $3
         port = $4
         reliable = false
       end

       @id_mutex = Mutex.new
       @ids = 1
       @connection = Connection.open user, pass, host, port, reliable
       @listeners = {}
       @receipt_listeners = {}
       @running = true
       @replay_messages_by_txn = Hash.new
       # Single listener thread: dispatches each frame to its registered listener.
       @listener_thread = Thread.start do
         while @running
           message = @connection.receive
           case
           when message.nil?
             break
           when message.command == 'MESSAGE'
             if listener = @listeners[message.headers['destination']]
               listener.call(message)
             end
           when message.command == 'RECEIPT'
             if listener = @receipt_listeners[message.headers['receipt-id']]
               listener.call(message)
             end
           end
         end
       end
     end
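To make the URL handling above concrete, the following sketch applies the same two patterns outside the class. parse_stomp_url is a hypothetical helper written for illustration only; the library does not expose such a method.

```ruby
# Hypothetical helper that mirrors the two patterns used in initialize.
# Returns nil when neither pattern matches.
def parse_stomp_url(url)
  if url =~ /stomp:\/\/(\w+):(\d+)/
    { :user => "", :pass => "", :host => $1, :port => $2 }
  elsif url =~ /stomp:\/\/(\w+):(\w+)@(\w+):(\d+)/
    { :user => $1, :pass => $2, :host => $3, :port => $4 }
  end
end

parse_stomp_url("stomp://localhost:61613")
# host "localhost", port "61613" (a String, not an Integer)

parse_stomp_url("stomp://guest:secret@localhost:61613")
# user "guest", pass "secret", host "localhost", port "61613"

parse_stomp_url("stomp://mq.example.com:61613")
# nil: \w+ does not match the dots in the host name
```

Because \w matches only letters, digits, and underscores, host names containing dots or hyphens are not recognised as URLs by these patterns; in that case the string would be treated as a plain username.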

Convenience wrapper around Client.new; accepts the same arguments: username (default ""), password (default ""), host (default localhost), port (default 61613), and reliable flag (default false).

[Source]

     # File lib/stomp.rb, line 312
     def self.open user="", pass="", host="localhost", port=61613, reliable=false
       Client.new user, pass, host, port, reliable
     end

Public Instance methods

Abort a transaction by name. Messages acknowledged within the transaction are replayed to their listeners.

[Source]

     # File lib/stomp.rb, line 322
     def abort name, headers={}
       @connection.abort name, headers

       # Replay any messages that were acked in this transaction.
       replay_list = @replay_messages_by_txn[name]
       if replay_list
         replay_list.each do |message|
           if listener = @listeners[message.headers['destination']]
             listener.call(message)
           end
         end
       end
     end

Acknowledge a message. Used when a subscription has specified client acknowledgement ( connection.subscribe "/queue/a", :ack => 'client' ).

If a block is given, a receipt is requested and passed to the block when it arrives.

Accepts a transaction header ( :transaction => 'some_transaction_id' )

[Source]

     # File lib/stomp.rb, line 363
     def acknowledge message, headers={}
       txn_id = headers[:transaction]
       if txn_id
         # Keep messages acked in this transaction in case we roll back.
         replay_list = @replay_messages_by_txn[txn_id]
         if replay_list == nil
           replay_list = []
           @replay_messages_by_txn[txn_id] = replay_list
         end
         replay_list << message
       end
       if block_given?
         headers['receipt'] = register_receipt_listener lambda {|r| yield r}
       end
       @connection.ack message.headers['message-id'], headers
     end
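The interplay between acknowledge and abort can be modelled in isolation. The sketch below is a simplified stand-in for the bookkeeping shown in the sources above; ReplayLog and its method names are hypothetical, and plain strings stand in for messages.

```ruby
# Hypothetical, simplified model of the client's replay bookkeeping.
class ReplayLog
  def initialize
    @by_txn = {}
  end

  # Remember a message acknowledged inside transaction txn_id.
  def record(txn_id, message)
    (@by_txn[txn_id] ||= []) << message
  end

  # On abort: hand every remembered message back to the caller's block.
  def replay(txn_id)
    (@by_txn[txn_id] || []).each { |message| yield message }
  end

  # On commit: the acknowledgements are final, so forget the messages.
  def forget(txn_id)
    @by_txn.delete(txn_id)
  end
end

log = ReplayLog.new
log.record("tx1", "m1")
log.record("tx1", "m2")

redelivered = []
log.replay("tx1") { |m| redelivered << m }   # both messages come back, in order
```

Note that in the client the list is stored under the value of the :transaction header passed to acknowledge and looked up under the name passed to abort, so the two must be the same string for the replay to find anything.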

Begin a transaction by name

[Source]

     # File lib/stomp.rb, line 317
     def begin name, headers={}
       @connection.begin name, headers
     end

Close out resources in use by this client

[Source]

     # File lib/stomp.rb, line 399
     def close
       @connection.disconnect
       @running = false
     end

Commit a transaction by name

[Source]

     # File lib/stomp.rb, line 337
     def commit name, headers={}
       # Replay lists are looked up by transaction name in abort,
       # so discard the list for this transaction by name as well.
       @replay_messages_by_txn.delete(name)
       @connection.commit name, headers
     end

Join the listener thread for this client. Generally used to block the calling thread until the client shuts down.

[Source]

     # File lib/stomp.rb, line 306
     def join
       @listener_thread.join
     end

Is this client open?

[Source]

     # File lib/stomp.rb, line 394
     def open?
       @connection.open?
     end

Send a message to a destination.

If a block is given, a receipt is requested and passed to the block when it arrives.

Accepts a transaction header ( :transaction => 'some_transaction_id' )

[Source]

     # File lib/stomp.rb, line 386
     def send destination, message, headers = {}
       if block_given?
         headers['receipt'] = register_receipt_listener lambda {|r| yield r}
       end
       @connection.send destination, message, headers
     end

Subscribe to a destination. A block must be passed; it is used as the callback listener for messages arriving on that destination.

Accepts a transaction header ( :transaction => 'some_transaction_id' )

[Source]

     # File lib/stomp.rb, line 347
     def subscribe destination, headers={}
       raise "No listener given" unless block_given?
       @listeners[destination] = lambda {|msg| yield msg}
       @connection.subscribe destination, headers
     end

Unsubscribe from a channel

[Source]

     # File lib/stomp.rb, line 354
     def unsubscribe name, headers={}
       @connection.unsubscribe name, headers
       @listeners[name] = nil
     end
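The subscribe, dispatch, and unsubscribe cycle comes down to a hash from destination to callback. The sketch below models that table on its own, without a connection; ListenerTable and dispatch are hypothetical names, and strings stand in for messages.

```ruby
# Hypothetical, simplified model of destination-based dispatch.
class ListenerTable
  def initialize
    @listeners = {}
  end

  # One listener per destination: subscribing again replaces the old one.
  def subscribe(destination, &block)
    raise "No listener given" unless block
    @listeners[destination] = block
  end

  def unsubscribe(destination)
    @listeners[destination] = nil
  end

  # Called for each incoming message; returns true if a listener ran.
  def dispatch(destination, body)
    listener = @listeners[destination]
    return false unless listener
    listener.call(body)
    true
  end
end

table = ListenerTable.new
received = []
table.subscribe("/queue/a") { |body| received << body }
table.dispatch("/queue/a", "hello")    # listener runs
table.dispatch("/queue/b", "ignored")  # no listener, message is dropped
table.unsubscribe("/queue/a")
table.dispatch("/queue/a", "late")     # dropped after unsubscribe
```

As in the client, a message for a destination with no registered listener is silently discarded, and a second subscribe to the same destination replaces the earlier callback.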

Private Instance methods

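Register a listener to be called when the matching RECEIPT frame arrives. Returns the generated receipt id as a string.

[Source]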

     # File lib/stomp.rb, line 405
     def register_receipt_listener listener
       id = -1
       @id_mutex.synchronize do
         id = @ids.to_s
         @ids = @ids.succ
       end
       @receipt_listeners[id] = listener
       id
     end
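The receipt mechanism pairs a mutex-protected counter with a lookup table keyed by receipt id. The sketch below models that pairing in isolation; ReceiptRegistry, register, and deliver are hypothetical names. Unlike the source above, it also stores the listener while holding the mutex, which is a choice made for this sketch.

```ruby
# Hypothetical, simplified model of receipt-id allocation and dispatch.
class ReceiptRegistry
  def initialize
    @mutex = Mutex.new
    @next_id = 1
    @listeners = {}
  end

  # Allocate a unique id under the mutex and remember the listener.
  # Ids are strings because header values arrive as strings.
  def register(&listener)
    @mutex.synchronize do
      id = @next_id.to_s
      @next_id += 1
      @listeners[id] = listener
      id
    end
  end

  # Called when a receipt carrying the given receipt-id arrives.
  def deliver(receipt_id, frame)
    listener = @listeners[receipt_id]
    listener.call(frame) if listener
  end
end

registry = ReceiptRegistry.new
confirmed = []
id = registry.register { |frame| confirmed << frame }
registry.deliver(id, "receipt for #{id}")   # runs the listener
registry.deliver("999", "unknown")          # no listener, ignored
```

The mutex matters because any thread may call send or acknowledge with a block, so several threads can request ids at the same time.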
