亟待动用supervisor的伊芙nts机制,也得以看作二个过程监控框架使用

也可以作为一个进程监控框架使用,也可以作为一个进程监控框架使用

supervisor可以当作3个简单的进度运维、重启、控制工具使用,也足以看成3个历程监控框架使用,作为继任者,须要使用supervisor的伊芙nts机制。

supervisor能够当作三个大概的经过运维、重启、控制工具使用,也能够看成一个历程监察和控制框架使用,作为继承者,须要选取supervisor的伊芙nts机制。

Event Listeners

supervisor对子程序的监督通过叫做event
listener的程序完结。supervisor控制的子程序状态发生变化时,就会发生局地风浪通报,event
listener能够对那个事件通报进行订阅。

event
listener自个儿也是用作supervisor的子程序运维的。事件通报业协会议的兑现基于event
listener子程序的stdin和stdout。supervisor发送特定格式的音信到event
listener的stdin,然后从event
listener的stdout获得一定格式的出口,从而形成2个请求/应答循环。

Event Listeners

supervisor对子程序的监督通过叫做event
listener的程序实现。supervisor控制的子程序状态产生变化时,就会时有爆发一些事件通报,event
listener能够对那个事件通报进行订阅。

event
listener本人也是用作supervisor的子程序运营的。事件通报业协会议的落到实处基于event
listener子程序的stdin和stdout。supervisor发送特定格式的音讯到event
listener的stdin,然后从event
listener的stdout得到一定格式的输出,从而形成1个请求/应答循环。

配置

event listener的配备放置于配置文件中的[eventlistener:x]块中。

[eventlistener:mylistener]
command=my_custom_listener.py
events=PROCESS_STATE,TICK_60

x是listener的名目,command是推行listener脚本的命令,events是要监督的事件类型。

event
listener本人是用作supervisor的子程序运维的,所以与配置子程序[program:x]块类似,官网例子:

[eventlistener:theeventlistenername]
command=/bin/eventlistener
process_name=%(program_name)s_%(process_num)02d
numprocs=5
events=PROCESS_STATE
buffer_size=10
directory=/tmp
umask=022
priority=-1
autostart=true
autorestart=unexpected
startsecs=1
startretries=3
exitcodes=0,2
stopsignal=QUIT
stopwaitsecs=10
stopasgroup=false
killasgroup=false
user=chrism
redirect_stderr=false
stdout_logfile=/a/path
stdout_logfile_maxbytes=1MB
stdout_logfile_backups=10
stdout_events_enabled=false
stderr_logfile=/a/path
stderr_logfile_maxbytes=1MB
stderr_logfile_backups=10
stderr_events_enabled=false
environment=A="1",B="2"
serverurl=AUTO

配置

event listener的配备放置于配置文件中的[eventlistener:x]块中。

[eventlistener:mylistener]
command=my_custom_listener.py
events=PROCESS_STATE,TICK_60

x是listener的名目,command是推行listener脚本的吩咐,events是要监督的轩然大波类型。

event
listener本人是用作supervisor的子程序运转的,所以与配置子程序[program:x]块类似,官网例子:

[eventlistener:theeventlistenername]
command=/bin/eventlistener
process_name=%(program_name)s_%(process_num)02d
numprocs=5
events=PROCESS_STATE
buffer_size=10
directory=/tmp
umask=022
priority=-1
autostart=true
autorestart=unexpected
startsecs=1
startretries=3
exitcodes=0,2
stopsignal=QUIT
stopwaitsecs=10
stopasgroup=false
killasgroup=false
user=chrism
redirect_stderr=false
stdout_logfile=/a/path
stdout_logfile_maxbytes=1MB
stdout_logfile_backups=10
stdout_events_enabled=false
stderr_logfile=/a/path
stderr_logfile_maxbytes=1MB
stderr_logfile_backups=10
stderr_events_enabled=false
environment=A="1",B="2"
serverurl=AUTO

事件通报业协会议

1个event
listener能够处于二种情景,ACKNOWLEDGED、READY、BUSY,唯有在READY状态下才得以吸收事件通报。

event listener运营时处于ACKNOWLEDGED状态,直到event
listener向stdout中输出“READY\n”字符串截止。

