简单任务调度系统-成都快上网建站

简单任务调度系统

一 概述

1 运维管理的阶段

1 人工阶段

人工盯着服务器,出了问题,到机器前面,翻日志,查状态,手动操作

创新互联公司从2013年成立,先为牡丹等服务建站,牡丹等地企业,进行企业商务咨询服务。为牡丹企业网站制作PC+手机+微官网三网同步一站式服务解决您的所有建站问题。

2 脚本阶段

开始写一些自动化脚本,启动计划任务,自动启动服务,监控服务等

3 工具阶段

脚本功能太弱,开发了大量工具,某种工具解决某个特定领域的问题,常用的有ansible,puppet等

4 平台阶段

将工具整合,自主研发,实现标准化,实现自动化流程控制,而今,平台已经开始迈向智能化的发展方向。

二 mschedule 设计

1 完整代码链接

https://gitee.com/ChangPaoZhe/mschedule

2要求

1 分发任务
分发脚本到目前节点上去执行


2 控制
控制并发,控制多少个节点同时执行
对错误做出响应,由用户设定,最多允许失败的比例或者数量,当超过范围时,需要终止任务执行


3 能跨机房部署


4 能对作业做版本控制,这是辅助功能,可过后实现

3 项目基本概述

1 基本概述

本项目的出发点,是只需要会使用shell脚本就可以了,可以通过使用shell脚本的方式来完成远程任务的下发和处理流程。

2 其他自动化工具二次开发缺点

ansible,salt等需要学习特定的内部语言,如果觉得ansible这样的工具不能满足需求,二次开发难度过高,代码量不小,本身它们开发接口不完善,而且熟悉它的叫也比较难,就算开发出来维护也难。

从这些项目上二次开发,等于拉一个分支,如果主分支有了新的特性,想要合并也是比较困难的。

自己开发,满足自己需求,完全适合自己需求,代码规模可控,便于他人接收维护。

3 项目初始版本目标

自己开发就是造轮子,造轮子不是不好,其起初要实现的功能应该是比较简单的。后面可以逐步进行完善操作。

4 项目基本架构图

简单任务调度系统

浏览器端和webSERVER端交互是通过HTTP实现的,而WEB server和master server 是通过TCP链接来实现的,master server 和agent之间也是通过TCP 链接来实现的

4 分发任务设计

1 分发任务分类

1 有agent 类

有agent类,被控节点需要安装或运行特殊的软件,用于和服务器端进行通信,服务器端把脚本,命令传递给agent端,由agent端控制来执行

2 无agent类

被控节点不需要安装或者运行特殊软件,如通过SSH来实现,这其实也是有agent的,不过不是自己写的程序


优缺点

1 通用,简单,易实现,但管理不善,容易出现安全问题

2 并行效率不高,有agent的并行执行可以不和管理服务器通信,可以并发很高,ssh执行要和master之间通信

3 ssh链接是有状态的,任务执行的时候,master不能挂了,否则任务将执行失败。

5 执行脚本(subprocess)

python 中有很多运行进程的方式,不过都过时了。
建议使用标准库subprocess模块,启动一个子进程。

1 初始化类源码

    def __init__(self, args, bufsize=-1, executable=None,
                 stdin=None, stdout=None, stderr=None,
                 preexec_fn=None, close_fds=_PLATFORM_DEFAULT_CLOSE_FDS,
                 shell=False, cwd=None, env=None, universal_newlines=False,
                 startupinfo=None, creationflags=0,
                 restore_signals=True, start_new_session=False,
                 pass_fds=()):

第一个是参数,后面是可选,但shell默认为False,可将其置为True, stdout 后面跟文件或管道

        def wait(self, timeout=None, endtime=None):
            """Wait for child process to terminate.  Returns returncode
            attribute."""
            if endtime is not None:
                timeout = self._remaining_time(endtime)
            if timeout is None:
                timeout_millis = _winapi.INFINITE
            else:
                timeout_millis = int(timeout * 1000)
            if self.returncode is None:
                result = _winapi.WaitForSingleObject(self._handle,
                                                    timeout_millis)
                if result == _winapi.WAIT_TIMEOUT:
                    raise TimeoutExpired(self.args, timeout)
                self.returncode = _winapi.GetExitCodeProcess(self._handle)
            return self.returncode

此处返回是状态,0为成功,其他为失败


stdout 方法调用的是一个文件,因此可使用文件的形式进行处理

        if c2pread != -1:
            self.stdout = io.open(c2pread, 'rb', bufsize)
            if universal_newlines:
                self.stdout = io.TextIOWrapper(self.stdout)

2 基本代码如下

#!/usr/bin/poython3.6
#conding:utf-8
import  subprocess
from subprocess  import  Popen,PIPE

out=Popen("echo  'hello'",shell=True,stdout=PIPE)
code=out.wait(10)
txt=out.stdout.read()

print ("code={}  txt={}".format(code,txt.decode()))

结果如下

简单任务调度系统

6 项目基本构建

1 创建文件并添加虚拟环境

mkdir  mschedule  -p
cd mschedule/
pyenv virtualenv  3.5.3  msch
pyenv local msch 

简单任务调度系统

2 构建模块agent,并创建执行程序executor.py

#!/usr/bin/poython3.6
#conding:utf-8
from   subprocess   import  PIPE,Popen

class  Executor:
    def  run(self,script,timeout):
        p=Popen(script,shell=True,stdout=PIPE)
        code=p.wait(timeout=timeout)
        txt=p.stdout.read()
        return  (code,txt)

if __name__ == "__main__":
    exec=Executor()
    print (exec.run("echo  'hello'",3))

结果如下

简单任务调度系统

7 agent 和master设计

用户和master server 通信,提交任务,此处是通过HTTP的方式提交任务
master 按照用户要求将任务分发到指定的节点上,这些节点上需要有agent用于和master通信,接受master发布的任务,并执行这些任务


设计agent,越简单越好,越简单bug越少,越稳定。
从本质上来说,master,agent设计是典型的CS编程模式
master作为CS中的server,agent作为CS中的client

8 消息设计

1 注册信息

agent启动后,需要主动连接server,并注册自己
信息包括
hostname:报告自己的主机名称,此主机名称可能会重复

