Skip to content

Commit

Permalink
Refactor: SessionContext doesn't require Arc
Browse files Browse the repository at this point in the history
Embrace idiomatic Rust practices by reducing unnecessary sharing. In
this case, since `SessionContext` is not shared among threads, there is
no need to wrap it in an `Arc`. This change streamlines the code and
enhances performance by eliminating unneeded atomic reference counting.
  • Loading branch information
drmingdrmer committed May 12, 2024
1 parent 2b7d881 commit 2ef3f06
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 47 deletions.
50 changes: 25 additions & 25 deletions src/query/service/src/sessions/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ use crate::sessions::SessionType;
pub struct Session {
pub(in crate::sessions) id: String,
pub(in crate::sessions) typ: RwLock<SessionType>,
pub(in crate::sessions) session_ctx: Arc<SessionContext>,
pub(in crate::sessions) privilege_mgr: SessionPrivilegeManagerImpl,
pub(in crate::sessions) session_ctx: Box<SessionContext>,
status: Arc<RwLock<SessionStatus>>,
pub(in crate::sessions) mysql_connection_id: Option<u32>,
format_settings: FormatSettings,
Expand All @@ -57,17 +56,15 @@ impl Session {
pub fn try_create(
id: String,
typ: SessionType,
session_ctx: Arc<SessionContext>,
session_ctx: Box<SessionContext>,
mysql_connection_id: Option<u32>,
) -> Result<Arc<Session>> {
let status = Arc::new(Default::default());
let privilege_mgr = SessionPrivilegeManagerImpl::new(session_ctx.clone());
Ok(Arc::new(Session {
id,
typ: RwLock::new(typ),
status,
session_ctx,
privilege_mgr,
mysql_connection_id,
format_settings: FormatSettings::default(),
}))
Expand Down Expand Up @@ -114,7 +111,8 @@ impl Session {
}

pub fn quit(self: &Arc<Self>) {
let session_ctx = self.session_ctx.clone();
let session_ctx = self.session_ctx.as_ref();

if session_ctx.get_current_query_id().is_some() {
if let Some(shutdown_fun) = session_ctx.take_io_shutdown_tx() {
shutdown_fun();
Expand All @@ -138,9 +136,7 @@ impl Session {
}

pub fn force_kill_query(self: &Arc<Self>, cause: ErrorCode) {
let session_ctx = self.session_ctx.clone();

if let Some(context_shared) = session_ctx.get_query_context_shared() {
if let Some(context_shared) = self.session_ctx.get_query_context_shared() {
context_shared.kill(cause);
}
}
Expand Down Expand Up @@ -205,7 +201,11 @@ impl Session {
}

pub fn get_current_user(self: &Arc<Self>) -> Result<UserInfo> {
self.privilege_mgr.get_current_user()
self.privilege_mgr().get_current_user()
}

pub fn privilege_mgr<'a>(&'a self) -> SessionPrivilegeManagerImpl<'a> {
SessionPrivilegeManagerImpl::new(self.session_ctx.as_ref())
}

// set_authed_user() is called after authentication is passed in various protocol handlers, like
Expand All @@ -218,21 +218,23 @@ impl Session {
user: UserInfo,
restricted_role: Option<String>,
) -> Result<()> {
self.privilege_mgr
self.privilege_mgr()
.set_authed_user(user, restricted_role)
.await
}

#[async_backtrace::framed]
pub async fn validate_available_role(self: &Arc<Self>, role_name: &str) -> Result<RoleInfo> {
self.privilege_mgr.validate_available_role(role_name).await
self.privilege_mgr()
.validate_available_role(role_name)
.await
}

// Only the available role can be set as current role. The current role can be set by the SET
// ROLE statement, or by the `session.role` field in the HTTP query request body.
#[async_backtrace::framed]
pub async fn set_current_role_checked(self: &Arc<Self>, role_name: &str) -> Result<()> {
self.privilege_mgr
self.privilege_mgr()
.set_current_role(Some(role_name.to_string()))
.await
}
Expand All @@ -242,33 +244,33 @@ impl Session {
self: &Arc<Self>,
role_names: Option<Vec<String>>,
) -> Result<()> {
self.privilege_mgr.set_secondary_roles(role_names).await
self.privilege_mgr().set_secondary_roles(role_names).await
}

pub fn get_current_role(self: &Arc<Self>) -> Option<RoleInfo> {
self.privilege_mgr.get_current_role()
self.privilege_mgr().get_current_role()
}

pub fn get_secondary_roles(self: &Arc<Self>) -> Option<Vec<String>> {
self.privilege_mgr.get_secondary_roles()
self.privilege_mgr().get_secondary_roles()
}

#[async_backtrace::framed]
pub async fn unset_current_role(self: &Arc<Self>) -> Result<()> {
self.privilege_mgr.set_current_role(None).await
self.privilege_mgr().set_current_role(None).await
}

// Returns all the roles the current session has. If the user have been granted restricted_role,
// the other roles will be ignored.
// On executing SET ROLE, the role have to be one of the available roles.
#[async_backtrace::framed]
pub async fn get_all_available_roles(self: &Arc<Self>) -> Result<Vec<RoleInfo>> {
self.privilege_mgr.get_all_available_roles().await
self.privilege_mgr().get_all_available_roles().await
}

#[async_backtrace::framed]
pub async fn get_all_effective_roles(self: &Arc<Self>) -> Result<Vec<RoleInfo>> {
self.privilege_mgr.get_all_effective_roles().await
self.privilege_mgr().get_all_effective_roles().await
}

#[async_backtrace::framed]
Expand All @@ -280,7 +282,7 @@ impl Session {
if matches!(self.get_type(), SessionType::Local) {
return Ok(());
}
self.privilege_mgr
self.privilege_mgr()
.validate_privilege(object, privilege)
.await
}
Expand All @@ -290,12 +292,12 @@ impl Session {
if matches!(self.get_type(), SessionType::Local) {
return Ok(true);
}
self.privilege_mgr.has_ownership(object).await
self.privilege_mgr().has_ownership(object).await
}

#[async_backtrace::framed]
pub async fn get_visibility_checker(&self) -> Result<GrantObjectVisibilityChecker> {
self.privilege_mgr.get_visibility_checker().await
self.privilege_mgr().get_visibility_checker().await
}

pub fn get_settings(self: &Arc<Self>) -> Arc<Settings> {
Expand Down Expand Up @@ -328,9 +330,7 @@ impl Session {
}

pub fn set_query_priority(&self, priority: u8) {
let session_ctx = self.session_ctx.clone();

if let Some(context_shared) = session_ctx.get_query_context_shared() {
if let Some(context_shared) = self.session_ctx.get_query_context_shared() {
context_shared.set_priority(priority);
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/query/service/src/sessions/session_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ pub struct SessionContext {
}

impl SessionContext {
pub fn try_create(settings: Arc<Settings>, typ: SessionType) -> Result<Arc<Self>> {
Ok(Arc::new(SessionContext {
pub fn try_create(settings: Arc<Settings>, typ: SessionType) -> Result<Self> {
Ok(SessionContext {
settings,
abort: Default::default(),
current_user: Default::default(),
Expand All @@ -87,7 +87,7 @@ impl SessionContext {
query_ids_results: Default::default(),
typ,
txn_mgr: Mutex::new(TxnManager::init()),
}))
})
}

// Get abort status.
Expand Down
7 changes: 4 additions & 3 deletions src/query/service/src/sessions/session_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ use crate::sessions::SessionType;

impl Session {
pub fn process_info(self: &Arc<Self>) -> ProcessInfo {
let session_ctx = self.session_ctx.clone();
self.to_process_info(&session_ctx)
self.to_process_info()
}

fn to_process_info(self: &Arc<Self>, session_ctx: &SessionContext) -> ProcessInfo {
fn to_process_info(self: &Arc<Self>) -> ProcessInfo {
let session_ctx = self.session_ctx.as_ref();

let mut memory_usage = 0;

let shared_query_context = &session_ctx.get_query_context_shared();
Expand Down
27 changes: 18 additions & 9 deletions src/query/service/src/sessions/session_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,12 @@ impl SessionManager {
};

let session_ctx = SessionContext::try_create(settings, typ.clone())?;
let session = Session::try_create(id.clone(), typ.clone(), session_ctx, mysql_conn_id)?;
let session = Session::try_create(
id.clone(),
typ.clone(),
Box::new(session_ctx),
mysql_conn_id,
)?;

self.try_add_session(session.clone(), typ.clone())?;

Expand Down Expand Up @@ -357,14 +362,18 @@ impl SessionManager {

let mut queries_profiles = HashMap::new();
for weak_ptr in active_sessions {
if let Some(session_ctx) = weak_ptr.upgrade().map(|x| x.session_ctx.clone()) {
if let Some(context_shared) = session_ctx.get_query_context_shared() {
if let Some(executor) = context_shared.executor.read().upgrade() {
queries_profiles.insert(
context_shared.init_query_id.as_ref().read().clone(),
executor.get_profiles(),
);
}
let Some(arc_sesssion) = weak_ptr.upgrade() else {
continue;
};

let session_ctx = arc_sesssion.session_ctx.as_ref();

if let Some(context_shared) = session_ctx.get_query_context_shared() {
if let Some(executor) = context_shared.executor.read().upgrade() {
queries_profiles.insert(
context_shared.init_query_id.as_ref().read().clone(),
executor.get_profiles(),
);
}
}
}
Expand Down
12 changes: 5 additions & 7 deletions src/query/service/src/sessions/session_privilege_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::principal::GrantObject;
Expand Down Expand Up @@ -73,12 +71,12 @@ pub trait SessionPrivilegeManager {
// fn show_grants(&self);
}

pub struct SessionPrivilegeManagerImpl {
session_ctx: Arc<SessionContext>,
pub struct SessionPrivilegeManagerImpl<'a> {
session_ctx: &'a SessionContext,
}

impl SessionPrivilegeManagerImpl {
pub fn new(session_ctx: Arc<SessionContext>) -> Self {
impl<'a> SessionPrivilegeManagerImpl<'a> {
pub fn new(session_ctx: &'a SessionContext) -> Self {
Self { session_ctx }
}

Expand Down Expand Up @@ -126,7 +124,7 @@ impl SessionPrivilegeManagerImpl {
}

#[async_trait::async_trait]
impl SessionPrivilegeManager for SessionPrivilegeManagerImpl {
impl<'a> SessionPrivilegeManager for SessionPrivilegeManagerImpl<'a> {
// set_authed_user() is called after authentication is passed in various protocol handlers, like
// HTTP handler, clickhouse query handler, mysql query handler. auth_role represents the role
// granted by external authenticator, it will over write the current user's granted roles, and
Expand Down

0 comments on commit 2ef3f06

Please sign in to comment.