Skip to content

Commit

Permalink
feature: eslint问题修复
Browse files Browse the repository at this point in the history
  • Loading branch information
keelyzheng committed Dec 22, 2021
2 parents c76e45a + 2170f09 commit 94332ba
Show file tree
Hide file tree
Showing 33 changed files with 336 additions and 78 deletions.
2 changes: 1 addition & 1 deletion deploy/helm/chartty/c_bklogconfig.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ spec:
dataId: {{ .Values.global.bkLogConfig.dataId }}
logConfigType: std_log_config
namespace: {{ .Release.Namespace | quote }}
container_name_match:
containerNameMatch:
- {{ $namePrefix }}
labelSelector:
matchLabels: {{- include "chartty.labels" $global | nindent 6 }}
Expand Down
3 changes: 2 additions & 1 deletion src/api/bin/start.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#!/bin/bash
python manage.py compilemessages

gunicorn wsgi -w 8 --threads 2 --max-requests 1024 --max-requests-jitter 50 --worker-class gevent -b :8000 --access-logfile - --error-logfile - --access-logformat '[%(h)s] %({request_id}i)s %(u)s %(t)s "%(r)s" %(s)s %(D)s %(b)s "%(f)s" "%(a)s"'
LISTEN_PORT="${PORT:=8000}"
gunicorn wsgi -w 8 --threads 2 --max-requests 1024 --max-requests-jitter 50 --worker-class gevent -b :$LISTEN_PORT --access-logfile - --error-logfile - --access-logformat '[%(h)s] %({request_id}i)s %(u)s %(t)s "%(r)s" %(s)s %(D)s %(b)s "%(f)s" "%(a)s"'
3 changes: 2 additions & 1 deletion src/api/bkuser_core/categories/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,5 +88,6 @@ class SyncTaskStatus(AutoLowerEnum):
SUCCESSFUL = auto()
FAILED = auto()
RUNNING = auto()
RETRYING = auto()

_choices_labels = ((SUCCESSFUL, _("成功")), (FAILED, _("失败")), (RUNNING, _("同步中")))
_choices_labels = ((SUCCESSFUL, _("成功")), (FAILED, _("失败")), (RUNNING, _("同步中")), (RETRYING, _("失败重试中")))
4 changes: 2 additions & 2 deletions src/api/bkuser_core/categories/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ def get_plugin_by_name(name: str) -> "DataSourcePlugin":

def register_plugin(plugin: "DataSourcePlugin"):
try:
get_plugin_by_name(plugin.name)
raise PluginAlreadyExisted(f"Plugin with name: {plugin.name} already existed")
if get_plugin_by_name(plugin.name):
logger.warning(f"Plugin with name: {plugin.name} already existed")
except PluginDoesNotExist:
_global_plugins[plugin.name] = plugin
logger.info("➕Plugin[%s] added.", plugin.name)
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Generated by Django 3.2.5 on 2021-12-09 08:04

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
("categories", "0011_synctask"),
]

operations = [
migrations.AddField(
model_name="synctask",
name="retried_count",
field=models.IntegerField(default=0, verbose_name="重试次数"),
),
migrations.AlterField(
model_name="syncprogress",
name="status",
field=models.CharField(
choices=[("successful", "成功"), ("failed", "失败"), ("running", "同步中"), ("retrying", "失败重试中")],
default="running",
max_length=16,
verbose_name="状态",
),
),
migrations.AlterField(
model_name="synctask",
name="status",
field=models.CharField(
choices=[("successful", "成功"), ("failed", "失败"), ("running", "同步中"), ("retrying", "失败重试中")],
default="running",
max_length=16,
verbose_name="状态",
),
),
]
20 changes: 3 additions & 17 deletions src/api/bkuser_core/categories/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,7 @@
from uuid import UUID, uuid4

from bkuser_core.audit.models import AuditObjMetaInfo
from bkuser_core.categories.constants import (
TIMEOUT_THRESHOLD,
CategoryStatus,
CategoryType,
SyncStep,
SyncTaskStatus,
SyncTaskType,
)
from bkuser_core.categories.constants import TIMEOUT_THRESHOLD, CategoryStatus, SyncStep, SyncTaskStatus, SyncTaskType
from bkuser_core.categories.db_managers import ProfileCategoryManager
from bkuser_core.categories.exceptions import ExistsSyncingTaskError
from bkuser_core.common.models import TimestampedModel
Expand All @@ -38,7 +31,7 @@
class ProfileCategory(TimestampedModel):
"""用户目录"""

type = models.CharField(verbose_name="类型", max_length=32, choices=CategoryType.get_choices())
type = models.CharField(verbose_name="类型", max_length=32)
description = models.TextField("描述文字", null=True, blank=True)
display_name = models.CharField(verbose_name="展示名称", max_length=64)
domain = models.CharField(verbose_name="登陆域", max_length=64, db_index=True, unique=True)
Expand Down Expand Up @@ -173,21 +166,14 @@ class SyncTask(TimestampedModel):
verbose_name="触发类型", max_length=16, choices=SyncTaskType.get_choices(), default=SyncTaskType.MANUAL.value
)
operator = models.CharField(max_length=255, verbose_name="操作人", default="nobody")
retried_count = models.IntegerField(verbose_name="重试次数", default=0)