UUID,用于唯一标识这台主机

IP: 用于更加方便的管理主机

其它相关信息视情况而定

 {
  "type": "register",  # 此处用于定义消息类型
            "payload":{
                "id" :  uuid,  #用于唯一标识一台主机
                "hostname":  "xxxx",  # 对应agent名称
                "IP": [],  # agent IP地址,其可能包含多个IP地址,因此此处使用列表进行存储
            }
    }

2 心跳信息

agent定时向master发送心跳包,包含UUID这个唯一标识,附带hostname和ip地址,hostname和ip都可能变动,但agent不变,其UUID便不会发生变化,其他相关信息科一附加, 如更加flag,用于标识agent是否有正在执行的任务。

 {
   "type": "heartbeat",  # 此处用于定义消息类型
            "payload":{
                "id" :  uuid,  #用于唯一标识一台主机
                "hostname":  "xxxx",  # 对应agent名称
                "IP": [],  # agent IP地址,其可能包含多个IP地址,因此此处使用列表进行存储
            }
}

3 任务消息

master分派任务给agent,发送任务描述信息到agent。
注意脚本字符串使用base64编码

 {  
     "type"  :"task",
     "payload" :{
             "id"  :"task-uuid",  # 定义任务的唯一标识
             "script" : "base64code",  #定义执行任务的内容
             "timeout"  :0, # 定义超时时长
             "parallel"  :1,  # 定义并行执行数
             "fail_rate"  :0,  # 定义失败率,及百分比为多少代表失败
             "fail_count"  :-1 # 定义失败的次数为多少次表示失败,-1表示不关心
     }

 }

4 任务结果消息

当agent任务执行完成后,返回给master该任务执行的状态码和输出结果。

{
    "type"  :"result",
    "payload" :{
        "id": "task-uuid", # 定义任务唯一标识
        "agent_id":  "agent-uuid",  #定义任务执行者
        "code" : 0,  #定义任务执行结果返回值。0 表示成功,其他表示失败 
        "output" :"base64encode"  # 定义任务执行结果,及输出到控制台的结果

    }
}

以上的master,agent之间需要传递消息,消息采用json格式。

三 agent端代码实现

1 日志实现

简单任务调度系统

具体代码如下

#!/usr/bin/poython3.6
#conding:utf-8
import  logging
def  getlogger(mod_name:str,filepath:str='/var/log/mschedule'):
    logger=logging.getLogger(mod_name)  # 获取名字
    logger.setLevel(logging.INFO)  # 添加日志级别
    logger.propagate=False  # 配置不想上传递
    handler=logging.FileHandler("{}/{}.log".format(filepath,mod_name))
    fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s (%(filename)s:L%(lineno)d)",
                            datefmt='%Y-%m-%d %H:%M:%S')
    handler.setFormatter(fmt)
    logger.addHandler(handler)
    return  logger

if __name__ == "__main__":
    log = getlogger('test')
    log.info('13234545654')

结果如下

简单任务调度系统

2 通信模块实现(zerorpc )

1 介绍和安装

原生的socket编程过于底层,很少使用,任何一门语言都要避开直接使用socket库开发,太过底层,难写难维护。

zeroprc 是基于 ZeroMQ和MessagePack 来实现的通信工具。

官网地址

http://www.zerorpc.io

安装

pip  install  zerorpc  

2 基本代码实现

根目录创建app.py和appserver.py

简单任务调度系统

server 端配置

#!/usr/bin/poython3.6
#conding:utf-8

import zerorpc

class HelloRPC(object):  #定义方法
    def hello(self, name):
        return "Hello, %s" % name

s = zerorpc.Server(HelloRPC())  # 方法注入
s.bind("tcp://0.0.0.0:8080")  # 绑定方法 
s.run()  # 运行方法

client端配置

#!/usr/bin/poython3.6
#conding:utf-8
import zerorpc

c = zerorpc.Client()
c.connect("tcp://127.0.0.1:8080")
print (c.hello("RPC"))
#!/usr/bin/poython3.6
#conding:utf-8
import zerorpc
import   threading
c = zerorpc.Client()
c.connect("tcp://127.0.0.1:8080")
e=threading.Event()
while not  e.wait(3):
    print(c.hello('test client'))
    print ('```````````````')

结果如下

简单任务调度系统

简单任务调度系统

3 注册消息实现

1 uuid唯一主机标识

使用uuid.uuid4().hex 获取一个uuid,一个节点起始运行的时候是没有uuid的,一旦运行会生成一个uuid,并持久化到一个文件中,下次运行先找这个文件,如果文件中有uuid,就直接读取,没有uuid就重新生成并写入到该文件中。

#!/usr/bin/poython3.6
#conding:utf-8
#!/usr/bin/poython3.6
#conding:utf-8
import uuid
print  (uuid.uuid4().hex)
print  (uuid.uuid4().hex)
print  (uuid.uuid4().hex)

结果如下

简单任务调度系统

2 hostname

windows 和Linux 获取主机名称的方式是不同的

可以在所有平台上是使用socket.gethostname()获取主机名。

#!/usr/bin/poython3.6
#conding:utf-8
import  socket
print (socket.gethostname())

简单任务调度系统

3 ip 列表

pip  install   netifaces 

netifaces.interfaces() 返回接口列表

netifaces.ifaddresss(interface) 获取指定接口的IP地址,返回相关信息

ip地址判断

#!/usr/bin/poython3.6
#conding:utf-8
import  ipaddress
ips=['127.0.0.1','192.168.0.1','169.254.123.1','0.0.0.0','239.168.0.255','224.0.0.1','8.8.8.8']

for  ip  in  ips:
    print (ip)
    ip=ipaddress.ip_address(ip)
    print ('Linklocal  {}'.format(ip.is_link_local))  # 169.254地址
    print ('回环 {}'.format(ip.is_loopback))  # 回环
    print ('多播 {}'.format(ip.is_multicast))   # 多播
    print ('公网 {}'.format(ip.is_global))  # 公网,全球范围地址
    print ('私有 {}'.format(ip.is_private))  # 私有地址
    print ('保留 {}'.format(ip.is_reserved))  # 保留地址
    print ('版本 {}'.format(ip.version))  #ipv4地址
    print ('----------------------------')

