Example: Python-based notification subscriber
About the example
websocket-client-interfaces.py
, is a Python script that creates an interactive client that subscribes to notifications about interfaces:
Python 3.4 or later is required.
The script uses the Python 3 versions of the Tornado module to do create and handle the HTTP requests and WebSocket secure connection. For more information about Tornado, see:
The script uses the Python 3 versions of the requests module to send REST API requests to the switch. For more information about the requests module, see:
Access to the switch REST API must be enabled on the VRF through which this script will connect to the switch.
In the script, notice the following:
The value of user name and password used in the login routine is set in the USER and PASSWORD parameters. Hewlett Packard Enterprise recommends that you modify the script to collect the user name and password interactively, and pass them as data so they do not show in the system logs in cleartext.
The session cookie is created as part of the
login
routine and is stored incookie_header
.The WebSocket secure connection is made by the
connect
routine calling the Tornadowebsocket_connect
function. The session cookie,cookie_header
, is passed as part of thehttp_request
created by theHTTPRequest
function.The list of topics is collected from the input by the following routine:
def collect_topics(args): topics_list = [] if len(args) > 2: length = len(args) for i in range(2, length): topics_list.append(args[i]) return topics_list
The subscribe message is created in the following routine:
def create_json_dict(self, topics_list): json_dict = dict() json_dict["type"] = "subscribe" topic_list = [] for i in range(len(topics_list)): topic_dict = dict() topic_dict["name"] = topics_list[i] topic_list.append(topic_dict) json_dict["topics"] = topic_list return json_dict
Received messages are handled in the lines that begin with the following:
def run(self, topics_list):
Example Python script
# This script is executed as - python websocket-client.py {switchNotificatonURL} {topicURI(s)}
# Example of executing the script with multiple topics:
# python websocket-client.py
# "wss://{ipAddress}:{port}/rest/v1/notification"
# "/rest/v1/system/interfaces/1%2F1%2F1"
# "/rest/v1/system/interfaces/1%2F1%2F2"
#
#
from requests import post
from tornado import escape
import json
from tornado.ioloop import IOLoop, PeriodicCallback
from tornado import gen
from tornado.websocket import websocket_connect
from tornado.httpclient import HTTPRequest
import sys
import traceback
from ssl import PROTOCOL_SSLv23
USER = 'admin3'
PASSWORD = 'admin3F@rM#'
PROXY_DICT = {'http': None, 'https': None}
REQUEST_TIMEOUT = 50
CONNECT_TIMEOUT = 50
class Client(object):
def __init__(self, url, timeout, topics_list):
self.url = url
self.timeout = timeout
self.ioloop = IOLoop.instance()
self.ws = None
self.cookie_header = self.login()
self.count = 0
self.connect(url, self.cookie_header, topics_list)
self.ioloop.start()
@gen.coroutine
def connect(self, ws_uri, cookie_header, topics_list):
print("trying to connect")
try:
http_request = HTTPRequest(url=ws_uri,
headers=cookie_header,
follow_redirects=True,
ssl_options={"ssl_version":
PROTOCOL_SSLv23},
validate_cert=False)
self.ws = yield websocket_connect(http_request)
except Exception as e:
print("connection error" + str(e))
else:
print("connected")
self.run(topics_list)
@gen.coroutine
def run(self, topics_list):
json_dict = self.create_json_dict(topics_list)
self.ws.write_message(escape.utf8(json.dumps(json_dict)))
while True:
msg = yield self.ws.read_message()
self.count = self.count + 1
print(msg)
if self.count == 1:
msg_in_json = self.check_if_JSON(msg)
if msg_in_json is not None:
success_test = self.check_if_success(msg_in_json)
if success_test:
print("PASS - Initial return JSON")
else:
print("FAIL - Initial return JSON")
if msg is None:
print("connection closed")
self.ws = None
break
def check_if_JSON(self, result):
try:
msg_json = json.loads(result)
except ValueError:
print("The message received is not a valid JSON")
return msg_json
def check_if_success(self, json_response):
pass_type = pass_resource = False
if "type" in json_response:
type_msg = json_response["type"]
if type_msg == "success":
pass_type = True
if "data" in json_response:
for each in json_response['data']:
if "resources" in each:
pass_resource = True
return pass_type and pass_resource
def login(self, username=None, password=None, proxies=PROXY_DICT):
if username is not None:
assert password is not None, "Must provide password for Login"
if not username:
username = USER
if not password:
password = PASSWORD
params = {'username': username,
'password': password}
login_url = NOTIFICATION_URL.replace("wss", "https")
login_url = login_url.replace("notification", "login")
login_header = {'Content-Type': 'application/x-ww-form-urlencoded'}
response = post(login_url, verify=False, headers=login_header,
params=params, proxies=proxies)
cookie_header = {'Cookie': response.headers['set-cookie']}
return cookie_header
def create_json_dict(self, topics_list):
json_dict = dict()
json_dict["type"] = "subscribe"
topic_list = []
for i in range(len(topics_list)):
topic_dict = dict()
topic_dict["name"] = topics_list[i]
topic_list.append(topic_dict)
json_dict["topics"] = topic_list
return json_dict
def collect_topics(args):
topics_list = []
if len(args) > 2:
length = len(args)
for i in range(2, length):
topics_list.append(args[i])
return topics_list
if __name__ == "__main__":
try:
NOTIFICATION_URL = sys.argv[1]
topics = collect_topics(sys.argv)
client = Client(NOTIFICATION_URL, 10, topics)
except KeyboardInterrupt:
print("Shutdown requested...exiting")
except Exception:
traceback.print_exc(file=sys.stdout)
sys.exit(0)