rtsp 0.1.0-SNAPSHOT

An asynchronous client and server for the Real Time Streaming Protocol (RTSP) as described by RFC 2326.
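The `multimedia.streaming.rtsp.client` namespace:

    (ns multimedia.streaming.rtsp.client
      (:require [clojure.pprint] ; loaded for the pretty-printing dispatch registered for Session
                [clojure.string :as string]
                [multimedia.streaming.rtsp.protocol :as rtsp]
                [multimedia.streaming.rtsp.transport :refer [request! connect-to IRequest]]))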
What is RTSP?

The Real Time Streaming Protocol, or RTSP, is an application-level protocol for control over the delivery of data with real-time properties. RTSP provides an extensible framework to enable controlled, on-demand delivery of real-time data, such as audio and video. Sources of data can include both live data feeds and stored clips. This protocol is intended to control multiple data delivery sessions, provide a means for choosing delivery channels such as UDP, multicast UDP and TCP, and provide a means for choosing delivery mechanisms based upon RTP (RFC 1889).
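The RTSP Session

The lifecycle of a session is as follows: it is created with `session`, used to issue requests with `request!`, and finally released with `close!`.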

    (defrecord Session [url version connection c-seq]
      IRequest
      (request! [this {:keys [path method headers body timeout]
                       ;; NOTE: the defaults for `path` and `body` were lost in
                       ;; extraction; empty strings are assumed here.
                       :or {path "", headers {}, body ""}}]
        (let [rtsp-request {:url     (rtsp/url-for-path url path)
                            :method  method
                            :version version
                            ;; Every request carries the next sequence number.
                            :headers (assoc headers :c-seq (swap! c-seq inc))
                            :body    body
                            :timeout timeout}]
          (request! connection rtsp-request)))
      java.io.Closeable
      (close [this]
        (.close connection)
        (reset! c-seq 0)))
Session Construction

    (def ^:private default-session
      {:c-seq   (atom 1)
       :version "RTSP/1.0"})
Note that the transport and port are determined by the scheme and port
segments of the provided URL respectively. If no port is provided,
the default RTSP port of 554 is used.

    (defn session
      ([url] (session url {}))
      ([url options]
       (let [url     (rtsp/split-url url)
             options (assoc options :url url :connection (connect-to url))
             ;; Always wrap the counter in a fresh atom so that sessions never
             ;; share the single atom held by `default-session`.
             options (assoc options :c-seq (atom (or (:c-seq options) 1)))]
         (map->Session (merge default-session options)))))
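The sequence numbering used by a session can be sketched in isolation. This is a minimal illustration, assuming one counter atom per session as in the `Session` record; `next-c-seq!`, `counter-a` and `counter-b` are hypothetical names that are not part of the library.

```clojure
;; Hypothetical helper: advances a counter atom and returns the new value,
;; which is what the session places in the CSeq header of each request.
(defn next-c-seq! [counter]
  (swap! counter inc))

;; Two independent counters, standing in for two separate sessions.
(def counter-a (atom 1))
(def counter-b (atom 1))

(next-c-seq! counter-a) ;; => 2
(next-c-seq! counter-a) ;; => 3
(next-c-seq! counter-b) ;; => 2, unaffected by counter-a
```

Keeping one atom per session means that resetting or advancing one session's counter cannot disturb another's.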
Making Requests

Once a session has been obtained, RTSP requests can be made by
calling the `request!` function with the session and a request map.

Alternatively, there are convenience functions defined for each of the RTSP methods. See [Convenience Functions][].
The Request Map

The RTSP request map may contain the following fields: `:path`, `:method`, `:headers`, `:body` and `:timeout`.

Header names may be either strings or keywords, but should be lowercase. Case conversion as per the RTSP standard is performed automatically. Values are case sensitive strings.
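The automatic case conversion of header names can be illustrated with a small stand-alone function. This is a sketch only: `wire-name` is a hypothetical name that mirrors the logic of `upper-snake-case` in the protocol namespace so that the example runs without the library.

```clojure
(require '[clojure.string :as string])

;; Hypothetical stand-in mirroring `upper-snake-case`: capitalise each
;; hyphen-separated word, with CSeq handled as a special case.
(defn wire-name [header]
  (let [s (name header)] ; accepts both keywords and strings
    (if (re-matches #"(?i)c-seq" s)
      "CSeq"
      (->> (string/split s #"-")
           (map string/capitalize)
           (string/join "-")))))

(wire-name :content-length) ;; => "Content-Length"
(wire-name "user-agent")    ;; => "User-Agent"
(wire-name :c-seq)          ;; => "CSeq"
```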
Response Handling

The RTSP library implements all communication asynchronously.
Calls to `request!` return a manifold deferred that yields the response.

This also allows all of manifold's asynchronous programming constructs to be used for response handling.

The Response Map

Like requests, responses take the form of a simple map structure with the following keys: `:version`, `:status`, `:reason`, `:headers` and `:body`.

For example:
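A minimal sketch of a response map and how it might be inspected. The values below are invented for illustration, and `success?` is a hypothetical helper rather than part of the library; the key names follow the response codec in the protocol namespace.

```clojure
;; An illustrative response map; the values are made up for this sketch.
(def example-response
  {:version "RTSP/1.0"
   :status  200
   :reason  "OK"
   :headers {:c-seq "2" :public "DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE"}
   :body    ""})

;; Hypothetical helper: RTSP status codes in the 2xx range indicate success.
;; A missing status (for example a nil response) is treated as failure.
(defn success? [response]
  (let [status (:status response)]
    (boolean (and status (<= 200 status 299)))))

(success? example-response)                 ;; => true
(get-in example-response [:headers :c-seq]) ;; => "2"
```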
Convenience Functions

    (defmacro ^:private def-rtsp-method [method-name]
      (let [;; e.g. get-parameter becomes "GET_PARAMETER"
            verb    (-> (name method-name)
                        (string/replace \- \_)
                        string/upper-case)
            fn-name (symbol (str method-name \!))
            doc     (str "Sends an RTSP " verb " request for `path` to the peer described\n"
                         " by `session` and returns a promise that will yield the response.")
            [s p o] ['session 'path 'options]]
        `(defn ~fn-name ~doc
           ([~s ~p] (~fn-name ~s ~p {}))
           ([~s ~p ~o] (request! ~s (assoc ~o :method ~verb :path ~p))))))
Convenience functions are provided for each of the methods defined
in the RTSP specification and may be used instead of the lower
level `request!` function.

Note that each function has an exclamation mark appended to its name.

    (def-rtsp-method options)
    (def-rtsp-method describe)
    (def-rtsp-method announce)
    (def-rtsp-method get-parameter)
    (def-rtsp-method pause)
    (def-rtsp-method play)
    (def-rtsp-method record)
    (def-rtsp-method redirect)
    (def-rtsp-method setup)
    (def-rtsp-method set-parameter)
    (def-rtsp-method teardown)
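What each generated function does can be sketched with plain functions. This is an illustration under assumptions: `method->verb` and `method-request` are hypothetical names that reproduce the name-to-verb step and the request map assembly without the macro or the library.

```clojure
(require '[clojure.string :as string])

;; Hypothetical helper: the Clojure-style method name becomes the RTSP verb.
(defn method->verb [method-name]
  (-> (name method-name)
      (string/replace \- \_)
      string/upper-case))

;; Hypothetical helper: the request map a convenience function would pass on.
(defn method-request [method-name path options]
  (assoc options :method (method->verb method-name) :path path))

(method->verb 'get-parameter) ;; => "GET_PARAMETER"
(method-request 'describe "/stream" {:headers {:accept "application/sdp"}})
;; => {:headers {:accept "application/sdp"}, :method "DESCRIBE", :path "/stream"}
```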

    (defn close! [session]
      (.close session))

    (defn- print-session [session ^java.io.Writer writer]
      (.write writer (str "#Session" (select-keys session [:url :version]))))

Printing support for the `Session` record.

    (defmethod print-method Session [session ^java.io.Writer writer]
      (print-session session writer))

Serialisation support for the `Session` record.

    (defmethod print-dup Session [session ^java.io.Writer writer]
      (print-session session writer))

Pretty printing support for the `Session` record.

    (.addMethod clojure.pprint/simple-dispatch Session #(print-session % *out*))
The `multimedia.streaming.rtsp.protocol` namespace:

    (ns multimedia.streaming.rtsp.protocol
      (:require [clojure.string :as string]
                [gloss.core :as codec :refer [defcodec]]
                [gloss.io :as io])
      (:import [java.net URI]))

    (def default-port 554)

    (def default-scheme "rtsp")

Newlines in an RTSP request are represented as a carriage return followed by a line feed.

    (def crlf "\r\n")

    (defn split-url [url]
      (let [uri  (URI. url)
            port (.getPort uri)]
        {:protocol (or (.getScheme uri) default-scheme)
         :host     (.getHost uri)
         ;; `getPort` returns -1 when the URL does not specify a port.
         :port     (if (pos? port) port default-port)}))

    (defn join-url [{:keys [protocol host port]}]
      (str protocol "://" host \: port))

    (defn ensure-absolute [path]
      ;; Collapse any leading slashes into exactly one, so that "/" yields
      ;; "/" rather than "//".
      (str "/" (string/replace path #"^/+" "")))

    (defn url-for-path [url path]
      ;; "*" is passed through untouched instead of being joined to the URL.
      (if (= "*" path)
        path
        (str (join-url url) (ensure-absolute path))))
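The URL handling can be tried without the rest of the library. The sketch below restates the helpers in condensed form; `split-url*` and `url-for-path*` are hypothetical names, the host names are placeholders, and the literals `"rtsp"` and `554` stand in for `default-scheme` and `default-port`.

```clojure
(require '[clojure.string :as string])
(import 'java.net.URI)

;; Stand-alone restatement of the URL helpers, for illustration only.
(defn split-url* [^String url]
  (let [uri  (URI. url)
        port (.getPort uri)] ; -1 when the URL has no explicit port
    {:protocol (or (.getScheme uri) "rtsp")
     :host     (.getHost uri)
     :port     (if (pos? port) port 554)}))

(defn url-for-path* [{:keys [protocol host port]} path]
  (if (= "*" path)
    path
    (str protocol "://" host ":" port "/" (string/replace path #"^/+" ""))))

(split-url* "rtsp://example.com/stream")
;; => {:protocol "rtsp", :host "example.com", :port 554}
(url-for-path* (split-url* "rtsp://example.com:8554") "stream/track1")
;; => "rtsp://example.com:8554/stream/track1"
(url-for-path* (split-url* "rtsp://example.com") "*")
;; => "*"
```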

    (defn lower-snake-case [string]
      ;; e.g. "Content-Length" becomes "content-length", "CSeq" becomes "c-seq"
      (-> string
          (string/replace #"[^-][A-Z]" #(str (first %) \- (second %)))
          string/lower-case))

    (defn upper-snake-case [string]
      ;; e.g. "content-length" becomes "Content-Length", "c-seq" becomes "CSeq"
      (if (re-matches #"(?i)c-seq" string)
        "CSeq"
        (->> (string/split string #"-")
             (map string/capitalize)
             (string/join \-))))

    (defn to-header [[k v]]
      (let [field (-> k name upper-snake-case)
            ;; A vector of values is sent as a comma-separated list.
            value (if (vector? v) (string/join \, v) v)]
        (string/join ": " [field value])))

    (defn unfold-headers [header-string]
      ;; A line break followed by whitespace continues the previous header line.
      (string/replace header-string #"\r?\n\s+" " "))

    (defn header-string [header-map]
      (->> header-map
           (map to-header)
           (string/join crlf)))

    (defn header-map [header-string]
      (->> (string/split (unfold-headers header-string) (re-pattern crlf))
           ;; Split on the first colon only, so values that themselves
           ;; contain a colon stay intact.
           (map #(string/split % #":\s*" 2))
           (map (fn [[k v]] [(keyword (lower-snake-case k)) v]))
           (into {})))
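Header unfolding can be demonstrated on a small input. This is a sketch: `unfold` is a hypothetical name that uses the same regular expression as `unfold-headers`, and the header values are invented.

```clojure
(require '[clojure.string :as string])

;; Same regular expression as `unfold-headers`: a line break followed by
;; whitespace marks a continuation of the previous header line.
(defn unfold [header-string]
  (string/replace header-string #"\r?\n\s+" " "))

;; The Transport header is folded across two lines; CSeq starts a new header.
(def folded "Transport: RTP/AVP;unicast;\r\n  client_port=4588-4589\r\nCSeq: 3")

(unfold folded)
;; => "Transport: RTP/AVP;unicast; client_port=4588-4589\r\nCSeq: 3"
```

After unfolding, splitting on CRLF yields exactly one string per header.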

    (defn with-content-length [{headers :headers, body :body :as request}]
      (if (string/blank? body)
        request
        ;; Count bytes in the charset used by the body codec (UTF-8) rather
        ;; than the platform default.
        (assoc-in request [:headers :content-length]
                  (count (.getBytes ^String body "UTF-8")))))
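The content length is a byte count, not a character count, which matters for non-ASCII bodies. A minimal sketch, assuming UTF-8 as used by the body codec; `content-length` is a hypothetical helper name.

```clojure
;; Hypothetical helper: the size of a body in bytes once encoded as UTF-8.
(defn content-length [^String body]
  (count (.getBytes body "UTF-8")))

(content-length "v=0")       ;; => 3
(content-length "caf\u00e9") ;; => 5, four characters but "é" takes two bytes
```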
For when Gloss is fixed to handle this case.

    (comment
      (def headers
        (let [key   (codec/string :ascii :delimiters [": "])
              value (codec/string :ascii :delimiters [crlf])]
          (codec/repeated [key value] :delimiters [(str crlf crlf)]))))

    (defn make-body-codec [headers]
      (codec/ordered-map
        :headers headers
        :body    (codec/string :utf-8 :length (Integer. (:content-length headers 0)))))

    (defn write-only [_]
      ;; Throw the exception; previously it was constructed but never thrown.
      (throw (UnsupportedOperationException. "Codec does not support reading")))

    (defn merge-headers-and-body [result]
      (dissoc (merge result (:rest result)) :rest))

    (def headers
      (codec/compile-frame
        (codec/string :ascii :delimiters [(str crlf crlf)])
        header-string
        header-map))

    (def request
      (let [method  (codec/string :ascii :delimiters [\space])
            url     (codec/string :ascii :delimiters [\space])
            version (codec/string :ascii :delimiters [crlf])
            body    (codec/string :utf-8)]
        {:encoder (codec/compile-frame
                    (codec/ordered-map
                      :method  method
                      :url     url
                      :version version
                      :headers headers
                      :body    body)
                    with-content-length
                    identity)
         :decoder (codec/compile-frame
                    (codec/ordered-map
                      :method  method
                      :url     url
                      :version version
                      :rest    (codec/header headers make-body-codec write-only))
                    identity
                    merge-headers-and-body)}))

    (def response
      (let [version (codec/string :ascii :delimiters [\space])
            status  (codec/string-integer :ascii :delimiters [\space])
            reason  (codec/string :ascii :delimiters [crlf])
            body    (codec/string :utf-8)]
        {:encoder (codec/compile-frame
                    (codec/ordered-map
                      :version version
                      :status  status
                      :reason  reason
                      :headers headers
                      :body    body)
                    with-content-length
                    identity)
         :decoder (codec/compile-frame
                    (codec/ordered-map
                      :version version
                      :status  status
                      :reason  reason
                      :rest    (codec/header headers make-body-codec write-only))
                    identity
                    merge-headers-and-body)}))

    (def encode-request (partial io/encode (request :encoder)))

    (def decode-request #(io/decode-stream % (request :decoder)))

    (def encode-response (partial io/encode (response :encoder)))

    (def decode-response #(io/decode-stream % (response :decoder)))

    (ns multimedia.streaming.rtsp.transport
      (:require [multimedia.streaming.rtsp.protocol :as rtsp]
                [aleph.tcp :as tcp]
                [manifold.deferred :as d]
                [manifold.stream :as s]))

Interface to be used by all protocols.

An object that is able to issue requests.

    (defprotocol IRequest
      (request! [this request]
        "Sends `request` to the peer and returns a promise that will yield the response."))

    ;; Milliseconds to wait for a response when the request sets no :timeout.
    (def default-timeout 30000)

    (defmulti connect-to :protocol)
TCP support

    (defn wrap-duplex-stream [encoder decoder wire]
      (let [tx (s/stream)]
        ;; Outgoing messages are encoded and written to the wire; incoming
        ;; data is decoded and exposed through the same spliced stream.
        (s/connect (s/map encoder tx) wire)
        (s/splice tx (decoder wire))))

    (defn existing-connection [connection]
      (when (and @connection (not (s/closed? @connection)))
        @connection))

    (defn create-connection! [connection url]
      (-> (d/chain (tcp/client url)
                   #(wrap-duplex-stream rtsp/encode-request rtsp/decode-response %)
                   #(reset! connection %))
          (d/catch io.netty.channel.ConnectTimeoutException
                   #(d/error-deferred (java.net.SocketTimeoutException. (.getMessage %))))))

    (defrecord TcpConnection [url wire]
      IRequest
      (request! [this request]
        (d/chain (or (existing-connection wire)
                     (create-connection! wire url))
                 #(do (s/put! % request) %)
                 #(s/try-take! % (or (:timeout request) default-timeout))))
      java.io.Closeable
      (close [this]
        (when-let [stream (existing-connection wire)]
          (s/close! stream)
          ;; Clear the atom so that a later request opens a fresh connection
          ;; instead of inspecting the return value of `close!`.
          (reset! wire nil))))

    (defmethod connect-to "rtsp" [url]
      (->TcpConnection url (atom nil)))
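Selecting a transport by URL scheme relies on ordinary multimethod dispatch, which can be sketched without any networking. In this illustration `transport-for` is a hypothetical multimethod that mirrors `connect-to`, and the `:default` method is an addition of the sketch rather than something the library is known to define.

```clojure
;; Hypothetical multimethod mirroring `connect-to`: dispatch on the
;; :protocol key of a split URL map.
(defmulti transport-for :protocol)

(defmethod transport-for "rtsp" [url]
  {:transport :tcp :host (:host url) :port (:port url)})

;; Assumption of this sketch: report unknown schemes with a descriptive
;; error instead of the generic "No method in multimethod" exception.
(defmethod transport-for :default [url]
  (throw (ex-info "Unsupported scheme" {:protocol (:protocol url)})))

(transport-for {:protocol "rtsp" :host "example.com" :port 554})
;; => {:transport :tcp, :host "example.com", :port 554}
```

Supporting another scheme would then only require one more `defmethod`, leaving existing callers unchanged.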