Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion changelog.d/2-features/WPB-20207
Original file line number Diff line number Diff line change
@@ -1 +1 @@
cargohold: add asset audit logging (#4782 ,#4784)
cargohold: add asset audit logging (#4782, #4784, #4787)
10 changes: 9 additions & 1 deletion charts/nginz/templates/conf/_nginx.conf.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ http {

{{- $validUpstreams := include "valid_upstreams" . | fromJson }}
{{- range $name, $_ := $validUpstreams }}
upstream {{ $name }}{{ if hasKey $.Values.nginx_conf.upstream_namespace $name }}.{{ get $.Values.nginx_conf.upstream_namespace $name }}{{end}} {
upstream {{ $name }}{{ if hasKey $.Values.nginx_conf.upstream_namespace $name }}.{{ get $.Values.nginx_conf.upstream_namespace $name }}{{end}} {
zone {{ $name }} 64k; # needed for dynamic DNS updates
least_conn;
keepalive 32;
Expand Down Expand Up @@ -322,6 +322,14 @@ http {
proxy_pass http://{{ $name }}{{ if hasKey $.Values.nginx_conf.upstream_namespace $name }}.{{ get $.Values.nginx_conf.upstream_namespace $name }}{{end}};
proxy_http_version 1.1;

# Forward client IP information to upstreams
# NOTE: Do NOT override X-Forwarded-For here.
# - We intentionally leave X-Forwarded-For as received from upstreams
# to preserve existing semantics (e.g., brig expects a single IP).
# - For components that need the full chain, we add X-Wire-Forwarded-For.
proxy_set_header X-Wire-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Real-IP $remote_addr;

{{- if ($location.disable_request_buffering) }}
proxy_request_buffering off;
{{ end -}}
Expand Down
1 change: 1 addition & 0 deletions libs/http2-manager/src/HTTP2/Client/Manager.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ module HTTP2.Client.Manager
http2ManagerWithSSLCtx,
withHTTP2Request,
withHTTP2RequestOnSingleUseConn,
withHTTP2RequestOnSingleUseConnWithHook,
connectIfNotAlreadyConnected,
ConnectionAlreadyClosed (..),
disconnectTarget,
Expand Down
29 changes: 26 additions & 3 deletions libs/http2-manager/src/HTTP2/Client/Manager/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,19 @@ withHTTP2Request mgr target req k = do

-- | Temporary workaround for https://github.com/kazu-yamamoto/http2/issues/102
withHTTP2RequestOnSingleUseConn :: Http2Manager -> Target -> HTTP2.Request -> (HTTP2.Response -> IO a) -> IO a
withHTTP2RequestOnSingleUseConn Http2Manager {..} target req k = do
withHTTP2RequestOnSingleUseConn mgr target req k =
withHTTP2RequestOnSingleUseConnWithHook mgr target req (\_ -> pure ()) k

withHTTP2RequestOnSingleUseConnWithHook ::
Http2Manager ->
Target ->
HTTP2.Request ->
(NS.SockAddr -> IO ()) ->
(HTTP2.Response -> IO a) ->
IO a
withHTTP2RequestOnSingleUseConnWithHook Http2Manager {..} target req onConnect k = do
sendReqMVar <- newEmptyMVar
thread <- liftIO . async $ startPersistentHTTP2Connection sslContext target cacheLimit sslRemoveTrailingDot tcpConnectionTimeout sendReqMVar
thread <- liftIO . async $ startPersistentHTTP2ConnectionWithHook sslContext target cacheLimit sslRemoveTrailingDot tcpConnectionTimeout onConnect sendReqMVar
let newConn = HTTP2Conn thread (putMVar sendReqMVar CloseConnection) sendReqMVar
sendRequestWithConnection newConn req $ \resp -> do
k resp <* disconnect newConn
Expand Down Expand Up @@ -276,6 +286,17 @@ disconnectTargetWithTimeout mgr target microSeconds = do
`finally` (atomically . modifyTVar' (connections mgr) $ Map.delete target)

startPersistentHTTP2Connection ::
SSL.SSLContext ->
Target ->
Int ->
Bool ->
Int ->
MVar ConnectionAction ->
IO ()
startPersistentHTTP2Connection ctx target cl removeTrailingDot tcpConnectTimeout sendReqMVar =
startPersistentHTTP2ConnectionWithHook ctx target cl removeTrailingDot tcpConnectTimeout (\_ -> pure ()) sendReqMVar

startPersistentHTTP2ConnectionWithHook ::
SSL.SSLContext ->
Target ->
-- cacheLimit
Expand All @@ -284,12 +305,13 @@ startPersistentHTTP2Connection ::
Bool ->
-- | TCP connect timeout in microseconds
Int ->
(NS.SockAddr -> IO ()) ->
-- MVar used to communicate requests or the need to close the connection. (We could use a
-- queue here to queue several requests, but since the requestor has to wait for the
-- response, it might as well block before sending off the request.)
MVar ConnectionAction ->
IO ()
startPersistentHTTP2Connection ctx (tlsEnabled, hostname, port) cl removeTrailingDot tcpConnectTimeout sendReqMVar = do
startPersistentHTTP2ConnectionWithHook ctx (tlsEnabled, hostname, port) cl removeTrailingDot tcpConnectTimeout onConnect sendReqMVar = do
liveReqs <- newIORef mempty
let clientConfig =
HTTP2.defaultClientConfig
Expand Down Expand Up @@ -332,6 +354,7 @@ startPersistentHTTP2Connection ctx (tlsEnabled, hostname, port) cl removeTrailin

handle cleanupThreadsWith $
bracket connectTCPWithTimeout NS.close $ \sock -> do
onConnect =<< NS.getPeerName sock
bracket (mkTransport sock transportConfig) cleanupTransport $ \transport ->
bracket (allocHTTP2Config transport) HTTP2.freeSimpleConfig $ \http2Cfg -> do
let runAction = HTTP2.run clientConfig http2Cfg $ \sendReq _aux -> do
Expand Down
109 changes: 107 additions & 2 deletions libs/wire-api-federation/src/Wire/API/Federation/Endpoint.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,21 @@ module Wire.API.Federation.Endpoint
)
where

import Control.Lens ((?~))
import Data.ByteString.Conversion (fromByteString)
import Data.Kind
import Data.Metrics.Servant
import Data.Misc (IpAddr)
import Data.OpenApi qualified as S
import Data.Sequence qualified as Seq
import GHC.TypeLits
import Imports
import Servant.API
import Network.HTTP.Types qualified as HTTP
import Servant
import Servant.Client
import Servant.Client.Core
import Servant.OpenApi (HasOpenApi (toOpenApi))
import Servant.Server.Internal (MkContextWithErrorFormatter)
import Wire.API.ApplyMods
import Wire.API.Federation.API.Common
import Wire.API.Federation.Domain
Expand Down Expand Up @@ -72,8 +83,9 @@ type StreamingFedEndpoint name input output =
name
( name
:> OriginDomainHeader
:> OriginIpHeader
:> ReqBody '[JSON] input
:> StreamPost NoFraming OctetStream output
:> StreamPostWithRemoteIp NoFraming OctetStream output
)

type family
Expand All @@ -90,3 +102,96 @@ type instance
type instance
MkNotificationFedEndpoint m s ('Just v) p =
NotificationFedEndpointWithMods m (Versioned v s) s p

type OriginIpHeaderName = "Wire-Origin-IP" :: Symbol

-- | The remote backend's origin IP is best-effort forensic metadata only.
-- Do not use for auth, policy, or attribution; cert identity is authoritative.
-- IP reflects only the socket peer at our edge (often LB/NAT/egress) and may be inaccurate.
data OriginIpHeader

instance (RoutesToPaths api) => RoutesToPaths (OriginIpHeader :> api) where
getRoutes = getRoutes @api

instance (HasClient m api) => HasClient m (OriginIpHeader :> api) where
type Client m (OriginIpHeader :> api) = Client m api
clientWithRoute pm _ req = clientWithRoute pm (Proxy @api) req
hoistClientMonad pm _ = hoistClientMonad pm (Proxy @api)

type OriginIpHeaderHasServer = Header' '[Strict] OriginIpHeaderName IpAddr

instance
( HasServer api context,
HasContextEntry (MkContextWithErrorFormatter context) ErrorFormatters
) =>
HasServer (OriginIpHeader :> api) context
where
type ServerT (OriginIpHeader :> api) m = Maybe IpAddr -> ServerT api m
route _pa = route (Proxy @(OriginIpHeaderHasServer :> api))
hoistServerWithContext _ pc nt s = hoistServerWithContext (Proxy :: Proxy api) pc nt . s

originIpHeaderName :: (IsString a) => a
originIpHeaderName = fromString $ symbolVal (Proxy @OriginIpHeaderName)

-- Header carrying the best-effort remote peer IP observed by our internal
-- federator when calling a remote backend. Forensic metadata only; do not use
-- for auth/policy or attribution.
type RemoteIpHeaderName = "Wire-Remote-IP" :: Symbol

remoteIpHeaderName :: (IsString a) => a
remoteIpHeaderName = fromString $ symbolVal (Proxy @RemoteIpHeaderName)

instance (HasOpenApi api) => HasOpenApi (OriginIpHeader :> api) where
toOpenApi _ = desc $ toOpenApi (Proxy @api)
where
desc =
S.allOperations
. S.description
?~ ("Federated endpoints may include optional origin IP header: `" <> originIpHeaderName <> "`")

-- | A streaming POST combinator that behaves like Servant's 'StreamPost',
-- but whose client additionally returns an optional remote IP parsed from
-- the 'Wire-Remote-IP' response header.
data StreamPostWithRemoteIp framing (ct :: Type) a

-- Server-side simply delegates to the standard 'StreamPost' implementation.
instance
( HasServer (StreamPost framing ct a) context
) =>
HasServer (StreamPostWithRemoteIp framing ct a) context
where
type ServerT (StreamPostWithRemoteIp framing ct a) m = ServerT (StreamPost framing ct a) m
route _ = route (Proxy @(StreamPost framing ct a))
hoistServerWithContext _ = hoistServerWithContext (Proxy @(StreamPost framing ct a))

-- OpenAPI, metrics and path routing can delegate to the underlying StreamPost
instance RoutesToPaths (StreamPostWithRemoteIp (framing :: Type) (ct :: Type) (a :: Type)) where
getRoutes = getRoutes @(StreamPost framing ct a)

instance (HasOpenApi (StreamPost framing ct a)) => HasOpenApi (StreamPostWithRemoteIp framing ct a) where
toOpenApi _ = toOpenApi (Proxy @(StreamPost framing ct a))

-- Client-side: make the streaming request and return the body together with an
-- optional 'IpAddr' parsed from the 'Wire-Remote-IP' header.
instance
( RunStreamingClient m,
Accept ct,
FromSourceIO ByteString a
) =>
HasClient m (StreamPostWithRemoteIp framing ct a)
where
type Client m (StreamPostWithRemoteIp framing ct a) = m (Maybe IpAddr, a)

clientWithRoute _ _ req =
withStreamingRequest
req
{ requestMethod = HTTP.methodPost,
requestAccept = Seq.singleton (contentType (Proxy @ct))
}
$ \resp -> do
let hdrs = toList (responseHeaders resp)
mIp = lookup remoteIpHeaderName hdrs >>= fromByteString
body <- fromSourceIO (responseBody resp)
pure (mIp, body)

hoistClientMonad _ _ f c = f c
17 changes: 9 additions & 8 deletions services/cargohold/cargohold.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -356,21 +356,22 @@ test-suite cargohold-tests
-Wredundant-constraints -Wunused-packages -Wno-x-partial

build-depends:
aeson >=2.0.1.0
, base >=4 && <5
, bytestring >=0.10
aeson >=2.0.1.0
, base >=4 && <5
, bytestring >=0.10
, bytestring-conversion
, cargohold
, cargohold-types
, extended
, imports
, mime >=0.4
, tasty >=1.0
, tasty-quickcheck >=0.10
, text >=1.1
, mime >=0.4
, tasty >=1.0
, tasty-quickcheck >=0.10
, text >=1.1
, types-common
, unix
, unliftio
, uri-bytestring >=0.2
, uri-bytestring >=0.2
, wire-api

default-language: Haskell2010
1 change: 1 addition & 0 deletions services/cargohold/default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ mkDerivation {
aeson
base
bytestring
bytestring-conversion
cargohold-types
extended
imports
Expand Down
15 changes: 11 additions & 4 deletions services/cargohold/src/CargoHold/API/AuditLog.hs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import qualified CargoHold.Types.V3 as V3
import Codec.MIME.Type (showType)
import Data.ByteString.Conversion.To (toByteString)
import Data.Id (UserId, botUserId)
import Data.Misc (IpAddr)
import Data.Qualified (Local, Qualified, Remote, qDomain, qUnqualified, tDomain, tUnqualified)
import Data.Text.Encoding (decodeUtf8)
import Imports
Expand Down Expand Up @@ -63,12 +64,13 @@ logUpload lwon mMeta pathTxt =
~~ "uploader.id" .= toByteString p
principalDomain = "uploader.domain" .= toByteString (tDomain lwon)

logDownload :: (Log.MonadLogger m) => Maybe (Qualified V3.Principal) -> S3AssetMeta -> Text -> m ()
logDownload mqDownloader s3 pathTxt =
logDownload :: (Log.MonadLogger m) => Maybe (Qualified V3.Principal) -> Maybe IpAddr -> S3AssetMeta -> Text -> m ()
logDownload mqDownloader mIp s3 pathTxt =
Log.info $
auditTrue
~~ "event" .= ("file-download" :: Text)
~~ downloaderFields mqDownloader
~~ ipaddr "downloader.backend.ip" mIp
~~ auditMetaFields (v3AssetAuditLogMetadata s3)
~~ "download.path" .= pathTxt
~~ msg (val "Asset audit log: download")
Expand All @@ -88,15 +90,16 @@ logSignedURLCreation mqCreator mMeta uri =
renderHost = maybe ("" :: Text) (decodeUtf8 . hostBS . authorityHost)
hostBS (Host h) = h

logDownloadRemoteAsset :: (Log.MonadLogger m) => Local UserId -> Remote () -> m ()
logDownloadRemoteAsset luid remote = do
logDownloadRemoteAsset :: (Log.MonadLogger m) => Local UserId -> Remote () -> Maybe IpAddr -> m ()
logDownloadRemoteAsset luid remote mIp = do
Log.info $
auditTrue
~~ "event" .= ("file-download" :: Text)
~~ "downloader.type" .= ("user" :: Text)
~~ "downloader.id" .= toByteString (tUnqualified luid)
~~ "downloader.domain" .= toByteString (tDomain luid)
~~ "remote.domain" .= toByteString (tDomain remote)
~~ ipaddr "remote.backend.ip" mIp
~~ msg (val "Asset audit log: remote download")

------------------------------------------------------------------------------
Expand Down Expand Up @@ -146,3 +149,7 @@ notAvailable = "N/A"

notAvailableInternalAccess :: Text
notAvailableInternalAccess = "N/A internal access"

ipaddr :: ByteString -> Maybe IpAddr -> Log.Msg -> Log.Msg
ipaddr field Nothing = field .= notAvailable
ipaddr field (Just ip) = field .= toByteString ip
9 changes: 5 additions & 4 deletions services/cargohold/src/CargoHold/API/Federation.hs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import CargoHold.Types.V3 (Principal (UserPrincipal))
import Control.Error
import Data.ByteString.Conversion (toByteString')
import Data.Domain
import Data.Misc (IpAddr)
import Data.Qualified (Qualified (Qualified))
import Data.Text.Encoding (decodeLatin1)
import Imports
Expand All @@ -54,12 +55,12 @@ checkAsset remote ga =
runMaybeT $
checkMetadata (Qualified (UserPrincipal ga.user) remote) (F.key ga) (F.token ga)

streamAsset :: Domain -> F.GetAsset -> Handler AssetSource
streamAsset remote ga = do
meta <- checkAsset remote ga >>= maybe (throwE assetNotFound) pure
streamAsset :: Domain -> Maybe IpAddr -> F.GetAsset -> Handler AssetSource
streamAsset remoteDomain remoteIp ga = do
meta <- checkAsset remoteDomain ga >>= maybe (throwE assetNotFound) pure
whenM (asks (.options.settings.assetAuditLogEnabled)) $ do
let pathTxt = decodeLatin1 (toByteString' (S3.mkKey (F.key ga)))
logDownload (Just $ Qualified (UserPrincipal ga.user) remote) meta pathTxt
logDownload (Just $ Qualified (UserPrincipal ga.user) remoteDomain) remoteIp meta pathTxt
AssetSource <$> S3.downloadV3 (F.key ga)

getAsset :: Domain -> F.GetAsset -> Handler F.GetAssetResponse
Expand Down
Loading