结果如下

简单任务调度系统
简单任务调度系统

#!/usr/bin/poython3.6
#conding:utf-8
import  netifaces
print (netifaces.interfaces())  # 获取所有的网卡接口
for i in  netifaces.interfaces():
    print ('i....',netifaces.ifaddresses(i))  # 使用ifaddress获取端口对应的IP地址
    print ()
    print ('------------------------------')
    print ()
    print ('[2]',netifaces.ifaddresses(i)[2])  # 获取字典key为2的对应的值

结果如下

简单任务调度系统

其是一个字典,key为2就是ipv4地址
每一个接口返回的ipv4地址是一个列表,也就是说可以有多个,ipv4地址描述是在addr上

#!/usr/bin/poython3.6
#conding:utf-8
import  netifaces
print (netifaces.interfaces())  # 获取所有的网卡接口
for i in  netifaces.interfaces():
        for   p  in  netifaces.ifaddresses(i)[2]:
            if  p['addr']:
                print ('ip',p['addr'])   # 获取ip地址 

结果如下

简单任务调度系统

#!/usr/bin/poython3.6
#conding:utf-8
import  netifaces
import  ipaddress
print (netifaces.interfaces())  # 获取所有的网卡接口
for i in  netifaces.interfaces():
        for   p  in  netifaces.ifaddresses(i)[2]:
            if  p['addr']:
                ip=ipaddress.ip_address(p['addr'])   #获取ip地址
                if  ip.is_loopback  or ip.is_multicast  or ip.is_link_local  or ip.is_reserved:  # 判断IP地址
                    continue
                print (ip)  

结果如下

简单任务调度系统

4 注册信息和相关信息处理

在agent文件包中创建msg.py文件,用于存储相关主从信息和配置信息

简单任务调度系统

#!/usr/bin/poython3.6
#conding:utf-8
import  socket
import  uuid
import   netifaces
import  ipaddress
import  os
class Messgae:
    def  __init__(self,myidpath):
        if os.path.exists(myidpath):  # 如果存在
            with  open(myidpath)  as  f:
                self.id=f.readline().strip()
        else:
            self.id=uuid.uuid4().hex
            with open(myidpath,'w')  as f:
                f.write(self.id)

    def get_ipaddress(self):
        address=[]
        for p in  netifaces.interfaces():  # 获取网口列表
            n=netifaces.ifaddresses(p)  # 获取字典
            if  n.get(2):  # 查看是否存在ipv4地址
                for  ip  in  n[2]:  # 此处获取对应列表的值
                    if  ip['addr']: # 查看ip地址是否存在
                        ip=ipaddress.ip_address(ip['addr'])
                        if    ip.is_reserved  or ip.is_multicast  or ip.is_link_local or ip.is_loopback:
                            continue
                        address.append(str(ip))
        return  address

    def  hearbeat(self):
        return   {
            "type" :"hearbeat",
            "payload" :{
                "ip"  : self.get_ipaddress(),
                "hostname" : socket.gethostname(),
                "id" : self.id
            }
        }
    def  reg(self):
        return   {
            "type" :"register",
            "payload" :{
                "ip"  : self.get_ipaddress(),
                "hostname" : socket.gethostname(),
                "id" : self.id
            }
        }

if __name__ == "__main__":
    msg=Messgae('/var/log/mschedule/uuid')
    print (msg.reg())

测试结果如下

简单任务调度系统

5 处理链接相关配置

agent中创建config模块用于添加相关链接服务端IP地址
agent中创建cm 模块用于处理链接相关配置

简单任务调度系统
简单任务调度系统

config.py 配置如下

#!/usr/bin/poython3.6
#conding:utf-8

CONN_URL="tcp://127.0.0.1:9000"

cm.py 模块配置如下

#!/usr/bin/poython3.6
#conding:utf-8
import   zerorpc   #添加模块
import  threading  # 用于处理中断相关
from  .msg import  Messgae  # 获取消息

from  .config import  CONN_URL

from  utils  import getlogger

class  Conn_Manager:
    def __init__(self,timeout=3):
        self.timeout=timeout
        self.client=zerorpc.Client()
        self.event=threading.Event()
        self.message=Messgae('/var/log/mschedule/uuid')  # 此处用于初始化消息
        self.log=getlogger('agent')  # 此处填写相关的log日志名称
    def start(self):
        self.client.connect(CONN_URL)  # 链接处理
        self.log.info('注册消息发送 {}'.format(self.client.send(self.message.reg())))  # 发送心跳信息
        self.client.send(self.message.reg())  #处理注册消息
        while  not self.event.wait(self.timeout):  # 等待的时间
            self.log.info('心跳消息发送 {}'.format(self.client.send(self.message.hearbeat())))  # 发送心跳信息
    def shutdown(self):
        self.log.info("关闭操作")
        self.client.close()
        self.event.set()

agent 中 _init_.py 端配置

#!/usr/bin/poython3.6
#conding:utf-8
from  .cm  import Conn_Manager
class  app:
    def __init__(self,timeout):
        self.conn=Conn_Manager(timeout)
    def start(self):
        self.conn.start()
    def shutdown(self):
        self.conn.shutdown()

全局根目录下 app.py 端配置如下

#!/usr/bin/poython3.6
#conding:utf-8
from  agent  import app

if __name__ == "__main__":
    agent=app(3)
    try:
        agent.start()
    except  KeyboardInterrupt:
        agent.shutdown()

服务端测试文件appserver 配置如下

#!/usr/bin/poython3.6
#conding:utf-8

import zerorpc

class HelloRPC(object):  #定义方法
    def send(self, name):
        return "Hello, %s" % name

s = zerorpc.Server(HelloRPC())  # 方法注入
s.bind("tcp://0.0.0.0:9000")  # 绑定方法
s.run()  # 运行方法

启动结果如下

简单任务调度系统

日志结果如下

简单任务调度系统

简单任务调度系统

处理客户端重连机制

默认的,服务端关闭后,客户端结果如下

简单任务调度系统

处理结果如下

cm.py如下

#!/usr/bin/poython3.6
#conding:utf-8
import   zerorpc   #添加模块
import  threading  # 用于处理中断相关
from  .msg import  Messgae  # 获取消息

