Class Stomp::Connection
In: lib/stomp.rb
Parent: Object
Message Client Connection lib/stomp.rb Stomp dot/m_0_0.png

Low level connection which maps commands and supports synchronous receives

Methods

__old_receive   _receive   _transmit   abort   ack   begin   closed?   commit   disconnect   new   open   open?   poll   receive   send   socket   subscribe   transmit   unsubscribe  

Public Class methods

Create a connection, requires a login and passcode. Can accept a host (default is localhost), and port (default is 61613) to connect to

[Source]

    # File lib/stomp.rb, line 33
    # Store the connection settings and open the connection right away.
    #
    # login, passcode:: credentials sent in the CONNECT frame
    # host, port::      broker address (defaults: 'localhost', 61613)
    # reliable::        stored in @reliable; the socket/transmit/receive
    #                   methods retry instead of raising when it is set
    # reconnectDelay::  seconds slept between failed connect attempts
    #
    # Uses the +false+/+nil+ literals: the FALSE/NIL constants the original
    # relied on were removed from Ruby in 3.2.
    def initialize(login, passcode, host='localhost', port=61613, reliable=false, reconnectDelay=5)
      @host = host
      @port = port
      @login = login
      @passcode = passcode
      @transmit_semaphore = Mutex.new
      @read_semaphore = Mutex.new
      @socket_semaphore = Mutex.new
      @reliable = reliable
      @reconnectDelay = reconnectDelay
      @closed = false
      @subscriptions = {}
      @failure = nil
      # Connect eagerly; in non-reliable mode a connect error propagates
      # out of the constructor.
      socket
    end

[Source]

    # File lib/stomp.rb, line 26
    # Convenience constructor: builds a Connection with the given settings.
    # Unlike +new+, login and passcode default to empty strings.
    #
    # The +reliable+ default is the +false+ literal; the FALSE constant the
    # original used was removed from Ruby in 3.2.
    # NOTE(review): written as +def self.open+, which assumes this method
    # sits inside +class Connection+ (the page lists it as a class method
    # of Stomp::Connection) - confirm.
    def self.open(login = "", passcode = "", host='localhost', port=61613, reliable=false, reconnectDelay=5)
      Connection.new(login, passcode, host, port, reliable, reconnectDelay)
    end

Public Instance methods

Receive a frame, block until the frame is received

[Source]

     # File lib/stomp.rb, line 156
     # Receive one frame from the current socket and return whatever
     # _receive returns.
     #
     # Any StandardError is recorded in @failure. In non-reliable mode it is
     # re-raised; in reliable mode it is logged to $stderr and the read is
     # retried (the +socket+ call reconnects once @failure is set).
     def __old_receive
       # The receive may fail, so we may need to retry.
       loop do
         begin
           return _receive(socket)
         rescue => e
           @failure = e
           raise unless @reliable
           # Interpolate the exception: "string" + exception raises
           # TypeError, which used to abort the retry loop.
           $stderr.print "receive failed: #{e}\n"
         end
       end
     end

Abort a transaction by name

[Source]

     # File lib/stomp.rb, line 106
     # Abort the transaction called +name+ by sending an ABORT frame.
     #
     # +headers+ are extra frame headers. They are merged into a new hash,
     # so the caller's hash is not modified and may be frozen.
     def abort(name, headers={})
       transmit("ABORT", headers.merge(:transaction => name))
     end

Acknowledge a message, used when a subscription has specified client acknowledgement ( connection.subscribe "/queue/a", :ack => 'client' )

Accepts a transaction header ( :transaction => 'some_transaction_id' )

[Source]

    # File lib/stomp.rb, line 94
    # Acknowledge the message identified by +message_id+ by sending an ACK
    # frame carrying it in the 'message-id' header.
    #
    # +headers+ are extra frame headers (for example :transaction). They
    # are merged into a new hash, so the caller's hash is not modified and
    # may be frozen.
    def ack(message_id, headers={})
      transmit("ACK", headers.merge('message-id' => message_id))
    end

Begin a transaction, requires a name for the transaction

[Source]

    # File lib/stomp.rb, line 85
    # Begin the transaction called +name+ by sending a BEGIN frame.
    #
    # +headers+ are extra frame headers. They are merged into a new hash,
    # so the caller's hash is not modified and may be frozen.
    def begin(name, headers={})
      transmit("BEGIN", headers.merge(:transaction => name))
    end

Is this connection closed?

[Source]

    # File lib/stomp.rb, line 80
    # Report the closed flag: returns the current value of @closed as is.
    def closed?
      return @closed
    end

Commit a transaction by name

