Timescale

class
Superclass
Observability::Collector

Constants

JSON_CONFIG

The config to pass to JSON.parse

LOOP_TIMER

The number of seconds to wait between IO loops

MAX_EVENT_BYTES

The maximum size of an event message, in bytes

Public Class Methods

anchor
new()

Create a new UDP collector

# File lib/observability/collector/timescale.rb, line 48
# Set up a collector with a fresh, unbound UDP socket. No database handle or
# cursor exists yet, and event processing is switched off.
def initialize
        super

        @socket = UDPSocket.new

        # Neither handle is populated at construction time.
        @db = @cursor = nil
        @processing = false
end

Public Instance Methods

anchor
read_next_event()

Read the next event from the socket

# File lib/observability/collector/timescale.rb, line 107
# Read the next event from the socket without blocking on the read itself.
#
# Returns the parsed JSON payload of one message, or +nil+ when there is
# nothing to process: no data was ready (after waiting up to LOOP_TIMER
# seconds for the socket to become readable), the message was empty, or the
# payload was not valid JSON.
def read_next_event
        self.log.debug "Reading next event."
        data = @socket.recv_nonblock( MAX_EVENT_BYTES, exception: false )

        if data == :wait_readable
                # Nothing queued yet: wait (bounded by LOOP_TIMER) for the socket
                # to become readable, then let the caller try again.
                IO.select( [@socket], nil, nil, LOOP_TIMER )
                return nil
        elsif data.empty?
                return nil
        end

        self.log.info "Read %d bytes" % [ data.bytesize ]
        return JSON.parse( data )
rescue JSON::ParserError => err
        # The payload comes straight off the network; one malformed message
        # should be dropped rather than raised into the caller.
        self.log.error "Discarding malformed event: %s" % [ err.message ]
        return nil
end
anchor
start()

Start receiving events.

# File lib/observability/collector/timescale.rb, line 63
# Start receiving events: connect to the database named by the class-level
# +db+ setting, load Sequel's :pg_json extension into it, bind the socket to
# the class-level host and port, then enter the processing loop.
def start
        self.log.info "Starting up."

        settings = self.class

        @db = Sequel.connect( settings.db )
        @db.extension( :pg_json )
        # @cursor = @db[ :events ].prepare( :insert, :insert_new_event,
        #      time: :$time,
        #      type: :$type,
        #      version: :$version,
        #      data: :$data
        # )

        @socket.bind( settings.host, settings.port )

        self.start_processing
end
anchor
start_processing()

Start consuming incoming events and storing them.

# File lib/observability/collector/timescale.rb, line 90
# Run the event loop: mark the collector as processing, then keep reading
# events and storing each one until the processing flag is cleared.
def start_processing
        @processing = true

        while @processing
                event = self.read_next_event
                # A falsy read means there was nothing to store this time around.
                next unless event

                self.log.debug "Read event: %p" % [ event ]
                self.store_event( event )
        end
end
anchor
stop()

Stop receiving events.

# File lib/observability/collector/timescale.rb, line 81
# Stop receiving events: halt the processing loop, drop the cursor, and
# disconnect from the database if a connection was ever made.
def stop
        self.stop_processing

        @cursor = nil
        # @db is still nil if #stop is called before #start has connected.
        @db.disconnect if @db
end
anchor
stop_processing()

Stop consuming events.

# File lib/observability/collector/timescale.rb, line 101
# Stop consuming events by clearing the flag that the loop in
# #start_processing tests before each pass; a pass already underway
# still finishes.
def stop_processing
        @processing = false
end
anchor
store_event( event )

Store the specified event.

# File lib/observability/collector/timescale.rb, line 124
# Store the specified event as a row in the :events table.
#
# The '@timestamp', '@type' and '@version' entries are removed from +event+
# (the argument is modified in place) and written to the time, type and
# version columns; whatever remains is wrapped with Sequel.pg_json and
# written to the data column. Returns the result of the insert.
def store_event( event )
        timestamp, kind, schema_version =
                %w[@timestamp @type @version].map {|key| event.delete(key) }

        # @cursor.call( time: time, type: type, version: version, data: event )
        events = @db[ :events ]
        events.insert(
                time: timestamp,
                type: kind,
                version: schema_version,
                data: Sequel.pg_json( event )
        )
end