from  .config import  CONN_URL

from  utils  import getlogger

class  Conn_Manager:
    def __init__(self,timeout=3):
        self.timeout=timeout
        self.client=zerorpc.Client()
        self.event=threading.Event()
        self.message=Messgae('/var/log/mschedule/uuid')  # 此处用于初始化消息
        self.log=getlogger('agent')  # 此处填写相关的log日志名称
    def start(self):
        try:
            self.client.connect(CONN_URL)  # 链接处理
            self.log.info('注册消息发送 {}'.format(self.client.send(self.message.reg())))  # 发送心跳信息
            self.client.send(self.message.reg())  #处理注册消息
            while  not self.event.wait(self.timeout):  # 等待的时间
                self.log.info('心跳消息发送 {}'.format(self.client.send(self.message.hreadbeat())))  # 发送心跳信息
        except  Exception as e:
            print ('--------------------')
            self.event.set()
            raise  e  # 此处是抛出异常到上一级
    def shutdown(self):
        self.log.info("关闭操作")
        self.client.close()
        self.event.set()

agent._init_.py 结果如下

#!/usr/bin/poython3.6
#conding:utf-8
from  .cm  import Conn_Manager
import  threading
class  app:
    def __init__(self,timeout):
        self.conn=Conn_Manager(timeout)
        self.event=threading.Event()
    def start(self):
        while not self.event.is_set():
            try:
                self.conn.start()
            except  Exception  as e:
                    print('重连')
                    self.conn.shutdown()
            self.event.wait(3)

    def shutdown(self):
        self.event.set()
        self.conn.shutdown()

app.py 如下

#!/usr/bin/poython3.6
#conding:utf-8
from  agent  import app

if __name__ == "__main__":
    agent=app(3)
    try:
        agent.start()
    except  KeyboardInterrupt:
        agent.shutdown()

结果如下

简单任务调度系统

四 master端实现

1 基本功能

1 TCP Server

绑定端口,启动监听,等待agent链接。

2 信息存储

存储agent列表
存储用户提交的Task列表,用户通过WEB提交的任务信息存储下来。

3 接受注册

将注册信息写入agent列表
接受心跳信息
接受agent端发送的心跳信息

4 派发任务

将用户提交的任务分配到agent端

2 基本代码实现

1 master.config 模块

用于指定服务端绑定IP地址和端口号

简单任务调度系统

#!/usr/bin/poython3.6
#conding:utf-8

MASTER_URL="tcp://0.0.0.0:9000"

if __name__ == "__main__":
    pass

2 master.handler 模块

主要负责客户端数据的调度

简单任务调度系统

#!/usr/bin/poython3.6
#conding:utf-8
from   utils  import  getlogger

log=getlogger('handler')

class Handler(object):
    def send(self,msg):  # 定义一个可调用的基础函数
        log.info(" ack  ok  {}".format(msg))
        return   " ack  ok  {}".format(msg)

3 cm.py 模块

用于tcp 链接建立和关闭

简单任务调度系统

#!/usr/bin/poython3.6
#conding:utf-8
from utils  import  getlogger
from  .config import   MASTER_URL
import  zerorpc
from  .handler import   Handler

log=getlogger('server')

class Master_Listen:
    def __init__(self):
        self.server=zerorpc.Server(Handler())
    def start(self):
        self.server.bind(MASTER_URL)
        log.info('Master 启动配置')
        self.server.run()
    def shutdown(self):
        self.server.close()

4 master._init_.py 模块

简单任务调度系统

#!/usr/bin/poython3.6
#conding:utf-8
from  .cm import   Master_Listen

class appserver:
    def __init__(self):
        self.appserver=Master_Listen()
    def start(self):
        self.appserver.start()
    def shutdown(self):
        self.appserver.shutdown()

5 appserver.py模块

#!/usr/bin/poython3.6
#conding:utf-8

from master  import  appserver

if __name__ == "__main__":
    appserver=appserver()
    try:
        appserver.start()
    except KeyboardInterrupt:
        appserver.shutdown()

启动服务测试如下

简单任务调度系统

结果如下

简单任务调度系统

上述代码实现了基本的注册,心跳部分的功能

经观察可知,目前注册和心跳除了类型不同外,其可以认为第一次心跳成功就是注册。

3 master的数据设计

master端核心需要存储2中数据:agent端数据,用户客户端浏览器提交的任务Task,构造出一个数据结构,存储相关信息.具体数据结构如下

1 agent客户端数据存储结构

{
    "agents" :{
        "agent_id"  :{
            "heartbeat" :"timestamp",
            "busy" :False,
            "info" :{
                "hostname" :"",
                "ip" :[]
            }
        }
    }
}

数据结构解释如下

1 agents里面记录了所有注册的agent
agent_id,字典的key,每一个agent 都有一个不同uuid,所以这个字典的键就是uuid,
heartbeat 由于设计中并没有让agent端发送心跳时间,所以就在master端记录了收到的时间
busy 如果agent 上有任务在执行。则此值表现为True
info 记录agent上发过来的hostname和ip列表

2 task数据存储结构

{
    "tasks" :{
        "task_id" :{
            "script" :"base64encode",
            "targets" :{
                "agent_id" :{
                    "state":"WAITING",
                    "output" :""
                }
            },
                        "state"  :"WAITING"
        }
    }
}

task 记录所有任务及target(agent)的状态

task_id ,字典的key对应一个一个task,item 也是taskid:{} 结构
task 任务,task.json 的payload信息
targets目标,用于指定agent的节点,记录agent上的state和输出output
state状态,单个agent上的执行状态

state 这是一个task的状态,整个任务的状态,比如统计达到了agent失败上限了,这个task的state 就置为失败

状态常量
"WAITING" "RUNNING" "SUCCEED" "FAILED"

4 agent 端信息存储

创建 storage.py 模块
构建Storage 类,用于存储用户信息

简单任务调度系统

#!/usr/bin/poython3.6
#conding:utf-8

import  datetime