event
listener向stdout中输出“READY\n”之后就处于READY状态,supervisor会向远在READY状态的listener发送listener订阅的风浪通报。

listener接收事件通报之后就处在BUSY状态,时期listener对接受到的轩然大波通报进行处理,处理终结后向stdout输出RESULT 2\nOK”或者RESULT
4
\nFAIL
”,前者代表拍卖成功,后者代表拍卖退步。

supervisor收到OK或许FAIL输出后,就将event
listener的气象置于ACKNOWLEDGED。FAIL的轩然大波通报会被缓存然后再次发送。

event
listener的景况处于ACKNOWLEDGED后得以脱离执行,也得以继续执行,继续执行就足以向stdout输出“READY\n”变异一个循环往复。

supervisor向listener发送的风云通报由两局地构成,header和body,由”\n”换行符分开。

一个header例子:

ver:3.0 server:supervisor serial:21 pool:listener poolserial:10 eventname:PROCESS_COMMUNICATION_STDOUT len:54

ver:共谋版本

server:supervisor的标识符,由[supervisord]块中的identifier选料设置。

serial:event的队列号

pool:listener的pool的名字。

poolserial:event在pool中的的行列号

eventname:event类型名称

len:header后面的body长度。

一个body例子:

processname:foo groupname:bar pid:123
This is the data that was sent between the tags

processname:事件所属的子进度名字

groupname:子进度所属组名

pid:子进程pid

二个粗略的listener脚本,listener.py:

import sys

def write_stdout(s):
    # only eventlistener protocol messages may be sent to stdout
    sys.stdout.write(s)
    sys.stdout.flush()

def write_stderr(s):
    sys.stderr.write(s)
    sys.stderr.flush()

def main():
    while True:
        # 进入READY状态
    ┆   write_stdout('READY\n')
    ┆   # 读取事件通知的header
    ┆   line = sys.stdin.readline()
    ┆   write_stderr(line)
        # 获取body长度,读取body
    ┆   headers=dict([x.split(':') for x in line.split() ])
    ┆   data = sys.stdin.read(int(headers['len']))
    ┆   write_stderr(data+'\n')
        # 发送OK进入ACKNOWLEDGED状态
    ┆   write_stdout('RESULT 2\nOK')
if __name__ == '__main__':
    main()

在conf.d目录中树立1个listener配置文件mylistener.conf:

[eventlistener:mylistener]
command=python listener.py
directory=/thedirectoroflistener.py
user=user
events=PROCESS_STATE,TICK_5
stdout_logfile=/path/to/mylistener_stdout.log
stderr_logfile=/path/to/mylistener_stderr.log

启动:

ubuntu:$ sudo supervisorctl start all
mylistener: started
celerybeat: started
ubuntu:$ sudo supervisorctl status
celerybeat                       RUNNING   pid 87729, uptime 0:00:20
mylistener                       RUNNING   pid 87728, uptime 0:00:20

监督检查就发轫了,能够到日志中查阅事件通报的始末:

ver:3.0 server:supervisor serial:15361 pool:mylistener poolserial:15361 eventname:PROCESS_STATE_RUNNING len:73
processname:mylistener groupname:mylistener from_state:STARTING pid:87728
ver:3.0 server:supervisor serial:15362 pool:mylistener poolserial:15362 eventname:TICK_5 len:15
when:1514313560
ver:3.0 server:supervisor serial:15364 pool:mylistener poolserial:15364 eventname:PROCESS_STATE_RUNNING len:73
processname:celerybeat groupname:celerybeat from_state:STARTING pid:87729

可以依据自身的内需设定监督的轩然大波类型,然后依照不一样的事件类型和内容做出不一样的应变,具体的轩然大波类型能够官网查阅。

python的supervisor.childutils模块对header和body的处理进展了打包:

def get_headers(line):
    return dict([ x.split(':') for x in line.split() ])

def eventdata(payload):
    headerinfo, data = payload.split('\n', 1)
    headers = get_headers(headerinfo)
    return headers, data

def get_asctime(now=None):
    if now is None: # for testing
        now = time.time() # pragma: no cover
    msecs = (now - long(now)) * 1000
    part1 = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(now))
    asctime = '%s,%03d' % (part1, msecs)
    return asctime

