nmap工具详解

1.1 nmap基础

在运维过程中有时需要主机存活性探测,一般是namp、tcpdump命令结合使用,相关工具包安装

1
[root@ ]# yum install  nmap tcpdump

列出几种nmap命令语法

1
2
3
-sS/sT/sA/sW/sM: TCP SYN/Connect()/ACK/Window/Maimon scans
-sU: UDP Scan
-sP: ping Scan

下面我们在2台主机间探测,一主机发nmap探测,另一主机tcpdump抓包分析

在A主机正常发一个ping包看看正常情况下的icmp包
ping -c 1 10.17.200.36

在B主机抓包发现icmp包有去有回

1
2
3
4
5
[root@ ]# tcpdump -np -i ens192 src host 10.17.200.14
tcpdump: verbose output suppressed, use -v or -vv for full protocol decode
listening on ens192, link-type EN10MB (Ethernet), capture size 262144 bytes
16:08:38.392418 IP 10.17.200.14 > 10.17.200.36: ICMP echo request, id 5220, seq 1, length 64
16:08:43.400811 ARP, Reply 10.17.200.14 is-at 00:50:56:b9:b2:fb, length 46

可在一台主机临时禁用icmp协议,再用ping将探测不到这台主机

1
echo 1 > /proc/sys/net/ipv4/icmp_echo_ignore_all

1.2 nmap ping探测

我们开始nmap ping探测, -n表示不进行DNS解析

