经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » Python3 » 查看文章
Flask后端开发(二) - 功能实现和项目总结
来源:cnblogs  作者:CrazyPixel  时间:2023/10/30 9:15:46  对本文有异议

Flask后端开发(二) - 功能实现和项目总结


前情回顾
Flask后端开发(一)-基础知识和前期准备

1. 功能1:修改文件参数值

针对文件参数值的修改,具体流程如下:

  • 前端接收用户修改的数据,传递给后端;
  • 后端接口接收数据之后,读取对应文件;
  • 定位修改位置,替换数据;
  • 返回修改结果给前端。

1.1. 获取网页端传参

前端传递参数的方式有两种,一种是GET,一种是POST,具体可参考Flask后端开发(一)-基础知识和前期准备

后端接收数据主要使用flask中的request模块,具体代码如下:

  1. #包导入
  2. from flask import request
  3. #前后端协商好传递数据的名称之后,后端根据参数名进行接收
  4. if request.method == "POST":
  5. userID= str(request.form.get("userID"))
  6. elif request.method == "GET":
  7. userID= str(request.args.get("userID"))
  8. #如果需要额外处理,例如字符串"a,b,c"需要转换为列表["a","b","c"],可以使用split函数
  9. BSD= (request.form.get("BSD")).split(",")
  10. #注:上述是代码片段,而非完整代码,一般后端接收数据写在接口函数中

1.2. 读取文件

1.2.1. 一般文件读取方式

一般文件包括.txt.c.log等文件,其内容读取主要使用python中file模块的open函数,具体代码如下:

  1. path= "文件路径"
  2. with open(path, "r",encoding='utf8') as file:
  3. file_content = file.read()
  4. #整个文件内容存储在file_content中

关于file模块的具体使用,可参考我的这篇博客:【python技巧】文本文件的读写操作

1.2.2. 特殊文件 —— mlx文件

本项目的一个特殊之处就是需要处理.mlx文件(实时脚本文件),这是matlab中的一种文件格式,其内容是二进制的,无法直接读取。因此,在本文当中,解决方案是将.mlx文件手动转换为.m文件,然后再读取.m文件的内容(真的很笨蛋,但是有效)。
对于.m文件,则可以按照一般文件的读取方式进行读取。

1.2.3. 特殊文件 —— .xlx文件

本项目中还会涉及到表格文件的数据读写,这里使用的是xlrd模块,具体代码如下:

  1. #包导入
  2. import xlrd
  3. #查找对应文件内容
  4. file_path="文件路径"
  5. #打开表格
  6. wb = xlrd.open_workbook(file_path)
  7. ws = wb.sheet_by_name('Sheet1')
  8. #按行读取,返回给前端一个行列表:
  9. n_rows = ws.nrows#获取行数
  10. for i in range(2,n_rows):#按行读取,进行筛选,第一行是表头,第二行开始是数据
  11. get_value=ws.cell(i,3).value#获取第i行第3列的数据

1.3. 查找数据修改位置,替换数据

本项目的需求是修改文件中的对应参数,涉及很多代码行的参数修改,因此,需要根据变量名查找相关位置。

  1. 定位
    根据pytho字符串中的find函数查找变量名所在的位置,参考博客如下Python find()方法,具体代码如下:
  1. # 其中file_content是文件内容,变量名是需要查找的变量名
  2. ## 1. 调度类型
  3. start_index_1 = file_content.find(
  4. "simParameters.SchedulingType ="
  5. )
  6. end_index_1 = file_content.find(
  7. "simParameters.NumUEs =",
  8. start_index_1,
  9. ) # 这之间修改schedulingtype的取值
  10. ## 2. UESpeed
  11. start_index_2 = file_content.find(
  12. "simParameters.UESpeed =", end_index_1
  13. )
  14. end_index_2 = file_content.find(
  15. "% Validate the UE positions",
  16. start_index_2,
  17. )
  18. ## 3. max_RB
  19. start_index_3 = file_content.find(
  20. "simParameters.NumRBs =", end_index_2
  21. )
  22. end_index_3 = file_content.find(
  23. "simParameters.SCS =",
  24. start_index_3,
  25. )
  26. ## 4. SCS
  27. start_index_4 = file_content.find(
  28. "simParameters.SCS =", end_index_3
  29. )
  30. end_index_4 = file_content.find(
  31. "simParameters.DLCarrierFreq =",
  32. start_index_4,
  33. )