objects = SyncTaskManager()

@property
def required_time(self) -> datetime.timedelta:
return self.update_time - self.create_time

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
if exc_val is not None:
self.status = SyncTaskStatus.FAILED.value
self.save(update_fields=["status", "update_time"])

@property
def progresses(self):
# 由于建表顺序的原因, SyncProgress 的 task_id 未设置成外键....
Expand Down
8 changes: 8 additions & 0 deletions src/api/bkuser_core/categories/plugins/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
from enum import auto

from bkuser_core.categories.constants import SyncStep
from bkuser_core.common.enum import AutoNameEnum
from django.utils.translation import gettext_lazy as _

PLUGIN_NAME_SETTING_KEY = "plugin_name"
Expand All @@ -22,3 +25,8 @@
(SyncStep.USERS_RELATIONSHIP, True): _("同步用户【{username}】上级成功"),
(SyncStep.USERS_RELATIONSHIP, False): _("同步用户【{username}】上级失败, 失败原因: {error}"),
}


class HookType(AutoNameEnum):
POST_SYNC = auto()
PRE_SYNC = auto()
4 changes: 3 additions & 1 deletion src/api/bkuser_core/categories/plugins/custom/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
from bkuser_core.categories.plugins.plugin import DataSourcePlugin
from bkuser_core.categories.plugins.plugin import DataSourcePlugin, HookType

from .hooks import AlertIfFailedHook
from .login import LoginHandler
from .sycner import CustomSyncer

Expand All @@ -19,4 +20,5 @@
login_handler_cls=LoginHandler,
allow_client_write=True,
category_type="custom",
hooks={HookType.POST_SYNC: AlertIfFailedHook},
).register()
27 changes: 27 additions & 0 deletions src/api/bkuser_core/categories/plugins/custom/hooks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-用户管理(Bk-User) available.
Copyright (C) 2017-2021 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import logging

from celery.states import FAILURE

logger = logging.getLogger(__name__)


class AlertIfFailedHook:
"""当所有重试都失败时将告警通知"""

def trigger(self, status: str, params: dict):
if status == FAILURE:
logger.error(
"failed to sync data for category<%s> after %s retries", params["category"], params["retries"]
)
# 目前该 hook 更多是一个示例,并未实际实现告警通知功能
# TODO: 使用 ESB 通知到平台管理员
17 changes: 16 additions & 1 deletion src/api/bkuser_core/categories/plugins/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@
specific language governing permissions and limitations under the License.
"""
from dataclasses import dataclass, field
from typing import Optional, Type
from typing import Dict, Optional, Type
from uuid import UUID

from bkuser_core.categories.constants import SyncTaskStatus
from bkuser_core.categories.loader import register_plugin
from bkuser_core.categories.models import SyncProgress, SyncTask
from bkuser_core.categories.plugins.base import LoginHandler, Syncer
from bkuser_core.categories.plugins.constants import HookType
from rest_framework import serializers
from typing_extensions import Protocol


class SyncRecordSLZ(serializers.Serializer):
Expand All @@ -25,6 +27,13 @@ class SyncRecordSLZ(serializers.Serializer):
dt = serializers.DateTimeField()


class PluginHook(Protocol):
"""插件钩子,用于各种事件后的回调"""

def trigger(self, status: str, params: dict):
raise NotImplementedError


@dataclass
class DataSourcePlugin:
"""数据源插件,定义不同的数据源"""
Expand All @@ -44,10 +53,16 @@ class DataSourcePlugin:
# 其他额外配置
extra_config: dict = field(default_factory=dict)

hooks: Dict[HookType, Type[PluginHook]] = field(default_factory=dict)

def register(self):
"""注册插件"""
register_plugin(self)

def get_hook(self, type_: HookType) -> Optional[PluginHook]:
hook_cls = self.hooks.get(type_)
return hook_cls() if hook_cls else None

def sync(self, instance_id: int, task_id: UUID, *args, **kwargs):
"""同步数据"""
syncer = self.syncer_cls(category_id=instance_id)
Expand Down
1 change: 1 addition & 0 deletions src/api/bkuser_core/categories/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class SyncTaskSerializer(Serializer):
operator = CharField(help_text="操作人")
create_time = DateTimeField(help_text="开始时间")
required_time = DurationTotalSecondField(help_text="耗时")
retried_count = IntegerField(help_text="重试次数")


class SyncTaskProcessSerializer(Serializer):
Expand Down
93 changes: 68 additions & 25 deletions src/api/bkuser_core/categories/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,53 +10,96 @@
"""
import logging
import uuid
from typing import Optional
from contextlib import contextmanager
from typing import Any, Optional, Union

