diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e2bd1b1..795fc0f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -33,8 +33,8 @@ jobs: - name: Install and run RabbitMQ run: | brew install rabbitmq - rabbitmq-plugins enable rabbitmq_stomp - brew services start rabbitmq + rabbitmq-plugins enable rabbitmq_stomp rabbitmq_web_stomp + RABBITMQ_CONFIG_FILE=./RabbitMQ/rabbitmq rabbitmq-server -detached - name: Run unit tests run: swift test --enable-code-coverage - name: Upload coverage data @@ -73,6 +73,7 @@ jobs: - 61613:61613 volumes: - ${{ github.workspace }}/RabbitMQ:/etc/rabbitmq + - ${{ github.workspace }}/Certs:/Certs steps: - name: Install curl run: apt-get update -yq && apt-get install -y curl diff --git a/Certs/ca.der b/Certs/ca.der new file mode 100644 index 0000000..da95b37 Binary files /dev/null and b/Certs/ca.der differ diff --git a/Certs/ca.key b/Certs/ca.key new file mode 100644 index 0000000..c7b6ca4 --- /dev/null +++ b/Certs/ca.key @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvAIBADANBgkqhkiG9w0BAQEFAASCBKYwggSiAgEAAoIBAQDZxz3BDLxULvL/ +dP++EOjQYdDJU3kg+6GRC/VaMBIe2wB8sZpvjrI5YMmDJYMXccLh076s38doQWy1 +acIBQMBQHYP7A6FlIybj7yBwzR/aVVWt3+97FuJcmQFBfACKNyBAvD49p6BxjNJT +IFg6V2RJDITCAGTeUwl+gATLrwImDdWAXJUJUOSqtsCRy9OButltUDoqlTajWK3+ ++MSiHiUKnDq9skGoChDNqye9+qliRZuCX/XkUn+ZLa9wq0SarNfcE8YX4aZjVGk5 +9bl/JOD3u481qQCoMGCvH7qg4GfoOA3WDb3rA/5ww5immgZyxiW5z00H+taTabDU +NWkQsvNJAgMBAAECggEACrCK1ZRjVKMC0rr6mCjikqLwgYdXQ2ESmMjsW8dPBmRz +0UsS9G2dzi7z6Tvww7h160mOWxQ3aKTIeh2lNBRKdExEl5BBrnITpT3OOvc4nGoS +pPCgY8t3ZDnu67jTWnVy//Z1fCeDmrtBsiqc5g022xaSGCc2jEqpZUQRqDFQks9F +FYDGVdEskNcKZRgLlTU66t0r7ha1LUNjlRbVnrHdGrlTVx8wJip0QpPpU3/vzdJf +5IylhrFsWO1keigAsjhw+eB5bv52tcbm05lvoTraARc2whY//r77TDKf1n4mie4J +LRdlIuUtK6d7m3d3ztpdxUrOrXxvYJHl4QamcHM+gQKBgQD0O8lz3kG7sYOzE5T7 +zb2VvbnR7k0GbsJfhpI0OSkGj7WTw88Fik1RdYfCv6JU17GNIa+r5WXr+2tsDfP1 +lIVMkLgBTWaBpancOnCML04KPLcnCrrO09WHg/w2PZZmEe2pX2AssUgiSu6FgAx9 +XhbaN55J2v0jPboa1KpQMaltyQKBgQDkRSxBSBDFW41gqeYN8mASxC6Kl+18QbZw +fh39dmr2s+0GBOW1yUZCGilPnQb3Cy+UMClQlXlpJGZOSMO3Drz6+FIBX738FgV2 +i+OUsYP/NjCyCLrrIfUfC7DZVyn5PGqDuoEu4YOXZ2JmDCfy/x/fE+MLxFgGb6MT +c+W97soZgQKBgBkqg/KVh05w1zI/pU1hANUKLLiOBo/QH+U595y7+xc6/anRNmbK +vnbTVn7hwjdd1mTFgNWEmMD9Mi97LJU0jZu7GcrAj/xx9pPDzc8UHMV/RhwWxfMD +7u+80ONB61pvJH8cXcudJOKrt/I3pJHSfSZVLtFLaGQWOzQCj94dj+jJAoGAR2K2 +f0fZLs79vIAFWNE2aEf/wgnXE+e89RsLp+szcik8c4huidYC6cMirioONo3B+FbD +ZNcJ2+6tK8gnpISwZs/nXVSipkYAuVzv8907yUEaI2MFiWjaLjZKvP08PBw/tyhJ +I+8UZvHG3ODqUVyli+69Lz3cxU5+MiQpcadOfoECgYBWr177O3tjVIg7eHtjE/HA +h3a3v0XmVeq1UxQw/WC7CC9Yh6SE5odZtQdGlqzmCsOF/H08I8UT5U+BsSEb6nSv +sFJiERnKITBhrk+1pOdNehkuB2Aj2IYeKpGp6GkS8c4DLaHUfV36EE3G5wOn8fsD +Nsz2sfx29zjlCDzXd+RpjA== +-----END PRIVATE KEY----- diff --git a/Certs/ca.pem b/Certs/ca.pem new file mode 100644 index 0000000..319a433 --- /dev/null +++ b/Certs/ca.pem @@ -0,0 +1,22 @@ +-----BEGIN CERTIFICATE----- +MIIDrTCCApWgAwIBAgIUK0VkTAGFOug9F7KgRnAb4Y5IX/wwDQYJKoZIhvcNAQEL +BQAwZjELMAkGA1UEBhMCSVQxDzANBgNVBAgMBlNpY2lseTENMAsGA1UEBwwERW5u +YTERMA8GA1UECgwIU1RPTVBOSU8xCzAJBgNVBAsMAkNBMRcwFQYDVQQDDA5mcHNl +dmVyaW5vLmNvbTAeFw0yNjAxMDgwOTU4MDFaFw0yNzAxMDgwOTU4MDFaMGYxCzAJ +BgNVBAYTAklUMQ8wDQYDVQQIDAZTaWNpbHkxDTALBgNVBAcMBEVubmExETAPBgNV +BAoMCFNUT01QTklPMQswCQYDVQQLDAJDQTEXMBUGA1UEAwwOZnBzZXZlcmluby5j +b20wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDZxz3BDLxULvL/dP++ +EOjQYdDJU3kg+6GRC/VaMBIe2wB8sZpvjrI5YMmDJYMXccLh076s38doQWy1acIB +QMBQHYP7A6FlIybj7yBwzR/aVVWt3+97FuJcmQFBfACKNyBAvD49p6BxjNJTIFg6 +V2RJDITCAGTeUwl+gATLrwImDdWAXJUJUOSqtsCRy9OButltUDoqlTajWK3++MSi +HiUKnDq9skGoChDNqye9+qliRZuCX/XkUn+ZLa9wq0SarNfcE8YX4aZjVGk59bl/ +JOD3u481qQCoMGCvH7qg4GfoOA3WDb3rA/5ww5immgZyxiW5z00H+taTabDUNWkQ +svNJAgMBAAGjUzBRMB0GA1UdDgQWBBS6S/spfRhpwIHgrwq4kfSkaAumBzAfBgNV +HSMEGDAWgBS6S/spfRhpwIHgrwq4kfSkaAumBzAPBgNVHRMBAf8EBTADAQH/MA0G +CSqGSIb3DQEBCwUAA4IBAQCMPdS5JckDqQ7zDEVEnumpPbBg37/pQea8KW5hnoNe +uBai1ycqoSfOhfBrpbTQTliKETCvD+Ts25pdzxeWhcILvH9BwKcAihhWRyDWsKUx +oozrjiZhXEFzgT4yFv6kiE1g51HrudllWpzIJar0ufg/0Pm2ZSOVbSTsDqO8kP/M +KUBbR59yfKTX2eHtkEt0QZ8kODR00FZ/xuR4MG75Ukbzjl6OnuFKa2rpy3vD1nDW +VzQsIB7CYw4VOCbCL+IGU9Bwe8aF/KUOQeWZgU4E7j15R/D3aLmtoAm2KcOdsMkd +pBSBbWuYZdeLF1i5838WkyrnqQxEgV+2pN37C9yHFgKQ +-----END CERTIFICATE----- diff --git a/Certs/ca.srl b/Certs/ca.srl new file mode 100644 index 0000000..38ee442 --- /dev/null +++ b/Certs/ca.srl @@ -0,0 +1 @@ +32FA8CE071C68BB7F32F0B07B9EE05D5B38548F8 diff --git a/Certs/client.csr b/Certs/client.csr new file mode 100644 index 0000000..4feb52d --- /dev/null +++ b/Certs/client.csr @@ -0,0 +1,17 @@ +-----BEGIN CERTIFICATE REQUEST----- +MIICrzCCAZcCAQAwajELMAkGA1UEBhMCSVQxDzANBgNVBAgMBlNpY2lseTENMAsG +A1UEBwwERW5uYTERMA8GA1UECgwIU1RPTVBOSU8xDzANBgNVBAsMBkNsaWVudDEX +MBUGA1UEAwwOZnBzZXZlcmluby5jb20wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAw +ggEKAoIBAQCd3Q9zC3Jmf9B3VUdzp2TBhO1dnlo9thFyOB4AxnW8TGJ5kYxaOTRn +NGfPKiGRSOECe2isPra2hoj0eub6sowszFlYQNsbYh2jEhCWL8G2GJD0f3xnpqJx +lyb/YYLiYcDJXpucbvMdWnaApqaEIcA25g3rZ52PmkIsIFKM00K4vTmvDpV2dwC5 +59+p3da+sEYp9N5B7o8ypZwEVajY5z5glisaxGos+qDg6wHLPoxMRKojoYcE4urK +2O9A6LEDfUWDxTLkSjiVQVHD5KK5N/kAZdB57kHR/3C5gmauYus4lJfoBUA9YcSH +iyq+EDzE8lEHg+UQmNxn9ywVAT3PsXlTAgMBAAGgADANBgkqhkiG9w0BAQsFAAOC +AQEAg2DnJTIarPZOQ3gorJ2Vnarh25RIw5Y4uEwiVkp0yGM7dvX5gakN4AOwhtfY +2aHa/ChLA6UxMkd4zfjON1S7LxV0dLrtHQ2hCNp0OKBOFoF25EzLNYB1opicKYJI +oSeOY+9f0OpfGEkbYHM3Fr0eqUSjnWxOcescFYIVYAFwSvgM3tGV8Jd6ka2pRLkJ +gi9BM97N7jCCAc2L+bj0NKci1uvZTeQ9duhjifLoTJ53a7+VqVbnXTKeISMh1uQJ +IIdumoq1d6lK+f2jZboVWOTgjiXRD9ubbtuES1jMRimIHF/jB3ReB1ao8UcJfvti +fHKTKClviUj53X0ryTqpmpNBmg== +-----END CERTIFICATE REQUEST----- diff --git a/Certs/client.key b/Certs/client.key new file mode 100644 index 0000000..e8190ef --- /dev/null +++ b/Certs/client.key @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCd3Q9zC3Jmf9B3 +VUdzp2TBhO1dnlo9thFyOB4AxnW8TGJ5kYxaOTRnNGfPKiGRSOECe2isPra2hoj0 +eub6sowszFlYQNsbYh2jEhCWL8G2GJD0f3xnpqJxlyb/YYLiYcDJXpucbvMdWnaA +pqaEIcA25g3rZ52PmkIsIFKM00K4vTmvDpV2dwC559+p3da+sEYp9N5B7o8ypZwE +VajY5z5glisaxGos+qDg6wHLPoxMRKojoYcE4urK2O9A6LEDfUWDxTLkSjiVQVHD +5KK5N/kAZdB57kHR/3C5gmauYus4lJfoBUA9YcSHiyq+EDzE8lEHg+UQmNxn9ywV +AT3PsXlTAgMBAAECggEAEr7VdFBM094OkkphSIWn7GY393tzyiCCFCw/Z9EpLuRq +dyFw50/OqLNWNl6Ri4UkxEeE55svgB9xSN1ald9jeiMD4vvEk40EPuvIu293/a26 +HMZnWTmLeu3mIWzh/rbOjpA6QzUA5SCWXMgtPW3H5jJNlIFXz4ho84DHvKrjdo3r +RynMdSx4U2KKTTJgl+kd3oYMjH3/6wSfiDcIb1kwabrYm45igmZz7lgkwGpyrcEc +ZUNztsNOfKSiZqyh9BY1Ri0mxHK6GYs3XdvCqgDdvT+JnCXw6QErTvj+kRvlCOIu +3dg6o0GneY9YY8Fsz1sY5z1wbjdsx/wovLFpNrckBQKBgQDQ6Dbuy17goQw+p3n0 +FCyagI1NYnuSF5fYDfvic/dFCr9nBc7E0Z1Qy0pqgUH4PVAqkBFM76jOmkeNyfyx +pl3E7tKfgzAgExEexc6FYLf6oiv2ChfLPUVevsBZMUE9/k8KC5O9IsGev3/ewY8z +DaA/4X4rBqG0cuAMJBS9gYZPJwKBgQDBcy45HvovT4eQMoK/nf7/lceK0vxPMkV0 +zz8aQqaGkf2/SxQo+AJ7eaIiSE7QhmrV9ZCUGbjZX+rUBlL9ZM3rg19rJQHSldPd +JKhtt7zSIALVmFd3pBHw3B3dBN8c6REhzwefEkKtgIuzklg9V5jZnWwuKUPXcU0S +ajSbMB4f9QKBgF740tBowG9ah9iMBtNBwK1Ut+hwV0EpXKyqXtKqacWHh1BswX2U +1fH/GYuly7nxFdrjuBnD8nhKhzEcnLaerY50DGjcGSrbwDK/No5IVBurUlT+babF +6h2Slpt12vU2AfTNIpMfk+p5oMZCNLm+/hVMjDE/SJBikRyz4oHsuoSNAoGBAJV4 +t8hvBQS1QJvMS3ZfT/6P7oR3jXhr8XrV/3387cqg1x2Nl2AocVKSoxI2KJ+6nKYB +fdrRHSDomfuFf25HC0zkTz/sckISfocqofFUvNLm5QnBEiRhY3NlwLVb5EDkXNZw +rwTcCSUiY882TnPnUTUkC8iHHeJDqzeFVGxse3pZAoGAHO+7OChgyKxV6IT4B4/x +pLURZdcV9z2KmhK/5xJ5S/9McFf7bHnv/EfikcqK4wZrawW9t1GZl4brz1EKuck+ +qSi2+W6u/gdgy8W0eteRB6rfyCza3+EGmJ/0ePrpiHwz1LPOMfIPQE6+ufvuTzIB +TfGS6WhO6RJZpclS2KCz1HM= +-----END PRIVATE KEY----- diff --git a/Certs/client.p12 b/Certs/client.p12 new file mode 100644 index 0000000..b1dd8de Binary files /dev/null and b/Certs/client.p12 differ diff --git a/Certs/client.pem b/Certs/client.pem new file mode 100644 index 0000000..98853e3 --- /dev/null +++ b/Certs/client.pem @@ -0,0 +1,22 @@ +-----BEGIN CERTIFICATE----- +MIIDoDCCAoigAwIBAgIUMvqM4HHGi7fzLwsHue4F1bOFSPgwDQYJKoZIhvcNAQEL +BQAwZjELMAkGA1UEBhMCSVQxDzANBgNVBAgMBlNpY2lseTENMAsGA1UEBwwERW5u +YTERMA8GA1UECgwIU1RPTVBOSU8xCzAJBgNVBAsMAkNBMRcwFQYDVQQDDA5mcHNl +dmVyaW5vLmNvbTAeFw0yNjAxMDgwOTU4MDFaFw0yNzAxMDgwOTU4MDFaMGoxCzAJ +BgNVBAYTAklUMQ8wDQYDVQQIDAZTaWNpbHkxDTALBgNVBAcMBEVubmExETAPBgNV +BAoMCFNUT01QTklPMQ8wDQYDVQQLDAZDbGllbnQxFzAVBgNVBAMMDmZwc2V2ZXJp +bm8uY29tMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAnd0PcwtyZn/Q +d1VHc6dkwYTtXZ5aPbYRcjgeAMZ1vExieZGMWjk0ZzRnzyohkUjhAntorD62toaI +9Hrm+rKMLMxZWEDbG2IdoxIQli/BthiQ9H98Z6aicZcm/2GC4mHAyV6bnG7zHVp2 +gKamhCHANuYN62edj5pCLCBSjNNCuL05rw6VdncAueffqd3WvrBGKfTeQe6PMqWc +BFWo2Oc+YJYrGsRqLPqg4OsByz6MTESqI6GHBOLqytjvQOixA31Fg8Uy5Eo4lUFR +w+SiuTf5AGXQee5B0f9wuYJmrmLrOJSX6AVAPWHEh4sqvhA8xPJRB4PlEJjcZ/cs +FQE9z7F5UwIDAQABo0IwQDAdBgNVHQ4EFgQUnZrMgVKUoj837kqd45OCwOGsPOow +HwYDVR0jBBgwFoAUukv7KX0YacCB4K8KuJH0pGgLpgcwDQYJKoZIhvcNAQELBQAD +ggEBAIylv5wAIzbEud5u+A0aP04k1sClaJuvh51leke3EObMBgBD4eT2IT9uw1Db +BgcDilIGd7mXovSsRJ8nGmXDm2U+rQb1QyL+Iqt6vKI64j9TDNdw8zGrvZbGaVDC +1ViuSwykSWc+D1uelSBuwMmUacPuxGUMstBGTdwA8UpGxtsCVniUJ5h07wrp+uCQ +mreIpmi4bhIWytdM7iZRPrlq2Vf6pRFC+wSbe6v/tWvI0/CUKW2eSi1g1avBKTcp +RtBpJNEzZr1yLTk6J8YCR304wGm0OitLW70Gf2bc8c/tesDalsMD8ANuy+u5SAzz +NhHLIVk/+/BZQiawxlDNOnR+jQs= +-----END CERTIFICATE----- diff --git a/Certs/server.csr b/Certs/server.csr new file mode 100644 index 0000000..69ef3f1 --- /dev/null +++ b/Certs/server.csr @@ -0,0 +1,18 @@ +-----BEGIN CERTIFICATE REQUEST----- +MIIC2zCCAcMCAQAwajELMAkGA1UEBhMCSVQxDzANBgNVBAgMBlNpY2lseTENMAsG +A1UEBwwERW5uYTERMA8GA1UECgwIU1RPTVBOSU8xDzANBgNVBAsMBlNlcnZlcjEX +MBUGA1UEAwwOZnBzZXZlcmluby5jb20wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAw +ggEKAoIBAQDIiBLlPDzdawP+4vLbvbH3Vj0juSa1J31ZOrDMK3YAynel7un9vMLt +hPuZseF9eqWkuBrh2NWu5D0vMtNZcxjGnP/iDIyslMM2RrWaTI9R+74Z5fTho4T8 +HBIxpFdryzeAbLiAhzU7n04crcDjDwDsxSDGe3+MLUOCv3IzOoG6eyUh7LBpq2wi +5y5q5GVIyZK3trs3eSVXhhE5b/zY/JeXNeovTrktz8X1nBORCTgWJEJle7e4Pv23 +WBZH7esDmbp0gLIWIh1Ox8Fo7YtgiDWAEoQLko95uKmRoMR3261cAZxuA/LnteNL +MLHJNGtM2hwXjclqBGyVOf3Ws3KP5Y6HAgMBAAGgLDAqBgkqhkiG9w0BCQ4xHTAb +MBkGA1UdEQQSMBCCDmZwc2V2ZXJpbm8uY29tMA0GCSqGSIb3DQEBCwUAA4IBAQAb +nWUDW1cpizI45fc7KpV/M4R5MOm1RNoi2b9Yobwwcy835DrBwaLwZkWzOmVGYag/ +aaSKf/vZ1eG6fC32i5qjJJ18JWdip+oB+oiiF6yMUThaPN6zvlYbqUhqlSd7WfsM +hhaR5ixtm8RFIbhRdoH3Eljwc5/xG0WDjH2vPwB22ZajMNUcBMe4thPgEFSuxFsy +dCwQQMVWj101lD4nKbrMkALDd43FZBmlIFdMj6bw5klYwjBudIEcffjLkQI12l/d +A5xKFl+JAtp7GThIU1XezcXrgl4bHE3rjz8GAbA9CeAe3hABSkZk2b7lLJSGHdkx +pveAu5BqxcHMsNTxme6B +-----END CERTIFICATE REQUEST----- diff --git a/Certs/server.key b/Certs/server.key new file mode 100644 index 0000000..f179c96 --- /dev/null +++ b/Certs/server.key @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDIiBLlPDzdawP+ +4vLbvbH3Vj0juSa1J31ZOrDMK3YAynel7un9vMLthPuZseF9eqWkuBrh2NWu5D0v +MtNZcxjGnP/iDIyslMM2RrWaTI9R+74Z5fTho4T8HBIxpFdryzeAbLiAhzU7n04c +rcDjDwDsxSDGe3+MLUOCv3IzOoG6eyUh7LBpq2wi5y5q5GVIyZK3trs3eSVXhhE5 +b/zY/JeXNeovTrktz8X1nBORCTgWJEJle7e4Pv23WBZH7esDmbp0gLIWIh1Ox8Fo +7YtgiDWAEoQLko95uKmRoMR3261cAZxuA/LnteNLMLHJNGtM2hwXjclqBGyVOf3W +s3KP5Y6HAgMBAAECggEADc8ZBgIk6tZwuv+j26/vbmnY4fnhCX6Dqj5ZaGXhxWIr +5RhTsq5p+fM7ZDUUoxBsEbC/4SeOZ3G+KN7mIPXFEgBDuqxEj6na24T5AnwdLncY +ey4Ts+oPwVCpjBOWFY0z8DsMZYVqDAYHWgA5FSm2VZepjyF4Ag4n5PwMAC0aO94Z +vF70zg2J6c8QYQDwMSnzNKpCTdlSEbQOA4r8NJ9Bw2g7Ufg4fklOfH8dEmFWLGmx +WYGdNQR025pFBfSLFbyZgKNV5lD+KxgAacSrt10+8yjQQM9dIHrHlvRYX0CGKOiN +s2+zO74DG41KEbknjyrusWME3mSAvlEiOTuDWi7h4QKBgQD11lJ/+zw3v48v4YPt +XgGIZqFRBFZLs5jVs0l9zb/6ZWXFtL7C1UcA831KARrFx0HzSvJDGKi7w/yTXGKw +HRveJQxV59wolHo19v0DvcfDw9fVYcHM9i6jFrOxyOkiZnBajDSYkrIHRGIMg9et +noHtPgEswHD4A61T9WJ7jyzU0QKBgQDQ0kjy8OHnkAtZH2WBf3MaG63WMrQgqKC5 +/KYkTl96Ym06wqERJePiAYTcMl5j6gjWuM1CGjBUcIrXXvJ3oxBBGOTtwcOwFed3 +t8Nn7Kqk4NSNyMCYxemo5iPZz6B+vSRsmLmz4DRPqCrYYSUT4oDrYroIwcbIpS7s +V1G50FJj1wKBgQC8eZ10m2Q2hXm8xtQDdvAL6EtDNuUGUuIY7MSqW8UUrwqQwhsC +esteEk97Hk+tVC/e3BXSlSRAGGkbfVf4F8kWziaf8gwbXII5v2uJo8V09EAUS0du +4UsuJCiOZC9A/XKkKRvl4Yn2Alp9p9MCPCT2g3QZIE08CG2pkdb4TfOcEQKBgA3A +i4VUT8xMNDMeWjT5C7+AayZcGHi+IjedZ+OkNa19pevkvW6/rA5KQ1GtD4JM+2AI +s5vS1bP55Wrhmd4/IubmmMz/ob+K8b90nq3NbD+HfvgHbIejIQw6e8nwXoV6N1ff +hvtNnKRSUuAFJcf1dsAJtG63NDwqhLiT15IBIivpAoGBAOHxyCpGfOq1n4xGMeFo +xvW4M3FoppiBe1RSt/ZQYQqii6P5rnkBpqHuFbXzOJIQKAy+QPzBoJO14Qc7dLq6 +LkcmINS02UGanDilJk5GAZOU4SHfXiwhqgkB3Sn4Taw2gC+C1exxCH0GBb2aRRwh +AKRKQxvEbjUQlLemy5oyc2WH +-----END PRIVATE KEY----- diff --git a/Certs/server.pem b/Certs/server.pem new file mode 100644 index 0000000..6ea5fa2 --- /dev/null +++ b/Certs/server.pem @@ -0,0 +1,23 @@ +-----BEGIN CERTIFICATE----- +MIID6TCCAtGgAwIBAgIUMvqM4HHGi7fzLwsHue4F1bOFSPcwDQYJKoZIhvcNAQEL +BQAwZjELMAkGA1UEBhMCSVQxDzANBgNVBAgMBlNpY2lseTENMAsGA1UEBwwERW5u +YTERMA8GA1UECgwIU1RPTVBOSU8xCzAJBgNVBAsMAkNBMRcwFQYDVQQDDA5mcHNl +dmVyaW5vLmNvbTAeFw0yNjAxMDgwOTU4MDFaFw0yNzAxMDgwOTU4MDFaMGoxCzAJ +BgNVBAYTAklUMQ8wDQYDVQQIDAZTaWNpbHkxDTALBgNVBAcMBEVubmExETAPBgNV +BAoMCFNUT01QTklPMQ8wDQYDVQQLDAZTZXJ2ZXIxFzAVBgNVBAMMDmZwc2V2ZXJp +bm8uY29tMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAyIgS5Tw83WsD +/uLy272x91Y9I7kmtSd9WTqwzCt2AMp3pe7p/bzC7YT7mbHhfXqlpLga4djVruQ9 +LzLTWXMYxpz/4gyMrJTDNka1mkyPUfu+GeX04aOE/BwSMaRXa8s3gGy4gIc1O59O +HK3A4w8A7MUgxnt/jC1Dgr9yMzqBunslIeywaatsIucuauRlSMmSt7a7N3klV4YR +OW/82PyXlzXqL065Lc/F9ZwTkQk4FiRCZXu3uD79t1gWR+3rA5m6dICyFiIdTsfB +aO2LYIg1gBKEC5KPebipkaDEd9utXAGcbgPy57XjSzCxyTRrTNocF43JagRslTn9 +1rNyj+WOhwIDAQABo4GKMIGHMAsGA1UdDwQEAwIFoDAdBgNVHSUEFjAUBggrBgEF +BQcDAQYIKwYBBQUHAwIwGQYDVR0RBBIwEIIOZnBzZXZlcmluby5jb20wHQYDVR0O +BBYEFKZY3up3J8Fd4et0kBlaPl8GTEqeMB8GA1UdIwQYMBaAFLpL+yl9GGnAgeCv +CriR9KRoC6YHMA0GCSqGSIb3DQEBCwUAA4IBAQAUT14J8+++NJXo8WDQ/Oclmiwi +xG6/tyctPSVJBuhqzNR8y3R20T+TD3t43J7SzxW1uINqsAEz0nJigbNjNOeTS7Zf +uZ2xzLvwtPsOKPi+2lb+D+Zcncqp0VeIsOsOcxMBbSEN+mUD8qhmsN7P0jPrhMHp +s2sKj2dURVdysLGbedTNdNWCjoyRd8T+v4RxPMQUimVOSV6dAEHdvWnjv0YZLAX1 +/teX8VelYDPVv3zn8xhbqSP5G7rNMiJ7Trt3JwST5O7/db7IITLi8Fyx967p+ssX +qg32zy79FeqPfV1g1MPOlu35sLkFrI6B5/C3eOQ9BKTMEPkDfAuou2IQqh5z +-----END CERTIFICATE----- diff --git a/Notice.txt b/Notice.txt index 0c7ecaa..f2a1224 100644 --- a/Notice.txt +++ b/Notice.txt @@ -27,7 +27,8 @@ components that this product depends on. ------------------------------------------------------------------------------- This product was heavily influenced by MQTT NIO. -It contains a derivation of MQTT NIO's 'MQTTTask.swift'. +It contains a derivation of MQTT NIO's 'MQTTTask.swift', 'WebSocketInitialRequest.swift' and 'WebSocketHandler.swift'. +It also contains a version of MQTT NIO's 'TSTLSConfiguration.swift'. * LICENSE (Apache License 2.0) * https://github.com/swift-server-community/mqtt-nio/blob/main/LICENSE diff --git a/Package.swift b/Package.swift index b3ceddb..8d1fa36 100644 --- a/Package.swift +++ b/Package.swift @@ -16,6 +16,7 @@ let package = Package( dependencies: [ .package(url: "https://github.com/apple/swift-log.git", from: "1.7.1"), .package(url: "https://github.com/apple/swift-nio.git", from: "2.91.0"), + .package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.36.0"), .package(url: "https://github.com/apple/swift-nio-transport-services.git", from: "1.26.0"), .package(url: "https://github.com/apple/swift-configuration.git", from: "1.0.0"), ], @@ -26,6 +27,9 @@ let package = Package( .product(name: "Logging", package: "swift-log"), .product(name: "NIOCore", package: "swift-nio"), .product(name: "NIOPosix", package: "swift-nio"), + .product(name: "NIOWebSocket", package: "swift-nio"), + .product(name: "NIOHTTP1", package: "swift-nio"), + .product(name: "NIOSSL", package: "swift-nio-ssl", condition: .when(platforms: [.linux, .macOS, .android])), .product(name: "NIOTransportServices", package: "swift-nio-transport-services"), .product(name: "Configuration", package: "swift-configuration"), ], diff --git a/README.md b/README.md index 2a99ead..517fa2a 100644 --- a/README.md +++ b/README.md @@ -14,12 +14,12 @@ It defines a text based wire-format for messages passed between these clients an STOMP has been in active use for several years and is supported by many message brokers and client libraries. STOMPNIO is a Swift NIO based implementation of a STOMP client. It supports: -- [x] STOMP versions 1.0, 1.1, and 1.2 -- [ ] Unencrypted and encrypted (via TLS) connections -- [ ] WebSocket connections -- [x] POSIX sockets -- [x] Apple's Network framework via [NIOTransportServices](https://github.com/apple/swift-nio-transport-services) (required for iOS) -- [x] Unix domain sockets +- STOMP versions 1.0, 1.1, and 1.2 +- Unencrypted and encrypted (via TLS) connections +- WebSocket connections +- POSIX sockets +- Apple's Network framework via [NIOTransportServices](https://github.com/apple/swift-nio-transport-services) (required for iOS) +- Unix domain sockets ## Overview diff --git a/RabbitMQ/enabled_plugins b/RabbitMQ/enabled_plugins index 17b6ee0..f4af8e0 100644 --- a/RabbitMQ/enabled_plugins +++ b/RabbitMQ/enabled_plugins @@ -1 +1 @@ -[rabbitmq_stomp]. +[rabbitmq_stomp,rabbitmq_web_stomp]. diff --git a/RabbitMQ/rabbitmq.conf b/RabbitMQ/rabbitmq.conf index 602817d..94a7703 100644 --- a/RabbitMQ/rabbitmq.conf +++ b/RabbitMQ/rabbitmq.conf @@ -1,4 +1,21 @@ # awful security practice, # consider creating a new # user with secure generated credentials! -loopback_users = none \ No newline at end of file +loopback_users = none + +ssl_options.cacertfile = ./Certs/ca.pem +ssl_options.certfile = ./Certs/server.pem +ssl_options.keyfile = ./Certs/server.key +ssl_options.verify = verify_peer +ssl_options.fail_if_no_peer_cert = true + +stomp.listeners.tcp.1 = 61613 +# default TLS-enabled port for STOMP connections +stomp.listeners.ssl.1 = 61614 + +web_stomp.ssl.port = 15673 +web_stomp.ssl.backlog = 1024 +web_stomp.ssl.cacertfile = ./Certs/ca.pem +web_stomp.ssl.certfile = ./Certs/server.pem +web_stomp.ssl.keyfile = ./Certs/server.key +web_stomp.ssl.password = STOMPNIOClientCertPassword \ No newline at end of file diff --git a/Scripts/generate-certs.sh b/Scripts/generate-certs.sh new file mode 100755 index 0000000..b88bec3 --- /dev/null +++ b/Scripts/generate-certs.sh @@ -0,0 +1,100 @@ +#!/bin/bash + +set -eu + +SCRIPT_HOME=$(dirname "$0") +FULL_HOME="$(pwd)"/"$SCRIPT_HOME" +SERVER=fpseverino.com + +function generateCA() { + SUBJECT=$1 + openssl req \ + -nodes \ + -x509 \ + -sha256 \ + -newkey rsa:2048 \ + -subj "$SUBJECT" \ + -days 365 \ + -keyout ca.key \ + -out ca.pem + openssl x509 -in ca.pem -out ca.der -outform DER +} + +function generateServerCertificate() { + SUBJECT=$1 + NAME=$2 + openssl req \ + -new \ + -nodes \ + -sha256 \ + -subj "$SUBJECT" \ + -extensions v3_req \ + -reqexts SAN \ + -config <(cat "$FULL_HOME"/openssl.cnf <(printf "[SAN]\nsubjectAltName=DNS:$SERVER\n")) \ + -keyout "$NAME".key \ + -out "$NAME".csr + + openssl x509 \ + -req \ + -sha256 \ + -in "$NAME".csr \ + -CA ca.pem \ + -CAkey ca.key \ + -CAcreateserial \ + -extfile <(cat "$FULL_HOME"/openssl.cnf <(printf "subjectAltName=DNS:$SERVER\n")) \ + -extensions v3_req \ + -out "$NAME".pem \ + -days 365 +} + +function generateClientCertificate() { + SUBJECT=$1 + NAME=$2 + #PASSWORD=$(openssl rand -base64 29 | tr -d "=+/" | cut -c1-25) + PASSWORD="STOMPNIOClientCertPassword" + openssl req \ + -new \ + -nodes \ + -sha256 \ + -subj "$SUBJECT" \ + -keyout "$NAME".key \ + -out "$NAME".csr + + openssl x509 \ + -req \ + -sha256 \ + -in "$NAME".csr \ + -CA ca.pem \ + -CAkey ca.key \ + -CAcreateserial \ + -out "$NAME".pem \ + -days 365 + + openssl pkcs12 -export -passout pass:"$PASSWORD" -out "$NAME".p12 -in "$NAME".pem -inkey "$NAME".key + + echo "Password: $PASSWORD" +} + +cd "$SCRIPT_HOME"/../Certs/ + +OUTPUT_ROOT=1 +OUTPUT_CLIENT=1 +OUTPUT_SERVER=1 + +while getopts 'sc' option +do + case $option in + s) OUTPUT_ROOT=0;OUTPUT_SERVER=1;OUTPUT_CLIENT=0 ;; + c) OUTPUT_ROOT=0;OUTPUT_SERVER=0;OUTPUT_CLIENT=1 ;; + esac +done + +if test "$OUTPUT_ROOT" == 1; then + generateCA "/C=IT/ST=Sicily/L=Enna/O=STOMPNIO/OU=CA/CN=${SERVER}" +fi +if test "$OUTPUT_SERVER" == 1; then + generateServerCertificate "/C=IT/ST=Sicily/L=Enna/O=STOMPNIO/OU=Server/CN=${SERVER}" server +fi +if test "$OUTPUT_CLIENT" == 1; then + generateClientCertificate "/C=IT/ST=Sicily/L=Enna/O=STOMPNIO/OU=Client/CN=${SERVER}" client +fi diff --git a/Scripts/openssl.cnf b/Scripts/openssl.cnf new file mode 100644 index 0000000..e411c51 --- /dev/null +++ b/Scripts/openssl.cnf @@ -0,0 +1,29 @@ +[ req ] +#default_bits = 2048 +#default_md = sha256 +#default_keyfile = privkey.pem +distinguished_name = req_distinguished_name +attributes = req_attributes +req_extensions = v3_req + +[ req_distinguished_name ] +countryName = Country Name (2 letter code) +countryName_min = 2 +countryName_max = 2 +stateOrProvinceName = State or Province Name (full name) +localityName = Locality Name (eg, city) +0.organizationName = Organization Name (eg, company) +organizationalUnitName = Organizational Unit Name (eg, section) +commonName = Common Name (eg, fully qualified host name) +commonName_max = 64 +emailAddress = Email Address +emailAddress_max = 64 + +[ req_attributes ] +challengePassword = A challenge password +challengePassword_min = 4 +challengePassword_max = 20 + +[ v3_req ] +keyUsage = digitalSignature, keyEncipherment +extendedKeyUsage = serverAuth, clientAuth diff --git a/Sources/STOMPNIO/Connection/STOMPConnection.swift b/Sources/STOMPNIO/Connection/STOMPConnection.swift index eb7d1da..f51574c 100644 --- a/Sources/STOMPNIO/Connection/STOMPConnection.swift +++ b/Sources/STOMPNIO/Connection/STOMPConnection.swift @@ -1,8 +1,14 @@ public import Logging public import NIOCore +import NIOHTTP1 public import NIOPosix +import NIOWebSocket import Synchronization +#if os(macOS) || os(Linux) || os(Android) +import NIOSSL +#endif + #if canImport(Network) import Network import NIOTransportServices @@ -201,46 +207,69 @@ public final actor STOMPConnection: Sendable { ) -> EventLoopFuture { eventLoop.assertInEventLoop() - let bootstrap: any NIOClientTCPBootstrapProtocol - #if canImport(Network) - if let tsBootstrap = createTSBootstrap(eventLoopGroup: eventLoop, tlsOptions: nil) { - bootstrap = tsBootstrap - } else { - #if os(iOS) || os(tvOS) - logger.warning( - "Running BSD sockets on iOS or tvOS is not recommended. Please use NIOTSEventLoopGroup, to run with the Network framework" - ) - #endif - bootstrap = self.createSocketsBootstrap(eventLoopGroup: eventLoop) - } - #else - bootstrap = self.createSocketsBootstrap(eventLoopGroup: eventLoop) - #endif - - let connect = bootstrap.channelInitializer { channel in - do { - try self._setupChannel(channel, configuration: configuration, logger: logger) - return eventLoop.makeSucceededVoidFuture() - } catch { - return eventLoop.makeFailedFuture(error) + let host = + switch address.value { + case .hostname(let hostname, _): + hostname + case .unixDomainSocket(let path): + path } - } - let future: EventLoopFuture - switch address.value { - case .hostname(let host, let port): - future = connect.connect(host: host, port: port) - future.whenSuccess { _ in - logger.debug("Client connected to \(host):\(port)") + let channelPromise = eventLoop.makePromise(of: (any Channel).self) + + do { + let bootstrap = try Self.createBootstrap(configuration: configuration, eventLoopGroup: eventLoop, host: host, logger: logger) + + let connect = bootstrap.channelInitializer { channel in + do { + if let webSocketConfiguration = configuration.webSocket { + // Prepare for WebSockets and on upgrade add handlers + let promise = eventLoop.makePromise(of: Void.self) + promise.futureResult.map { _ in channel }.cascade(to: channelPromise) + + return Self._setupChannelForWebSockets( + channel, + address: address, + configuration: configuration, + webSocketConfiguration: webSocketConfiguration, + upgradePromise: promise + ) { + try self._setupChannel(channel, configuration: configuration, logger: logger) + } + } else { + try self._setupChannel(channel, configuration: configuration, logger: logger) + } + return eventLoop.makeSucceededVoidFuture() + } catch { + channelPromise.fail(error) + return eventLoop.makeFailedFuture(error) + } } - case .unixDomainSocket(let path): - future = connect.connect(unixDomainSocketPath: path) - future.whenSuccess { _ in - logger.debug("Client connected to socket path \(path)") + + let future: EventLoopFuture + switch address.value { + case .hostname(let host, let port): + future = connect.connect(host: host, port: port) + future.whenSuccess { _ in + logger.debug("Client connected to \(host):\(port)") + } + case .unixDomainSocket(let path): + future = connect.connect(unixDomainSocketPath: path) + future.whenSuccess { _ in + logger.debug("Client connected to socket path \(path)") + } } + + future.map { channel in + if configuration.webSocket == nil { + channelPromise.succeed(channel) + } + }.cascadeFailure(to: channelPromise) + } catch { + channelPromise.fail(error) } - return future.flatMapThrowing { channel in + return channelPromise.futureResult.flatMapThrowing { channel in let handler = try channel.pipeline.syncOperations.handler(type: STOMPChannelHandler.self) return STOMPConnection( channel: channel, @@ -307,28 +336,131 @@ public final actor STOMPConnection: Sendable { return stompChannelHandler } - /// Create a BSD sockets based bootstrap - private static func createSocketsBootstrap(eventLoopGroup: any EventLoopGroup) -> ClientBootstrap { - ClientBootstrap(group: eventLoopGroup) + private static func _setupChannelForWebSockets( + _ channel: any Channel, + address: STOMPServerAddress, + configuration: STOMPConnectionConfiguration, + webSocketConfiguration: STOMPConnectionConfiguration.WebSocket, + upgradePromise promise: EventLoopPromise, + afterHandlerAdded: @Sendable @escaping () throws -> Void + ) -> EventLoopFuture { + var hostHeader: String { + if case .enable(_, let sniServerName) = configuration.tls.base, let sniServerName { + return sniServerName + } + switch (configuration.tls.base, address.value) { + case (.enable, .hostname(let host, let port)) where port != 443: + return "\(host):\(port)" + case (.disable, .hostname(let host, let port)) where port != 80: + return "\(host):\(port)" + case (.enable, .hostname(let host, _)), (.disable, .hostname(let host, _)): + return host + case (.enable, .unixDomainSocket(let path)), (.disable, .unixDomainSocket(let path)): + return path + } + } + + // Initial HTTP request handler, before upgrade + let httpHandler = STOMPWebSocketInitialRequestChannelHandler( + host: hostHeader, + urlPath: webSocketConfiguration.urlPath, + additionalHeaders: webSocketConfiguration.initialRequestHeaders, + upgradePromise: promise + ) + + // Create random request key + let requestKey = (0..<16).map { _ in UInt8.random(in: .min ... .max) } + let websocketUpgrader = NIOWebSocketClientUpgrader( + requestKey: Data(requestKey).base64EncodedString(), + maxFrameSize: webSocketConfiguration.maxFrameSize + ) { channel, _ in + let future = channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler(STOMPWebSocketChannelHandler()) + try afterHandlerAdded() + } + future.cascade(to: promise) + return future + } + let upgradeConfig: NIOHTTPClientUpgradeSendableConfiguration = ( + upgraders: [websocketUpgrader], + completionHandler: { _ in + channel.pipeline.removeHandler(httpHandler, promise: nil) + } + ) + + // Add HTTP handler with WebSocket upgrade + return channel.pipeline.addHTTPClientHandlers(withClientUpgrade: upgradeConfig).flatMap { + channel.pipeline.addHandler(httpHandler) + } } - #if canImport(Network) - /// Create a NIOTransportServices bootstrap using Network.framework - private static func createTSBootstrap( + private static func createBootstrap( + configuration: STOMPConnectionConfiguration, eventLoopGroup: any EventLoopGroup, - tlsOptions: NWProtocolTLS.Options? - ) -> NIOTSConnectionBootstrap? { - guard - let bootstrap = NIOTSConnectionBootstrap(validatingGroup: eventLoopGroup) - else { - return nil + host: String, + logger: Logger + ) throws -> NIOClientTCPBootstrap { + var serverName: String { + if case .enable(_, let sniServerName) = configuration.tls.base, let sniServerName { + sniServerName + } else { + host + } } - if let tlsOptions { - return bootstrap.tlsOptions(tlsOptions) + + let bootstrap: NIOClientTCPBootstrap + #if canImport(Network) + // If the EventLoop is compatible with NIOTransportServices create a `NIOTSConnectionBootstrap` + if let tsBootstrap = NIOTSConnectionBootstrap(validatingGroup: eventLoopGroup) { + // Create `NIOClientTCPBootstrap` with NIOTS TLS provider + let options: NWProtocolTLS.Options + if case .enable(let tlsConfigType, _) = configuration.tls.base { + switch tlsConfigType { + case .ts(let tsConfig): + options = try tsConfig.getNWProtocolTLSOptions(logger: logger) + #if os(macOS) || os(Linux) || os(Android) + case .niossl: + throw STOMPClientError.wrongTLSConfig + #endif + } + } else { + options = NWProtocolTLS.Options() + } + sec_protocol_options_set_tls_server_name(options.securityProtocolOptions, serverName) + let tlsProvider = NIOTSClientTLSProvider(tlsOptions: options) + bootstrap = NIOClientTCPBootstrap(tsBootstrap, tls: tlsProvider) + if case .enable = configuration.tls.base { + return bootstrap.enableTLS() + } + return bootstrap } - return bootstrap + #endif + + #if os(macOS) || os(Linux) || os(Android) + if let clientBootstrap = ClientBootstrap(validatingGroup: eventLoopGroup) { + if case .enable(let tlsConfig, _) = configuration.tls.base { + let tlsConfiguration: TLSConfiguration + switch tlsConfig { + case .niossl(let config): + tlsConfiguration = config + #if os(macOS) + case .ts: + throw STOMPClientError.wrongTLSConfig + #endif + } + let sslContext = try NIOSSLContext(configuration: tlsConfiguration) + let tlsProvider = try NIOSSLClientTLSProvider(context: sslContext, serverHostname: serverName) + bootstrap = NIOClientTCPBootstrap(clientBootstrap, tls: tlsProvider) + return bootstrap.enableTLS() + } else { + bootstrap = NIOClientTCPBootstrap(clientBootstrap, tls: NIOInsecureNoTLS()) + } + return bootstrap + } + #endif + + preconditionFailure("Cannot create bootstrap for the supplied EventLoop") } - #endif @usableFromInline func sendFrame( diff --git a/Sources/STOMPNIO/Connection/STOMPConnectionConfiguration+ConfigReader.swift b/Sources/STOMPNIO/Connection/STOMPConnectionConfiguration+ConfigReader.swift index 0cb7d54..1c7088a 100644 --- a/Sources/STOMPNIO/Connection/STOMPConnectionConfiguration+ConfigReader.swift +++ b/Sources/STOMPNIO/Connection/STOMPConnectionConfiguration+ConfigReader.swift @@ -1,4 +1,5 @@ public import Configuration +import NIOHTTP1 extension STOMPConnectionConfiguration { /// Creates a new STOMP connection configuration using values from the provided reader. @@ -12,6 +13,11 @@ extension STOMPConnectionConfiguration { /// - `stomp.connectTimeout` (int, optional, default: `10`): Maximum time to wait for the `CONNECTED` frame, in seconds. /// - `stomp.receiptTimeout` (int, optional, default: `30`): Maximum time to wait for a `RECEIPT` frame, in seconds. /// - `stomp.connectHeaders` (string array, optional): Additional user defined headers to include in the `CONNECT` frame, in the `:` format. + /// - `stomp.webSocket.urlPath` (string, optional): The URL path to use when establishing the WebSocket connection. + /// - `stomp.webSocket.maxFrameSize` (int, optional): The maximum frame size for the WebSocket connection. + /// - `stomp.webSocket.initialRequestHeaders` (string array, optional): Initial HTTP headers to include in the WebSocket handshake request. + /// + /// > Note: TLS configuration is not read from the `ConfigReader` and is disabled by default. You must set the `tls` property manually after initialization. /// /// - Parameter config: The config reader to read configuration values from. public init(config: ConfigReader) { @@ -47,6 +53,26 @@ extension STOMPConnectionConfiguration { } else { [:] } + + let stompWebSocketConfig = stompConfig.scoped(to: "webSocket") + let urlPath = stompWebSocketConfig.string(forKey: "urlPath") + let maxFrameSize = stompWebSocketConfig.int(forKey: "maxFrameSize") + let initialRequestHeaders = stompWebSocketConfig.stringArray(forKey: "initialRequestHeaders").flatMap { + HTTPHeaders(configStringArray: $0) + } + self.webSocket = + if urlPath != nil || maxFrameSize != nil || initialRequestHeaders != nil { + .init( + urlPath: urlPath ?? "/ws", + maxFrameSize: maxFrameSize ?? 1 << 14, + initialRequestHeaders: initialRequestHeaders ?? [:] + ) + } else { + nil + } + + // TLS is disabled by default + self.tls = .disable } } @@ -66,3 +92,24 @@ extension STOMPHeader: ExpressibleByConfigString { self.init(name: name, value: value) } } + +extension HTTPHeaders { + /// Creates HTTP headers from an array of configuration strings. + /// + /// Each configuration string must be in the `:` format. + /// + /// - Parameter configStringArray: The array of configuration strings to create the HTTP headers from. + fileprivate init?(configStringArray: [String]) { + var headers = HTTPHeaders() + for configString in configStringArray { + guard let colonIndex = configString.firstIndex(of: ":") else { + return nil + } + let name = String(configString[.. Self { + .init(base: .enable(configuration, tlsServerName)) + } + } + /// Authentication credentials for accessing a STOMP server. /// /// Use this structure to provide user ID and password credentials @@ -21,11 +70,42 @@ public struct STOMPConnectionConfiguration: Sendable { } } + /// WebSocket configuration for the STOMP connection. + public struct WebSocket: Sendable { + /// WebSocket URL. + public var urlPath: String + /// The maximum frame size the WebSocket client will allow. + public var maxFrameSize: Int + /// Additional headers to add to the initial HTTP request. + public var initialRequestHeaders: HTTPHeaders + + /// Creates a new WebSocket configuration. + /// + /// - Parameters: + /// - urlPath: WebSocket URL, defaults to "/ws". + /// - maxFrameSize: The maximum frame size the WebSocket client will allow. + /// - initialRequestHeaders: Additional headers to add to the initial HTTP request. + public init( + urlPath: String = "/ws", + maxFrameSize: Int = 1 << 14, + initialRequestHeaders: HTTPHeaders = [:] + ) { + self.urlPath = urlPath + self.maxFrameSize = maxFrameSize + self.initialRequestHeaders = initialRequestHeaders + } + } + /// Optional authentication credentials for accessing the STOMP server. /// /// Set this property when connecting to a server that requires authentication. public var authentication: Authentication? + /// TLS configuration for the connection. + /// + /// Use `.disable` for unencrypted connections or `.enable(...)` for secure connections. + public var tls: TLS + /// The name of a virtual host that the client wishes to connect to. /// /// It is recommended clients set this to the host name that the socket was established against, or to any name of their choosing. @@ -65,6 +145,9 @@ public struct STOMPConnectionConfiguration: Sendable { /// Additional user defined headers to include in the `CONNECT` frame. public var connectHeaders: STOMPHeaders + /// WebSocket configuration for the STOMP connection. + public var webSocket: WebSocket? + /// Creates a new STOMP connection configuration. /// /// Use this initializer to create a configuration object @@ -76,20 +159,26 @@ public struct STOMPConnectionConfiguration: Sendable { /// - heartBeat: The heart-beating configuration for the STOMP connection. Defaults to no heart-beating. /// - connectTimeout: Maximum time to wait for the `CONNECTED` frame. Defaults to 10 seconds. /// - receiptTimeout: Maximum time to wait for a `RECEIPT` frame. Defaults to 30 seconds. + /// - tls: TLS configuration for secure connections. Defaults to `.disable` for unencrypted connections. /// - connectHeaders: Additional user defined headers to include in the `CONNECT` frame. + /// - webSocket: WebSocket configuration for the STOMP connection. Defaults to `nil` for non-WebSocket connections. public init( authentication: Authentication? = nil, virtualHost: String? = nil, heartBeat: (outgoing: Duration, incoming: Duration) = (outgoing: .milliseconds(0), incoming: .milliseconds(0)), connectTimeout: Duration = .seconds(10), receiptTimeout: Duration = .seconds(30), - connectHeaders: STOMPHeaders = [:] + tls: TLS = .disable, + connectHeaders: STOMPHeaders = [:], + webSocket: WebSocket? = nil ) { self.authentication = authentication self.virtualHost = virtualHost self.heartBeat = heartBeat self.connectTimeout = connectTimeout self.receiptTimeout = receiptTimeout + self.tls = tls self.connectHeaders = connectHeaders + self.webSocket = webSocket } } diff --git a/Sources/STOMPNIO/Connection/TSTLSConfiguration.swift b/Sources/STOMPNIO/Connection/TSTLSConfiguration.swift new file mode 100644 index 0000000..4af2b71 --- /dev/null +++ b/Sources/STOMPNIO/Connection/TSTLSConfiguration.swift @@ -0,0 +1,249 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the MQTTNIO project +// +// Copyright (c) 2020-2022 Adam Fowler +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +#if canImport(Network) +@preconcurrency public import Security + +#if canImport(FoundationEssentials) +import FoundationEssentials +#else +import Foundation +#endif + +import Logging +import Network +#if os(macOS) || os(Linux) || os(Android) +import NIOSSL +#endif + +/// TLS Version enumeration +public enum TSTLSVersion: Sendable { + case tlsV12 + case tlsV13 + + var tlsProtocolVersion: tls_protocol_version_t { + switch self { + case .tlsV12: + return .TLSv12 + case .tlsV13: + return .TLSv13 + } + } +} + +extension tls_protocol_version_t { + var tsTLSVersion: TSTLSVersion { + switch self { + case .TLSv12: + return .tlsV12 + case .TLSv13: + return .tlsV13 + default: + preconditionFailure("Invalid TLS version") + } + } +} + +/// Certificate verification modes. +public enum TSCertificateVerification: Sendable { + /// All certificate verification disabled. + case none + + /// Certificates will be validated against the trust store and checked + /// against the hostname of the service we are contacting. + case fullVerification +} + +/// TLS configuration for NIO Transport Services +public struct TSTLSConfiguration: Sendable { + /// Error loading TLS files + public enum Error: Swift.Error { + case invalidData + } + + /// Struct defining an array of certificates + public struct Certificates { + let certificates: [SecCertificate] + + /// Create certificate array from already loaded SecCertificate array + public static func certificates(_ secCertificates: [SecCertificate]) -> Self { .init(certificates: secCertificates) } + + #if os(macOS) || os(Linux) || os(Android) + /// Create certificate array from PEM file + public static func pem(_ filename: String) throws -> Self { + let certificates = try NIOSSLCertificate.fromPEMFile(filename) + let secCertificates = try certificates.map { certificate -> SecCertificate in + guard let certificate = try SecCertificateCreateWithData(nil, Data(certificate.toDERBytes()) as CFData) else { + throw TSTLSConfiguration.Error.invalidData + } + return certificate + } + return .init(certificates: secCertificates) + } + #endif + + /// Create certificate array from DER file + public static func der(_ filename: String) throws -> Self { + let certificateData = try Data(contentsOf: URL(fileURLWithPath: filename)) + guard let secCertificate = SecCertificateCreateWithData(nil, certificateData as CFData) else { + throw TSTLSConfiguration.Error.invalidData + } + return .init(certificates: [secCertificate]) + } + } + + /// Struct defining identity + public struct Identity { + let identity: SecIdentity + + /// Create Identity from already loaded SecIdentity + public static func secIdentity(_ secIdentity: SecIdentity) -> Self { .init(identity: secIdentity) } + + /// Create Identity from p12 file + public static func p12(filename: String, password: String) throws -> Self { + let data = try Data(contentsOf: URL(fileURLWithPath: filename)) + let options: [String: String] = [kSecImportExportPassphrase as String: password] + var rawItems: CFArray? + guard SecPKCS12Import(data as CFData, options as CFDictionary, &rawItems) == errSecSuccess else { + throw TSTLSConfiguration.Error.invalidData + } + let items = rawItems! as! [[String: Any]] + guard let firstItem = items.first, + let secIdentity = firstItem[kSecImportItemIdentity as String] as! SecIdentity? + else { + throw TSTLSConfiguration.Error.invalidData + } + return .init(identity: secIdentity) + } + } + + /// The minimum TLS version to allow in negotiation. Defaults to tlsv12. + public var minimumTLSVersion: TSTLSVersion + + /// The maximum TLS version to allow in negotiation. If nil, there is no upper limit. Defaults to nil. + public var maximumTLSVersion: TSTLSVersion? + + /// The trust roots to use to validate certificates. This only needs to be provided if you intend to validate + /// certificates. + public var trustRoots: [SecCertificate]? + + /// The identity associated with the leaf certificate. + public var clientIdentity: SecIdentity? + + /// The application protocols to use in the connection. + public var applicationProtocols: [String] + + /// Whether to verify remote certificates. + public var certificateVerification: TSCertificateVerification + + /// Initialize TSTLSConfiguration + /// - Parameters: + /// - minimumTLSVersion: minimum version of TLS supported + /// - maximumTLSVersion: maximum version of TLS supported + /// - trustRoots: The trust roots to use to validate certificates + /// - clientIdentity: Client identity + /// - applicationProtocols: The application protocols to use in the connection + /// - certificateVerification: Should certificates be verified + public init( + minimumTLSVersion: TSTLSVersion = .tlsV12, + maximumTLSVersion: TSTLSVersion? = nil, + trustRoots: [SecCertificate]? = nil, + clientIdentity: SecIdentity? = nil, + applicationProtocols: [String] = [], + certificateVerification: TSCertificateVerification = .fullVerification + ) { + self.minimumTLSVersion = minimumTLSVersion + self.maximumTLSVersion = maximumTLSVersion + self.trustRoots = trustRoots + self.clientIdentity = clientIdentity + self.applicationProtocols = applicationProtocols + self.certificateVerification = certificateVerification + } + + /// Initialize TSTLSConfiguration + /// - Parameters: + /// - minimumTLSVersion: minimum version of TLS supported + /// - maximumTLSVersion: maximum version of TLS supported + /// - trustRoots: The trust roots to use to validate certificates + /// - clientIdentity: Client identity + /// - applicationProtocols: The application protocols to use in the connection + /// - certificateVerification: Should certificates be verified + public init( + minimumTLSVersion: TSTLSVersion = .tlsV12, + maximumTLSVersion: TSTLSVersion? = nil, + trustRoots: Certificates, + clientIdentity: Identity, + applicationProtocols: [String] = [], + certificateVerification: TSCertificateVerification = .fullVerification + ) { + self.minimumTLSVersion = minimumTLSVersion + self.maximumTLSVersion = maximumTLSVersion + self.trustRoots = trustRoots.certificates + self.clientIdentity = clientIdentity.identity + self.applicationProtocols = applicationProtocols + self.certificateVerification = certificateVerification + } +} + +extension TSTLSConfiguration { + func getNWProtocolTLSOptions(logger: Logger) throws -> NWProtocolTLS.Options { + let options = NWProtocolTLS.Options() + + // minimum TLS protocol + sec_protocol_options_set_min_tls_protocol_version(options.securityProtocolOptions, self.minimumTLSVersion.tlsProtocolVersion) + + // maximum TLS protocol + if let maximumTLSVersion = self.maximumTLSVersion { + sec_protocol_options_set_max_tls_protocol_version(options.securityProtocolOptions, maximumTLSVersion.tlsProtocolVersion) + } + + if let clientIdentity = self.clientIdentity, let secClientIdentity = sec_identity_create(clientIdentity) { + sec_protocol_options_set_local_identity(options.securityProtocolOptions, secClientIdentity) + } + + for applicationProtocol in self.applicationProtocols { + sec_protocol_options_add_tls_application_protocol(options.securityProtocolOptions, applicationProtocol) + } + + if self.certificateVerification != .fullVerification || self.trustRoots != nil { + // add verify block to control certificate verification + // swift-format-ignore: AlwaysUseLowerCamelCase + sec_protocol_options_set_verify_block( + options.securityProtocolOptions, + { _, sec_trust, sec_protocol_verify_complete in + guard self.certificateVerification != .none else { + sec_protocol_verify_complete(true) + return + } + + let trust = sec_trust_copy_ref(sec_trust).takeRetainedValue() + if let trustRootCertificates = self.trustRoots { + SecTrustSetAnchorCertificates(trust, trustRootCertificates as CFArray) + } + SecTrustEvaluateAsyncWithError(trust, Self.tlsDispatchQueue) { _, result, error in + if let error { + logger.error("Trust failed: \(error.localizedDescription)") + } + sec_protocol_verify_complete(result) + } + }, + Self.tlsDispatchQueue + ) + } + return options + } + + /// Dispatch queue used by Network framework TLS to control certificate verification + static let tlsDispatchQueue = DispatchQueue(label: "TSTLSConfiguration") +} +#endif diff --git a/Sources/STOMPNIO/STOMPClientError.swift b/Sources/STOMPNIO/STOMPClientError.swift index 0fe8e69..81384a7 100644 --- a/Sources/STOMPNIO/STOMPClientError.swift +++ b/Sources/STOMPNIO/STOMPClientError.swift @@ -16,6 +16,9 @@ public struct STOMPClientError: Error, Sendable, Equatable { case missingHeader /// Connection closed because it timed out while waiting for RECEIPT or CONNECTED frame case timeout + /// You have provided the wrong TLS configuration for the EventLoopGroup you provided + case wrongTLSConfig + case websocketUpgradeFailed } let base: Base @@ -38,6 +41,9 @@ public struct STOMPClientError: Error, Sendable, Equatable { public static let missingHeader = Self(.missingHeader) /// Connection closed because it timed out while waiting for RECEIPT or CONNECTED frame public static let timeout = Self(.timeout) + /// You have provided the wrong TLS configuration for the EventLoopGroup you provided + public static let wrongTLSConfig = Self(.wrongTLSConfig) + public static let websocketUpgradeFailed = Self(.websocketUpgradeFailed) public var description: String { self.base.rawValue @@ -115,6 +121,11 @@ public struct STOMPClientError: Error, Sendable, Equatable { /// Connection closed because it timed out while waiting for RECEIPT or CONNECTED frame public static let timeout = Self(errorType: .timeout) + /// You have provided the wrong TLS configuration for the EventLoopGroup you provided + public static let wrongTLSConfig = Self(errorType: .wrongTLSConfig) + + public static let websocketUpgradeFailed = Self(errorType: .websocketUpgradeFailed) + public static func == (lhs: STOMPClientError, rhs: STOMPClientError) -> Bool { lhs.backing == rhs.backing } diff --git a/Sources/STOMPNIO/Utils/String+trimmingWhitespace.swift b/Sources/STOMPNIO/Utils/StringProtocol+trimmingWhitespace.swift similarity index 70% rename from Sources/STOMPNIO/Utils/String+trimmingWhitespace.swift rename to Sources/STOMPNIO/Utils/StringProtocol+trimmingWhitespace.swift index 2c17cea..9db46bd 100644 --- a/Sources/STOMPNIO/Utils/String+trimmingWhitespace.swift +++ b/Sources/STOMPNIO/Utils/StringProtocol+trimmingWhitespace.swift @@ -1,9 +1,9 @@ -extension String { - func trimmingWhitespace() -> Substring { +extension StringProtocol { + func trimmingWhitespace() -> Self.SubSequence { self.trimmingWhitespacePrefix().trimmingWhitespaceSuffix() } - func endOfWhitespacePrefix() -> String.Index { + func endOfWhitespacePrefix() -> Self.Index { var index = self.startIndex while index < self.endIndex, self[index].isWhitespace { formIndex(after: &index) @@ -11,14 +11,12 @@ extension String { return index } - func trimmingWhitespacePrefix() -> Substring { + func trimmingWhitespacePrefix() -> Self.SubSequence { let start = self.endOfWhitespacePrefix() return self[start.. Substring.Index { + func startOfWhitespaceSuffix() -> Self.Index { var index = self.endIndex while index > self.startIndex { let after = index @@ -30,7 +28,7 @@ extension Substring { return index } - func trimmingWhitespaceSuffix() -> Substring { + func trimmingWhitespaceSuffix() -> Self.SubSequence { let end = self.startOfWhitespaceSuffix() return self[self.startIndex.. = .init(false) + + /// Write `ByteBuffer`s as a WebSocket frame + func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise?) { + guard context.channel.isActive else { return } + + let buffer = unwrapOutboundIn(data) + self.send(context: context, buffer: buffer, opcode: .binary, fin: true, promise: promise) + } + + /// Read WebSocket frame + func channelRead(context: ChannelHandlerContext, data: NIOAny) { + let frame = self.unwrapInboundIn(data) + + switch frame.opcode { + case .text, .binary: + if var frameSeq = self.webSocketFrameSequence { + frameSeq.append(frame) + self.webSocketFrameSequence = frameSeq + } else { + var frameSeq = WebSocketFrameSequence(type: frame.opcode) + frameSeq.append(frame) + self.webSocketFrameSequence = frameSeq + } + case .continuation: + if var frameSeq = self.webSocketFrameSequence { + frameSeq.append(frame) + self.webSocketFrameSequence = frameSeq + } else { + self.close(context: context, code: .protocolError, promise: nil) + } + case .connectionClose: + self.receivedClose(context: context) + default: + break + } + + if let frameSeq = self.webSocketFrameSequence, frame.fin { + switch frameSeq.type { + case .binary, .text: + context.fireChannelRead(wrapInboundOut(frameSeq.buffer)) + default: break + } + self.webSocketFrameSequence = nil + } + } + + /// Send WebSocket frame to server + private func send( + context: ChannelHandlerContext, + buffer: ByteBuffer, + opcode: WebSocketOpcode, + fin: Bool = true, + promise: EventLoopPromise? = nil + ) { + let maskKey = self.makeMaskKey() + let frame = WebSocketFrame(fin: fin, opcode: opcode, maskKey: maskKey, data: buffer) + context.writeAndFlush(wrapOutboundOut(frame), promise: promise) + } + + private func receivedClose(context: ChannelHandlerContext) { + // Handle a received close frame. We're just going to close. + self.isClosed.store(true, ordering: .relaxed) + context.close(promise: nil) + } + + /// Make mask key to be used in WebSocket frame + func makeMaskKey() -> WebSocketMaskingKey? { + let bytes: [UInt8] = (0..<4).map { _ in UInt8.random(in: .min ... .max) } + return WebSocketMaskingKey(bytes) + } + + /// Close WebSocket connection + func close(context: ChannelHandlerContext, code: WebSocketErrorCode = .goingAway, promise: EventLoopPromise?) { + guard self.isClosed.compareExchange(expected: false, desired: true, successOrdering: .relaxed, failureOrdering: .relaxed).exchanged + else { + promise?.succeed(()) + return + } + + let codeAsInt = UInt16(webSocketErrorCode: code) + let codeToSend: WebSocketErrorCode + if codeAsInt == 1005 || codeAsInt == 1006 { + // Code `1005` and `1006` are used to report errors to the application, + // but must never be sent over the wire (per https://tools.ietf.org/html/rfc6455#section-7.4) + codeToSend = .normalClosure + } else { + codeToSend = code + } + + var buffer = context.channel.allocator.buffer(capacity: 2) + buffer.write(webSocketErrorCode: codeToSend) + self.send(context: context, buffer: buffer, opcode: .connectionClose, fin: true, promise: promise) + } + + func channelInactive(context: ChannelHandlerContext) { + self.close(context: context, code: .unknown(1006), promise: nil) + + // We always forward the error on to let others see it. + context.fireChannelInactive() + } + + func errorCaught(context: ChannelHandlerContext, error: any Error) { + let errorCode: WebSocketErrorCode + if let error = error as? NIOWebSocketError { + errorCode = WebSocketErrorCode(error) + } else { + errorCode = .unexpectedServerError + } + self.close(context: context, code: errorCode, promise: nil) + + // We always forward the error on to let others see it. + context.fireErrorCaught(error) + } +} + +private struct WebSocketFrameSequence { + var buffer: ByteBuffer + var type: WebSocketOpcode + + init(type: WebSocketOpcode) { + self.buffer = ByteBufferAllocator().buffer(capacity: 0) + self.type = type + } + + mutating func append(_ frame: WebSocketFrame) { + var data = frame.unmaskedData + switch self.type { + case .binary, .text: + self.buffer.writeBuffer(&data) + default: break + } + } +} + +extension WebSocketErrorCode { + fileprivate init(_ error: NIOWebSocketError) { + switch error { + case .invalidFrameLength: + self = .messageTooLarge + case .fragmentedControlFrame, + .multiByteControlFrameLength: + self = .protocolError + } + } +} diff --git a/Sources/STOMPNIO/WebSockets/STOMPWebSocketInitialRequestChannelHandler.swift b/Sources/STOMPNIO/WebSockets/STOMPWebSocketInitialRequestChannelHandler.swift new file mode 100644 index 0000000..6af0a02 --- /dev/null +++ b/Sources/STOMPNIO/WebSockets/STOMPWebSocketInitialRequestChannelHandler.swift @@ -0,0 +1,64 @@ +import NIOCore +import NIOHTTP1 + +/// The HTTP handler to be used to initiate the request. +/// +/// This initial request will be adapted by the WebSocket upgrader to contain the upgrade header parameters. +/// `channelRead` will only be called if the upgrade fails. +final class STOMPWebSocketInitialRequestChannelHandler: ChannelInboundHandler, RemovableChannelHandler, Sendable { + typealias InboundIn = HTTPClientResponsePart + typealias OutboundOut = HTTPClientRequestPart + + let host: String + let urlPath: String + let additionalHeaders: HTTPHeaders + let upgradePromise: EventLoopPromise + + init(host: String, urlPath: String, additionalHeaders: HTTPHeaders, upgradePromise: EventLoopPromise) { + self.host = host + self.urlPath = urlPath + self.additionalHeaders = additionalHeaders + self.upgradePromise = upgradePromise + } + + func channelActive(context: ChannelHandlerContext) { + // We are connected. It's time to send the message to the server to initialize the upgrade dance. + var headers = HTTPHeaders() + headers.add(name: "Content-Length", value: "0") + headers.add(name: "host", value: self.host) + headers.add(name: "Sec-WebSocket-Protocol", value: "v12.stomp, v11.stomp, v10.stomp") + headers.add(contentsOf: self.additionalHeaders) + + let requestHead = HTTPRequestHead( + version: HTTPVersion(major: 1, minor: 1), + method: .GET, + uri: self.urlPath, + headers: headers + ) + + context.write(self.wrapOutboundOut(.head(requestHead)), promise: nil) + context.write(self.wrapOutboundOut(.body(.byteBuffer(ByteBuffer()))), promise: nil) + context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil) + } + + /// This will be called only if the WebSocket upgrade fails. + func channelRead(context: ChannelHandlerContext, data: NIOAny) { + let clientResponse = self.unwrapInboundIn(data) + + switch clientResponse { + case .head: + self.upgradePromise.fail(STOMPClientError.websocketUpgradeFailed) + case .body: + break + case .end: + context.close(promise: nil) + } + } + + func errorCaught(context: ChannelHandlerContext, error: any Error) { + self.upgradePromise.fail(error) + // As we are not really interested in getting notified on success or failure, + // we just pass `nil` as the promise to reduce allocations. + context.close(promise: nil) + } +} diff --git a/Tests/STOMPNIOTests/STOMPConnectionTests.swift b/Tests/STOMPNIOTests/STOMPConnectionTests.swift index 7bf9a77..1174012 100644 --- a/Tests/STOMPNIOTests/STOMPConnectionTests.swift +++ b/Tests/STOMPNIOTests/STOMPConnectionTests.swift @@ -3,6 +3,7 @@ import Logging import NIOCore import NIOEmbedded import NIOFoundationCompat +import NIOPosix import STOMPNIO import Testing @@ -16,15 +17,23 @@ import Foundation import NIOTransportServices #endif +#if os(macOS) || os(Linux) || os(Android) +import NIOSSL +#endif + @Suite("STOMPConnection Tests", .serialized) struct STOMPConnectionTests { static let hostname = ProcessInfo.processInfo.environment["RABBITMQ_SERVER"] ?? "localhost" - @Test("Pub/Sub", arguments: STOMPSubscription.AckMode.allCases) - func pubSub(ackMode: STOMPSubscription.AckMode) async throws { + @Test("Pub/Sub", arguments: STOMPSubscription.AckMode.allCases, [STOMPConnectionConfiguration.WebSocket(), nil]) + func pubSub(ackMode: STOMPSubscription.AckMode, webSocket: STOMPConnectionConfiguration.WebSocket?) async throws { try await withThrowingTaskGroup { group in group.addTask { - try await STOMPConnection.withConnection(address: .hostname(Self.hostname), logger: self.subscriberLogger) { connection in + try await STOMPConnection.withConnection( + address: .hostname(Self.hostname, port: webSocket == nil ? 61613 : 15674), + configuration: .init(webSocket: webSocket), + logger: self.subscriberLogger + ) { connection in try await connection.subscribe(to: "/queue/stomp-nio", ackMode: ackMode) { subscription in for try await frame in subscription { #expect(String(buffer: frame.body) == "Hello, STOMP over NIO!") @@ -35,7 +44,11 @@ struct STOMPConnectionTests { } group.addTask { - try await STOMPConnection.withConnection(address: .hostname(Self.hostname), logger: self.publisherLogger) { connection in + try await STOMPConnection.withConnection( + address: .hostname(Self.hostname, port: webSocket == nil ? 61613 : 15674), + configuration: .init(webSocket: webSocket), + logger: self.publisherLogger + ) { connection in try await connection.send("Hello, STOMP over NIO!", to: "/queue/stomp-nio") } } @@ -44,15 +57,21 @@ struct STOMPConnectionTests { } } - @Test("Publish Large Payload", arguments: STOMPSubscription.AckMode.allCases) - func publishLargePayload(ackMode: STOMPSubscription.AckMode) async throws { - let payloadSize = 65537 - let payloadData = Data(count: payloadSize) + @Test( + "Publish Large Payload", + arguments: STOMPSubscription.AckMode.allCases, [STOMPConnectionConfiguration.WebSocket(maxFrameSize: 70000), nil] + ) + func publishLargePayload(ackMode: STOMPSubscription.AckMode, webSocket: STOMPConnectionConfiguration.WebSocket?) async throws { + let payloadData = Data(count: 65537) let payload = ByteBufferAllocator().buffer(data: payloadData) try await withThrowingTaskGroup { group in group.addTask { - try await STOMPConnection.withConnection(address: .hostname(Self.hostname), logger: self.subscriberLogger) { connection in + try await STOMPConnection.withConnection( + address: .hostname(Self.hostname, port: webSocket == nil ? 61613 : 15674), + configuration: .init(webSocket: webSocket), + logger: self.subscriberLogger + ) { connection in try await connection.subscribe(to: "/queue/large-payload", ackMode: ackMode) { subscription in for try await frame in subscription { var buffer = frame.body @@ -65,7 +84,11 @@ struct STOMPConnectionTests { } group.addTask { - try await STOMPConnection.withConnection(address: .hostname(Self.hostname), logger: self.publisherLogger) { connection in + try await STOMPConnection.withConnection( + address: .hostname(Self.hostname, port: webSocket == nil ? 61613 : 15674), + configuration: .init(webSocket: webSocket), + logger: self.publisherLogger + ) { connection in try await connection.send(payload, to: "/queue/large-payload", contentType: "application/octet-stream") } } @@ -103,6 +126,70 @@ struct STOMPConnectionTests { } } + // TODO: Check if this is a RabbitMQ bug or a STOMPNIO bug + @Test("Connect with WebSocket with TCP Port", .disabled()) + func webSocketTCPPort() async throws { + await #expect(throws: (any Error).self) { + try await STOMPConnection.withConnection( + address: .hostname(Self.hostname, port: 61613), + configuration: .init(webSocket: .init()), + logger: self.logger + ) { _ in } + } + } + + @Test("Connect with TLS", arguments: [STOMPConnectionConfiguration.WebSocket(), nil]) + func tlsConnect(webSocket: STOMPConnectionConfiguration.WebSocket?) async throws { + try await STOMPConnection.withConnection( + address: .hostname(Self.hostname, port: webSocket == nil ? 61614 : 15673), + configuration: .init( + tls: .enable(Self.getTLSConfiguration(), tlsServerName: "fpseverino.com"), + webSocket: webSocket + ), + eventLoop: Self.eventLoopGroupSingleton.any(), + logger: self.logger + ) { connection in + try await connection.send("Hello, STOMP over TLS\(webSocket == nil ? "" : " and WebSockets")!", to: "/queue/tls") + } + + // Try consuming the message with a standard unencrypted TCP connection + try await STOMPConnection.withConnection(address: .hostname(Self.hostname), logger: self.logger) { connection in + try await connection.subscribe(to: "/queue/tls") { subscription in + for try await frame in subscription { + #expect(String(buffer: frame.body) == "Hello, STOMP over TLS\(webSocket == nil ? "" : " and WebSockets")!") + return + } + } + } + } + + #if canImport(Network) + @Test("Connect with TLS from P12") + func tlsConnectFromP12() async throws { + try await STOMPConnection.withConnection( + address: .hostname(Self.hostname, port: 61614), + configuration: .init( + tls: .enable( + .ts( + .init( + trustRoots: .der(Self.rootPath + "/Certs/ca.der"), + clientIdentity: .p12( + filename: Self.rootPath + "/Certs/client.p12", + password: "STOMPNIOClientCertPassword" + ) + ) + ), + tlsServerName: "fpseverino.com" + ) + ), + eventLoop: Self.eventLoopGroupSingleton.any(), + logger: self.logger + ) { connection in + try await connection.send("Test", to: "/queue/tls-p12") + } + } + #endif + @Test("Send Frame") func sendFrame() async throws { try await STOMPConnection.withConnection(address: .hostname(Self.hostname), logger: self.logger) { connection in @@ -194,11 +281,11 @@ struct STOMPConnectionTests { } } - @Test("Heart-beating with Real Broker") - func heartBeatingBroker() async throws { + @Test("Heart-beating with Real Broker", arguments: [STOMPConnectionConfiguration.WebSocket(), nil]) + func heartBeatingBroker(webSocket: STOMPConnectionConfiguration.WebSocket?) async throws { try await STOMPConnection.withConnection( - address: .hostname(STOMPConnectionTests.hostname), - configuration: .init(heartBeat: (outgoing: .seconds(1), incoming: .seconds(1))), + address: .hostname(STOMPConnectionTests.hostname, port: webSocket == nil ? 61613 : 15674), + configuration: .init(heartBeat: (outgoing: .seconds(1), incoming: .seconds(1)), webSocket: webSocket), logger: self.logger ) { connection in try await Task.sleep(for: .seconds(5)) @@ -231,12 +318,16 @@ struct STOMPConnectionTests { .stringArray(["header1:value1", "header2: value2"]), isSecret: false ), + "stomp.webSocket.initialRequestHeaders": ConfigValue( + .stringArray(["X-Custom-Header: CustomValue", "X-Another-Header: AnotherValue"]), + isSecret: false + ), ] ) ) try await STOMPConnection.withConnection( - address: .hostname(STOMPConnectionTests.hostname), + address: .hostname(STOMPConnectionTests.hostname, port: 15674), configuration: .init(config: config), logger: self.logger ) { connection in @@ -281,9 +372,13 @@ struct STOMPConnectionTests { @Suite("Cancellation Tests") struct CancellationTests { - @Test("Cancellation") - func cancellation() async throws { - try await STOMPConnection.withConnection(address: .hostname(STOMPConnectionTests.hostname), logger: self.logger) { connection in + @Test("Cancellation", arguments: [STOMPConnectionConfiguration.WebSocket(), nil]) + func cancellation(webSocket: STOMPConnectionConfiguration.WebSocket?) async throws { + try await STOMPConnection.withConnection( + address: .hostname(STOMPConnectionTests.hostname, port: webSocket == nil ? 61613 : 15674), + configuration: .init(webSocket: webSocket), + logger: self.logger + ) { connection in await withThrowingTaskGroup { group in group.addTask { await #expect(throws: STOMPClientError.cancelledTask) { @@ -299,9 +394,13 @@ struct STOMPConnectionTests { } } - @Test("Already Cancelled") - func alreadyCancelled() async throws { - try await STOMPConnection.withConnection(address: .hostname(STOMPConnectionTests.hostname), logger: self.logger) { connection in + @Test("Already Cancelled", arguments: [STOMPConnectionConfiguration.WebSocket(), nil]) + func alreadyCancelled(webSocket: STOMPConnectionConfiguration.WebSocket?) async throws { + try await STOMPConnection.withConnection( + address: .hostname(STOMPConnectionTests.hostname, port: webSocket == nil ? 61613 : 15674), + configuration: .init(webSocket: webSocket), + logger: self.logger + ) { connection in await withThrowingTaskGroup(of: Void.self) { group in group.cancelAll() group.addTask { @@ -352,9 +451,13 @@ struct STOMPConnectionTests { @Suite("Transactions Tests") struct TransactionsTests { - @Test("Subscription Transaction", arguments: STOMPSubscription.AckMode.allCases) - func subscriptionTransaction(ackMode: STOMPSubscription.AckMode) async throws { - try await STOMPConnection.withConnection(address: .hostname(STOMPConnectionTests.hostname), logger: self.logger) { connection in + @Test("Subscription Transaction", arguments: STOMPSubscription.AckMode.allCases, [STOMPConnectionConfiguration.WebSocket(), nil]) + func subscriptionTransaction(ackMode: STOMPSubscription.AckMode, webSocket: STOMPConnectionConfiguration.WebSocket?) async throws { + try await STOMPConnection.withConnection( + address: .hostname(STOMPConnectionTests.hostname, port: webSocket == nil ? 61613 : 15674), + configuration: .init(webSocket: webSocket), + logger: self.logger + ) { connection in try await connection.withTransaction { transaction in try await withThrowingTaskGroup { group in group.addTask { @@ -375,7 +478,11 @@ struct STOMPConnectionTests { } } - try await STOMPConnection.withConnection(address: .hostname(STOMPConnectionTests.hostname), logger: self.logger) { connection in + try await STOMPConnection.withConnection( + address: .hostname(STOMPConnectionTests.hostname, port: webSocket == nil ? 61613 : 15674), + configuration: .init(webSocket: webSocket), + logger: self.logger + ) { connection in try await withThrowingTaskGroup { group in group.addTask { try await connection.subscribe(to: "/queue/sub-transaction", ackMode: ackMode) { subscription in @@ -392,9 +499,13 @@ struct STOMPConnectionTests { } } - @Test("Send Transaction") - func sendTransaction() async throws { - try await STOMPConnection.withConnection(address: .hostname(STOMPConnectionTests.hostname), logger: self.logger) { connection in + @Test("Send Transaction", arguments: [STOMPConnectionConfiguration.WebSocket(), nil]) + func sendTransaction(webSocket: STOMPConnectionConfiguration.WebSocket?) async throws { + try await STOMPConnection.withConnection( + address: .hostname(STOMPConnectionTests.hostname, port: webSocket == nil ? 61613 : 15674), + configuration: .init(webSocket: webSocket), + logger: self.logger + ) { connection in try await withThrowingTaskGroup { group in group.addTask { try await connection.withTransaction { transaction in @@ -454,9 +565,13 @@ struct STOMPConnectionTests { } } - @Test("Abort Send Transaction") - func abortSendTransaction() async throws { - try await STOMPConnection.withConnection(address: .hostname(STOMPConnectionTests.hostname), logger: self.logger) { connection in + @Test("Abort Send Transaction", arguments: [STOMPConnectionConfiguration.WebSocket(), nil]) + func abortSendTransaction(webSocket: STOMPConnectionConfiguration.WebSocket?) async throws { + try await STOMPConnection.withConnection( + address: .hostname(STOMPConnectionTests.hostname, port: webSocket == nil ? 61613 : 15674), + configuration: .init(webSocket: webSocket), + logger: self.logger + ) { connection in try await withThrowingTaskGroup { group in group.addTask { try await connection.withTransaction { transaction in @@ -506,4 +621,79 @@ struct STOMPConnectionTests { logger.logLevel = .trace return logger }() + + static let rootPath = #filePath + .split(separator: "/", omittingEmptySubsequences: false) + .dropLast(3) + .joined(separator: "/") + + static var eventLoopGroupSingleton: any EventLoopGroup { + #if canImport(Network) + // Return TS Eventloop for non-Linux builds, as we use TS TLS + NIOTSEventLoopGroup.singleton + #else + MultiThreadedEventLoopGroup.singleton + #endif + } + + static var _tlsConfiguration: STOMPConnectionConfiguration.TLS.Configuration { + get throws { + #if os(Linux) || os(Android) + let rootCertificate = try NIOSSLCertificate.fromPEMFile(Self.rootPath + "/Certs/ca.pem") + let certificate = try NIOSSLCertificate.fromPEMFile(Self.rootPath + "/Certs/client.pem") + let privateKey = try NIOSSLPrivateKey(file: Self.rootPath + "/Certs/client.key", format: .pem) + var tlsConfiguration = TLSConfiguration.makeClientConfiguration() + tlsConfiguration.trustRoots = .certificates(rootCertificate) + tlsConfiguration.certificateChain = certificate.map { .certificate($0) } + tlsConfiguration.privateKey = .privateKey(privateKey) + return .niossl(tlsConfiguration) + #else + let caData = try Data(contentsOf: URL(fileURLWithPath: Self.rootPath + "/Certs/ca.der")) + let trustRootCertificates = SecCertificateCreateWithData(nil, caData as CFData).map { [$0] } + let p12Data = try Data(contentsOf: URL(fileURLWithPath: Self.rootPath + "/Certs/client.p12")) + let options: [String: String] = [kSecImportExportPassphrase as String: "STOMPNIOClientCertPassword"] + var rawItems: CFArray? + guard SecPKCS12Import(p12Data as CFData, options as CFDictionary, &rawItems) == errSecSuccess else { + throw STOMPClientError.wrongTLSConfig + } + guard + let items = rawItems as? [[String: Any]], + let firstItem = items.first + else { + throw STOMPClientError.wrongTLSConfig + } + let identity = firstItem[kSecImportItemIdentity as String] as! SecIdentity? + let tlsConfiguration = TSTLSConfiguration( + trustRoots: trustRootCertificates, + clientIdentity: identity + ) + return .ts(tlsConfiguration) + #endif + } + } + + static func getTLSConfiguration( + withTrustRoots: Bool = true, + withClientKey: Bool = true + ) throws -> STOMPConnectionConfiguration.TLS.Configuration { + switch try Self._tlsConfiguration { + #if os(macOS) || os(Linux) || os(Android) + case .niossl(let config): + var tlsConfig = TLSConfiguration.makeClientConfiguration() + tlsConfig.trustRoots = withTrustRoots ? (config.trustRoots ?? .default) : .default + tlsConfig.certificateChain = withClientKey ? config.certificateChain : [] + tlsConfig.privateKey = withClientKey ? config.privateKey : nil + return .niossl(tlsConfig) + #endif + #if canImport(Network) + case .ts(let config): + return .ts( + TSTLSConfiguration( + trustRoots: withTrustRoots ? config.trustRoots : nil, + clientIdentity: withClientKey ? config.clientIdentity : nil + ) + ) + #endif + } + } } diff --git a/Tests/STOMPNIOTests/STOMPNIOTests.swift b/Tests/STOMPNIOTests/STOMPNIOTests.swift index 3ddfc1f..bb53070 100644 --- a/Tests/STOMPNIOTests/STOMPNIOTests.swift +++ b/Tests/STOMPNIOTests/STOMPNIOTests.swift @@ -1,3 +1,6 @@ +import NIOCore +import NIOEmbedded +import NIOHTTP1 import Testing @testable import STOMPNIO @@ -62,11 +65,52 @@ struct STOMPNIOTests { } } - @Test("Start of Whitespace Suffix in All-Whitespace Substring") - func startOfWhitespaceSuffixInAllWhitespaceSubstring() { - let substring: Substring = " " - let index = substring.startOfWhitespaceSuffix() - #expect(index == substring.startIndex) + @Test("Start of Whitespace Suffix in All-Whitespace String") + func startOfWhitespaceSuffixInAllWhitespaceString() { + let string = " " + let index = string.startOfWhitespaceSuffix() + #expect(index == string.startIndex) + } + + @Test("WebSocket Initial Request") + func webSocketInitialRequest() throws { + let el = EmbeddedEventLoop() + defer { #expect(throws: Never.self) { try el.syncShutdownGracefully() } } + let promise = el.makePromise(of: Void.self) + let initialRequestHandler = STOMPWebSocketInitialRequestChannelHandler( + host: "example.com", + urlPath: "/stomp", + additionalHeaders: ["Test": "Value"], + upgradePromise: promise + ) + let channel = EmbeddedChannel(handler: initialRequestHandler, loop: el) + try channel.connect(to: SocketAddress(ipAddress: "127.0.0.1", port: 0)).wait() + let requestHead = try channel.readOutbound(as: HTTPClientRequestPart.self) + let requestBody = try channel.readOutbound(as: HTTPClientRequestPart.self) + let requestEnd = try channel.readOutbound(as: HTTPClientRequestPart.self) + switch requestHead { + case .head(let head): + #expect(head.uri == "/stomp") + #expect(head.headers["host"].first == "example.com") + #expect(head.headers["Sec-WebSocket-Protocol"].first == "v12.stomp, v11.stomp, v10.stomp") + #expect(head.headers["Test"].first == "Value") + default: + Issue.record("Unexpected request head: \(String(describing: requestHead))") + } + switch requestBody { + case .body(let data): + #expect(data == .byteBuffer(ByteBuffer())) + default: + Issue.record("Unexpected request body: \(String(describing: requestBody))") + } + switch requestEnd { + case .end(nil): + break + default: + Issue.record("Unexpected request end: \(String(describing: requestEnd))") + } + _ = try channel.finish() + promise.succeed(()) } }