这部分的下标定位情况,可参考我的此篇博客:【python技巧】替换文件中的某几行

  1. 替换
    在本项目中使用的全文替换,具体代码结构如下:
  1. # 1. 读取文件
  2. path = "文件路径"
  3. with open(path, "r",encoding='utf-8') as file:
  4. file_content = file.read()
  5. # 2. 定位
  6. start_index_1 = file_content.find("simParameters.UEPosition =")
  7. end_index_1 = file_content.find("simParameters.UESpeed =",start_index_1) # 这之间修改ue_position的取值
  8. start_index_2 = file_content.find("simParameters.Position = ", end_index_1)
  9. end_index_2 = file_content.find("csirsConfig = nrCSIRSConfig", start_index_2)
  10. if (start_index_1 == -1 or end_index_1 == -1 or start_index_2 == -1 or end_index_1 == -1):
  11. return jsonify({"Error": "找不到对应的参数位置"})
  12. # 3.更新参数值
  13. updated_content = (
  14. file_content[:start_index_1]
  15. + "simParameters.UEPosition = "
  16. + str(UE_position)
  17. + ";\n")
  18. updated_content += file_content[end_index_1:start_index_2]
  19. updated_content += "simParameters.Position = "+str(gNB_position)+";% Position of gNB in (x,y,z) coordinates"
  20. updated_content += file_content[end_index_2:]
  21. # 4. 更新文件
  22. if updated_content != "":
  23. with open(path, "w",encoding="utf-8") as file:
  24. file.write(updated_content)
  25. msg = "成功改变相关文件参数\n"
  26. return jsonify({"Sueecess": msg})

2. 功能2:读取结果数据

2.1. 实时数据展示如何存储相关数据?

本项目中matlab会使用作图程序实时展示每个时隙的运行结果,但是这个作图程序无法显示在网页端,因此,考虑将数据存储在文件中,然后通过网页端读取文件中的数据,进行展示。
实时数据的存储代码需要手动添加,在matlab每次作图调用的函数中添加如下matlab代码:

  1. date_str=datestr(now,31);
  2. new_str = replace(date_str,' ','_');
  3. new_str=replace(new_str,':','');#添加时间戳
  4. filename=sprintf('./文件夹名/file_name_%s.mat',new_str);#定义文件存储相对位置
  5. save(filename,"需要存储的变量名");#存储变量

2.2. 读取相关数据,整理、打包、传递

2.2.1. 读取.mat文件

根据添加的代码,将会得到若干个.mat文件,需要读取.mat文件的内容并整理成前端需要的格式。
在本文中,是定义一个文件夹名为A,每个时隙的实时数据存储为一个.mat文件,文件名为file_name_时间戳.mat,因此需要读取文件夹A下的所有.mat文件。
在python中读取.mat文件的具体代码如下:

  1. #1.包导入
  2. import os
  3. import scipy.io as sio
  4. #2. 读取数据
  5. file_list=os.listdir("./文件夹名/")#读取文件夹下所有文件名称,形成列表
  6. list_1=[]
  7. list_2=[]
  8. for file in file_list:#遍历文件列表
  9. file_content= sio.loadmat(f"./文件夹/{file}")#读取文件内容
  10. #这里的写法是根据我的.mat文件结构来的,如果不一样,需要根据自己的文件结构进行修改
  11. list_1.append(file_content["之前存储的变量名"][0].tolist())
  12. list_2.append(file_content["之前存储的变量名"][1].tolist())
  13. #之后得到的list_1和list_2就是前端需要的数据
  14. #3. 传递给前端
  15. result={
  16. "list_1或者其他变量名":list_1,
  17. "list_2或者其他变量名":list_2,
  18. }
  19. return jsonify(result)#数据打包为json格式,传递给前端

2.2.2. 读取.xlsx文件