[Source]

     # File lib/stomp.rb, line 100
     # Commit the transaction called +name+ by sending a COMMIT frame.
     #
     # +headers+ are extra frame headers. They are merged into a new hash,
     # so the caller's hash is not modified and may be frozen.
     def commit(name, headers={})
       transmit("COMMIT", headers.merge(:transaction => name))
     end

Close this connection

[Source]

     # File lib/stomp.rb, line 142
     # Close this connection: send a DISCONNECT frame with the given extra
     # +headers+, then mark the connection as closed.
     #
     # The flag is only set after the frame went out; if transmit raises,
     # the connection is still reported as open.
     def disconnect(headers = {})
       transmit("DISCONNECT", headers)
       # Without this, closed?/open? never change: nothing else in the
       # visible class sets @closed after the constructor.
       @closed = true
     end

Is this connection open?

[Source]

    # File lib/stomp.rb, line 75
    # True unless the closed flag (@closed) is set.
    def open?
      @closed ? false : true
    end

Return a pending message if one is available, otherwise return nil

[Source]

     # File lib/stomp.rb, line 148
     # Return a pending message if one is available, otherwise nil.
     #
     # Returns nil when there is no socket yet or when the socket reports
     # no readable data (+ready?+); otherwise delegates to +receive+.
     def poll
       # No read lock here: the receive path takes @read_semaphore itself
       # (in _receive), and Ruby's Mutex is not reentrant, so holding it
       # around this call made every successful poll raise ThreadError.
       return nil if @socket.nil? || !@socket.ready?
       receive
     end

[Source]

     # File lib/stomp.rb, line 170
     # Receive a frame via __old_receive.
     #
     # In reliable mode a nil result is treated as EOF on the connection:
     # the current socket is closed and forgotten, and the read is attempted
     # once more (the next +socket+ call opens a new connection because
     # @socket is nil). In non-reliable mode nil is returned unchanged.
     def receive
       frame = __old_receive
       return frame unless frame.nil? && @reliable

       $stderr.print "connection.receive returning EOF as nil - resetting connection.\n"
       # Close the dead socket before dropping the reference so its file
       # descriptor is released now rather than whenever GC gets to it.
       if @socket
         begin
           @socket.close
         rescue IOError, SystemCallError
           # Already closed or broken - nothing left to release.
         end
       end
       @socket = nil
       __old_receive
     end

Send message to destination

Accepts a transaction header ( :transaction => 'some_transaction_id' )

[Source]

     # File lib/stomp.rb, line 136
     # Send +message+ to +destination+ in a SEND frame.
     #
     # +headers+ are extra frame headers (for example :transaction). They
     # are merged into a new hash, so the caller's hash is not modified and
     # may be frozen or reused.
     def send(destination, message, headers={})
       transmit("SEND", headers.merge(:destination => destination), message)
     end

[Source]

    # File lib/stomp.rb, line 49
    # Return the connected socket, (re)connecting when necessary.
    #
    # A new connection is made when there is no socket yet or when a
    # previous operation recorded an error in @failure. Connecting means:
    # open a TCP socket to @host/@port, send CONNECT with the stored
    # credentials, keep the reply in @connect and replay every stored
    # subscription.
    #
    # On a connect error the exception is stored in @failure. In
    # non-reliable mode it is re-raised; in reliable mode it is logged to
    # $stderr and the attempt is repeated after @reconnectDelay seconds.
    def socket
      # Not wrapped in @read_semaphore.synchronize: presumably that failed
      # because _receive (called below) takes the same non-reentrant lock.
      s = @socket
      while s.nil? || !@failure.nil?
        @failure = nil
        fresh = nil
        begin
          fresh = TCPSocket.open(@host, @port)
          _transmit(fresh, "CONNECT", {:login => @login, :passcode => @passcode})
          @connect = _receive(fresh)
          # replay any subscriptions.
          @subscriptions.each_value { |headers| _transmit(fresh, "SUBSCRIBE", headers) }
          s = fresh
        rescue => e
          @failure = e
          s = nil
          # Release a socket that was opened but failed the handshake;
          # nothing else holds a reference to it.
          begin
            fresh.close if fresh
          rescue IOError, SystemCallError
            # Already closed or broken - nothing left to release.
          end
          raise e unless @reliable
          # Interpolate the exception: "string" + exception raises
          # TypeError, which used to abort the reconnect loop.
          $stderr.print "connect failed: #{e} will retry in #{@reconnectDelay}\n"
          sleep(@reconnectDelay)
        end
      end
      @socket = s
      s
    end

Subscribe to a destination, must specify a name

