Add S3 connection method
cap10morgan committed May 5, 2023
1 parent e84bf09 commit 7efcd2f
Expand Up @@ -37,7 +37,12 @@
org.bouncycastle/bcprov-jdk15on {:mvn/version "1.70"}

;; smartfunctions
org.babashka/sci {:mvn/version "0.3.31"}}
org.babashka/sci {:mvn/version "0.3.31"}

;; storage {:mvn/version "0.8.666"} {:mvn/version ""} {:mvn/version "847.2.1365.0"}}

:paths ["src" "resources"]

Expand All @@ -54,12 +59,12 @@
criterium/criterium {:mvn/version "0.4.6"}
figwheel-sidecar/figwheel-sidecar {:mvn/version "0.5.20"}
thheller/shadow-cljs {:mvn/version "2.20.12"}
com.magnars/test-with-files {:mvn/version "2021-02-17"}}}
com.magnars/test-with-files {:mvn/version "2021-02-17"}}}

{:extra-paths ["test" "dev-resources"]
:extra-deps {lambdaisland/kaocha {:mvn/version "1.71.1119"}
org.clojure/test.check {:mvn/version "1.1.1"}
:extra-deps {lambdaisland/kaocha {:mvn/version "1.71.1119"}
org.clojure/test.check {:mvn/version "1.1.1"}
io.github.cap10morgan/test-with-files {:git/tag "v1.0.0"
:git/sha "9181a2e"}}
:exec-fn kaocha.runner/exec-fn
Expand Down Expand Up @@ -91,15 +96,15 @@
:main-opts ["-m" "cloverage.coverage" "-p" "src" "-s" "test" "--output" "scanning_results/coverage"]}

{:extra-deps {jonase/eastwood {:mvn/version "1.3.0"}}
:main-opts ["-m" "eastwood.lint"
{:source-paths ["src" "src-docs"]
:test-paths ["test"]
;; TODO: Un-exclude this when it stops triggering false
;; positives on "UnsupportedOperationException empty is
;; not supported on Flake" when using the #Flake data
;; reader - WSM 2023-02-01
:exclude-linters [:implicit-dependencies]}]}
{:extra-deps {jonase/eastwood {:mvn/version "1.3.0"}}
:main-opts ["-m" "eastwood.lint"
{:source-paths ["src" "src-docs"]
:test-paths ["test"]
;; TODO: Un-exclude this when it stops triggering false
;; positives on "UnsupportedOperationException empty is
;; not supported on Flake" when using the #Flake data
;; reader - WSM 2023-02-01
:exclude-linters [:implicit-dependencies]}]}

{:extra-deps {com.github.liquidz/antq {:mvn/version "RELEASE"}}
267 changes: 267 additions & 0 deletions src/fluree/db/conn/s3.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
(ns fluree.db.conn.s3
(:require [ :as io]
[clojure.string :as str]
[ :as aws]
[clojure.core.async :as async :refer [go <!]]
[fluree.crypto :as crypto]
[fluree.db.conn.cache :as conn-cache]
[fluree.db.conn.proto :as conn-proto]
[fluree.db.conn.state-machine :as state-machine]
[fluree.db.full-text :as full-text]
[fluree.db.index :as index]
[fluree.db.indexer.default :as idx-default]
[fluree.db.ledger.proto :as ledger-proto]
[fluree.db.serde.json :refer [json-serde]]
[ :as storage]
[fluree.db.util.context :as ctx-util]
[fluree.db.util.json :as json]
[fluree.db.util.log :as log]
[fluree.json-ld :as json-ld])
(:import ( Closeable ByteArrayOutputStream)))

(set! *warn-on-reflection* true)

(defn s3-address
[{:keys [s3-bucket s3-prefix]} path]
(if (str/starts-with? path "//")
(str "fluree:s3://" s3-bucket "/" s3-prefix "/" (-> path (str/split #"//")
(str "fluree:s3://" s3-bucket "/" s3-prefix "/" path)))

(defn address-path
([conn address] (address-path conn address true))
([{:keys [s3-bucket s3-prefix]} address strip-prefix?]
(log/debug "address-path address:" address)
(let [path (-> address (str/split #"://") last)]
(if strip-prefix?
(-> path (str/replace-first (str s3-bucket "/" s3-prefix "/") ""))
(str "//" path)))))

(defn handle-s3-response
(if (:cognitect.anomalies/category resp)
(if ( resp)
(ex-info "S3 read failed"
{:status 500, :error :db/unexpected-error, :aws/response resp}))
(let [{in :Body} resp
_ (log/debug "S3 response:" resp)
body-str (when in
(with-open [out (ByteArrayOutputStream.)]
(io/copy in out)
(.close ^Closeable in)
(String. (.toByteArray out))))]
(cond-> resp
body-str (assoc :Body body-str)))))

(defn s3-list
([conn path] (s3-list conn path nil))
([{:keys [s3-client s3-bucket s3-prefix]} path continuation-token]
(let [ch (async/promise-chan (map handle-s3-response))
base-req {:op :ListObjectsV2
:ch ch
:request {:Bucket s3-bucket}}
full-path (if (empty? s3-prefix)
(str s3-prefix "/" path))
req (cond-> base-req
(not= full-path "/") (assoc-in [:request :Prefix]
continuation-token (assoc-in
[:request :ContinuationToken]
(aws/invoke-async s3-client req)

(defn s3-key-exists?
[conn key]
(let [list (<! (s3-list conn key))]
(< 0 (:KeyCount list)))))

(defn read-s3-data
[{:keys [s3-client s3-bucket s3-prefix]} path]
(let [ch (async/promise-chan (map handle-s3-response))
full-path (str s3-prefix "/" path)
req {:op :GetObject
:ch ch
:request {:Bucket s3-bucket, :Key full-path}}]
(aws/invoke-async s3-client req)

(defn write-s3-data
[{:keys [s3-client s3-bucket s3-prefix]} path ^bytes data]
(let [ch (async/promise-chan (map handle-s3-response))
full-path (str s3-prefix "/" path)
req {:op :PutObject
:ch ch
:request {:Bucket s3-bucket, :Key full-path, :Body data}}]
(aws/invoke-async s3-client req)

(defn write-data
[conn ledger data-type data]
(let [alias (ledger-proto/-alias ledger)
branch (-> ledger ledger-proto/-branch :name name)
json (if (string? data)
(json-ld/normalize-data data))
bytes (.getBytes ^String json)
hash (crypto/sha2-256 bytes :hex)
type-dir (name data-type)
path (str alias
(when branch (str "/" branch))
(str "/" type-dir "/")
hash ".json")
result (<! (write-s3-data conn path bytes))]
(if (instance? Throwable result)
{:name hash
:hash hash
:json json
:size (count json)
:address (s3-address conn path)}))))

(defn read-address
[conn address]
(->> address (address-path conn) (read-s3-data conn)))

(defn read-commit
[conn address]
(go (json/parse (<! (read-address conn address)) false)))

(defn write-commit
[conn ledger commit-data]
(write-data conn ledger :commit commit-data))

(defn read-context
[conn address]
(go (json/parse (<! (read-address conn address)) false)))

(defn write-context
[conn ledger context-data]
(write-data conn ledger :context context-data))

(defn write-index
[conn ledger index-type index-data]
(write-data conn ledger (str "index/" (name index-type)) index-data))

(defn read-index
[conn index-address]
(go (-> conn (read-address index-address) <! (json/parse true))))

(defn push
[conn publish-address {commit-address :address}]
(let [commit-path (address-path conn commit-address false)
head-path (address-path conn publish-address)]
(->> (.getBytes ^String commit-path)
(write-s3-data conn head-path)

(defrecord S3Connection [id s3-client s3-bucket s3-prefix memory state
ledger-defaults parallelism msg-in-ch msg-out-ch
(-c-read [conn commit-key] (read-commit conn commit-key))
(-c-write [conn ledger commit-data] (write-commit conn ledger commit-data))
(-ctx-read [conn context-key] (read-context conn context-key))
(-ctx-write [conn ledger context-data] (write-context conn ledger context-data))
(-index-file-write [conn ledger index-type index-data]
(write-index conn ledger index-type index-data))
(-index-file-read [conn index-address]
(read-index conn index-address))

(-pull [_conn _ledger] (throw (ex-info "Unsupported S3Connection op: pull" {})))
(-subscribe [_conn _ledger]
(throw (ex-info "Unsupported S3Connection op: subscribe" {})))
(-alias [conn ledger-address]
(-> ledger-address (->> (address-path conn)) (str/split #"/")
(->> (drop-last 2) (str/join #"/"))))
(-push [conn head-path commit-data] (push conn head-path commit-data))
(-lookup [conn head-address]
(go (s3-address conn (<! (read-address conn head-address)))))
(-address [conn ledger-alias {:keys [branch] :as _opts}]
(let [branch (if branch (name branch) "main")]
(go (s3-address conn (str ledger-alias "/" branch "/head")))))
(-exists? [conn ledger-address] (s3-key-exists? conn ledger-address))

(-close [_] (swap! state assoc :closed? true))
(-closed? [_] (boolean (:closed? @state)))
(-method [_] :s3)
(-parallelism [_] parallelism)
(-id [_] id)
(-default-context [_] (:context ledger-defaults))
(-new-indexer [_ opts]
(let [indexer-fn (:indexer ledger-defaults)]
(indexer-fn opts)))
(-did [_] (:did ledger-defaults))
(-msg-in [_ _] (throw (ex-info "Unsupported S3Connection op: msg-in" {})))
(-msg-out [_ _] (throw (ex-info "Unsupported S3Connection op: msg-out" {})))
(-state [_] @state)
(-state [_ ledger] (get @state ledger))

(resolve [conn {:keys [id leaf tempid] :as node}]
(let [cache-key [::resolve id tempid]]
(if (= :empty id)
(storage/resolve-empty-leaf node)
(conn-cache/lru-lookup lru-cache-atom cache-key
(fn [_]
conn node
(fn [] (conn-cache/lru-evict lru-cache-atom

(open-storage [_conn _network _dbid _lang]
(throw (ex-info "S3 connection does not support full text operations."
{:status 400, :error :db/unsupported-operation}))))

(defn ledger-defaults
[{:keys [context context-type did indexer]}]
{:context (ctx-util/stringify-context context)
:context-type context-type
:did did
:indexer (cond
(fn? indexer)

(or (map? indexer) (nil? indexer))
(fn [opts]
(idx-default/create (merge indexer opts)))

(throw (ex-info (str "Expected an indexer constructor fn or default indexer options map. Provided: "
{:status 400, :error :db/invalid-s3-connection})))})

(defn connect
"Create a new S3 connection."
[{:keys [defaults parallelism s3-endpoint s3-bucket s3-prefix lru-cache-atom
memory serializer]
:or {serializer (json-serde)} :as _opts}]
(let [aws-opts (cond-> {:api :s3}
s3-endpoint (assoc :endpoint-override s3-endpoint))
client (aws/client aws-opts)
conn-id (str (random-uuid))
state (state-machine/blank-state)
cache-size (conn-cache/memory->cache-size memory)
lru-cache-atom (or lru-cache-atom
(atom (conn-cache/create-lru-cache cache-size)))]
(map->S3Connection {:id conn-id
:s3-client client
:s3-bucket s3-bucket
:s3-prefix s3-prefix
:memory memory
:state state
:ledger-defaults (ledger-defaults defaults)
:serializer serializer
:parallelism parallelism
:msg-in-ch (async/chan)
:msg-out-ch (async/chan)
:lru-cache-atom lru-cache-atom}))))
6 changes: 5 additions & 1 deletion src/fluree/db/json_ld/api.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
[fluree.db.conn.ipfs :as ipfs-conn]
[fluree.db.conn.file :as file-conn]
[fluree.db.conn.memory :as memory-conn]
[fluree.db.conn.s3 :as s3-conn]
[fluree.db.conn.proto :as conn-proto]
[fluree.db.dbproto :as dbproto]
[fluree.db.platform :as platform]
Expand Down Expand Up @@ -66,7 +67,10 @@
:file (if platform/BROWSER
(throw (ex-info "File connection not supported in the browser" opts))
(file-conn/connect opts*))
:memory (memory-conn/connect opts*)))))
:memory (memory-conn/connect opts*)
:s3 #?(:clj (s3-conn/connect opts*)
:cljs (throw (ex-info "S3 connections not yet supported in ClojureScript"
{:status 400, :error :db/unsupported-operation})))))))

Expand Down

0 comments on commit 7efcd2f