在项目中,还需要读取.xlsx文件,这里使用的是xlrd模块,具体代码如下:

  1. ##包导入
  2. import xlrd
  3. # 1. 读取文件
  4. file_path="文件路径"
  5. #打开表格
  6. wb = xlrd.open_workbook(file_path)
  7. ws = wb.sheet_by_name('Sheet1')
  8. #数据读取
  9. list_1=[]
  10. list_2=[]
  11. n_rows = ws.nrows#获取行数
  12. for i in range(2,n_rows):#按行读取,进行筛选
  13. list_1.append(float(ws.cell(i,7).value))
  14. list_2.append(float(ws.cell(i,7).value))
  15. #之后得到的list_1和list_2就是前端需要的数据
  16. #3. 传递给前端
  17. result={
  18. "list_1或者其他变量名":list_1,
  19. "list_2或者其他变量名":list_2,
  20. }
  21. return jsonify(result)#数据打包为json格式,传递给前端

2.2.3. 读取.txt/.log文件

在项目中,还需要读取某些文本文件,例如日志文件存储的相关数据,这里使用的是file模块,具体数据读取需要用到正则表达式相关知识,可参考博客【python技巧】文本处理-re库字符匹配,具体代码如下:

  1. ##方式1:要查找的内容为:“serveraddr = xxx",需要提取xxx
  2. path= "文件路径"
  3. with open(path, "r") as file:
  4. for line in file:
  5. if "serveraddr" in line:
  6. serveraddr_match = re.search(r'serveraddr\s*=\s*"([^"]+)"', line)
  7. serveraddr = serveraddr_match.group(1) if serveraddr_match else ""
  8. elif "serverport" in line:
  9. serverport_match = re.search(r'serverport\s*=\s*"(\d+)"', line)
  10. serverport = (
  11. int(serverport_match.group(1)) if serverport_match else 0
  12. )
  13. ##方式2:要查找的内容为:“itemxx: 数据1;数据2;数据3;”,需要提取xxx
  14. path= "文件路径"
  15. with open(path, "r") as file:
  16. lines = file.readlines() # 一次读取并存入lines中,行列表
  17. for line in lines: # 查找lines中包含item的行
  18. if item in line: # 待查找的条目,提取关键数据
  19. line = line.strip("\n") # 去掉换行符
  20. dataInOneLine = line.split(";") # 分割数据
  21. dataInOneLine[0] = dataInOneLine[0].split(":")[1] # 去掉item部分
  22. for i in range(0, len(dataInOneLine)): # 去掉空格
  23. dataInOneLine[i] = dataInOneLine[i].strip()
  24. break # 找到目标行,跳出循环
  25. # 行遍历完成,得到行遍历结果dataInOneLine列表,存储关键数据
  26. line_content_list = {
  27. "itemName": item,
  28. "数据项1": dataInOneLine[0],
  29. "数据项2": dataInOneLine[1],
  30. "数据项3": dataInOneLine[2],
  31. }
  32. # 将字典存入result_list,result_list中存储多条目的关键数据
  33. result_list.append(line_content_list)

3. 功能3:运行liunx命令行

这是本项目较难的一个功能点,需要使用python程序连接服务器,然后执行liunx命令,运行编译指令和matlab代码,并且需要将liunx命令的输出结果实时返回给前端。

3.1. 远程连接服务器

远程服务器连接使用的是paramiko模块,先使用pip install paramiko下载模块,具体代码如下:

  1. import paramiko
  2. #创建ssh连接,可以复用的公共函数
  3. def create_ssh_client(ip, port, username, password):
  4. ssh_client = paramiko.SSHClient()
  5. ssh_client.load_system_host_keys()
  6. ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
  7. ssh_client.connect(ip, port, username, password)#连接服务器
  8. return ssh_client
  9. #调用方式:
  10. gnb_ssh_client = create_ssh_client(gnb_ip, gnb_port, gnb_username, gnb_password)
  11. ue_ssh_client = create_ssh_client(ue_ip, ue_port, ue_username, ue_password)

【注意事项】:为了避免之后命令行执行可能存在的权限问题和密码输入问题,推荐使用root身份进行ssh连接。

