From 09109f19667e44391e76677772d661a05538b863 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Thu, 18 May 2023 14:55:07 +0200 Subject: [PATCH] api: add /get endpoints (#1577) this allows to get entities by ID or name after /list endpoints were changed in v0.23.0. --- apidocs/openapi.yaml | 236 ++++++++++++++++++++++- internal/core/api.go | 278 ++++++++++++++++++++------- internal/core/api_defs.go | 95 ++++++++++ internal/core/api_test.go | 320 +++++++++++++++++++++++++++++++- internal/core/core_test.go | 6 +- internal/core/hls_manager.go | 76 ++++---- internal/core/hls_muxer.go | 46 ++--- internal/core/metrics.go | 48 ++--- internal/core/path.go | 108 +++++------ internal/core/path_manager.go | 52 +++++- internal/core/rtmp_conn.go | 20 ++ internal/core/rtmp_server.go | 101 +++++----- internal/core/rtsp_conn.go | 10 + internal/core/rtsp_server.go | 144 +++++++------- internal/core/rtsp_session.go | 22 +++ internal/core/webrtc_manager.go | 107 +++++------ internal/core/webrtc_session.go | 34 ++++ 17 files changed, 1285 insertions(+), 418 deletions(-) create mode 100644 internal/core/api_defs.go diff --git a/apidocs/openapi.yaml b/apidocs/openapi.yaml index ab9f53e117a..cd2e34c2f22 100644 --- a/apidocs/openapi.yaml +++ b/apidocs/openapi.yaml @@ -471,7 +471,7 @@ components: items: $ref: '#/components/schemas/RTSPSession' - WebRTCConn: + WebRTCSession: type: object properties: id: @@ -496,7 +496,7 @@ components: type: integer format: int64 - WebRTCConnsList: + WebRTCSessionsList: type: object properties: pageCount: @@ -504,7 +504,7 @@ components: items: type: array items: - $ref: '#/components/schemas/WebRTCConn' + $ref: '#/components/schemas/WebRTCSession' paths: /v2/config/get: @@ -645,6 +645,30 @@ paths: '500': description: internal server error. + /v2/hlsmuxers/get/{name}: + post: + operationId: hlsMuxersGet + summary: returns a HLS muxer. + description: '' + parameters: + - name: name + in: path + required: true + description: name of the muxer. + schema: + type: string + responses: + '200': + description: the request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/HLSMuxer' + '400': + description: invalid request. + '500': + description: internal server error. + /v2/paths/list: get: operationId: pathsList @@ -675,6 +699,30 @@ paths: '500': description: internal server error. + /v2/paths/get/{name}: + post: + operationId: pathsGet + summary: returns a path. + description: '' + parameters: + - name: name + in: path + required: true + description: name of the path. + schema: + type: string + responses: + '200': + description: the request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/Path' + '400': + description: invalid request. + '500': + description: internal server error. + /v2/rtspconns/list: get: operationId: rtspConnsList @@ -705,6 +753,30 @@ paths: '500': description: internal server error. + /v2/rtspconns/get/{id}: + post: + operationId: rtspConnsGet + summary: returns a RTSP connection. + description: '' + parameters: + - name: id + in: path + required: true + description: ID of the connection. + schema: + type: string + responses: + '200': + description: the request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/RTSPConn' + '400': + description: invalid request. + '500': + description: internal server error. + /v2/rtspsessions/list: get: operationId: rtspSessionsList @@ -735,6 +807,30 @@ paths: '500': description: internal server error. + /v2/rtspsessions/get/{id}: + post: + operationId: rtspSessionsGet + summary: returns a RTSP session. + description: '' + parameters: + - name: id + in: path + required: true + description: ID of the connection. + schema: + type: string + responses: + '200': + description: the request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/RTSPSession' + '400': + description: invalid request. + '500': + description: internal server error. + /v2/rtspsessions/kick/{id}: post: operationId: rtspSessionsKick @@ -744,7 +840,7 @@ paths: - name: id in: path required: true - description: the ID of the session. + description: ID of the session. schema: type: string responses: @@ -785,6 +881,30 @@ paths: '500': description: internal server error. + /v2/rtspsconns/get/{id}: + post: + operationId: rtspsConnsGet + summary: returns a RTSPS connection. + description: '' + parameters: + - name: id + in: path + required: true + description: ID of the connection. + schema: + type: string + responses: + '200': + description: the request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/RTSPConn' + '400': + description: invalid request. + '500': + description: internal server error. + /v2/rtspssessions/list: get: operationId: rtspsSessionsList @@ -815,6 +935,30 @@ paths: '500': description: internal server error. + /v2/rtspssessions/get/{id}: + post: + operationId: rtspsSessionsGet + summary: returns a RTSPS session. + description: '' + parameters: + - name: id + in: path + required: true + description: ID of the connection. + schema: + type: string + responses: + '200': + description: the request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/RTSPSession' + '400': + description: invalid request. + '500': + description: internal server error. + /v2/rtspssessions/kick/{id}: post: operationId: rtspsSessionsKick @@ -824,7 +968,7 @@ paths: - name: id in: path required: true - description: the ID of the session. + description: ID of the session. schema: type: string responses: @@ -865,6 +1009,30 @@ paths: '500': description: internal server error. + /v2/rtmpconns/get/{id}: + post: + operationId: rtmpConnectionsGet + summary: returns a RTMP connection. + description: '' + parameters: + - name: id + in: path + required: true + description: ID of the connection. + schema: + type: string + responses: + '200': + description: the request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/RTMPConn' + '400': + description: invalid request. + '500': + description: internal server error. + /v2/rtmpconns/kick/{id}: post: operationId: rtmpConnsKick @@ -874,7 +1042,7 @@ paths: - name: id in: path required: true - description: the ID of the connection. + description: ID of the connection. schema: type: string responses: @@ -915,6 +1083,30 @@ paths: '500': description: internal server error. + /v2/rtmpsconns/get/{id}: + post: + operationId: rtmpsConnectionsGet + summary: returns a RTMPS connection. + description: '' + parameters: + - name: id + in: path + required: true + description: ID of the connection. + schema: + type: string + responses: + '200': + description: the request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/RTMPConn' + '400': + description: invalid request. + '500': + description: internal server error. + /v2/rtmpsconns/kick/{id}: post: operationId: rtmpsConnsKick @@ -924,7 +1116,7 @@ paths: - name: id in: path required: true - description: the ID of the connection. + description: ID of the connection. schema: type: string responses: @@ -959,7 +1151,31 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/WebRTCConnsList' + $ref: '#/components/schemas/WebRTCSessionsList' + '400': + description: invalid request. + '500': + description: internal server error. + + /v2/webrtcsessions/get/{id}: + post: + operationId: webrtcSessionsGet + summary: returns a WebRTC session. + description: '' + parameters: + - name: id + in: path + required: true + description: ID of the session. + schema: + type: string + responses: + '200': + description: the request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/WebRTCSession' '400': description: invalid request. '500': @@ -968,13 +1184,13 @@ paths: /v2/webrtcsessions/kick/{id}: post: operationId: webrtcSessionsKick - summary: kicks out a WebRTC connection from the server. + summary: kicks out a WebRTC session from the server. description: '' parameters: - name: id in: path required: true - description: the ID of the session. + description: ID of the session. schema: type: string responses: diff --git a/internal/core/api.go b/internal/core/api.go index 1876877234a..7cf7bec77ef 100644 --- a/internal/core/api.go +++ b/internal/core/api.go @@ -126,22 +126,33 @@ func paginate(itemsPtr interface{}, itemsPerPageStr string, pageStr string) (int } type apiPathManager interface { - apiPathsList() pathAPIPathsListRes + apiPathsList() (*apiPathsList, error) + apiPathsGet(string) (*apiPath, error) } type apiHLSManager interface { - apiMuxersList() hlsManagerAPIMuxersListRes + apiMuxersList() (*apiHLSMuxersList, error) + apiMuxersGet(string) (*apiHLSMuxer, error) } type apiRTSPServer interface { - apiConnsList() rtspServerAPIConnsListRes - apiSessionsList() rtspServerAPISessionsListRes - apiSessionsKick(uuid.UUID) rtspServerAPISessionsKickRes + apiConnsList() (*apiRTSPConnsList, error) + apiConnsGet(uuid.UUID) (*apiRTSPConn, error) + apiSessionsList() (*apiRTSPSessionsList, error) + apiSessionsGet(uuid.UUID) (*apiRTSPSession, error) + apiSessionsKick(uuid.UUID) error } type apiRTMPServer interface { - apiConnsList() rtmpServerAPIConnsListRes - apiConnsKick(uuid.UUID) rtmpServerAPIConnsKickRes + apiConnsList() (*apiRTMPConnsList, error) + apiConnsGet(uuid.UUID) (*apiRTMPConn, error) + apiConnsKick(uuid.UUID) error +} + +type apiWebRTCManager interface { + apiSessionsList() (*apiWebRTCSessionsList, error) + apiSessionsGet(uuid.UUID) (*apiWebRTCSession, error) + apiSessionsKick(uuid.UUID) error } type apiParent interface { @@ -149,11 +160,6 @@ type apiParent interface { apiConfigSet(conf *conf.Conf) } -type apiWebRTCManager interface { - apiSessionsList() webRTCManagerAPISessionsListRes - apiSessionsKick(uuid.UUID) webRTCManagerAPISessionsKickRes -} - type api struct { conf *conf.Conf pathManager apiPathManager @@ -209,34 +215,43 @@ func newAPI( if !interfaceIsEmpty(a.hlsManager) { group.GET("/v2/hlsmuxers/list", a.onHLSMuxersList) + group.GET("/v2/hlsmuxers/get/:name", a.onHLSMuxersGet) } group.GET("/v2/paths/list", a.onPathsList) + group.GET("/v2/paths/get/:name", a.onPathsGet) if !interfaceIsEmpty(a.rtspServer) { group.GET("/v2/rtspconns/list", a.onRTSPConnsList) + group.GET("/v2/rtspconns/get/:id", a.onRTSPConnsGet) group.GET("/v2/rtspsessions/list", a.onRTSPSessionsList) + group.GET("/v2/rtspsessions/get/:id", a.onRTSPSessionsGet) group.POST("/v2/rtspsessions/kick/:id", a.onRTSPSessionsKick) } if !interfaceIsEmpty(a.rtspsServer) { group.GET("/v2/rtspsconns/list", a.onRTSPSConnsList) + group.GET("/v2/rtspsconns/get/:id", a.onRTSPSConnsGet) group.GET("/v2/rtspssessions/list", a.onRTSPSSessionsList) + group.GET("/v2/rtspssessions/get/:id", a.onRTSPSSessionsGet) group.POST("/v2/rtspssessions/kick/:id", a.onRTSPSSessionsKick) } if !interfaceIsEmpty(a.rtmpServer) { group.GET("/v2/rtmpconns/list", a.onRTMPConnsList) + group.GET("/v2/rtmpconns/get/:id", a.onRTMPConnsGet) group.POST("/v2/rtmpconns/kick/:id", a.onRTMPConnsKick) } if !interfaceIsEmpty(a.rtmpsServer) { group.GET("/v2/rtmpsconns/list", a.onRTMPSConnsList) + group.GET("/v2/rtmpsconns/get/:id", a.onRTMPSConnsGet) group.POST("/v2/rtmpsconns/kick/:id", a.onRTMPSConnsKick) } if !interfaceIsEmpty(a.webRTCManager) { group.GET("/v2/webrtcsessions/list", a.onWebRTCSessionsList) + group.GET("/v2/webrtcsessions/get/:id", a.onWebRTCSessionsGet) group.POST("/v2/webrtcsessions/kick/:id", a.onWebRTCSessionsKick) } @@ -425,54 +440,98 @@ func (a *api) onConfigPathsDelete(ctx *gin.Context) { } func (a *api) onPathsList(ctx *gin.Context) { - res := a.pathManager.apiPathsList() - if res.err != nil { + data, err := a.pathManager.apiPathsList() + if err != nil { ctx.AbortWithStatus(http.StatusInternalServerError) return } - pageCount, err := paginate(&res.data.Items, ctx.Query("itemsPerPage"), ctx.Query("page")) + pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page")) if err != nil { ctx.AbortWithStatus(http.StatusBadRequest) return } - res.data.PageCount = pageCount + data.PageCount = pageCount + + ctx.JSON(http.StatusOK, data) +} + +func (a *api) onPathsGet(ctx *gin.Context) { + data, err := a.pathManager.apiPathsGet(ctx.Param("name")) + if err != nil { + if err.Error() == "not found" { + ctx.AbortWithStatus(http.StatusNotFound) + return + } + ctx.AbortWithStatus(http.StatusInternalServerError) + return + } - ctx.JSON(http.StatusOK, res.data) + ctx.JSON(http.StatusOK, data) } func (a *api) onRTSPConnsList(ctx *gin.Context) { - res := a.rtspServer.apiConnsList() - if res.err != nil { + data, err := a.rtspServer.apiConnsList() + if err != nil { ctx.AbortWithStatus(http.StatusInternalServerError) return } - pageCount, err := paginate(&res.data.Items, ctx.Query("itemsPerPage"), ctx.Query("page")) + pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page")) if err != nil { ctx.AbortWithStatus(http.StatusBadRequest) return } - res.data.PageCount = pageCount + data.PageCount = pageCount - ctx.JSON(http.StatusOK, res.data) + ctx.JSON(http.StatusOK, data) +} + +func (a *api) onRTSPConnsGet(ctx *gin.Context) { + uuid, err := uuid.Parse(ctx.Param("id")) + if err != nil { + ctx.AbortWithStatus(http.StatusBadRequest) + return + } + + data, err := a.rtspServer.apiConnsGet(uuid) + if err != nil { + return + } + + ctx.JSON(http.StatusOK, data) } func (a *api) onRTSPSessionsList(ctx *gin.Context) { - res := a.rtspServer.apiSessionsList() - if res.err != nil { + data, err := a.rtspServer.apiSessionsList() + if err != nil { ctx.AbortWithStatus(http.StatusInternalServerError) return } - pageCount, err := paginate(&res.data.Items, ctx.Query("itemsPerPage"), ctx.Query("page")) + pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page")) + if err != nil { + ctx.AbortWithStatus(http.StatusBadRequest) + return + } + data.PageCount = pageCount + + ctx.JSON(http.StatusOK, data) +} + +func (a *api) onRTSPSessionsGet(ctx *gin.Context) { + uuid, err := uuid.Parse(ctx.Param("id")) if err != nil { ctx.AbortWithStatus(http.StatusBadRequest) return } - res.data.PageCount = pageCount - ctx.JSON(http.StatusOK, res.data) + data, err := a.rtspServer.apiSessionsGet(uuid) + if err != nil { + return + } + + ctx.JSON(http.StatusOK, data) } func (a *api) onRTSPSessionsKick(ctx *gin.Context) { @@ -482,8 +541,8 @@ func (a *api) onRTSPSessionsKick(ctx *gin.Context) { return } - res := a.rtspServer.apiSessionsKick(uuid) - if res.err != nil { + err = a.rtspServer.apiSessionsKick(uuid) + if err != nil { return } @@ -491,37 +550,67 @@ func (a *api) onRTSPSessionsKick(ctx *gin.Context) { } func (a *api) onRTSPSConnsList(ctx *gin.Context) { - res := a.rtspsServer.apiConnsList() - if res.err != nil { + data, err := a.rtspsServer.apiConnsList() + if err != nil { ctx.AbortWithStatus(http.StatusInternalServerError) return } - pageCount, err := paginate(&res.data.Items, ctx.Query("itemsPerPage"), ctx.Query("page")) + pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page")) + if err != nil { + ctx.AbortWithStatus(http.StatusBadRequest) + return + } + data.PageCount = pageCount + + ctx.JSON(http.StatusOK, data) +} + +func (a *api) onRTSPSConnsGet(ctx *gin.Context) { + uuid, err := uuid.Parse(ctx.Param("id")) if err != nil { ctx.AbortWithStatus(http.StatusBadRequest) return } - res.data.PageCount = pageCount - ctx.JSON(http.StatusOK, res.data) + data, err := a.rtspsServer.apiConnsGet(uuid) + if err != nil { + return + } + + ctx.JSON(http.StatusOK, data) } func (a *api) onRTSPSSessionsList(ctx *gin.Context) { - res := a.rtspsServer.apiSessionsList() - if res.err != nil { + data, err := a.rtspsServer.apiSessionsList() + if err != nil { ctx.AbortWithStatus(http.StatusInternalServerError) return } - pageCount, err := paginate(&res.data.Items, ctx.Query("itemsPerPage"), ctx.Query("page")) + pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page")) if err != nil { ctx.AbortWithStatus(http.StatusBadRequest) return } - res.data.PageCount = pageCount + data.PageCount = pageCount - ctx.JSON(http.StatusOK, res.data) + ctx.JSON(http.StatusOK, data) +} + +func (a *api) onRTSPSSessionsGet(ctx *gin.Context) { + uuid, err := uuid.Parse(ctx.Param("id")) + if err != nil { + ctx.AbortWithStatus(http.StatusBadRequest) + return + } + + data, err := a.rtspsServer.apiSessionsGet(uuid) + if err != nil { + return + } + + ctx.JSON(http.StatusOK, data) } func (a *api) onRTSPSSessionsKick(ctx *gin.Context) { @@ -531,8 +620,8 @@ func (a *api) onRTSPSSessionsKick(ctx *gin.Context) { return } - res := a.rtspsServer.apiSessionsKick(uuid) - if res.err != nil { + err = a.rtspsServer.apiSessionsKick(uuid) + if err != nil { return } @@ -540,20 +629,35 @@ func (a *api) onRTSPSSessionsKick(ctx *gin.Context) { } func (a *api) onRTMPConnsList(ctx *gin.Context) { - res := a.rtmpServer.apiConnsList() - if res.err != nil { + data, err := a.rtmpServer.apiConnsList() + if err != nil { ctx.AbortWithStatus(http.StatusInternalServerError) return } - pageCount, err := paginate(&res.data.Items, ctx.Query("itemsPerPage"), ctx.Query("page")) + pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page")) + if err != nil { + ctx.AbortWithStatus(http.StatusBadRequest) + return + } + data.PageCount = pageCount + + ctx.JSON(http.StatusOK, data) +} + +func (a *api) onRTMPConnsGet(ctx *gin.Context) { + uuid, err := uuid.Parse(ctx.Param("id")) if err != nil { ctx.AbortWithStatus(http.StatusBadRequest) return } - res.data.PageCount = pageCount - ctx.JSON(http.StatusOK, res.data) + data, err := a.rtmpServer.apiConnsGet(uuid) + if err != nil { + return + } + + ctx.JSON(http.StatusOK, data) } func (a *api) onRTMPConnsKick(ctx *gin.Context) { @@ -563,8 +667,8 @@ func (a *api) onRTMPConnsKick(ctx *gin.Context) { return } - res := a.rtmpServer.apiConnsKick(uuid) - if res.err != nil { + err = a.rtmpServer.apiConnsKick(uuid) + if err != nil { return } @@ -572,20 +676,35 @@ func (a *api) onRTMPConnsKick(ctx *gin.Context) { } func (a *api) onRTMPSConnsList(ctx *gin.Context) { - res := a.rtmpsServer.apiConnsList() - if res.err != nil { + data, err := a.rtmpsServer.apiConnsList() + if err != nil { ctx.AbortWithStatus(http.StatusInternalServerError) return } - pageCount, err := paginate(&res.data.Items, ctx.Query("itemsPerPage"), ctx.Query("page")) + pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page")) if err != nil { ctx.AbortWithStatus(http.StatusBadRequest) return } - res.data.PageCount = pageCount + data.PageCount = pageCount - ctx.JSON(http.StatusOK, res.data) + ctx.JSON(http.StatusOK, data) +} + +func (a *api) onRTMPSConnsGet(ctx *gin.Context) { + uuid, err := uuid.Parse(ctx.Param("id")) + if err != nil { + ctx.AbortWithStatus(http.StatusBadRequest) + return + } + + data, err := a.rtmpsServer.apiConnsGet(uuid) + if err != nil { + return + } + + ctx.JSON(http.StatusOK, data) } func (a *api) onRTMPSConnsKick(ctx *gin.Context) { @@ -595,8 +714,8 @@ func (a *api) onRTMPSConnsKick(ctx *gin.Context) { return } - res := a.rtmpsServer.apiConnsKick(uuid) - if res.err != nil { + err = a.rtmpsServer.apiConnsKick(uuid) + if err != nil { return } @@ -604,37 +723,62 @@ func (a *api) onRTMPSConnsKick(ctx *gin.Context) { } func (a *api) onHLSMuxersList(ctx *gin.Context) { - res := a.hlsManager.apiMuxersList() - if res.err != nil { + data, err := a.hlsManager.apiMuxersList() + if err != nil { ctx.AbortWithStatus(http.StatusInternalServerError) return } - pageCount, err := paginate(&res.data.Items, ctx.Query("itemsPerPage"), ctx.Query("page")) + pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page")) if err != nil { ctx.AbortWithStatus(http.StatusBadRequest) return } - res.data.PageCount = pageCount + data.PageCount = pageCount + + ctx.JSON(http.StatusOK, data) +} + +func (a *api) onHLSMuxersGet(ctx *gin.Context) { + data, err := a.hlsManager.apiMuxersGet(ctx.Param("name")) + if err != nil { + ctx.AbortWithStatus(http.StatusInternalServerError) + return + } - ctx.JSON(http.StatusOK, res.data) + ctx.JSON(http.StatusOK, data) } func (a *api) onWebRTCSessionsList(ctx *gin.Context) { - res := a.webRTCManager.apiSessionsList() - if res.err != nil { + data, err := a.webRTCManager.apiSessionsList() + if err != nil { ctx.AbortWithStatus(http.StatusInternalServerError) return } - pageCount, err := paginate(&res.data.Items, ctx.Query("itemsPerPage"), ctx.Query("page")) + pageCount, err := paginate(&data.Items, ctx.Query("itemsPerPage"), ctx.Query("page")) + if err != nil { + ctx.AbortWithStatus(http.StatusBadRequest) + return + } + data.PageCount = pageCount + + ctx.JSON(http.StatusOK, data) +} + +func (a *api) onWebRTCSessionsGet(ctx *gin.Context) { + uuid, err := uuid.Parse(ctx.Param("id")) if err != nil { ctx.AbortWithStatus(http.StatusBadRequest) return } - res.data.PageCount = pageCount - ctx.JSON(http.StatusOK, res.data) + data, err := a.webRTCManager.apiSessionsGet(uuid) + if err != nil { + return + } + + ctx.JSON(http.StatusOK, data) } func (a *api) onWebRTCSessionsKick(ctx *gin.Context) { @@ -644,8 +788,8 @@ func (a *api) onWebRTCSessionsKick(ctx *gin.Context) { return } - res := a.webRTCManager.apiSessionsKick(uuid) - if res.err != nil { + err = a.webRTCManager.apiSessionsKick(uuid) + if err != nil { return } diff --git a/internal/core/api_defs.go b/internal/core/api_defs.go new file mode 100644 index 00000000000..aa067b40865 --- /dev/null +++ b/internal/core/api_defs.go @@ -0,0 +1,95 @@ +package core + +import ( + "time" + + "github.com/google/uuid" + + "github.com/bluenviron/mediamtx/internal/conf" +) + +type apiPath struct { + Name string `json:"name"` + ConfName string `json:"confName"` + Conf *conf.PathConf `json:"conf"` + Source interface{} `json:"source"` + SourceReady bool `json:"sourceReady"` + Tracks []string `json:"tracks"` + BytesReceived uint64 `json:"bytesReceived"` + Readers []interface{} `json:"readers"` +} + +type apiPathsList struct { + PageCount int `json:"pageCount"` + Items []*apiPath `json:"items"` +} + +type apiHLSMuxer struct { + Path string `json:"path"` + Created time.Time `json:"created"` + LastRequest time.Time `json:"lastRequest"` + BytesSent uint64 `json:"bytesSent"` +} + +type apiHLSMuxersList struct { + PageCount int `json:"pageCount"` + Items []*apiHLSMuxer `json:"items"` +} + +type apiRTSPConn struct { + ID uuid.UUID `json:"id"` + Created time.Time `json:"created"` + RemoteAddr string `json:"remoteAddr"` + BytesReceived uint64 `json:"bytesReceived"` + BytesSent uint64 `json:"bytesSent"` +} + +type apiRTSPConnsList struct { + PageCount int `json:"pageCount"` + Items []*apiRTSPConn `json:"items"` +} + +type apiRTMPConn struct { + ID uuid.UUID `json:"id"` + Created time.Time `json:"created"` + RemoteAddr string `json:"remoteAddr"` + State string `json:"state"` + BytesReceived uint64 `json:"bytesReceived"` + BytesSent uint64 `json:"bytesSent"` +} + +type apiRTMPConnsList struct { + PageCount int `json:"pageCount"` + Items []*apiRTMPConn `json:"items"` +} + +type apiRTSPSession struct { + ID uuid.UUID `json:"id"` + Created time.Time `json:"created"` + RemoteAddr string `json:"remoteAddr"` + State string `json:"state"` + BytesReceived uint64 `json:"bytesReceived"` + BytesSent uint64 `json:"bytesSent"` +} + +type apiRTSPSessionsList struct { + PageCount int `json:"pageCount"` + Items []*apiRTSPSession `json:"items"` +} + +type apiWebRTCSession struct { + ID uuid.UUID `json:"id"` + Created time.Time `json:"created"` + RemoteAddr string `json:"remoteAddr"` + PeerConnectionEstablished bool `json:"peerConnectionEstablished"` + LocalCandidate string `json:"localCandidate"` + RemoteCandidate string `json:"remoteCandidate"` + State string `json:"state"` + BytesReceived uint64 `json:"bytesReceived"` + BytesSent uint64 `json:"bytesSent"` +} + +type apiWebRTCSessionsList struct { + PageCount int `json:"pageCount"` + Items []*apiWebRTCSession `json:"items"` +} diff --git a/internal/core/api_test.go b/internal/core/api_test.go index 539f116c3b0..e1d024e16e6 100644 --- a/internal/core/api_test.go +++ b/internal/core/api_test.go @@ -394,7 +394,60 @@ func TestAPIPathsList(t *testing.T) { }) } -func TestAPIProtocolSpecificList(t *testing.T) { +func TestAPIPathsGet(t *testing.T) { + p, ok := newInstance("api: yes\n" + + "paths:\n" + + " all:\n") + require.Equal(t, true, ok) + defer p.Close() + + source := gortsplib.Client{} + err := source.StartRecording("rtsp://localhost:8554/mypath", media.Medias{testMediaH264}) + require.NoError(t, err) + defer source.Close() + + for _, ca := range []string{"ok", "not found"} { + t.Run(ca, func(t *testing.T) { + type pathSource struct { + Type string `json:"type"` + } + + type path struct { + Name string `json:"name"` + Source pathSource `json:"source"` + SourceReady bool `json:"sourceReady"` + Tracks []string `json:"tracks"` + BytesReceived uint64 `json:"bytesReceived"` + } + + var pathName string + if ca == "ok" { + pathName = "mypath" + } else { + pathName = "nonexisting" + } + + var out path + err := httpRequest(http.MethodGet, "http://localhost:9997/v2/paths/get/"+pathName, nil, &out) + + if ca == "ok" { + require.NoError(t, err) + require.Equal(t, path{ + Name: "mypath", + Source: pathSource{ + Type: "rtspSession", + }, + SourceReady: true, + Tracks: []string{"H264"}, + }, out) + } else { + require.EqualError(t, err, "bad status code: 404") + } + }) + } +} + +func TestAPIProtocolList(t *testing.T) { serverCertFpath, err := writeTempFile(serverCert) require.NoError(t, err) defer os.Remove(serverCertFpath) @@ -438,7 +491,7 @@ func TestAPIProtocolSpecificList(t *testing.T) { medi := testMediaH264 - switch ca { + switch ca { //nolint:dupl case "rtsp conns", "rtsp sessions": source := gortsplib.Client{} @@ -640,7 +693,268 @@ func TestAPIProtocolSpecificList(t *testing.T) { } } -func TestAPIKick(t *testing.T) { +func TestAPIProtocolGet(t *testing.T) { + serverCertFpath, err := writeTempFile(serverCert) + require.NoError(t, err) + defer os.Remove(serverCertFpath) + + serverKeyFpath, err := writeTempFile(serverKey) + require.NoError(t, err) + defer os.Remove(serverKeyFpath) + + for _, ca := range []string{ + "rtsp conns", + "rtsp sessions", + "rtsps conns", + "rtsps sessions", + "rtmp", + "rtmps", + "hls", + "webrtc", + } { + t.Run(ca, func(t *testing.T) { + conf := "api: yes\n" + + switch ca { + case "rtsps conns", "rtsps sessions": + conf += "protocols: [tcp]\n" + + "encryption: strict\n" + + "serverCert: " + serverCertFpath + "\n" + + "serverKey: " + serverKeyFpath + "\n" + + case "rtmps": + conf += "rtmpEncryption: strict\n" + + "rtmpServerCert: " + serverCertFpath + "\n" + + "rtmpServerKey: " + serverKeyFpath + "\n" + } + + conf += "paths:\n" + + " all:\n" + + p, ok := newInstance(conf) + require.Equal(t, true, ok) + defer p.Close() + + medi := testMediaH264 + + switch ca { //nolint:dupl + case "rtsp conns", "rtsp sessions": + source := gortsplib.Client{} + + err := source.StartRecording("rtsp://localhost:8554/mypath", media.Medias{medi}) + require.NoError(t, err) + defer source.Close() + + case "rtsps conns", "rtsps sessions": + source := gortsplib.Client{ + TLSConfig: &tls.Config{InsecureSkipVerify: true}, + } + + err := source.StartRecording("rtsps://localhost:8322/mypath", media.Medias{medi}) + require.NoError(t, err) + defer source.Close() + + case "rtmp", "rtmps": + var port string + if ca == "rtmp" { + port = "1935" + } else { + port = "1936" + } + + u, err := url.Parse("rtmp://127.0.0.1:" + port + "/mypath") + require.NoError(t, err) + + nconn, err := func() (net.Conn, error) { + if ca == "rtmp" { + return net.Dial("tcp", u.Host) + } + return tls.Dial("tcp", u.Host, &tls.Config{InsecureSkipVerify: true}) + }() + require.NoError(t, err) + defer nconn.Close() + conn := rtmp.NewConn(nconn) + + err = conn.InitializeClient(u, true) + require.NoError(t, err) + + err = conn.WriteTracks(testFormatH264, nil) + require.NoError(t, err) + + time.Sleep(500 * time.Millisecond) + + case "hls": + source := gortsplib.Client{} + err := source.StartRecording("rtsp://localhost:8554/mypath", + media.Medias{medi}) + require.NoError(t, err) + defer source.Close() + + go func() { + time.Sleep(500 * time.Millisecond) + + for i := 0; i < 3; i++ { + /*source.WritePacketRTP(medi, &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 123 + uint16(i), + Timestamp: 45343 + uint32(i)*90000, + SSRC: 563423, + }, + Payload: []byte{ + testSPS, + 0x05, + }, + }) + + []byte{ // 1920x1080 baseline + 0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02, + 0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04, + 0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9, 0x20, + },*/ + + source.WritePacketRTP(medi, &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 123 + uint16(i), + Timestamp: 45343 + uint32(i)*90000, + SSRC: 563423, + }, + Payload: []byte{ + // testSPS, + 0x05, + }, + }) + } + }() + + func() { + res, err := http.Get("http://localhost:8888/mypath/index.m3u8") + require.NoError(t, err) + defer res.Body.Close() + require.Equal(t, 200, res.StatusCode) + }() + + case "webrtc": + source := gortsplib.Client{} + err := source.StartRecording("rtsp://localhost:8554/mypath", + media.Medias{medi}) + require.NoError(t, err) + defer source.Close() + + c := newWebRTCTestClient(t, "http://localhost:8889/mypath/whep", false) + defer c.close() + + time.Sleep(500 * time.Millisecond) + + source.WritePacketRTP(medi, &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + Marker: true, + PayloadType: 96, + SequenceNumber: 123, + Timestamp: 45343, + SSRC: 563423, + }, + Payload: []byte{0x01, 0x02, 0x03, 0x04}, + }) + + <-c.incomingTrack + } + + switch ca { + case "rtsp conns", "rtsp sessions", "rtsps conns", "rtsps sessions", "rtmp", "rtmps": + var pa string + switch ca { + case "rtsp conns": + pa = "rtspconns" + + case "rtsp sessions": + pa = "rtspsessions" + + case "rtsps conns": + pa = "rtspsconns" + + case "rtsps sessions": + pa = "rtspssessions" + + case "rtmp": + pa = "rtmpconns" + + case "rtmps": + pa = "rtmpsconns" + } + + type item struct { + ID string `json:"id"` + State string `json:"state"` + } + + var out1 struct { + Items []item `json:"items"` + } + err = httpRequest(http.MethodGet, "http://localhost:9997/v2/"+pa+"/list", nil, &out1) + require.NoError(t, err) + + if ca != "rtsp conns" && ca != "rtsps conns" { + require.Equal(t, "publish", out1.Items[0].State) + } + + var out2 item + err = httpRequest(http.MethodGet, "http://localhost:9997/v2/"+pa+"/get/"+out1.Items[0].ID, nil, &out2) + require.NoError(t, err) + + if ca != "rtsp conns" && ca != "rtsps conns" { + require.Equal(t, "publish", out2.State) + } + + case "hls": + type item struct { + Created string `json:"created"` + LastRequest string `json:"lastRequest"` + } + + var out item + err = httpRequest(http.MethodGet, "http://localhost:9997/v2/hlsmuxers/get/mypath", nil, &out) + require.NoError(t, err) + + s := fmt.Sprintf("^%d-", time.Now().Year()) + require.Regexp(t, s, out.Created) + require.Regexp(t, s, out.LastRequest) + + case "webrtc": + type item struct { + ID string `json:"id"` + Created time.Time `json:"created"` + RemoteAddr string `json:"remoteAddr"` + PeerConnectionEstablished bool `json:"peerConnectionEstablished"` + LocalCandidate string `json:"localCandidate"` + RemoteCandidate string `json:"remoteCandidate"` + BytesReceived uint64 `json:"bytesReceived"` + BytesSent uint64 `json:"bytesSent"` + } + + var out1 struct { + Items []item `json:"items"` + } + err = httpRequest(http.MethodGet, "http://localhost:9997/v2/webrtcsessions/list", nil, &out1) + require.NoError(t, err) + + var out2 item + err = httpRequest(http.MethodGet, "http://localhost:9997/v2/webrtcsessions/get/"+out1.Items[0].ID, nil, &out2) + require.NoError(t, err) + + require.Equal(t, true, out2.PeerConnectionEstablished) + } + }) + } +} + +func TestAPIProtocolKick(t *testing.T) { serverCertFpath, err := writeTempFile(serverCert) require.NoError(t, err) defer os.Remove(serverCertFpath) diff --git a/internal/core/core_test.go b/internal/core/core_test.go index 48b1e3d8705..7ec6b8a23da 100644 --- a/internal/core/core_test.go +++ b/internal/core/core_test.go @@ -165,10 +165,10 @@ func TestCorePathAutoDeletion(t *testing.T) { } }() - res := p.pathManager.apiPathsList() - require.NoError(t, res.err) + data, err := p.pathManager.apiPathsList() + require.NoError(t, err) - require.Equal(t, 0, len(res.data.Items)) + require.Equal(t, 0, len(data.Items)) }) } } diff --git a/internal/core/hls_manager.go b/internal/core/hls_manager.go index fd13aa76e93..b9c742967f7 100644 --- a/internal/core/hls_manager.go +++ b/internal/core/hls_manager.go @@ -4,37 +4,28 @@ import ( "context" "fmt" "sync" - "time" "github.com/bluenviron/mediamtx/internal/conf" "github.com/bluenviron/mediamtx/internal/logger" ) -type hlsManagerAPIMuxersListItem struct { - Path string `json:"path"` - Created time.Time `json:"created"` - LastRequest time.Time `json:"lastRequest"` - BytesSent uint64 `json:"bytesSent"` -} - -type hlsManagerAPIMuxersListData struct { - PageCount int `json:"pageCount"` - Items []hlsManagerAPIMuxersListItem `json:"items"` -} - type hlsManagerAPIMuxersListRes struct { - data *hlsManagerAPIMuxersListData - muxers map[string]*hlsMuxer - err error + data *apiHLSMuxersList + err error } type hlsManagerAPIMuxersListReq struct { res chan hlsManagerAPIMuxersListRes } -type hlsManagerAPIMuxersListSubReq struct { - data *hlsManagerAPIMuxersListData - res chan struct{} +type hlsManagerAPIMuxersGetRes struct { + data *apiHLSMuxer + err error +} + +type hlsManagerAPIMuxersGetReq struct { + name string + res chan hlsManagerAPIMuxersGetRes } type hlsManagerParent interface { @@ -67,6 +58,7 @@ type hlsManager struct { chHandleRequest chan hlsMuxerHandleRequestReq chMuxerClose chan *hlsMuxer chAPIMuxerList chan hlsManagerAPIMuxersListReq + chAPIMuxerGet chan hlsManagerAPIMuxersGetReq } func newHLSManager( @@ -114,6 +106,7 @@ func newHLSManager( chHandleRequest: make(chan hlsMuxerHandleRequestReq), chMuxerClose: make(chan *hlsMuxer), chAPIMuxerList: make(chan hlsManagerAPIMuxersListReq), + chAPIMuxerGet: make(chan hlsManagerAPIMuxersGetReq), } var err error @@ -199,16 +192,27 @@ outer: delete(m.muxers, c.PathName()) case req := <-m.chAPIMuxerList: - muxers := make(map[string]*hlsMuxer) + data := &apiHLSMuxersList{ + Items: []*apiHLSMuxer{}, + } - for name, m := range m.muxers { - muxers[name] = m + for _, muxer := range m.muxers { + data.Items = append(data.Items, muxer.apiItem()) } req.res <- hlsManagerAPIMuxersListRes{ - muxers: muxers, + data: data, + } + + case req := <-m.chAPIMuxerGet: + muxer, ok := m.muxers[req.name] + if !ok { + req.res <- hlsManagerAPIMuxersGetRes{err: fmt.Errorf("not found")} + continue } + req.res <- hlsManagerAPIMuxersGetRes{data: muxer.apiItem()} + case <-m.ctx.Done(): break outer } @@ -271,7 +275,7 @@ func (m *hlsManager) pathSourceNotReady(pa *path) { } // apiMuxersList is called by api. -func (m *hlsManager) apiMuxersList() hlsManagerAPIMuxersListRes { +func (m *hlsManager) apiMuxersList() (*apiHLSMuxersList, error) { req := hlsManagerAPIMuxersListReq{ res: make(chan hlsManagerAPIMuxersListRes), } @@ -279,19 +283,27 @@ func (m *hlsManager) apiMuxersList() hlsManagerAPIMuxersListRes { select { case m.chAPIMuxerList <- req: res := <-req.res + return res.data, res.err - res.data = &hlsManagerAPIMuxersListData{ - Items: []hlsManagerAPIMuxersListItem{}, - } + case <-m.ctx.Done(): + return nil, fmt.Errorf("terminated") + } +} - for _, pa := range res.muxers { - pa.apiMuxersList(hlsManagerAPIMuxersListSubReq{data: res.data}) - } +// apiMuxersGet is called by api. +func (m *hlsManager) apiMuxersGet(name string) (*apiHLSMuxer, error) { + req := hlsManagerAPIMuxersGetReq{ + name: name, + res: make(chan hlsManagerAPIMuxersGetRes), + } - return res + select { + case m.chAPIMuxerGet <- req: + res := <-req.res + return res.data, res.err case <-m.ctx.Done(): - return hlsManagerAPIMuxersListRes{err: fmt.Errorf("terminated")} + return nil, fmt.Errorf("terminated") } } diff --git a/internal/core/hls_muxer.go b/internal/core/hls_muxer.go index b55d9344551..de423cf7c97 100644 --- a/internal/core/hls_muxer.go +++ b/internal/core/hls_muxer.go @@ -30,6 +30,10 @@ const ( hlsMuxerRecreatePause = 10 * time.Second ) +func int64Ptr(v int64) *int64 { + return &v +} + type responseWriterWithCounter struct { http.ResponseWriter bytesSent *uint64 @@ -80,8 +84,7 @@ type hlsMuxer struct { bytesSent *uint64 // in - chRequest chan *hlsMuxerHandleRequestReq - chAPIHLSMuxersList chan hlsManagerAPIMuxersListSubReq + chRequest chan *hlsMuxerHandleRequestReq } func newHLSMuxer( @@ -121,13 +124,9 @@ func newHLSMuxer( ctx: ctx, ctxCancel: ctxCancel, created: time.Now(), - lastRequestTime: func() *int64 { - v := time.Now().UnixNano() - return &v - }(), - bytesSent: new(uint64), - chRequest: make(chan *hlsMuxerHandleRequestReq), - chAPIHLSMuxersList: make(chan hlsManagerAPIMuxersListSubReq), + lastRequestTime: int64Ptr(time.Now().UnixNano()), + bytesSent: new(uint64), + chRequest: make(chan *hlsMuxerHandleRequestReq), } m.Log(logger.Info, "created %s", func() string { @@ -201,15 +200,6 @@ func (m *hlsMuxer) run() { m.requests = append(m.requests, req) } - case req := <-m.chAPIHLSMuxersList: - req.data.Items = append(req.data.Items, hlsManagerAPIMuxersListItem{ - Path: m.pathName, - Created: m.created, - LastRequest: time.Unix(0, atomic.LoadInt64(m.lastRequestTime)), - BytesSent: atomic.LoadUint64(m.bytesSent), - }) - close(req.res) - case <-innerReady: isReady = true for _, req := range m.requests { @@ -555,17 +545,6 @@ func (m *hlsMuxer) processRequest(req *hlsMuxerHandleRequestReq) { } } -// apiMuxersList is called by api. -func (m *hlsMuxer) apiMuxersList(req hlsManagerAPIMuxersListSubReq) { - req.res = make(chan struct{}) - select { - case m.chAPIHLSMuxersList <- req: - <-req.res - - case <-m.ctx.Done(): - } -} - // apiReaderDescribe implements reader. func (m *hlsMuxer) apiReaderDescribe() pathAPISourceOrReader { return pathAPISourceOrReader{ @@ -573,3 +552,12 @@ func (m *hlsMuxer) apiReaderDescribe() pathAPISourceOrReader { ID: "", } } + +func (m *hlsMuxer) apiItem() *apiHLSMuxer { + return &apiHLSMuxer{ + Path: m.pathName, + Created: m.created, + LastRequest: time.Unix(0, atomic.LoadInt64(m.lastRequestTime)), + BytesSent: atomic.LoadUint64(m.bytesSent), + } +} diff --git a/internal/core/metrics.go b/internal/core/metrics.go index be934a485d3..e310871e955 100644 --- a/internal/core/metrics.go +++ b/internal/core/metrics.go @@ -78,9 +78,9 @@ func (m *metrics) Log(level logger.Level, format string, args ...interface{}) { func (m *metrics) onMetrics(ctx *gin.Context) { out := "" - res := m.pathManager.apiPathsList() - if res.err == nil && len(res.data.Items) != 0 { - for _, i := range res.data.Items { + data, err := m.pathManager.apiPathsList() + if err == nil && len(data.Items) != 0 { + for _, i := range data.Items { var state string if i.SourceReady { state = "ready" @@ -97,9 +97,9 @@ func (m *metrics) onMetrics(ctx *gin.Context) { } if !interfaceIsEmpty(m.hlsManager) { - res := m.hlsManager.apiMuxersList() - if res.err == nil && len(res.data.Items) != 0 { - for _, i := range res.data.Items { + data, err := m.hlsManager.apiMuxersList() + if err == nil && len(data.Items) != 0 { + for _, i := range data.Items { tags := "{name=\"" + i.Path + "\"}" out += metric("hls_muxers", tags, 1) out += metric("hls_muxers_bytes_sent", tags, int64(i.BytesSent)) @@ -112,9 +112,9 @@ func (m *metrics) onMetrics(ctx *gin.Context) { if !interfaceIsEmpty(m.rtspServer) { //nolint:dupl func() { - res := m.rtspServer.apiConnsList() - if res.err == nil && len(res.data.Items) != 0 { - for _, i := range res.data.Items { + data, err := m.rtspServer.apiConnsList() + if err == nil && len(data.Items) != 0 { + for _, i := range data.Items { tags := "{id=\"" + i.ID.String() + "\"}" out += metric("rtsp_conns", tags, 1) out += metric("rtsp_conns_bytes_received", tags, int64(i.BytesReceived)) @@ -128,9 +128,9 @@ func (m *metrics) onMetrics(ctx *gin.Context) { }() func() { - res := m.rtspServer.apiSessionsList() - if res.err == nil && len(res.data.Items) != 0 { - for _, i := range res.data.Items { + data, err := m.rtspServer.apiSessionsList() + if err == nil && len(data.Items) != 0 { + for _, i := range data.Items { tags := "{id=\"" + i.ID.String() + "\",state=\"" + i.State + "\"}" out += metric("rtsp_sessions", tags, 1) out += metric("rtsp_sessions_bytes_received", tags, int64(i.BytesReceived)) @@ -146,9 +146,9 @@ func (m *metrics) onMetrics(ctx *gin.Context) { if !interfaceIsEmpty(m.rtspsServer) { //nolint:dupl func() { - res := m.rtspsServer.apiConnsList() - if res.err == nil && len(res.data.Items) != 0 { - for _, i := range res.data.Items { + data, err := m.rtspsServer.apiConnsList() + if err == nil && len(data.Items) != 0 { + for _, i := range data.Items { tags := "{id=\"" + i.ID.String() + "\"}" out += metric("rtsps_conns", tags, 1) out += metric("rtsps_conns_bytes_received", tags, int64(i.BytesReceived)) @@ -162,9 +162,9 @@ func (m *metrics) onMetrics(ctx *gin.Context) { }() func() { - res := m.rtspsServer.apiSessionsList() - if res.err == nil && len(res.data.Items) != 0 { - for _, i := range res.data.Items { + data, err := m.rtspsServer.apiSessionsList() + if err == nil && len(data.Items) != 0 { + for _, i := range data.Items { tags := "{id=\"" + i.ID.String() + "\",state=\"" + i.State + "\"}" out += metric("rtsps_sessions", tags, 1) out += metric("rtsps_sessions_bytes_received", tags, int64(i.BytesReceived)) @@ -179,9 +179,9 @@ func (m *metrics) onMetrics(ctx *gin.Context) { } if !interfaceIsEmpty(m.rtmpServer) { - res := m.rtmpServer.apiConnsList() - if res.err == nil && len(res.data.Items) != 0 { - for _, i := range res.data.Items { + data, err := m.rtmpServer.apiConnsList() + if err == nil && len(data.Items) != 0 { + for _, i := range data.Items { tags := "{id=\"" + i.ID.String() + "\",state=\"" + i.State + "\"}" out += metric("rtmp_conns", tags, 1) out += metric("rtmp_conns_bytes_received", tags, int64(i.BytesReceived)) @@ -195,9 +195,9 @@ func (m *metrics) onMetrics(ctx *gin.Context) { } if !interfaceIsEmpty(m.webRTCManager) { - res := m.webRTCManager.apiSessionsList() - if res.err == nil && len(res.data.Items) != 0 { - for _, i := range res.data.Items { + data, err := m.webRTCManager.apiSessionsList() + if err == nil && len(data.Items) != 0 { + for _, i := range data.Items { tags := "{id=\"" + i.ID.String() + "\"}" out += metric("webrtc_sessions", tags, 1) out += metric("webrtc_sessions_bytes_received", tags, int64(i.BytesReceived)) diff --git a/internal/core/path.go b/internal/core/path.go index 184c793a7d1..2de00f0a770 100644 --- a/internal/core/path.go +++ b/internal/core/path.go @@ -158,35 +158,24 @@ type pathAPISourceOrReader struct { ID string `json:"id"` } -type pathAPIPathsListItem struct { - Name string `json:"name"` - ConfName string `json:"confName"` - Conf *conf.PathConf `json:"conf"` - Source interface{} `json:"source"` - SourceReady bool `json:"sourceReady"` - Tracks []string `json:"tracks"` - BytesReceived uint64 `json:"bytesReceived"` - Readers []interface{} `json:"readers"` -} - -type pathAPIPathsListData struct { - PageCount int `json:"pageCount"` - Items []pathAPIPathsListItem `json:"items"` -} - type pathAPIPathsListRes struct { - data *pathAPIPathsListData + data *apiPathsList paths map[string]*path - err error } type pathAPIPathsListReq struct { res chan pathAPIPathsListRes } -type pathAPIPathsListSubReq struct { - data *pathAPIPathsListData - res chan struct{} +type pathAPIPathsGetRes struct { + path *path + data *apiPath + err error +} + +type pathAPIPathsGetReq struct { + name string + res chan pathAPIPathsGetRes } type path struct { @@ -232,7 +221,7 @@ type path struct { chPublisherStop chan pathPublisherStopReq chReaderAdd chan pathReaderAddReq chReaderRemove chan pathReaderRemoveReq - chAPIPathsList chan pathAPIPathsListSubReq + chAPIPathsGet chan pathAPIPathsGetReq // out done chan struct{} @@ -286,7 +275,7 @@ func newPath( chPublisherStop: make(chan pathPublisherStopReq), chReaderAdd: make(chan pathReaderAddReq), chReaderRemove: make(chan pathReaderRemoveReq), - chAPIPathsList: make(chan pathAPIPathsListSubReq), + chAPIPathsGet: make(chan pathAPIPathsGetReq), done: make(chan struct{}), } @@ -489,8 +478,8 @@ func (pa *path) run() { case req := <-pa.chReaderRemove: pa.handleReaderRemove(req) - case req := <-pa.chAPIPathsList: - pa.handleAPIPathsList(req) + case req := <-pa.chAPIPathsGet: + pa.handleAPIPathsGet(req) case <-pa.ctx.Done(): return fmt.Errorf("terminated") @@ -898,34 +887,35 @@ func (pa *path) handleReaderAddPost(req pathReaderAddReq) { } } -func (pa *path) handleAPIPathsList(req pathAPIPathsListSubReq) { - req.data.Items = append(req.data.Items, pathAPIPathsListItem{ - Name: pa.name, - ConfName: pa.confName, - Conf: pa.conf, - Source: func() interface{} { - if pa.source == nil { - return nil - } - return pa.source.apiSourceDescribe() - }(), - SourceReady: pa.stream != nil, - Tracks: func() []string { - if pa.stream == nil { - return []string{} - } - return mediasDescription(pa.stream.medias()) - }(), - BytesReceived: atomic.LoadUint64(pa.bytesReceived), - Readers: func() []interface{} { - ret := []interface{}{} - for r := range pa.readers { - ret = append(ret, r.apiReaderDescribe()) - } - return ret - }(), - }) - close(req.res) +func (pa *path) handleAPIPathsGet(req pathAPIPathsGetReq) { + req.res <- pathAPIPathsGetRes{ + data: &apiPath{ + Name: pa.name, + ConfName: pa.confName, + Conf: pa.conf, + Source: func() interface{} { + if pa.source == nil { + return nil + } + return pa.source.apiSourceDescribe() + }(), + SourceReady: pa.stream != nil, + Tracks: func() []string { + if pa.stream == nil { + return []string{} + } + return mediasDescription(pa.stream.medias()) + }(), + BytesReceived: atomic.LoadUint64(pa.bytesReceived), + Readers: func() []interface{} { + ret := []interface{}{} + for r := range pa.readers { + ret = append(ret, r.apiReaderDescribe()) + } + return ret + }(), + }, + } } // reloadConf is called by pathManager. @@ -1039,13 +1029,15 @@ func (pa *path) readerRemove(req pathReaderRemoveReq) { } } -// apiPathsList is called by api. -func (pa *path) apiPathsList(req pathAPIPathsListSubReq) { - req.res = make(chan struct{}) +// apiPathsGet is called by api. +func (pa *path) apiPathsGet(req pathAPIPathsGetReq) (*apiPath, error) { + req.res = make(chan pathAPIPathsGetRes) select { - case pa.chAPIPathsList <- req: - <-req.res + case pa.chAPIPathsGet <- req: + res := <-req.res + return res.data, res.err case <-pa.ctx.Done(): + return nil, fmt.Errorf("terminated") } } diff --git a/internal/core/path_manager.go b/internal/core/path_manager.go index 4ebcbe21bf6..0cf016a4b48 100644 --- a/internal/core/path_manager.go +++ b/internal/core/path_manager.go @@ -69,6 +69,7 @@ type pathManager struct { chPublisherAdd chan pathPublisherAddReq chHLSManagerSet chan pathManagerHLSManager chAPIPathsList chan pathAPIPathsListReq + chAPIPathsGet chan pathAPIPathsGetReq } func newPathManager( @@ -113,6 +114,7 @@ func newPathManager( chPublisherAdd: make(chan pathPublisherAddReq), chHLSManagerSet: make(chan pathManagerHLSManager), chAPIPathsList: make(chan pathAPIPathsListReq), + chAPIPathsGet: make(chan pathAPIPathsGetReq), } for pathConfName, pathConf := range pm.pathConfs { @@ -292,10 +294,17 @@ outer: paths[name] = pa } - req.res <- pathAPIPathsListRes{ - paths: paths, + req.res <- pathAPIPathsListRes{paths: paths} + + case req := <-pm.chAPIPathsGet: + path, ok := pm.paths[req.name] + if !ok { + req.res <- pathAPIPathsGetRes{err: fmt.Errorf("not found")} + continue } + req.res <- pathAPIPathsGetRes{path: path} + case <-pm.ctx.Done(): break outer } @@ -482,7 +491,7 @@ func (pm *pathManager) hlsManagerSet(s pathManagerHLSManager) { } // apiPathsList is called by api. -func (pm *pathManager) apiPathsList() pathAPIPathsListRes { +func (pm *pathManager) apiPathsList() (*apiPathsList, error) { req := pathAPIPathsListReq{ res: make(chan pathAPIPathsListRes), } @@ -491,17 +500,44 @@ func (pm *pathManager) apiPathsList() pathAPIPathsListRes { case pm.chAPIPathsList <- req: res := <-req.res - res.data = &pathAPIPathsListData{ - Items: []pathAPIPathsListItem{}, + res.data = &apiPathsList{ + Items: []*apiPath{}, } for _, pa := range res.paths { - pa.apiPathsList(pathAPIPathsListSubReq{data: res.data}) + item, err := pa.apiPathsGet(pathAPIPathsGetReq{}) + if err != nil { + return nil, err + } + + res.data.Items = append(res.data.Items, item) + } + + return res.data, nil + + case <-pm.ctx.Done(): + return nil, fmt.Errorf("terminated") + } +} + +// apiPathsGet is called by api. +func (pm *pathManager) apiPathsGet(name string) (*apiPath, error) { + req := pathAPIPathsGetReq{ + name: name, + res: make(chan pathAPIPathsGetRes), + } + + select { + case pm.chAPIPathsGet <- req: + res := <-req.res + if res.err != nil { + return nil, res.err } - return res + data, err := res.path.apiPathsGet(req) + return data, err case <-pm.ctx.Done(): - return pathAPIPathsListRes{err: fmt.Errorf("terminated")} + return nil, fmt.Errorf("terminated") } } diff --git a/internal/core/rtmp_conn.go b/internal/core/rtmp_conn.go index a67d607b499..cc630c3fc8b 100644 --- a/internal/core/rtmp_conn.go +++ b/internal/core/rtmp_conn.go @@ -827,3 +827,23 @@ func (c *rtmpConn) apiReaderDescribe() pathAPISourceOrReader { func (c *rtmpConn) apiSourceDescribe() pathAPISourceOrReader { return c.apiReaderDescribe() } + +func (c *rtmpConn) apiItem() *apiRTMPConn { + return &apiRTMPConn{ + ID: c.uuid, + Created: c.created, + RemoteAddr: c.remoteAddr().String(), + State: func() string { + switch c.safeState() { + case rtmpConnStateRead: + return "read" + + case rtmpConnStatePublish: + return "publish" + } + return "idle" + }(), + BytesReceived: c.conn.BytesReceived(), + BytesSent: c.conn.BytesSent(), + } +} diff --git a/internal/core/rtmp_server.go b/internal/core/rtmp_server.go index 8c976ec327e..108f93ca722 100644 --- a/internal/core/rtmp_server.go +++ b/internal/core/rtmp_server.go @@ -6,7 +6,6 @@ import ( "fmt" "net" "sync" - "time" "github.com/google/uuid" @@ -15,27 +14,23 @@ import ( "github.com/bluenviron/mediamtx/internal/logger" ) -type rtmpServerAPIConnsListItem struct { - ID uuid.UUID `json:"id"` - Created time.Time `json:"created"` - RemoteAddr string `json:"remoteAddr"` - State string `json:"state"` - BytesReceived uint64 `json:"bytesReceived"` - BytesSent uint64 `json:"bytesSent"` +type rtmpServerAPIConnsListRes struct { + data *apiRTMPConnsList + err error } -type rtmpServerAPIConnsListData struct { - PageCount int `json:"pageCount"` - Items []rtmpServerAPIConnsListItem `json:"items"` +type rtmpServerAPIConnsListReq struct { + res chan rtmpServerAPIConnsListRes } -type rtmpServerAPIConnsListRes struct { - data *rtmpServerAPIConnsListData +type rtmpServerAPIConnsGetRes struct { + data *apiRTMPConn err error } -type rtmpServerAPIConnsListReq struct { - res chan rtmpServerAPIConnsListRes +type rtmpServerAPIConnsGetReq struct { + uuid uuid.UUID + res chan rtmpServerAPIConnsGetRes } type rtmpServerAPIConnsKickRes struct { @@ -71,9 +66,10 @@ type rtmpServer struct { conns map[*rtmpConn]struct{} // in - chConnClose chan *rtmpConn - chAPISessionsList chan rtmpServerAPIConnsListReq - chAPIConnsKick chan rtmpServerAPIConnsKickReq + chConnClose chan *rtmpConn + chAPIConnsList chan rtmpServerAPIConnsListReq + chAPIConnsGet chan rtmpServerAPIConnsGetReq + chAPIConnsKick chan rtmpServerAPIConnsKickReq } func newRTMPServer( @@ -129,7 +125,8 @@ func newRTMPServer( ln: ln, conns: make(map[*rtmpConn]struct{}), chConnClose: make(chan *rtmpConn), - chAPISessionsList: make(chan rtmpServerAPIConnsListReq), + chAPIConnsList: make(chan rtmpServerAPIConnsListReq), + chAPIConnsGet: make(chan rtmpServerAPIConnsGetReq), chAPIConnsKick: make(chan rtmpServerAPIConnsKickReq), } @@ -217,33 +214,26 @@ outer: case c := <-s.chConnClose: delete(s.conns, c) - case req := <-s.chAPISessionsList: - data := &rtmpServerAPIConnsListData{ - Items: []rtmpServerAPIConnsListItem{}, + case req := <-s.chAPIConnsList: + data := &apiRTMPConnsList{ + Items: []*apiRTMPConn{}, } for c := range s.conns { - data.Items = append(data.Items, rtmpServerAPIConnsListItem{ - ID: c.uuid, - Created: c.created, - RemoteAddr: c.remoteAddr().String(), - State: func() string { - switch c.safeState() { - case rtmpConnStateRead: - return "read" - - case rtmpConnStatePublish: - return "publish" - } - return "idle" - }(), - BytesReceived: c.conn.BytesReceived(), - BytesSent: c.conn.BytesSent(), - }) + data.Items = append(data.Items, c.apiItem()) } req.res <- rtmpServerAPIConnsListRes{data: data} + case req := <-s.chAPIConnsGet: + c := s.findConnByUUID(req.uuid) + if c == nil { + req.res <- rtmpServerAPIConnsGetRes{err: fmt.Errorf("not found")} + continue + } + + req.res <- rtmpServerAPIConnsGetRes{data: c.apiItem()} + case req := <-s.chAPIConnsKick: c := s.findConnByUUID(req.uuid) if c == nil { @@ -287,22 +277,40 @@ func (s *rtmpServer) connClose(c *rtmpConn) { } // apiConnsList is called by api. -func (s *rtmpServer) apiConnsList() rtmpServerAPIConnsListRes { +func (s *rtmpServer) apiConnsList() (*apiRTMPConnsList, error) { req := rtmpServerAPIConnsListReq{ res: make(chan rtmpServerAPIConnsListRes), } select { - case s.chAPISessionsList <- req: - return <-req.res + case s.chAPIConnsList <- req: + res := <-req.res + return res.data, res.err + + case <-s.ctx.Done(): + return nil, fmt.Errorf("terminated") + } +} + +// apiConnsGet is called by api. +func (s *rtmpServer) apiConnsGet(uuid uuid.UUID) (*apiRTMPConn, error) { + req := rtmpServerAPIConnsGetReq{ + uuid: uuid, + res: make(chan rtmpServerAPIConnsGetRes), + } + + select { + case s.chAPIConnsGet <- req: + res := <-req.res + return res.data, res.err case <-s.ctx.Done(): - return rtmpServerAPIConnsListRes{err: fmt.Errorf("terminated")} + return nil, fmt.Errorf("terminated") } } // apiConnsKick is called by api. -func (s *rtmpServer) apiConnsKick(uuid uuid.UUID) rtmpServerAPIConnsKickRes { +func (s *rtmpServer) apiConnsKick(uuid uuid.UUID) error { req := rtmpServerAPIConnsKickReq{ uuid: uuid, res: make(chan rtmpServerAPIConnsKickRes), @@ -310,9 +318,10 @@ func (s *rtmpServer) apiConnsKick(uuid uuid.UUID) rtmpServerAPIConnsKickRes { select { case s.chAPIConnsKick <- req: - return <-req.res + res := <-req.res + return res.err case <-s.ctx.Done(): - return rtmpServerAPIConnsKickRes{err: fmt.Errorf("terminated")} + return fmt.Errorf("terminated") } } diff --git a/internal/core/rtsp_conn.go b/internal/core/rtsp_conn.go index ea4c3ec1872..f90519af2f6 100644 --- a/internal/core/rtsp_conn.go +++ b/internal/core/rtsp_conn.go @@ -209,3 +209,13 @@ func (c *rtspConn) handleAuthError(authErr error) (*base.Response, error) { StatusCode: base.StatusUnauthorized, }, authErr } + +func (c *rtspConn) apiItem() *apiRTSPConn { + return &apiRTSPConn{ + ID: c.uuid, + Created: c.created, + RemoteAddr: c.remoteAddr().String(), + BytesReceived: c.conn.BytesReceived(), + BytesSent: c.conn.BytesSent(), + } +} diff --git a/internal/core/rtsp_server.go b/internal/core/rtsp_server.go index fc72210c604..dbeb960969b 100644 --- a/internal/core/rtsp_server.go +++ b/internal/core/rtsp_server.go @@ -19,47 +19,6 @@ import ( "github.com/bluenviron/mediamtx/internal/logger" ) -type rtspServerAPIConnsListItem struct { - ID uuid.UUID `json:"id"` - Created time.Time `json:"created"` - RemoteAddr string `json:"remoteAddr"` - BytesReceived uint64 `json:"bytesReceived"` - BytesSent uint64 `json:"bytesSent"` -} - -type rtspServerAPIConnsListData struct { - PageCount int `json:"pageCount"` - Items []rtspServerAPIConnsListItem `json:"items"` -} - -type rtspServerAPIConnsListRes struct { - data *rtspServerAPIConnsListData - err error -} - -type rtspServerAPISessionsListItem struct { - ID uuid.UUID `json:"id"` - Created time.Time `json:"created"` - RemoteAddr string `json:"remoteAddr"` - State string `json:"state"` - BytesReceived uint64 `json:"bytesReceived"` - BytesSent uint64 `json:"bytesSent"` -} - -type rtspServerAPISessionsListData struct { - PageCount int `json:"pageCount"` - Items []rtspServerAPISessionsListItem `json:"items"` -} - -type rtspServerAPISessionsListRes struct { - data *rtspServerAPISessionsListData - err error -} - -type rtspServerAPISessionsKickRes struct { - err error -} - type rtspServerParent interface { logger.Writer } @@ -364,6 +323,15 @@ func (s *rtspServer) OnDecodeError(ctx *gortsplib.ServerHandlerOnDecodeErrorCtx) se.onDecodeError(ctx) } +func (s *rtspServer) findConnByUUID(uuid uuid.UUID) *rtspConn { + for _, c := range s.conns { + if c.uuid == uuid { + return c + } + } + return nil +} + func (s *rtspServer) findSessionByUUID(uuid uuid.UUID) (*gortsplib.ServerSession, *rtspSession) { for key, sx := range s.sessions { if sx.uuid == uuid { @@ -374,78 +342,92 @@ func (s *rtspServer) findSessionByUUID(uuid uuid.UUID) (*gortsplib.ServerSession } // apiConnsList is called by api and metrics. -func (s *rtspServer) apiConnsList() rtspServerAPIConnsListRes { +func (s *rtspServer) apiConnsList() (*apiRTSPConnsList, error) { select { case <-s.ctx.Done(): - return rtspServerAPIConnsListRes{err: fmt.Errorf("terminated")} + return nil, fmt.Errorf("terminated") default: } s.mutex.RLock() defer s.mutex.RUnlock() - data := &rtspServerAPIConnsListData{ - Items: []rtspServerAPIConnsListItem{}, + data := &apiRTSPConnsList{ + Items: []*apiRTSPConn{}, } for _, c := range s.conns { - data.Items = append(data.Items, rtspServerAPIConnsListItem{ - ID: c.uuid, - Created: c.created, - RemoteAddr: c.remoteAddr().String(), - BytesReceived: c.conn.BytesReceived(), - BytesSent: c.conn.BytesSent(), - }) + data.Items = append(data.Items, c.apiItem()) + } + + return data, nil +} + +// apiConnsGet is called by api. +func (s *rtspServer) apiConnsGet(uuid uuid.UUID) (*apiRTSPConn, error) { + select { + case <-s.ctx.Done(): + return nil, fmt.Errorf("terminated") + default: } - return rtspServerAPIConnsListRes{data: data} + s.mutex.RLock() + defer s.mutex.RUnlock() + + conn := s.findConnByUUID(uuid) + if conn == nil { + return nil, fmt.Errorf("not found") + } + + return conn.apiItem(), nil } // apiSessionsList is called by api and metrics. -func (s *rtspServer) apiSessionsList() rtspServerAPISessionsListRes { +func (s *rtspServer) apiSessionsList() (*apiRTSPSessionsList, error) { select { case <-s.ctx.Done(): - return rtspServerAPISessionsListRes{err: fmt.Errorf("terminated")} + return nil, fmt.Errorf("terminated") default: } s.mutex.RLock() defer s.mutex.RUnlock() - data := &rtspServerAPISessionsListData{ - Items: []rtspServerAPISessionsListItem{}, + data := &apiRTSPSessionsList{ + Items: []*apiRTSPSession{}, } for _, s := range s.sessions { - data.Items = append(data.Items, rtspServerAPISessionsListItem{ - ID: s.uuid, - Created: s.created, - RemoteAddr: s.remoteAddr().String(), - State: func() string { - switch s.safeState() { - case gortsplib.ServerSessionStatePrePlay, - gortsplib.ServerSessionStatePlay: - return "read" - - case gortsplib.ServerSessionStatePreRecord, - gortsplib.ServerSessionStateRecord: - return "publish" - } - return "idle" - }(), - BytesReceived: s.session.BytesReceived(), - BytesSent: s.session.BytesSent(), - }) + data.Items = append(data.Items, s.apiItem()) + } + + return data, nil +} + +// apiSessionsGet is called by api. +func (s *rtspServer) apiSessionsGet(uuid uuid.UUID) (*apiRTSPSession, error) { + select { + case <-s.ctx.Done(): + return nil, fmt.Errorf("terminated") + default: + } + + s.mutex.RLock() + defer s.mutex.RUnlock() + + _, sx := s.findSessionByUUID(uuid) + if sx == nil { + return nil, fmt.Errorf("not found") } - return rtspServerAPISessionsListRes{data: data} + return sx.apiItem(), nil } // apiSessionsKick is called by api. -func (s *rtspServer) apiSessionsKick(uuid uuid.UUID) rtspServerAPISessionsKickRes { +func (s *rtspServer) apiSessionsKick(uuid uuid.UUID) error { select { case <-s.ctx.Done(): - return rtspServerAPISessionsKickRes{err: fmt.Errorf("terminated")} + return fmt.Errorf("terminated") default: } @@ -454,11 +436,11 @@ func (s *rtspServer) apiSessionsKick(uuid uuid.UUID) rtspServerAPISessionsKickRe key, sx := s.findSessionByUUID(uuid) if sx == nil { - return rtspServerAPISessionsKickRes{err: fmt.Errorf("not found")} + return fmt.Errorf("not found") } sx.close() delete(s.sessions, key) sx.onClose(liberrors.ErrServerTerminated{}) - return rtspServerAPISessionsKickRes{} + return nil } diff --git a/internal/core/rtsp_session.go b/internal/core/rtsp_session.go index 5471315907f..ad51901e855 100644 --- a/internal/core/rtsp_session.go +++ b/internal/core/rtsp_session.go @@ -385,3 +385,25 @@ func (s *rtspSession) onPacketLost(ctx *gortsplib.ServerHandlerOnPacketLostCtx) func (s *rtspSession) onDecodeError(ctx *gortsplib.ServerHandlerOnDecodeErrorCtx) { s.Log(logger.Warn, ctx.Error.Error()) } + +func (s *rtspSession) apiItem() *apiRTSPSession { + return &apiRTSPSession{ + ID: s.uuid, + Created: s.created, + RemoteAddr: s.remoteAddr().String(), + State: func() string { + switch s.safeState() { + case gortsplib.ServerSessionStatePrePlay, + gortsplib.ServerSessionStatePlay: + return "read" + + case gortsplib.ServerSessionStatePreRecord, + gortsplib.ServerSessionStateRecord: + return "publish" + } + return "idle" + }(), + BytesReceived: s.session.BytesReceived(), + BytesSent: s.session.BytesSent(), + } +} diff --git a/internal/core/webrtc_manager.go b/internal/core/webrtc_manager.go index 5e11a4c08ac..a54217976da 100644 --- a/internal/core/webrtc_manager.go +++ b/internal/core/webrtc_manager.go @@ -64,25 +64,8 @@ func linkHeaderToIceServers(link []string) []webrtc.ICEServer { return ret } -type webRTCManagerAPISessionsListItem struct { - ID uuid.UUID `json:"id"` - Created time.Time `json:"created"` - RemoteAddr string `json:"remoteAddr"` - PeerConnectionEstablished bool `json:"peerConnectionEstablished"` - LocalCandidate string `json:"localCandidate"` - RemoteCandidate string `json:"remoteCandidate"` - State string `json:"state"` - BytesReceived uint64 `json:"bytesReceived"` - BytesSent uint64 `json:"bytesSent"` -} - -type webRTCManagerAPISessionsListData struct { - PageCount int `json:"pageCount"` - Items []webRTCManagerAPISessionsListItem `json:"items"` -} - type webRTCManagerAPISessionsListRes struct { - data *webRTCManagerAPISessionsListData + data *apiWebRTCSessionsList err error } @@ -99,6 +82,16 @@ type webRTCManagerAPISessionsKickReq struct { res chan webRTCManagerAPISessionsKickRes } +type webRTCManagerAPISessionsGetRes struct { + data *apiWebRTCSession + err error +} + +type webRTCManagerAPISessionsGetReq struct { + uuid uuid.UUID + res chan webRTCManagerAPISessionsGetRes +} + type webRTCSessionNewRes struct { sx *webRTCSession answer []byte @@ -157,6 +150,7 @@ type webRTCManager struct { chSessionClose chan *webRTCSession chSessionAddCandidates chan webRTCSessionAddCandidatesReq chAPISessionsList chan webRTCManagerAPISessionsListReq + chAPIConnsGet chan webRTCManagerAPISessionsGetReq chAPIConnsKick chan webRTCManagerAPISessionsKickReq // out @@ -200,6 +194,7 @@ func newWebRTCManager( chSessionClose: make(chan *webRTCSession), chSessionAddCandidates: make(chan webRTCSessionAddCandidatesReq), chAPISessionsList: make(chan webRTCManagerAPISessionsListReq), + chAPIConnsGet: make(chan webRTCManagerAPISessionsGetReq), chAPIConnsKick: make(chan webRTCManagerAPISessionsKickReq), done: make(chan struct{}), } @@ -309,46 +304,25 @@ outer: req.res <- webRTCSessionAddCandidatesRes{sx: sx} case req := <-m.chAPISessionsList: - data := &webRTCManagerAPISessionsListData{ - Items: []webRTCManagerAPISessionsListItem{}, + data := &apiWebRTCSessionsList{ + Items: []*apiWebRTCSession{}, } for sx := range m.sessions { - peerConnectionEstablished := false - localCandidate := "" - remoteCandidate := "" - bytesReceived := uint64(0) - bytesSent := uint64(0) - - pc := sx.safePC() - if pc != nil { - peerConnectionEstablished = true - localCandidate = pc.localCandidate() - remoteCandidate = pc.remoteCandidate() - bytesReceived = pc.bytesReceived() - bytesSent = pc.bytesSent() - } - - data.Items = append(data.Items, webRTCManagerAPISessionsListItem{ - ID: sx.uuid, - Created: sx.created, - RemoteAddr: sx.req.remoteAddr, - PeerConnectionEstablished: peerConnectionEstablished, - LocalCandidate: localCandidate, - RemoteCandidate: remoteCandidate, - State: func() string { - if sx.req.publish { - return "publish" - } - return "read" - }(), - BytesReceived: bytesReceived, - BytesSent: bytesSent, - }) + data.Items = append(data.Items, sx.apiItem()) } req.res <- webRTCManagerAPISessionsListRes{data: data} + case req := <-m.chAPIConnsGet: + sx := m.findSessionByUUID(req.uuid) + if sx == nil { + req.res <- webRTCManagerAPISessionsGetRes{err: fmt.Errorf("not found")} + continue + } + + req.res <- webRTCManagerAPISessionsGetRes{data: sx.apiItem()} + case req := <-m.chAPIConnsKick: sx := m.findSessionByUUID(req.uuid) if sx == nil { @@ -482,22 +456,40 @@ func (m *webRTCManager) sessionAddCandidates( } // apiSessionsList is called by api. -func (m *webRTCManager) apiSessionsList() webRTCManagerAPISessionsListRes { +func (m *webRTCManager) apiSessionsList() (*apiWebRTCSessionsList, error) { req := webRTCManagerAPISessionsListReq{ res: make(chan webRTCManagerAPISessionsListRes), } select { case m.chAPISessionsList <- req: - return <-req.res + res := <-req.res + return res.data, res.err + + case <-m.ctx.Done(): + return nil, fmt.Errorf("terminated") + } +} + +// apiSessionsGet is called by api. +func (m *webRTCManager) apiSessionsGet(uuid uuid.UUID) (*apiWebRTCSession, error) { + req := webRTCManagerAPISessionsGetReq{ + uuid: uuid, + res: make(chan webRTCManagerAPISessionsGetRes), + } + + select { + case m.chAPIConnsGet <- req: + res := <-req.res + return res.data, res.err case <-m.ctx.Done(): - return webRTCManagerAPISessionsListRes{err: fmt.Errorf("terminated")} + return nil, fmt.Errorf("terminated") } } // apiSessionsKick is called by api. -func (m *webRTCManager) apiSessionsKick(uuid uuid.UUID) webRTCManagerAPISessionsKickRes { +func (m *webRTCManager) apiSessionsKick(uuid uuid.UUID) error { req := webRTCManagerAPISessionsKickReq{ uuid: uuid, res: make(chan webRTCManagerAPISessionsKickRes), @@ -505,9 +497,10 @@ func (m *webRTCManager) apiSessionsKick(uuid uuid.UUID) webRTCManagerAPISessions select { case m.chAPIConnsKick <- req: - return <-req.res + res := <-req.res + return res.err case <-m.ctx.Done(): - return webRTCManagerAPISessionsKickRes{err: fmt.Errorf("terminated")} + return fmt.Errorf("terminated") } } diff --git a/internal/core/webrtc_session.go b/internal/core/webrtc_session.go index c736ee7f09f..96bf851b7fa 100644 --- a/internal/core/webrtc_session.go +++ b/internal/core/webrtc_session.go @@ -596,3 +596,37 @@ func (s *webRTCSession) apiSourceDescribe() pathAPISourceOrReader { func (s *webRTCSession) apiReaderDescribe() pathAPISourceOrReader { return s.apiSourceDescribe() } + +func (s *webRTCSession) apiItem() *apiWebRTCSession { + peerConnectionEstablished := false + localCandidate := "" + remoteCandidate := "" + bytesReceived := uint64(0) + bytesSent := uint64(0) + + pc := s.safePC() + if pc != nil { + peerConnectionEstablished = true + localCandidate = pc.localCandidate() + remoteCandidate = pc.remoteCandidate() + bytesReceived = pc.bytesReceived() + bytesSent = pc.bytesSent() + } + + return &apiWebRTCSession{ + ID: s.uuid, + Created: s.created, + RemoteAddr: s.req.remoteAddr, + PeerConnectionEstablished: peerConnectionEstablished, + LocalCandidate: localCandidate, + RemoteCandidate: remoteCandidate, + State: func() string { + if s.req.publish { + return "publish" + } + return "read" + }(), + BytesReceived: bytesReceived, + BytesSent: bytesSent, + } +}