The config to pass to JSON.parse
The number of seconds to wait between IO loops
The maximum size of event messages
Create a new UDP collector
# File lib/observability/collector/timescale.rb, line 48
# Set up a new collector: an unbound UDP socket, no database handle, no
# cursor, and event processing switched off.
def initialize
  super

  @socket      = UDPSocket.new
  @db, @cursor = nil, nil
  @processing  = false
end
Read the next event from the socket
# File lib/observability/collector/timescale.rb, line 107
# Read the next event from the socket without blocking.
#
# Returns the value parsed from the datagram's JSON payload, or +nil+ when
# there is nothing to hand to the caller:
#
# * no datagram is pending (after waiting up to LOOP_TIMER seconds for the
#   socket to become readable),
# * the datagram was empty, or
# * the payload is not valid JSON. It is logged and dropped, so a single bad
#   packet from the network cannot raise out of the processing loop.
def read_next_event
  self.log.debug "Reading next event."
  data = @socket.recv_nonblock( MAX_EVENT_BYTES, exception: false )

  if data == :wait_readable
    # Nothing queued yet. Wait here (bounded by LOOP_TIMER) instead of
    # spinning, then return so the caller can re-check its run flag.
    IO.select( [@socket], nil, nil, LOOP_TIMER )
    return nil
  end

  return nil if data.empty?

  self.log.info "Read %d bytes" % [ data.bytesize ]
  return JSON.parse( data )
rescue JSON::ParserError => err
  self.log.error "Discarding malformed event: %s" % [ err.message ]
  return nil
end
Start receiving events.
# File lib/observability/collector/timescale.rb, line 63
# Start receiving events: connect to the database named by the class's +db+
# setting, load Sequel's pg_json extension on that connection, bind the UDP
# socket to the class's +host+ and +port+, and then enter the processing loop.
# Returns whatever #start_processing returns.
def start
  self.log.info "Starting up."

  settings = self.class
  @db = Sequel.connect( settings.db )
  @db.extension( :pg_json )

  # Disabled: prepared insert statement that would be kept in @cursor.
  # @cursor = @db[ :events ].prepare( :insert, :insert_new_event,
  # time: :$time,
  # type: :$type,
  # version: :$version,
  # data: :$data
  # )

  host, port = settings.host, settings.port
  @socket.bind( host, port )

  self.start_processing
end
Start consuming incoming events and storing them.
# File lib/observability/collector/timescale.rb, line 90
# Start consuming incoming events and storing them. Sets the @processing
# flag, then loops for as long as it stays set: each pass reads one event,
# skips the pass when the read produced nothing, and otherwise logs the
# event and hands it to #store_event.
def start_processing
  @processing = true

  while @processing
    event = self.read_next_event
    next unless event

    self.log.debug "Read event: %p" % [ event ]
    self.store_event( event )
  end
end
Stop receiving events.
# File lib/observability/collector/timescale.rb, line 81
# Stop receiving events: halt the processing loop, drop the cursor, and
# disconnect from the database.
#
# Safe to call before #start has run: @db is still nil at that point, so the
# disconnect is skipped instead of raising.
def stop
  self.stop_processing
  @cursor = nil
  @db&.disconnect
end
Stop consuming events.
# File lib/observability/collector/timescale.rb, line 101
# Stop consuming events by clearing the @processing flag. The loop in
# #start_processing tests that flag at the top of each pass, so it exits
# once the pass in progress finishes.
def stop_processing
@processing = false
end
Store the specified event.
# File lib/observability/collector/timescale.rb, line 124
# Store the specified +event+ by inserting it into the :events dataset.
#
# The '@timestamp', '@type' and '@version' entries are removed from +event+
# itself (the caller's object is modified) and inserted as +time+, +type+ and
# +version+. Whatever is left in +event+ is wrapped with Sequel.pg_json and
# inserted as +data+. Returns the result of the dataset's insert call.
def store_event( event )
  time, type, version = %w[@timestamp @type @version].map {|field| event.delete(field) }

  # Disabled prepared-statement path:
  # @cursor.call( time: time, type: type, version: version, data: event )
  @db[ :events ].insert(
    time: time, type: type, version: version, data: Sequel.pg_json( event )
  )
end