3.2. 执行liunx命令

一般来说,使用paramikoinvoke_shell函数,然后使用send函数发送命令,使用recv函数接收命令执行结果,具体代码如下:

  1. import paramiko
  2. def execute_command(ssh_client, command, output_lines):
  3. channel = ssh_client.invoke_shell()
  4. channel.send(command + '\n')
  5. while not channel.exit_status_ready():
  6. time.sleep(1)

而关于command的具体写法,这里我简单介绍一下:

  1. 单行命令:
    直接字符串赋值即可:
  1. cmd="cd /home'
  1. 多行命令:
  1. cmd1="cd /home"
  2. cmd2="ls"
  3. cmd3="其他指令"
  4. cmd=cmd1+";"+cmd2+";"+cmd3+"\n"#这样拼接之后,一次性发送给服务器就能按顺序执行多条命令
  1. 特殊命令——matlab脚本文件运行
    pyhton文件通过远程服务器连接执行matlab脚本文件,这里使用的是matlab -r命令,具体命令如下:
  1. cmd="cd 脚本文件对应文件夹"+";"+"matlab -nodesktop -nosplash -r 脚本文件名,不需要带.m后缀"
  1. 特殊命令——服务器文件复制
    在服务器A的命令端,需要复制得到服务器B的相关文件,这里使用的是scp命令,具体命令如下:
  1. cmd="sshpass -p 服务器B的密码 scp -P 服务器B的端口 root@服务器B的IP:服务器内的文件路径(例如 /home/user/copy_file.txt) 想要复制在服务器A中的文件位置,例如`/home/user/`"
  2. #sshpass是为了避免scp命令需要输入密码,这里直接将密码写在命令中
  1. 特殊命令——文件权限修改
    在服务器A的命令端修改相关文件权限,从而能够被读写,这里使用的是chmod命令,具体命令如下:
  1. cmd="chmod 777 文件路径"

3.3. 多线程执行

为了实现下文的实时读取输出随时终止命令,都需要使用多线程方式,从而让终端在执行命令行的时候,还能够接收python发送的新请求,返回终端信息或者执行终止操作。
实现多线程需要用到threading模块,具体代码如下:

  1. #包导入
  2. import threading
  3. #全局变量定义
  4. gnb_ssh_client = None
  5. ue_ssh_client = None
  6. gnb_output_lines = []
  7. ue_output_lines = []
  8. execution_in_progress = False # 用于标识执行是否正在进行中
  9. @model_name.route("/start_process", methods=["POST"])
  10. #需要异步多线程的处理方式
  11. def start_process():
  12. global gnb_ssh_client, ue_ssh_client, execution_in_progress,gnb_output_lines,ue_output_lines
  13. #前端传递参数
  14. #cmd定义
  15. #cmd拼接
  16. gnb_command="("+cmd1_gnb+";"+cmd2_gnb+";)"
  17. ue_command="(+"scp2ue_cmd_2+";"+cmd0_ue+";"+cmd1_ue+";"+cmd2_ue+";)"
  18. #连接ssh
  19. if gnb_ssh_client is None:
  20. gnb_ssh_client = create_ssh_client(gnb_ip, gnb_port, gnb_username, gnb_password)
  21. if ue_ssh_client is None:
  22. ue_ssh_client = create_ssh_client(ue_ip, ue_port, ue_username, ue_password)
  23. #执行命令
  24. gnb_output_lines=[]#先清空命令行输出
  25. ue_output_lines=[]
  26. gnb_thread = threading.Thread(target=execute_command, args=(gnb_ssh_client, gnb_command, gnb_output_lines, "gnb"))
  27. ue_thread = threading.Thread(target=execute_command, args=(ue_ssh_client, ue_command, ue_output_lines, "ue"))
  28. execution_in_progress = True
  29. gnb_thread.start()
  30. ue_thread.start()
  31. return jsonify({"success": "Execution started.","execution_in_progress":execution_in_progress})
  32. #ssh连接的公用函数
  33. def create_ssh_client(ip, port, username, password):
  34. ssh_client = paramiko.SSHClient()
  35. ssh_client.load_system_host_keys()
  36. ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
  37. ssh_client.connect(ip, port, username, password)
  38. return ssh_client
  39. #命令执行的公用函数:注意和前文的代码区别,添加了多线程相关代码
  40. def execute_command(ssh_client, command, output_lines, identifier):
  41. channel = ssh_client.invoke_shell()
  42. output_thread = threading.Thread(target=get_output, args=(channel, output_lines, identifier))
  43. output_thread.start()
  44. channel.send(command + '\n')
  45. while not channel.exit_status_ready():
  46. time.sleep(1)
  47. # 等待输出线程完成
  48. output_thread.join()

