
Flask Todo代办事项项目
1.创建基本视图
2.注册功能
2.1 基本注册
2.2 邮箱注册(1)
2.3 邮箱注册(2)
2.4 手机注册
2.5 第三方注册
2.6 用户权限
3.修改个人页面
3.1 个人页面展示(1)
3.1个人页面展示(2)
示例代码
https://github.com/ningwenyan/Flask_Todo_Demo/releases/tag/v1.06
3.增加personal.html
中各种修改
1.重命名用户
这个功能选项,可以使用
dialog
,也可以使用alert
弹窗方法, 这里我使用sweatalert2
弹窗方法来处理.如果你想要理解,可以访问:
http://mishengqiang.com/sweetalert2
https://sweetalert2.github.io/
引入
sweatalert2
<!-- personal.html -->
<script src="https://cdn.bootcdn.net/ajax/libs/limonte-sweetalert2/10.9.0/sweetalert2.all.js"></script>
<link href="https://cdn.bootcdn.net/ajax/libs/limonte-sweetalert2/10.9.0/sweetalert2.css" rel="stylesheet">创建
API
# api/v1/user.py
class User_update_username(Resource):
def get(self, id):
user = User.query.get_or_404(id)
if user is not None:
return jsonify(user.to_dict())
def post(self, id):
user = User.query.get_or_404(id)
if user is not None:
if request.json.get('username'):
user.username = request.json.get('username')
user.save()
return jsonify(user.to_dict())注册路由
# api/v1/__init__.py
v1_api.add_resource(User_update_username, '/userUpdateUsername/<int:id>/')增加
jquery
$(function () {
$('#personalRename').click(function(e){
// 阻止默认新闻
e.preventDefault();
swal.fire({
icon: 'info',
title: '更改用户名',
confirmButtonText: '确定',
showCancelButton: true,
cancelButtonText:'取消',
input: 'text',
inputPlaceholder: '用户名',
allowOutsideClick: false,
}).then(function(result){
if (result.isConfirmed && result.value.length > 0){
const Uid = $('#showID').data('user-id');
const update_URL= '/api/v1/userUpdateUsername/' + Uid + '/' ;
$.getJSON(update_URL, function(rst){
rst.username = result.value;
csrfAjax.ajax({
url: update_URL,
type: 'POST',
contentType : 'application/json',
data : JSON.stringify(rst),
success: function (e) {
swal.fire({
icon: 'success',
title : '添加成功'
}
);
setTimeout(function(){
location.reload();
}, 3000)
}
})
})
}
});
})
});从新刷新页面后,即可进行修改重命名的操作.
2.权限设置
以上可以实现修改密码的操作,但是如果我想禁止某些类型的用户修改用户名(比如,未激活用户不能修改用户名),这就涉及到了权限的操作.基于最基本的
rbac
,设计以下数据表.10342
用户表 角色表 用户角色表 资源表 角色资源表 按照从属关系来定位资源.
# sqlModel.py
#!/usr/bin/env python
# coding=utf-8
from .exts import db, bcrypt, login_manager
from flask_login import UserMixin
from datetime import datetime
import enum
import re
from sqlalchemy.ext.declarative import synonym_for
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import synonym
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
from flask import current_app
from app.utils.timeCoversion import utc2local
class BaseModel:
"""所有Model的父模板, 集成 db ORM 保存,删除属性"""
def __commit(self):
# __ 双下划线避免子类覆盖功能
from sqlalchemy.exc import IntegrityError
try:
db.session.commit()
except IntegrityError:
db.session.rollback()
def delete(self):
# 删除数据
db.session.delete(self)
self.__commit()
def save(self):
# 添加数据
db.session.add(self)
self.__commit()
return self
class GenderEnum(enum.Enum):
MALE = 1
FEMAle = 2
SECRET = 3
def check_length(attribute, length):
"""检查属性值长度"""
try:
return bool(attribute) and len(attribute) <= length
except:
return False
EMAIL_REGEX = re.compile(r"^S+@S+.S+$")
USERNAME_REGEX = re.compile(r"^S+$")
PHONE_REGEX = re.compile(r"1[3|4|5|7|8][0-9]{9}")
# 用户角色关联表
UserToRole = db.Table('user_role', db.Model.metadata,
db.Column('user_id', db.Integer, db.ForeignKey('user.id'), primary_key=True),
db.Column('role__id', db.Integer, db.ForeignKey('sysrole.id'),primary_key=True)
)
# 角色资源表
RoleToResource = db.Table('role_resource', db.metadata,
db.Column('role_id', db.Integer, db.ForeignKey('sysrole.id'), primary_key=True),
db.Column('resource.id', db.Integer, db.ForeignKey('sysresource.id'), primary_key=True)
)
class User(db.Model, BaseModel, UserMixin):
__tablename__ = "user"
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
_email = db.Column(db.String(64), unique=True, index=True) # 匹配正则
_username = db.Column(db.String(64), unique=True, index=True) # 匹配正则
_password_hash = db.Column(db.String(128)) # 密码加密,增强安全性, 隐藏属性值
confirmed = db.Column(db.Boolean, default=False) # 激活状态
location = db.Column(db.String(64)) # 位置:北京,河北....
about_me = db.Column(db.TEXT()) # 自我介绍
member_since = db.Column(db.DateTime, default=datetime.utcnow) # 创建时间,网站全部采用UTC时间
last_seen = db.Column(db.DateTime, default=datetime.utcnow) # 最后一次登录时间
gender = db.Column(db.Enum(GenderEnum), default=GenderEnum.SECRET)
avatar = db.Column(db.String(100)) # 头像保存地址
real_name = db.Column(db.String(64)) # 此字段用来验证 网络安全法案中的身份验证,因为是测试,而且要接入API,此处不做验证
_phone = db.Column(db.String(11))
birthday = db.Column(db.DateTime, default=None)
# relationship
roles = db.relationship('Role', secondary=UserToRole, backref=db.backref('users', lazy='dynamic'))
def __init__(self, email, username, password, *args, **kwargs):
self.email = email
self.username = username
self.password = password
# -- username 正则匹配,长度约束
@property
def username(self):
return self._username
@username.setter
def username(self, raw_username):
"""检查有效长度"""
is_valid_length = check_length(raw_username, 24)
if not is_valid_length or not bool(USERNAME_REGEX.match(raw_username)):
raise ValueError('{} 不是有效的用户名,不允许超过24位字符'.format(raw_username))
self._username = raw_username
username = synonym("_username", descriptor=username)
# -- email 正则匹配
@property
def email(self):
return self._email
@email.setter
def email(self, raw_email):
if not check_length(raw_email, 64) or not bool(EMAIL_REGEX.match(raw_email)):
raise ValueError("{} 不是有效的邮箱".format(raw_email))
self._email = raw_email
email = synonym("_email", descriptor=email)
# -- 密码加密/解密
@property
def password(self):
return self._password_hash
@password.setter
def password(self, raw_password):
"""加密密码"""
self._password_hash = bcrypt.generate_password_hash(raw_password)
@password.deleter
def password(self):
del self._password_hash
def check_password(self, password):
"""如果原始密码和加密后的密码相同,返回True"""
return bcrypt.check_password_hash(self._password_hash, password)
# -- phone
@property
def phone(self):
return self._phone
@phone.setter
def phone(self, raw_phone):
if not check_length(raw_phone, 11) or not bool(PHONE_REGEX.match(raw_phone)):
raise ValueError("{} 不匹配手机号".format(raw_phone))
self._phone = raw_phone
# -- 记录最后一次登录时间
def seen(self):
self.last_seen = datetime.utcnow()
return self.save()
def __repr__(self):
return "<User %r>" % self.username
# -- 创建邮箱token
# 生成token,使用itsdangerous序列化token,确定唯一用户,并设置超时时间
def generate_confirmation_token(self, expiration=3600):
s = Serializer(current_app._get_current_object().config['SECRET_KEY'], expires_in=expiration)
return s.dumps({'confirm':self.id}).decode('utf-8')
# 接受序列化信息并检测
def check_confirmation_token(self, token):
s = Serializer(current_app._get_current_object().config['SECRET_KEY'])
try:
data = s.loads(token.encode('utf-8'))
except:
return False
if data.get('confirm') != self.id:
# 检测唯一id
return False
# 所有检测通过,证明唯一用户,设置标志位
self.confirmed = True
# 没有执行db.session.commit(), 因为用户这里并不能确定用户点击了激活的超链接
# 这将会在 views.py 中定义
db.session.add(self)
return True
def to_dict(self):
return {
"id": self.id,
"username": self.username,
"email": self.email,
"confirmed": self.confirmed,
"location" : self.location,
"about_me" : self.about_me,
"member_since" : utc2local(self.member_since),
"last_seen" : utc2local(self.last_seen),
"gender" : self.gender.name,
"avatar" : self.avatar,
"birthday" : self.birthday,
"flag" : True # 标志位,在数据库中无设置,用于返回json数据
}
def have_permission(self, url):
permissions = []
for role in self.roles:
for resource in role.resources:
print(resource.name)
permissions.extend([ resource for resource in role.resources])
urls = list(filter(lambda x: x.url == url, permissions))
if urls:
return True
else:
return False
# rbac
# 角色表
class Role(db.Model, BaseModel):
__tablename__ = 'sysrole'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
create_at = db.Column(db.DateTime, default=datetime.utcnow, index=True)
update_at = db.Column(db.DateTime, default=datetime.utcnow, index=True)
name = db.Column(db.String(64)) # 超级管理员,管理员,普通会员,未激活会员
description = db.Column(db.String(64))
# relationship
resources = db.relationship('Resource', secondary=RoleToResource, backref=db.backref('roles', lazy='dynamic'))
def __init__(self, name, description, *args, **kwargs):
self.name = name
self.description = description
def to_dict(self):
return {
'id': self.id,
'name': self.name,
'description': self.description,
'create_at': utc2local(self.create_at),
'update_at': utc2local(self.update_at)
}
def __repr__(self):
return "Role name:{}, description:{}".format(self.name, self.description)
# 资源表
class Resource(db.Model, BaseModel):
__tablename__ = 'sysresource'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
create_at = db.Column(db.DateTime, index=True, default=datetime.utcnow)
update_at = db.Column(db.DateTime, index=True, default=datetime.utcnow)
name = db.Column(db.String(64))
url = db.Column(db.String(200))
# relationship
def __init__(self, name, url, *args, **kwargs):
self.name = name
self.url = url
@login_manager.user_loader
def load_user(user_id):
return User.query.get(int(user_id))
Model
对应如上,需要注意的是UserToRoleRoleToResource
创建的是Table
对象,并不是直接映射类.在添加删除时需要根据Relationship
来决定.这些表都包含了一些基本相关字段.
其中最重要的是
User.have_permission(self, url)
方法,通过该方法查看相关的url
是有没有权限操作的.因为现在还没有创建后台
Blueprint
,所以我们把创建的过程放在manager.py
中.# manager.py
from flask_script import Manager
from app.commons.exts import db
from flask_migrate import MigrateCommand, Migrate
from app import create_app
from app.commons.sqlModel import User, Role, UserToRole, Resource, RoleToResource
app = create_app()
# 导入Manager并绑定app
manager = Manager(app)
# 导入flaks_migrate
# Migrate 绑定app,db
Migrate(app, db)
# MigrateCommand 可以使用Alembic的命令
#
manager.add_command('db', MigrateCommand) # db是别名
# 添加开发测试账户
@manager.option('-e', '--email', dest='email')
@manager.option('-u', '--username', dest='username')
@manager.option('-p', '--password', dest='password')
def CMS_user(email, username, password):
user = User.query.filter_by(email=email).filter_by(password=password).first()
if user is not None:
print('已有账户')
else:
real_user = User(email, username, password)
real_user.save()
print('账户已创建')
# 更新密码
@manager.option('-i', '--id', dest='id')
@manager.option('-p', '--password', dest='password')
def CMS_user_update_password(id, password):
user = User.query.filter_by(id=id).first()
if user is not None:
user.password = password
user.save()
print("密码已经更新")
else:
print("账户不存在")
# 删除用户
@manager.option('-e', '--email', dest='email')
def CMS_user_delete(email):
user = User.query.filter_by(email=email).first()
if user is not None:
user.delete()
print("用户已删除")
else:
print('用户不存在')
# 添加角色
@manager.option('-n', '--name', dest='name')
@manager.option('-d', '--description', dest="description")
def cms_role_add(name, description):
role = Role(name, description)
role.save()
print('添加成功')
# 添加用户角色
@manager.option('-u', '--userId', dest='user_id')
@manager.option('-r', '--roleId', dest='role_id')
def cms_role_user_add(user_id, role_id):
user = User.query.get_or_404(user_id)
role = Role.query.get_or_404(role_id)
if user is not None and role is not None:
user.roles.append(role)
user.save()
print('添加角色用户映射')
# 删除用户角色
@manager.option('-u', '--userId', dest='user_id')
@manager.option('-r', '--roleId', dest='role_id')
def cms_role_user_del(user_id, role_id):
user = User.query.get_or_404(user_id)
role = Role.query.get_or_404(role_id)
if user is not None and role is not None:
user.roles.remove(role)
user.save()
print('添加角色用户映射')
# 添加资源
@manager.option('-n', '--name', dest='name')
@manager.option('-u', '--url', dest='url')
def cms_resource_add(name, url):
resource = Resource(name, url)
resource.save()
print('资源添加成功')
# 添加资源角色表
@manager.option('-r', '--roleId', dest='role_id')
@manager.option('-s', '--resource', dest='resource_id')
def cms_resource_role_add(role_id,resource_id):
role = Role.query.get_or_404(role_id)
resource = Resource.query.get_or_404(resource_id)
if role is not None and resource is not None:
role.resources = [resource]
role.save()
print('角色资源添加成功')
if __name__ == '__main__':
manager.run()这样基本能满足我们创建角色和资源了.
为了记录创建的过程.不至于混淆.我在这里创建了
bash
脚本,一次性创建所有的权限.#!/bin/bash
echo "创建role"
python manager.py cms_role_add -n "InactiveUser" -d "未激活用户"
python manager.py cms_role_add -n "RegularUser" -d "普通会员"
python manager.py cms_role_add -n "UserLevel_1" -d "一级会员"
python manager.py cms_role_add -n "UserLever_2" -d "二级会员"
python manager.py cms_role_add -n "Administrator" -d "管理员"
python manager.py cms_role_add -n "SuperAdmin" -d "超级管理员"
echo "创建用户角色表"
python manager.py cms_role_user_add -u 1 -r 2
python manager.py cms_role_user_add -u 2 -r 1
echo "添加资源"
python manager.py cms_resource_add -n "修改用户名" -u "/api/v1/userUpdateUsername/"
echo "添加角色资源表"
python manager.py cms_resource_role_add -r 2 -s 1
echo "删除用户某个角色"
python manager.py cms_role_user_del -u 1 -r 2注意,我这里有2个用户
id=1,id=2
,其中id=1
是普通会员,id=2
是未激活会员.你需要根据自己的情况来创建用户角色的对应,也可以新增加多个角色.创建完毕后,需要新增加
js
把url
提供给后台,如果你使用的是flask_restful
,它有个Field.url()
能反馈URL
.这里为了简便,我手动推送了.// auth.js
.....
$.getJSON(update_URL, function(rst){
rst.username = result.value;
rst.url = '/api/v1/userUpdateUsername/';# api/user.py
class User_update_username(Resource):
def get(self, id):
user = User.query.get_or_404(id)
if user is not None:
return jsonify(user.to_dict())
def post(self, id):
user = User.query.get_or_404(id)
print(request.json.get('url'))
print(user.have_permission(request.json.get('url')))
if user is not None and user.have_permission(request.json.get('url')) and request.json.get('username'):
user.username = request.json.get('username')
user.save()
return jsonify(user.to_dict())
else:
to_dict = {
'flag': False
}
return jsonify(to_dict)注意这个
flag
,我在sqlModel.User
中设置了一个标志位,它永远指向了True
,也就是有权限.而在后台中检测到没有权限时,只会返回一个flag:False
,这样就可以在auth.js
中进行判断.// auth.js
$(function () {
$('#personalRename').click(function(e){
// 阻止默认新闻
e.preventDefault();
swal.fire({
icon: 'info',
title: '更改用户名',
confirmButtonText: '确定',
showCancelButton: true,
cancelButtonText:'取消',
input: 'text',
inputPlaceholder: '用户名',
allowOutsideClick: false,
}).then(function(result){
if (result.isConfirmed && result.value.length > 0){
const Uid = $('#showID').data('user-id');
const update_URL= '/api/v1/userUpdateUsername/' + Uid + '/' ;
$.getJSON(update_URL, function(rst){
rst.username = result.value;
rst.url = '/api/v1/userUpdateUsername/';
csrfAjax.ajax({
url: update_URL,
type: 'POST',
contentType : 'application/json',
data : JSON.stringify(rst),
success: function (e) {
console.log(e);
if (e.flag){
swal.fire({
icon: 'success',
title : '添加成功'
});
} else{
swal.fire({
'icon': 'info',
'title': '你没有权限'
})
}
setTimeout(function(){
location.reload();
}, 3000)
}
})
})
}
});
})
});注意
e.flag
的判断.10101
– END –
原文始发于微信公众号(Flask学习笔记):Flask Todo项目:个人事项(2)
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之家整理,本文链接:https://www.bmabk.com/index.php/post/36351.html