The Mongrel2 connection class. Connection objects serve as a front end for the ZMQ sockets that talk to the Mongrel2 server(s) for your handler. A Connection receives TNetString requests, wraps Mongrel2::Request objects around them, and then encodes and sends Mongrel2::Response objects back to the server.
The application's identifier string that associates it with its route
True if the Connection to the Mongrel2 server has been closed.
The ZMQ socket identity used by this connection
The connection's publication (response) socket address
The connection's subscription (request) socket address
Create a new Connection identified by app_id (a UUID or other unique string) that will connect to a Mongrel2 server on the sub_addr and pub_addr (e.g., 'tcp://127.0.0.1:9998').
# File lib/mongrel2/connection.rb, line 29
def initialize( app_id, sub_addr, pub_addr )
  @app_id = app_id
  @sub_addr = sub_addr
  @pub_addr = pub_addr
  @request_sock = @response_sock = nil
  @identifier = make_identifier( app_id )
  @closed = false
end
Send the given data to one or more connected clients identified by conn_ids via the server specified by sender_id. The conn_ids should be an Array of Integer IDs no longer than Mongrel2::MAX_IDENTS.
# File lib/mongrel2/connection.rb, line 156
def broadcast( sender_id, conn_ids, data )
  idlist = conn_ids.flatten.map( &:to_s ).join( ' ' )
  self.send( sender_id, idlist, data )
end
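The ID list that broadcast sends is a single space-separated string. As a minimal sketch, the standalone snippet below shows one way a caller might split a large set of connection IDs into batches that stay within the limit before building each list. The value 100 for MAX_IDENTS is an assumption made for illustration (the real limit is whatever Mongrel2::MAX_IDENTS defines), and id_lists_for is a hypothetical helper, not part of the library.

```ruby
# Illustrative limit only (an assumption); the real one is Mongrel2::MAX_IDENTS.
MAX_IDENTS = 100

# Hypothetical helper: split conn_ids into batches of at most `limit` IDs and
# render each batch the way #broadcast does (IDs joined with single spaces).
def id_lists_for( conn_ids, limit=MAX_IDENTS )
  conn_ids.flatten.each_slice( limit ).map do |batch|
    batch.map( &:to_s ).join( ' ' )
  end
end

lists = id_lists_for( (1..250).to_a )
puts lists.length             # number of batches
puts lists.last.split.length  # number of IDs in the final batch
```

Each resulting string could then be passed to a separate send call, so no single message carries more IDs than the server accepts.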
Tell the server associated with sender_id to close the connections associated with conn_ids.
# File lib/mongrel2/connection.rb, line 188
def broadcast_close( sender_id, *conn_ids )
  self.broadcast( sender_id, conn_ids.flatten, '' )
end
Send the given data to one or more connected clients identified by conn_ids via the server specified by sender_id as an extended reply of type response_type. The conn_ids should be an Array of Integer IDs no longer than Mongrel2::MAX_IDENTS.
# File lib/mongrel2/connection.rb, line 166
def broadcast_extended( sender_id, conn_ids, response_type, *data )
  idlist = conn_ids.flatten.map( &:to_s ).join( ' ' )
  self.send_extended( sender_id, idlist, response_type, *data )
end
Close both of the sockets and mark the Connection as closed.
# File lib/mongrel2/connection.rb, line 194
def close
  return if self.closed?
  self.closed = true
  if @request_sock
    @request_sock.options.linger = 0
    @request_sock.close
  end
  if @response_sock
    @response_sock.options.linger = 0
    @response_sock.close
  end
end
Returns true if the connection to the Mongrel2 server has been closed.
# File lib/mongrel2/connection.rb, line 209
def closed?
  return @closed
end
Establish both connections to the Mongrel2 server.
# File lib/mongrel2/connection.rb, line 67
def connect
  self.log.info "Connecting PULL request socket (%s)" % [ self.sub_addr ]
  @request_sock = CZTop::Socket::PULL.new
  @request_sock.connect( self.sub_addr )
  self.log.info "Connecting PUB response socket (%s)" % [ self.pub_addr ]
  @response_sock = CZTop::Socket::PUB.new
  @response_sock.connect( self.pub_addr )
end
Copy constructor: don't keep the original's sockets or closed state.
# File lib/mongrel2/connection.rb, line 42
def initialize_copy( original )
  @request_sock = @response_sock = nil
  @closed = false
end
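To illustrate what this copy behaviour means in practice, here is a minimal sketch using a stand-in class rather than Mongrel2::Connection itself. FakeConnection and its placeholder socket are hypothetical; the only thing taken from the source is the body of initialize_copy, which Ruby invokes on the new object during dup and clone.

```ruby
# Stand-in class (not Mongrel2::Connection) that mirrors the copy behaviour
# described above: a duplicate keeps its address but drops the socket handle
# and the closed flag.
class FakeConnection
  attr_reader :sub_addr, :request_sock

  def initialize( sub_addr )
    @sub_addr = sub_addr
    @request_sock = nil
    @closed = false
  end

  # Ruby calls this on the copy after the instance variables are duplicated.
  def initialize_copy( original )
    @request_sock = nil
    @closed = false
  end

  def connect
    @request_sock = Object.new # placeholder for a real socket
  end

  def close
    @closed = true
  end

  def closed?
    @closed
  end
end

original = FakeConnection.new( 'tcp://127.0.0.1:9998' )
original.connect
original.close

copy = original.dup
puts copy.sub_addr          # the address carries over to the copy
puts copy.closed?           # the closed state does not
puts copy.request_sock.nil? # neither does the socket
```

Under this scheme a copy of a closed or connected object starts out fresh and has to establish its own sockets, which avoids two objects sharing one socket handle.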
Returns a string containing a human-readable representation of the Connection, suitable for debugging.
# File lib/mongrel2/connection.rb, line 226
def inspect
  state = if @request_sock
      if self.closed?
        "closed"
      else
        "connected"
      end
    else
      "not connected"
    end
  return "#<%p:0x%016x %s (%s)>" % [
    self.class,
    self.object_id * 2,
    self.to_s,
    state,
  ]
end
Fetch the next request from the server as a Mongrel2::Request object.
# File lib/mongrel2/connection.rb, line 109
def receive
  raw_req = self.recv
  self.log.debug "Receive: parsing raw request: %d bytes" % [ raw_req.bytesize ]
  return Mongrel2::Request.parse( raw_req )
end
Fetch the next request from the server as raw TNetString data.
# File lib/mongrel2/connection.rb, line 97
def recv
  self.check_closed
  self.log.debug "Fetching next request (PULL)"
  message = self.request_sock.receive
  data = message.pop
  self.log.debug "  got %d bytes of %s request data" % [ data.bytesize, data.encoding.name ]
  return data
end
Write the specified response (a Mongrel2::Response object) to the requester.
# File lib/mongrel2/connection.rb, line 140
def reply( response )
  response.each_chunk do |data|
    self.send( response.sender_id, response.conn_id, data )
  end
  if response.extended_reply?
    self.log.debug "Response also includes an extended reply."
    data = response.extended_reply_data
    filter = response.extended_reply_filter
    self.send_extended( response.sender_id, response.conn_id, filter, *data )
  end
end
Tell the server to close the connection associated with the given request_or_response.
# File lib/mongrel2/connection.rb, line 181
def reply_close( request_or_response )
  self.send_close( request_or_response.sender_id, request_or_response.conn_id )
end
Fetch the ZMQ::PULL socket for incoming requests, establishing the connection to Mongrel2 if it hasn't been established already.
# File lib/mongrel2/connection.rb, line 80
def request_sock
  self.check_closed
  self.connect unless @request_sock
  return @request_sock
end
Fetch the ZMQ::PUB socket for outgoing responses, establishing the connection to Mongrel2 if it hasn't been established already.
# File lib/mongrel2/connection.rb, line 89
def response_sock
  self.check_closed
  self.connect unless @response_sock
  return @response_sock
end
Write raw data to the given connection ID (conn_id) at the given sender_id.
# File lib/mongrel2/connection.rb, line 117
def send( sender_id, conn_id, data )
  self.check_closed
  header = "%s %d:%s," % [ sender_id, conn_id.to_s.length, conn_id ]
  buf = header + ' ' + data
  self.log.debug "Sending response (PUB)"
  self.response_sock << buf
  self.log.debug "  done with send (%d bytes)" % [ buf.bytesize ]
end
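The string that send publishes can be examined without a socket. The sketch below copies the two formatting lines from the method into a hypothetical helper, response_frame, so the layout is visible: the sender ID, the ID list prefixed with its length and terminated by a comma, a space, and then the data. The sender UUID is a made-up example value.

```ruby
# Hypothetical helper that builds the same string #send publishes:
#   "<sender_id> <length of ID list>:<ID list>, <data>"
def response_frame( sender_id, conn_id, data )
  header = "%s %d:%s," % [ sender_id, conn_id.to_s.length, conn_id ]
  return header + ' ' + data
end

# Made-up sender UUID, for illustration only.
sender = '34f9ceee-cd52-4b7f-b197-88bf2f0ec378'

puts response_frame( sender, 12, "HTTP/1.1 200 OK\r\n\r\n" ).inspect
puts response_frame( sender, '12 13 14', 'hello' ).inspect # ID list as built by #broadcast
puts response_frame( sender, 12, '' ).inspect              # empty data, as used by #send_close
```

Because the length prefix counts the characters of the whole ID list, the same header format serves both a single connection ID and the space-separated list that broadcast builds.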
Tell the server to close the connection associated with the given sender_id and conn_id.
# File lib/mongrel2/connection.rb, line 174
def send_close( sender_id, conn_id )
  self.log.info "Sending kill message to connection %d" % [ conn_id ]
  self.send( sender_id, conn_id, '' )
end
Write raw data to the given connection ID (conn_id) at the specified sender_id as an extended response of type response_type.
# File lib/mongrel2/connection.rb, line 129
def send_extended( sender_id, conn_id, response_type, *data )
  self.check_closed
  # Interpolate the response type; the format string previously had no argument.
  self.log.debug "Sending response with %s extended reply (PUB)" % [ response_type ]
  header = "%s %d:X %s," % [ sender_id, conn_id.to_s.length + 2, conn_id ]
  buf = header + ' ' + TNetstring.dump( [response_type] + data )
  self.response_sock << buf
  self.log.debug "  done with send (%d bytes)" % [ buf.bytesize ]
end
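An extended reply differs from a plain one in two ways: the ID list is prefixed with "X " (hence the + 2 in the length), and the body is a TNetstring-encoded list whose first element is the response type. The following sketch reproduces that layout without the library. tnet_dump is a deliberately tiny stand-in for TNetstring.dump that handles only strings, symbols, and arrays; extended_frame is a hypothetical helper; and the 'sendfile' type and file path are example values only.

```ruby
# Minimal stand-in for TNetstring.dump (strings, symbols and arrays only).
# TNetstring layout: "<byte length>:<payload><type tag>", where the tag is
# "," for a string and "]" for a list.
def tnet_dump( obj )
  case obj
  when String, Symbol
    str = obj.to_s
    "%d:%s," % [ str.bytesize, str ]
  when Array
    payload = obj.map {|item| tnet_dump(item) }.join
    "%d:%s]" % [ payload.bytesize, payload ]
  else
    raise ArgumentError, "unsupported type: %p" % [ obj.class ]
  end
end

# Hypothetical helper that builds the same string #send_extended publishes.
def extended_frame( sender_id, conn_id, response_type, *data )
  header = "%s %d:X %s," % [ sender_id, conn_id.to_s.length + 2, conn_id ]
  return header + ' ' + tnet_dump( [response_type] + data )
end

# 'abc' stands in for a real sender UUID; the type and path are examples.
puts extended_frame( 'abc', 12, 'sendfile', '/tmp/a.txt' )
```

Keeping the response type as the first list element lets the receiving side pick the handler for the reply before it looks at the remaining arguments.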
Return a string describing the connection.
# File lib/mongrel2/connection.rb, line 215
def to_s
  return "{%s} %s <-> %s" % [
    self.app_id,
    self.sub_addr,
    self.pub_addr,
  ]
end
Check to be sure the Connection hasn't been closed, raising a Mongrel2::ConnectionError if it has.
# File lib/mongrel2/connection.rb, line 257
def check_closed
  raise Mongrel2::ConnectionError, "operation on closed Connection" if self.closed?
end
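The guard above, together with the early return in close, gives a simple lifecycle: closing is safe to repeat, and any later I/O attempt fails loudly. A minimal sketch of that pattern follows, using stand-in names (GuardedConnection and FakeConnectionError are hypothetical, standing in for Mongrel2::Connection and Mongrel2::ConnectionError) so it runs without the library or a server.

```ruby
# Stand-in for Mongrel2::ConnectionError.
class FakeConnectionError < RuntimeError; end

# Stand-in for Mongrel2::Connection, reduced to the closed-state handling.
class GuardedConnection
  def initialize
    @closed = false
  end

  def closed?
    @closed
  end

  # Safe to call repeatedly: a second call returns early.
  def close
    return if closed?
    @closed = true
  end

  # Every I/O method checks the guard before doing any work.
  def recv
    check_closed
    "raw request data"
  end

  private

  def check_closed
    raise FakeConnectionError, "operation on closed Connection" if closed?
  end
end

conn = GuardedConnection.new
conn.recv   # fine while the connection is open
conn.close
conn.close  # no error on a second close

begin
  conn.recv
rescue FakeConnectionError => err
  puts "rescued: #{err.message}"
end
```

A handler loop can rescue this error to detect that its connection was closed (for example during shutdown) and exit cleanly instead of retrying.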