3.4. 实时读取liunx命令的输出数据

按照多线程的方式执行代码之后,如何才能实时获取命令行的输出数据呢?
首先我们介绍一下整个命令行执行过程中,前后端所进行的操作:

  1. step1:
    • 前端:发送命令行执行请求;
    • 后端:调用命令行执行的接口(例如/start_process)
    • 服务器:建立ssh连接,执行命令行;
  2. step2:
    • 前端:按照一定的时间间隔,发送数据请求;
    • 后端:调用数据请求的接口(例如/get_output)
    • 服务器:继续执行命令,命令行输出数据被存储在全局变量中;
  3. step3:
    1. 前端:发送终止命令行执行请求;
    2. 后端:调用终止命令行执行的接口(例如/stop_process)
    3. 服务器:终止命令行执行,关闭ssh连接。

因此,此处的实时读取命令行输出数据,就是在step2中,前端按照一定的时间间隔,发送数据请求,后端调用数据请求的接口,返回命令行输出数据。具体代码如下:

  1. @model_name.route('/get_output', methods=['POST', 'GET'])
  2. def get_output():#读取全局变量内容即可
  3. global gnb_output_lines, ue_output_lines
  4. gnb_output = "\n".join(gnb_output_lines)
  5. ue_output = "\n".join(ue_output_lines)
  6. return jsonify({
  7. "gnb_output": gnb_output,
  8. "ue_output": ue_output
  9. })
  10. def execute_command(ssh_client, command, output_lines, identifier):
  11. channel = ssh_client.invoke_shell()
  12. output_thread = threading.Thread(target=get_output, args=(channel, output_lines, identifier))#这一步是关键,将命令行输出数据存储在全局变量中
  13. output_thread.start()
  14. channel.send(command + '\n')
  15. while not channel.exit_status_ready():
  16. time.sleep(1)
  17. # 等待输出线程完成
  18. output_thread.join()
  19. def get_output(channel, output_lines, identifier):#核心代码,将命令行执行的输出数据存储在全局变量中,需要和上文的`execute_command`函数配合使用
  20. while not channel.exit_status_ready():
  21. if channel.recv_ready():
  22. output = channel.recv(1024).decode('utf-8')
  23. lines = output.split('\n')
  24. for line in lines:
  25. if line.strip():
  26. formatted_line = f"[{identifier}]:{line.strip()}"
  27. #print(formatted_line)
  28. output_lines.append(formatted_line)

3.5. 随时终止liunx命令的执行

这个功能是为了避免指令执行无法自行终止,需要用户手动选择结束仿真。
在多线程的命令行执行中,前端发送请求,而后端在原有ssh连接的基础上,发送终止命令,具体代码如下:

  1. @model_name.route('/stop_process', methods=['POST'])
  2. def stop_process():
  3. global gnb_ssh_client, ue_ssh_client, execution_in_progress
  4. if not execution_in_progress:#如果没有命令行执行,返回错误信息
  5. return jsonify({"error": "No execution in progress.","execution_in_progress":execution_in_progress})
  6. if ue_ssh_client is not None:#断开uedssh连接
  7. stop_execution(ue_ssh_client)
  8. ue_ssh_client.close()
  9. ue_ssh_client = None
  10. if gnb_ssh_client is not None:#断开gnbssh连接
  11. stop_execution(gnb_ssh_client)
  12. gnb_ssh_client.close()
  13. gnb_ssh_client = None
  14. execution_in_progress = False
  15. return jsonify({"success": "Execution stopped.","execution_in_progress":execution_in_progress})
  16. #终止指令执行的公用函数
  17. def stop_execution(ssh_client):
  18. ssh_client.invoke_shell().send('\x03') # 发送Ctrl+C来终止命令