class ProcessCommunicationsProtocol:
    def send(self, msg, fp=sys.stdout):
        fp.write(ProcessCommunicationEvent.BEGIN_TOKEN)
        fp.write(msg)
        fp.write(ProcessCommunicationEvent.END_TOKEN)

    def stdout(self, msg):
        return self.send(msg, sys.stdout)

    def stderr(self, msg):
        return self.send(msg, sys.stderr)

pcomm = ProcessCommunicationsProtocol()

class EventListenerProtocol:
    def wait(self, stdin=sys.stdin, stdout=sys.stdout):
        self.ready(stdout)
        line = stdin.readline()
        headers = get_headers(line)
        payload = stdin.read(int(headers['len']))
        return headers, payload

    def ready(self, stdout=sys.stdout):
        stdout.write(PEventListenerDispatcher.READY_FOR_EVENTS_TOKEN)
        stdout.flush()

    def ok(self, stdout=sys.stdout):
        self.send('OK', stdout)

    def fail(self, stdout=sys.stdout):
        self.send('FAIL', stdout)

    def send(self, data, stdout=sys.stdout):
        resultlen = len(data)
        result = '%s%s\n%s' % (PEventListenerDispatcher.RESULT_TOKEN_START,
                               str(resultlen),
                               data)
        stdout.write(result)
        stdout.flush()

listener = EventListenerProtocol()

listener脚本能够一本万利的写做:

import sys

from supervisor import childutils

def write_stdout(s):
    # only eventlistener protocol messages may be sent to stdout
    sys.stdout.write(s)
    sys.stdout.flush()

def write_stderr(s):
    sys.stderr.write(s)
    sys.stderr.flush()

def main():
    while True:
    ┆   headers, payload = childutils.listener.wait()
    ┆   write_stderr(payload+'\n')
    ┆   childutils.listener.ok(sys.stdout)

if __name__ == '__main__':
    main()

  

 

事件通报业协会议

1个event
listener能够处于三种情状,ACKNOWLEDGED、READY、BUSY,只有在READY状态下才得以接收事件通报。

event listener运营时处于ACKNOWLEDGED状态,直到event
listener向stdout中输出“READY\n”字符串停止。

event
listener向stdout中输出“READY\n”之后就处在READY状态,supervisor会向远在READY状态的listener发送listener订阅的轩然大波通报。

listener接收事件通报之后就高居BUSY状态,期间listener对收取到的事件通报进行处理,处理终结后向stdout输出RESULT 2\nOK”或者RESULT
4
\nFAIL
”,前者代表拍卖成功,后者代表拍卖退步。

supervisor收到OK大概FAIL输出后,就将event
listener的景况置于ACKNOWLEDGED。FAIL的轩然大波通报会被缓存然后再一次发送。

event
listener的状态处于ACKNOWLEDGED后得以退出执行,也得以继续执行,继续执行就能够向stdout输出“READY\n”变异2个循环往复。

supervisor向listener发送的事件通报由两局地组成,header和body,由”\n”换行符分开。

一个header例子:

ver:3.0 server:supervisor serial:21 pool:listener poolserial:10 eventname:PROCESS_COMMUNICATION_STDOUT len:54

ver:钻探版本

server:supervisor的标识符,由[supervisord]块中的identifier选用设置。

serial:event的行列号

pool:listener的pool的名字。

poolserial:event在pool中的的连串号

eventname:event类型名称

len:header后面的body长度。

一个body例子:

processname:foo groupname:bar pid:123
This is the data that was sent between the tags

processname:事件所属的子进度名字

groupname:子进度所属组名

pid:子进程pid

一个不难的listener脚本,listener.py:

import sys

def write_stdout(s):
    # only eventlistener protocol messages may be sent to stdout
    sys.stdout.write(s)
    sys.stdout.flush()

def write_stderr(s):
    sys.stderr.write(s)
    sys.stderr.flush()