class Storage:
    def  __init__(self):
        self.agents={}  # 此处用于存储用户信息
        self.tasks={}  # 此处用于存储作业信息 
    def reg_hb(self,agent_id,info):  # id 及就是客户端的id ,info 及就是host和ip地址
        self.agents[agent_id] = {
            'heaerbeat' : datetime.datetime.now(),
            'info' :info,
            'busy':self.agents.get(agent_id,{}).get('busy',False)
        }
        # busy 读不到置False,读到了不变

handler.py端配置如下

#!/usr/bin/poython3.6
#conding:utf-8
from   utils  import  getlogger
from  .storage import  Storage

log=getlogger('handler')

class Handler(object):
    def __init__(self):
        self.store=Storage()
    def send(self,msg):  # 定义一个可调用的基础函数,此处的msg及就是对应的函数
        log.info('客户端agent发送消息为:{}'.format(msg))
        try:
            if  msg['type']  in  {'hearbeat','register'}:
                payload=msg['payload']
                info={'hostname' :payload['hostname'],'ip' :payload['ip']}
                self.store.reg_hb(payload['id'],info)
                log.info("客户端数据列表为:{}".format(self.store.agents))  # 客户端的列表
                return   "agent信息为: {}".format(msg)
        except Exception  as e:
            log.error("注册客户端信息错误为:{}".format(e))
            return  "Bad  Request...."

运行结果如下

简单任务调度系统

5 task 任务基本注册和创建

1 概述

用户通过WEB(HTTP)提交新的任务,任务json信息有:
1 任务脚本script,base64编码
2 超时时间timeout
3 并行度 parallel
4 失败率 fail_rate
5 失败次数fail_count
6 targets 是跑任务的Agent的agent_id列表,这个目前也是在用户端选好的,如yoghurt需要在主机名为webserver-xxxx的几台设备上运行脚本,为了用户方便,可以使用类似ansible的分组。

在Master端受到信息后,需要添加2个信息

task_id 是Mater 端新建任务时生成的uuid
state 默认状态是WAITING

在WEB server 中最后将用户端发送来的数据组成下面的字典

task={
    "task_id" :t.id,
    "script" :t.script,
    "timeout":t.timeout,
    "parallel" :t.parallelm,
    "fail_rate":t.fail_rate,
    "fail_count":t.fail_count,
    "state":t.state,
    "targets":t.targets
}

2 构建state类

用于处理相关消息的类型

简单任务调度系统

#!/usr/bin/poython3.6
#conding:utf-8

WAITING='WAITING'
RUNNING='RUNNING'
SUCCEED='SUCCEED'
FAILED='FAILED'

3 构建task类

创建master/task.py 类处理webserver端数据

简单任务调度系统\

#!/usr/bin/poython3.6
#conding:utf-8
import  uuid   # 获取唯一的task_id

from  .state import *

class Task:
    def  __init__(self,task_id,script,targets,timeout=0,parallel=1,fail_rate=0,fail_count=-1):
        self.id=task_id  # task唯一标识,用于确定任务
        self.script=script  # 对应的脚本内容,客户端输入的脚本
        self.timeout=timeout # 超时时间
        self.parallel=parallel # 并行执行数量
        self.fail_rate=fail_rate  #失败率
        self.fail_count=fail_count #失败数
        self.state=WAITING  # 对应的消息的状态
        self.targets={agent_id:{'state' : WAITING,'output':''} for agent_id  in targets}  # 此处对应客户端列表
        self.target_count=len(self.targets)  # 此处对应客户端的数量

在master.storage.py模块中进行相关方法调用,并将其存储进入task中

#!/usr/bin/poython3.6
#conding:utf-8

import  datetime
from  .task import  Task

class Storage:
    def  __init__(self):
        self.agents={}  # 此处用于存储用户信息
        self.tasks={}  # 此处用于存储作业信息
    def reg_hb(self,agent_id,info):  # id 及就是客户端的id ,info 及就是host和ip地址
        self.agents[agent_id] = {
            'heaerbeat' : datetime.datetime.now(),
            'info' :info,
            'busy':self.agents.get(agent_id,{}).get('busy',False)
        }
        # busy 读不到置False,读到了不变

    def add_task(self,task:dict):  # 此处用于从客户端获取相关的数据
        t=Task(**task)  # 此处进行参数解构
        self.tasks[t.id]=t
        return  t.id  # 此处用于获取处理id

在master/handler.py 中处理用于webservr调用相关配置

#!/usr/bin/poython3.6
#conding:utf-8
from   utils  import  getlogger
from  .storage import  Storage
import  uuid

log=getlogger('handler')

class Handler(object):
    def __init__(self):
        self.store=Storage()
    def send(self,msg):  # 定义一个可调用的基础函数,此处的msg及就是对应的函数
        log.info('客户端agent发送消息为:{}'.format(msg))
        try:
            if  msg['type']  in  {'hearbeat','register'}:
                payload=msg['payload']
                info={'hostname' :payload['hostname'],'ip' :payload['ip']}
                self.store.reg_hb(payload['id'],info)
                log.info("客户端数据列表为:{}".format(self.store.agents))  # 客户端的列表
                return   "agent信息为: {}".format(msg)
        except Exception  as e:
            log.error("注册客户端信息错误为:{}".format(e))
            return  "Bad  Request...."

    def add_task(self,task):   # 此处用于在webserver 端创建的agent调用方法返回结果
        task['task_id']=uuid.uuid4().hex  # 用于生成相关的任务id
        return   self.store.add_task(task)  # 此处用于调用相关配置
    def get_agents(self):
        return  self.store.get_agents()

6 task 任务分派

1 任务分派方式

任务在Storage中存储,一旦有了任务,需要将任务分派到指定节点执行,交给这些节点上的agent
不过,目前使用zerorpc,master是被动的接受agent端的数据并进行相关的响应操作,所以可以考虑使用一种agent端主动拉取数据的机制,提供一个接口,让agent访问,如果agent处于空闲状态,则就主动拉取任务,有任务就领走。
当agent少的时候,master推送任务到agent端,或者agent端主动拉取任务都是可以的,但是如果考虑到agent多的时候,或许使用agent拉模式是一个更好的选择。

本次采用agent拉取模式实现,所以master就不需要设计调度器了

2 客户端配置状态参数

agent/state.py

#!/usr/bin/poython3.6
#conding:utf-8