from bkuser_core.categories.constants import SyncTaskType
from bkuser_core.categories.constants import SyncTaskStatus, SyncTaskType
from bkuser_core.categories.exceptions import ExistsSyncingTaskError
from bkuser_core.categories.loader import get_plugin_by_category
from bkuser_core.categories.models import ProfileCategory, SyncTask
from bkuser_core.categories.plugins.constants import HookType
from bkuser_core.categories.utils import catch_time
from bkuser_core.celery import app
from bkuser_core.common.cache import clear_cache
from bkuser_core.common.error_codes import error_codes
from celery import Task
from django.conf import settings

logger = logging.getLogger(__name__)


@app.task
class RetryWithHookTask(Task):
"""A task will retry automatically, with plugin hook executing"""

autoretry_for = (Exception,)
retry_kwargs = {"max_retries": settings.TASK_MAX_RETRIES}
retry_backoff = settings.RETRY_BACKOFF
retry_jitter = True

def after_return(self, status, retval, task_id, args, kwargs, einfo):
category = ProfileCategory.objects.get(pk=kwargs["instance_id"])
logger.info("Sync data task<%s> of category<%s> got result: %s", task_id, category, status)

plugin = get_plugin_by_category(category)
post_sync_hook = plugin.get_hook(HookType.POST_SYNC)
if post_sync_hook:
kwargs.update({"retries": self.request.retries, "category": category})
post_sync_hook.trigger(status, kwargs)


@contextmanager
def sync_data_task(category: ProfileCategory, task_id: Union[uuid.UUID, Any], should_retry: bool):
"""同步数据任务,支持标记重试、失败、成功"""
sync_task = SyncTask.objects.get(id=task_id)
try:
yield
except Exception:
if should_retry:
status = SyncTaskStatus.RETRYING.value
sync_task.retried_count += 1
else:
status = SyncTaskStatus.FAILED.value

sync_task.status = status
sync_task.save(update_fields=["retried_count", "status", "update_time"])
raise
else:
# 标记同步
category.mark_synced()
sync_task.status = SyncTaskStatus.SUCCESSFUL.value
sync_task.save(update_fields=["status", "update_time"])

# 同步成功后,清理当前的缓存
clear_cache()


@app.task(base=RetryWithHookTask)
def adapter_sync(instance_id: int, operator: str, task_id: Optional[uuid.UUID] = None, *args, **kwargs):
logger.info("going to sync Category<%s>", instance_id)
instance = ProfileCategory.objects.get(pk=instance_id)
category = ProfileCategory.objects.get(pk=instance_id)

if task_id is None:
# 只有定时任务未传递 task_id
try:
task_id = SyncTask.objects.register_task(category=instance, operator=operator, type_=SyncTaskType.AUTO).id
task_id = SyncTask.objects.register_task(category=category, operator=operator, type_=SyncTaskType.AUTO).id
except ExistsSyncingTaskError as e:
raise error_codes.LOAD_DATA_FAILED.f(str(e))

with SyncTask.objects.get(id=task_id):
try:
plugin = get_plugin_by_category(instance)
except ValueError:
logger.exception("category type<%s> is not support", instance.type)
raise error_codes.CATEGORY_TYPE_NOT_SUPPORTED
except Exception:
logger.exception(
"load adapter<%s-%s-%s> failed",
instance.type,
instance.display_name,
instance.id,
)
raise error_codes.LOAD_DATA_ADAPTER_FAILED
try:
plugin = get_plugin_by_category(category)
except ValueError:
logger.exception("category type<%s> is not support", category.type)
raise error_codes.CATEGORY_TYPE_NOT_SUPPORTED
except Exception:
logger.exception(
"load adapter<%s-%s-%s> failed",
category.type,
category.display_name,
category.id,
)
raise error_codes.LOAD_DATA_ADAPTER_FAILED

with sync_data_task(category, task_id, adapter_sync.request.retries < adapter_sync.max_retries):
with catch_time() as context:
plugin.sync(instance_id=instance_id, task_id=task_id, *args, **kwargs)
logger.info(f"同步总耗时: {context.time_delta}s, 消耗总CPU时间: {context.clock_delta}s.")

# 标记同步
instance.mark_synced()

# 同步成功后,清理当前的缓存
clear_cache()
2 changes: 1 addition & 1 deletion src/api/bkuser_core/config/common/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# ==============================================================================
# 数据库
# ==============================================================================
DB_PREFIX = "DB"
DB_PREFIX = env("DB_PREFIX", default="DB")

DATABASES = get_db_config(env, DB_PREFIX)

Expand Down
6 changes: 6 additions & 0 deletions src/api/bkuser_core/config/common/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,9 @@

# 是否使用进度条(本地开发方便)
USE_PROGRESS_BAR = False

# ==============================================================================
# 数据同步
# ==============================================================================
TASK_MAX_RETRIES = env.int("TASK_MAX_RETRIES", default=3)
RETRY_BACKOFF = env.int("RETRY_BACKOFF", default=30)
Loading

0 comments on commit 94332ba

Please sign in to comment.