def main():
    while True:
        # 进入READY状态
    ┆   write_stdout('READY\n')
    ┆   # 读取事件通知的header
    ┆   line = sys.stdin.readline()
    ┆   write_stderr(line)
        # 获取body长度,读取body
    ┆   headers=dict([x.split(':') for x in line.split() ])
    ┆   data = sys.stdin.read(int(headers['len']))
    ┆   write_stderr(data+'\n')
        # 发送OK进入ACKNOWLEDGED状态
    ┆   write_stdout('RESULT 2\nOK')
if __name__ == '__main__':
    main()

在conf.d目录中创制一个listener配置文件mylistener.conf:

[eventlistener:mylistener]
command=python listener.py
directory=/thedirectoroflistener.py
user=user
events=PROCESS_STATE,TICK_5
stdout_logfile=/path/to/mylistener_stdout.log
stderr_logfile=/path/to/mylistener_stderr.log

启动:

ubuntu:$ sudo supervisorctl start all
mylistener: started
celerybeat: started
ubuntu:$ sudo supervisorctl status
celerybeat                       RUNNING   pid 87729, uptime 0:00:20
mylistener                       RUNNING   pid 87728, uptime 0:00:20

督察就伊始了,能够到日志中查阅事件通报的剧情:

ver:3.0 server:supervisor serial:15361 pool:mylistener poolserial:15361 eventname:PROCESS_STATE_RUNNING len:73
processname:mylistener groupname:mylistener from_state:STARTING pid:87728
ver:3.0 server:supervisor serial:15362 pool:mylistener poolserial:15362 eventname:TICK_5 len:15
when:1514313560
ver:3.0 server:supervisor serial:15364 pool:mylistener poolserial:15364 eventname:PROCESS_STATE_RUNNING len:73
processname:celerybeat groupname:celerybeat from_state:STARTING pid:87729

能够依照自个儿的须要设定监督的风云类型,然后依据分化的轩然大波类型和内容做出差别的应变,具体的事件类型能够官网查阅。

python的supervisor.childutils模块对header和body的处理进展了包装:

def get_headers(line):
    return dict([ x.split(':') for x in line.split() ])

def eventdata(payload):
    headerinfo, data = payload.split('\n', 1)
    headers = get_headers(headerinfo)
    return headers, data

def get_asctime(now=None):
    if now is None: # for testing
        now = time.time() # pragma: no cover
    msecs = (now - long(now)) * 1000
    part1 = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(now))
    asctime = '%s,%03d' % (part1, msecs)
    return asctime

class ProcessCommunicationsProtocol:
    def send(self, msg, fp=sys.stdout):
        fp.write(ProcessCommunicationEvent.BEGIN_TOKEN)
        fp.write(msg)
        fp.write(ProcessCommunicationEvent.END_TOKEN)

    def stdout(self, msg):
        return self.send(msg, sys.stdout)

    def stderr(self, msg):
        return self.send(msg, sys.stderr)

pcomm = ProcessCommunicationsProtocol()

class EventListenerProtocol:
    def wait(self, stdin=sys.stdin, stdout=sys.stdout):
        self.ready(stdout)
        line = stdin.readline()
        headers = get_headers(line)
        payload = stdin.read(int(headers['len']))
        return headers, payload

    def ready(self, stdout=sys.stdout):
        stdout.write(PEventListenerDispatcher.READY_FOR_EVENTS_TOKEN)
        stdout.flush()

    def ok(self, stdout=sys.stdout):
        self.send('OK', stdout)

    def fail(self, stdout=sys.stdout):
        self.send('FAIL', stdout)

    def send(self, data, stdout=sys.stdout):
        resultlen = len(data)
        result = '%s%s\n%s' % (PEventListenerDispatcher.RESULT_TOKEN_START,
                               str(resultlen),
                               data)
        stdout.write(result)
        stdout.flush()

listener = EventListenerProtocol()

listener脚本能够便宜的写做:

import sys

from supervisor import childutils

def write_stdout(s):
    # only eventlistener protocol messages may be sent to stdout
    sys.stdout.write(s)
    sys.stdout.flush()

def write_stderr(s):
    sys.stderr.write(s)
    sys.stderr.flush()

def main():
    while True:
    ┆   headers, payload = childutils.listener.wait()
    ┆   write_stderr(payload+'\n')
    ┆   childutils.listener.ok(sys.stdout)

if __name__ == '__main__':
    main()