[Source]

     # File lib/stomp.rb, line 112
     # Subscribe to the destination +name+ by sending a SUBSCRIBE frame.
     #
     # +headers+ are extra frame headers (for example :ack). They are
     # merged into a new hash, so the caller's hash is not modified and may
     # be frozen.
     # In reliable mode the frame headers are remembered under +subId+
     # (or +name+ when +subId+ is nil) so the subscription can be replayed
     # after a reconnect.
     def subscribe(name, headers = {}, subId=nil)
       frame_headers = headers.merge(:destination => name)
       transmit("SUBSCRIBE", frame_headers)

       # Store the sub so that we can replay if we reconnect. The stored
       # hash is our own copy, so later changes to the caller's hash do not
       # alter what gets replayed.
       @subscriptions[subId.nil? ? name : subId] = frame_headers if @reliable
     end

Unsubscribe from a destination, must specify a name

[Source]

     # File lib/stomp.rb, line 124
     # Unsubscribe from the destination +name+ by sending an UNSUBSCRIBE
     # frame.
     #
     # +headers+ are extra frame headers. They are merged into a new hash,
     # so the caller's hash is not modified and may be frozen.
     # In reliable mode the subscription remembered under +subId+ (or
     # +name+ when +subId+ is nil) is forgotten, so it is no longer
     # replayed after a reconnect.
     def unsubscribe(name, headers = {}, subId=nil)
       transmit("UNSUBSCRIBE", headers.merge(:destination => name))
       @subscriptions.delete(subId.nil? ? name : subId) if @reliable
     end

Private Instance methods

[Source]

     # File lib/stomp.rb, line 181
     # Read one frame from the socket-like object +s+ while holding
     # @read_semaphore.
     #
     # Returns nil when +s+ is at EOF before a frame starts; otherwise
     # returns a Message with +command+, +headers+ (String keys and values,
     # both stripped) and +body+ filled in.
     # NOTE(review): relies on Message.new yielding the new message to the
     # block - Message is defined outside this listing, confirm.
     #
     # Raises EOFError when the stream ends in the middle of a frame, and
     # RuntimeError for a header line without ':' or for a content-length
     # body that is not followed by the NUL terminator.
     def _receive( s )
       @read_semaphore.synchronize do
         # Skip blank lines in front of the command line.
         line = s.gets
         line = s.gets while line && line =~ /^\s*$/
         return nil if line.nil?

         Message.new do |m|
           m.command = line.chomp
           m.headers = {}
           loop do
             line = s.gets
             raise EOFError, "connection closed while reading frame headers" if line.nil?
             line = line.chomp
             break if line == ''
             key, value = line.split(':', 2)
             raise "Invalid header line received: #{line.inspect}" if value.nil?
             m.headers[key.strip] = value.strip
           end

           if m.headers['content-length']
             m.body = s.read(m.headers['content-length'].to_i)
             raise "Invalid content length received" unless s.read(1) == "\0"
           else
             # Read up to and including the NUL terminator in one call. The
             # former getc/ord/chr loop worked per character, which
             # corrupted or raised on multibyte bodies.
             raw = s.gets("\0")
             if raw.nil? || raw[-1, 1] != "\0"
               raise EOFError, "connection closed while reading frame body"
             end
             m.body = raw[0...-1]
           end
         end
       end
     end

[Source]

     # File lib/stomp.rb, line 234
     # Write one frame to the socket-like object +s+ while holding
     # @transmit_semaphore: the command line, one "key:value" line per
     # entry of +headers+, a content-length and a content-type header, a
     # blank line, the body and the NUL terminator.
     def _transmit(s, command, headers={}, body='')
       # content-length must count bytes, not characters; String#length
       # under-reports bodies that contain multibyte characters. Fall back
       # to length on Rubies that lack bytesize.
       body_size = body.respond_to?(:bytesize) ? body.bytesize : body.length
       @transmit_semaphore.synchronize do
         s.puts command
         headers.each { |k, v| s.puts "#{k}:#{v}" }
         s.puts "content-length: #{body_size}"
         s.puts "content-type: text/plain; charset=UTF-8"
         s.puts
         s.write body
         s.write "\0"
       end
     end

[Source]

     # File lib/stomp.rb, line 218
     # Send one frame (+command+, +headers+, +body+) over the current
     # socket.
     #
     # Any StandardError is recorded in @failure. In non-reliable mode it is
     # re-raised; in reliable mode it is logged to $stderr and the send is
     # retried (the +socket+ call reconnects once @failure is set).
     def transmit(command, headers={}, body='')
       # The transmit may fail, so we may need to retry.
       loop do
         begin
           _transmit(socket, command, headers, body)
           return
         rescue => e
           @failure = e
           raise unless @reliable
           # Interpolate the exception: "string" + exception raises
           # TypeError, which used to abort the retry loop.
           $stderr.print "transmit failed: #{e}\n"
         end
       end
     end

[Validate]