WAITING='WAITING'
RUNNING='RUNNING'
SUCCEED='SUCCEED'
FAILED='FAILED'

3 客户端添加消息类型result

用于返回至server端,用于最后返回至web浏览器端

#!/usr/bin/poython3.6
#conding:utf-8
import  socket
import  uuid
import   netifaces
import  ipaddress
import  os
class Messgae:
    def  __init__(self,myidpath):
        if os.path.exists(myidpath):  # 如果存在
            with  open(myidpath)  as  f:
                self.id=f.readline().strip()
        else:
            self.id=uuid.uuid4().hex
            with open(myidpath,'w')  as f:
                f.write(self.id)

    def get_ipaddress(self):
        address=[]
        for p in  netifaces.interfaces():  # 获取网口列表
            n=netifaces.ifaddresses(p)  # 获取字典
            if  n.get(2):  # 查看是否存在ipv4地址
                for  ip  in  n[2]:  # 此处获取对应列表的值
                    if  ip['addr']: # 查看ip地址是否存在
                        ip=ipaddress.ip_address(ip['addr'])
                        if    ip.is_reserved  or ip.is_multicast  or ip.is_link_local or ip.is_loopback:
                            continue
                        address.append(str(ip))
        return  address

    def  hearbeat(self):
        return   {
            "type" :"hearbeat",
            "payload" :{
                "ip"  : self.get_ipaddress(),
                "hostname" : socket.gethostname(),
                "id" : self.id
            }
        }
    def  reg(self):
        return   {
            "type" :"register",
            "payload" :{
                "ip"  : self.get_ipaddress(),
                "hostname" : socket.gethostname(),
                "id" : self.id
            }
        }
    def result(self,task_id,code,output):  # 返回数据至web端,处理相关数据执行结果的返回
        return  {
            "type" :"result",
            "payload" :{
                "id"  : task_id,  # 此处用于定义task_id 及任务id  
                "agent_id" :self.id,  # 此处用于获取客户端id  
                "code" : code,  # 此处用于对执行结果状态进行保存
                "output" : output  #此处用于对执行结果的输出信息进行保存,并进行相关配置
            }
        }

4 agent/cm.py模块

用于处理配置拉取相关事宜

#!/usr/bin/poython3.6
#conding:utf-8
import   zerorpc   #添加模块
import  threading  # 用于处理中断相关
from  .msg import  Messgae  # 获取消息
from  .state import  *
from  .config import  CONN_URL
from  .executor import   Executor
from  utils  import getlogger

class  Conn_Manager:
    def __init__(self,timeout=3):
        self.timeout=timeout
        self.client=zerorpc.Client()
        self.event=threading.Event()
        self.message=Messgae('/var/log/mschedule/uuid')  # 此处用于初始化消息
        self.log=getlogger('agent')  # 此处填写相关的log日志名称
        self.state=WAITING
        self.exec=Executor()
    def start(self):
        try:
            self.event.clear()
            self.client.connect(CONN_URL)  # 链接处理
            self.log.info('注册消息发送 {}'.format(self.client.send(self.message.reg())))  # 发送心跳信息
            self.client.send(self.message.reg())  #处理注册消息
            while  not self.event.wait(self.timeout):  # 等待的时间
                self.log.info('心跳消息发送 {}'.format(self.client.send(self.message.hearbeat())))  # 发送心跳信息
                task=self.client.get_task(self.message.id)  # 此处返回三个参数,1 为taskid,二是script ,三是timeout
                if task:
                    code,output=self.exec.run(task[1],task[2])
                    self.client.send(self.message.result(task[0],code,output))
                else:
                    return   "目前无消息"
        except  Exception as e:
            self.event.set()
            raise  e  # 此处是抛出异常到上一级
    def shutdown(self):
        self.log.info("关闭操作")
        self.client.close()
        self.event.set()

4 服务端相关task获取配置

master/storage.py 用于配置获取agent_id和task相关信息

#!/usr/bin/poython3.6
#conding:utf-8

import  datetime
from  .task import  Task
from  .state import   *
class Storage:
    def  __init__(self):
        self.agents={}  # 此处用于存储用户信息
        self.tasks={}  # 此处用于存储作业信息
    def reg_hb(self,agent_id,info):  # id 及就是客户端的id ,info 及就是host和ip地址
        self.agents[agent_id] = {
            'heaerbeat' : datetime.datetime.now(),
            'info' :info,
            'busy':self.agents.get(agent_id,{}).get('busy',False)
        }
        # busy 读不到置False,读到了不变
    def  get_agents(self):
        return   self.agents

    def add_task(self,task:dict):  # 此处用于从客户端获取相关的数据
        t=Task(**task)  # 此处进行参数解构
        self.tasks[t.id]=t
        return  t.id  # 此处用于获取处理id
    @property
    def itme_task(self):
        yield  from  (task  for  task  in  self.tasks.values())  # 此处返回task
    def get_task(self,agent_id):
            return   [task.id,task.script,task.timeout]

master/handler.py 配置如下

#!/usr/bin/poython3.6
#conding:utf-8
from   utils  import  getlogger
from  .storage import  Storage
import  uuid

log=getlogger('handler')

class Handler(object):
    def __init__(self):
        self.store=Storage()
    def send(self,msg):  # 定义一个可调用的基础函数,此处的msg及就是对应的函数
        log.info('客户端agent发送消息为:{}'.format(msg))
        try:
            if  msg['type']  in  {'hearbeat','register'}:
                payload=msg['payload']
                info={'hostname' :payload['hostname'],'ip' :payload['ip']}
                self.store.reg_hb(payload['id'],info)
                log.info("客户端数据列表为:{}".format(self.store.agents))  # 客户端的列表
                return   "agent信息为: {}".format(msg)
        except Exception  as e:
            log.error("注册客户端信息错误为:{}".format(e))
            return  "Bad  Request...."

    def add_task(self,task):   # 此处用于在webserver 端创建的agent调用方法返回结果
        task['task_id']=uuid.uuid4().hex  # 用于生成相关的任务id
        return   self.store.add_task(task)  # 此处用于调用相关配置
    def get_agents(self):
        return  self.store.get_agents()
    def get_task(self,agent_id):
        return  self.store.get_task(agent_id)

5 处理服务端接受result 消息处理机制

