
How to Send Alerts via a WeChat Official Account with Python (Code)

Source: 懂视网  Editor: 小采  Date: 2020-11-27 14:25:41

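WeChat official accounts come in three kinds: service accounts (服务号), subscription accounts (订阅号), and enterprise accounts (企业号). They differ in how an AccessToken is obtained.

Subscription accounts are the troublesome ones: the AccessToken has to be refreshed on a schedule, and fetching a new one invalidates the one fetched before.

Enterprise accounts behave better: the AccessToken is likewise valid for 7200 seconds, but fetching it again within that period returns the same result.

To stay compatible with both behaviors, the token is handled the subscription-account way.

This is the same approach the interface documentation requires:

To keep the appsecret confidential, a third party needs a central control server that fetches and refreshes the access_token.

All other business-logic servers take their access_token from this central server and should not refresh it themselves; otherwise the access_token gets overwritten and the business is affected.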
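
The central-server rule above comes down to one component owning the token while everything else only reads it. The following is a minimal in-process sketch of that idea for Python 3, not the article's implementation: `TokenCache`, `fetch`, and `clock` are hypothetical names, and `fetch` stands for whatever call actually requests a token and returns `(access_token, expires_in)`.

```python
import time


class TokenCache(object):
    """Holds one access_token and refreshes it only once it has expired."""

    def __init__(self, fetch, clock=time.time):
        self._fetch = fetch        # hypothetical callable -> (token, expires_in)
        self._clock = clock        # injectable clock, so the cache can be tested
        self._token = None
        self._expires_at = 0.0

    def get(self):
        now = self._clock()
        # Refresh only when there is no token yet or the stored one has expired.
        if self._token is None or now >= self._expires_at:
            token, expires_in = self._fetch()
            self._token = token
            self._expires_at = now + expires_in
        return self._token


def fake_fetch():
    # Stand-in for the real HTTP request; always reports a 7200-second lifetime.
    fake_fetch.calls += 1
    return "token-%d" % fake_fetch.calls, 7200


fake_fetch.calls = 0
cache = TokenCache(fake_fetch)
first = cache.get()
second = cache.get()   # served from the cache, no second fetch
```

Because every caller goes through `get()`, a subscription-account token is never invalidated by an unnecessary second request.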

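The code below uses an enterprise account as the example and stores the access_token in a sqlite3 database. Compared with a text file, a database is more flexible and leaves room for storing other data later.

Design and usage:

The sqlite3 database is created automatically, including the table structure and the data. If the tables or the data are missing or have been deleted, new usable data is created.

As far as possible, every executable function in the Class should succeed when called on its own.

Only the methods and variables that are actually needed are made public in the Class.

To use it, change weixin_qy_CorpID and weixin_qy_Secret in this file to your own values, import the file, and call WeiXinTokenClass().get() to obtain the access_token.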

#!/usr/bin/python
# -*- coding: utf-8 -*-
import os
import sqlite3
import sys
import urllib
import urllib2
import json
import datetime

enable_debug = True


def debug(msg, code=None):
    if enable_debug:
        if code is None:
            print "message: %s" % msg
        else:
            print "message: %s, code: %s " % (msg, code)


AUTHOR_MAIL = "uberurey_ups@163.com"
weixin_qy_CorpID = "your_corpid"
weixin_qy_Secret = "your_secret"

# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# Database
# https://docs.djangoproject.com/en/1.9/ref/settings/#databases
DATABASES = {
    'default': {
        'ENGINE': 'db.backends.sqlite3',
        'NAME': os.path.join(BASE_DIR, '.odbp_db.sqlite3'),
    }
}
sqlite3_db_file = str(DATABASES['default']['NAME'])


def sqlite3_conn(database):
    try:
        conn = sqlite3.connect(database)
    except sqlite3.Error as e:
        print >> sys.stderr, """
There was a problem connecting to Database:
    %s
The error leading to this problem was:
    %s
It's possible that this database is broken or permission denied.
If you cannot solve this problem yourself, please mail to:
    %s
""" % (database, e, AUTHOR_MAIL)
        sys.exit(1)
    else:
        return conn


def sqlite3_commit(conn):
    return conn.commit()


def sqlite3_close(conn):
    return conn.close()


def sqlite3_execute(database, sql):
    try:
        sql_conn = sqlite3_conn(database)
        sql_cursor = sql_conn.cursor()
        sql_cursor.execute(sql)
        sql_conn.commit()
        sql_conn.close()
    except sqlite3.Error as e:
        print e
        sys.exit(1)


def sqlite3_create_table_token():
    sql_conn = sqlite3_conn(sqlite3_db_file)
    sql_cursor = sql_conn.cursor()
    sql_cursor.execute('''CREATE TABLE "main"."weixin_token" (
        "id" INTEGER,
        "access_token" TEXT,
        "expires_in" TEXT,
        "expires_on" TEXT,
        "is_expired" INTEGER
    );''')
    sqlite3_commit(sql_conn)
    sqlite3_close(sql_conn)


def sqlite3_create_table_account():
    sql_conn = sqlite3_conn(sqlite3_db_file)
    sql_cursor = sql_conn.cursor()
    sql_cursor.execute('''CREATE TABLE "main"."weixin_account" (
        "id" INTEGER,
        "name" TEXT,
        "corpid" TEXT,
        "secret" TEXT,
        "current" INTEGER
    );''')
    sqlite3_commit(sql_conn)
    sqlite3_close(sql_conn)


def sqlite3_create_tables():
    # Creates both tables by reusing the two functions above.
    print "sqlite3_create_tables"
    sqlite3_create_table_token()
    sqlite3_create_table_account()


def sqlite3_set_credential(corpid, secret):
    try:
        sql_conn = sqlite3_conn(sqlite3_db_file)
        sql_cursor = sql_conn.cursor()
        sql_cursor.execute(
            '''INSERT INTO "weixin_account" ("id", "name", "corpid", "secret", "current")
            VALUES (1, 'odbp', ?, ?, 1)''', (corpid, secret))
        sqlite3_commit(sql_conn)
        sqlite3_close(sql_conn)
    except sqlite3.Error:
        # The table is missing: create it, then retry the insert.
        sqlite3_create_table_account()
        sqlite3_set_credential(corpid, secret)


def sqlite3_set_token(access_token, expires_in, expires_on, is_expired):
    try:
        sql_conn = sqlite3_conn(sqlite3_db_file)
        sql_cursor = sql_conn.cursor()
        sql_cursor.execute(
            '''INSERT INTO "weixin_token"
            ("id", "access_token", "expires_in", "expires_on", "is_expired")
            VALUES (1, ?, ?, ?, ?)''',
            (access_token, expires_in, expires_on, is_expired))
        sqlite3_commit(sql_conn)
        sqlite3_close(sql_conn)
    except sqlite3.Error:
        # The table is missing: create it, then retry the insert.
        sqlite3_create_table_token()
        sqlite3_set_token(access_token, expires_in, expires_on, is_expired)


def sqlite3_get_credential():
    try:
        sql_conn = sqlite3_conn(sqlite3_db_file)
        sql_cursor = sql_conn.cursor()
        credential = sql_cursor.execute(
            '''SELECT "corpid", "secret" FROM weixin_account WHERE current == 1;''')
        result = credential.fetchall()
        sqlite3_close(sql_conn)
    except sqlite3.Error:
        # No account table yet: store the configured credential and read it back.
        sqlite3_set_credential(weixin_qy_CorpID, weixin_qy_Secret)
        return sqlite3_get_credential()
    else:
        if result is not None and len(result) != 0:
            return result
        else:
            print "unrecoverable problem, please report to %s" % AUTHOR_MAIL
            sys.exit(1)


def sqlite3_get_token():
    try:
        sql_conn = sqlite3_conn(sqlite3_db_file)
        sql_cursor = sql_conn.cursor()
        credential = sql_cursor.execute(
            '''SELECT "access_token", "expires_on" FROM weixin_token WHERE "is_expired" == 1 ;''')
        result = credential.fetchall()
        sqlite3_close(sql_conn)
    except sqlite3.Error:
        info = sys.exc_info()
        print info[0], ":", info[1]
        return None
    else:
        if result is not None and len(result) != 0:
            return result
        else:
            return None


def sqlite3_update_token(access_token, expires_on):
    sql_conn = sqlite3_conn(sqlite3_db_file)
    sql_cursor = sql_conn.cursor()
    sql_cursor.execute(
        '''UPDATE "weixin_token" SET access_token=?, expires_on=? WHERE _ROWID_ = 1;''',
        (access_token, expires_on))
    sqlite3_commit(sql_conn)
    sqlite3_close(sql_conn)


class WeiXinTokenClass(object):
    def __init__(self):
        self.__corpid = None
        self.__corpsecret = None
        self.__use_persistence = True
        self.__access_token = None
        self.__expires_in = None
        self.__expires_on = None
        self.__is_expired = None
        if self.__use_persistence:
            # Query once and reuse the row instead of hitting the database twice.
            credential = sqlite3_get_credential()
            self.__corpid = credential[0][0]
            self.__corpsecret = credential[0][1]
        else:
            self.__corpid = weixin_qy_CorpID
            self.__corpsecret = weixin_qy_Secret

    def __get_token_from_weixin_qy_api(self):
        parameters = {
            "corpid": self.__corpid,
            "corpsecret": self.__corpsecret
        }
        url_parameters = urllib.urlencode(parameters)
        token_url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken?"
        url = token_url + url_parameters
        response = urllib2.urlopen(url)
        result = response.read()
        token_json = json.loads(result)
        # .get() avoids a KeyError when the reply carries only errcode/errmsg.
        if token_json.get('access_token') is not None:
            get_time_now = datetime.datetime.now()
            # TODO(Guodong Ding) token will expired ahead of time or not expired after the time
            expire_time = get_time_now + datetime.timedelta(seconds=token_json['expires_in'])
            # strftime always writes the microseconds, so the strptime('%f') calls below cannot fail.
            token_json['expires_on'] = expire_time.strftime('%Y-%m-%d %H:%M:%S.%f')
            self.__access_token = token_json['access_token']
            self.__expires_in = token_json['expires_in']
            self.__expires_on = token_json['expires_on']
            self.__is_expired = 1
            token_result_set = sqlite3_get_token()
            # "or", not "and": len(None) would raise a TypeError.
            if token_result_set is None or len(token_result_set) == 0:
                sqlite3_set_token(self.__access_token, self.__expires_in, self.__expires_on, self.__is_expired)
            else:
                if self.__is_token_expired() is True:
                    sqlite3_update_token(self.__access_token, self.__expires_on)
                else:
                    debug("pass")
                    return
        else:
            if token_json.get('errcode') is not None:
                print "errcode is: %s" % token_json['errcode']
                print "errmsg is: %s" % token_json['errmsg']
            else:
                print result

    def __get_token_from_persistence_storage(self):
        token_result_set = sqlite3_get_token()
        if token_result_set is None:
            # Nothing stored yet: fetch from the API, then read the stored row back.
            self.__get_token_from_weixin_qy_api()
            token_result_set = sqlite3_get_token()
            if token_result_set is None:
                raise RuntimeError("could not obtain an access_token")
        access_token = token_result_set[0][0]
        expire_time = token_result_set[0][1]
        expire_time = datetime.datetime.strptime(expire_time, '%Y-%m-%d %H:%M:%S.%f')
        now_time = datetime.datetime.now()
        if now_time < expire_time:
            return access_token
        else:
            self.__get_token_from_weixin_qy_api()
            return self.__get_token_from_persistence_storage()

    @staticmethod
    def __is_token_expired():
        try:
            token_result_set = sqlite3_get_token()
        except sqlite3.Error as e:
            print e
            sys.exit(1)
        expire_time = token_result_set[0][1]
        expire_time = datetime.datetime.strptime(expire_time, '%Y-%m-%d %H:%M:%S.%f')
        now_time = datetime.datetime.now()
        return now_time >= expire_time

    def get(self):
        return self.__get_token_from_persistence_storage()

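A Python Class for sending text messages through a WeChat enterprise account

Key points and usage:
Chinese text is supported. The core statement is "payload = json.dumps(self.data, encoding='utf-8', ensure_ascii=False)"; search keywords: "python json 中文".
The Class has a single public method, send().
Usage: import the class and call send(). It takes only two arguments: the recipient (separate multiple UserIDs with "|") and the content. For example:
import odbp_sendMessage
msg = odbp_sendMessage.WeiXinSendMsgClass()
msg.send("dingguodong", "Python 大魔王!")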
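
To see what `ensure_ascii=False` changes, here is a small sketch for Python 3, where `json.dumps` no longer accepts the `encoding` keyword used in the Python 2 statement quoted above. `encode_payload` is a hypothetical helper name, not part of the article's module.

```python
import json


def encode_payload(data):
    """Serialize to JSON, keep non-ASCII characters as-is, and encode as UTF-8 bytes."""
    return json.dumps(data, ensure_ascii=False).encode("utf-8")


payload = {"text": {"content": "Python 大魔王!"}}

escaped = json.dumps(payload)   # default: Chinese characters become \uXXXX escapes
raw = encode_payload(payload)   # bytes holding the characters themselves
```

Both forms are valid JSON and decode to the same data; the unescaped form is simply the one the article's code sends.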

#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
Created by PyCharm.
File: LinuxBashShellScriptForOps:odbp_sendMessage.py
User: Guodong
Create Date: 2016/8/12
Create Time: 14:49
"""
import json

import requests

import odbp_getToken


class WeiXinSendMsgClass(object):
    def __init__(self):
        self.access_token = odbp_getToken.WeiXinTokenClass().get()
        self.to_user = ""
        self.to_party = ""
        self.to_tag = ""
        self.msg_type = "text"
        self.agent_id = 2
        self.content = ""
        self.safe = 0
        self.data = {
            "touser": self.to_user,
            "toparty": self.to_party,
            "totag": self.to_tag,
            "msgtype": self.msg_type,
            "agentid": self.agent_id,
            "text": {
                "content": self.content
            },
            "safe": self.safe
        }

    def send(self, to_user, content):
        if to_user is not None and content is not None:
            self.data['touser'] = to_user
            self.data['text']['content'] = content
        else:
            raise RuntimeError("to_user and content are both required")
        url = "https://qyapi.weixin.qq.com/cgi-bin/message/send"
        querystring = {"access_token": self.access_token}
        # ensure_ascii=False keeps Chinese characters unescaped in the payload
        payload = json.dumps(self.data, encoding='utf-8', ensure_ascii=False)
        headers = {
            'content-type': "application/json",
            'cache-control': "no-cache",
        }
        response = requests.request("POST", url, data=payload, headers=headers, params=querystring)
        return_content = json.loads(response.content)
        if return_content["errcode"] == 0 and return_content["errmsg"] == "ok":
            print "Send successfully! %s " % return_content
        else:
            print "Send failed! %s " % return_content

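Reading from MongoDB in Python and sending to a WeChat enterprise account

python2.x
Note: in the data variable, agentid is the id of the application you just created (visible on the web page).
toparty is the target department; alternatively, use touser or totag to specify the target accounts.
This is a fairly simple call; it has been tested and works.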
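
The fields mentioned in these notes can be collected by a small builder. This is a sketch under assumptions: `build_text_message` is a hypothetical helper, the field names are the ones used by the code in this article, and the check that at least one recipient is given is an added convenience rather than documented API behaviour.

```python
def build_text_message(agent_id, content, to_user="", to_party="", to_tag=""):
    """Build the dict for a text message; multiple ids are joined with '|'."""
    if not (to_user or to_party or to_tag):
        # Assumption: a message with no recipient at all is a caller error.
        raise ValueError("one of to_user, to_party or to_tag is required")
    return {
        "touser": to_user,
        "toparty": to_party,
        "totag": to_tag,
        "msgtype": "text",
        "agentid": agent_id,
        "text": {"content": content},
        "safe": 0,
    }


# Address a department (toparty) instead of individual users.
message = build_text_message(agent_id=0, content="disk usage 91%", to_party="1")
```

The resulting dict can be passed to `json.dumps` and posted to the message/send URL exactly as the scripts in this article do.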

# coding:utf-8
import sys
import requests
import json
from pymongo import MongoClient

reload(sys)
sys.setdefaultencoding('utf-8')


class Weixin(object):
    def __init__(self, corp_id, corp_secret):
        self.token_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=%s&corpsecret=%s' % (corp_id, corp_secret)
        self.send_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token='

    def get_token(self):
        try:
            r = requests.get(self.token_url, timeout=10)
        except Exception as e:
            print e
            sys.exit(1)
        if r.status_code == requests.codes.ok:
            data = r.json()
            if data.get('errcode'):
                print data['errmsg']
                sys.exit(1)
            return data['access_token']
        else:
            print r.status_code
            sys.exit(1)

    def send(self, message):
        url = self.send_url + self.get_token()
        data = {
            "touser": "hequan2011",
            "msgtype": "text",
            "agentid": "0",
            "text": {
                "content": message
            },
            "safe": "0"
        }
        send_data = json.dumps(data, ensure_ascii=False)
        try:
            r = requests.post(url, send_data)
        except Exception as e:
            print e
            sys.exit(1)
        if r.status_code == requests.codes.ok:
            print r.json()
        else:
            # The Response object has status_code, not code.
            print r.status_code
            sys.exit(1)


corpid = 'xxxxxxxxxxx'
corpsecret = 'xxxxxxxxxxxxxxxxx'
client = MongoClient('mongodb://user:password@127.0.0.1:27017/')
db = client.ku
collection = db.biao
# Use the first document in the collection
first = list(collection.find())[0]
name = str(first["name"])
jg = int(first["jg"])
print name
msg = "1:{0}\n 2:{1}\n".format(name, jg)
w = Weixin(corpid, corpsecret)
w.send(msg)

Zabbix WeChat alert plugin

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# __author__ = '懒懒的天空'
import requests
import sys
import json
from conf.INIFILES import read_config, write_config
import os
import datetime
from conf.BLog import Log

reload(sys)
sys.setdefaultencoding('utf-8')


class WeiXin(object):
    def __init__(self, corpid, corpsecret):
        # corpid and corpsecret are required here; get them from the admin console
        self.__params = {
            'corpid': corpid,
            'corpsecret': corpsecret
        }
        self.url_get_token = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken'
        self.url_send = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?'
        self.__token = self.__get_token()
        self.__token_params = {
            'access_token': self.__token
        }

    def __raise_error(self, res):
        # res is the requests Response; the caller records the message for the log
        raise Exception('error code: %s,error message: %s' % (res.json()['errcode'], res.json()['errmsg']))

    def __get_token(self):
        headers = {'content-type': 'application/json'}
        res = requests.get(self.url_get_token, headers=headers, params=self.__params)
        try:
            return res.json()['access_token']
        except (KeyError, ValueError):
            # Pass the Response itself, not res.content, because __raise_error calls res.json()
            self.__raise_error(res)

    def send_message(self, agentid, messages, userid='', toparty=''):
        payload = {
            'touser': userid,
            'toparty': toparty,
            'agentid': agentid,
            "msgtype": "news",
            "news": messages
        }
        headers = {'content-type': 'application/json'}
        data = json.dumps(payload, ensure_ascii=False).encode('utf-8')
        params = self.__token_params
        res = requests.post(self.url_send, headers=headers, params=params, data=data)
        try:
            return res.json()
        except ValueError:
            self.__raise_error(res)


# Module-level status shared between main() and logwrite()
sendstatus = False
senderr = ''


def main(send_to, subject, content):
    global sendstatus
    global senderr
    data = ''
    try:
        messages = {}
        body = {}
        config_file_path = get_path()
        CorpID = read_config(config_file_path, 'wei', "CorpID")
        CorpSecret = read_config(config_file_path, 'wei', "CorpSecret")
        agentid = read_config(config_file_path, 'wei', "agentid")
        web = read_config(config_file_path, 'wei', "web")
        content = json.loads(content)
        messages["message_url"] = web
        body["url"] = web + "history.php?action=showgraph&itemids[]=" + content[u'监控ID']
        warn_message = ''
        if content[u'当前状态'] == 'PROBLEM':
            body["title"] = "服务器故障"
            warn_message += subject + '\n'
            warn_message += '详情:\n'
            warn_message += '告警等级:' + content[u'告警等级'] + '\n'
            warn_message += '告警时间:' + content[u'告警时间'] + '\n'
            warn_message += '告警地址:' + content[u'告警地址'] + '\n'
            warn_message += '持续时间:' + content[u'持续时间'] + '\n'
            warn_message += '监控项目:' + content[u'监控项目'] + '\n'
            warn_message += content[u'告警主机'] + '故障(' + content[u'事件ID'] + ')'
        else:
            body["title"] = "服务器恢复"
            warn_message += subject + '\n'
            warn_message += '详情:\n'
            warn_message += '告警等级:' + content[u'告警等级'] + '\n'
            warn_message += '恢复时间:' + content[u'恢复时间'] + '\n'
            warn_message += '告警地址:' + content[u'告警地址'] + '\n'
            warn_message += '持续时间:' + content[u'持续时间'] + '\n'
            warn_message += '监控项目:' + content[u'监控项目'] + '\n'
            warn_message += content[u'告警主机'] + '恢复(' + content[u'事件ID'] + ')'
        body['description'] = warn_message
        messages['articles'] = [body]
        wx = WeiXin(CorpID, CorpSecret)
        data = wx.send_message(toparty=send_to, agentid=agentid, messages=messages)
        sendstatus = True
    except Exception, e:
        senderr = str(e)
        sendstatus = False
    logwrite(sendstatus, data)


def get_path():
    path = os.path.dirname(os.path.abspath(sys.argv[0]))
    config_path = path + '/config.ini'
    return config_path


def logwrite(sendstatus, content):
    logpath = '/var/log/zabbix/weixin'
    if not sendstatus:
        content = senderr
    t = datetime.datetime.now()
    daytime = t.strftime('%Y-%m-%d')
    daylogfile = logpath + '/' + str(daytime) + '.log'
    logger = Log(daylogfile, level="info", is_console=False, mbs=5, count=5)
    os.system('chown zabbix.zabbix {0}'.format(daylogfile))
    logger.info(content)


if __name__ == "__main__":
    # Three arguments are read below, so require all of them.
    if len(sys.argv) > 3:
        send_to = sys.argv[1]
        subject = sys.argv[2]
        content = sys.argv[3]
        logwrite(True, content)
        main(send_to, subject, content)

Pushing text messages through a WeChat enterprise account with Python

#!/usr/bin/python
# -*- coding: utf-8 -*-
import urllib2
import json
import sys

reload(sys)
sys.setdefaultencoding('utf-8')


def gettoken(corpid, corpsecret):
    gettoken_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=' + corpid + '&corpsecret=' + corpsecret
    try:
        token_file = urllib2.urlopen(gettoken_url)
    except urllib2.HTTPError as e:
        print e.code
        print e.read().decode("utf8")
        sys.exit()
    token_data = token_file.read().decode('utf-8')
    token_json = json.loads(token_data)
    token = token_json['access_token']
    return token


def senddata(access_token, user, party, agent, subject, content):
    send_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=' + access_token
    # Build the body with json.dumps rather than string concatenation,
    # so quotes and newlines inside subject/content are escaped correctly.
    send_values = json.dumps({
        "touser": user,
        "toparty": party,
        "totag": "",
        "msgtype": "text",
        "agentid": agent,
        "text": {"content": subject + "\n" + content},
        "safe": "0"
    }, ensure_ascii=False)
    send_request = urllib2.Request(send_url, send_values)
    response = json.loads(urllib2.urlopen(send_request).read())
    print str(response)


if __name__ == '__main__':
    user = str(sys.argv[1])     # argument 1: recipient user account; must follow the enterprise account and be allowed to receive its messages
    party = str(sys.argv[2])    # argument 2: id of the recipient group; must have permission on the enterprise account
    agent = str(sys.argv[3])    # argument 3: id of the application in the enterprise account
    subject = str(sys.argv[4])  # argument 4: title (part of the message content)
    content = str(sys.argv[5])  # argument 5: the text itself
    corpid = 'CorpID'  # CorpID identifies the enterprise account
    corpsecret = 'corpsecretSecret'  # corpsecretSecret is the credential secret of the management group
    try:
        accesstoken = gettoken(corpid, corpsecret)
        senddata(accesstoken, user, party, agent, subject, content)
    except Exception, e:
        print str(e) + " Error: please check the \"corpid\" or \"corpsecret\" config"

Nagios calling a Python program to publish alerts through the WeChat public platform

vim Notify-host-by-weixin-party.py
import urllib.request
import urllib.error
import json
import sys


# Fetch the AccessToken
def gettoken(corp_id, corp_secret):
    gettoken_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=' + corp_id + '&corpsecret=' + corp_secret
    try:
        token_file = urllib.request.urlopen(gettoken_url)
    except urllib.error.HTTPError as e:
        print(e.code)
        print(e.read().decode("utf8"))
        sys.exit(1)
    token_data = token_file.read().decode('utf-8')
    token_json = json.loads(token_data)
    token = token_json['access_token']
    return token


# Send the message
def senddata(access_token, notify_str):
    send_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=' + access_token
    # The argument is a single string whose fields are joined by the word "separator";
    # split("separator") recovers the individual fields.
    notifydata = notify_str.split("separator")
    party = notifydata[0]
    cationtype = notifydata[1]
    name = notifydata[2]
    state = notifydata[3]
    address = notifydata[4]
    output = notifydata[5]
    datatime = notifydata[6]
    # content = '[擦汗]Host Notification[擦汗]\n\n类型: ' + cationtype + '\n主机名: ' + name + '\n状态: ' + state + '\nIP地址: ' + address + '\n摘要: ' + output + '\n时间: ' + datatime + '\n'
    if cationtype == "RECOVERY":
        content = ('[嘘]' + address + ' is ' + state + '[嘘]\n\n'
                   'IP地址: ' + address + '\n'
                   '主要用途: ' + name + '\n'
                   '当前状态: ' + state + '\n\n'
                   '日志摘要: ' + output + '\n'
                   '检测时间: ' + datatime + '\n')
    else:
        content = ('[擦汗]' + address + ' is ' + state + '[擦汗]\n\n'
                   'IP地址: ' + address + '\n'
                   '主要用途: ' + name + '\n'
                   '当前状态: ' + state + '\n\n'
                   '日志摘要: ' + output + '\n'
                   '检测时间: ' + datatime + '\n')
    send_values = {
        "toparty": party,
        "totag": "2",
        "msgtype": "text",
        "agentid": "15",
        "text": {
            "content": content
        },
        "safe": "0"
    }
    # ensure_ascii=False keeps Chinese characters unescaped
    send_data = json.dumps(send_values, ensure_ascii=False).encode(encoding='UTF8')
    send_request = urllib.request.Request(send_url, send_data)
    response = urllib.request.urlopen(send_request)
    # The reply from the WeChat public platform; useful when debugging
    msg = response.read()
    return msg


# The script takes one string argument from Nagios. In the author's tests Nagios could pass
# only a single argument to Python, so the user name, the alerting host and the alert
# details are all packed into that one string. (Python 3 strings are already Unicode,
# so the Python 2 reload(sys)/setdefaultencoding step is not needed.)
notifystr = str(sys.argv[1])
corpid = 'wxb6162862801114c9da602'
corpsecret = '2nCsNcHxepBCV4U9Lcf-23By1RGzU1Zs422tdJpKTQzqjQ1b26IFxP76ydG2rKkchGN6E'
accesstoken = gettoken(corpid, corpsecret)
msg = senddata(accesstoken, notifystr)
print(msg)
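
The host script above receives all of its fields in one string joined by the literal word `separator`. The following sketch shows that parsing step on its own, with a length check added. `HostNotification` and `parse_host_notification` are hypothetical names; the field names and their order are the ones the script assigns from `notifydata`.

```python
from collections import namedtuple

# Field order follows the indices 0..6 used by the host notification script.
HostNotification = namedtuple(
    "HostNotification",
    "party cationtype name state address output datatime",
)


def parse_host_notification(notify_str, sep="separator"):
    """Split the single Nagios argument into named fields."""
    parts = notify_str.split(sep)
    if len(parts) != len(HostNotification._fields):
        raise ValueError(
            "expected %d fields, got %d" % (len(HostNotification._fields), len(parts))
        )
    return HostNotification(*parts)


# Made-up sample values, joined the way the Nagios command would join them.
sample = "separator".join(
    ["2", "PROBLEM", "web01", "DOWN", "10.0.0.1", "PING CRITICAL", "2016-01-18 10:00:00"]
)
notification = parse_host_notification(sample)
```

Named fields make a mismatch between the Nagios command definition and the script fail loudly instead of shifting every value by one position.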

[root@localhost python]# vim Notify-service-by-weixin-party.py
import urllib.request
import urllib.error
import json
import sys


def gettoken(corp_id, corp_secret):
    gettoken_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=' + corp_id + '&corpsecret=' + corp_secret
    try:
        token_file = urllib.request.urlopen(gettoken_url)
    except urllib.error.HTTPError as e:
        print(e.code)
        print(e.read().decode("utf8"))
        sys.exit(1)
    token_data = token_file.read().decode('utf-8')
    token_json = json.loads(token_data)
    token = token_json['access_token']
    return token


def senddata(access_token, notify_str):
    send_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=' + access_token
    notifydata = notify_str.split("separator")
    party = notifydata[0]
    cationtype = notifydata[1]
    desc = notifydata[2]
    alias = notifydata[3]
    address = notifydata[4]
    state = notifydata[5]
    datatime = notifydata[6]
    output = notifydata[7]
    # content = '[擦汗]Service Notification [擦汗]\n\n类型: ' + cationtype + '\n\n服务名: ' + desc + '\n主机名: ' + alias + '\nIP址: ' + address + '\n状态: ' + state + '\n时间: ' + datatime + '\n摘要:\n' + output + '\n'
    if cationtype == "RECOVERY":
        content = ('[鼓掌]' + desc + ' is ' + state + '[鼓掌]\n\n'
                   'IP地址: ' + address + '\n'
                   '主要用途: ' + alias + '\n'
                   '服务状态: ' + desc + ' is ' + state + '\n'
                   '检测时间: ' + datatime + '\n'
                   '日志摘要: \n' + output + '\n')
    else:
        content = ('[擦汗]' + desc + ' is ' + state + '[擦汗]\n\n'
                   'IP地址: ' + address + '\n'
                   '主要用途: ' + alias + '\n'
                   '服务状态: ' + desc + ' is ' + state + '\n'
                   '检测时间: ' + datatime + '\n'
                   '日志摘要: \n' + output + '\n')
    send_values = {
        "toparty": party,
        "totag": "2",
        "msgtype": "text",
        "agentid": "15",
        "text": {
            "content": content
        },
        "safe": "0"
    }
    send_data = json.dumps(send_values, ensure_ascii=False).encode(encoding='UTF8')
    send_request = urllib.request.Request(send_url, send_data)
    response = urllib.request.urlopen(send_request)
    msg = response.read()
    return msg


notifystr = str(sys.argv[1])
corpid = 'wxb616286d28ds01114c9da602'
corpsecret = '2nCsNcHxepBCdtgV4U9Lcf-23By1RGzUgh1Zs422tdJpKTQzqjQ1b26IFxP76ydG2rKkchGN6E'
accesstoken = gettoken(corpid, corpsecret)
msg = senddata(accesstoken, notifystr)
print(msg)

Alerting through an enterprise WeChat service account with shell and Python

#!/bin/bash
corpid="wxd6b3"
corpsecret="aJTaPaGjW"
access_token=`curl -s "https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=$corpid&corpsecret=$corpsecret" |jq '.access_token' | awk -F'"' '{print $2}'`
curl -l -H "Content-type: application/json" -X POST -d '{"touser":"@all","msgtype":"text","toparty":"14","agentid":"14","text":{"content":"测试"} , "safe":"0"}' "

The Python script is as follows:

# coding:utf-8
import sys
import urllib2
import time
import json
import requests

reload(sys)
sys.setdefaultencoding('utf-8')

# title = sys.argv[2]    # title from a positional argument, for use with Zabbix
# content = sys.argv[3]  # content from a positional argument, for use with Zabbix
title = "title 测试"      # hard-coded test title
content = "content 测试"  # hard-coded test content


class Token(object):
    # Fetch the token
    def __init__(self, corpid, corpsecret):
        self.baseurl = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={0}&corpsecret={1}'.format(
            corpid, corpsecret)
        # No token yet, so treat it as already expired.
        self.expire_time = 0
        self.access_token = None

    def get_token(self):
        # Request a new token only when the cached one has expired.
        if time.time() >= self.expire_time:
            request = urllib2.Request(self.baseurl)
            response = urllib2.urlopen(request)
            ret = response.read().strip()
            ret = json.loads(ret)
            # A non-zero errcode means the request failed.
            if ret.get('errcode'):
                print >> sys.stderr, ret['errmsg']
                sys.exit(1)
            self.expire_time = time.time() + ret['expires_in']
            self.access_token = ret['access_token']
        return self.access_token


def send_msg(title, content):
    # Send the message
    corpid = ""  # fill in the value for your own application
    corpsecret = ""  # fill in the value for your own application
    qs_token = Token(corpid=corpid, corpsecret=corpsecret).get_token()
    url = "https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={0}".format(
        qs_token)
    payload = {
        "touser": "@all",
        "msgtype": "text",
        "agentid": "14",
        "text": {
            "content": "标题:{0}\n 内容:{1}".format(title, content)
        },
        "safe": "0"
    }
    ret = requests.post(url, data=json.dumps(payload, ensure_ascii=False))
    print ret.json()


if __name__ == '__main__':
    # print title, content
    send_msg(title, content)

Alerting through a WeChat subscription account with Python

# coding=utf-8
import urllib
import urllib2
import cookielib
import json
import sys

data = {'username': 'yaokuaile-99',
        'pwd': 'f4bb2d8abe7a799ad62495a912ae3363',
        'imgcode': '',
        'f': 'json'
        }
cj = cookielib.LWPCookieJar()
opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))
urllib2.install_opener(opener)


def getToken():
    # Content-Length is left to urllib2, which computes it from the request body.
    headers = {'Accept': 'application/json, text/javascript, */*; q=0.01',
               'Accept-Encoding': 'gzip,deflate,sdch',
               'Accept-Language': 'zh-CN,zh;q=0.8',
               'Connection': 'keep-alive',
               'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
               'Host': 'mp.weixin.qq.com',
               'Origin': 'https://mp.weixin.qq.com',
               'Referer': 'https://mp.weixin.qq.com/cgi-bin/loginpage?t=wxm2-login&lang=zh_CN',
               'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/33.0.1750.154 Safari/537.36',
               'X-Requested-With': 'XMLHttpRequest',
               }
    req = urllib2.Request('https://mp.weixin.qq.com/cgi-bin/login?lang=zh_CN', urllib.urlencode(data), headers)
    ret = urllib2.urlopen(req)
    retread = ret.read()
    token = json.loads(retread)
    token = token['redirect_url'][44:]
    return token


# send msg
def sendWeixin(msg, token, tofakeid):
    data1 = {'type': '1', 'content': msg, 'imgcode': '', 'tofakeid': tofakeid, 'f': 'json', 'token': token, 'ajax': '1'}
    headers = {'Accept': '*/*',
               'Accept-Encoding': 'gzip,deflate,sdch',
               'Accept-Language': 'zh-CN,zh;q=0.8',
               'Connection': 'keep-alive',
               'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
               'Host': 'mp.weixin.qq.com',
               'Origin': 'https://mp.weixin.qq.com',
               # The original left the %s placeholder unfilled; the token belongs there.
               'Referer': 'https://mp.weixin.qq.com/cgi-bin/singlemsgpage?msgid=&source=&count=20&t=wxm-singlechat&fromfakeid=150890&token=%s&lang=zh_CN' % token,
               'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/33.0.1750.154 Safari/537.36',
               'X-Requested-With': 'XMLHttpRequest',
               }
    req2 = urllib2.Request('https://mp.weixin.qq.com/cgi-bin/singlesend?t=ajax-response&f=json&lang=zh_CN', urllib.urlencode(data1), headers)
    ret2 = urllib2.urlopen(req2)


if __name__ == '__main__':
    '''
    usage: ./send_wx.py msg
    '''
    token = getToken()
    msg = sys.argv[1:]
    msg = '\n'.join(msg)
    tofakeid = '2443746922'
    sendWeixin(msg, token, tofakeid)

Sending WeChat template messages with Python

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import datetime
import json
import sys
import time
import urllib2

from config import *

reload(sys)
sys.setdefaultencoding("utf-8")


class WechatPush():
    def __init__(self, appid, secrect, file_name):
        # appid of the account
        self.appid = appid
        # app secret
        self.secrect = secrect
        # name of the file that records the token and its expiry time
        self.file_name = file_name

    def build_timestamp(self, interval):
        # Return the time `interval` seconds from now, formatted like "2015-07-01 14:41:40"
        now = datetime.datetime.now()
        delta = datetime.timedelta(seconds=interval)
        now_interval = now + delta
        return now_interval.strftime('%Y-%m-%d %H:%M:%S')

    def check_token_expires(self):
        # Return (token, flag); the flag is "false" when a new token has to be fetched
        try:
            with open(self.file_name, 'r') as f:
                line = f.read()
        except IOError:
            line = ""
        if len(line) > 0:
            expires_time = line.split(",")[1]
            token = line.split(",")[0]
        else:
            # No cached token yet: report it as expired so that one gets fetched
            return "", "false"
        curr_time = time.strftime('%Y-%m-%d %H:%M:%S')
        # Expired: return "false"
        if curr_time > expires_time:
            return token, "false"
        # Still valid: return "true"
        else:
            return token, "true"

    def getToken(self):
        # Fetch the access token
        url = 'https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid=' + self.appid + "&secret=" + self.secrect
        token = None
        try:
            f = urllib2.urlopen(url)
            s = f.read()
            # Parse the JSON reply
            j = json.loads(s)
            # Token and lifetime from the reply
            token = j['access_token']
            expires_in = j['expires_in']
            # Subtract 300 seconds from the lifetime, add the result to the current time,
            # and write it to the token file
            write_expires = self.build_timestamp(int(expires_in - 300))
            content = "%s,%s" % (token, write_expires)
            with open(self.file_name, 'w') as f:
                f.write(content)
        except Exception, e:
            print e
        return token

    def post_data(self, url, para_dct):
        """POST the final template message to WeChat"""
        para_data = para_dct
        f = urllib2.urlopen(url, para_data)
        content = f.read()
        return content

    def do_push(self, touser, template_id, url, topcolor, data):
        '''Push a message'''
        # Read the token from the token file and check whether it has expired
        token, if_token_expires = self.check_token_expires()
        # Fetch a new token if it has expired
        if if_token_expires == "false":
            token = self.getToken()
        if not token:
            print "no access_token available, message not sent"
            return
        # Background colour; it does not seem to take effect
        if topcolor.strip() == '':
            topcolor = "#7B68EE"
        # Final data for the POST request
        dict_arr = {'touser': touser, 'template_id': template_id, 'url': url, 'topcolor': topcolor, 'data': data}
        json_template = json.dumps(dict_arr)
        requst_url = "https://api.weixin.qq.com/cgi-bin/message/template/send?access_token=" + token
        content = self.post_data(requst_url, json_template)
        # Parse the JSON reply
        j = json.loads(content)
        errcode = j['errcode']
        errmsg = j['errmsg']
        # print errmsg


if __name__ == "__main__":
    def alarm(title, hostname, timestap, level, message, state, tail):
        """Build the data for an alert message"""
        color = "#FF0000"
        data = {"first": {"value": title}, "keyword1": {"value": hostname, "color": color}, "keyword2": {"value": timestap, "color": color}, "keyword3": {"value": level, "color": color}, "keyword4": {"value": message, "color": color}, "keyword5": {"value": state, "color": color}, "remark": {"value": tail}}
        return data

    def recover(title, message, alarm_time, recover_time, continue_time, tail):
        """Build the data for a recovery message"""
        re_color = "#228B22"
        data = {"first": {"value": title}, "content": {"value": message, "color": re_color}, "occurtime": {"value": alarm_time, "color": re_color}, "recovertime": {"value": recover_time, "color": re_color}, "lasttime": {"value": continue_time, "color": re_color}, "remark": {"value": tail}}
        return data

    # Instantiate the class
    webchart = WechatPush(appid, secrect, file_name)
    url = "http://www.xiaoniu88.com"
    print len(sys.argv)
    # Send an alert message
    if len(sys.argv) == 9:
        title, hostname, timestap, level, message, state, tail = sys.argv[1:8]
        for i in range(1, 9):
            print "sys.argv[%d]%s" % (i, sys.argv[i])
        group_name = "%s_%s" % ("group", sys.argv[8])
        with open("/etc/zabbix/moniter_scripts/test.log", 'a+') as f:
            for field in (title, hostname, timestap, level, message, state, tail, group_name):
                f.write(field + "\n")
        data = alarm(title, hostname, timestap, level, message, state, tail)
        # The group_* lists come from config.py; a globals() lookup replaces eval()
        for touser in globals()[group_name]:
            webchart.do_push(touser, alarm_id, url, "", data)
        for touser in group_super:
            webchart.do_push(touser, alarm_id, url, "", data)
    # Send a recovery message
    elif len(sys.argv) == 8:
        title, message, alarm_time, recover_time, continue_time, tail = sys.argv[1:7]
        for i in range(1, 8):
            print "sys.argv[%d]%s" % (i, sys.argv[i])
        data = recover(title, message, alarm_time, recover_time, continue_time, tail)
        for touser in globals()["%s_%s" % ("group", sys.argv[7])]:
            webchart.do_push(touser, recover_id, url, "", data)
        for touser in group_super:
            webchart.do_push(touser, recover_id, url, "", data)
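
The template-message class above caches the token in a text file as `token,expiry` and renews it 300 seconds early. The sketch below, for Python 3, isolates that record format and compares parsed datetimes rather than strings. `format_record`, `read_record`, and the `margin` parameter are hypothetical names; the timestamp format and the 300-second margin are taken from the class above.

```python
import datetime

TIME_FORMAT = "%Y-%m-%d %H:%M:%S"


def format_record(token, expires_in, now, margin=300):
    """Return the 'token,expiry' line, with the expiry moved `margin` seconds earlier."""
    expires_at = now + datetime.timedelta(seconds=expires_in - margin)
    return "%s,%s" % (token, expires_at.strftime(TIME_FORMAT))


def read_record(line, now):
    """Return (token, expired). An empty record counts as expired."""
    line = line.strip()
    if not line:
        return "", True
    token, expires_text = line.split(",", 1)
    expires_at = datetime.datetime.strptime(expires_text, TIME_FORMAT)
    return token, now >= expires_at


issued = datetime.datetime(2015, 7, 1, 14, 41, 40)
record = format_record("abc", 7200, issued)
token, expired = read_record(record, issued)
```

Returning a boolean avoids the "true"/"false" strings, whose meaning is easy to invert by accident.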

Python scripts for Zabbix WeChat alerts

#!/usr/bin/python
# coding: utf-8
# Python 2: send Zabbix alert information to WeChat.
# by linwangyi #2016-01-18
import urllib, urllib2
import json
import sys


def gettoken(corpid, corpsecret):
    gettoken_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=' + corpid + '&corpsecret=' + corpsecret
    try:
        token_file = urllib2.urlopen(gettoken_url)
    except urllib2.HTTPError as e:
        print e.code
        print e.read().decode("utf8")
        sys.exit()
    token_data = token_file.read().decode('utf-8')
    token_json = json.loads(token_data)
    token = token_json['access_token']
    return token


def senddata(access_token, user, content):
    send_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=' + access_token
    send_values = {
        "touser": user,     # user account in the enterprise account, configured in the Zabbix user's Media; if it is misconfigured, the message goes to the department
        "toparty": "1",     # department id in the enterprise account
        "msgtype": "text",  # message type
        "agentid": "1",     # application id in the enterprise account
        "text": {
            "content": content
        },
        "safe": "0"
    }
    send_data = json.dumps(send_values, ensure_ascii=False)
    send_request = urllib2.Request(send_url, send_data)
    response = json.loads(urllib2.urlopen(send_request).read())
    print str(response)


if __name__ == '__main__':
    user = str(sys.argv[1])     # first argument passed by Zabbix
    content = str(sys.argv[3])  # third argument passed by Zabbix
    corpid = 'XXXX'  # CorpID identifies the enterprise account
    corpsecret = 'XXXXX'  # corpsecretSecret is the credential secret of the management group
    accesstoken = gettoken(corpid, corpsecret)
    senddata(accesstoken, user, content)

#!/usr/bin/python3
# coding: utf-8
# Python 3: send Zabbix alert information to WeChat.
# by http://sunday208.blog.51cto.com/ #2016-01-18
import urllib.request
import urllib.error
import json
import sys


def gettoken(corp_id, corp_secret):
    gettoken_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=' + corp_id + '&corpsecret=' + corp_secret
    try:
        token_file = urllib.request.urlopen(gettoken_url)
    except urllib.error.HTTPError as e:
        print(e.code)
        print(e.read().decode("utf8"))
        sys.exit(1)
    token_data = token_file.read().decode('utf-8')
    token_json = json.loads(token_data)
    token = token_json['access_token']
    return token


def senddata(access_token, user, content):
    try:
        send_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=' + access_token
        send_values = {
            "touser": user,     # user account in the enterprise account, configured in the Zabbix user's Media; if it is misconfigured, the message goes to the department
            "toparty": "1",     # department id in the enterprise account
            "msgtype": "text",  # message type
            "agentid": "1",     # application id in the enterprise account
            "text": {
                "content": content
            },
            "safe": "0"
        }
        send_data = json.dumps(send_values, ensure_ascii=False).encode(encoding='UTF8')
        send_request = urllib.request.Request(send_url, send_data)
        response = urllib.request.urlopen(send_request)
        msg = response.read()
        print("returned value : " + str(msg))
    except Exception as e:
        # msg is not defined when the request fails, so report the exception instead
        print("send failed : " + str(e))


# Python 3 strings are already Unicode, so reload(sys)/setdefaultencoding is not needed.
user = str(sys.argv[1])     # first argument passed by Zabbix
content = str(sys.argv[3])  # third argument passed by Zabbix
corpid = 'XXXX'  # CorpID identifies the enterprise account
corpsecret = 'XXXXX'  # corpsecretSecret is the credential secret of the management group
accesstoken = gettoken(corpid, corpsecret)
senddata(accesstoken, user, content)
