Loggability::LogDevice::Http class

This is a generalized class that allows its subclasses to send log messages to HTTP endpoints asynchronously on a separate thread.

Constants

DEFAULT_BATCH_INTERVAL

The default number of seconds between batches

DEFAULT_ENDPOINT

The default HTTP endpoint URL to send logs to

DEFAULT_EXECUTOR_CLASS

The default Executor class to use for asynchronous tasks

DEFAULT_MAX_BATCH_SIZE

The default maximum number of messages that can be sent to the server in a single payload

DEFAULT_MAX_MESSAGE_BYTESIZE

The default max size in bytes for a single message.

DEFAULT_MAX_QUEUE_BYTESIZE

The default for the maximum bytesize of the queue (1 GB)

DEFAULT_OPTIONS

The default options for new instances

DEFAULT_WRITE_TIMEOUT

The default number of seconds to wait for data to be written before timing out

Attributes

batch_interval R

Number of seconds after the task completes before the task is performed again.

endpoint R

The URI of the endpoint to send messages to

executor R

The single thread pool executor

executor_class R

The Concurrent executor class to use for asynchronous tasks

last_send_time RW

The monotonic clock time when the last batch of logs was sent

logs_queue R

The Queue that contains any log messages which have not yet been sent to the logging service.

logs_queue_bytesize RW

The size of logs_queue in bytes

max_batch_bytesize R

The maximum number of bytes that will be included in a single POST

max_batch_size R

The maximum number of messages to post at one time

max_message_bytesize R

The maximum number of bytes of a single message to include in a batch

max_queue_bytesize R

The maximum bytesize of the queue. No further messages are queued once this threshold is reached.

timer_task R

The timer task thread

write_timeout R

How many seconds to wait for data to be written while sending a batch

Public Class Methods

new( endpoint=DEFAULT_ENDPOINT, opts={} )

Initialize the HTTP log device to send to the specified endpoint with the given options. Valid options are:

:batch_interval

Maximum number of seconds between batches

:write_timeout

How many seconds to wait for data to be written while sending a batch

:max_batch_size

The maximum number of messages that can be in a single batch

:max_batch_bytesize

The maximum number of bytes that can be in the payload of a single batch

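:max_message_bytesize

The maximum number of bytes that can be in a single message

:max_queue_bytesize

The maximum bytesize of the queue; further messages are not queued once it is reached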

:executor_class

The Concurrent executor class to use for asynchronous tasks.

# File lib/loggability/log_device/http.rb, line 64
def initialize( endpoint=DEFAULT_ENDPOINT, opts={} )
        if endpoint.is_a?( Hash )
                opts = endpoint
                endpoint = DEFAULT_ENDPOINT
        end

        opts = DEFAULT_OPTIONS.merge( opts )

        @endpoint             = URI( endpoint ).freeze
        @logs_queue           = Queue.new

        @logs_queue_bytesize  = 0
        @max_queue_bytesize   = opts[:max_queue_bytesize] || DEFAULT_MAX_QUEUE_BYTESIZE
        @batch_interval       = opts[:batch_interval] || DEFAULT_BATCH_INTERVAL
        @write_timeout        = opts[:write_timeout] || DEFAULT_WRITE_TIMEOUT
        @max_batch_size       = opts[:max_batch_size] || DEFAULT_MAX_BATCH_SIZE
        @max_message_bytesize = opts[:max_message_bytesize] || DEFAULT_MAX_MESSAGE_BYTESIZE
        @executor_class       = opts[:executor_class] || DEFAULT_EXECUTOR_CLASS

        @max_batch_bytesize   = opts[:max_batch_bytesize] || @max_batch_size * @max_message_bytesize
        @last_send_time       = Concurrent.monotonic_time
end
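
The argument handling above can be tried in isolation. The following is a standalone sketch, not the library's code: `resolve_options`, `SKETCH_ENDPOINT` and the values in `SKETCH_DEFAULTS` are hypothetical stand-ins (the real defaults are the DEFAULT_* constants and may differ). It illustrates two behaviours visible in the listing: a Hash passed in the first position is treated as the options, and :max_batch_bytesize falls back to max_batch_size * max_message_bytesize when it is not given.

```ruby
require 'uri'

# Hypothetical placeholder values; the library's real defaults live in its
# DEFAULT_* constants and may differ.
SKETCH_ENDPOINT = 'http://localhost:12775/v1/logs'
SKETCH_DEFAULTS = { max_batch_size: 100, max_message_bytesize: 2048 }.freeze

# Mirrors the constructor: a Hash in the first position is taken as the options.
def resolve_options(endpoint = SKETCH_ENDPOINT, opts = {})
  if endpoint.is_a?(Hash)
    opts = endpoint
    endpoint = SKETCH_ENDPOINT
  end

  opts = SKETCH_DEFAULTS.merge(opts)

  # The batch byte limit is derived from the other two limits unless given.
  opts[:max_batch_bytesize] ||= opts[:max_batch_size] * opts[:max_message_bytesize]

  { endpoint: URI(endpoint).freeze }.merge(opts)
end

explicit = resolve_options('https://logs.example.com/ingest', max_batch_size: 10)
implicit = resolve_options(max_batch_bytesize: 4096)
```

Here `explicit` derives its batch byte limit from the overridden batch size, while `implicit` keeps the placeholder endpoint and the byte limit it was given.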

Public Instance Methods

batch_ready?()

Returns true if a batch of logs is ready to be sent.

# File lib/loggability/log_device/http.rb, line 237
def batch_ready?
        seconds_since_last_send = Concurrent.monotonic_time - self.last_send_time

        return self.logs_queue.size >= self.max_batch_size ||
                seconds_since_last_send >= self.batch_interval
end
Also aliased as: has_batch_ready?
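
The readiness check combines a size trigger and a time trigger. A minimal standalone sketch of the same predicate follows; it assumes `Process.clock_gettime` as a stdlib stand-in for `Concurrent.monotonic_time`, and the keyword arguments (including the injectable `now`) are hypothetical conveniences for illustration.

```ruby
# Monotonic clock reading in seconds; unaffected by wall-clock adjustments.
def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

# True when enough messages are queued OR enough time has passed since the
# last send. `now` can be injected so the check is easy to exercise.
def batch_ready?(queue_size:, max_batch_size:, last_send_time:, batch_interval:, now: monotonic_now)
  queue_size >= max_batch_size || (now - last_send_time) >= batch_interval
end

full_queue  = batch_ready?(queue_size: 100, max_batch_size: 100,
                           last_send_time: 50.0, batch_interval: 1, now: 50.2)
stale_queue = batch_ready?(queue_size: 1, max_batch_size: 100,
                           last_send_time: 50.0, batch_interval: 1, now: 51.5)
not_ready   = batch_ready?(queue_size: 1, max_batch_size: 100,
                           last_send_time: 50.0, batch_interval: 1, now: 50.2)
```

As in the listing, the time-based condition does not look at whether the queue is empty, so a batch counts as ready once the interval has elapsed even with nothing queued.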

close()

LogDevice API – stop the batch thread and close the HTTP connection

# File lib/loggability/log_device/http.rb, line 162
def close
        self.stop
        self.http_client.finish
rescue IOError
        # ignore it since http session has not yet started.
end

format_log_message( message )

Returns the given message in whatever format the service expects individual log messages to be in. The default stringifies the message, truncates it to max_message_bytesize characters, and returns the result quoted and escaped via String#dump. This executes inside the sending thread.

# File lib/loggability/log_device/http.rb, line 286
def format_log_message( message )
        return message.to_s[ 0 ... self.max_message_bytesize ].dump
end
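
Subclasses are expected to override this method for their service's format. The sketch below first reproduces the default behaviour on a plain string, then shows one possible alternative: `format_as_json_string` is a hypothetical helper (not part of the library) that truncates by bytes rather than characters, drops any multibyte character split by the cut, and JSON-encodes the result.

```ruby
require 'json'

# Hypothetical helper: truncate by bytes, discard a partially cut multibyte
# character, then encode the result as a JSON string.
def format_as_json_string(message, max_bytes)
  message.to_s.byteslice(0, max_bytes).scrub('').to_json
end

# Default behaviour from the listing: character slice, then String#dump.
default_style = 'say "hi"'.to_s[0...7].dump

# A character slice can exceed the same number of bytes for non-ASCII text.
char_sliced = 'héllo'[0...3]

# Byte-based truncation at 2 bytes would split the two-byte "é", so it is dropped.
json_style = format_as_json_string('héllo', 2)
```

The character-based slice keeps three characters of 'héllo' but four bytes, which matters if the service enforces its limit in bytes.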

get_next_log_payload()

Dequeue pending log messages to send to the service and return them as a suitably-encoded String. The default is a JSON Array. This executes inside the sending thread.

# File lib/loggability/log_device/http.rb, line 260
def get_next_log_payload
        buf = []
        count = 0
        bytes = 0

        # Be conservative so as not to overflow
        max_size = self.max_batch_bytesize - self.max_message_bytesize - 2 # for the outer Array

        while count < self.max_batch_size && bytes < max_size && !self.logs_queue.empty?
                message = self.logs_queue.deq
                formatted_message = self.format_log_message( message )
                self.logs_queue_bytesize -= message.bytesize

                count += 1
                bytes += formatted_message.bytesize + 3 # comma and delimiters

                buf << formatted_message
        end

        return '[' + buf.join(',') + ']'
end
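
The batching loop can be sketched without the device itself. In this standalone approximation, `next_payload` and its keyword arguments are hypothetical, and `to_json` is swapped in for `format_log_message` so that the result parses as JSON; the count limit, the conservative byte limit and the bracketed join follow the listing.

```ruby
require 'json'

# Drains up to max_batch_size messages from the queue, stopping early once
# the conservative byte budget is used up, and returns a JSON Array string.
def next_payload(queue, max_batch_size:, max_batch_bytesize:, max_message_bytesize:)
  buf = []
  bytes = 0

  # Leave room for one more full-size message plus the outer brackets.
  max_size = max_batch_bytesize - max_message_bytesize - 2

  while buf.size < max_batch_size && bytes < max_size && !queue.empty?
    formatted = queue.deq.to_s[0...max_message_bytesize].to_json
    bytes += formatted.bytesize + 3 # comma and delimiters
    buf << formatted
  end

  "[#{buf.join(',')}]"
end

queue = Queue.new
%w[first second third].each { |message| queue.enq(message) }

payload = next_payload(queue, max_batch_size: 2,
                       max_batch_bytesize: 1024, max_message_bytesize: 64)
```

With a batch size of 2, the third message stays in the queue for the next batch.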

has_batch_ready?()

Alias for: batch_ready?

http_client()

Returns a memoized, configured Net::HTTP object for the endpoint, ready to open connections.

# File lib/loggability/log_device/http.rb, line 292
def http_client
        return @http_client ||= begin
                uri = URI( self.endpoint )

                http = Net::HTTP.new( uri.host, uri.port )
                http.write_timeout = self.write_timeout

                if uri.scheme == 'https'
                        http.use_ssl = true
                        http.verify_mode = OpenSSL::SSL::VERIFY_PEER
                end

                http
        end
end
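
The same client setup works as a standalone function, which makes the effect of the URL scheme easy to inspect. `build_http_client` and the `logs.example.com` host are hypothetical; the Net::HTTP calls are the ones used in the listing. Creating the object does not open a connection.

```ruby
require 'net/http'
require 'openssl'
require 'uri'

# Builds a Net::HTTP object for the endpoint without connecting to it.
def build_http_client(endpoint, write_timeout:)
  uri = URI(endpoint)

  http = Net::HTTP.new(uri.host, uri.port)
  http.write_timeout = write_timeout

  # Enable TLS with peer verification for https endpoints only.
  if uri.scheme == 'https'
    http.use_ssl = true
    http.verify_mode = OpenSSL::SSL::VERIFY_PEER
  end

  http
end

client = build_http_client('https://logs.example.com/ingest', write_timeout: 15)
plain  = build_http_client('http://logs.example.com/ingest', write_timeout: 15)
```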

make_batch_request()

Returns a new HTTP request (a subclass of Net::HTTPRequest) suitable for sending the next batch of logs to the service. Defaults to a POST of JSON data. This executes inside the sending thread.

# File lib/loggability/log_device/http.rb, line 249
def make_batch_request
        request = Net::HTTP::Post.new( self.endpoint.path )
        request[ 'Content-Type' ] = 'application/json'

        return request
end
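
A subclass that overrides this method might build its request along the following lines. This is a sketch under assumptions: `build_batch_request` and the example host are hypothetical, and `URI::HTTP#request_uri` is used in place of `path` because an endpoint URL with no path component gives an empty string from `path`, whereas `request_uri` falls back to "/".

```ruby
require 'net/http'
require 'uri'

# Builds a JSON POST request for the endpoint carrying the given body.
def build_batch_request(endpoint, body)
  uri = URI(endpoint)

  # request_uri yields "/" when the URL has no path, and keeps any query string.
  request = Net::HTTP::Post.new(uri.request_uri)
  request['Content-Type'] = 'application/json'
  request.body = body

  request
end

request = build_batch_request('http://logs.example.com', '["one","two"]')
```

The request object is only a description of the call; nothing is sent until it is passed to a Net::HTTP client.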

running?()

Returns true if the device has started sending messages to the logging endpoint.

# File lib/loggability/log_device/http.rb, line 178
def running?
        return self.executor&.running?
end

send_logs()

Posts a job to the executor that sends a batch of log messages to the logging service if a batch is ready. The job executes inside the sending thread.

# File lib/loggability/log_device/http.rb, line 215
def send_logs
        self.executor.post do
                if self.batch_ready?
                        # p "Batch ready; sending."
                        request = self.make_batch_request
                        request.body = self.get_next_log_payload

                        # p "Sending request", request

                        self.http_client.request( request ) do |res|
                                p( res ) if $DEBUG
                        end

                        self.last_send_time = Concurrent.monotonic_time
                else
                        # p "Batch not ready yet."
                end
        end
end

start()

Starts the executor (a thread pool with a single thread) and the timer task.

# File lib/loggability/log_device/http.rb, line 171
def start
        self.start_executor
        self.start_timer_task
end

start_executor()

Start the background thread that sends messages.

# File lib/loggability/log_device/http.rb, line 199
def start_executor
        @executor = self.executor_class.new
        @executor.auto_terminate = true unless @executor.serialized?
end

start_timer_task()

Create a timer task that sends logs at a regular interval.

# File lib/loggability/log_device/http.rb, line 206
def start_timer_task
        @timer_task = Concurrent::TimerTask.execute( execution_interval: self.batch_interval ) do
                self.send_logs
        end
end
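
stop()

Shuts down the timer task and the executor (a pool with a single thread), waiting up to 3 seconds for shutdown to complete. If the executor has not terminated by then, it is halted and given another 3 seconds.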

# File lib/loggability/log_device/http.rb, line 185
def stop
        return unless self.running?

        self.timer_task.shutdown if self.timer_task&.running?
        self.executor.shutdown

        unless self.executor.wait_for_termination( 3 )
                self.executor.halt
                self.executor.wait_for_termination( 3 )
        end
end
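
write( message )

LogDevice API – write a message to the HTTP device. The message is silently dropped if queueing it would bring the queue to max_queue_bytesize or beyond.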

# File lib/loggability/log_device/http.rb, line 147
def write( message )
        self.start unless self.running?
        if message.is_a?( Hash )
                message_size = message.to_json.bytesize
        else
                message_size = message.bytesize
        end
        return if ( self.logs_queue_bytesize + message_size ) >= self.max_queue_bytesize
        self.logs_queue_bytesize += message_size
        self.logs_queue.enq( message )
        self.send_logs
end