master/handler.py中配置

#!/usr/bin/poython3.6
#conding:utf-8
from   utils  import  getlogger
from  .storage import  Storage
import  uuid

log=getlogger('handler')

class Handler(object):
    def __init__(self):
        self.store=Storage()
    def send(self,msg):  # 定义一个可调用的基础函数,此处的msg及就是对应的函数
        log.info('客户端agent发送消息为:{}'.format(msg))
        try:
            if  msg['type']  in  {'hearbeat','register'}:
                payload=msg['payload']
                info={'hostname' :payload['hostname'],'ip' :payload['ip']}
                self.store.reg_hb(payload['id'],info)
                log.info("客户端数据列表为:{}".format(self.store.agents))  # 客户端的列表
                return   "agent信息为: {}".format(msg)
            elif  msg['type']=="result":  # 此处用于处理相关返回信息
                self.store.result(msg['payload'])  # 调用对应方法 
        except Exception  as e:
            log.error("注册客户端信息错误为:{}".format(e))
            return  "Bad  Request...."
    def add_task(self,task):   # 此处用于在webserver 端创建的agent调用方法返回结果
        task['task_id']=uuid.uuid4().hex  # 用于生成相关的任务id
        return   self.store.add_task(task)  # 此处用于调用相关配置
    def get_agents(self):
        return  self.store.get_agents()
    def get_task(self,agent_id):
        return  self.store.get_task(agent_id)
    def get_result(self,task_id):  # 此处返回对应的值
        return  self.store.get_result(task_id)

master/stroage.py端配置

#!/usr/bin/poython3.6
#conding:utf-8

import  datetime
from  .task import  Task
from  .state import   *
class Storage:
    def  __init__(self):
        self.agents={}  # 此处用于存储用户信息
        self.tasks={}  # 此处用于存储作业信息
        self.result={}  # 用于存储agent端返回的结果
    def reg_hb(self,agent_id,info):  # id 及就是客户端的id ,info 及就是host和ip地址
        self.agents[agent_id] = {
            'heaerbeat' : datetime.datetime.now().timestamp(),
            'info' :info,
            'busy':self.agents.get(agent_id,{}).get('busy',False)
        }
        # busy 读不到置False,读到了不变
    def  get_agents(self):
        return   self.agents
    def add_task(self,task:dict):  # 此处用于从客户端获取相关的数据
        t=Task(**task)  # 此处进行参数解构
        self.tasks[t.id]=t
        return  t.id  # 此处用于获取处理id
    @property
    def itme_task(self):
        yield  from  (task  for  task  in  self.tasks.values())  # 此处返回task
    def get_task(self,agent_id):
        for  task  in  self.itme_task:
            if agent_id  in  task.targets:  # 此处用于判断当前节点接入任务情况
                return   [task.id,task.script,task.timeout]
    def add_result(self,payload:dict):
        self.result[payload['id']]=payload  # 此处以task_id 为键,以payload为值进行处理
    def get_result(self,task_id:dict):
        return self.result.get(task_id['task_id'])  # task_id,获取对应的payload值

五 web端配置和处理

1 概述

用户通过WEB(HTTP)提交新的任务,任务json信息有:
1 任务脚本script,base64编码
2 超时时间timeout
3 并行度 parallel
4 失败率 fail_rate
5 失败次数 fail_count
6 targets 是跑在agent上的agent_id 列表,可以让用户看到一个列表,通过列表的勾选来完成相关的操作

2 代码实现

根目录创建appwebserver.py配置

简单任务调度系统

1 获取agent相关列表

#!/usr/bin/poython3.6
#conding:utf-8
import  zerorpc
from  aiohttp  import  request,web_response,web,log
CONN_URL="tcp://127.0.0.1:9000"

client=zerorpc.Client()
client.connect(CONN_URL)
async  def  targetshandler(request:web.Request):
    txt=client.get_agents()  #通过zerorpc调用master端接口
    return  web.json_response(txt)  # 返回json端数据

app=web.Application()

app.router.add_get('/task/targets',targetshandler)  # 使用get方法进行处理

2 提交任务端配置

1 客户端数据如下
{
    "script"  : "echo  hello",
    "timeout" :20,
    "targets"  :[]
}
2 添加提交数据接口
async def  taskhandler(request:web.Request):
    j = await  request.json()  # 获取post 提交的数据,用于task任务数据生成
    txt=client.add_task(j)
    return   web.Response(text=txt,status=201)

app.router.add_post('/task',taskhandler)

3 添加获取执行结果配置

async   def  taskresult(request:web.Request):
    j = await  request.json()
    txt =client.get_result(j)
    return   web.json_response(txt)

app.router.add_post('/result',taskresult)

4 整体代码如下

#!/usr/bin/poython3.6
#conding:utf-8
import  zerorpc
from  aiohttp  import  request,web_response,web,log
CONN_URL="tcp://127.0.0.1:9000"

client=zerorpc.Client()
client.connect(CONN_URL)
async  def  targetshandler(request:web.Request):
    txt=client.get_agents()  #通过zerorpc调用master端接口
    return  web.json_response(txt)  # 返回json端数据

app=web.Application()

app.router.add_get('/task/targets',targetshandler)  # 使用get方法进行处理

async def  taskhandler(request:web.Request):
    j = await  request.json()
    txt=client.add_task(j)
    return   web.Response(text=txt,status=201)

app.router.add_post('/task',taskhandler)

async   def  taskresult(request:web.Request):
    j = await  request.json()
    txt =client.get_result(j)
    return   web.json_response(txt)

app.router.add_post('/result',taskresult)

if __name__ == "__main__":
    web.run_app(app,host='0.0.0.0',port=80)

3 测试结果如下

简单任务调度系统

六 处理数据和节点状态

1 状态管理类型

1 节点状态

当节点在进行相关事件调度处理时,其状态应该是RUNNING状态,当处理完成后,其状态应该恢复称为WAITING状态。

2 task 任务状态

当当前agent下的所有该任务都执行完成时的状态,此处设计较为简单,只是全部执行就将其状态置位成功,否则为RUNNING状态或者WAITING,当有一个agent领取任务时,其状态将被置为RUNNING。

3 task中对应的agent的状态