4. 其他收获

4.1. 异常处理

在后端程序执行过程中,可能会出现各种checked exeption,这类异常需要程序员进行捕获,不然则会影响程序的运行,产生报错,而这里我们可以将这些异常捕获之后,将信息作为返回值传递给前端,从而让用户知道程序运行成功与否。
具体代码结构如下:

  1. @model_name.route('/function_name', methods=['POST'])
  2. def function_name():
  3. try:
  4. #程序运行代码
  5. except Exception as e:
  6. return jsonify({"error": str(e)})
  7. finally:
  8. #程序运行结束后的代码,例如return jsonify({"success": "Execution stopped."})

4.2 日志生成

上一篇博客中,我们提到了分模块的flask项目结构,而日志部分的处理,需要在主文件app.py中。添加如下代码:

  1. import logging
  2. log_filename = 'app.log' # 日志文件名
  3. log_level = logging.DEBUG # 日志级别
  4. log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' # 日志格式
  5. # 配置日志
  6. logging.basicConfig(filename=os.path.join(os.path.dirname(__file__), log_filename), level=log_level, format=log_format)

则可以在程序运行过程中,将日志信息存储在app.log文件中,相关报错信息也会存储在该文件中,方便程序员进行调试。

4.3 环境部署

当我们在本地进行程序开发之后,需要将程序部署到服务器上,而迁移到服务器上或者别人的电脑上,需要安装相关的环境。在这种情况下,我们可以将自己开发用到的包进行打包,然后在服务器上进行安装,具体步骤如下:

  1. 将当前环境中的包信息存储在requirements.txt文件中,存储在当前文件夹目录下

pip freeze > requirements.txt

  1. 将整个项目文件打包之后,其他人可使用如下命令进行安装:

pip install -r requirements.txt

4.4. vscode远程连接服务器

当我们在服务器上部署完项目代码之后,可能会经过多次测试和调试,因此代码会不断被修改,且会被无数次的运行,如果每次都是本地修改+服务器部署运行,实在有些不够优雅,因此,我们可以试着把自己的本地编辑器连接到服务器上,这样就可以在本地进行服务器端代码的修改,然后直接在服务器上运行,这样就可以避免每次修改都需要部署的麻烦。
我在项目中使用的是vscode,具体步骤如下:

  1. 插件安装:Remote-SSH
  2. 选择新建远程,输入ssh连接信息

  3. 连接之后选择想要编辑的项目文件夹,即可在本地编辑器中进行服务器端代码的修改;
  4. 调试的时候可使用vscode进行运行、部署;

具体的配置细节,可参考博客

5. 项目感受

  1. 项目的文件读写、服务器连接等都是非常有趣的功能,多线程解决实时输出算是一个较大的技术难点;
  2. 项目本身难度不大,主要难点在于理解业务本身,如果有相对应的业务文档,可能开发效率会更高一点;
  3. 后端开发部分也需要了解基本的前端知识,例如前后端数据传递、前端页面的渲染等,这样才能更好的理解前端的需求,从而更好的进行后端开发;
  4. 项目对接需要良好的文档编辑能力和沟通能力;
  5. 相关业务的具体实现,也是体现个人智慧的时刻,具体的开发技术和解决问题的智慧都是必不可少的,开发技术是工具,而我们需要用自己已有的工具去实现某些功能,这是一种思维方式,也是一种能力。,当然,技术会的越多,能用的工具越多,解决问题的思路就不会被限制;
  6. 总会有开发者不会的东西,这是一个学习的过程,重点在于不断地学习,虚心请教和快速掌握。

6. 后记

如果觉得我写得还算不错,不妨点赞关注一波走起~
想看更多博文,请访问我的各平台主页:博客园 / CSDN / 51CTO / 掘金论坛 / 知乎

原文链接:https://www.cnblogs.com/CrazyPixel/p/17796888.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号