åå¹´ã»ã©åãããBlueskyã®AT Protocolã®Rustçã©ã¤ãã©ãªãä½ã£ã¦ããã memo.sugyan.com
ãã®ä¸ã§æè¿å®è£ ããæ©è½ã®è©±ã
API Agent
æ¬å®¶ã® atproto (TypeScriptå®è£
)ã« AtpAgent
ã¨ãããã®ãããã
ãã® AtpAgent
ã¯ã(Blueskyã ãã«éããªã) AT Protocol ã®ããã®æ±ç¨çãªã¨ã¼ã¸ã§ã³ãã¨ãã¦æä¾ããã¦ããããã®æ©è½ã®ä¸ã¤ã¨ãã¦tokenã®ç®¡çæ©è½ãããã
AT Protocolã®èªè¨¼
å°ãªãã¨ã 2023/11 æç¹ã§ã¯ HTTP Bearer auth ã§JWTãéä¿¡ãããã¨ã§èªè¨¼ãè¡ãæ¹å¼ã¨ãªã£ã¦ããã
com.atproto.server.createSession
ã§ãã°ã¤ã³æåãã㨠accessJwt
㨠refreshJwt
ãªã©ãå«ã¾ããèªè¨¼æ
å ±ãè¿ã£ã¦ããã®ã§ããã® accessJwt
ãBearer tokenã«ä½¿ã£ã¦åã¨ã³ãã¤ã³ãã«ã¢ã¯ã»ã¹ããã
ã¾ãã com.atproto.server.refreshSession
ã¨ããã¨ã³ããã¤ã³ãããããããã refreshJwt
ãtokenã«ä½¿ã£ã¦å©ããã¨ã§tokenãæ´æ°ãããã¨ãã§ããã
tokenã®ç®¡çã¨èªåæ´æ°æ©æ§
ã§ã @atproto/api
ã§æä¾ããã¦ãã AtpAgent
ã¯ãããã®tokenã®ç®¡çãèªåã§è¡ãæ©è½ãæã£ã¦ããã
export class AtpAgent { ... session?: AtpSessionData /** * Internal fetch handler which adds access-token management */ private async _fetch(...): Promise<AtpAgentFetchHandlerResponse> { ... // wait for any active session-refreshes to finish await this._refreshSessionPromise // send the request let res = await AtpAgent.fetch(...) // handle session-refreshes as needed if (isErrorResponse(res, ['ExpiredToken']) && this.session?.refreshJwt) { // attempt refresh await this._refreshSession() // resend the request with the new access token res = await AtpAgent.fetch(...) } return res } }
ã¾ãç¾æç¹ã§ãã£ã¦ããsessionæ
å ±ãå
ã«ãªã¯ã¨ã¹ãå¦çã試ã¿ããã®ã¬ã¹ãã³ã¹ã expired
ã®ã¨ã©ã¼ã ã£ãã¨ãã®ã¿ãããæ¤åºãã¦tokenãrefreshãã¦åãå
容ã®ãªã¯ã¨ã¹ããåéãããæåã®ãªã¯ã¨ã¹ããæåãã¦ããå ´åã¯ãã®ã¾ã¾ãã®ã¬ã¹ãã³ã¹ãè¿ãã
ã¨ããåãã
ATriumã§ã®å®è£
ã§ãããã¨åæ§ã®ä»çµã¿ãæ㤠AtpAgent
ãATriumã§ãå®è£
ãããã¨èããã
XrpcClient
trait
ATriumã§ã¯ãAT Protocolã®XRPCãªã¯ã¨ã¹ããéãããã® XrpcClient
ã¨ããtraitãå®ç¾©ãã¦ããã
#[async_trait] pub trait HttpClient { async fn send_http( &self, request: Request<Vec<u8>>, ) -> Result<Response<Vec<u8>>, Box<dyn std::error::Error + Send + Sync + 'static>>; } pub type XrpcResult<O, E> = Result<OutputDataOrBytes<O>, self::Error<E>>; #[async_trait] pub trait XrpcClient: HttpClient { fn base_uri(&self) -> String; #[allow(unused_variables)] async fn auth(&self, is_refresh: bool) -> Option<String> { None } async fn send_xrpc<P, I, O, E>(&self, request: &XrpcRequest<P, I>) -> XrpcResult<O, E> where P: Serialize + Send + Sync, I: Serialize + Send + Sync, O: DeserializeOwned + Send + Sync, E: DeserializeOwned + Send + Sync, { ... } }
XRPC ã¯è¦ç´ã«æ²¿ã£ãHTTPãªã¯ã¨ã¹ã/ã¬ã¹ãã³ã¹ã§ãããªãã®ã§ãå
é¨ã¨ãã¦ã¯HTTPã®å¦çããããã¨ã«ãªãã
ããRustã§ã¯HTTPãªã¯ã¨ã¹ã/ã¬ã¹ãã³ã¹ãå¦çããããã®æ¨æºã©ã¤ãã©ãªã®ãããªãã®ã¯ãªããç¹ã«éåæã®å ´å㯠reqwest
ã Isahc
ãSurf
ã®ãããª3rd partyã®ã©ã¤ãã©ãªãå¤ã使ããã¦ãããã¨æãã
ATriumã§ã¯ãããã®ã©ã¤ãã©ãªãããã¯ã¨ã³ãã¨ãã¦éçºè
ãé¸æã§ãããããHTTPé¨åãæ½è±¡åãã HttpClient
ã¨ããtraitãå®ç¾©ãããããç¶æ¿ãã XrpcClient
ãå®è£
ããã¤ã³ã¹ã¿ã³ã¹ãå
é¨ã«æ㤠AtpServiceClient
ãåXRPCãå¦çããå½¢ã«ãã¦ããã
XrpcClient::send_xrpc()
ã¯ããã©ã«ãå®è£
ãæã£ã¦ãããHttpClient::send_http()
ããå®è£
ããã¦ããã°ããã¨ã¯ããã使ã£ã¦ãªã¯ã¨ã¹ãã«ä½¿ãå
¥åã®serializeãè¿ã£ã¦ããã¬ã¹ãã³ã¹JSONã®deserializeãªã©ã®å¦çãè¡ãããã«ãªã£ã¦ããã
session管çããwrapper
èªè¨¼ã«ã¤ãã¦ã¯ XrpcClient
ã®ã¡ã½ããã¨ã㦠async fn auth(&self, is_refresh: bool) -> Option<String>
ãå®ç¾©ãã¦ããã ããªã®ã§ãããã§ã©ã®ãããªtokenãè¿ããã¯traitå®è£
è
ã«ä»»ãããã
ã¤ã³ã¡ã¢ãªã§ç®¡çããå ´åãå
é¨ã§ Arc<RwLock<Option<Session>>>
ã®ããã«æã£ã¦ããã¦ç®¡çãããã¨ã§ããã«ãã¹ã¬ããã§å
±æããã¦ãã¦ãå®å
¨ã«æ±ããããã«ãªãã
(åè: https://github.com/usagi/rust-memory-container-cs)
ãªã®ã§ãå
é¨ã§ XrpcClient
ãå®è£
ãããã®ãæã¤Wrapperãä½ãã主ãªå¦çã¯ããã«ç§»è²ã㦠auth()
ã ããå®è£
ãããã¨ã§ãä¿æãã¦ããsessionæ
å ±ããtokenãè¿ã XrpcClient
å®è£
ãä½ããã¨ãã§ããã
use std::sync::Arc; use tokio::sync::RwLock; struct Wrapper<T> where T: XrpcClient + Send + Sync, { inner: T, session: Arc<RwLock<Option<Session>>>, } #[async_trait] impl<T> HttpClient for Wrapper<T> where T: XrpcClient + Send + Sync, { async fn send_http( &self, request: Request<Vec<u8>>, ) -> Result<Response<Vec<u8>>, Box<dyn std::error::Error + Send + Sync + 'static>> { self.inner.send_http(request).await } } #[async_trait] impl<T> XrpcClient for Wrapper<T> where T: XrpcClient + Send + Sync, { fn base_uri(&self) -> String { self.inner.base_uri() } async fn auth(&self, is_refresh: bool) -> Option<String> { self.session.read().unwrap().as_ref().map(|session| { if is_refresh { session.refresh_jwt.clone() } else { session.access_jwt.clone() } }) } }
ããã§ã¯éåæã® RwLock
ã¨ã㦠tokio::sync
ã使ã£ã¦ããã
ããã§ãAtpAgent
ãåãsessionãå
±æããä¾ãã°ãã°ã¤ã³æåæã«åå¾ããsessionæ
å ±ãæ¸ãè¾¼ããã¨ãã£ããã¨ãããã°ããã®å¾ã®ãªã¯ã¨ã¹ãã§ãã®å¤ã使ãããããã«ãªãã
struct AtpAgent<T> where T: XrpcClient + Send + Sync, { api: Service<Wrapper<T>>, session: Arc<RwLock<Option<Session>>>, } impl<T> AtpAgent<T> where T: XrpcClient + Send + Sync, { fn new(xrpc: T) -> Self { let session = Arc::new(RwLock::new(None)); let api = Service::new(Wrapper { inner: xrpc, session: Arc::clone(&session) }); Self { api, session } } async fn login(&self, ...) { // login process let session: Session = ...; self.session.write().await.replace(session); } }
tokenã®èªåæ´æ° (失æä¾)
ã§ãããã使ã£ã¦tokenã®èªåæ´æ°ãè¡ãããã«ã¯ãXrpcClient::send_xrpc()
ããªã¼ãã¼ã©ã¤ãããå½¢ã§å®è£
ããã°è¯ãããã
impl Wrapper<T> where T: XrpcClient + Send + Sync, { async fn refresh_session() { // refresh process let session: Session = ...; self.session.write().await.replace(session); } fn is_expired<O, E>(result: &XrpcResult<O, E>) -> bool where O: DeserializeOwned + Send + Sync, E: DeserializeOwned + Send + Sync, { if let Err(Error::XrpcResponse(response)) = &result { if let Some(XrpcErrorKind::Undefined(body)) = &response.error { if let Some("ExpiredToken") = &body.error.as_deref() { return true; } } } false } } impl<T> XrpcClient for Wrapper<T> where T: XrpcClient + Send + Sync, { fn base_uri(&self) -> String { ... } async fn auth(&self, is_refresh: bool) -> Option<String> { ... } async fn send_xrpc<P, I, O, E>(&self, request: &XrpcRequest<P, I>) -> XrpcResult<O, E> where P: Serialize + Send + Sync, I: Serialize + Send + Sync, O: DeserializeOwned + Send + Sync, E: DeserializeOwned + Send + Sync, { let result = self.inner.send_xrpc(request).await; // handle session-refreshes as needed if Self::is_expired(&result) { self.refresh_session().await; self.inner.send_xrpc(request).await } else { result } } }
self.inner
ã« send_xrpc()
ã移è²ãããã®çµæãå¤å®ã㦠Expired
ã®ã¨ã©ã¼ã§ãã£ãå ´åã®ã¿ self.refresh_session()
ãå¼ã³åºãã¦tokenãæ´æ°ããå度åã send_xrpc()
ãå¼ã³åºããã¨ããå½¢ã
self.refresh_session()
ãæåãã¦å
é¨ã®sessionãæ¸ãå¤ãã£ã¦ããã°ãå度åããªã¯ã¨ã¹ããéã£ãã¨ãã«ã¯tokenãæ´æ°ããã¦ããã®ã§æåããã
â¦ã¨æã£ãããå®éã«ã¯ããã¯æã£ãéãã«ã¯åããªãã
Rustã®traitå®è£
ã¯ã¤ã³ã¹ã¿ã³ã¹ã®ã¡ã½ããããªã¼ãã¼ã©ã¤ããã¦ããããã§ã¯ãªããããã¾ã§traitã®ã¡ã½ãããå®è£
ãã¦ããã ããªã®ã§ã self.inner
ã«ç§»è²ããã¡ã½ãã㯠self
ã¨ã¯å¥ã®ã¤ã³ã¹ã¿ã³ã¹ã¨ãã¦æ±ãããã
ã¤ã¾ã self.inner.send_xrpc()
ã®ããã©ã«ãå®è£
ä¸ã§å¼ã°ãã self.auth()
ã¯ããã¾ã§ self.inner
ã«å®è£
ããã¦ãã auth()
ã§ãããWrapper
ã«å®è£
ããã¦ãã auth()
ã¯å¼ã°ããªãããªã®ã§ãããã Wrapper
ã®å
é¨ã§sessionãæ´æ°ãã¦ã self.inner
ã«ã¯é¢ä¿ããªããã¨ãããã¨ã«ãªãã
2éã®wrapperã§è§£æ±º
ãããã©ããã£ã¦è§£æ±ºããããã°ããæ©ãã ããçµå±wrapperãããä¸ã¤ä½ã£ã¦2éã«sessionãå ±æãããã¨ã§æ³å®ããåããããããã«ãªã£ãã
主ãªå¦çã移è²ããã inner
ãåãsessionæ
å ±ãæã£ã¦ä½¿ã£ã¦ããã¦ããã°åé¡ãªãã®ã§ãå
ã®wrapperã RefreshWrapper
ã¨ãã¦ãåãããã« XrpcClient
ãå®è£
ãã SessionWrapper
ãä½ãã auth()
㧠self.session
ãåç
§ããæ©è½ã ãããã¡ãã«æãããããã«ããã
struct RefreshWrapper<T> where T: XrpcClient + Send + Sync, { inner: T, session: Arc<RwLock<Option<Session>>>, } #[async_trait] impl<T> HttpClient for RefreshWrapper<T> where T: XrpcClient + Send + Sync, { ... // (use inner) } #[async_trait] impl<T> XrpcClient for RefreshWrapper<T> where T: XrpcClient + Send + Sync, { async fn send_xrpc<P, I, O, E>(&self, request: &XrpcRequest<P, I>) -> XrpcResult<O, E> where P: Serialize + Send + Sync, I: Serialize + Send + Sync, O: DeserializeOwned + Send + Sync, E: DeserializeOwned + Send + Sync, { let result = self.inner.send_xrpc(request).await; // handle session-refreshes as needed if Self::is_expired(&result) { self.refresh_session().await; self.inner.send_xrpc(request).await } else { result } } } struct SessionWrapper<T> where T: XrpcClient + Send + Sync, { inner: T, session: Arc<RwLock<Option<Session>>>, } #[async_trait] impl<T> HttpClient for SessionWrapper<T> where T: XrpcClient + Send + Sync, { ... // (use inner) } #[async_trait] impl<T> XrpcClient for SessionWrapper<T> where T: XrpcClient + Send + Sync, { ... async fn auth(&self, is_refresh: bool) -> Option<String> { self.session.read().await.as_ref().map(|session| { if is_refresh { session.refresh_jwt.clone() } else { session.access_jwt.clone() } }) } }
ããã¦ã AtpAgent
ã§ã¯ XrpcClient
ãå®è£
ãããã®ã¨ã㦠RefreshWrapper<SessionWrapper<T>>
ã使ãããã«ããã両è
é㧠session
ãå
±æããããã« Arc<RwLock<Option<Session>>>
ã渡ãã¦ããã
struct AtpAgent<T> where T: XrpcClient + Send + Sync, { api: Service<RefreshWrapper<SessionWrapper<T>>>, session: Arc<RwLock<Option<Session>>>, } impl<T> AtpAgent<T> where T: XrpcClient + Send + Sync, { fn new(xrpc: T) -> Self { let session = Arc::new(RwLock::new(None)); let api = Service::new(Arc::new(RefreshWrapper { inner: SessionWrapper { inner: xrpc, session: Arc::clone(&session), }, session: Arc::clone(&session), })); Self { api, session } } }
send_xrpc()
å
ã§ã®expireæ¤åºã¨æ´æ°ã»åéã®å¦ç㯠RefreshWrapper
ã§è¡ãããããå®éã®éä¿¡ã¯å
é¨ã® SessionWrapper
ã«ç§»è²ããããã¨ã«ãªãã
SessionWrapper
ã§ã¯å
±æããã¦ãã self.session
ã auth()
å
ã§åç
§ããåä½ãããã®ã§ã RefreshWrapper
(ããããã使ã AtpAgent
æ¬ä½) ã§æ´æ°ãããsessionæ
å ±ããã®ã¾ã¾ä½¿ããããã¨ã«ãªãã
ããã§tokenã®èªåæ´æ°ãå®ç¾ãããã
並è¡å¦çã§ã®åææ´æ°ã®åé¡
ããä¸ã¤ãTypeScriptå®è£
ã® AtpAgent
ãæã£ã¦ããæ©è½ã¨ãã¦ãè¤æ°ã® refreshSession()
ãåæã«æ¥ã¦ã1åããå®è¡ãããªãããã«å¶å¾¡ãããã¨ãããã®ãããã
/** * Internal helper to refresh sessions * - Wraps the actual implementation in a promise-guard to ensure only * one refresh is attempted at a time. */ private async _refreshSession() { if (this._refreshSessionPromise) { return this._refreshSessionPromise } this._refreshSessionPromise = this._refreshSessionInner() try { await this._refreshSessionPromise } finally { this._refreshSessionPromise = undefined } }
å
é¨ç¶æ
ã¨ã㦠Promise<void> | undefined
ãæã£ã¦ããã _refreshSession()
ãå¼ã°ããæç¹ã§ããã undefined
ã§ããã° Promise<void>
ãã»ããããä¸ã§å®éã«ãã®å¦çã await
ããæ¢ã« Promise<void>
ãã»ããããã¦ããå ´åã¯ãããè¿ããã¨ããåãã
ããAgentã並è¡ãã¦è¤æ°ã®APIãã»ã¼åæã«å©ããã¨ãã«ãtokenãexpiredã ã£ãå ´åã¯ãã®çµæãã»ã¼åæã«è¿ã£ã¦ãããã¨ã«ãªãããã®æç¹ã§ã¯tokenãrefreshããã¦ããªãã®ã§ããããããã»ã¼åæã«èªårefreshå¦çãè¡ããã¨ã«ãªãããå®éã«ã¯1åã ãrefreshããã¦ããã°è¯ãã®ã§ããã®ããã«å¶å¾¡ããããã®ä»çµã¿ãã¨èããããã
ããããåãrefresh tokenã使ã£ã¦è¤æ°årefreshã®ãªã¯ã¨ã¹ãããã¨ãæ¢ã«ä½¿ç¨ãããrefresh tokenãç¡å¹ã«ãªã2åç®ä»¥éã®refreshã¯ã¨ã©ã¼ã«ãªãå¯è½æ§ããã(ããã¯ãµã¼ãå®è£ 次第ã¨æããããç¾æç¹ã§ã®blueskyã§ã¯ç¹ã«ä½ãèµ·ãããæ°ããtokenãçºè¡ãããã ããã®ããã )ãéåæã§ä¸¦è¡å¦çããå¾ãç°å¢ã§ã¯ãããã£ãå¶å¾¡ã¯å¿ è¦ã«ãªãã
â¦ãããRustã§ã¯åãããã«å®è£
ãããã¨ãã¦ãä¸æããããªãã£ãã async
ãªinnerãå¼ãã è¿ãå¤ãä¿æããã¨ãªã㨠Pin<Box<impl Future<Output = Result<...>> + 'static>>
ã®ãããªå½¢ã«ãªãããããããã Mutex
ã¨ãã§ä¿æãããã¨ãã¦ããã㯠Send
ã§ã¯ãªãã®ã§ Mutex
ã«å
¥ããããªãããªã©â¦ ä½åº¦ãã³ã³ãã¤ã«ã¨ã©ã¼ã«æ©ã¾ããã¦æ«æããã
Rustã®éåæã«è©³ãã人ãªãä¸æãå®è£
ã§ãããã ãããâ¦
Notify
ã«ããå¶å¾¡å®è£
ChatGPTã¨é·ã
ã¨è°è«ããçµå± tokio::sync
ã«ãã Mutex
㨠Notify
ã使ããã¨ã§åæ§ã®åä½ãå®ç¾ãããã
use tokio::sync::{Mutex, Notify}; struct RefreshWrapper<T> where T: XrpcClient + Send + Sync, { inner: T, session: Arc<RwLock<Option<Session>>>, is_refreshing: Mutex<bool>, notify: Notify, } impl<T> RefreshWrapper<T> where T: XrpcClient + Send + Sync, { ... async fn refresh_session(&self) { { let mut is_refreshing = self.is_refreshing.lock().await; if *is_refreshing { drop(is_refreshing); return self.notify.notified().await; } *is_refreshing = true; } self.refresh_session_inner().await; *self.is_refreshing.lock().await = false; self.notify.notify_waiters(); } }
ã¾ã self.is_refreshing
ã§ãrefreshå®è¡ä¸ã®ãã®ããããå¦ããä¿æããããã㯠Mutex
ã§ç®¡çãããã®ã§ã æåã«ããã¯ãåå¾ã㦠true
ã«å¤æ´ãããã®ãå®äºãã¦ã¾ã false
ã«æ»ãã¾ã§ã¯ä»ã®å¦çããã®èªã¿åãçµæã¯å¿
ã true
ã«ãªãã
ããã¦ããã® true
ã«å¤æ´ã§ãããã®ã ãããå¾ç¶ã®å®éã®æ´æ°å¦çã§ãã self.refresh_session_inner()
ãå®è¡ããã
self.is_refreshing
ã true
ã«ãªã£ã¦ããéã¯ã self.refresh_session_inner()
ãçµãã£ããã¨ã«å¼ã°ãã self.notify.notify_waiters()
ã«ãã£ã¦å®äºãç¥ããããã¾ã§å¾
æ©ããã ããã¨ããå½¢ã«ãªãã
ãããã£ãä»ã®ã¹ã¬ããããã®éç¥ãå¾
ã¤ããã®ä»çµã¿ã¨ã㦠tokio::sync::Notify
ãããããã ããã®å ´åã¯å®äºãããã¨ãç¥ãããã ããªã®ã§ Notify
ã ããå¦ççµæãç¥ãããå ´å㯠oneshot
ãªã©ã使ãã¨è¯ãã®ãããããªãã
ã©ã¤ãã©ãªã¨ãã¦ã¯ç¹å®ã®éåæã©ã³ã¿ã¤ã ã«ä¾åããããã«ã¯ããããªãã®ã§ tokio
ãªã©ã使ãã®ã¯é¿ãããã£ãããæ¨æºã©ã¤ãã©ãªã futures
ãªã©ã«ã¯åæ§ã®ä»çµã¿ããªãããã ã£ãã®ã§ä»æ¹ãªã tokio
ã使ããã¨ã«ãããå®éã®ã¨ãã sync
featureã§ä½¿ããããã®ãã®ã¯ç¹ã«ã©ã³ã¿ã¤ã ä¾åã¯ç¡ãããã§ã async-std
ãªã©å¥ã®éåæã©ã³ã¿ã¤ã ã§å®è¡ãã¦ãåé¡ãªãåä½ããããã§ã¯ãã£ãã
ã¾ã¨ã
ã©ã¤ãã©ãªã®ä¾åã¯å¢ãã¦ãã¾ã£ãããã©ãã«ã AtpAgent
ã¨ãã¦å®è£
ãããæ©è½ã¯å®ç¾ã§ãããRustãããããâ¦ã
ä»ã«ãã£ã¨è¯ãæ¹æ³ããåç¥ã®æ¹ãããã°pull-requestãªã©ããã ããã¨å¬ããã§ãã