DRF三大认证

命运对每个人都是一样的,不一样的是各自的努力和付出不同,付出的越多,努力的越多,得到的回报也越多,在你累的时候请看一下身边比你成功却还比你更努力的人,这样,你就会更有动力。

导读:本篇文章讲解 DRF三大认证,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,来源:原文

认证权限频率

在APIView执行的过程中,在dispatch方法中走了三大认证self.initial(request, *args, **kwargs)

def initial(self, request, *args, **kwargs):
    ···
    self.perform_authentication(request)  # 认证
    self.check_permissions(request)    # 权限
    self.check_throttles(request)    # 频率
  • 认证

    • 需求

      前后端混合开发,可以通过HttpResponse对象来设置cookie进而校验登录,现在前后端分离开发,用不到cookie,我们通过登录接口,来模拟认证登录,登录成功返回json字符串,并且携带随机字符串(uuid模拟生成token),通过token随机字符串来判断用户是否登录,登录了就更新token,首次登录就存token;

    • 分析
      • 创建User表
      • 创建UserToken表,和User一对一关系
      • 前端传入用户名,密码
      • 数据库取校验用户信息
      • 校验成功,Token表内新增一条记录,返回给前端json格式字符串,字符串中带一个随机字符串
    • 登录认证

      • 模型层
        from django.db import models
        
        
        class User(models.Model):
            username = models.CharField(max_length=32)
            password = models.CharField(max_length=16)
            user_type = models.IntegerField(choices=((1, '超级管理员'), (2, '普通管理员'), (3, '普通用户')))
        
            def get_code(self):
                self.get_user_type_display()
                print(self.get_user_type_display())
        
        class UserToken(models.Model):
            user = models.OneToOneField(to=User,on_delete=models.CASCADE)
            token = models.CharField(max_length=32)
      • 视图层
        from rest_framework.response import Response
        from rest_framework.viewsets import ViewSet
        from rest_framework.decorators import action
        from app01 import models
        
        
        class UserView(ViewSet):  # 等价于class UserView(ViewSetMixin, APIView):
            @action(methods=['POST'], detail=False)  # detail 不带pk
            def login(self, request, *args, **kwargs):
                # 获取数据
                username = request.data.get('username')
                password = request.data.get('password')
                user = models.User.objects.filter(username=username, password=password).first()
                if user:
                    # 如果user有值说明登录成功,生产随机字符串,存入数据库,如果重复登录那么就更新随机字符串
                    import uuid
                    uuid_str = uuid.uuid4()
                    # print(type(uuid_str)) # <class 'uuid.UUID'>
                    token = str(uuid_str)
                    # 如果存在就更新,如果不存在就新增,指定搜索对象,然后defaults指定更新内容
                    models.UserToken.objects.update_or_create(user=user,defaults={'token': token} )
                    # 返回随机字符串
                    return Response({'code': 100, 'msg': '登录成功', 'token': token})
                return Response({'code': 101, 'msg': '登录失败,用户名或密码错误'})
      • 路由
        from django.contrib import admin
        from django.urls import path,include
        from app01 import views
        from rest_framework.routers import SimpleRouter
        router = SimpleRouter()
        router.register('user',views.UserView,'user')
        urlpatterns = [
            path('admin/', admin.site.urls),
            path('',include(router.urls))
        ]

        imageimage

    • 认证

      在执行视图函数之前执行了认证方法:self.perform_authentication(request)

      • 这里写一个认证demo,只有登录过的才能查看Book表
        '''auth.py'''
        from rest_framework.authentication import BaseAuthentication
        from rest_framework.exceptions import AuthenticationFailed
        from app01 import models
        
        # 写一个类继承BaseAuthentication
        class LoginAuth(BaseAuthentication):
            # 重写authenticate方法
            def authenticate(self, request):
                # 获取前端携带的token,token放在哪是自己规定的,比如从查询参数中获取
                token = request.query_params.get('token')
                # 比对随机字符串
                user_token = models.UserToken.objects.filter(token=token).first()
                if user_token:
                    # 登录了,返回当前登录用户和token
                    return user_token.user,token
                    """
                    返回的第一个(user_token.user),给了request.user,就是当前登录用户对象
                    返回的第二个(token),给了request.auth,就是token串
                    """
                else:
                    # 没有登录,抛异常
                    raise AuthenticationFailed('您没有登录,请登录')
        
        
        
        '''serializer.py'''
        from rest_framework import serializers
        from app01 import models
        class BookSerializer(serializers.ModelSerializer):
            class Meta:
                model = models.Book
                fields = '__all__'
        
        
        
        '''models.py'''
        class Book(models.Model):
            name = models.CharField(max_length=32)
            price = models.DecimalField(decimal_places=2,max_digits=5)
            author = models.CharField(max_length=32)
        
        
        
        
        '''urls.py'''
        from django.contrib import admin
        from django.urls import path,include
        from app01 import views
        from rest_framework.routers import SimpleRouter
        router = SimpleRouter()
        router.register('user',views.UserView,'user')
        router.register('books',views.BookView,'books')
        urlpatterns = [
            path('admin/', admin.site.urls),
            path('',include(router.urls))
        ]
      • 局部使用:写一个认证类,通过authentication_classes参数指定认证类
        class BookView(ModelViewSet):
            # 局部使用
            authentication_classes = [LoginAuth,]
      • 全局使用:写一个认证类,settings.py配置,所有的视图类生效
        REST_FRAMEWORK={
            "DEFAULT_AUTHENTICATION_CLASSES":["app01.auth.LoginAuth",]
            }
      • 局部禁用:authentication_classes = []
      • 返回的user_token和token值可以通过在视图类里重写list方法拿到
        '''views.py'''
        from .auth import LoginAuth
        class BookView(ModelViewSet):
            # 局部使用
            authentication_classes = [LoginAuth,]
            queryset = models.Book.objects.all()
            serializer_class = serializer.BookSerializer
        
            def list(self, request, *args, **kwargs):
                print(request.user) # User object (1)
                print(request.user.username) # Alan
                print(request.auth) # de914129-2f08-41a4-a7a9-de289badb771
                return super().list(request, *args, **kwargs)
      • 总结
        • 局部禁用和全局配置使用的时候要注意,全局如果认证的时候是每个视图函数都认证,就比如登录视图认证登录,那么就死循环了,不认证不能登录···
  • 权限

    写权限继承BasePermission,重写has_permission方法,判断如果有权限,返回True,如果没有权限,返回False

    • 需求
      • 认证登录成功后,普通用户只能查询一条或所有
      • 管理员登录后才能通过权限认证进行修改,增加,删除操作
    • 作用

      • 权限控制可以限制用户对于视图的访问和对于具体数据对象的访问
      • 认证通过, 可以进行下一步验证 (频率认证)
      • 认证失败, 抛出权限异常结果
    • 使用

      • 局部使用:permission_classes = [UserPermission, ]
      • 全局使用:
        REST_FRAMEWORK={
                    "DEFAULT_PERMISSION_CLASSES":["app01.auth.UserPermission",]
               }
      • 局部禁用:permission_classes = []
    • 权限

      • 第一步:写一个类,继承BasePermission,重写has_permission,判断如果有权限,返回True,如果没有权限,返回False
      • 第二步:局部使用和全局使用
      • 权限类
        class UserPermission(BasePermission):
            def has_permission(self, request, view):
                # 没有权限的提示信息
                self.message = '您是:%s,没有权限' % request.user.get_user_type_display()
                # 如果有权限,返回True,没有权限返回False
                # 权限类,在认证类之后,request.user有了当前登录用户
                user_type = request.user.user_type
                print(user_type)
                if user_type < 3:  # 只要不是1,2,就没有权限
                    return True
                else:
                    return False
      • 视图
        from .auth import LoginAuth, UserPermission
        from rest_framework.mixins import RetrieveModelMixin, DestroyModelMixin, UpdateModelMixin,ListModelMixin,CreateModelMixin
        from rest_framework.viewsets import GenericViewSet
        
        class BookView(RetrieveModelMixin,ListModelMixin,GenericViewSet):
            # 局部使用,普通用户登录后只能获取一条或所有
            authentication_classes = [LoginAuth, ]
            queryset = models.Book.objects.all()
            serializer_class = serializer.BookSerializer
        
        class BookDetailView(CreateModelMixin, DestroyModelMixin, UpdateModelMixin, GenericViewSet):
            # 局部使用,普通用户没有权限
            authentication_classes = [LoginAuth, ]
            permission_classes = [UserPermission, ]
            queryset = models.Book.objects.all()
            serializer_class = serializer.BookSerializer
      • 路由
        from rest_framework.routers import SimpleRouter
        router = SimpleRouter()
        router.register('user',views.UserView,'user')
        router.register('books',views.BookView,'books')
        router.register('bookdetail',views.BookDetailView,'bookdetail')
        urlpatterns = [
            path('admin/', admin.site.urls),
            path('',include(router.urls))
        ]

        image

      • 总结
        • 5个接口分成了俩视图类写
        • BookView:获取所有,获取单条API
        • BookDetailView:删除,修改,新增API
        • 这俩视图都需要登录:authentication_classes = [LoginAuth, ]
        • BookView只要登陆就可以操作
        • BookDetailView必须有权限才能,加了一个权限,permission_classes = [UserPermission, ]
      • 注意

        如果使用ModelViewSet快速写五个接口,那么在验证认证和权限的时候就会错乱,获取和修改等操作都在一个视图里了,分开写会好一点

  • 频率

    • 作用

      • 限制视图接口被访问的频率次数
      • 限制条件 : IP、ID、唯一键
      • 频率周期 : 时(h)、分(m)、秒(s)
      • 频率次数 : [num] / s
      • 没有达到限制频率可正常访问接口
      • 达到了频率限制次数, 在限制时间内不能进行访问, 超过时间后可以正常访问
    • 使用

      • 频率类
        # 频率类
        class IPThrottle(SimpleRateThrottle):
            scope = 'ip'
        
            # get_cache_key返回什么就以什么方法做限制,限制条件必须唯一,比如用户id
            def get_cache_key(self, request, view):
                # 限制ip地址,从request.META字典中获取ip
                '''
                request.META:请求头中的数据
                '''
                return request.META.get('REMOTE_ADDR')  # 客户端ip
      • 配置文件
        REST_FRAMEWORK={
            'DEFAULT_THROTTLE_RATES': {
                'ip': '3/m'  # ip是scope的字符串,一分钟访问3次
        }
      • 局部使用
        class BookView(RetrieveModelMixin, ListModelMixin, GenericViewSet):
            authentication_classes = [LoginAuth, ] # 登录认证
            permission_classes = [UserPermission, ] # 权限限制
            throttle_classes = [IPThrottle, ]  # 频率限制
            
            queryset = models.Book.objects.all()
            serializer_class = serializer.BookSerializer
      • 全局使用
        REST_FRAMEWORK={
            'DEFAULT_THROTTLE_CLASSES': (  # 全局配置频率类
                    'app01.auth.IPThrottle'
                ),
            }

        image

      • 总结
        • 写一个类,继承SimpleRateThrottle,重写类属性scope,scope值自定义,配置文件中一致就行,重写get_cache_key方法,返回什么限制什么
        • 在配置文件中配置,限制频率
        • 局部/全局使用
    • 认证权限频率+五个接口

      • 模型
        from django.db import modelsclass User(models.Model):    username = models.CharField(max_length=32)    password = models.CharField(max_length=16)    user_type = models.IntegerField(choices=((1, '超级管理员'), (2, '普通管理员'), (3, '普通用户')))class UserToken(models.Model):    user = models.OneToOneField(to=User,on_delete=models.CASCADE)    token = models.CharField(max_length=32)class Book(models.Model):    name = models.CharField(max_length=32)    price = models.DecimalField(decimal_places=2,max_digits=5)    author = models.CharField(max_length=32)
      • 视图
        from rest_framework.decorators import actionfrom rest_framework.response import Responsefrom rest_framework.viewsets import ViewSetfrom app01 import modelsfrom app01 import serializerclass UserView(ViewSet):    @action(methods=['POST'], detail=False)    def login(self, request, *args, **kwargs):        # 获取数据        username = request.data.get('username')        password = request.data.get('password')        user = models.User.objects.filter(username=username, password=password).first()        if user:            # 如果user有值说明登录成功,生产随机字符串,存入数据库,如果重复登录那么就更新随机字符串            import uuid            uuid_str = uuid.uuid4()            # print(type(uuid_str)) # <class 'uuid.UUID'>            token = str(uuid_str)            # 如果存在就更新,如果不存在就新增,指定搜索对象,然后defaults指定更新内容            models.UserToken.objects.update_or_create(user=user, defaults={'token': token})            # 返回随机字符串            return Response({'code': 100, 'msg': '登录成功', 'token': token})        return Response({'code': 101, 'msg': '登录失败,用户名或密码错误'})from .auth import LoginAuth, UserPermission, IPThrottlefrom rest_framework.mixins import RetrieveModelMixin, DestroyModelMixin, UpdateModelMixin, ListModelMixin, \    CreateModelMixinfrom rest_framework.viewsets import GenericViewSetclass BookView(RetrieveModelMixin, ListModelMixin, GenericViewSet):    # 局部使用,普通用户登录后只能获取一条或所有    authentication_classes = [LoginAuth, ]    throttle_classes = [IPThrottle, ]    queryset = models.Book.objects.all()    serializer_class = serializer.BookSerializerclass BookDetailView(CreateModelMixin, DestroyModelMixin, UpdateModelMixin, GenericViewSet):    # 局部使用,普通用户没有权限    authentication_classes = [LoginAuth, ]    permission_classes = [UserPermission, ]    queryset = models.Book.objects.all()    serializer_class = serializer.BookSerializer
      • 序列化器
        from rest_framework import serializersfrom app01 import modelsclass BookSerializer(serializers.ModelSerializer):    class Meta:        model = models.Book        fields = '__all__'
      • 认证权限频率类
        from rest_framework.authentication import BaseAuthenticationfrom rest_framework.exceptions import AuthenticationFailedfrom rest_framework.permissions import BasePermissionfrom rest_framework.throttling import SimpleRateThrottlefrom app01 import models# 认证类class LoginAuth(BaseAuthentication):    # 重写authenticate方法    def authenticate(self, request):        # 获取前端携带的token,token放在哪是自己规定的,比如从查询参数中获取        token = request.query_params.get('token')        # 比对随机字符串        user_token = models.UserToken.objects.filter(token=token).first()        if user_token:            # 登录了,返回当前登录用户和token            return user_token.user, token        else:            # 没有登录,抛异常            raise AuthenticationFailed('您没有登录,请登录')# 权限类class UserPermission(BasePermission):    def has_permission(self, request, view):        # 没有权限的提示信息        self.message = '您是:%s,没有权限' % request.user.get_user_type_display()        # 如果有权限,返回True,没有权限返回False        # 权限类,在认证类之后,request.user有了当前登录用户        user_type = request.user.user_type        print(user_type)        if user_type < 3:  # 只要不是1,2,就没有权限            return True        else:            return False# 频率类class IPThrottle(SimpleRateThrottle):    scope = 'ip'    # get_cache_key返回什么就以什么方法做限制,限制条件必须唯一,比如用户id    def get_cache_key(self, request, view):        # 限制ip地址,从request.META字典中获取ip        '''        request.META:请求头中的数据        '''        return request.META.get('REMOTE_ADDR')  # 客户端ip
      • 配置文件
        REST_FRAMEWORK={    'DEFAULT_THROTTLE_RATES': {        'ip': '3/m'  # minute_3是scope的字符串,一分钟访问3次},
      • 路由
        from django.contrib import adminfrom django.urls import path,includefrom app01 import viewsfrom rest_framework.routers import SimpleRouterrouter = SimpleRouter()router.register('user',views.UserView,'user')router.register('books',views.BookView,'books')router.register('bookdetail',views.BookDetailView,'bookdetail')urlpatterns = [    path('admin/', admin.site.urls),    path('',include(router.urls))]
    • 进阶实战

      • 需求

        • 注册接口,包含字段用户名,密码,确认密码,用户类型
        • 登陆接口,校验用户名,密码,生成随机字符串
        • 认证功能,除了注册登陆接口外,所有接口都要登陆后访问
        • 频率限制功能,每分钟访问5次,book的所有接口,使用这个频率类
        • 权限限制功能,publish的所有操作需要超级用户能访问,其他的普通登陆用户就可以操作
      • 模型

        from django.db import modelsclass User(models.Model):    username = models.CharField(max_length=32)    password = models.CharField(max_length=255)    user_type = models.IntegerField(choices=((1, '超级管理员'), (2, '普通管理员'), (3, '普通用户')))class UserToken(models.Model):    user = models.OneToOneField(to=User,on_delete=models.CASCADE)    token = models.CharField(max_length=32)class Book(models.Model):    title = models.CharField(max_length=11)    price = models.DecimalField(max_digits=5, decimal_places=2)    authors = models.ManyToManyField(to='Author')    publish = models.ForeignKey(to='Publish', on_delete=models.CASCADE)    @property    def publish_detail(self):        return {'name': self.publish.name,'Email':self.publish.email}    @property    def author_list(self):        l = []        print(self.authors.all())  # <QuerySet [<Author: Author object (1)>, <Author: Author object (2)>]>        for author in self.authors.all():            print(author.authordetail)  # AuthorDetail object (1)            l.append({'name': author.username, 'gender': author.gender,                      'address': author.authordetail.address,'telephone':author.authordetail.telephone})        return lclass Author(models.Model):    username = models.CharField(max_length=11)    gender = models.IntegerField(choices=((1, '男'), (2, '女'), (3, '未知')))    authordetail = models.OneToOneField(to='AuthorDetail', on_delete=models.CASCADE)    @property    def authordetail_info(self):        return {'telephone': self.authordetail.telephone, 'address': self.authordetail.address}class AuthorDetail(models.Model):    telephone = models.BigIntegerField()    address = models.CharField(max_length=32)class Publish(models.Model):    name = models.CharField(max_length=11)    email = models.EmailField()
      • 序列化器

        from rest_framework import serializersfrom .models import *# 用户序列化器class UserSerializer(serializers.ModelSerializer):    class Meta:        model = User        fields = '__all__'# 书序列化器class BookSerializer(serializers.ModelSerializer):    class Meta:        model = Book        # fields = '__all__'        fields = ['id', 'title', 'price', 'publish', 'authors', 'publish_detail', 'author_list']        extra_kwargs = {            'publish': {'write_only': True},            'authors': {'write_only': True},        }# 作者序列化器class AuthorSerializer(serializers.ModelSerializer):    class Meta:        # 指定和哪个表有关系        model = Author        # fields = '__all__'        fields = ['id', 'username', 'gender', 'telephone', 'address', 'authordetail_info']    # 重写字段telephone和addr    telephone = serializers.CharField(write_only=True)    address = serializers.CharField(write_only=True, max_length=8, required=False)    # 重写create,操作两个表    def create(self, validated_data):        # 先存作者详情        authordetail = AuthorDetail.objects.create(telephone=validated_data.get('telephone'),                                                   address=validated_data.get('address'))        # 存作者表        author = Author.objects.create(author_detail=authordetail, gender=validated_data.get('gender'),                                       username=validated_data.get('username'))        # 这样只返回author对象就行,直接存了两个表,返回反序列化的对象        return author# 出版社序列化器class PublishSerializer(serializers.ModelSerializer):    class Meta():        model = Publish        fields = '__all__'
      • 视图

        from django.contrib.auth.hashers import make_password, check_passwordfrom rest_framework.decorators import actionfrom rest_framework.response import Responsefrom rest_framework.viewsets import ViewSetfrom rest_framework.viewsets import ModelViewSetfrom .auth import *from .serializer import *from .models import *# 注册视图class UserRegisterView(ViewSet):    @action(methods=["POST"], detail=False)    def register(self, request):        usernmae = request.data.get('username')        password = request.data.get('password')        re_password = request.data.get('re_password')        user_type = request.data.get('user_type')        if User.objects.filter(username=usernmae):            return Response({'msg': f'用户{usernmae}已注册!', 'code': 4000})        else:            if password == re_password:                # make_password加密:make_password(password, salt=None, hasher='default')                user_date = {'username': usernmae, 'password': make_password(password), 'user_type': user_type}                user_serializer = UserSerializer(data=user_date)                if user_serializer.is_valid():                    user_serializer.save()                    return Response({'code': 2001, 'msg': f'用户{usernmae}注册成功'})                else:                    return Response({'code': 4001, 'msg': '注册失败', 'errors': user_serializer.errors})            else:                return Response({'msg': '两次密码不一致', 'code': 4002})# 登录视图class UserLoginView(ViewSet):    @action(methods=["POST"], detail=False)    def login(self, request):        username = request.data.get('username')        password = request.data.get('password')        user = User.objects.filter(username=username).first()        # check_password(password, encoded, setter=None, preferred='default')        if user and check_password(password, user.password):            import uuid            token = str(uuid.uuid4())            UserToken.objects.update_or_create(user=user, defaults={'token': token})            return Response({'code': 2000, 'msg': f'用户{user.username}登录成功', 'token': token})        return Response({'code': 4004, 'msg': '校验失败,用户名或密码错误'})# 书接视图class BookView(ModelViewSet):    authentication_classes = [LoginAuth,]    throttle_classes = [IPThrottle,]    queryset = Book.objects.all()    serializer_class = BookSerializer# 作者视图class AuthorView(ModelViewSet):    authentication_classes = [LoginAuth,]    queryset = Author.objects.all()    serializer_class = AuthorSerializer# 出版社视图class PublishView(ModelViewSet):    authentication_classes = [LoginAuth, ]    permission_classes = [UserPermission,]    queryset = Publish.objects.all()    serializer_class = PublishSerializer
      • 认证权限频率

        from rest_framework.authentication import BaseAuthenticationfrom rest_framework.exceptions import AuthenticationFailedfrom rest_framework.permissions import BasePermissionfrom rest_framework.throttling import SimpleRateThrottlefrom app01 import models# 认证类class LoginAuth(BaseAuthentication):    # 重写authenticate方法    def authenticate(self, request):        # 获取前端携带的token,token放在哪是自己规定的,比如从查询参数中获取        token = request.query_params.get('token')        # 比对随机字符串        user_token = models.UserToken.objects.filter(token=token).first()        if user_token:            # 登录了,返回当前登录用户和token            return user_token.user, token        else:            # 没有登录,抛异常            raise AuthenticationFailed('您没有登录,请登录')# 权限类class UserPermission(BasePermission):    def has_permission(self, request, view):        # 没有权限的提示信息        self.message = '您是:%s,没有权限' % request.user.get_user_type_display()        # 如果有权限,返回True,没有权限返回False        # 权限类,在认证类之后,request.user有了当前登录用户        user_type = request.user.user_type        print(user_type)        if user_type < 3:  # 只要不是1,2,就没有权限            return True        else:            return False# 频率类class IPThrottle(SimpleRateThrottle):    scope = 'ip'    # get_cache_key返回什么就以什么方法做限制,限制条件必须唯一,比如用户id    def get_cache_key(self, request, view):        # 限制ip地址,从request.META字典中获取ip        '''        request.META:请求头中的数据        '''        return request.META.get('REMOTE_ADDR')  # 客户端ip
      • 配置文件

        REST_FRAMEWORK = {    'DEFAULT_THROTTLE_RATES': {        'ip': '5/m'  # minute_3是scope的字符串,一分钟访问5次    }, }
      • 路由

        from django.contrib import adminfrom django.urls import path,includefrom app01 import viewsfrom rest_framework.routers import SimpleRouterrouter = SimpleRouter()router.register('user',views.UserLoginView,'user')router.register('user',views.UserRegisterView,'user')router.register('books',views.BookView,'books')router.register('author',views.AuthorView,'author')router.register('publish',views.PublishView,'publish')urlpatterns = [    path('admin/', admin.site.urls),    path('', include(router.urls)),]
      • 测试

        image

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/144005.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!