及就是当前节点执行当前任务的状态,此状态保存在task中的targets字典中,用于对其客户端执行结果进行判断而获取其对应状态。

2 客户端调整代码

主要是cm.py调整如下

#!/usr/bin/poython3.6
#conding:utf-8
import   zerorpc   #添加模块
import  threading  # 用于处理中断相关
from  .msg import  Messgae  # 获取消息
from  .state import  *
from  .config import  CONN_URL
from  .executor import   Executor
from  utils  import getlogger

class  Conn_Manager:
    def __init__(self,timeout=3):
        self.timeout=timeout
        self.client=zerorpc.Client()
        self.event=threading.Event()
        self.message=Messgae('/var/log/mschedule/uuid')  # 此处用于初始化消息
        self.log=getlogger('agent')  # 此处填写相关的log日志名称
        self.state=WAITING
        self.exec=Executor()
    def start(self):
        try:
            self.event.clear()
            self.client.connect(CONN_URL)  # 链接处理
            self.log.info('注册消息发送 {}'.format(self.client.send(self.message.reg())))  # 发送心跳信息
            self.client.send(self.message.reg())  #处理注册消息
            while  not self.event.wait(self.timeout):  # 等待的时间
                self.log.info('心跳消息发送 {}'.format(self.client.send(self.message.hearbeat())))  # 发送心跳信息
                if  self.state == WAITING:  # 如果此处是空闲状态,则进行领任务处理
                    print('获取任务task')
                    task = self.client.get_task(self.message.id)  # 此处返回三个参数,1 为taskid,二是script ,三是timeout
                    if task:  # 领取成功,则进行执行相关任务.并上传至服务器端其状态
                        self.state = RUNNING  # 此处任务成功的情况
                        code,output=self.exec.run(task[1],task[2])
                        self.client.send(self.message.result(task[0], code, output))
                        self.state=WAITING  #状态更新为当前正常状态
                    else:
                        return   "目前无消息"
        except  Exception as e:
            self.event.set()
            raise  e  # 此处是抛出异常到上一级
    def shutdown(self):
        self.log.info("关闭操作")
        self.client.close()
        self.event.set()

3 master端代码调整

master/storage.py

#!/usr/bin/poython3.6
#conding:utf-8

import  datetime
from  .task import  Task
from  .state import   *
from utils  import  getlogger
log=getlogger('storage')
class Storage:
    def  __init__(self):
        self.agents={}  # 此处用于存储用户信息
        self.tasks={}  # 此处用于存储作业信息
        self.result={}  # 用于存储agent端返回的结果
        self.task_state=0 # 用于处理当所有agent状态都修改为成功或失败时将task的状态也进行相关的修改
    def reg_hb(self,agent_id,info):  # id 及就是客户端的id ,info 及就是host和ip地址
        self.agents[agent_id] = {
            'heaerbeat' : datetime.datetime.now().timestamp(),
            'info' :info,
            'busy':self.agents.get(agent_id,{}).get('busy',False)
        }
        # busy 读不到置False,读到了不变
    def  get_agents(self):
        return   self.agents
    def add_task(self,task:dict):  # 此处用于从客户端获取相关的数据
        t=Task(**task)  # 此处进行参数解构
        self.tasks[t.id]=t
        return  t.id  # 此处用于获取处理id
    @property
    def itme_task(self):
        yield  from  (task  for  task  in  self.tasks.values()  if  task.state  in  {WAITING,RUNNING})  # 此处返回task,当其中有成功或者失败时,则不用进行相关的操作处理
        #当为WAITING或者RUNNING 时,则进行相关的操作,其他情况则不进行相关操作
    def get_task(self,agent_id):
        for  task  in  self.itme_task:
            if agent_id  in  task.targets:  # 此处用于判断当前节点接入任务情况
                if task.state==WAITING:
                    task.state=RUNNING  #当前消息的状态
                task.targets[agent_id]['state']=RUNNING  # 此处是指此消息中的agent是否执行的状态的处理,若获取了,则此处的状态为RUNNING
                return   [task.id,task.script,task.timeout]
    def add_result(self,payload:dict):
        for task in self.itme_task:
            if  payload['code']==0:
                task.targets[payload['agent_id']]['state']=SUCCEED  # 此处是指对此消息进行处理,若code=0,则表示客户端执行成功,若为1,则表示失败
                self.task_state+=1
            else:
                task.targets[payload['agent_id']]['state']= FAILED#
                self.task_state+=1
            if self.task_state==task.target_count:
                task.state=SUCCEED
                self.task_state=0
            payload['agent_state']=task.targets[payload['agent_id']]['state']
        log.info("当前消息内容为:{}".format(self.result))
        self.result[payload['id']]=payload  # 此处以task_id 为键,以payload为值进行处理
    def get_result(self,task_id:dict):
        task_id=task_id['task_id']
        return self.result.get(task_id)  # task_id,获取对应的payload值

4 webserver端代码调整如下

webappserver.py

#!/usr/bin/poython3.6
#conding:utf-8
import  zerorpc
from  aiohttp  import  request,web_response,web,log
CONN_URL="tcp://127.0.0.1:9000"

client=zerorpc.Client()
client.connect(CONN_URL)
async  def  targetshandler(request:web.Request):
    txt=client.get_agents()  #通过zerorpc调用master端接口
    return  web.json_response(txt)  # 返回json端数据

app=web.Application()

app.router.add_get('/task/targets',targetshandler)  # 使用get方法进行处理

async def  taskhandler(request:web.Request):
    j = await  request.json()
    txt=client.add_task(j)
    return   web.Response(text=txt,status=201)

app.router.add_post('/task',taskhandler)

async   def  taskresult(request:web.Request):
    j = await  request.json()
    txt =client.get_result(j)
    if txt['code']  !=0:
        txt['output']='参数不正确,请重新输入'
    return   web.json_response(txt)

app.router.add_post('/result',taskresult)

if __name__ == "__main__":
    web.run_app(app,host='0.0.0.0',port=80)

5 结果如下

简单任务调度系统


网站栏目:简单任务调度系统
分享地址:http://kswjz.com/article/pssgdp.html
扫二维码与项目经理沟通

我们在微信上24小时期待你的声音

解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流