1
2
3
4
5
6
[root@ ]# nmap -n -sP 10.17.200.36
Starting Nmap 6.40 ( http://nmap.org ) at 2019-01-15 16:12 CST
Nmap scan report for 10.17.200.36
Host is up (0.00030s latency).
MAC Address: 00:50:56:B9:21:18 (VMware)
Nmap done: 1 IP address (1 host up) scanned in 0.05 seconds

在B主机探测发现只收到了对方发的请求包,并未回应,但是还是认为这台主机是存活的,这样提高了探测效率

1
2
3
4
[root@localhost roles]# tcpdump -np -i ens192 src host 10.17.200.14
tcpdump: verbose output suppressed, use -v or -vv for full protocol decode
listening on ens192, link-type EN10MB (Ethernet), capture size 262144 bytes
16:12:28.972321 ARP, Request who-has 10.17.200.36 (Broadcast) tell 10.17.200.14, length 46

1.3 nmapSYN探测

我们开始nmap TCP的SYN探测, -n表示不进行DNS解析

1
2
3
4
5
6
7
8
9
10
11
[root@ ]# nmap -n -PE 10.17.200.36
Starting Nmap 6.40 ( http://nmap.org ) at 2019-01-15 16:20 CST
Nmap scan report for 10.17.200.36
Host is up (0.00014s latency).
Not shown: 998 closed ports
PORT STATE SERVICE
22/tcp open ssh
445/tcp filtered microsoft-ds
MAC Address: 00:50:56:B9:21:18 (VMware)

Nmap done: 1 IP address (1 host up) scanned in 1.33 seconds

在B主机探测发现,A主机对B主机的各服务都发送了TCP SYN包来进行探测

1
2
3
4
5
6
7
8
9
10
11
12
[root@ ]# tcpdump -np -i ens192 src host 10.17.200.14
tcpdump: verbose output suppressed, use -v or -vv for full protocol decode
listening on ens192, link-type EN10MB (Ethernet), capture size 262144 bytes
16:20:07.124327 ARP, Request who-has 10.17.200.36 (Broadcast) tell 10.17.200.14, length 46
16:20:07.148867 IP 10.17.200.14.40911 > 10.17.200.36.rtsp: Flags [S], seq 3791226815, win 1024, options [mss 1460], length 0
16:20:07.148882 IP 10.17.200.14.40911 > 10.17.200.36.smtp: Flags [S], seq 3791226815, win 1024, options [mss 1460], length 0
16:20:07.148906 IP 10.17.200.14.40911 > 10.17.200.36.domain: Flags [S], seq 3791226815, win 1024, options [mss 1460], length 0
16:20:07.148943 IP 10.17.200.14.40911 > 10.17.200.36.https: Flags [S], seq 3791226815, win 1024, options [mss 1460], length 0
16:20:07.148950 IP 10.17.200.14.40911 > 10.17.200.36.mysql: Flags [S], seq 3791226815, win 1024, options [mss 1460], length 0
16:20:07.148950 IP 10.17.200.14.40911 > 10.17.200.36.ssh: Flags [S], seq 3791226815, win 1024, options [mss 1460], length 0
信息太多,略过...
16:20:12.152833 ARP, Reply 10.17.200.14 is-at 00:50:56:b9:b2:fb, length 46

1.4 arping

另外补充一下,arping -D可有效检测IP地址冲突问题,如果命令echo $?返回值为0则表示地址冲突,1则表示不冲突.

1
2
3
4
5
[root@ ]# arping  -D  -c 2   -I ens192   10.17.200.36
ARPING 10.17.200.36 from 0.0.0.0 ens192
Unicast reply from 10.17.200.36 [00:50:56:B9:21:18] 0.887ms
Sent 1 probes (1 broadcast(s))
Received 1 response(s)

1.5 总结

  • nmap -sP 可进行ping检测
  • nmap -PE 可进行tcp SYN检测
  • nmap -n -sP -PE 可进行ping与SYN结合检测,以免漏扫
  • arping -D 可进行地址冲突检测

<完结>

python-celery-Flask

一、基本使用

Celery是由Python开发的一个简单、灵活、可靠的处理大量任务的分发系统,可以实时处理任务,也可以定时异步处理任务。
每次分发任务后得到一个ID,然后根据这个ID查询任务执行情况。

并且celery需要rabbitMQ、Redis等充当broker来进行消息的接收。

安装

1
pip3 install celery eventlet   # windows系统需要eventlet模块

下面我们来快速上手celery
编辑s1.py

1
2
3
4
5
6
7
8
9
10
11
#!/usr/bin/env python3

from celery import Celery

cel = Celery('xxx',
broker="redis://192.168.1.40",
backend='redis://192.168.1.40')

@cel.task
def f1(x,y):
return x+y

然后把s1这个work工作起来,进入命令终端,如果在linux系统,不用添加参数-P eventlet

1
E:\pro\xxx_dir> celery worker -A s1 -l info  -P eventlet

编辑s2.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#!/usr/bin/env python3

import datetime
from s1 import f1

# 立即执行
result = f1.delay(4,6)
print(result.id)

# 定时执行
ctime = datetime.datetime.now()
# ctime = datetime.datetime(year=2019,month=2,day=21,hour=14,minute=8)
utc_time = datetime.datetime.utcfromtimestamp(ctime.timestamp())
s10 = datetime.timedelta(seconds=10)
ctime_x = utc_time + s10
result = f1.apply_async(args=[1,3],eta=ctime_x)
print(result.id)

编辑s3.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#!/usr/bin/env python3

from celery.result import AsyncResult
from demo1.s1 import cel

async = AsyncResult(id="f43bce52-9503-475e-9d19-4a46ed910a8e",app=cel)

if async.successful():
ret = async.get() # 获取值
#async.forget() # 删除值
print(ret)
elif async.failed():
print('执行失败')
elif async.status == 'PENDING':
print('任务等待中被执行')
elif async.status == 'RETRY':
print('任务异常后正在重试')
elif async.status == 'STARTED':
print('任务已经开始被执行')
else:
print("任务执行失败")


async.revoke() # 取消一个任务,当一个任务正在执行,不能取消
async.revoke(terminate=True) # 终止一个任务,当一个任务正在执行,可以被终止

二、多目录结构

经过上面快速上手的学习,了解了celery的基本使用,那么重组一下代,形成项目中多目录结构看看相互之间如何调用?

创建一个celery_tasks的目录,里面一般保存2类文件,其中一个文件名称必须为celery,另一类就是定义task任务的文件,可以有多个。

定义celery_tasks/celery.py文件,如果有多个task任务文件,可以用includ列表包含进来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#!/usr/bin/env python3

from celery import Celery
# from celery.schedules import crontab

cel = Celery('xxxxxx',
broker='redis://192.168.1.40:6379',
backend='redis://192.168.1.40:6379',
include=['celery_tasks.task1',)
#include=['celery_tasks.task1','celery_tasks.task2'])

# 时区
cel.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
cel.conf.enable_utc = False

在多目录结构中,跑celery work时不用指定到文件,指定目录即可

1
E:\pro\xxx_dir> celery worker -A celery_tasks -l info  -P eventlet

定义celery_tasks/task1.py

1
2
3
4
5
6
7
#!/usr/bin/env python3

from .celery import cel

@cel.task
def f1(x,y):
return x+y

有了celery.py文件和task任务文件,我们就可以在任意地方调用任务了。

比如定义test/exec1.py文件来执行任务

1
2
3
4
5
6
#!/usr/bin/env python3

from celery_tasks.task1 import f1

result = f1.delay(4,6)
print(result.id)

定义test/exec2.py文件来获取任务执行结果,需要提供任务ID

1
2
3
4
5
6
7
8
9
10
11
#!/usr/bin/env python3

from celery_tasks.celery import cel
from celery.result import AsyncResult

async = AsyncResult(id="be6bb021-da48-46a9-b1bc-94b987f7c8a7",app=cel)

if async.successful():
print(async.get())
else:
print("任务执行失败")

三、Flask中的例用

有了上面celery的认识,我们来简单写点代码,看一下在Flask框架中celery是如何使用的?

定义Flask项目启动文件app.py

写线上代码时是要把任务保存在数据库中的,这里仅作示例就保存在了HISTORY全局变量中了.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#!/usr/bin/env python3

from flask import Flask,request,render_template,redirect

from celery_tasks.task2 import deploy

app = Flask(__name__)

HISTORY = []

@app.route('/index',methods=["GET","POST"])
def index():
if request.method == "GET":
return render_template('index.html',history=HISTORY)


@app.route('/publish',methods=["GET","POST"])
def publish():
if request.method == "GET":
return render_template('publish.html')
else:
version = request.form.get("version")
hosts = request.form.getlist("hosts")
print(version,hosts)

import datetime
ctime = datetime.datetime.now()
utc_time = datetime.datetime.utcfromtimestamp(ctime.timestamp())
ctime_10 = utc_time + datetime.timedelta(seconds=10)
result = deploy.apply_async(args=[version,hosts],eta=ctime_10)
HISTORY.append({"version":version,"hosts":hosts,"task_id":result.id})
print(HISTORY)
return redirect("/index")

from celery.result import AsyncResult
from celery_tasks.celery import cel

@app.route('/check_result',methods=["GET","POST"])
def check_result():
task_id = request.args.get("task_id")
async = AsyncResult(id=task_id,app=cel)
if async.successful():
result = async.get()
print(result)
# result.forget() # 将结果删除
return "执行成功"
elif async.failed():
return '执行失败'
elif async.status == 'PENDING':
return '任务等待中被执行'
elif async.status == 'RETRY':
return '任务异常后正在重试'
elif async.status == 'STARTED':
return '任务已经开始被执行'
else:
return "unkown status"

@app.route('/cancel', methods=["GET", "POST"])
def cancel():
task_id = request.args.get("task_id")
async =AsyncResult(id=task_id,app=cel)
async.revoke(terminate=True)
for i in HISTORY:
if task_id in i.values():
HISTORY.remove(i)
return redirect("/index")

if __name__ == '__main__':
app.run()

定义其中用到的templates/index.html

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<h1>发布系统</h1>
<a href="/publish">添加发布任务</a>
<table>
{% for row in history %}
<tr>
<td>{{ row.task_id }}</td>
<td>{{ row.version }}</td>
{% for host in row.hosts %}
<td>
<span>{{ host }}</span>
</td>
{% endfor %}
<td><a href="/check_result?task_id={{ row.task_id }}">查看</a></td>
<td><a href="/cancel?task_id={{ row.task_id }}">取消</a></td>
</tr>
{% endfor %}
</table>

</body>
</html>

定义其中用到的templates/publish.html

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<form action="" method="post">
<p><input type="text" name="version" placeholder="请输入要发布的版本"></p>
<p>
<select name="hosts" multiple="multiple">
<option value="c1.com">c1.com</option>
<option value="c2.com">c2.com</option>
<option value="c3.com">c3.com</option>
</select>
</p>
<input type="submit" value="提交">
</form>
</body>
</html>

定义其中的celery_tasks.task2.py文件,这里的deploy是真正定义任务的地方.

1
2
3
4
5
6
7
8
#!/usr/bin/env python3

from .celery import cel

@cel.task
def deploy(version,hosts):
print(version, hosts) # 定义想要执行的任务代码
return 'deploy ok'

同样别望了先把work跑起来,再启动Flask

1
E:\pro\xxx_dir> celery worker -A celery_tasks -l info  -P eventlet

四、总结

还需要多写代码在项目中总结celery…

ansible callback 重写

1.1 adhoc callback重写

adhoc-callback.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import os,sys,json
import ansible.constants as C
from ansible.parsing.dataloader import DataLoader
from ansible.vars.manager import VariableManager
from ansible.inventory.manager import InventoryManager
from ansible.playbook import Play
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.executor.playbook_executor import PlaybookExecutor
from ansible.plugins.callback import CallbackBase
from ansible.inventory.host import Host,Group
from collections import namedtuple

BaseDir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
source = os.path.join(BaseDir,'dir1/inventory/multinode')
loader = DataLoader() # 实例化loader对象
myinven = InventoryManager(loader=loader,sources=[source,]) # 实例化inventory对象
print(myinven.get_groups_dict())

varmanager = VariableManager(loader=loader,inventory=myinven) # 实例化VariableManager对象

#^#Options 选项
Options = namedtuple('Options',[
'connection','module_path', 'forks', 'timeout', 'remote_user',
'ask_pass', 'private_key_file', 'ssh_common_args', 'ssh_extra_args', 'sftp_extra_args',
'scp_extra_args', 'become', 'become_method', 'become_user', 'ask_value_pass', 'verbosity',
'check', 'listhosts', 'listtasks', 'listtags', 'syntax','diff'
])

options = Options(connection='smart', module_path=None, forks=100, timeout=10,
remote_user='root', ask_pass=False, private_key_file=None, ssh_common_args=None, ssh_extra_args=None,
sftp_extra_args=None, scp_extra_args=None, become=None, become_method=None,
become_user='root', ask_value_pass=False, verbosity=None, check=False, listhosts=False,
listtasks=False, listtags=False, syntax=False, diff=True)

#^# 执行对象和模块
play_data = dict(
name="Ansible adhoc example",
hosts='192.168.1.6,',
gather_facts='no',
tasks=[
dict(action=dict(module='shell', args="touch /tmp/sss.txt")),
# dict(action=dict(module='debug', args=dict(msg="{{ shell_out.stdout }}"))),
],
)

play = Play().load(data=play_data,loader=loader,variable_manager=varmanager)

#^# 重写CallbackBase父类
class AdhocResultsCollector(CallbackBase):
def __init__(self, *args, **kwargs):
super(AdhocResultsCollector, self).__init__(*args, **kwargs)
self.host_ok = {}
self.host_unreachable = {}
self.host_failed = {}

def v2_runner_on_unreachable(self, result):
self.host_unreachable[result._host.get_name()] = result

def v2_runner_on_ok(self, result, *args, **kwargs):
self.host_ok[result._host.get_name()] = result

def v2_runner_on_failed(self, result, *args, **kwargs):
self.host_failed[result._host.get_name()] = result

callback = AdhocResultsCollector()
passwords = dict()
tqm = TaskQueueManager(inventory=myinven,
variable_manager=varmanager,
loader=loader,options=options,
passwords=passwords,
stdout_callback=callback
)

result_status_code = tqm.run(play)

print(callback.host_ok.items())

result_raw = dict(
success = {},
failed = {},
unreachable = {}
)

for host,result in callback.host_ok.items():
result_raw['success'][host] = result._result

for host,result in callback.host_failed.items():
result_raw['failed'][host] = result._result

for host,result in callback.host_unreachable.items():
result_raw['unreachable'][host] = result._result

print(json.dumps(result_raw,indent=4))

1.2 playbook callback重写

写入示例playbook文件site.yml

1
2
3
4
5
6
7
8
---
- hosts: 192.168.1.6
remote_user: root
vars:
touch_file: "site.txt"
tasks:
- name: touch file
shell: "touch /tmp/{{ touch_file }}"

编写play_book.py 文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#!/usr/bin/env python3

import os,sys,json
import ansible.constants as C
from ansible.parsing.dataloader import DataLoader
from ansible.vars.manager import VariableManager
from ansible.inventory.manager import InventoryManager
from ansible.playbook import Play
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.executor.playbook_executor import PlaybookExecutor
from ansible.plugins.callback import CallbackBase
from ansible.inventory.host import Host,Group
from collections import namedtuple

BaseDir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
source = os.path.join(BaseDir,'dir1/inventory/multinode')
loader = DataLoader() # 实例化loader对象
myinven = InventoryManager(loader=loader,sources=[source,]) # 实例化inventory对象
print(myinven.get_groups_dict())

varmanager = VariableManager(loader=loader,inventory=myinven) # 实例化VariableManager对象

# Options 选项
Options = namedtuple('Options',[
'connection',
'module_path',
'forks',
'timeout',
'remote_user',
'ask_pass',
'private_key_file',
'ssh_common_args',
'ssh_extra_args',
'sftp_extra_args',
'scp_extra_args',
'become',
'become_method',
'become_user',
'ask_value_pass',
'verbosity',
'check',
'listhosts',
'listtasks',
'listtags',
'syntax',
'diff'
])

options = Options(connection='smart',
module_path=None,
forks=100,
timeout=10,
remote_user='root',
ask_pass=False,
private_key_file=None,
ssh_common_args=None,
ssh_extra_args=None,
sftp_extra_args=None,
scp_extra_args=None,
become=None,
become_method=None,
become_user='root',
ask_value_pass=False,
verbosity=None,
check=False,
listhosts=False,
listtasks=False,
listtags=False,
syntax=False,
diff=True,
)

# 重写CallbackBase父类
class PlayBookResultsCollector(CallbackBase):
CALLBACK_VERSION = 2.0
def __init__(self, *args, **kwargs):
super(PlayBookResultsCollector, self).__init__(*args, **kwargs)
self.task_ok = {}
self.task_skipped = {}
self.task_failed = {}
self.task_status = {}
self.task_unreachable = {}

def v2_runner_on_ok(self, result, *args, **kwargs):
self.task_ok[result._host.get_name()] = result

def v2_runner_on_failed(self, result, *args, **kwargs):
self.task_failed[result._host.get_name()] = result

def v2_runner_on_unreachable(self, result):
self.task_unreachable[result._host.get_name()] = result

def v2_runner_on_skipped(self, result):
self.task_ok[result._host.get_name()] = result

def v2_playbook_on_stats(self, stats):
hosts = sorted(stats.processed.keys())
for h in hosts:
t = stats.summarize(h)
self.task_status[h] = {
"ok":t['ok'],
"changed" : t['changed'],
"unreachable":t['unreachable'],
"skipped":t['skipped'],
"failed":t['failures']
}
# 执行对象和模块
passwords = {}
#传入playbooks, inventory, variable_manager, loader, options, passwords
playbook = PlaybookExecutor(playbooks=['site.yml',],
inventory=myinven,
variable_manager=varmanager,
loader=loader,
options=options,
passwords=passwords
)

# 把重写的CallbackBase父类加载进playbook
callback = PlayBookResultsCollector()
playbook._tqm._stdout_callback = callback
playbook.run()

result_raw = dict(
success = {},
failed = {},
unreachable = {},
skipped = {},
status = {},
)

for host,result in callback.task_ok.items():
result_raw['success'][host] = result._result

for host,result in callback.task_failed.items():
result_raw['failed'][host] = result._result

for host,result in callback.task_unreachable.items():
result_raw['unreachable'][host] = result._result

for host,result in callback.task_skipped.items():
result_raw['skipped'][host] = result._result

for host, result in callback.task_status.items():
result_raw['status'][host] = result

print(json.dumps(result_raw,indent=4))

执行示例
python3 play_book.py 返回类似如下结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
{
"success": {
"192.168.1.6": {
"changed": true,
"end": "2019-01-14 04:50:06.190607",
"stdout": "",
"cmd": "touch /tmp/site.txt",
"rc": 0,
"start": "2019-01-14 04:50:06.186466",
"stderr": "",
"delta": "0:00:00.004141",
"invocation": {
"module_args": {
"creates": null,
"executable": null,
"_uses_shell": true,
... 略
}
},
}
},
"failed": {},
"unreachable": {},
"skipped": {},
"status": {
"192.168.1.6": {
"ok": 2,
"changed": 1,
"unreachable": 0,
"skipped": 0,
"failed": 0
}
}
}

总结

  • adhoc重写方法如host_ok,host_failed,host_unreachable
  • playbook重写方法如task_ok,task_failed,task_unreachable,task_skipped,task_status,task_changed
  • 返回如callback.task_ok.items(),其中键为host,值为result对象,result._result得到一个字典类型的详细结果

<< 完结 >>

python3下ansible api学习

1.1 ansible api基础

环境说明:

  • python version: python3
  • ansible version: 2.7.5
  • inventory file: dir1/inventory/multinode

清单文件定义: dir1/inventory/multinode

1
2
3
4
5
6
7
8
9
10
[control]
192.168.1.6 var1="ssss" ansible_ssh_user=root ansible_ssh_pass='123'

[nova:children]
control

[cinder:children]
control

[glance:children]

一个单一文件进行简单的接口学习: dir1/f1.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#!/usr/bin/env python3
import os,sys,json
from ansible.parsing.dataloader import DataLoader
from ansible.vars.manager import VariableManager
from ansible.inventory.manager import InventoryManager
from ansible.playbook import play
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.plugins.callback import CallbackBase
import ansible.constants as C

BaseDir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
source = os.path.join(BaseDir,"dir1/inventory/multinode")
loader = DataLoader()
inven = InventoryManager(loader=loader,sources=[source,])
# print(inven.get_hosts())

inven.add_group('test_group2')
print(inven.get_groups_dict())
inven.add_host(host='192.168.1.7',port=22,group='test_group2')
print(inven.get_groups_dict())

host = inven.get_host(hostname='192.168.1.6')

variableman = VariableManager(loader=loader,inventory=inven)
vars = variableman.get_vars(host=host)

# print(json.dumps(vars,indent=4))
variableman.set_host_variable(host=host,varname='k1',value='v1') # 局部的

x = variableman.get_vars(host=host)
print(x['k1'])
print(variableman.__dict__)
variableman._extra_vars = {"k2": "v2"} # 添加全局变量

x = variableman.get_vars() # 不传host说明是全局的

执行测试

1
python3  dir1/f1.py   # 输出调用信息对照接口就知道只些方法是干什么的了

1.2 adhoc模式示例学习

编辑dir1/adhoc.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#!/usr/bin/env python3
import os,sys,json
import ansible.constants as C
from ansible.parsing.dataloader import DataLoader
from ansible.vars.manager import VariableManager
from ansible.inventory.manager import InventoryManager
from ansible.playbook import Play
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.executor.playbook_executor import PlaybookExecutor
from ansible.plugins.callback import CallbackBase
from ansible.inventory.host import Host,Group
from collections import namedtuple

BaseDir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
source = os.path.join(BaseDir,'dir1/inventory/multinode')
loader = DataLoader() # 实例化loader对象
myinven = InventoryManager(loader=loader,sources=[source,]) # 实例化inventory对象
print(myinven.get_groups_dict())

varmanager = VariableManager(loader=loader,inventory=myinven) # 实例化VariableManager对象

# Options 选项
Options = namedtuple('Options',[
'connection','module_path', 'forks', 'timeout', 'remote_user',
'ask_pass', 'private_key_file', 'ssh_common_args', 'ssh_extra_args', 'sftp_extra_args',
'scp_extra_args', 'become', 'become_method', 'become_user', 'ask_value_pass', 'verbosity',
'check', 'listhosts', 'listtasks', 'listtags', 'syntax','diff'
])

options = Options(connection='smart', module_path=None, forks=100, timeout=10,
remote_user='root', ask_pass=False, private_key_file=None, ssh_common_args=None, ssh_extra_args=None,
sftp_extra_args=None, scp_extra_args=None, become=None, become_method=None,
become_user='root', ask_value_pass=False, verbosity=None, check=False, listhosts=False,
listtasks=False, listtags=False, syntax=False, diff=True)

# 执行对象和模块
play_data = dict(
name="Ansible adhoc example",
hosts='192.168.1.6,',
gather_facts='no',
tasks=[
dict(action=dict(module='shell', args="touch /tmp/sss.txt")),
# dict(action=dict(module='debug', args=dict(msg="{{ shell_out.stdout }}"))),
],
)

play = Play().load(data=play_data,loader=loader,variable_manager=varmanager)
passwords = {}
tqm = TaskQueueManager(inventory=myinven,variable_manager=varmanager, loader=loader,options=options,passwords=passwords)

result = tqm.run(play)

执行测试

1
python3  dir1/adhoc.py

输出信息和命令行ansible直接模块运行一样,任务正常执行

1.3 playbook 示例学习

编辑dir1/play_book.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#!/usr/bin/env python3

import os,sys,json
import ansible.constants as C
from ansible.parsing.dataloader import DataLoader
from ansible.vars.manager import VariableManager
from ansible.inventory.manager import InventoryManager
from ansible.playbook import Play
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.executor.playbook_executor import PlaybookExecutor
from ansible.plugins.callback import CallbackBase
from ansible.inventory.host import Host,Group
from collections import namedtuple

BaseDir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
source = os.path.join(BaseDir,'dir1/inventory/multinode')
loader = DataLoader() # 实例化loader对象
myinven = InventoryManager(loader=loader,sources=[source,]) # 实例化inventory对象
print(myinven.get_groups_dict())

varmanager = VariableManager(loader=loader,inventory=myinven) # 实例化VariableManager对象

# Options 选项
Options = namedtuple('Options',[
'connection','module_path', 'forks', 'timeout', 'remote_user',
'ask_pass', 'private_key_file', 'ssh_common_args', 'ssh_extra_args', 'sftp_extra_args',
'scp_extra_args', 'become', 'become_method', 'become_user', 'ask_value_pass', 'verbosity',
'check', 'listhosts', 'listtasks', 'listtags', 'syntax','diff'
])

options = Options(connection='smart', module_path=None, forks=100, timeout=10,
remote_user='root', ask_pass=False, private_key_file=None, ssh_common_args=None, ssh_extra_args=None,
sftp_extra_args=None, scp_extra_args=None, become=None, become_method=None,
become_user='root', ask_value_pass=False, verbosity=None, check=False, listhosts=False,
listtasks=False, listtags=False, syntax=False, diff=True)

# 执行对象和模块
passwords = {}
#传入playbooks, inventory, variable_manager, loader, options, passwords
playbook = PlaybookExecutor(playbooks=['site.yml',],
inventory=myinven,
variable_manager=varmanager,
loader=loader,
options=options,
passwords=passwords)
playbook.run()

用到的site.yml文件示例如下

1
2
3
4
5
6
7
8
---
- hosts: 192.168.1.6
remote_user: root
vars:
touch_file: "site.txt"
tasks:
- name: touch file
shell: "touch /tmp/{{ touch_file }}"

执行测试

1
python3 play_book.py

输出信息和ansible-playbook命令行输出一样,任务正常执行

OpenVSwitch

交换机端口查看

1
2
3
4
5
# ovs-vsctl show
# ovs-ofctl show
# ovs-ofctl show br-int
# ovs-ofctl show br-tun
# virsh domiflist instance-00000017

open flow流表查看

1
2
3
4
5
6
7
8
# ovs-ofctl dump-flows br-int
# ovs-ofctl dump-flows br-tun

# ip netns exec qrouter-c266eb04-0be8-448f-986f-6eef3a9bcdce ifconfig

# dpkg-query -S /sbin/brctl
# apt-get install bridge-utils
# brctl show

添加br-ex网桥:

1
2
3
4
5
6
# ovs-vsctl add-br br-ex
桥加载到物理网口:
# ovs-vsctl add-port br-ex eth0
添加到不同vlan与端口模式
# ovs-vsctl add-port br-ex eth1 tag=100 //设置为access端口
# ovs-vsctl add-port br-ex eth2 trunk=200 //设置为trunk端口,允许vlan200通过,默认允许所有vlan直接转发

列出所有桥:

1
2
3
ovs-vsctl list-br
ovs-vsctl list-ports br-int
ovs-vsctl port-to-br port_name

列出桥上所接端口

1
2
3
# ovs-vsctl  list-ports br-ex
# ovs-ofctl dump-ports br-ex
# ovs-vsctl list port

根据交换机某个接口名称查端口号

1
2
# ovs-vsctl list interface tap0_br | grep "ofport "
ofport : 1

流表操作
注意: 流量匹配是有顺序的,table0–>table1—>table3–>table4 –>table5…

1
2
3
4
5
6
7
8
9
10
11
12
# ovs-vsctl add-br vswitch0

#发现有一条actions为NORMAL的流表项,这是默认存在的,用以实现交换机的基本动作
# ovs-ofctl dump-flows vswitch0
cookie=0x0, duration=267197.837s, table=0, n_packets=459, n_bytes=42190, idle_age=387, hard_age=65534, priority=0 actions=NORMAL

# ovs-ofctl del-flows vswitch0 # 流表删除后所有流量将被丢弃
# ovs-ofctl dump-flows vswitch0

# 可以在table0添加类似规则使流量正常转发,没写表名默认table0
# ovs-ofctl add-flow br-int "priority=1,in_port=1,actions=output:4"
# ovs-ofctl add-flow br-int "priority=2,in_port=4,actions=output:1"

1
2
3
4
5
#flow优先级越高,会优先匹配,以下规则drop优先,流量被丢弃
# ovs-ofctl del-flows br-int
# ovs-ofctl add-flow br-int "priority=1,in_port=1,actions=output:4"
# ovs-ofctl add-flow br-int "priority=2,in_port=4,actions=output:1"
# ovs-ofctl add-flow vswitch0 "priority=3,in_port=1,actions=drop"

将table0的规则添加到table1上,发出流量也是不通的,因为流表是有顺序的,table0没有匹配到,流量被丢弃

1
2
3
4
5
6
7
# ovs-ofctl del-flows vswitch0
# ovs-ofctl add-flow vswitch0 "table=1,priority=1,in_port=1,actions=output:4"
# ovs-ofctl add-flow vswitch0 "table=1,priority=2,in_port=4,actions=output:1"
# ovs-ofctl dump-flows vswitch0
NXST_FLOW reply (xid=0x4):
cookie=0x0, duration=3.485s, table=1, n_packets=0, n_bytes=0, idle_age=3, priority=1,in_port=1 actions=output:4
cookie=0x0, duration=3.033s, table=1, n_packets=0, n_bytes=0, idle_age=3, priority=2,in_port=4 actions=output:1

现在给table0加上一条将数据包发送到table1处理的flow, 发现流量正常,这就明白了多个table之间是如何协调工作的。

1
# ovs-ofctl add-flow vswitch0 "table=0,actions=goto_table=1"

组表操作
添加一个组表

1
# ovs-ofctl -O OpenFlow13 add-group vswitch0 "group_id=1,type=select,bucket=resubmit(,1)"

查看组表

1
# ovs-ofctl -O OpenFlow13 dump-groups vswitch0

在table0中增加两条flow,目的是将数据包发送到group table1

1
2
# ovs-ofctl -O OpenFlow13 add-flow vswitch0 "table=0,in_port=1,actions=group:1"
# ovs-ofctl -O OpenFlow13 add-flow vswitch0 "table=0,in_port=4,actions=group:1"

向table1中增加两条flow,真正的数据转发在table1中进行,流量也正常通过

1
2
# ovs-ofctl add-flow vswitch0 "table=1,priority=1,in_port=1,actions=output:4"
# ovs-ofctl add-flow vswitch0 "table=1,priority=2,in_port=4,actions=output:1"

虚机挂在网桥上

xml格式定义openvswitch网桥,以便于virt-install –network参数指定网桥启动虚机

1
2
3
4
5
6
7
# ovsbr0.xml
<network>
<name>ovsbr0</name>
<forward mode='bridge'/>
<bridge name='ovsbr0'/>
<virtualport type='openvswitch'/>
</network>

1
2
3
virsh net-define ovsbr0.xml
virsh net-start ovsbr0
virsh net-autostart ovsbr0

在安装kvm虚拟机时使用ovsbr0

1
2
3
4
5
6
7
8
9
10
virt-install \
-n vm-name \
-r 4096 \
--disk path=/data/kvm/rhel75-vm1.qcow2,format=qcow2,size=60 \
--vcpus 4 \
--noautoconsole \
--cdrom=/data/kvm/iso/rhel75-x86_64.iso \
--os-type=linux \
--network network:ovsbr0 \
--vnc --vnclisten=0.0.0.0 --vncport=5901

也可以将正在运行的KVM虚拟机的vnet网络接口强制接到ovs网桥上

1
2
#virsh dumpxml $vmname|grep vnet 查看某虚拟机在宿主机上对应的网络接口
add-port ovsbr0 vnetxx

网桥接口划vlan并配IP命令

1
2
ovs-vsctl add-port ovsbr0 tag10 tag=10 -- set interface tag10 type=internal
ifconfig tag10 192.168.10.10/24 up

1
2
3
4
# ip link命令设置vlan接口命令
ip link add link eth0 name eth0.10 type vlan id 10
ifconfig eth0.10 192.168.10.33 netmask 255.255.255.0 broadcast 192.168.10.255 up
route add default gw 192.168.10.1 dev eth0.10

总结:这里只是命令总结,并没有什么实验逻辑

web-socket-高级篇

前面我们学习了WebSocket,我们知道客户端要与服务端进行WebSoccket通信,客户端要和服务端握手,握手成功后才能通信。

握手: 客户端发出握手请求,服务端在握手请求中取出“Sec-WebSocket-Key”,把“Sec-WebSocket-Key”加上一个特殊字符串
“258EAFA5-E914-47DA-95CA-C5AB0DC85B11”,然后计算SHA-1摘要,之后进行BASE-64编码,将结果做为“Sec-WebSocket-Accept”头的值,返回给客户端。如此操作,可以尽量避免普通HTTP请求被误认为Websocket协议。

如果给定了“Sec-WebSocket-Key”,那么摘要算法代码如下,服务器会把摘要后值返回给客户端完成握手操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
#!/usr/bin/python3
import hashlib
import base64

SecKey = 'sN9cRrP/n9NdMgdcy2VJFQ==' # browser 自动携带的随机字符串
Magic_string = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'

def server_algorithm(SecKey):
str = SecKey + Magic_string
sec_str = base64.b64encode(hashlib.sha1(str.encode('utf-8')).digest())
return sec_str

print(server_algorithm(SecKey))

如果我们己经有一个socket server,真的能收到握手信息吗?
socker_server.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#!/usr/bin/env python3

import socket


conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
conn.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
conn.bind(('127.0.0.1',8000))
conn.listen(5)


client,addr = conn.accept()
print(client.recv(8192))
print(addr)

如何发送握手请求?

方式一:
可直接在浏览器console终端下手动发送socket请求,请求中包含握手信息

1
2
>var sock = new WebSocket('ws://127.0.0.1:8000/xxoo')
undefined

此时浏览器会报VM44:1 WebSocket connection to 'ws://127.0.0.1:8000/xxoo' failed: Connection closed before receiving a handshake response的错,是因为服务器端没有返回摘要后的值,
表示没有握手成功。

方式二:
直接编写client.html用浏览器运行

1
2
3
4
5
6
7
8
9
10
11
12
13
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>WebSocket test</title>
</head>
<body>
<h1>WebSocket study....</h1>
<script type="text/javascript">
var sock = new WebSocket('ws://127.0.0.1:8000/xxoo')
</script>
</body>
</html>

此时服务器会收到类似b'GET /xxoo HTTP/1.1\r\nHost: 127.0.0.1:8000\r\nConnection: Upgrade...client_max_window_bits\r\n\r\n'信息,里面包含“Sec-WebSocket-Key

如果我们用一个函数手动取出“Sec-WebSocket-Key”,然后手动摘要后把值再手动返回给客户端,这样就握手成功不会报错了。

处理握手信息的函数(get_headers)如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def get_headers(data):
"""
将请求头格式化成字典
:param data:
:return:
"""
header_dict = {}
data = str(data, encoding='utf-8')

for i in data.split('\r\n'):
print(i)
header, body = data.split('\r\n\r\n', 1)
header_list = header.split('\r\n')
for i in range(0, len(header_list)):
if i == 0:
if len(header_list[i].split(' ')) == 3:
header_dict['method'], header_dict['url'], header_dict['protocol'] = header_list[i].split(' ')
else:
k, v = header_list[i].split(':', 1)
header_dict[k] = v.strip()
return header_dict

把摘要后的值返回给客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
conn, address = sock.accept()
data = conn.recv(1024)
headers = get_headers(data) # 提取请求头信息
# 对请求头中的sec-websocket-key进行加密
response_tpl = "HTTP/1.1 101 Switching Protocols\r\n" \
"Upgrade:websocket\r\n" \
"Connection: Upgrade\r\n" \
"Sec-WebSocket-Accept: %s\r\n" \
"WebSocket-Location: ws://%s%s\r\n\r\n"
magic_string = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
value = headers['Sec-WebSocket-Key'] + magic_string
ac = base64.b64encode(hashlib.sha1(value.encode('utf-8')).digest())
response_str = response_tpl % (ac.decode('utf-8'), headers['Host'], headers['url'])
conn.send(bytes(response_str, encoding='utf-8'))

摘要后的值返回给客户端后就完成了握手过程,客户端就不会再报连接错误了。

接收客户端发来的数据

完成握手操作后就客户端就可以向服务器发送数据了,只需console终端sock.send('dimyth')

服务端接收

1
2
info = conn.recv(1024)
print(info) # 这里是字节

服务器端收到客户端发来的数据,这个数据需要服务器解包,解包过程如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
payload_len = info[1] & 127
if payload_len == 126:
extend_payload_len = info[2:4]
mask = info[4:8]
decoded = info[8:]
elif payload_len == 127:
extend_payload_len = info[2:10]
mask = info[10:14]
decoded = info[14:]
else:
extend_payload_len = None
mask = info[2:6]
decoded = info[6:]

bytes_list = bytearray()
for i in range(len(decoded)):
chunk = decoded[i] ^ mask[i % 4]
bytes_list.append(chunk)
body = str(bytes_list, encoding='utf-8')
print(body) # 解出真正数据

那么服务器给客户端发送数据要就封包。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def send_msg(conn, msg_bytes):
"""
WebSocket服务端向客户端发送消息
:param conn: 客户端连接到服务器端的socket对象,即: conn,address = socket.accept()
:param msg_bytes: 向客户端发送的字节
:return:
"""
import struct

token = b"\x81"
length = len(msg_bytes)
if length < 126:
token += struct.pack("B", length)
elif length <= 0xFFFF: # 65535
token += struct.pack("!BH", 126, length)
else:
token += struct.pack("!BQ", 127, length)

msg = token + msg_bytes
conn.send(msg)
return True

对上面BHQ说明一下,B代表1个字节,H代表2个字节,Q代表8个字节

客户端如何收消息呢?

1
2
3
4
5
6
7
8
9
10
11
<div id="content"></div>
<script type="text/javascript">
var sock = new WebSocket('ws://127.0.0.1:8000/xxoo');
sock.onmessage = function (event) {
/* 服务器端向客户端发送数据时,自动执行 */
var response = event.data;
var newTag = document.createElement('div');
newTag.innerHTML = response;
document.getElementById('content').appendChild(newTag);
};
</script>

web-socket-基础知识

WebSocket在什么场景下使用?

页面实时展示数据

  • 轮询:setInterval()前端轮询请求,性能低下
  • 长轮询:把请求pending住多少秒后再返回,量大时也损耗服务器性能
  • WebSocket: 建立socket双向传输数据,高效。

那么什么是WebSocket

参考链接

先来看下http协议

http协议:
1 格式:请求头、请求体之间\r\n\r\n分隔,请求头或请求体内部\r\n分隔。
2 连接:一次请求,一次响应,然后断开连接。

那么WebSocket协议是怎么样的呢?

WebSocket:
1 格式: 请求头、请求体之间\r\n\r\n分隔,请求头或请求体内部\r\n分隔。
2 连接: 创建socket通道后不断开,全双工(full-duplex)通信,可以相互发消息。

WebSocket实现了浏览器与服务器全双工(full-duplex)通信,能主动向浏览器发送消息,但需要浏览器支持websocket封包解包或加密解密。其本质是保持TCP连接,在浏览器和服务端通过Socket进行通信。

WebSocket特性

  • WebSocket 是独立的、创建在 TCP 上的协议。
  • Websocket 通过 HTTP/1.1 协议的101状态码进行握手。
  • 为了创建Websocket连接,需要通过浏览器发出请求,之后服务器进行回应,这个过程通常称为“握手”(handshaking)

总结起来:WebSocket是一种在单个TCP连接上进行全双工通信的协议。使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。

WebSokcet工作原理

首先客户端要验证服务器是否支持websocket协议,能不能一起玩耍?

客户端发送playload data之前会发送握手字符串,服务器把握手字符串加密后返回给客户端,此时客户端也把字符串按特定算法加密,把客户端加密后的字符串与服务器加密后的字符串进行比较,如果一致则客户端认为服务器支持WebSocket协议通信,可以相互一起玩耍。

握手时的特定算法是什么呢?代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
#!/usr/bin/python3
import hashlib
import base64

SecKey = 'sN9cRrP/n9NdMgdcy2VJFQ==' # browser 自动携带的随机字符串
Magic_string = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'

def server_algorithm(SecKey):
str = SecKey + Magic_string
sec_str = base64.b64encode(hashlib.sha1(str.encode('utf-8')).digest())
return sec_str

print(server_algorithm(SecKey))

能不能一起玩耍,官方术语就是创建Websocket连接,需要通过浏览器发出请求,之后服务器进行回应,这个过程通常称为“握手”(handshaking).

不管怎么说,WebSocket允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就可以创建持久性的连接,并进行双向数据传输。

什么?看不懂,一言不合上代码,下面是一个典型的Websocket握手请求.

客户端请求

1
2
3
4
5
6
7
GET / HTTP/1.1
Upgrade: websocket
Connection: Upgrade
Host: example.com
Origin: http://example.com:8002
Sec-WebSocket-Key: sN9cRrP/n9NdMgdcy2VJFQ==
Sec-WebSocket-Version: 13

服务器回应

1
2
3
4
5
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: fFBooB7FAkLlXgRSz0BT3v4hq5s=
Sec-WebSocket-Location: ws://example.com:8002/

字段说明

  • Connection必须设置Upgrade,表示客户端希望连接升级。
  • Upgrade字段必须设置Websocket,表示希望升级到Websocket协议。
  • Sec-WebSocket-Key是随机的字符串,服务器端会用这些数据来构造出一个SHA-1的信息摘要。把“Sec-WebSocket-Key”加上一个特殊字符串“258EAFA5-E914-47DA-95CA-C5AB0DC85B11”,然后计算SHA-1摘要,之后进行BASE-64编码,将结果做为“Sec-WebSocket-Accept”头的值,返回给客户端。如此操作,可以尽量避免普通HTTP请求被误认为Websocket协议。
  • Sec-WebSocket-Version 表示支持的Websocket版本。RFC6455要求使用的版本是13.
  • Origin字段是可选的,通常用来表示在浏览器中发起此Websocket连接所在的页面,类似于Referer。但是,与Referer不同的是,Origin只包含了协议和主机名称。

服务器解包细节

官方WebSocket instructions

注意的是客户端和服务端传输数据时,需要对数据进行封包解包。客户端有 javascript类库实现封包解包,但服务器需要手动实现。

conn,addr = sk.accept()时,服务器端代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
info = conn.recv(8096)

payload_len = info[1] & 127
if payload_len == 126:
extend_payload_len = info[2:4]
mask = info[4:8]
decoded = info[8:]
elif payload_len == 127:
extend_payload_len = info[2:10]
mask = info[10:14]
decoded = info[14:]
else:
extend_payload_len = None
mask = info[2:6]
decoded = info[6:]

bytes_list = bytearray()
for i in range(len(decoded)):
chunk = decoded[i] ^ mask[i % 4]
bytes_list.append(chunk)
body = str(bytes_list, encoding='utf-8')
print(body)

要看懂这段代码必须了解websocket解包细节.

当客户端加密发送了socket data数据时,服务端收到数据是这样的.

b'\x81\x82\xac\xde\xdd\xf4\xdc\xae'

需要对这样的数据解密才能拿到真的数据,跟据第二个字节后7位的值取数据.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
0                   1                   2                   3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|I|S|S|S| (4) |A| (7) | (16/64) |
|N|V|V|V| |S| | (if payload len==126/127) |
| |1|2|3| |K| | |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
| Extended payload length continued, if payload len == 127 |
+ - - - - - - - - - - - - - - - +-------------------------------+
| |Masking-key, if MASK set to 1 |
+-------------------------------+-------------------------------+
| Masking-key (continued) | Payload Data |
+-------------------------------- - - - - - - - - - - - - - - - +
: Payload Data continued ... :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
| Payload Data continued ... |
+---------------------------------------------------------------+

1
2
3
4
value  = socket_data[1] & 127
value <=125 b'\x81\x82 \xac\xde\xdd\xf4\xdc\xae' # 数据在第2个节字后
value =126 b'\x81\x82\xac\xde \xdd\xf4\xdc\xae' # next 16bit(2个字节),数据在第4个节字后
value =127 xxx... # next 64bit(8个字节), 数据在第10个节字后

其中头32bits为掩码,真正数据还要去掉这4字节,取真正数据真不容易。

好了,WebSocket所有知识都在这里了,慢慢品味知识的韵味。