Client Notifications
In the previous tutorial we learned how to scale our workers. Now we will move to a different use case. Imagine that your service publishes messages. Some of them are periodic (real-time status updates); others are sent in reaction to particular events. This case is very common in queue systems. You can read about it in the introduction to RabbitMQ Concepts.
Let's focus on the following scenario:
- Many applications are producing log files.
- The service collects information about the logs and analyzes them.
- Important log entries (e.g. fatals and errors) are published to the components that react to them (e.g. email or SMS notifications).
- The status information (number of analyzed entries) is published on the public channel periodically (e.g. every 10s).
Log Analyzer
A log analyzer service waiting for log files to be processed is nothing new for us. We will start with the controller implementation that reacts to a single message carrying the path to the log file.
The message definition is trivial:
module LogAnalyzer
  class NewLog < Tochtli::Message
    route_to 'log.analyzer.new'
    attribute :path, String
  end
end
The initial controller code will look like this:
module LogAnalyzer
  class LogController < Tochtli::BaseController
    bind 'log.analyzer.*'
    on NewLog, :create
    def create
      parser = LogParser.new(message.path)
      parser.each do |event|
        severity = event[:severity]
        ...
      end
    end
  end
end
Think of LogParser as a simple tool that enumerates each log entry as a hash with the keys :severity, :timestamp and :message.
You can find its implementation in the file server.rb.
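To make the shape of those hashes concrete, here is a minimal sketch of such a parser. It is not the implementation from server.rb: the class name SketchLogParser, the regular expression, and the choice to accept any enumerable of lines instead of a file path are assumptions made for illustration. It targets the standard Ruby Logger line format.

```ruby
require 'time'

# Hypothetical stand-in for LogParser; it takes lines instead of a file path.
class SketchLogParser
  include Enumerable

  # Matches lines such as:
  #   W, [2015-08-10T10:07:02.366615 #4624]  WARN -- : Sample warn
  LINE = /\A[A-Z], \[(?<time>\S+) #\d+\]\s+(?<severity>[A-Z]+) -- (?<progname>[^:]*): (?<message>.*)\z/

  def initialize(lines)
    @lines = lines
  end

  # Yields one hash per recognized entry; other lines (for example the
  # "# Logfile created on ..." header) are skipped.
  def each
    return enum_for(:each) unless block_given?

    @lines.each do |line|
      match = LINE.match(line.chomp)
      next unless match

      yield({
        severity: match[:severity].downcase.to_sym,
        timestamp: Time.parse(match[:time]),
        message: match[:message]
      })
    end
  end
end

sample_lines = [
  '# Logfile created on 2015-08-10 10:06:45 +0200 by logger.rb/47272',
  'D, [2015-08-10T10:07:02.366427 #4624] DEBUG -- : Sample debug',
  'W, [2015-08-10T10:07:02.366615 #4624]  WARN -- : Sample warn'
]
sample_events = SketchLogParser.new(sample_lines).to_a
```

Because the sketch includes Enumerable, the parsed entries can be filtered with select or counted with group_by in the same way the controller iterates them with each.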
That was simple. The next requirement states that important entries are published to the other components.
We will achieve it with reverse communication, from server to client.
This time the server will act as the publisher, and the client used by the component listening to events will act as the subscriber.
The simplest way to publish a message in Tochtli is to implement a Tochtli::BaseClient child class.
Yes, it's true: the server code is going to contain a client class implementation.
A significant entry will be published with the message:
module LogAnalyzer
  class EventOccurred < Tochtli::Message
    route_to { "log.events.#{severity}" }
    attribute :severity, String
    attribute :timestamp, Time
    attribute :message, String
  end
end
The message attributes (:severity, :timestamp and :message) are obvious, but what about the routing key?
This time we will use different topics for different event severities, so that components can listen only to selected events.
Tochtli::Message allows the routing key to be defined by passing a block to the route_to directive.
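The idea of a routing key computed per message can be sketched in plain Ruby. The classes below are hypothetical and are not Tochtli's internals; they only assume that the block is evaluated in the context of the message instance, which is what lets it read the severity attribute.

```ruby
# Hypothetical base class: stores either a fixed key or a block.
class SketchMessage
  def self.route_to(key = nil, &block)
    @routing = block || key
  end

  def self.routing
    @routing
  end

  # A block is evaluated against this instance; a string is used as-is.
  def routing_key
    routing = self.class.routing
    routing.respond_to?(:call) ? instance_exec(&routing) : routing
  end
end

class SketchNewLog < SketchMessage
  route_to 'log.analyzer.new'
end

class SketchEventOccurred < SketchMessage
  route_to { "log.events.#{severity}" }

  attr_reader :severity

  def initialize(severity)
    @severity = severity
  end
end

static_key = SketchNewLog.new.routing_key
warn_key = SketchEventOccurred.new(:warn).routing_key
error_key = SketchEventOccurred.new(:error).routing_key
```

With a fixed string every message of the class goes to the same topic, while the block form yields one topic per severity.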
OK, the message is ready. Now the client class:
module LogAnalyzer
  class EventNotifier < Tochtli::BaseClient
    SIGNIFICANT_SEVERITIES = [:fatal, :error, :warn]
    def self.significant?(severity)
      SIGNIFICANT_SEVERITIES.include?(severity)
    end
    def notify(event)
      publish EventOccurred.new(event)
    end
  end
end
The core LogAnalyzer::EventNotifier method is notify, which accepts event parameters and publishes an EventOccurred message.
Additionally, the client class determines whether the event severity is significant (significant?).
Let's update the server code.
module LogAnalyzer
  class LogController < Tochtli::BaseController
    bind 'log.analyzer.*'
    on NewLog, :create
    def create
      parser = LogParser.new(message.path)
      notifier = EventNotifier.new(self.rabbit_connection)
      parser.each do |event|
        severity = event[:severity]
        notifier.notify event if EventNotifier.significant?(severity)
      end
    end
  end
end
# Well known code that starts the server
Tochtli::ControllerManager.setup
Tochtli::ControllerManager.start
trap('SIGINT') { exit }
at_exit { Tochtli::ControllerManager.stop }
puts 'Press Ctrl-C to stop worker...'
sleep
EventNotifier is initialized for every processed log file (the client class is lightweight) with the Tochtli::RabbitConnection instance.
The connection instance can be accessed with the Tochtli::BaseController#rabbit_connection method. The rest of the client usage is already familiar.
Client
Before the first live test we have to implement the client that would be used by log producers.
module LogAnalyzer
  class Client < Tochtli::BaseClient
    def send_new_log(path)
      publish NewLog.new(path: path)
    end
  end
end
client = LogAnalyzer::Client.new
client.send_new_log ARGV[0]
It was simple, wasn't it?
The First Test
First, let's start the server with the command bundle exec ruby server.rb and then the client with bundle exec ruby client.rb sample.log for the log file sample.log:
# Logfile created on 2015-08-10 10:06:45 +0200 by logger.rb/47272
D, [2015-08-10T10:07:02.366427 #4624] DEBUG -- : Sample debug
D, [2015-08-10T10:07:02.366514 #4624] DEBUG -- : Sample debug
I, [2015-08-10T10:07:02.366542 #4624] INFO -- : Sample info
D, [2015-08-10T10:07:02.366565 #4624] DEBUG -- : Sample debug
W, [2015-08-10T10:07:02.366615 #4624] WARN -- : Sample warn
I, [2015-08-10T10:07:02.366638 #4624] INFO -- : Sample info
D, [2015-08-10T10:07:02.366659 #4624] DEBUG -- : Sample debug
D, [2015-08-10T10:07:02.366679 #4624] DEBUG -- : Sample debug
I, [2015-08-10T10:07:02.366699 #4624] INFO -- : Sample info
I, [2015-08-10T10:07:02.366720 #4624] INFO -- : Sample info
E, [2015-08-10T10:07:02.366741 #4624] ERROR -- : Sample error
D, [2015-08-10T10:07:02.366761 #4624] DEBUG -- : Sample debug
D, [2015-08-10T10:07:02.366782 #4624] DEBUG -- : Sample debug
I assume that you have set Tochtli.logger to the tochtli.log file for both the client and the server. Then we can review the log output:
I, [2015-08-11T15:44:03.867756 #57253] INFO -- SERVER: Starting LogAnalyzer::LogController...
D, [2015-08-11T15:44:06.938811 #57255] DEBUG -- CLIENT: [2015-08-11 15:44:06 +0200 AMQP] Publishing message 2b3d5a86-c8b0-4051-8a41-076d9faf260a to log.analyzer.new
D, [2015-08-11T15:44:06.945843 #57253] DEBUG -- SERVER:
AMQP Message LogAnalyzer::NewLog at 2015-08-11 15:44:06 +0200
D, [2015-08-11T15:44:06.945919 #57253] DEBUG -- SERVER: Processing by LogAnalyzer::LogController#create [Thread: 70282728082940]
D, [2015-08-11T15:44:06.945993 #57253] DEBUG -- SERVER: Message: {:path=>"sample.log"}.
D, [2015-08-11T15:44:06.946041 #57253] DEBUG -- SERVER: Properties: {:content_type=>"application/json", :delivery_mode=>2, :priority=>0, :reply_to=>"amq.gen-InLd6PjsrZ_Du4cC0neKYQ", :message_id=>"2b3d5a86-c8b0-4051-8a41-076d9faf260a", :timestamp=>2015-08-11 15:44:06 +0200, :type=>"log_analyzer::new_log"}.
D, [2015-08-11T15:44:06.946065 #57253] DEBUG -- SERVER: Delivery info: exchange: puzzleflow.services, routing_key: log.analyzer.new.
D, [2015-08-11T15:44:06.947676 #57253] DEBUG -- SERVER: [2015-08-11 15:44:06 +0200 AMQP] Publishing message 93eb45e9-6c4d-4e50-8972-8714600e401e to log.events.warn
D, [2015-08-11T15:44:06.955029 #57253] DEBUG -- SERVER: [2015-08-11 15:44:06 +0200 AMQP] Publishing message 13309bc2-f2a3-4cc7-93f9-df5e0fc4d20b to log.events.error
E, [2015-08-11T15:44:06.955795 #57253] ERROR -- SERVER: [Tochtli::ReplyQueue] Unexpected message delivery '93eb45e9-6c4d-4e50-8972-8714600e401e':
#<Tochtli::MessageDropped: Message 93eb45e9-6c4d-4e50-8972-8714600e401e dropped: NO_ROUTE [312]>)
E, [2015-08-11T15:44:06.956176 #57253] ERROR -- SERVER: [Tochtli::ReplyQueue] Unexpected message delivery '13309bc2-f2a3-4cc7-93f9-df5e0fc4d20b':
#<Tochtli::MessageDropped: Message 13309bc2-f2a3-4cc7-93f9-df5e0fc4d20b dropped: NO_ROUTE [312]>)
D, [2015-08-11T15:44:06.956364 #57253] DEBUG -- SERVER: Message 2b3d5a86-c8b0-4051-8a41-076d9faf260a processed in 10.5ms.
A single message was published by the client on the topic log.analyzer.new with the path to sample.log.
The service controller LogAnalyzer::LogController processed it with the create action.
During the log analysis 2 messages were published, on the topics log.events.warn and log.events.error.
That's correct. Our sample.log contained only 2 significant entries (warn and error).
The irritating part of tochtli.log is the entries about dropped messages.
Both event messages were dropped because there was no queue for them to route to.
We haven't implemented listeners yet, so nobody is waiting for notifications.
Tochtli publishes all messages with the mandatory flag enabled by default.
To get rid of the redundant log errors we have to change the flag in the LogAnalyzer::EventNotifier#notify method:
module LogAnalyzer
  class EventNotifier < Tochtli::BaseClient
    def notify(event)
      publish EventOccurred.new(event), mandatory: false
    end
  end
end
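The effect of the mandatory flag can be modelled without a broker. The sketch below is an in-memory toy, not Tochtli or RabbitMQ client code; SketchExchange and its methods are hypothetical names, and bindings are matched by exact key only to keep it short. An unroutable message is handed back to the publisher when mandatory is true and silently discarded when it is false.

```ruby
# Hypothetical in-memory model of an exchange with exact-match bindings.
class SketchExchange
  attr_reader :returned

  def initialize
    @bindings = Hash.new { |hash, key| hash[key] = [] }
    @returned = []
  end

  def bind(routing_key, queue)
    @bindings[routing_key] << queue
  end

  # Returns true when at least one queue received the payload.
  def publish(payload, routing_key:, mandatory: true)
    queues = @bindings.fetch(routing_key, [])
    if queues.empty?
      # No route: report the message back only if the publisher asked for it.
      @returned << [routing_key, payload] if mandatory
      return false
    end
    queues.each { |queue| queue << payload }
    true
  end
end

exchange = SketchExchange.new
exchange.publish('Sample warn', routing_key: 'log.events.warn')                    # returned to publisher
exchange.publish('Sample error', routing_key: 'log.events.error', mandatory: false) # dropped quietly

listener_queue = []
exchange.bind('log.events.error', listener_queue)
exchange.publish('Sample error', routing_key: 'log.events.error')                  # delivered
```

In the toy model only the first publish ends up in returned, which mirrors why the NO_ROUTE errors disappear from the log once the flag is turned off.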
Events Listener
We are ready to introduce the events listener to the client. How can the client listen to the log events? It needs to subscribe to the queue where events are published. In Tochtli the only way to do so is to create a controller. More precisely, our client will have its own controller (just as our service already has a client class for publishing messages).
module LogAnalyzer
  class EventsController < Tochtli::BaseController
    on EventOccurred, :handle, routing_key: 'log.events.*'
    def handle
      handler.call(message.severity, message.timestamp, message.message)
    end
    protected
    def handler
      raise "Internal Error: handler not set for EventsController" unless env.has_key?(:handler)
      env[:handler]
    end
  end
end
The LogAnalyzer::EventsController accepts only the EventOccurred message and processes it with the handle method.
The message has custom routing that depends on severity, therefore we need to specify the routing key in the on directive.
The routing key defined with on is used by the Tochtli dispatcher to find the proper message class and controller method.
As you can see, the '*' and '#' characters are accepted in the routing key.
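In AMQP topic routing, '*' stands for exactly one dot-separated word and '#' for zero or more words. A minimal sketch of that matching rule follows; the function names are hypothetical, and this is only an illustration of the rule, not how Tochtli or RabbitMQ implements it.

```ruby
# Returns true when the routing key satisfies the topic pattern.
def topic_match?(pattern, routing_key)
  match_words(pattern.split('.'), routing_key.split('.'))
end

def match_words(pattern, words)
  return words.empty? if pattern.empty?

  head, *rest = pattern
  case head
  when '#'
    # '#' may absorb zero or more words, so try every possible split.
    (0..words.size).any? { |count| match_words(rest, words.drop(count)) }
  when '*'
    # '*' needs exactly one word to consume.
    !words.empty? && match_words(rest, words.drop(1))
  else
    words.first == head && match_words(rest, words.drop(1))
  end
end

one_level = topic_match?('log.events.*', 'log.events.warn')
too_short = topic_match?('log.events.*', 'log.events')
any_depth = topic_match?('log.#', 'log.events.error')
```

So 'log.events.*' accepts every severity topic, while a component interested in everything under log could bind with 'log.#'.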
The handle method uses the handler. This is the first time we directly use the controller environment (env).
Usually environment variables are referenced indirectly, e.g. env[:message] is exposed by the message accessor.
This time the custom variable :handler is used. Where does it come from? We will see in a moment.
First we need to see how the controller is started in the client.
Tochtli's layered structure does not allow the application to operate on the controller layer. The client class should provide the required functionality. Let's add the next client method.
module LogAnalyzer
  class Client < Tochtli::BaseClient
    def react_on_events(client_id, severities, handler=nil, &block)
      handler = block unless handler
      severities = Array(severities)
      routing_keys = severities.map {|severity| "log.events.#{severity}" }
      Tochtli::ControllerManager.start EventsController,
        queue_name: "log_analyzer/events/#{client_id}", # custom queue name
        routing_keys: routing_keys, # make custom binding (only selected severities)
        env: { handler: handler }
    end
  end
end
The log analyzer service client exposes a new API method, react_on_events, that binds a handler (a Proc or a block) to events with the given severities. To achieve that, the EventsController is started and bound to a new RabbitMQ queue.
We cannot use a single queue for all clients. Each client should have its own queue so that event messages are broadcast to all of them.
The client_id argument is used to create the queue name. An auto-generated name would not be a solution because we want a persistent queue that collects events even when the application component is turned off.
The selection of events is done with routing keys calculated from the event severities.
Normally, the controller queue binding is set for a controller with the bind directive.
The Tochtli controller manager allows a custom binding to be set up during start with the :routing_keys option.
The last option passed to the controller is :env. That's the answer to the previous question about the source of custom controller environment variables.
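The argument handling of react_on_events can be tried in isolation. The helper below is a hypothetical stand-in that only computes the options and starts no controller; the ArgumentError guard is an addition that the tutorial code does not have.

```ruby
# Hypothetical helper mirroring how react_on_events prepares its options.
def sketch_subscription(client_id, severities, handler = nil, &block)
  handler ||= block
  raise ArgumentError, 'a handler or a block is required' unless handler

  {
    queue_name: "log_analyzer/events/#{client_id}",
    # Array() lets callers pass a single severity or a list of them.
    routing_keys: Array(severities).map { |severity| "log.events.#{severity}" },
    env: { handler: handler }
  }
end

with_block = sketch_subscription('client-001', [:fatal, :error]) { |severity| "got #{severity}" }
with_proc = sketch_subscription('client-002', :warn, ->(severity) { "got #{severity}" })
```

Two clients with different IDs get different queue names, so each of them receives its own copy of every matching event.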
The last step for now is to rewrite the client runner code. We already have 2 API methods: send_new_log and react_on_events.
client = LogAnalyzer::Client.new
command = ARGV[0]
def hold
  puts 'Press Ctrl-C to stop...'
  sleep
end
case command
when 's'
  client.send_new_log ARGV[1]
when 'c'
  client.react_on_events ARGV[1], [:fatal, :error], lambda {|severity, timestamp, message|
    puts "[#{timestamp}] Got #{severity}: #{message}"
  }
  hold
else
  puts "Unknown command: #{command.inspect}"
  puts
  puts "Usage: bundle exec ruby client [command] [params]"
  puts
  puts "Commands:"
  puts " s [path] - send log from file to server"
  puts " c [client ID] - catch fatal and error events"
end
To start the event listener, run the command bundle exec ruby client.rb c client-001.
Once the server is started, publish new logs with the command bundle exec ruby client.rb s sample.log.
You should see output like:
[2015-08-10 10:07:02 +0200] Got error: Sample error
[2015-08-10 10:07:02 +0200] Got fatal: Sample fatal
[2015-08-10 10:07:02 +0200] Got error: Sample error
[2015-08-10 10:07:02 +0200] Got error: Sample error
[2015-08-10 10:07:02 +0200] Got error: Sample error
[2015-08-10 10:07:02 +0200] Got fatal: Sample fatal
...
System Monitor
The last uncovered requirement is the periodic (every 10s) publication of status information on a public channel. This is a very common case for monitoring systems. As always, we will first introduce the message definition.
module LogAnalyzer
  class CurrentStatus < Tochtli::Message
    route_to 'log.status'
    attribute :fatal, Integer, default: 0
    attribute :error, Integer, default: 0
    attribute :warn, Integer, default: 0
    attribute :info, Integer, default: 0
    attribute :debug, Integer, default: 0
    attribute :timestamp, Time
  end
end
It consists of attributes containing the number of log entries with a given severity (fatal, error, ...) and the timestamp. To be able to publish this message from the server code we will extend the server's existing client class.
module LogAnalyzer
  class EventNotifier < Tochtli::BaseClient
    def update_status(status)
      publish CurrentStatus.new(status), mandatory: false
    end
  end
end
The update_status method publishes a non-mandatory message (as previously) because it is normal for no listener to be active.
The next step is the implementation of the core status monitor, which gathers statistics and publishes status information periodically.
require 'monitor' # provides MonitorMixin
module LogAnalyzer
  class StatusMonitor
    include MonitorMixin
    def initialize(rabbit_connection)
      super()
      @notifier = EventNotifier.new(rabbit_connection)
      @status = Hash.new(0)
    end
    def start
      Thread.new(&method(:monitor))
    end
    def note(severity)
      synchronize do
        @status[severity] += 1
      end
    end
    protected
    def reset_status
      synchronize do
        status = @status
        @status = Hash.new(0)
        status
      end
    end
    def monitor
      loop do
        current_status = reset_status
        current_status[:timestamp] = Time.now
        @notifier.update_status current_status
        sleep 10
      end
    end
  end
end
In the above code there are 2 interesting methods. The first, start, runs the new monitoring thread.
The second, note, updates the current monitor statistics. Synchronization is added for thread safety.
Internally the EventNotifier client class, initialized with the rabbit connection, is used to publish the periodic messages.
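The counting and snapshot logic can be exercised on its own, without RabbitMQ. The sketch below keeps only the synchronized parts of the monitor; SketchCounter is a hypothetical name and reset_status is made public here so that it can be called directly.

```ruby
require 'monitor'

# Hypothetical cut-down version of the status monitor: counting only.
class SketchCounter
  include MonitorMixin

  def initialize
    super() # initializes the monitor state provided by MonitorMixin
    @status = Hash.new(0)
  end

  def note(severity)
    synchronize { @status[severity] += 1 }
  end

  # Swaps in a fresh hash and returns the old one, so every noted entry
  # is reported in exactly one snapshot.
  def reset_status
    synchronize do
      status = @status
      @status = Hash.new(0)
      status
    end
  end
end

counter = SketchCounter.new
workers = 4.times.map do
  Thread.new { 1000.times { counter.note(:info) } }
end
workers.each(&:join)

first_snapshot = counter.reset_status
second_snapshot = counter.reset_status
```

Swapping the hash inside the same lock that guards the increments is what prevents an entry from being counted twice or lost between two status messages.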
Finally, we have to find a good place to start the monitor and glue it to the server's log analyzer.
To do so we will use the Tochtli::BaseController hooks. Each controller class has access to the following hooks:
- before_setup - run at the very beginning, before the controller connection is set up
- after_setup - run after the initial setup
- before_start - run right before the controller queue is created and bound
- after_start - run after queue preparation
- before_restart - run before the restart action
- after_restart - run after the queue binding and handlers restart
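How a class-level hook of this kind might be registered and fired can be sketched in a few lines. Everything below is hypothetical: SketchController is not Tochtli::BaseController, and the assumption that a hook block runs in the context of the controller class and receives the connection as its argument is made only for illustration.

```ruby
# Hypothetical base class with a single hook type.
class SketchController
  # Hooks are kept per class, so subclasses do not share them.
  def self.hooks
    @hooks ||= []
  end

  def self.after_setup(&block)
    hooks << block
  end

  # Fires the registered hooks with self set to the controller class.
  def self.setup(connection)
    hooks.each { |hook| class_exec(connection, &hook) }
  end
end

class SketchLogController < SketchController
  class << self
    attr_accessor :monitor
  end

  after_setup do |connection|
    self.monitor = "monitor using #{connection}"
  end
end

SketchLogController.setup('fake-connection')
started_monitor = SketchLogController.monitor
```

Under that assumption the hook is a natural place for one-time, class-wide initialization that needs the connection, in contrast to per-message work done in an action.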
For our case the after_setup hook is suitable.
At that point we have access to the rabbit connection, which is required to initialize the EventNotifier client object.
module LogAnalyzer
  class LogController < Tochtli::BaseController
    bind 'log.analyzer.*'
    on NewLog, :create
    cattr_accessor :monitor
    after_setup do |rabbit_connection|
      self.monitor = StatusMonitor.new(rabbit_connection)
      self.monitor.start
    end
    def create
      parser = LogParser.new(message.path)
      notifier = EventNotifier.new(self.rabbit_connection)
      parser.each do |event|
        severity = event[:severity]
        notifier.notify event if EventNotifier.significant?(severity)
        self.monitor.note severity
      end
      notifier.wait_for_confirms
    end
  end
end
Additionally, the create action now calls self.monitor.note severity, which updates the status monitor statistics for every log entry.
The last step required for the status monitor functionality is the client API implementation that will allow listening for status updates. As with the events listener, we have to create a controller class that will be bound to the status queue collecting status updates.
module LogAnalyzer
  class MonitorController < Tochtli::BaseController
    bind 'log.status'
    self.queue_name = '' # auto generate
    self.queue_durable = false
    self.queue_exclusive = true
    self.queue_auto_delete = true
    on CurrentStatus do
      monitor.call(message.to_hash)
    end
    protected
    def monitor
      raise "Internal Error: monitor not set for MonitorController" unless env.has_key?(:monitor)
      env[:monitor]
    end
  end
end
Everything looks familiar, except the queue parameters set in the class definition. We can read from the code that the controller queue will have an auto-generated name (empty string) and will be non-durable, exclusive and auto-deleted. The auto-generated name allows us to create a different queue for each client instance. The created queue is temporary because the status information is ephemeral by nature.
Let's update the client API to allow attaching a handler to status updates.
module LogAnalyzer
  class Client < Tochtli::BaseClient
    def monitor_status(monitor=nil, &block)
      monitor = block unless monitor
      Tochtli::ControllerManager.start MonitorController, env: { monitor: monitor }
    end
  end
end
This is the definition of the third and last client method. The final client runner code looks like this:
client = LogAnalyzer::Client.new
command = ARGV[0]
def hold
  puts 'Press Ctrl-C to stop...'
  sleep
end
case command
when 's'
  client.send_new_log ARGV[1]
when 'm'
  client.monitor_status {|status| p status }
  hold
when 'c'
  client.react_on_events ARGV[1], [:fatal, :error], lambda {|severity, timestamp, message|
    puts "[#{timestamp}] Got #{severity}: #{message}"
  }
  hold
else
  puts "Unknown command: #{command.inspect}"
  puts
  puts "Usage: bundle exec ruby client [command] [params]"
  puts
  puts "Commands:"
  puts " s [path] - send log from file to server"
  puts " m - start status monitor"
  puts " c [client ID] - catch fatal and error events"
end
That's it. Start your server. Start the status monitor and the events handler. Submit new log files to the server and observe the numbers. In the end you should see something like:
$ ruby client.rb m
Press Ctrl-C to stop...
{:fatal=>0, :error=>0, :warn=>0, :info=>0, :debug=>0, :timestamp=>2015-08-12 15:41:16 +0200}
{:fatal=>478, :error=>1500, :warn=>2364, :info=>5930, :debug=>9728, :timestamp=>2015-08-12 15:41:26 +0200}
{:fatal=>239, :error=>750, :warn=>1182, :info=>2965, :debug=>4864, :timestamp=>2015-08-12 15:41:36 +0200}
...
That's all for this tutorial. The client notifications are working exactly as expected. You can find the client and server code in the second tochtli example.