diff --git a/LICENSE b/LICENSE index 1b112db..d091810 100644 --- a/LICENSE +++ b/LICENSE @@ -1,29 +1,29 @@ -BSD 3-Clause License - -Copyright (c) 2019, Electronic Arts -All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are met: - -1. Redistributions of source code must retain the above copyright notice, this - list of conditions and the following disclaimer. - -2. Redistributions in binary form must reproduce the above copyright notice, - this list of conditions and the following disclaimer in the documentation - and/or other materials provided with the distribution. - -3. Neither the name of the copyright holder nor the names of its - contributors may be used to endorse or promote products derived from - this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE -FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER -CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, -OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +BSD 3-Clause License + +Copyright (c) 2019, Electronic Arts +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +3. Neither the name of the copyright holder nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/README.md b/README.md index 7fb148c..47abc16 100644 --- a/README.md +++ b/README.md @@ -1,143 +1,143 @@ -# News for covid-19 times - -We have added support for having EACopyService sit on another machine than the network share to enable environments where the network share machine is not running windows (like NetApp etc). This means that anyone can spin up an EACopyService next to the network share and then use "EACopy.exe ... /SERVERADDR \ /C" to speed up transfers using zstd compression from the office to your home. Very simple setup. - -# EACopy - -EACopy is an alternative to Robocopy for copying files from one location to another. The main reason EACopy was created was to be able to provide text files containing the files/directories to copy. The second reason was to improve performance when copying large amounts of data to network share where the same files are often copied over and over again but to different destinations on the same network share. (example: \\myserver\builds\myapp\version1\, \\myserver\builds\myapp\version2\, \\myserver\builds\myapp\version3\) - -At EA we have a build farm that produce new builds for all supported platforms at quite a fast rate. Binaries and system files for all supported platforms adds up to over 250gb per build and we try to get a full turnaround of these builds every 5 minutes or so. Unfortunately network throughput to the network share is one of the limiting factors as well as storage to be able to go faster. Funny enough most of this data being copied over has been copied 5 minutes ago to another folder since the majority of those 250gb is the same (unless you change something _everything_ depends on) and it feels like we could save lots of work by not copying those files again. This is where EACopyService comes in. - -EACopyService is a service that can be started on any windows based server. It acts as an accelerator for clients copying data to the network share and doesnt store any additional state anywhere. It needs to be allowed to create hardlinks on the file system in order to work. - -Using EACopyService we've seen builds that has a very small delta from previous builds go down in copy-time from ~2 minutes to 1 second :-) - -When EACopy starts copying to a network share it also tries to connect to an EACopyService. If there is no server it behaves just like Robocopy using smb. If it finds a service it switches over to do communication with the service instead of smb. Before copying a file it uses the same criterias as robocopy to compute the key of the file, the key is sent over to EACopyService together with destination path. EACopyService keeps track of all the files it has copied since it started by key and destination. If EACopyService already has the key it will check so the file still exists and has the same key, if the file is still valid it just creates a hardlink for the new file to the old file and return to the EACopy client that the file is already copied. -If the server doesn't have the key it asks the EACopy client to write the new file. If compression is enabled EACopy will compress and send the data to the service which will write it to the share. If compression is not enabled EACopyService will ask the client to copy the file directly to the share using smb. It self-balance compression based on throughput of uncompressed data. If network is fast it will do less compression, if it for some reason is contended it will do more compression. - -Note that the link-to-old-file feature is on a per-file basis so if your files are zipped up to some big archives with one changed file in it, it will still copy these files even though only 1 byte diffed from previous copy. This might be a feature added in the future. - -Here's an example on how the summary of a copy could look like when using the EACopyservice (this is a real example from frostbite's farm deploy). -``` ----------------------------------------------------------------------------------- - Total Copied Linked Skipped Mismatch FAILED Extras - Files: 2259 7 2252 0 0 0 0 - Bytes: 5.1g 658.0k 5.1g 0.0b 0 0 0 - Times: 0.56s 10ms 0.19s 0ms - - FindFile: 0.20s SendFile: 0ms - ReadFile: 1ms SendBytes: 72.1k - CompressFile: 1ms CompressLevel: 1.0 - ConnectTime: 89ms - - Server found and used! -``` - -## Documentation -[EACopy usage](doc/Usage.md) -[Technical documentation](doc/TechDoc.md) -[Todos](doc/Todo.md) - -## Credits -EACopy was implemented by Henrik Karlsson. Thanks to Roberto Parolin for setting up cmake and github. - -## Contributing -Before you can contribute, EA must have a Contributor License Agreement (CLA) on file that has been signed by each contributor. -You can sign here: [Go to CLA](https://electronicarts.na1.echosign.com/public/esignWidget?wid=CBFCIBAA3AAABLblqZhByHRvZqmltGtliuExmuV-WNzlaJGPhbSRg2ufuPsM3P0QmILZjLpkGslg24-UJtek*) - -### Pull Request Policy - -All code contributions to EACopy are submitted as [Github pull requests](https://help.github.com/articles/using-pull-requests/). All pull requests will be reviewed by an EACopy maintainer according to the guidelines found in the next section. - -Your pull request should: - -* merge cleanly -* come with tests - * tests should be minimal and stable - * fail before your fix is applied -* pass the test suite -* do not deviate from style already established in the files - -## Building - -First you need to make sure that you have the git submodules cloned. This can be done using ""git clone ---recurse-submodules". So full command line "git clone --recurse-submodules https://github.com/electronicarts/EACopy.git" - -EACopy uses the defacto standard CMake build system. - -As an example, look at the "build.bat" file in the scripts folder for how to build the library and build/run the unit tests. -NOTE: to run ctest you need to run the CLI you use as Administrator! Also you must make the changes outlined in "Test Setup" and "Running the Tests" sections below. -``` - - -@mkdir build -@pushd build - -@call cmake .. -G "Visual Studio 15 2017 Win64" -DEACOPY_BUILD_TESTS:BOOL=ON -rem @call cmake .. -G "Visual Studio 16 2019" -A x64 -DEACOPY_BUILD_TESTS:BOOL=ON -@call cmake --build . --config Release -@call cmake --build . --config Debug - -@pushd test -@call ctest -C Release -V -@call ctest -C Debug - -@popd -@popd - -``` - -#CI System -Travis CI for EACopy: https://travis-ci.org/github/electronicarts/EACopy - -## Test Setup -The test/CMakeLists.txt file might have its tests commented out. If so go to the file and uncomment the test line (as below): - -``` -# Use CTest -enable_testing() -#Disabled on farm. Enable this for local testing -add_test(EACopyTestRun EACopyTest) -``` - -## Running the Tests -To run the tests you need to have a source folder and a destination folder specified. - -The source folder needs to be a absolute path on one of your local drives. Ex: D:\EACopyTest\source -the destination folder needs to be a unc network path. This can be a shared location on your local machine Ex: \\localhost\EACopyTest\dest, which is mapped to D:\EACopyTest\dest. - -NOTE: These folders have to actually exist on your system or EACopyTest will complain about the folder(s) not existing - -You can specify these on the cmdline for EACopyTest.exe: -``` -EACopyTest D:\EACopyTest\source \\localhost\EACopyTest\dest -``` - -Another way to set this up for ease of local development is to set these defines in the top of EACopyTest.cpp to the source and dest folders you have setup: -* Default setting for these values is L""; (Make sure you change the values or You will get an error that you didnt specify the source/dir arguments) - -``` -#define DEFAULT_SOURCE_DIR L"D:\\\\EACopyTest\\source" -#define DEFAULT_DEST_DIR L"\\\\localhost\\EACopyTest\\dest" // local share to D:\EACopyTest\dest -``` - -You can then set EACopyTest as your startup project in Visual Studio and debug from there, or just run EACopyTest.exe from the cmdline without parameters and it will use those folders set in the code. - -## Setting up CMake with Visual Studio for Debugging -Notes: - - You need to run Visual Studio as Admin to properly run some tests - - Once set up make sure to set the CMake settings so EACOPY_BUILD_TESTS is set to true so the tests are generated also. -Reference documentation for setting up CMake: https://docs.microsoft.com/en-us/cpp/build/cmake-projects-in-visual-studio?view=vs-2019 -Reference documenation for running ctests in VS: https://docs.microsoft.com/en-us/visualstudio/test/how-to-use-ctest-for-cpp?view=vs-2019 - - -##Reference links -CMake main page: https://cmake.org/ -CTest documentation page: https://cmake.org/cmake/help/latest/manual/ctest.1.html -Basic CMake intro: https://a4z.bitbucket.io/blog/2018/05/17/Speed-up-your-test-cycles-with-CMake.html -CMake Tutorial 1: https://cliutils.gitlab.io/modern-cmake/chapters/testing.html -CMake Tutorial 2: https://bastian.rieck.me/blog/posts/2017/simple_unit_tests/ -CMake File example: https://riptutorial.com/cmake/example/14698/basic-test-suite - -## License - -Modified BSD License (3-Clause BSD license) see the file LICENSE in the project root. +# News for covid-19 times + +We have added support for having EACopyService sit on another machine than the network share to enable environments where the network share machine is not running windows (like NetApp etc). This means that anyone can spin up an EACopyService next to the network share and then use "EACopy.exe ... /SERVERADDR \ /C" to speed up transfers using zstd compression from the office to your home. Very simple setup. + +# EACopy + +EACopy is an alternative to Robocopy for copying files from one location to another. The main reason EACopy was created was to be able to provide text files containing the files/directories to copy. The second reason was to improve performance when copying large amounts of data to network share where the same files are often copied over and over again but to different destinations on the same network share. (example: \\myserver\builds\myapp\version1\, \\myserver\builds\myapp\version2\, \\myserver\builds\myapp\version3\) + +At EA we have a build farm that produce new builds for all supported platforms at quite a fast rate. Binaries and system files for all supported platforms adds up to over 250gb per build and we try to get a full turnaround of these builds every 5 minutes or so. Unfortunately network throughput to the network share is one of the limiting factors as well as storage to be able to go faster. Funny enough most of this data being copied over has been copied 5 minutes ago to another folder since the majority of those 250gb is the same (unless you change something _everything_ depends on) and it feels like we could save lots of work by not copying those files again. This is where EACopyService comes in. + +EACopyService is a service that can be started on any windows based server. It acts as an accelerator for clients copying data to the network share and doesnt store any additional state anywhere. It needs to be allowed to create hardlinks on the file system in order to work. + +Using EACopyService we've seen builds that has a very small delta from previous builds go down in copy-time from ~2 minutes to 1 second :-) + +When EACopy starts copying to a network share it also tries to connect to an EACopyService. If there is no server it behaves just like Robocopy using smb. If it finds a service it switches over to do communication with the service instead of smb. Before copying a file it uses the same criterias as robocopy to compute the key of the file, the key is sent over to EACopyService together with destination path. EACopyService keeps track of all the files it has copied since it started by key and destination. If EACopyService already has the key it will check so the file still exists and has the same key, if the file is still valid it just creates a hardlink for the new file to the old file and return to the EACopy client that the file is already copied. +If the server doesn't have the key it asks the EACopy client to write the new file. If compression is enabled EACopy will compress and send the data to the service which will write it to the share. If compression is not enabled EACopyService will ask the client to copy the file directly to the share using smb. It self-balance compression based on throughput of uncompressed data. If network is fast it will do less compression, if it for some reason is contended it will do more compression. + +Note that the link-to-old-file feature is on a per-file basis so if your files are zipped up to some big archives with one changed file in it, it will still copy these files even though only 1 byte diffed from previous copy. This might be a feature added in the future. + +Here's an example on how the summary of a copy could look like when using the EACopyservice (this is a real example from frostbite's farm deploy). +``` +---------------------------------------------------------------------------------- + Total Copied Linked Skipped Mismatch FAILED Extras + Files: 2259 7 2252 0 0 0 0 + Bytes: 5.1g 658.0k 5.1g 0.0b 0 0 0 + Times: 0.56s 10ms 0.19s 0ms + + FindFile: 0.20s SendFile: 0ms + ReadFile: 1ms SendBytes: 72.1k + CompressFile: 1ms CompressLevel: 1.0 + ConnectTime: 89ms + + Server found and used! +``` + +## Documentation +[EACopy usage](doc/Usage.md) +[Technical documentation](doc/TechDoc.md) +[Todos](doc/Todo.md) + +## Credits +EACopy was implemented by Henrik Karlsson. Thanks to Roberto Parolin for setting up cmake and github. + +## Contributing +Before you can contribute, EA must have a Contributor License Agreement (CLA) on file that has been signed by each contributor. +You can sign here: [Go to CLA](https://electronicarts.na1.echosign.com/public/esignWidget?wid=CBFCIBAA3AAABLblqZhByHRvZqmltGtliuExmuV-WNzlaJGPhbSRg2ufuPsM3P0QmILZjLpkGslg24-UJtek*) + +### Pull Request Policy + +All code contributions to EACopy are submitted as [Github pull requests](https://help.github.com/articles/using-pull-requests/). All pull requests will be reviewed by an EACopy maintainer according to the guidelines found in the next section. + +Your pull request should: + +* merge cleanly +* come with tests + * tests should be minimal and stable + * fail before your fix is applied +* pass the test suite +* do not deviate from style already established in the files + +## Building + +First you need to make sure that you have the git submodules cloned. This can be done using ""git clone ---recurse-submodules". So full command line "git clone --recurse-submodules https://github.com/electronicarts/EACopy.git" + +EACopy uses the defacto standard CMake build system. + +As an example, look at the "build.bat" file in the scripts folder for how to build the library and build/run the unit tests. +NOTE: to run ctest you need to run the CLI you use as Administrator! Also you must make the changes outlined in "Test Setup" and "Running the Tests" sections below. +``` + + +@mkdir build +@pushd build + +@call cmake .. -G "Visual Studio 15 2017 Win64" -DEACOPY_BUILD_TESTS:BOOL=ON +rem @call cmake .. -G "Visual Studio 16 2019" -A x64 -DEACOPY_BUILD_TESTS:BOOL=ON +@call cmake --build . --config Release +@call cmake --build . --config Debug + +@pushd test +@call ctest -C Release -V +@call ctest -C Debug + +@popd +@popd + +``` + +#CI System +Travis CI for EACopy: https://travis-ci.org/github/electronicarts/EACopy + +## Test Setup +The test/CMakeLists.txt file might have its tests commented out. If so go to the file and uncomment the test line (as below): + +``` +# Use CTest +enable_testing() +#Disabled on farm. Enable this for local testing +add_test(EACopyTestRun EACopyTest) +``` + +## Running the Tests +To run the tests you need to have a source folder and a destination folder specified. + +The source folder needs to be a absolute path on one of your local drives. Ex: D:\EACopyTest\source +the destination folder needs to be a unc network path. This can be a shared location on your local machine Ex: \\localhost\EACopyTest\dest, which is mapped to D:\EACopyTest\dest. + +NOTE: These folders have to actually exist on your system or EACopyTest will complain about the folder(s) not existing + +You can specify these on the cmdline for EACopyTest.exe: +``` +EACopyTest D:\EACopyTest\source \\localhost\EACopyTest\dest +``` + +Another way to set this up for ease of local development is to set these defines in the top of EACopyTest.cpp to the source and dest folders you have setup: +* Default setting for these values is L""; (Make sure you change the values or You will get an error that you didnt specify the source/dir arguments) + +``` +#define DEFAULT_SOURCE_DIR L"D:\\\\EACopyTest\\source" +#define DEFAULT_DEST_DIR L"\\\\localhost\\EACopyTest\\dest" // local share to D:\EACopyTest\dest +``` + +You can then set EACopyTest as your startup project in Visual Studio and debug from there, or just run EACopyTest.exe from the cmdline without parameters and it will use those folders set in the code. + +## Setting up CMake with Visual Studio for Debugging +Notes: + - You need to run Visual Studio as Admin to properly run some tests + - Once set up make sure to set the CMake settings so EACOPY_BUILD_TESTS is set to true so the tests are generated also. +Reference documentation for setting up CMake: https://docs.microsoft.com/en-us/cpp/build/cmake-projects-in-visual-studio?view=vs-2019 +Reference documenation for running ctests in VS: https://docs.microsoft.com/en-us/visualstudio/test/how-to-use-ctest-for-cpp?view=vs-2019 + + +##Reference links +CMake main page: https://cmake.org/ +CTest documentation page: https://cmake.org/cmake/help/latest/manual/ctest.1.html +Basic CMake intro: https://a4z.bitbucket.io/blog/2018/05/17/Speed-up-your-test-cycles-with-CMake.html +CMake Tutorial 1: https://cliutils.gitlab.io/modern-cmake/chapters/testing.html +CMake Tutorial 2: https://bastian.rieck.me/blog/posts/2017/simple_unit_tests/ +CMake File example: https://riptutorial.com/cmake/example/14698/basic-test-suite + +## License + +Modified BSD License (3-Clause BSD license) see the file LICENSE in the project root. diff --git a/include/EACopyClient.h b/include/EACopyClient.h index ebcac3f..04ad555 100644 --- a/include/EACopyClient.h +++ b/include/EACopyClient.h @@ -10,7 +10,7 @@ namespace eacopy enum : uint { ClientMajorVersion = 1, - ClientMinorVersion = 17 + ClientMinorVersion = 18 }; /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/include/EACopyDelta.h b/include/EACopyDelta.h index 522b51a..04bdb4e 100644 --- a/include/EACopyDelta.h +++ b/include/EACopyDelta.h @@ -1,31 +1,31 @@ -// (c) Electronic Arts. All Rights Reserved. - -#pragma once - -#if defined(EACOPY_ALLOW_DELTA_COPY) - -#include "EACopyNetwork.h" - -namespace eacopy -{ - -/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - -bool sendDelta(Socket& socket, const wchar_t* referenceFileName, u64 referenceFileSize, const wchar_t* newFileName, u64 newFileSize, NetworkCopyContext& copyContext, IOStats& ioStats); - - -struct RecvDeltaStats -{ - u64 recvTime = 0; - u64 recvSize = 0; - u64 decompressTime = 0; -}; - - -bool receiveDelta(Socket& socket, const wchar_t* referenceFileName, u64 referenceFileSize, const wchar_t* destFileName, u64 destFileSize, FileTime lastWriteTime, NetworkCopyContext& copyContext, IOStats& ioStats, RecvDeltaStats& recvStats); - -/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - -} - -#endif +// (c) Electronic Arts. All Rights Reserved. + +#pragma once + +#if defined(EACOPY_ALLOW_DELTA_COPY) + +#include "EACopyNetwork.h" + +namespace eacopy +{ + +/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +bool sendDelta(Socket& socket, const wchar_t* referenceFileName, u64 referenceFileSize, const wchar_t* newFileName, u64 newFileSize, NetworkCopyContext& copyContext, IOStats& ioStats); + + +struct RecvDeltaStats +{ + u64 recvTime = 0; + u64 recvSize = 0; + u64 decompressTime = 0; +}; + + +bool receiveDelta(Socket& socket, const wchar_t* referenceFileName, u64 referenceFileSize, const wchar_t* destFileName, u64 destFileSize, FileTime lastWriteTime, NetworkCopyContext& copyContext, IOStats& ioStats, RecvDeltaStats& recvStats); + +/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +} + +#endif diff --git a/include/EACopyServer.h b/include/EACopyServer.h index 850c52e..2945a7d 100644 --- a/include/EACopyServer.h +++ b/include/EACopyServer.h @@ -11,7 +11,7 @@ namespace eacopy enum : uint { ServerMajorVersion = 1, - ServerMinorVersion = 9, + ServerMinorVersion = 10, }; enum : uint { DefaultHistorySize = 500000 }; // Number of files diff --git a/include/OLD_EACopyRsync.h b/include/OLD_EACopyRsync.h index 4775571..4a7bbff 100644 --- a/include/OLD_EACopyRsync.h +++ b/include/OLD_EACopyRsync.h @@ -1,30 +1,30 @@ -// (c) Electronic Arts. All Rights Reserved. - -#pragma once - -#include "EACopyNetwork.h" - -namespace eacopy -{ - -/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - -struct RsyncStats -{ - u64 readTimeMs = 0; - u64 readSize = 0; - - u64 receiveTimeMs = 0; - u64 writeTimeMs = 0; - u64 sendTimeMs = 0; - u64 sendSize = 0; - - u64 rsyncTimeMs = 0; -}; - -bool serverHandleRsync(SOCKET socket, const wchar_t* fileName, const wchar_t* newFileName, FILETIME lastWriteTime, RsyncStats& outStats); -bool clientHandleRsync(SOCKET socket, const wchar_t* fileName, RsyncStats& outStats); - -/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - -} +// (c) Electronic Arts. All Rights Reserved. + +#pragma once + +#include "EACopyNetwork.h" + +namespace eacopy +{ + +/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +struct RsyncStats +{ + u64 readTimeMs = 0; + u64 readSize = 0; + + u64 receiveTimeMs = 0; + u64 writeTimeMs = 0; + u64 sendTimeMs = 0; + u64 sendSize = 0; + + u64 rsyncTimeMs = 0; +}; + +bool serverHandleRsync(SOCKET socket, const wchar_t* fileName, const wchar_t* newFileName, FILETIME lastWriteTime, RsyncStats& outStats); +bool clientHandleRsync(SOCKET socket, const wchar_t* fileName, RsyncStats& outStats); + +/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +} diff --git a/source/EACopy.cpp b/source/EACopy.cpp index c1e6629..5719fb5 100644 --- a/source/EACopy.cpp +++ b/source/EACopy.cpp @@ -575,12 +575,6 @@ int main(int argc, char* argv_[]) logInfoLinef(); } - if (res == -1) - { - log.deinit(); - return -1; - } - u64 endTime = getTime(); if (settings.printJobSummary) @@ -658,7 +652,7 @@ int main(int argc, char* argv_[]) } }); - return 0; + return stats.failCount == 0 && res == 0 ? 0 : -1; } /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/source/EACopyClient.cpp b/source/EACopyClient.cpp index 5960f69..29f1af9 100644 --- a/source/EACopyClient.cpp +++ b/source/EACopyClient.cpp @@ -179,8 +179,7 @@ Client::process(Log& log, ClientStats& outStats) // Process dirs and files (worker threads are doing the same right now) - if (!processQueues(logContext, m_sourceConnection, m_destConnection, m_copyContext, outStats, true)) - return -1; + processQueues(logContext, m_sourceConnection, m_destConnection, m_copyContext, outStats, true); // Wait for all worker threads to finish @@ -452,7 +451,6 @@ Client::processFile(LogContext& logContext, Connection* sourceConnection, Connec int retryCountLeft = m_settings.retryCount; while (true) { - u64 startTime = getTime(); auto reportSkip = [&]() diff --git a/source/EACopyDelta.cpp b/source/EACopyDelta.cpp index 3cfc58e..f54b5fe 100644 --- a/source/EACopyDelta.cpp +++ b/source/EACopyDelta.cpp @@ -1,105 +1,105 @@ -// (c) Electronic Arts. All Rights Reserved. - -#if defined(EACOPY_ALLOW_DELTA_COPY) - -#include -#include "EACopyDeltaZstd.h" -#include "EACopyDeltaXDelta.h" - -namespace eacopy -{ - -/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - -using CodecFunc = bool(bool encode, Socket& socket, const wchar_t* referenceFileName, FileHandle referenceFile, u64 referenceFileSize, const wchar_t* newFileName, FileHandle newFile, u64 newFileSize, NetworkCopyContext& copyContext, IOStats& ioStats, u64& socketTime, u64& socketSize, u64& codeTime); - - -CodecFunc* g_codecFuncs[] = -{ - zstdCode, - xDeltaCode, -}; - -bool -sendDelta(Socket& socket, const wchar_t* referenceFileName, u64 referenceFileSize, const wchar_t* newFileName, u64 newFileSize, NetworkCopyContext& copyContext, IOStats& ioStats) -{ - FileHandle newFile; - if (!openFileRead(newFileName, newFile, ioStats, true)) - return false; - ScopeGuard _([&]() { closeFile(newFileName, newFile, AccessType_Read, ioStats); }); - - FileHandle referenceFile; - if (!openFileRead(referenceFileName, referenceFile, ioStats, true, nullptr, false)) - return false; - ScopeGuard _2([&]() { closeFile(referenceFileName, referenceFile, AccessType_Read, ioStats); }); - - - u8 codecIndex = 0; - if (!sendData(socket, &codecIndex, sizeof(codecIndex))) - return false; - - CodecFunc* codecFunc = g_codecFuncs[codecIndex]; - - u64 socketTime = 0; - u64 socketBytes = 0; - u64 codeTime = 0; - return codecFunc(true, socket, referenceFileName, referenceFile, referenceFileSize, newFileName, newFile, newFileSize, copyContext, ioStats, socketTime, socketBytes, codeTime); -} - -bool receiveDelta(Socket& socket, const wchar_t* referenceFileName, u64 referenceFileSize, const wchar_t* destFileName, u64 destFileSize, FileTime lastWriteTime, NetworkCopyContext& copyContext, IOStats& ioStats, RecvDeltaStats& recvStats) -{ - WString tempFileName; - const wchar_t* lastSlash = wcsrchr(destFileName, L'\\'); - if (lastSlash) - { - tempFileName.append(destFileName, lastSlash + 1); - tempFileName += L'.'; - tempFileName += lastSlash + 1; - } - else - { - tempFileName += L'.'; - tempFileName += destFileName; - } - - FileHandle referenceFile; - if (!openFileRead(referenceFileName, referenceFile, ioStats, true, nullptr, false)) - return false; - ScopeGuard closeRef([&]() { closeFile(referenceFileName, referenceFile, AccessType_Read, ioStats); }); - - FileHandle tempFile; - if (!openFileWrite(tempFileName.c_str(), tempFile, ioStats, true, nullptr, true)) // Create file hidden, and unhide it once we've moved it in place - return false; - ScopeGuard delTemp([&]() { deleteFile(tempFileName.c_str(), ioStats, false); }); - ScopeGuard closeTemp([&]() { closeFile(tempFileName.c_str(), tempFile, AccessType_Write, ioStats); }); - - u8 codecIndex; - if (!receiveData(socket, &codecIndex, sizeof(codecIndex))) - return false; - CodecFunc* codecFunc = g_codecFuncs[codecIndex]; - - if (!codecFunc(false, socket, referenceFileName, referenceFile, referenceFileSize, tempFileName.c_str(), tempFile, destFileSize, copyContext, ioStats, recvStats.recvTime, recvStats.recvSize, recvStats.decompressTime)) - return false; - - if (!setFileLastWriteTime(tempFileName.c_str(), tempFile, lastWriteTime, ioStats)) - return false; - - closeRef.execute(); - closeTemp.execute(); - - if (!moveFile(tempFileName.c_str(), destFileName, ioStats)) - return false; - - if (!setFileHidden(destFileName, false)) - return false; - - delTemp.cancel(); - - return true; -} - -/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - -} - -#endif +// (c) Electronic Arts. All Rights Reserved. + +#if defined(EACOPY_ALLOW_DELTA_COPY) + +#include +#include "EACopyDeltaZstd.h" +#include "EACopyDeltaXDelta.h" + +namespace eacopy +{ + +/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +using CodecFunc = bool(bool encode, Socket& socket, const wchar_t* referenceFileName, FileHandle referenceFile, u64 referenceFileSize, const wchar_t* newFileName, FileHandle newFile, u64 newFileSize, NetworkCopyContext& copyContext, IOStats& ioStats, u64& socketTime, u64& socketSize, u64& codeTime); + + +CodecFunc* g_codecFuncs[] = +{ + zstdCode, + xDeltaCode, +}; + +bool +sendDelta(Socket& socket, const wchar_t* referenceFileName, u64 referenceFileSize, const wchar_t* newFileName, u64 newFileSize, NetworkCopyContext& copyContext, IOStats& ioStats) +{ + FileHandle newFile; + if (!openFileRead(newFileName, newFile, ioStats, true)) + return false; + ScopeGuard _([&]() { closeFile(newFileName, newFile, AccessType_Read, ioStats); }); + + FileHandle referenceFile; + if (!openFileRead(referenceFileName, referenceFile, ioStats, true, nullptr, false)) + return false; + ScopeGuard _2([&]() { closeFile(referenceFileName, referenceFile, AccessType_Read, ioStats); }); + + + u8 codecIndex = 0; + if (!sendData(socket, &codecIndex, sizeof(codecIndex))) + return false; + + CodecFunc* codecFunc = g_codecFuncs[codecIndex]; + + u64 socketTime = 0; + u64 socketBytes = 0; + u64 codeTime = 0; + return codecFunc(true, socket, referenceFileName, referenceFile, referenceFileSize, newFileName, newFile, newFileSize, copyContext, ioStats, socketTime, socketBytes, codeTime); +} + +bool receiveDelta(Socket& socket, const wchar_t* referenceFileName, u64 referenceFileSize, const wchar_t* destFileName, u64 destFileSize, FileTime lastWriteTime, NetworkCopyContext& copyContext, IOStats& ioStats, RecvDeltaStats& recvStats) +{ + WString tempFileName; + const wchar_t* lastSlash = wcsrchr(destFileName, L'\\'); + if (lastSlash) + { + tempFileName.append(destFileName, lastSlash + 1); + tempFileName += L'.'; + tempFileName += lastSlash + 1; + } + else + { + tempFileName += L'.'; + tempFileName += destFileName; + } + + FileHandle referenceFile; + if (!openFileRead(referenceFileName, referenceFile, ioStats, true, nullptr, false)) + return false; + ScopeGuard closeRef([&]() { closeFile(referenceFileName, referenceFile, AccessType_Read, ioStats); }); + + FileHandle tempFile; + if (!openFileWrite(tempFileName.c_str(), tempFile, ioStats, true, nullptr, true)) // Create file hidden, and unhide it once we've moved it in place + return false; + ScopeGuard delTemp([&]() { deleteFile(tempFileName.c_str(), ioStats, false); }); + ScopeGuard closeTemp([&]() { closeFile(tempFileName.c_str(), tempFile, AccessType_Write, ioStats); }); + + u8 codecIndex; + if (!receiveData(socket, &codecIndex, sizeof(codecIndex))) + return false; + CodecFunc* codecFunc = g_codecFuncs[codecIndex]; + + if (!codecFunc(false, socket, referenceFileName, referenceFile, referenceFileSize, tempFileName.c_str(), tempFile, destFileSize, copyContext, ioStats, recvStats.recvTime, recvStats.recvSize, recvStats.decompressTime)) + return false; + + if (!setFileLastWriteTime(tempFileName.c_str(), tempFile, lastWriteTime, ioStats)) + return false; + + closeRef.execute(); + closeTemp.execute(); + + if (!moveFile(tempFileName.c_str(), destFileName, ioStats)) + return false; + + if (!setFileHidden(destFileName, false)) + return false; + + delTemp.cancel(); + + return true; +} + +/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +} + +#endif diff --git a/source/EACopyDeltaXDelta.h b/source/EACopyDeltaXDelta.h index 5bdfe4b..d5cf5de 100644 --- a/source/EACopyDeltaXDelta.h +++ b/source/EACopyDeltaXDelta.h @@ -1,375 +1,375 @@ -// (c) Electronic Arts. All Rights Reserved. - -#if defined(EACOPY_ALLOW_DELTA_COPY) - -#include -#include - -namespace eacopy -{ - -/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - -#define XD3_INVALID_OFFSET (~uintptr_t(0)) -#define MAX_LRU_SIZE 32u -#define SOURCEWINSIZE (128 * 1024 * 1024) - -struct xDeltaLruBlock -{ - u8* blk; - xoff_t blkno; - usize_t size; - - xDeltaLruBlock* next; - xDeltaLruBlock* prev; -}; - -struct xDeltaGetBlockData -{ - IOStats& ioStats; - const wchar_t* referenceFileName; - FileHandle& referenceFile; - u64 referenceFileSize; - u64 referenceFilePos = 0; - - // LRU work data - xDeltaLruBlock* lru; - uint lruSize = 0u; - u8* lruData; - xDeltaLruBlock list; - bool fifo; - - void* fileData = nullptr; -}; - -void remove(xDeltaLruBlock& list, xDeltaLruBlock* block) -{ - block->next->prev = block->prev; - block->prev->next = block->next; - block->next = nullptr; - block->prev = nullptr; -} - -void pushBack(xDeltaLruBlock& list, xDeltaLruBlock* block) -{ - block->prev = list.prev; - block->next = &list; - list.prev->next = block; - list.prev = block; -} - -xDeltaLruBlock* popFront(xDeltaLruBlock& list) -{ - xDeltaLruBlock* front = list.next; - front->next->prev = &list; - list.next = front->next; - front->next = nullptr; - front->prev = nullptr; - return front; -} - -bool empty(xDeltaLruBlock& list) -{ - return list.next != &list; -} - -int xDeltaGetLruBlock(xDeltaGetBlockData& data, xoff_t blkno, xDeltaLruBlock*& blrup, bool& isNew) -{ - xDeltaLruBlock* blru = NULL; - - isNew = false; - - if (data.fifo) - { - int idx = blkno % data.lruSize; - blru = &data.lru[idx]; - if (blru->blkno == blkno) - { - blrup = blru; - return 0; - } - if (blru->blkno != XD3_INVALID_OFFSET && blru->blkno > blkno) - return XD3_TOOFARBACK; - } - else - { - for (uint i = 0; i < data.lruSize; i += 1) - { - blru = &data.lru[i]; - if (blru->blkno == blkno) - { - remove(data.list, blru); - pushBack(data.list, blru); - blrup = blru; - return 0; - } - } - } - - if (data.fifo) - { - int idx = blkno % data.lruSize; - blru = &data.lru[idx]; - } - else - { - XD3_ASSERT(!empty(data.list)); - blru = popFront(data.list); - pushBack(data.list, blru); - } - - isNew = true; - blrup = blru; - blru->blkno = XD3_INVALID_OFFSET; - return 0; -} - - -int xDeltaGetBlock(xd3_stream* stream, xd3_source* source, xoff_t blkno) -{ - auto& data = *(xDeltaGetBlockData*)stream->opaque; - - int ret = 0; - xDeltaLruBlock* blru; - bool isNew; - - if ((ret = xDeltaGetLruBlock(data, blkno, blru, isNew))) - return ret; - - if (!isNew) - { - source->curblkno = blkno; - source->onblk = blru->size; - source->curblk = blru->blk; - return 0; - } - - xoff_t pos = blkno * source->blksize; - - if (pos != data.referenceFilePos) - { - if (!setFilePosition(data.referenceFileName, data.referenceFile, pos, data.ioStats)) - return 1; - data.referenceFilePos = pos; - } - - u64 nread; - if (!readFile(data.referenceFileName, data.referenceFile, (void*)blru->blk, source->blksize, nread, data.ioStats)) - return 1; - data.referenceFilePos += nread; - - source->curblk = blru->blk; - source->curblkno = blkno; - source->onblk = nread; - blru->size = nread; - blru->blkno = blkno; - - return 0; -} - -bool xDeltaInitBlockData(xDeltaGetBlockData& data, xd3_stream& stream, xd3_source& source, xoff_t blkno) -{ - data.list.prev = &data.list; - data.list.next = &data.list; - - u64 lruSize = MAX_LRU_SIZE * sizeof(xDeltaLruBlock); - data.lru = new xDeltaLruBlock[lruSize]; - memset(data.lru, 0, lruSize); - data.lru[0].blk = data.lruData = new u8[SOURCEWINSIZE]; - data.lru[0].blkno = XD3_INVALID_OFFSET; - data.lruSize = 1; - pushBack(data.list, &data.lru[0]); - usize_t blksize = SOURCEWINSIZE; - - source.blksize = blksize; - source.curblkno = XD3_INVALID_OFFSET; - source.curblk = NULL; - source.max_winsize = SOURCEWINSIZE; - - if (xDeltaGetBlock(&stream, &source, blkno) != 0) - return false; - - if (source.onblk < blksize) - source.onlastblk = data.referenceFileSize; - - if (data.referenceFileSize > SOURCEWINSIZE) - { - blksize = SOURCEWINSIZE / MAX_LRU_SIZE; - source.blksize = blksize; - source.onblk = blksize; - source.onlastblk = blksize; - source.max_blkno = MAX_LRU_SIZE - 1; - - data.lru[0].size = blksize; - data.lruSize = MAX_LRU_SIZE; - - for (uint i = 1; i < data.lruSize; i += 1) - { - data.lru[i].blk = data.lru[0].blk + (blksize * i); - data.lru[i].blkno = i; - data.lru[i].size = blksize; - pushBack(data.list, &data.lru[i]); - } - } - - return true; -} - -void xDeltaDestroyBlockData(xDeltaGetBlockData& data) -{ - delete[] data.lruData; - delete[] data.lru; -} - -bool xDeltaCode(bool encode, Socket& socket, const wchar_t* referenceFileName, FileHandle referenceFile, u64 referenceFileSize, const wchar_t* newFileName, FileHandle newFile, u64 newFileSize, NetworkCopyContext& copyContext, IOStats& ioStats, u64& socketTime, u64& socketSize, u64& codeTime) -{ - uint flags = 0; - //flags |= XD3_ADLER32; - //flags |= XD3_COMPLEVEL_1; - flags |= XD3_COMPLEVEL_3; - //flags |= XD3_COMPLEVEL_9; - flags |= XD3_SEC_LZMA; - - xDeltaGetBlockData blockData{ ioStats, referenceFileName, referenceFile, referenceFileSize }; - blockData.fifo = encode; - - xd3_config config; - xd3_init_config(&config, flags); - config.winsize = min(referenceFileSize, XD3_DEFAULT_WINSIZE); - config.getblk = xDeltaGetBlock; - config.opaque = &blockData; - config.iopt_size = XD3_DEFAULT_IOPT_SIZE; - config.sprevsz = XD3_DEFAULT_SPREVSZ; - config.smatch_cfg = XD3_SMATCH_FAST; - - xd3_stream stream; - memset(&stream, 0, sizeof(stream)); - xd3_config_stream(&stream, &config); - - xd3_source source; - memset(&source, 0, sizeof(source)); - if (!xDeltaInitBlockData(blockData, stream, source, 0)) - return false; - ScopeGuard _bd([&]() { xDeltaDestroyBlockData(blockData); }); - xd3_set_source_and_size(&stream, &source, referenceFileSize); - - static_assert(XD3_ALLOCSIZE <= CopyContextBufferSize, ""); - u64 bufSize = XD3_ALLOCSIZE;// CopyContextBufferSize; - - u8* inputBuf = bufSize <= CopyContextBufferSize ? copyContext.buffers[1] : (u8*)malloc(bufSize); - u64 inputBufRead = 0; - u8* outputBuf = bufSize <= CopyContextBufferSize ? copyContext.buffers[2] : (u8*)malloc(bufSize); - u64 outputBufWritten = 0; - ScopeGuard _([&]() - { - xd3_close_stream(&stream); - xd3_free_stream(&stream); - - if (bufSize > CopyContextBufferSize) - { - free(inputBuf); - free(outputBuf); - } - }); - - bool outSuccess = true; - do - { - if (encode) - { - if (!readFile(newFileName, newFile, (void*)inputBuf, bufSize, inputBufRead, ioStats)) - return false; - } - else - { - TimerScope _(socketTime); - if (!receiveData(socket, &inputBufRead, sizeof(inputBufRead))) - return false; - if (inputBufRead) - if (!receiveData(socket, (void*)inputBuf, inputBufRead)) - return false; - socketSize += sizeof(inputBufRead) + inputBufRead; - } - - if (!outSuccess) - continue; - - if (inputBufRead < bufSize) - xd3_set_flags(&stream, XD3_FLUSH | stream.flags); - xd3_avail_input(&stream, inputBuf, inputBufRead); - - process: - - int ret = 0; - { - TimerScope _(codeTime); - if (encode) - ret = xd3_encode_input(&stream); - else - ret = xd3_decode_input(&stream); - } - - switch (ret) - { - case XD3_INPUT: - continue; - - case XD3_OUTPUT: - if (encode) - { - uint leftInBuffer = bufSize - outputBufWritten; - uint writeToBufferSize = stream.avail_out < leftInBuffer ? stream.avail_out : leftInBuffer; - memcpy(outputBuf + outputBufWritten, stream.next_out, writeToBufferSize); - outputBufWritten += writeToBufferSize; - if (outputBufWritten == bufSize) - { - if (!sendData(socket, &outputBufWritten, sizeof(outputBufWritten))) - return false; - if (!sendData(socket, outputBuf, outputBufWritten)) - return false; - uint left = stream.avail_out - writeToBufferSize; - memcpy(outputBuf, stream.next_out + writeToBufferSize, left); - outputBufWritten = left; - } - - } - else - { - outSuccess = outSuccess && writeFile(newFileName, newFile, stream.next_out, stream.avail_out, ioStats); - } - xd3_consume_output(&stream); - goto process; - - case XD3_GOTHEADER: - goto process; - - case XD3_WINSTART: - goto process; - - case XD3_WINFINISH: - goto process; - - default: - logErrorf(L"Xdelta failed %ls file %ls: %hs %d !!!\n", (encode ? L"compressing" : L"decompressing"), newFileName, stream.msg, ret); - outSuccess = false; - } - - } while (inputBufRead == bufSize); - - if (encode) - { - if (!sendData(socket, &outputBufWritten, sizeof(outputBufWritten))) - return false; - if (outputBufWritten) - if (!sendData(socket, outputBuf, outputBufWritten)) - return false; - } - - return outSuccess; -} - -/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - -} - -#endif +// (c) Electronic Arts. All Rights Reserved. + +#if defined(EACOPY_ALLOW_DELTA_COPY) + +#include +#include + +namespace eacopy +{ + +/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +#define XD3_INVALID_OFFSET (~uintptr_t(0)) +#define MAX_LRU_SIZE 32u +#define SOURCEWINSIZE (128 * 1024 * 1024) + +struct xDeltaLruBlock +{ + u8* blk; + xoff_t blkno; + usize_t size; + + xDeltaLruBlock* next; + xDeltaLruBlock* prev; +}; + +struct xDeltaGetBlockData +{ + IOStats& ioStats; + const wchar_t* referenceFileName; + FileHandle& referenceFile; + u64 referenceFileSize; + u64 referenceFilePos = 0; + + // LRU work data + xDeltaLruBlock* lru; + uint lruSize = 0u; + u8* lruData; + xDeltaLruBlock list; + bool fifo; + + void* fileData = nullptr; +}; + +void remove(xDeltaLruBlock& list, xDeltaLruBlock* block) +{ + block->next->prev = block->prev; + block->prev->next = block->next; + block->next = nullptr; + block->prev = nullptr; +} + +void pushBack(xDeltaLruBlock& list, xDeltaLruBlock* block) +{ + block->prev = list.prev; + block->next = &list; + list.prev->next = block; + list.prev = block; +} + +xDeltaLruBlock* popFront(xDeltaLruBlock& list) +{ + xDeltaLruBlock* front = list.next; + front->next->prev = &list; + list.next = front->next; + front->next = nullptr; + front->prev = nullptr; + return front; +} + +bool empty(xDeltaLruBlock& list) +{ + return list.next != &list; +} + +int xDeltaGetLruBlock(xDeltaGetBlockData& data, xoff_t blkno, xDeltaLruBlock*& blrup, bool& isNew) +{ + xDeltaLruBlock* blru = NULL; + + isNew = false; + + if (data.fifo) + { + int idx = blkno % data.lruSize; + blru = &data.lru[idx]; + if (blru->blkno == blkno) + { + blrup = blru; + return 0; + } + if (blru->blkno != XD3_INVALID_OFFSET && blru->blkno > blkno) + return XD3_TOOFARBACK; + } + else + { + for (uint i = 0; i < data.lruSize; i += 1) + { + blru = &data.lru[i]; + if (blru->blkno == blkno) + { + remove(data.list, blru); + pushBack(data.list, blru); + blrup = blru; + return 0; + } + } + } + + if (data.fifo) + { + int idx = blkno % data.lruSize; + blru = &data.lru[idx]; + } + else + { + XD3_ASSERT(!empty(data.list)); + blru = popFront(data.list); + pushBack(data.list, blru); + } + + isNew = true; + blrup = blru; + blru->blkno = XD3_INVALID_OFFSET; + return 0; +} + + +int xDeltaGetBlock(xd3_stream* stream, xd3_source* source, xoff_t blkno) +{ + auto& data = *(xDeltaGetBlockData*)stream->opaque; + + int ret = 0; + xDeltaLruBlock* blru; + bool isNew; + + if ((ret = xDeltaGetLruBlock(data, blkno, blru, isNew))) + return ret; + + if (!isNew) + { + source->curblkno = blkno; + source->onblk = blru->size; + source->curblk = blru->blk; + return 0; + } + + xoff_t pos = blkno * source->blksize; + + if (pos != data.referenceFilePos) + { + if (!setFilePosition(data.referenceFileName, data.referenceFile, pos, data.ioStats)) + return 1; + data.referenceFilePos = pos; + } + + u64 nread; + if (!readFile(data.referenceFileName, data.referenceFile, (void*)blru->blk, source->blksize, nread, data.ioStats)) + return 1; + data.referenceFilePos += nread; + + source->curblk = blru->blk; + source->curblkno = blkno; + source->onblk = nread; + blru->size = nread; + blru->blkno = blkno; + + return 0; +} + +bool xDeltaInitBlockData(xDeltaGetBlockData& data, xd3_stream& stream, xd3_source& source, xoff_t blkno) +{ + data.list.prev = &data.list; + data.list.next = &data.list; + + u64 lruSize = MAX_LRU_SIZE * sizeof(xDeltaLruBlock); + data.lru = new xDeltaLruBlock[lruSize]; + memset(data.lru, 0, lruSize); + data.lru[0].blk = data.lruData = new u8[SOURCEWINSIZE]; + data.lru[0].blkno = XD3_INVALID_OFFSET; + data.lruSize = 1; + pushBack(data.list, &data.lru[0]); + usize_t blksize = SOURCEWINSIZE; + + source.blksize = blksize; + source.curblkno = XD3_INVALID_OFFSET; + source.curblk = NULL; + source.max_winsize = SOURCEWINSIZE; + + if (xDeltaGetBlock(&stream, &source, blkno) != 0) + return false; + + if (source.onblk < blksize) + source.onlastblk = data.referenceFileSize; + + if (data.referenceFileSize > SOURCEWINSIZE) + { + blksize = SOURCEWINSIZE / MAX_LRU_SIZE; + source.blksize = blksize; + source.onblk = blksize; + source.onlastblk = blksize; + source.max_blkno = MAX_LRU_SIZE - 1; + + data.lru[0].size = blksize; + data.lruSize = MAX_LRU_SIZE; + + for (uint i = 1; i < data.lruSize; i += 1) + { + data.lru[i].blk = data.lru[0].blk + (blksize * i); + data.lru[i].blkno = i; + data.lru[i].size = blksize; + pushBack(data.list, &data.lru[i]); + } + } + + return true; +} + +void xDeltaDestroyBlockData(xDeltaGetBlockData& data) +{ + delete[] data.lruData; + delete[] data.lru; +} + +bool xDeltaCode(bool encode, Socket& socket, const wchar_t* referenceFileName, FileHandle referenceFile, u64 referenceFileSize, const wchar_t* newFileName, FileHandle newFile, u64 newFileSize, NetworkCopyContext& copyContext, IOStats& ioStats, u64& socketTime, u64& socketSize, u64& codeTime) +{ + uint flags = 0; + //flags |= XD3_ADLER32; + //flags |= XD3_COMPLEVEL_1; + flags |= XD3_COMPLEVEL_3; + //flags |= XD3_COMPLEVEL_9; + flags |= XD3_SEC_LZMA; + + xDeltaGetBlockData blockData{ ioStats, referenceFileName, referenceFile, referenceFileSize }; + blockData.fifo = encode; + + xd3_config config; + xd3_init_config(&config, flags); + config.winsize = min(referenceFileSize, XD3_DEFAULT_WINSIZE); + config.getblk = xDeltaGetBlock; + config.opaque = &blockData; + config.iopt_size = XD3_DEFAULT_IOPT_SIZE; + config.sprevsz = XD3_DEFAULT_SPREVSZ; + config.smatch_cfg = XD3_SMATCH_FAST; + + xd3_stream stream; + memset(&stream, 0, sizeof(stream)); + xd3_config_stream(&stream, &config); + + xd3_source source; + memset(&source, 0, sizeof(source)); + if (!xDeltaInitBlockData(blockData, stream, source, 0)) + return false; + ScopeGuard _bd([&]() { xDeltaDestroyBlockData(blockData); }); + xd3_set_source_and_size(&stream, &source, referenceFileSize); + + static_assert(XD3_ALLOCSIZE <= CopyContextBufferSize, ""); + u64 bufSize = XD3_ALLOCSIZE;// CopyContextBufferSize; + + u8* inputBuf = bufSize <= CopyContextBufferSize ? copyContext.buffers[1] : (u8*)malloc(bufSize); + u64 inputBufRead = 0; + u8* outputBuf = bufSize <= CopyContextBufferSize ? copyContext.buffers[2] : (u8*)malloc(bufSize); + u64 outputBufWritten = 0; + ScopeGuard _([&]() + { + xd3_close_stream(&stream); + xd3_free_stream(&stream); + + if (bufSize > CopyContextBufferSize) + { + free(inputBuf); + free(outputBuf); + } + }); + + bool outSuccess = true; + do + { + if (encode) + { + if (!readFile(newFileName, newFile, (void*)inputBuf, bufSize, inputBufRead, ioStats)) + return false; + } + else + { + TimerScope _(socketTime); + if (!receiveData(socket, &inputBufRead, sizeof(inputBufRead))) + return false; + if (inputBufRead) + if (!receiveData(socket, (void*)inputBuf, inputBufRead)) + return false; + socketSize += sizeof(inputBufRead) + inputBufRead; + } + + if (!outSuccess) + continue; + + if (inputBufRead < bufSize) + xd3_set_flags(&stream, XD3_FLUSH | stream.flags); + xd3_avail_input(&stream, inputBuf, inputBufRead); + + process: + + int ret = 0; + { + TimerScope _(codeTime); + if (encode) + ret = xd3_encode_input(&stream); + else + ret = xd3_decode_input(&stream); + } + + switch (ret) + { + case XD3_INPUT: + continue; + + case XD3_OUTPUT: + if (encode) + { + uint leftInBuffer = bufSize - outputBufWritten; + uint writeToBufferSize = stream.avail_out < leftInBuffer ? stream.avail_out : leftInBuffer; + memcpy(outputBuf + outputBufWritten, stream.next_out, writeToBufferSize); + outputBufWritten += writeToBufferSize; + if (outputBufWritten == bufSize) + { + if (!sendData(socket, &outputBufWritten, sizeof(outputBufWritten))) + return false; + if (!sendData(socket, outputBuf, outputBufWritten)) + return false; + uint left = stream.avail_out - writeToBufferSize; + memcpy(outputBuf, stream.next_out + writeToBufferSize, left); + outputBufWritten = left; + } + + } + else + { + outSuccess = outSuccess && writeFile(newFileName, newFile, stream.next_out, stream.avail_out, ioStats); + } + xd3_consume_output(&stream); + goto process; + + case XD3_GOTHEADER: + goto process; + + case XD3_WINSTART: + goto process; + + case XD3_WINFINISH: + goto process; + + default: + logErrorf(L"Xdelta failed %ls file %ls: %hs %d !!!\n", (encode ? L"compressing" : L"decompressing"), newFileName, stream.msg, ret); + outSuccess = false; + } + + } while (inputBufRead == bufSize); + + if (encode) + { + if (!sendData(socket, &outputBufWritten, sizeof(outputBufWritten))) + return false; + if (outputBufWritten) + if (!sendData(socket, outputBuf, outputBufWritten)) + return false; + } + + return outSuccess; +} + +/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +} + +#endif diff --git a/source/EACopyDeltaZstd.h b/source/EACopyDeltaZstd.h index ec1d614..efa2daa 100644 --- a/source/EACopyDeltaZstd.h +++ b/source/EACopyDeltaZstd.h @@ -1,263 +1,263 @@ -// (c) Electronic Arts. All Rights Reserved. - -#if defined(EACOPY_ALLOW_DELTA_COPY) - -#include -#define ZSTD_STATIC_LINKING_ONLY -#include "../external/zstd/lib/zstd.h" -#include - -namespace eacopy -{ - -/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - -bool zstdCode(bool encode, Socket& socket, const wchar_t* referenceFileName, FileHandle referenceFile, u64 referenceFileSize, const wchar_t* newFileName, FileHandle newFile, u64 newFileSize, NetworkCopyContext& copyContext, IOStats& ioStats, u64& socketTime, u64& socketSize, u64& codeTime) -{ - enum { CompressedNetworkTransferChunkSize = NetworkTransferChunkSize / 4 }; - enum { CompressBoundReservation = 32 * 1024 }; - - - u8* copyContextData = copyContext.buffers[0]; // All buffers are allocated in the same allocation.. so if we use less we can just linearly use them - - u8* inBuffer; - u8* outBuffer; - u8* referenceBuffer; - - if (encode) - { - inBuffer = copyContextData; // In-buffer for file - outBuffer = inBuffer + CompressedNetworkTransferChunkSize - CompressBoundReservation; // Out-buffer to socket - referenceBuffer = outBuffer + CompressedNetworkTransferChunkSize; // Buffer for reference file - } - else - { - inBuffer = copyContextData; // In-buffer for socket - outBuffer = inBuffer + CompressedNetworkTransferChunkSize; // Out-buffer for destination file - referenceBuffer = outBuffer + CopyContextBufferSize; // Buffer for reference file - } - - bool freeReferenceMemory = false; - void* referenceMemory = referenceBuffer; - - size_t referenceBufferSize = CopyContextBufferSize * 3 - (referenceBuffer - copyContextData); - if (referenceFileSize > referenceBufferSize) - { - referenceMemory = new u8[referenceFileSize]; - freeReferenceMemory = true; - } - ScopeGuard _([&]() { if (freeReferenceMemory) delete[] referenceMemory; }); - - u64 referenceFileRead; - if (!readFile(referenceFileName, referenceFile, referenceMemory, referenceFileSize, referenceFileRead, ioStats)) - return false; - - if (referenceFileRead != referenceFileSize) - { - logErrorf(L"Failed to read entire ref file %ls", referenceFileName); - return false; - } - - ZSTD_outBuffer buffOut; - ZSTD_inBuffer buffIn; - - if (encode) - { - if (!copyContext.compContext) - copyContext.compContext = ZSTD_createCCtx(); - auto cctx = (ZSTD_CCtx*)copyContext.compContext; - ZSTD_CCtx_reset(cctx, ZSTD_reset_session_and_parameters); - - bool useBufferedIO = true; - size_t res = ZSTD_CCtx_refPrefix(cctx, referenceMemory, referenceFileSize); - if (ZSTD_isError(res)) - { - logErrorf(L"Fail setting up ref file %ls when compressing %ls: %hs", referenceFileName, newFileName, ZSTD_getErrorName(res)); - return false; - } - - ZSTD_CCtx_setParameter(cctx, ZSTD_c_enableLongDistanceMatching, 1); - ZSTD_CCtx_setParameter(cctx, ZSTD_c_compressionLevel, 3); - ZSTD_CCtx_setParameter(cctx, ZSTD_c_contentSizeFlag, 1); /* always enable content size when available (note: supposed to be default) */ - ZSTD_CCtx_setParameter(cctx, ZSTD_c_windowLog, ZSTD_WINDOWLOG_MAX); - ZSTD_CCtx_setPledgedSrcSize(cctx, newFileSize); - ZSTD_CCtx_setParameter(cctx, ZSTD_c_enableDedicatedDictSearch, 1); - //ZSTD_CCtx_setParameter(cctx, ZSTD_c_literalCompressionMode, ZSTD_lcm_auto); - - u64 left = newFileSize; - while (left) - { - // Make sure the amount of data we've read fit in the destination compressed buffer - static_assert(ZSTD_COMPRESSBOUND(CompressedNetworkTransferChunkSize - CompressBoundReservation) <= CompressedNetworkTransferChunkSize - 4, ""); - - uint toRead = min(left, u64(CompressedNetworkTransferChunkSize - CompressBoundReservation)); - uint toReadAligned = useBufferedIO ? toRead : (((toRead + 4095) / 4096) * 4096); - u64 read; - if (!readFile(newFileName, newFile, inBuffer, toReadAligned, read, ioStats)) - { - if (GetLastError() != ERROR_IO_PENDING) - { - logErrorf(L"Fail reading file %ls: %ls", newFileName, getLastErrorText().c_str()); - return false; - } - } - left -= read; - - // Use the first 4 bytes to write size of buffer.. can probably be replaced with zstd header instead - u8* destBuf = outBuffer; - u64 startCompressTime = getTime(); - buffOut.dst = destBuf + 4; - buffOut.size = CompressedNetworkTransferChunkSize - 4; - buffOut.pos = 0; - buffIn.src = inBuffer; - buffIn.size = read; - buffIn.pos = 0; - - ZSTD_EndDirective directive = ZSTD_e_continue; - if (left == 0) - directive = ZSTD_e_end; - - res = ZSTD_compressStream2(cctx, &buffOut, &buffIn, directive); - if (ZSTD_isError(res)) - { - logErrorf(L"Fail compressing file %ls: %hs", newFileName, ZSTD_getErrorName(res)); - return false; - } - - if (buffIn.pos != buffIn.size) - { - logErrorf(L"ERROR.. code path not implemented since I'm not sure if this can happen.. report this to if experienced!!"); - return false; - } - - size_t compressedSize = buffOut.pos; - if (compressedSize == 0) - continue; - - u64 compressTime = getTime() - startCompressTime; - - *(uint*)destBuf = uint(compressedSize); - - uint sendBytes = compressedSize + 4; - u64 startSendTime = getTime(); - if (!sendData(socket, destBuf, sendBytes)) - return false; - u64 sendTime = getTime() - startSendTime; - - } - - // Send last size 0 entry to tell client we're done - uint terminator = 0; - if (!sendData(socket, &terminator, sizeof(terminator))) - return false; - } - else - { - if (!copyContext.decompContext) - copyContext.decompContext = ZSTD_createDCtx(); - auto dctx = (ZSTD_DCtx*)copyContext.decompContext; - ZSTD_DCtx_reset(dctx, ZSTD_reset_session_and_parameters); - ZSTD_DCtx_setMaxWindowSize(dctx, newFileSize); - size_t res = ZSTD_DCtx_refPrefix(dctx, referenceMemory, referenceFileSize); - if (ZSTD_isError(res)) - { - logErrorf(L"Zstd error %ls: %hs", referenceFileName, ZSTD_getErrorName(res)); - return false; - } - - bool outSuccess = true; - - u64 read = 0; - - u64 totalReceivedSize = 0; - - while (true) - { - u64 startRecvTime = getTime(); - - uint compressedSize; - if (!receiveData(socket, &compressedSize, sizeof(uint))) - return false; - - totalReceivedSize += compressedSize + sizeof(uint); - - if (compressedSize == 0) // We're done - break; - - if (compressedSize > NetworkTransferChunkSize) - { - logErrorf(L"Compressed size is bigger than compression buffer capacity"); - return false; - } - - uint toRead = compressedSize; - - while (toRead > 0) - { - WSABUF wsabuf; - wsabuf.len = toRead; - wsabuf.buf = (char*)inBuffer; - uint recvBytes = 0; - uint flags = MSG_WAITALL; - int fileRes = WSARecv(socket.socket, &wsabuf, 1, &recvBytes, &flags, NULL, NULL); - if (fileRes != 0) - { - logErrorf(L"recv failed with error: %ls", getErrorText(getLastNetworkError()).c_str()); - return false; - } - if (recvBytes == 0) - { - logErrorf(L"Socket closed before full file has been received (%ls)", newFileName); - return false; - } - - socketSize += recvBytes; - toRead -= recvBytes; - } - socketTime += getTime() - startRecvTime; - - - size_t decompressedSize = 0; - - if (outSuccess) - { - u64 startDecompressTime = getTime(); - - buffOut.dst = outBuffer; - buffOut.size = CopyContextBufferSize; - buffOut.pos = 0; - buffIn.src = inBuffer; - buffIn.size = compressedSize; - buffIn.pos = 0; - res = ZSTD_decompressStream(dctx, &buffOut, &buffIn); - if (ZSTD_isError(res)) - { - logErrorf(L"Decompression error while decompressing %u bytes after reading %llu, for file %ls: %hs", compressedSize, read, newFileName, ZSTD_getErrorName(res)); - // Don't return false since we can still continue copying other files after getting this error - outSuccess = false; - } - - if (buffIn.pos != buffIn.size) - { - logErrorf(L"ERROR.. code path not implemented since I'm not sure if this can happen.. report this to if experienced!!"); - outSuccess = false; - } - decompressedSize = buffOut.pos; - codeTime += getTime() - startDecompressTime; - } - - - outSuccess = outSuccess && writeFile(newFileName, newFile, outBuffer, decompressedSize, ioStats); - - read += decompressedSize; - } - } - - return true; -} - -/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - -} - -#endif +// (c) Electronic Arts. All Rights Reserved. + +#if defined(EACOPY_ALLOW_DELTA_COPY) + +#include +#define ZSTD_STATIC_LINKING_ONLY +#include "../external/zstd/lib/zstd.h" +#include + +namespace eacopy +{ + +/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +bool zstdCode(bool encode, Socket& socket, const wchar_t* referenceFileName, FileHandle referenceFile, u64 referenceFileSize, const wchar_t* newFileName, FileHandle newFile, u64 newFileSize, NetworkCopyContext& copyContext, IOStats& ioStats, u64& socketTime, u64& socketSize, u64& codeTime) +{ + enum { CompressedNetworkTransferChunkSize = NetworkTransferChunkSize / 4 }; + enum { CompressBoundReservation = 32 * 1024 }; + + + u8* copyContextData = copyContext.buffers[0]; // All buffers are allocated in the same allocation.. so if we use less we can just linearly use them + + u8* inBuffer; + u8* outBuffer; + u8* referenceBuffer; + + if (encode) + { + inBuffer = copyContextData; // In-buffer for file + outBuffer = inBuffer + CompressedNetworkTransferChunkSize - CompressBoundReservation; // Out-buffer to socket + referenceBuffer = outBuffer + CompressedNetworkTransferChunkSize; // Buffer for reference file + } + else + { + inBuffer = copyContextData; // In-buffer for socket + outBuffer = inBuffer + CompressedNetworkTransferChunkSize; // Out-buffer for destination file + referenceBuffer = outBuffer + CopyContextBufferSize; // Buffer for reference file + } + + bool freeReferenceMemory = false; + void* referenceMemory = referenceBuffer; + + size_t referenceBufferSize = CopyContextBufferSize * 3 - (referenceBuffer - copyContextData); + if (referenceFileSize > referenceBufferSize) + { + referenceMemory = new u8[referenceFileSize]; + freeReferenceMemory = true; + } + ScopeGuard _([&]() { if (freeReferenceMemory) delete[] referenceMemory; }); + + u64 referenceFileRead; + if (!readFile(referenceFileName, referenceFile, referenceMemory, referenceFileSize, referenceFileRead, ioStats)) + return false; + + if (referenceFileRead != referenceFileSize) + { + logErrorf(L"Failed to read entire ref file %ls", referenceFileName); + return false; + } + + ZSTD_outBuffer buffOut; + ZSTD_inBuffer buffIn; + + if (encode) + { + if (!copyContext.compContext) + copyContext.compContext = ZSTD_createCCtx(); + auto cctx = (ZSTD_CCtx*)copyContext.compContext; + ZSTD_CCtx_reset(cctx, ZSTD_reset_session_and_parameters); + + bool useBufferedIO = true; + size_t res = ZSTD_CCtx_refPrefix(cctx, referenceMemory, referenceFileSize); + if (ZSTD_isError(res)) + { + logErrorf(L"Fail setting up ref file %ls when compressing %ls: %hs", referenceFileName, newFileName, ZSTD_getErrorName(res)); + return false; + } + + ZSTD_CCtx_setParameter(cctx, ZSTD_c_enableLongDistanceMatching, 1); + ZSTD_CCtx_setParameter(cctx, ZSTD_c_compressionLevel, 3); + ZSTD_CCtx_setParameter(cctx, ZSTD_c_contentSizeFlag, 1); /* always enable content size when available (note: supposed to be default) */ + ZSTD_CCtx_setParameter(cctx, ZSTD_c_windowLog, ZSTD_WINDOWLOG_MAX); + ZSTD_CCtx_setPledgedSrcSize(cctx, newFileSize); + ZSTD_CCtx_setParameter(cctx, ZSTD_c_enableDedicatedDictSearch, 1); + //ZSTD_CCtx_setParameter(cctx, ZSTD_c_literalCompressionMode, ZSTD_lcm_auto); + + u64 left = newFileSize; + while (left) + { + // Make sure the amount of data we've read fit in the destination compressed buffer + static_assert(ZSTD_COMPRESSBOUND(CompressedNetworkTransferChunkSize - CompressBoundReservation) <= CompressedNetworkTransferChunkSize - 4, ""); + + uint toRead = min(left, u64(CompressedNetworkTransferChunkSize - CompressBoundReservation)); + uint toReadAligned = useBufferedIO ? toRead : (((toRead + 4095) / 4096) * 4096); + u64 read; + if (!readFile(newFileName, newFile, inBuffer, toReadAligned, read, ioStats)) + { + if (GetLastError() != ERROR_IO_PENDING) + { + logErrorf(L"Fail reading file %ls: %ls", newFileName, getLastErrorText().c_str()); + return false; + } + } + left -= read; + + // Use the first 4 bytes to write size of buffer.. can probably be replaced with zstd header instead + u8* destBuf = outBuffer; + u64 startCompressTime = getTime(); + buffOut.dst = destBuf + 4; + buffOut.size = CompressedNetworkTransferChunkSize - 4; + buffOut.pos = 0; + buffIn.src = inBuffer; + buffIn.size = read; + buffIn.pos = 0; + + ZSTD_EndDirective directive = ZSTD_e_continue; + if (left == 0) + directive = ZSTD_e_end; + + res = ZSTD_compressStream2(cctx, &buffOut, &buffIn, directive); + if (ZSTD_isError(res)) + { + logErrorf(L"Fail compressing file %ls: %hs", newFileName, ZSTD_getErrorName(res)); + return false; + } + + if (buffIn.pos != buffIn.size) + { + logErrorf(L"ERROR.. code path not implemented since I'm not sure if this can happen.. report this to if experienced!!"); + return false; + } + + size_t compressedSize = buffOut.pos; + if (compressedSize == 0) + continue; + + u64 compressTime = getTime() - startCompressTime; + + *(uint*)destBuf = uint(compressedSize); + + uint sendBytes = compressedSize + 4; + u64 startSendTime = getTime(); + if (!sendData(socket, destBuf, sendBytes)) + return false; + u64 sendTime = getTime() - startSendTime; + + } + + // Send last size 0 entry to tell client we're done + uint terminator = 0; + if (!sendData(socket, &terminator, sizeof(terminator))) + return false; + } + else + { + if (!copyContext.decompContext) + copyContext.decompContext = ZSTD_createDCtx(); + auto dctx = (ZSTD_DCtx*)copyContext.decompContext; + ZSTD_DCtx_reset(dctx, ZSTD_reset_session_and_parameters); + ZSTD_DCtx_setMaxWindowSize(dctx, newFileSize); + size_t res = ZSTD_DCtx_refPrefix(dctx, referenceMemory, referenceFileSize); + if (ZSTD_isError(res)) + { + logErrorf(L"Zstd error %ls: %hs", referenceFileName, ZSTD_getErrorName(res)); + return false; + } + + bool outSuccess = true; + + u64 read = 0; + + u64 totalReceivedSize = 0; + + while (true) + { + u64 startRecvTime = getTime(); + + uint compressedSize; + if (!receiveData(socket, &compressedSize, sizeof(uint))) + return false; + + totalReceivedSize += compressedSize + sizeof(uint); + + if (compressedSize == 0) // We're done + break; + + if (compressedSize > NetworkTransferChunkSize) + { + logErrorf(L"Compressed size is bigger than compression buffer capacity"); + return false; + } + + uint toRead = compressedSize; + + while (toRead > 0) + { + WSABUF wsabuf; + wsabuf.len = toRead; + wsabuf.buf = (char*)inBuffer; + uint recvBytes = 0; + uint flags = MSG_WAITALL; + int fileRes = WSARecv(socket.socket, &wsabuf, 1, &recvBytes, &flags, NULL, NULL); + if (fileRes != 0) + { + logErrorf(L"recv failed with error: %ls", getErrorText(getLastNetworkError()).c_str()); + return false; + } + if (recvBytes == 0) + { + logErrorf(L"Socket closed before full file has been received (%ls)", newFileName); + return false; + } + + socketSize += recvBytes; + toRead -= recvBytes; + } + socketTime += getTime() - startRecvTime; + + + size_t decompressedSize = 0; + + if (outSuccess) + { + u64 startDecompressTime = getTime(); + + buffOut.dst = outBuffer; + buffOut.size = CopyContextBufferSize; + buffOut.pos = 0; + buffIn.src = inBuffer; + buffIn.size = compressedSize; + buffIn.pos = 0; + res = ZSTD_decompressStream(dctx, &buffOut, &buffIn); + if (ZSTD_isError(res)) + { + logErrorf(L"Decompression error while decompressing %u bytes after reading %llu, for file %ls: %hs", compressedSize, read, newFileName, ZSTD_getErrorName(res)); + // Don't return false since we can still continue copying other files after getting this error + outSuccess = false; + } + + if (buffIn.pos != buffIn.size) + { + logErrorf(L"ERROR.. code path not implemented since I'm not sure if this can happen.. report this to if experienced!!"); + outSuccess = false; + } + decompressedSize = buffOut.pos; + codeTime += getTime() - startDecompressTime; + } + + + outSuccess = outSuccess && writeFile(newFileName, newFile, outBuffer, decompressedSize, ioStats); + + read += decompressedSize; + } + } + + return true; +} + +/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +} + +#endif diff --git a/source/EACopyService.cpp b/source/EACopyService.cpp index 43fcbf9..4cc6602 100644 --- a/source/EACopyService.cpp +++ b/source/EACopyService.cpp @@ -1,571 +1,571 @@ -// (c) Electronic Arts. All Rights Reserved. - -#include "EACopyServer.h" -#include -#include - -namespace eacopy -{ - -/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - -// If port is changed from default, name will change to include port. (Like EACopyService1339 if port is set to 1339) -#if defined(_DEBUG) -#define DEFAULTSERVICENAME L"EACopyServiceDEBUG" -#else -#define DEFAULTSERVICENAME L"EACopyService" -#endif -#define DEFAULTSERVICEDISPLAYNAME L"EACopy Accelerator Service" - -wchar_t g_serviceName[1024]; -wchar_t g_serviceDisplayName[1024]; - -/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - -// internal variables -SERVICE_STATUS g_ssStatus; // current status of the service -SERVICE_STATUS_HANDLE g_sshStatusHandle; -DWORD g_dwErr; -bool g_runningAsService; -bool g_serviceCrashed; // This is set when we're exiting 'uncleanly'. I.e exiting cleanly but due - // to the service being compromised due to SEH -int g_argc; -wchar_t** g_argv; -Server g_server; - -/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - -void printHelp() -{ - logInfoLinef(L"----------------------------------------------------------------------------------------"); - logInfoLinef(L" EACopyService v%ls (%u) - Copy Accelerator. (c) Electronic Arts. All Rights Reserved.", getServerVersionString().c_str(), ProtocolVersion); - logInfoLinef(L"----------------------------------------------------------------------------------------"); - logInfoLinef(); - logInfoLinef(L" Usage :: EACopyService [options]"); - logInfoLinef(); - logInfoLinef(L" /P:n :: Port that server will listen on (defaults to %i).", DefaultPort); - logInfoLinef(L" /IP:n :: Local ip that server will listen on (defaults to first found)."); - logInfoLinef(L" /HASH :: Will use hash of files to try to reduce copying."); - logInfoLinef(L" /UNSECURE :: Will not check if client has access to network path using smb."); - logInfoLinef(L" /HISTORY:n :: Max number of files tracked in history (defaults to %i).", DefaultHistorySize); - logInfoLinef(L" /NOLINKS :: Disables hard links."); - logInfoLinef(L" /LINKMIN:bytes :: Disable links for files smaller than bytes size."); - logInfoLinef(L" /LINKBYNAME :: Will link based on name only and skip relative path."); - logInfoLinef(L" /LINK [dir]... :: Will prepopulate file database with files that can be linked to"); - logInfoLinef(L" /OFFLOAD :: Let server do local copying as fallback when link fails."); - logInfoLinef(); - logInfoLinef(L" /J :: Enable unbuffered I/O for all files."); - logInfoLinef(L" /NJ :: Disable unbuffered I/O for all files."); - logInfoLinef(); - logInfoLinef(L" /LOG:file :: output status to LOG file (overwrite existing log)."); - logInfoLinef(L" /VERBOSE :: output debug logging."); - logInfoLinef(); - logInfoLinef(L" /INSTALL :: Install and start as auto starting windows service."); - logInfoLinef(L" Will start with parameters provided with /INSTALL call"); - logInfoLinef(L" /REMOVE :: Stop and remove service."); - logInfoLinef(); - logInfoLinef(L" /USER:u :: User used when installing service. Defaults to LocalSystem account"); - logInfoLinef(L" /PASS:p :: Password used when installing service"); - logInfoLinef(); -} - -bool readSettings(ServerSettings& outSettings, WString& outLogFileName, uint argc, wchar_t** argv) -{ - const wchar_t* activeCommand = L""; - - // Read options - uint argIndex = 1; - while (argIndex < argc) - { - wchar_t* arg = argv[argIndex++]; - - if (*arg == '/') - activeCommand = L""; - - if (startsWithIgnoreCase(arg, L"/P:")) - { - outSettings.listenPort = _wtoi(arg + 3); - } - else if (startsWithIgnoreCase(arg, L"/IP:")) - { - outSettings.listenIp = arg + 4; - } - else if (equalsIgnoreCase(arg, L"/HASH")) - { - outSettings.useHash = true; - } - else if (equalsIgnoreCase(arg, L"/UNSECURE")) - { - outSettings.useSecurityFile = false; - } - else if (startsWithIgnoreCase(arg, L"/HISTORY:")) - { - outSettings.maxHistory = _wtoi(arg + 9); - } - else if (equalsIgnoreCase(arg, L"/NOLINKS")) - { - outSettings.useLinksThreshold = ~u64(0); - } - else if (startsWithIgnoreCase(arg, L"/LINKMIN:")) - { - outSettings.useLinksThreshold = _wtoi(arg + 10); - } - else if (startsWithIgnoreCase(arg, L"/LINK")) - { - activeCommand = L"LINK"; - } - else if (equalsIgnoreCase(arg, L"/LINKBYNAME")) - { - outSettings.useLinksRelativePath = false; - } - else if (equalsIgnoreCase(arg, L"/OFFLOAD")) - { - outSettings.useOdx = true; - } - else if (equalsIgnoreCase(arg, L"/J")) - { - outSettings.useBufferedIO = UseBufferedIO_Enabled; - } - else if (equalsIgnoreCase(arg, L"/NJ")) - { - outSettings.useBufferedIO = UseBufferedIO_Disabled; - } - else if(startsWithIgnoreCase(arg, L"/LOG:")) - { - outLogFileName = arg + 5; - } - else if(startsWithIgnoreCase(arg, L"/USER:")) - { - outSettings.user = arg + 6; - } - else if(startsWithIgnoreCase(arg, L"/PASS:")) - { - outSettings.password = arg + 6; - } - else if (equalsIgnoreCase(arg, L"/VERBOSE")) - { - outSettings.logDebug = true; - } - else - { - if (equalsIgnoreCase(activeCommand, L"link")) - { - outSettings.additionalLinkDirectories.push_back(getCleanedupPath(arg)); - } - else - { - logErrorf(L"Unknown option %ls. Use /? for help", arg); - return false; - } - } - } - return true; -} - -void initializeServiceName() -{ - wcscpy(g_serviceName, DEFAULTSERVICENAME); - wcscpy(g_serviceDisplayName, DEFAULTSERVICEDISPLAYNAME); - for (uint i=1; i!=g_argc; ++i) - { - if (!startsWithIgnoreCase(g_argv[i], L"/P:")) - continue; - wcscat(g_serviceName, g_argv[i] + 3); - wcscat(g_serviceDisplayName, L" (port "); - wcscat(g_serviceDisplayName, g_argv[i] + 3); - wcscat(g_serviceDisplayName, L")"); - break; - } -} - -void addLastErrorToMessageLog(const wchar_t* lpszMsg) -{ - if (!g_runningAsService) - return; - - g_dwErr = GetLastError(); - HANDLE hEventSource = RegisterEventSourceW(NULL, g_serviceName); - if (!hEventSource) - return; - - wchar_t szMsg[eacopy_sizeof_array(g_serviceName) + 100 ]; - swprintf_s(szMsg,eacopy_sizeof_array(g_serviceName) + 100, L"%ls error: %d", g_serviceName, g_dwErr); - const wchar_t* lpszStrings[2] = { szMsg, lpszMsg }; - ReportEventW(hEventSource, EVENTLOG_ERROR_TYPE, 0, 0, NULL, 2, 0, &lpszStrings[0], NULL); - DeregisterEventSource(hEventSource); -} - -VOID addInfoToMessageLog(const wchar_t* lpszMsg) -{ - if (!g_runningAsService) - return; - HANDLE hEventSource = RegisterEventSourceW(NULL, g_serviceName); - if (!hEventSource) - return; - const wchar_t* lpszStrings[1] = { lpszMsg }; - ReportEventW(hEventSource, EVENTLOG_INFORMATION_TYPE, 0, 0, NULL, 1, 0, &lpszStrings[0], NULL); - DeregisterEventSource(hEventSource); -} - -BOOL reportServiceStatus(DWORD dwCurrentState, DWORD dwWin32ExitCode, DWORD dwWaitHint) -{ - if (!g_runningAsService) // when debugging we don't report to the SCM - return TRUE; - - if (dwCurrentState == SERVICE_START_PENDING) - g_ssStatus.dwControlsAccepted = 0; - else - g_ssStatus.dwControlsAccepted = SERVICE_ACCEPT_STOP | SERVICE_ACCEPT_SHUTDOWN | SERVICE_ACCEPT_PAUSE_CONTINUE; - - g_ssStatus.dwCurrentState = dwCurrentState; - g_ssStatus.dwWin32ExitCode = dwWin32ExitCode; - g_ssStatus.dwWaitHint = dwWaitHint; - - static DWORD dwCheckPoint = 1; - - if (dwCurrentState == SERVICE_RUNNING || dwCurrentState == SERVICE_STOPPED) - g_ssStatus.dwCheckPoint = 0; - else - g_ssStatus.dwCheckPoint = dwCheckPoint++; - - - // Report the status of the service to the service control manager. - // - - BOOL res = SetServiceStatus( g_sshStatusHandle, &g_ssStatus); - if (res) - return TRUE; - - wchar_t errMsg[512]; - swprintf_s(errMsg, L"SetServiceStatus failed: '%ls'", getLastErrorText().c_str()); - - addLastErrorToMessageLog(errMsg); - return res; -} - -DWORD WINAPI serviceControl(DWORD dwControl, DWORD dwEventType, LPVOID lpEventData, LPVOID lpContext) -{ - switch (dwControl) - { - case SERVICE_CONTROL_STOP: - addInfoToMessageLog(L"Stop"); - reportServiceStatus(SERVICE_STOP_PENDING, NO_ERROR, 60000); - g_server.stop(); - return NO_ERROR; - - case SERVICE_CONTROL_SHUTDOWN: - addInfoToMessageLog(L"Shutdown"); - reportServiceStatus(SERVICE_STOP_PENDING, NO_ERROR, 60000); - g_server.stop(); - return NO_ERROR; - - case SERVICE_CONTROL_PAUSE: - addInfoToMessageLog(L"Pause"); - reportServiceStatus(SERVICE_PAUSE_PENDING, NO_ERROR, 0); - //g_server.pause(); - reportServiceStatus(SERVICE_PAUSED, NO_ERROR, 0); - return NO_ERROR; - - case SERVICE_CONTROL_CONTINUE: - addInfoToMessageLog(L"Continue"); - reportServiceStatus(SERVICE_CONTINUE_PENDING, NO_ERROR, 0); - //g_server.resume(); - reportServiceStatus(SERVICE_RUNNING, NO_ERROR, 0); - return NO_ERROR; - - case SERVICE_CONTROL_INTERROGATE: - // Fall through to send current status - break; - - default: - // invalid control code - // - return ERROR_CALL_NOT_IMPLEMENTED; - } - - reportServiceStatus(g_ssStatus.dwCurrentState, NO_ERROR, 0); - return NO_ERROR; -} - -void WINAPI serviceMain(DWORD dwArgc, LPWSTR *lpszArgv) -{ - initializeServiceName(); - - // register our service control handler: - // - g_sshStatusHandle = RegisterServiceCtrlHandlerExW(g_serviceName, serviceControl, NULL); - - if (!g_sshStatusHandle) - { - wchar_t msg[256]; - swprintf_s(msg, L"Failed to register control handler! err = %ls", getLastErrorText().c_str()); - addLastErrorToMessageLog(msg); - return; - } - - // SERVICE_STATUS members that don't change in example - // - g_ssStatus.dwServiceType = SERVICE_WIN32_OWN_PROCESS; - g_ssStatus.dwServiceSpecificExitCode = 0; - - WString logFileName; - ServerSettings settings; - Log log; - - // report the status to the service control manager. - // - if (!reportServiceStatus(SERVICE_START_PENDING, NO_ERROR, 3000)) - goto cleanup; - - if (!readSettings(settings, logFileName, g_argc, g_argv)) - goto cleanup; - - log.init(logFileName.c_str(), settings.logDebug, true); - g_server.start(settings, log, false, reportServiceStatus); - log.deinit(); - - if (g_serviceCrashed) - return; // Unclean shutdown -- do not report SERVICE_STOPPED since this - // would make Windows not restart the service. -cleanup: - reportServiceStatus(SERVICE_STOPPED, g_dwErr, 0); -} - -BOOL WINAPI ctrlEventHandler(DWORD dwCtrlType) -{ - if (dwCtrlType != CTRL_BREAK_EVENT && dwCtrlType != CTRL_C_EVENT) - return FALSE; - g_server.stop(); - return TRUE; -} - -int installService(int argc, wchar_t** argv) -{ - WString logFileName; - ServerSettings settings; - if (!readSettings(settings, logFileName, argc, argv)) - return 0; - - wchar_t szPath[512]; - if (GetModuleFileNameW(NULL, szPath, eacopy_sizeof_array(szPath)) == 0) - { - logErrorf(L"Unable to install %ls - %ls", g_serviceDisplayName, getLastErrorText().c_str()); - return -1; - } - - if (argc > 1) - { - for (int i = 1; i < argc; ++i) - { - if(startsWithIgnoreCase(argv[i], L"/USER") || startsWithIgnoreCase(argv[i], L"/PASS")) - continue; - wcscat_s(szPath, eacopy_sizeof_array(szPath), L" "); - wcscat_s(szPath, eacopy_sizeof_array(szPath), argv[i]); - } - } - - SC_HANDLE manager = OpenSCManager(NULL, NULL, SC_MANAGER_CONNECT | SC_MANAGER_CREATE_SERVICE); - if (!manager) - { - logErrorf(L"OpenSCManager failed - %ls", getLastErrorText().c_str()); - return -1; - } - ScopeGuard managerGuard([&]() { CloseServiceHandle(manager); }); - - const wchar_t* username = NULL; - const wchar_t* password = NULL; - if (!settings.user.empty()) - username = settings.user.c_str(); - if (!settings.password.empty()) - password = settings.password.c_str(); - - SC_HANDLE service = CreateServiceW(manager, g_serviceName, g_serviceDisplayName, - SERVICE_ALL_ACCESS, SERVICE_WIN32_OWN_PROCESS, SERVICE_AUTO_START, - SERVICE_ERROR_NORMAL, szPath, NULL, NULL, L"", username, password); - - if (!service) - service = OpenServiceW(manager, g_serviceName, SERVICE_ALL_ACCESS); - - if (!service) - { - logErrorf(L"CreateService failed - %ls", getLastErrorText().c_str()); - return -1; - } - ScopeGuard serviceGuard([&]() { CloseServiceHandle(service); }); - - SERVICE_DESCRIPTION desc; - desc.lpDescription = (LPSTR)"EACopy Accelerator Service."; - - if (!ChangeServiceConfig2(service, SERVICE_CONFIG_DESCRIPTION, &desc)) - { - logErrorf(L"failed to set service description, errcode = %d", GetLastError()); - return -1; - } - - // Enable automatic restart - - SERVICE_FAILURE_ACTIONS_FLAG flag = { TRUE }; - if (!ChangeServiceConfig2(service, SERVICE_CONFIG_FAILURE_ACTIONS_FLAG, &flag)) - { - logErrorf(L"Failed to set failure action flag, errcode = %d", GetLastError()); - return -1; - } - - SC_ACTION acts[] = { - { SC_ACTION_RESTART, 10000 /* milliseconds */ }, - { SC_ACTION_RESTART, 30000 /* milliseconds */ }, - { SC_ACTION_RESTART, 60000 /* milliseconds */ } - }; - SERVICE_FAILURE_ACTIONS actions = { 60 * 60 * 24, NULL, NULL, sizeof acts / sizeof(SC_ACTION), &acts[0] }; - - if (!ChangeServiceConfig2(service, SERVICE_CONFIG_FAILURE_ACTIONS, &actions)) - { - logErrorf(L"Failed to set failure actions, errcode = %d", GetLastError()); - return -1; - } - - logInfoLinef(L"%ls installed. commandline = %ls", g_serviceDisplayName, szPath ); - - // Autostart server - if (!StartServiceA(service, argc, (LPCSTR*)argv)) - { - logInfoLinef(); - DWORD error = GetLastError(); - if (error == ERROR_SERVICE_LOGON_FAILED) - logErrorf(L"Failed to start service due to a logon failure. MAKE SURE USER IS ADDED TO \"Log on as a service\" policy."); - else - logErrorf(L"Failed to start service - %ls", getErrorText(error).c_str()); - return -1; - } - - return 0; -} - -int removeService() -{ - SC_HANDLE manager = OpenSCManager(NULL, NULL, SC_MANAGER_CONNECT); - if (!manager) - { - logErrorf(L"OpenSCManager failed - %ls", getLastErrorText().c_str()); - return -1; - } - ScopeGuard managerGuard([&]() { CloseServiceHandle(manager); }); - - SC_HANDLE service = OpenServiceW(manager, g_serviceName, DELETE | SERVICE_STOP | SERVICE_QUERY_STATUS); - - if (!service) - { - logErrorf(L"OpenService failed - %ls", getLastErrorText().c_str()); - return -1; - } - ScopeGuard serviceGuard([&]() { CloseServiceHandle(service); }); - - if (ControlService(service, SERVICE_CONTROL_STOP, &g_ssStatus)) - { - logInfof(L"Stopping %ls.", g_serviceDisplayName); - Sleep(1000); - - while (QueryServiceStatus(service, &g_ssStatus)) - { - if (g_ssStatus.dwCurrentState != SERVICE_STOP_PENDING) - break; - logInfof(L"."); - Sleep(1000); - } - - logInfoLinef(); - if (g_ssStatus.dwCurrentState != SERVICE_STOPPED) - { - logErrorf(L"%ls failed to stop.", g_serviceDisplayName); - return -1; - } - - logInfoLinef(L"%ls stopped.", g_serviceDisplayName); - } - - if (!DeleteService(service)) - { - logErrorf(L"DeleteService failed - %ls", getLastErrorText().c_str()); - return -1; - } - - logInfoLinef(L"%ls removed.", g_serviceDisplayName); - return 0; -} - -/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - -} // namespace eacopy - -int __cdecl wmain(int argc, wchar_t** argv) -{ - using namespace eacopy; - - g_argc = argc; - g_argv = argv; - - initializeServiceName(); - - - // Try to figure out some context based on which session we're running in - // and decide which mode to default to. - bool isInteractiveSession = false; - DWORD dwSessionId = 0; - if (ProcessIdToSessionId(GetCurrentProcessId(), &dwSessionId)) - if (dwSessionId != 0) - isInteractiveSession = true; - - // Started by service controller - if (!isInteractiveSession) - { - g_runningAsService = true; - SERVICE_TABLE_ENTRYW dispatchTable[] = { { (LPWSTR)g_serviceName, (LPSERVICE_MAIN_FUNCTIONW)&serviceMain}, { NULL, NULL} }; - if (!StartServiceCtrlDispatcherW(dispatchTable)) - addLastErrorToMessageLog(L"StartServiceCtrlDispatcherW failed."); - return 0; - } - - // Started by user - if (argc > 1 && equalsIgnoreCase(argv[1], L"/?")) - { - printHelp(); - return 0; - } - - // Check if it is an install or removal of service - int argIndex = 1; - while (argIndex < argc) - { - wchar_t* arg = argv[argIndex]; - if (equalsIgnoreCase(arg, L"/INSTALL")) - { - --argc; - argv[argIndex] = argv[argc]; // Remove own argument - argv[argc] = nullptr; - return installService(argc, argv); - } - else if (equalsIgnoreCase(arg, L"/REMOVE")) - { - return removeService(); - } - ++argIndex; - } - - // Normal run, just execute the server - SetConsoleCtrlHandler(ctrlEventHandler, TRUE); - - WString logFileName; - ServerSettings settings; - if (!readSettings(settings, logFileName, argc, argv)) - return -1; - - logInfoLinef(L"Server v%ls (%u) - Starting... (Add /? for help)", getServerVersionString().c_str(), ProtocolVersion); - - Log log; - log.init(logFileName.c_str(), settings.logDebug, true); - ScopeGuard logGuard([&]() { log.deinit(); }); - - g_server.start(settings, log, true, reportServiceStatus); - return 0; -} - +// (c) Electronic Arts. All Rights Reserved. + +#include "EACopyServer.h" +#include +#include + +namespace eacopy +{ + +/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +// If port is changed from default, name will change to include port. (Like EACopyService1339 if port is set to 1339) +#if defined(_DEBUG) +#define DEFAULTSERVICENAME L"EACopyServiceDEBUG" +#else +#define DEFAULTSERVICENAME L"EACopyService" +#endif +#define DEFAULTSERVICEDISPLAYNAME L"EACopy Accelerator Service" + +wchar_t g_serviceName[1024]; +wchar_t g_serviceDisplayName[1024]; + +/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +// internal variables +SERVICE_STATUS g_ssStatus; // current status of the service +SERVICE_STATUS_HANDLE g_sshStatusHandle; +DWORD g_dwErr; +bool g_runningAsService; +bool g_serviceCrashed; // This is set when we're exiting 'uncleanly'. I.e exiting cleanly but due + // to the service being compromised due to SEH +int g_argc; +wchar_t** g_argv; +Server g_server; + +/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +void printHelp() +{ + logInfoLinef(L"----------------------------------------------------------------------------------------"); + logInfoLinef(L" EACopyService v%ls (%u) - Copy Accelerator. (c) Electronic Arts. All Rights Reserved.", getServerVersionString().c_str(), ProtocolVersion); + logInfoLinef(L"----------------------------------------------------------------------------------------"); + logInfoLinef(); + logInfoLinef(L" Usage :: EACopyService [options]"); + logInfoLinef(); + logInfoLinef(L" /P:n :: Port that server will listen on (defaults to %i).", DefaultPort); + logInfoLinef(L" /IP:n :: Local ip that server will listen on (defaults to first found)."); + logInfoLinef(L" /HASH :: Will use hash of files to try to reduce copying."); + logInfoLinef(L" /UNSECURE :: Will not check if client has access to network path using smb."); + logInfoLinef(L" /HISTORY:n :: Max number of files tracked in history (defaults to %i).", DefaultHistorySize); + logInfoLinef(L" /NOLINKS :: Disables hard links."); + logInfoLinef(L" /LINKMIN:bytes :: Disable links for files smaller than bytes size."); + logInfoLinef(L" /LINKBYNAME :: Will link based on name only and skip relative path."); + logInfoLinef(L" /LINK [dir]... :: Will prepopulate file database with files that can be linked to"); + logInfoLinef(L" /OFFLOAD :: Let server do local copying as fallback when link fails."); + logInfoLinef(); + logInfoLinef(L" /J :: Enable unbuffered I/O for all files."); + logInfoLinef(L" /NJ :: Disable unbuffered I/O for all files."); + logInfoLinef(); + logInfoLinef(L" /LOG:file :: output status to LOG file (overwrite existing log)."); + logInfoLinef(L" /VERBOSE :: output debug logging."); + logInfoLinef(); + logInfoLinef(L" /INSTALL :: Install and start as auto starting windows service."); + logInfoLinef(L" Will start with parameters provided with /INSTALL call"); + logInfoLinef(L" /REMOVE :: Stop and remove service."); + logInfoLinef(); + logInfoLinef(L" /USER:u :: User used when installing service. Defaults to LocalSystem account"); + logInfoLinef(L" /PASS:p :: Password used when installing service"); + logInfoLinef(); +} + +bool readSettings(ServerSettings& outSettings, WString& outLogFileName, uint argc, wchar_t** argv) +{ + const wchar_t* activeCommand = L""; + + // Read options + uint argIndex = 1; + while (argIndex < argc) + { + wchar_t* arg = argv[argIndex++]; + + if (*arg == '/') + activeCommand = L""; + + if (startsWithIgnoreCase(arg, L"/P:")) + { + outSettings.listenPort = _wtoi(arg + 3); + } + else if (startsWithIgnoreCase(arg, L"/IP:")) + { + outSettings.listenIp = arg + 4; + } + else if (equalsIgnoreCase(arg, L"/HASH")) + { + outSettings.useHash = true; + } + else if (equalsIgnoreCase(arg, L"/UNSECURE")) + { + outSettings.useSecurityFile = false; + } + else if (startsWithIgnoreCase(arg, L"/HISTORY:")) + { + outSettings.maxHistory = _wtoi(arg + 9); + } + else if (equalsIgnoreCase(arg, L"/NOLINKS")) + { + outSettings.useLinksThreshold = ~u64(0); + } + else if (startsWithIgnoreCase(arg, L"/LINKMIN:")) + { + outSettings.useLinksThreshold = _wtoi(arg + 10); + } + else if (startsWithIgnoreCase(arg, L"/LINK")) + { + activeCommand = L"LINK"; + } + else if (equalsIgnoreCase(arg, L"/LINKBYNAME")) + { + outSettings.useLinksRelativePath = false; + } + else if (equalsIgnoreCase(arg, L"/OFFLOAD")) + { + outSettings.useOdx = true; + } + else if (equalsIgnoreCase(arg, L"/J")) + { + outSettings.useBufferedIO = UseBufferedIO_Enabled; + } + else if (equalsIgnoreCase(arg, L"/NJ")) + { + outSettings.useBufferedIO = UseBufferedIO_Disabled; + } + else if(startsWithIgnoreCase(arg, L"/LOG:")) + { + outLogFileName = arg + 5; + } + else if(startsWithIgnoreCase(arg, L"/USER:")) + { + outSettings.user = arg + 6; + } + else if(startsWithIgnoreCase(arg, L"/PASS:")) + { + outSettings.password = arg + 6; + } + else if (equalsIgnoreCase(arg, L"/VERBOSE")) + { + outSettings.logDebug = true; + } + else + { + if (equalsIgnoreCase(activeCommand, L"link")) + { + outSettings.additionalLinkDirectories.push_back(getCleanedupPath(arg)); + } + else + { + logErrorf(L"Unknown option %ls. Use /? for help", arg); + return false; + } + } + } + return true; +} + +void initializeServiceName() +{ + wcscpy(g_serviceName, DEFAULTSERVICENAME); + wcscpy(g_serviceDisplayName, DEFAULTSERVICEDISPLAYNAME); + for (uint i=1; i!=g_argc; ++i) + { + if (!startsWithIgnoreCase(g_argv[i], L"/P:")) + continue; + wcscat(g_serviceName, g_argv[i] + 3); + wcscat(g_serviceDisplayName, L" (port "); + wcscat(g_serviceDisplayName, g_argv[i] + 3); + wcscat(g_serviceDisplayName, L")"); + break; + } +} + +void addLastErrorToMessageLog(const wchar_t* lpszMsg) +{ + if (!g_runningAsService) + return; + + g_dwErr = GetLastError(); + HANDLE hEventSource = RegisterEventSourceW(NULL, g_serviceName); + if (!hEventSource) + return; + + wchar_t szMsg[eacopy_sizeof_array(g_serviceName) + 100 ]; + swprintf_s(szMsg,eacopy_sizeof_array(g_serviceName) + 100, L"%ls error: %d", g_serviceName, g_dwErr); + const wchar_t* lpszStrings[2] = { szMsg, lpszMsg }; + ReportEventW(hEventSource, EVENTLOG_ERROR_TYPE, 0, 0, NULL, 2, 0, &lpszStrings[0], NULL); + DeregisterEventSource(hEventSource); +} + +VOID addInfoToMessageLog(const wchar_t* lpszMsg) +{ + if (!g_runningAsService) + return; + HANDLE hEventSource = RegisterEventSourceW(NULL, g_serviceName); + if (!hEventSource) + return; + const wchar_t* lpszStrings[1] = { lpszMsg }; + ReportEventW(hEventSource, EVENTLOG_INFORMATION_TYPE, 0, 0, NULL, 1, 0, &lpszStrings[0], NULL); + DeregisterEventSource(hEventSource); +} + +BOOL reportServiceStatus(DWORD dwCurrentState, DWORD dwWin32ExitCode, DWORD dwWaitHint) +{ + if (!g_runningAsService) // when debugging we don't report to the SCM + return TRUE; + + if (dwCurrentState == SERVICE_START_PENDING) + g_ssStatus.dwControlsAccepted = 0; + else + g_ssStatus.dwControlsAccepted = SERVICE_ACCEPT_STOP | SERVICE_ACCEPT_SHUTDOWN | SERVICE_ACCEPT_PAUSE_CONTINUE; + + g_ssStatus.dwCurrentState = dwCurrentState; + g_ssStatus.dwWin32ExitCode = dwWin32ExitCode; + g_ssStatus.dwWaitHint = dwWaitHint; + + static DWORD dwCheckPoint = 1; + + if (dwCurrentState == SERVICE_RUNNING || dwCurrentState == SERVICE_STOPPED) + g_ssStatus.dwCheckPoint = 0; + else + g_ssStatus.dwCheckPoint = dwCheckPoint++; + + + // Report the status of the service to the service control manager. + // + + BOOL res = SetServiceStatus( g_sshStatusHandle, &g_ssStatus); + if (res) + return TRUE; + + wchar_t errMsg[512]; + swprintf_s(errMsg, L"SetServiceStatus failed: '%ls'", getLastErrorText().c_str()); + + addLastErrorToMessageLog(errMsg); + return res; +} + +DWORD WINAPI serviceControl(DWORD dwControl, DWORD dwEventType, LPVOID lpEventData, LPVOID lpContext) +{ + switch (dwControl) + { + case SERVICE_CONTROL_STOP: + addInfoToMessageLog(L"Stop"); + reportServiceStatus(SERVICE_STOP_PENDING, NO_ERROR, 60000); + g_server.stop(); + return NO_ERROR; + + case SERVICE_CONTROL_SHUTDOWN: + addInfoToMessageLog(L"Shutdown"); + reportServiceStatus(SERVICE_STOP_PENDING, NO_ERROR, 60000); + g_server.stop(); + return NO_ERROR; + + case SERVICE_CONTROL_PAUSE: + addInfoToMessageLog(L"Pause"); + reportServiceStatus(SERVICE_PAUSE_PENDING, NO_ERROR, 0); + //g_server.pause(); + reportServiceStatus(SERVICE_PAUSED, NO_ERROR, 0); + return NO_ERROR; + + case SERVICE_CONTROL_CONTINUE: + addInfoToMessageLog(L"Continue"); + reportServiceStatus(SERVICE_CONTINUE_PENDING, NO_ERROR, 0); + //g_server.resume(); + reportServiceStatus(SERVICE_RUNNING, NO_ERROR, 0); + return NO_ERROR; + + case SERVICE_CONTROL_INTERROGATE: + // Fall through to send current status + break; + + default: + // invalid control code + // + return ERROR_CALL_NOT_IMPLEMENTED; + } + + reportServiceStatus(g_ssStatus.dwCurrentState, NO_ERROR, 0); + return NO_ERROR; +} + +void WINAPI serviceMain(DWORD dwArgc, LPWSTR *lpszArgv) +{ + initializeServiceName(); + + // register our service control handler: + // + g_sshStatusHandle = RegisterServiceCtrlHandlerExW(g_serviceName, serviceControl, NULL); + + if (!g_sshStatusHandle) + { + wchar_t msg[256]; + swprintf_s(msg, L"Failed to register control handler! err = %ls", getLastErrorText().c_str()); + addLastErrorToMessageLog(msg); + return; + } + + // SERVICE_STATUS members that don't change in example + // + g_ssStatus.dwServiceType = SERVICE_WIN32_OWN_PROCESS; + g_ssStatus.dwServiceSpecificExitCode = 0; + + WString logFileName; + ServerSettings settings; + Log log; + + // report the status to the service control manager. + // + if (!reportServiceStatus(SERVICE_START_PENDING, NO_ERROR, 3000)) + goto cleanup; + + if (!readSettings(settings, logFileName, g_argc, g_argv)) + goto cleanup; + + log.init(logFileName.c_str(), settings.logDebug, true); + g_server.start(settings, log, false, reportServiceStatus); + log.deinit(); + + if (g_serviceCrashed) + return; // Unclean shutdown -- do not report SERVICE_STOPPED since this + // would make Windows not restart the service. +cleanup: + reportServiceStatus(SERVICE_STOPPED, g_dwErr, 0); +} + +BOOL WINAPI ctrlEventHandler(DWORD dwCtrlType) +{ + if (dwCtrlType != CTRL_BREAK_EVENT && dwCtrlType != CTRL_C_EVENT) + return FALSE; + g_server.stop(); + return TRUE; +} + +int installService(int argc, wchar_t** argv) +{ + WString logFileName; + ServerSettings settings; + if (!readSettings(settings, logFileName, argc, argv)) + return 0; + + wchar_t szPath[512]; + if (GetModuleFileNameW(NULL, szPath, eacopy_sizeof_array(szPath)) == 0) + { + logErrorf(L"Unable to install %ls - %ls", g_serviceDisplayName, getLastErrorText().c_str()); + return -1; + } + + if (argc > 1) + { + for (int i = 1; i < argc; ++i) + { + if(startsWithIgnoreCase(argv[i], L"/USER") || startsWithIgnoreCase(argv[i], L"/PASS")) + continue; + wcscat_s(szPath, eacopy_sizeof_array(szPath), L" "); + wcscat_s(szPath, eacopy_sizeof_array(szPath), argv[i]); + } + } + + SC_HANDLE manager = OpenSCManager(NULL, NULL, SC_MANAGER_CONNECT | SC_MANAGER_CREATE_SERVICE); + if (!manager) + { + logErrorf(L"OpenSCManager failed - %ls", getLastErrorText().c_str()); + return -1; + } + ScopeGuard managerGuard([&]() { CloseServiceHandle(manager); }); + + const wchar_t* username = NULL; + const wchar_t* password = NULL; + if (!settings.user.empty()) + username = settings.user.c_str(); + if (!settings.password.empty()) + password = settings.password.c_str(); + + SC_HANDLE service = CreateServiceW(manager, g_serviceName, g_serviceDisplayName, + SERVICE_ALL_ACCESS, SERVICE_WIN32_OWN_PROCESS, SERVICE_AUTO_START, + SERVICE_ERROR_NORMAL, szPath, NULL, NULL, L"", username, password); + + if (!service) + service = OpenServiceW(manager, g_serviceName, SERVICE_ALL_ACCESS); + + if (!service) + { + logErrorf(L"CreateService failed - %ls", getLastErrorText().c_str()); + return -1; + } + ScopeGuard serviceGuard([&]() { CloseServiceHandle(service); }); + + SERVICE_DESCRIPTION desc; + desc.lpDescription = (LPSTR)"EACopy Accelerator Service."; + + if (!ChangeServiceConfig2(service, SERVICE_CONFIG_DESCRIPTION, &desc)) + { + logErrorf(L"failed to set service description, errcode = %d", GetLastError()); + return -1; + } + + // Enable automatic restart + + SERVICE_FAILURE_ACTIONS_FLAG flag = { TRUE }; + if (!ChangeServiceConfig2(service, SERVICE_CONFIG_FAILURE_ACTIONS_FLAG, &flag)) + { + logErrorf(L"Failed to set failure action flag, errcode = %d", GetLastError()); + return -1; + } + + SC_ACTION acts[] = { + { SC_ACTION_RESTART, 10000 /* milliseconds */ }, + { SC_ACTION_RESTART, 30000 /* milliseconds */ }, + { SC_ACTION_RESTART, 60000 /* milliseconds */ } + }; + SERVICE_FAILURE_ACTIONS actions = { 60 * 60 * 24, NULL, NULL, sizeof acts / sizeof(SC_ACTION), &acts[0] }; + + if (!ChangeServiceConfig2(service, SERVICE_CONFIG_FAILURE_ACTIONS, &actions)) + { + logErrorf(L"Failed to set failure actions, errcode = %d", GetLastError()); + return -1; + } + + logInfoLinef(L"%ls installed. commandline = %ls", g_serviceDisplayName, szPath ); + + // Autostart server + if (!StartServiceA(service, argc, (LPCSTR*)argv)) + { + logInfoLinef(); + DWORD error = GetLastError(); + if (error == ERROR_SERVICE_LOGON_FAILED) + logErrorf(L"Failed to start service due to a logon failure. MAKE SURE USER IS ADDED TO \"Log on as a service\" policy."); + else + logErrorf(L"Failed to start service - %ls", getErrorText(error).c_str()); + return -1; + } + + return 0; +} + +int removeService() +{ + SC_HANDLE manager = OpenSCManager(NULL, NULL, SC_MANAGER_CONNECT); + if (!manager) + { + logErrorf(L"OpenSCManager failed - %ls", getLastErrorText().c_str()); + return -1; + } + ScopeGuard managerGuard([&]() { CloseServiceHandle(manager); }); + + SC_HANDLE service = OpenServiceW(manager, g_serviceName, DELETE | SERVICE_STOP | SERVICE_QUERY_STATUS); + + if (!service) + { + logErrorf(L"OpenService failed - %ls", getLastErrorText().c_str()); + return -1; + } + ScopeGuard serviceGuard([&]() { CloseServiceHandle(service); }); + + if (ControlService(service, SERVICE_CONTROL_STOP, &g_ssStatus)) + { + logInfof(L"Stopping %ls.", g_serviceDisplayName); + Sleep(1000); + + while (QueryServiceStatus(service, &g_ssStatus)) + { + if (g_ssStatus.dwCurrentState != SERVICE_STOP_PENDING) + break; + logInfof(L"."); + Sleep(1000); + } + + logInfoLinef(); + if (g_ssStatus.dwCurrentState != SERVICE_STOPPED) + { + logErrorf(L"%ls failed to stop.", g_serviceDisplayName); + return -1; + } + + logInfoLinef(L"%ls stopped.", g_serviceDisplayName); + } + + if (!DeleteService(service)) + { + logErrorf(L"DeleteService failed - %ls", getLastErrorText().c_str()); + return -1; + } + + logInfoLinef(L"%ls removed.", g_serviceDisplayName); + return 0; +} + +/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +} // namespace eacopy + +int __cdecl wmain(int argc, wchar_t** argv) +{ + using namespace eacopy; + + g_argc = argc; + g_argv = argv; + + initializeServiceName(); + + + // Try to figure out some context based on which session we're running in + // and decide which mode to default to. + bool isInteractiveSession = false; + DWORD dwSessionId = 0; + if (ProcessIdToSessionId(GetCurrentProcessId(), &dwSessionId)) + if (dwSessionId != 0) + isInteractiveSession = true; + + // Started by service controller + if (!isInteractiveSession) + { + g_runningAsService = true; + SERVICE_TABLE_ENTRYW dispatchTable[] = { { (LPWSTR)g_serviceName, (LPSERVICE_MAIN_FUNCTIONW)&serviceMain}, { NULL, NULL} }; + if (!StartServiceCtrlDispatcherW(dispatchTable)) + addLastErrorToMessageLog(L"StartServiceCtrlDispatcherW failed."); + return 0; + } + + // Started by user + if (argc > 1 && equalsIgnoreCase(argv[1], L"/?")) + { + printHelp(); + return 0; + } + + // Check if it is an install or removal of service + int argIndex = 1; + while (argIndex < argc) + { + wchar_t* arg = argv[argIndex]; + if (equalsIgnoreCase(arg, L"/INSTALL")) + { + --argc; + argv[argIndex] = argv[argc]; // Remove own argument + argv[argc] = nullptr; + return installService(argc, argv); + } + else if (equalsIgnoreCase(arg, L"/REMOVE")) + { + return removeService(); + } + ++argIndex; + } + + // Normal run, just execute the server + SetConsoleCtrlHandler(ctrlEventHandler, TRUE); + + WString logFileName; + ServerSettings settings; + if (!readSettings(settings, logFileName, argc, argv)) + return -1; + + logInfoLinef(L"Server v%ls (%u) - Starting... (Add /? for help)", getServerVersionString().c_str(), ProtocolVersion); + + Log log; + log.init(logFileName.c_str(), settings.logDebug, true); + ScopeGuard logGuard([&]() { log.deinit(); }); + + g_server.start(settings, log, true, reportServiceStatus); + return 0; +} + diff --git a/source/EACopyShared.cpp b/source/EACopyShared.cpp index 0986571..c34bd7d 100644 --- a/source/EACopyShared.cpp +++ b/source/EACopyShared.cpp @@ -1747,7 +1747,7 @@ bool copyFile(const wchar_t* source, const FileInfo& sourceInfo, uint sourceAttr bool result = true; ScopeGuard destGuard([&]() { CloseHandle(osWrite.hEvent); result &= closeFile(dest, destFile, AccessType_Write, ioStats); }); - + OVERLAPPED osRead = {0,0,0}; osRead.Offset = 0; osRead.OffsetHigh = 0; @@ -1792,6 +1792,9 @@ bool copyFile(const wchar_t* source, const FileInfo& sourceInfo, uint sourceAttr if (GetLastError() != ERROR_IO_PENDING) { logErrorf(L"Fail writing file %ls: %ls", dest, getLastErrorText().c_str()); + // in the event the write fails, we need to delete the dest file for the next retry + destGuard.execute(); + DeleteFileW(dest); return false; } } diff --git a/source/OLD_EACopyRsync.cpp b/source/OLD_EACopyRsync.cpp index 0de0622..6dd7794 100644 --- a/source/OLD_EACopyRsync.cpp +++ b/source/OLD_EACopyRsync.cpp @@ -1,490 +1,490 @@ -// (c) Electronic Arts. All Rights Reserved. - -#include -#include "librsync/src/librsync.h" -#include "librsync/src/fileutil.h" -#include "librsync/src/util.h" -#include "librsync/src/buf.h" -#include "librsync/src/job.h" -#include "librsync/src/sumset.h" -#include - -struct rs_filebuf { - FILE *f; - char *buf; - size_t buf_len; -}; - -namespace eacopy -{ - -enum : uint { DefaultRSyncBlockLen = 4*1024 };//RS_DEFAULT_BLOCK_LEN }; -enum : uint { SocketBlockSize = 512 * 1024 }; - -/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - -struct ReadInfo -{ - const wchar_t* ident; - RsyncStats& outStats; - - HANDLE file = INVALID_HANDLE_VALUE; - SOCKET socket = INVALID_SOCKET; - size_t readPosition = 0; - uint bufferSize = 0; - bool lastBlock = false; - - bool readFromSocket(DWORD& bytesRead, int& eof, char* buf, size_t buf_len) - { - u64 startReceiveMs = getTimeMs(); - while (true) - { - if (!bufferSize) - { - if (lastBlock) - { - eof = 1; - break; - } - - if (!receiveData(socket, &bufferSize, sizeof(bufferSize))) - return false; - - if (!bufferSize) - { - eof = 1; - break; - } - else if (bufferSize < SocketBlockSize) - lastBlock = true; - } - - uint toRead = min(buf_len - bytesRead, bufferSize); - if (!receiveData(socket, buf + bytesRead, toRead)) - return false; - bufferSize -= toRead; - readPosition += toRead; - bytesRead += toRead; - - if (bytesRead == buf_len) - break; - } - outStats.receiveTimeMs += getTimeMs() - startReceiveMs; - return true; - } -}; - -struct WriteInfo -{ - const wchar_t* ident; - RsyncStats& outStats; - - HANDLE file = INVALID_HANDLE_VALUE; - - SOCKET socket = INVALID_SOCKET; - char* buffer = nullptr; - uint bufferSize = 0; - - bool flush() - { - u64 startSendMs = getTimeMs(); - if (!sendData(socket, &bufferSize, sizeof(bufferSize))) - return false; - if (!sendData(socket, buffer, bufferSize)) - return false; - outStats.sendSize += sizeof(bufferSize) + bufferSize; - outStats.sendTimeMs += getTimeMs() - startSendMs; - return true; - } - -}; - -/* If the stream has no more data available, read some from F into BUF, and let - the stream use that. On return, SEEN_EOF is true if the end of file has - passed into the stream. */ -rs_result rs_infilebuf_fill2(rs_job_t *job, rs_buffers_t *buf, void *opaque) -{ - rs_filebuf_t *fb = (rs_filebuf_t *)opaque; - - /* This is only allowed if either the buf has no input buffer yet, or that - buffer could possibly be BUF. */ - if (buf->next_in != NULL) { - assert(buf->avail_in <= fb->buf_len); - assert(buf->next_in >= fb->buf); - assert(buf->next_in <= fb->buf + fb->buf_len); - } else { - assert(buf->avail_in == 0); - } - - if (buf->eof_in == 1) - return RS_DONE; - - if (buf->avail_in != 0) - return RS_DONE; - - ReadInfo& readInfo = *(ReadInfo*)fb->f; - - DWORD bytesRead = 0; - - if (readInfo.file != INVALID_HANDLE_VALUE) - { - u64 startReadMs = getTimeMs(); - if (!ReadFile(readInfo.file, fb->buf, fb->buf_len, &bytesRead, NULL)) - { - DWORD error = GetLastError(); - if (error == ERROR_HANDLE_EOF) - { - buf->eof_in = 1; - return RS_DONE; - } - logErrorf(L"Fail reading file %s: %s", readInfo.ident, getErrorText(error).c_str()); - return RS_IO_ERROR; - } - readInfo.outStats.readTimeMs += getTimeMs() - startReadMs; - readInfo.outStats.readSize += bytesRead; - - if (bytesRead == 0) - { - buf->eof_in = 1; - return RS_DONE; - } - - readInfo.readPosition += bytesRead; - } - else if (readInfo.socket != INVALID_SOCKET) - { - if (!readInfo.readFromSocket(bytesRead, buf->eof_in, fb->buf, fb->buf_len)) - return RS_IO_ERROR; - } - - buf->avail_in = bytesRead; - buf->next_in = fb->buf; - - job->stats.in_bytes += bytesRead; - - return RS_DONE; -} - -/* The buf is already using BUF for an output buffer, and probably contains - some buffered output now. Write this out to F, and reset the buffer cursor. */ -rs_result rs_outfilebuf_drain2(rs_job_t *job, rs_buffers_t *buf, void *opaque) -{ - int present; - rs_filebuf_t *fb = (rs_filebuf_t *)opaque; - - WriteInfo& writeInfo = *(WriteInfo*)fb->f; - - // This is only allowed if either the buf has no output buffer yet, or that - // buffer could possibly be BUF. - if (buf->next_out == NULL) { - assert(buf->avail_out == 0); - - buf->next_out = fb->buf; - buf->avail_out = fb->buf_len; - - return RS_DONE; - } - - assert(buf->avail_out <= fb->buf_len); - assert(buf->next_out >= fb->buf); - assert(buf->next_out <= fb->buf + fb->buf_len); - - present = buf->next_out - fb->buf; - if (present > 0) { - - assert(present > 0); - if (writeInfo.file != INVALID_HANDLE_VALUE) - { - u64 startWriteMs = getTimeMs(); - DWORD written; - if (!WriteFile(writeInfo.file, fb->buf, present, &written, NULL)) - { - logErrorf(L"Fail writing file %s: %s", writeInfo.ident, getLastErrorText().c_str()); - return RS_IO_ERROR; - } - present = written; - writeInfo.outStats.writeTimeMs += getTimeMs() - startWriteMs; - } - else - { - u64 startSendMs = getTimeMs(); - size_t left = present; - size_t readPos = 0; - if (writeInfo.bufferSize && writeInfo.bufferSize + left >= SocketBlockSize) - { - size_t toCopy = SocketBlockSize - writeInfo.bufferSize; - memcpy(writeInfo.buffer + writeInfo.bufferSize, fb->buf, toCopy); - uint blockSize = SocketBlockSize; - if (!sendData(writeInfo.socket, &blockSize, sizeof(blockSize))) - return RS_IO_ERROR; - if (!sendData(writeInfo.socket, writeInfo.buffer, blockSize)) - return RS_IO_ERROR; - writeInfo.bufferSize = 0; - left = present - toCopy; - readPos += toCopy; - } - - while (left >= SocketBlockSize) - { - uint blockSize = SocketBlockSize; - if (!sendData(writeInfo.socket, &blockSize, sizeof(blockSize))) - return RS_IO_ERROR; - if (!sendData(writeInfo.socket, fb->buf + readPos, blockSize)) - return RS_IO_ERROR; - left -= SocketBlockSize; - readPos += SocketBlockSize; - } - - memcpy(writeInfo.buffer + writeInfo.bufferSize, fb->buf + readPos, left); - writeInfo.bufferSize += left; - - writeInfo.outStats.sendTimeMs += getTimeMs() - startSendMs; - writeInfo.outStats.sendSize += readPos; // Not fully accurate but good enough - } - - buf->next_out = fb->buf; - buf->avail_out = fb->buf_len; - - job->stats.out_bytes += present; - } - - return RS_DONE; -} - -rs_result rs_file_copy_cb2(void *arg, rs_long_t pos, size_t *len, void **buf) -{ - ReadInfo& readInfo = *(ReadInfo*)arg; - assert(readInfo.file != INVALID_HANDLE_VALUE); - - u64 startReadMs = getTimeMs(); - - if (!setFilePosition(readInfo.ident, readInfo.file, pos)) - return RS_IO_ERROR; - - readInfo.readPosition = pos; - - DWORD bytesRead; - if (!ReadFile(readInfo.file, *buf, *len, &bytesRead, NULL)) - { - logErrorf(L"Fail reading file %s: %s", readInfo.ident, getLastErrorText().c_str()); - return RS_IO_ERROR; - } - readInfo.outStats.readTimeMs += getTimeMs() - startReadMs; - readInfo.outStats.readSize += bytesRead; - - readInfo.readPosition += bytesRead; - - *len = bytesRead; - - return RS_DONE; -} - -rs_result rs_whole_run2(rs_job_t *job, FILE *in_file, FILE *out_file, - int inbuflen, int outbuflen) -{ - rs_buffers_t buf; - rs_result result; - rs_filebuf_t *in_fb = NULL, *out_fb = NULL; - - /* Override buffer sizes if rs_inbuflen or rs_outbuflen are set. */ - inbuflen = rs_inbuflen ? rs_inbuflen : inbuflen; - outbuflen = rs_outbuflen ? rs_outbuflen : outbuflen; - if (in_file) - in_fb = rs_filebuf_new(in_file, inbuflen); - if (out_file) - out_fb = rs_filebuf_new(out_file, outbuflen); - result = - rs_job_drive(job, &buf, in_fb ? rs_infilebuf_fill2 : NULL, in_fb, - out_fb ? rs_outfilebuf_drain2 : NULL, out_fb); - if (in_fb) - rs_filebuf_free(in_fb); - if (out_fb) - rs_filebuf_free(out_fb); - return result; -} - -rs_result rs_sig_file2(FILE *old_file, FILE *sig_file, size_t new_block_len, - size_t strong_len, rs_magic_number sig_magic, - rs_stats_t *stats) -{ - rs_job_t *job; - rs_result r; - - job = rs_sig_begin(new_block_len, strong_len, sig_magic); - /* Size inbuf for 4 blocks, outbuf for header + 4 blocksums. */ - r = rs_whole_run2(job, old_file, sig_file, 4 * new_block_len, - 12 + 4 * (4 + strong_len)); - if (stats) - memcpy(stats, &job->stats, sizeof *stats); - rs_job_free(job); - - return r; -} - -rs_result rs_delta_file2(rs_signature_t *sig, FILE *new_file, FILE *delta_file, - rs_stats_t *stats) -{ - rs_job_t *job; - rs_result r; - - job = rs_delta_begin(sig); - /* Size inbuf for 1 block, outbuf for literal cmd + 4 blocks. */ - r = rs_whole_run2(job, new_file, delta_file, sig->block_len, - 10 + 4 * sig->block_len); - if (stats) - memcpy(stats, &job->stats, sizeof *stats); - rs_job_free(job); - return r; -} - -rs_result rs_patch_file2(FILE *basis_file, FILE *delta_file, FILE *new_file, - rs_stats_t *stats) -{ - rs_job_t *job; - rs_result r; - - job = rs_patch_begin(rs_file_copy_cb2, basis_file); - /* Default size inbuf and outbuf 64K. */ - r = rs_whole_run2(job, delta_file, new_file, 64 * 1024, 64 * 1024); - if (stats) - memcpy(stats, &job->stats, sizeof *stats); - rs_job_free(job); - return r; -} - -bool serverHandleRsync(SOCKET socket, const wchar_t* baseFileName, const wchar_t* newFileName, FILETIME lastWriteTime, RsyncStats& outStats) -{ - //rs_inbuflen = 512*1024; - //rs_outbuflen = 512*1024; - - u64 startMs = getTimeMs(); - - HANDLE baseFile; - if (!openFileRead(baseFileName, baseFile, true)) - return false; - ScopeGuard fileGuard([&] { CloseHandle(baseFile); }); - - // Send signature data to client - { - - WriteInfo writeInfo { L"RsyncSignatureMem", outStats }; - writeInfo.socket = socket; - char buffer[SocketBlockSize]; - writeInfo.buffer = buffer; - - size_t block_len = DefaultRSyncBlockLen; - size_t strong_len = 0; - - ReadInfo readInfo { baseFileName, outStats }; - readInfo.file = baseFile; - FILE* basis_file = (FILE*)&readInfo; - FILE* sig_file = (FILE*)&writeInfo; - - rs_result result = rs_sig_file2(basis_file, sig_file, block_len, strong_len, RS_BLAKE2_SIG_MAGIC, nullptr); - if (result != RS_DONE) - return false; - - if (!writeInfo.flush()) - return false; - } - - // Receive delta data from client and combine with base file to create new file - { - if (!setFilePosition(baseFileName, baseFile, 0)) - return false; - - HANDLE newFile; - if (!openFileWrite(newFileName, newFile, true)) - return false; - ScopeGuard newFileGuard([&] { CloseHandle(newFile); }); - - ReadInfo basisReadInfo { baseFileName, outStats }; - basisReadInfo.file = baseFile; - ReadInfo deltaReadInfo { L"RSyncDeltaPatch", outStats }; - deltaReadInfo.socket = socket; - - WriteInfo writeInfo { newFileName, outStats }; - writeInfo.file = newFile; - - rs_result result = rs_patch_file2((FILE*)&basisReadInfo, (FILE*)&deltaReadInfo, (FILE*)&writeInfo, nullptr); - if (result != RS_DONE) - return false; - - if (!setFileLastWriteTime(newFileName, newFile, lastWriteTime)) - return false; - } - - outStats.rsyncTimeMs = getTimeMs() - startMs - outStats.readTimeMs - outStats.receiveTimeMs - outStats.sendTimeMs - outStats.writeTimeMs; - - return true; -} - -bool clientHandleRsync(SOCKET socket, const wchar_t* fileName, RsyncStats& outStats) -{ - u64 startMs = getTimeMs(); - - rs_signature_t* sumset = nullptr; - ScopeGuard cleanSumset([&]() { rs_free_sumset(sumset); }); - - { - ReadInfo readInfo { L"RsyncSignature", outStats }; - readInfo.socket = socket; - - rs_job_t* job = rs_loadsig_begin(&sumset); - - // We read 0 bytes which will read first blocks size (which is very likely that it is the entire file) - DWORD bytesRead = 0; - int eof; - if (!readInfo.readFromSocket(bytesRead, eof, nullptr, 0)) - return false; - - job->sig_fsize = readInfo.bufferSize < SocketBlockSize ? readInfo.bufferSize : 0; // If bufferSize is more than SocketBlockSize we don't know how big the signature data is - - // Size inbuf for 1024x 16 byte blocksums. - rs_result result = rs_whole_run2(job, (FILE*)&readInfo, NULL, 1024 * 16, 0); - rs_job_free(job); - - if (result != RS_DONE) - { - logErrorf(L"Rsync failed while receiving signature data"); - return false; - } - } - - { - rs_result result = rs_build_hash_table(sumset); - if (result != RS_DONE) - { - logErrorf(L"Rsync failed while building hashtable"); - return false; - } - } - - { - HANDLE newFileHandle; - if (!openFileRead(fileName, newFileHandle, true)) - return false; - ScopeGuard fileGuard([&] { CloseHandle(newFileHandle); }); - ReadInfo readInfo = { fileName, outStats }; - readInfo.file = newFileHandle; - WriteInfo writeInfo = { L"RSyncDeltaPatchMemory", outStats }; - writeInfo.socket = socket; - char buffer[SocketBlockSize]; - writeInfo.buffer = buffer; - - u64 startMs = getTimeMs(); - rs_result result = rs_delta_file2(sumset, (FILE*)&readInfo, (FILE*)&writeInfo, nullptr); - logInfof(L"BUILD DELTA: %s\r\n", toHourMinSec(getTimeMs() - startMs).c_str()); - - if (!writeInfo.flush()) - return false; - } - - outStats.rsyncTimeMs = getTimeMs() - startMs - outStats.readTimeMs - outStats.receiveTimeMs - outStats.sendTimeMs - outStats.writeTimeMs; - - return true; -} - -/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - -} +// (c) Electronic Arts. All Rights Reserved. + +#include +#include "librsync/src/librsync.h" +#include "librsync/src/fileutil.h" +#include "librsync/src/util.h" +#include "librsync/src/buf.h" +#include "librsync/src/job.h" +#include "librsync/src/sumset.h" +#include + +struct rs_filebuf { + FILE *f; + char *buf; + size_t buf_len; +}; + +namespace eacopy +{ + +enum : uint { DefaultRSyncBlockLen = 4*1024 };//RS_DEFAULT_BLOCK_LEN }; +enum : uint { SocketBlockSize = 512 * 1024 }; + +/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +struct ReadInfo +{ + const wchar_t* ident; + RsyncStats& outStats; + + HANDLE file = INVALID_HANDLE_VALUE; + SOCKET socket = INVALID_SOCKET; + size_t readPosition = 0; + uint bufferSize = 0; + bool lastBlock = false; + + bool readFromSocket(DWORD& bytesRead, int& eof, char* buf, size_t buf_len) + { + u64 startReceiveMs = getTimeMs(); + while (true) + { + if (!bufferSize) + { + if (lastBlock) + { + eof = 1; + break; + } + + if (!receiveData(socket, &bufferSize, sizeof(bufferSize))) + return false; + + if (!bufferSize) + { + eof = 1; + break; + } + else if (bufferSize < SocketBlockSize) + lastBlock = true; + } + + uint toRead = min(buf_len - bytesRead, bufferSize); + if (!receiveData(socket, buf + bytesRead, toRead)) + return false; + bufferSize -= toRead; + readPosition += toRead; + bytesRead += toRead; + + if (bytesRead == buf_len) + break; + } + outStats.receiveTimeMs += getTimeMs() - startReceiveMs; + return true; + } +}; + +struct WriteInfo +{ + const wchar_t* ident; + RsyncStats& outStats; + + HANDLE file = INVALID_HANDLE_VALUE; + + SOCKET socket = INVALID_SOCKET; + char* buffer = nullptr; + uint bufferSize = 0; + + bool flush() + { + u64 startSendMs = getTimeMs(); + if (!sendData(socket, &bufferSize, sizeof(bufferSize))) + return false; + if (!sendData(socket, buffer, bufferSize)) + return false; + outStats.sendSize += sizeof(bufferSize) + bufferSize; + outStats.sendTimeMs += getTimeMs() - startSendMs; + return true; + } + +}; + +/* If the stream has no more data available, read some from F into BUF, and let + the stream use that. On return, SEEN_EOF is true if the end of file has + passed into the stream. */ +rs_result rs_infilebuf_fill2(rs_job_t *job, rs_buffers_t *buf, void *opaque) +{ + rs_filebuf_t *fb = (rs_filebuf_t *)opaque; + + /* This is only allowed if either the buf has no input buffer yet, or that + buffer could possibly be BUF. */ + if (buf->next_in != NULL) { + assert(buf->avail_in <= fb->buf_len); + assert(buf->next_in >= fb->buf); + assert(buf->next_in <= fb->buf + fb->buf_len); + } else { + assert(buf->avail_in == 0); + } + + if (buf->eof_in == 1) + return RS_DONE; + + if (buf->avail_in != 0) + return RS_DONE; + + ReadInfo& readInfo = *(ReadInfo*)fb->f; + + DWORD bytesRead = 0; + + if (readInfo.file != INVALID_HANDLE_VALUE) + { + u64 startReadMs = getTimeMs(); + if (!ReadFile(readInfo.file, fb->buf, fb->buf_len, &bytesRead, NULL)) + { + DWORD error = GetLastError(); + if (error == ERROR_HANDLE_EOF) + { + buf->eof_in = 1; + return RS_DONE; + } + logErrorf(L"Fail reading file %s: %s", readInfo.ident, getErrorText(error).c_str()); + return RS_IO_ERROR; + } + readInfo.outStats.readTimeMs += getTimeMs() - startReadMs; + readInfo.outStats.readSize += bytesRead; + + if (bytesRead == 0) + { + buf->eof_in = 1; + return RS_DONE; + } + + readInfo.readPosition += bytesRead; + } + else if (readInfo.socket != INVALID_SOCKET) + { + if (!readInfo.readFromSocket(bytesRead, buf->eof_in, fb->buf, fb->buf_len)) + return RS_IO_ERROR; + } + + buf->avail_in = bytesRead; + buf->next_in = fb->buf; + + job->stats.in_bytes += bytesRead; + + return RS_DONE; +} + +/* The buf is already using BUF for an output buffer, and probably contains + some buffered output now. Write this out to F, and reset the buffer cursor. */ +rs_result rs_outfilebuf_drain2(rs_job_t *job, rs_buffers_t *buf, void *opaque) +{ + int present; + rs_filebuf_t *fb = (rs_filebuf_t *)opaque; + + WriteInfo& writeInfo = *(WriteInfo*)fb->f; + + // This is only allowed if either the buf has no output buffer yet, or that + // buffer could possibly be BUF. + if (buf->next_out == NULL) { + assert(buf->avail_out == 0); + + buf->next_out = fb->buf; + buf->avail_out = fb->buf_len; + + return RS_DONE; + } + + assert(buf->avail_out <= fb->buf_len); + assert(buf->next_out >= fb->buf); + assert(buf->next_out <= fb->buf + fb->buf_len); + + present = buf->next_out - fb->buf; + if (present > 0) { + + assert(present > 0); + if (writeInfo.file != INVALID_HANDLE_VALUE) + { + u64 startWriteMs = getTimeMs(); + DWORD written; + if (!WriteFile(writeInfo.file, fb->buf, present, &written, NULL)) + { + logErrorf(L"Fail writing file %s: %s", writeInfo.ident, getLastErrorText().c_str()); + return RS_IO_ERROR; + } + present = written; + writeInfo.outStats.writeTimeMs += getTimeMs() - startWriteMs; + } + else + { + u64 startSendMs = getTimeMs(); + size_t left = present; + size_t readPos = 0; + if (writeInfo.bufferSize && writeInfo.bufferSize + left >= SocketBlockSize) + { + size_t toCopy = SocketBlockSize - writeInfo.bufferSize; + memcpy(writeInfo.buffer + writeInfo.bufferSize, fb->buf, toCopy); + uint blockSize = SocketBlockSize; + if (!sendData(writeInfo.socket, &blockSize, sizeof(blockSize))) + return RS_IO_ERROR; + if (!sendData(writeInfo.socket, writeInfo.buffer, blockSize)) + return RS_IO_ERROR; + writeInfo.bufferSize = 0; + left = present - toCopy; + readPos += toCopy; + } + + while (left >= SocketBlockSize) + { + uint blockSize = SocketBlockSize; + if (!sendData(writeInfo.socket, &blockSize, sizeof(blockSize))) + return RS_IO_ERROR; + if (!sendData(writeInfo.socket, fb->buf + readPos, blockSize)) + return RS_IO_ERROR; + left -= SocketBlockSize; + readPos += SocketBlockSize; + } + + memcpy(writeInfo.buffer + writeInfo.bufferSize, fb->buf + readPos, left); + writeInfo.bufferSize += left; + + writeInfo.outStats.sendTimeMs += getTimeMs() - startSendMs; + writeInfo.outStats.sendSize += readPos; // Not fully accurate but good enough + } + + buf->next_out = fb->buf; + buf->avail_out = fb->buf_len; + + job->stats.out_bytes += present; + } + + return RS_DONE; +} + +rs_result rs_file_copy_cb2(void *arg, rs_long_t pos, size_t *len, void **buf) +{ + ReadInfo& readInfo = *(ReadInfo*)arg; + assert(readInfo.file != INVALID_HANDLE_VALUE); + + u64 startReadMs = getTimeMs(); + + if (!setFilePosition(readInfo.ident, readInfo.file, pos)) + return RS_IO_ERROR; + + readInfo.readPosition = pos; + + DWORD bytesRead; + if (!ReadFile(readInfo.file, *buf, *len, &bytesRead, NULL)) + { + logErrorf(L"Fail reading file %s: %s", readInfo.ident, getLastErrorText().c_str()); + return RS_IO_ERROR; + } + readInfo.outStats.readTimeMs += getTimeMs() - startReadMs; + readInfo.outStats.readSize += bytesRead; + + readInfo.readPosition += bytesRead; + + *len = bytesRead; + + return RS_DONE; +} + +rs_result rs_whole_run2(rs_job_t *job, FILE *in_file, FILE *out_file, + int inbuflen, int outbuflen) +{ + rs_buffers_t buf; + rs_result result; + rs_filebuf_t *in_fb = NULL, *out_fb = NULL; + + /* Override buffer sizes if rs_inbuflen or rs_outbuflen are set. */ + inbuflen = rs_inbuflen ? rs_inbuflen : inbuflen; + outbuflen = rs_outbuflen ? rs_outbuflen : outbuflen; + if (in_file) + in_fb = rs_filebuf_new(in_file, inbuflen); + if (out_file) + out_fb = rs_filebuf_new(out_file, outbuflen); + result = + rs_job_drive(job, &buf, in_fb ? rs_infilebuf_fill2 : NULL, in_fb, + out_fb ? rs_outfilebuf_drain2 : NULL, out_fb); + if (in_fb) + rs_filebuf_free(in_fb); + if (out_fb) + rs_filebuf_free(out_fb); + return result; +} + +rs_result rs_sig_file2(FILE *old_file, FILE *sig_file, size_t new_block_len, + size_t strong_len, rs_magic_number sig_magic, + rs_stats_t *stats) +{ + rs_job_t *job; + rs_result r; + + job = rs_sig_begin(new_block_len, strong_len, sig_magic); + /* Size inbuf for 4 blocks, outbuf for header + 4 blocksums. */ + r = rs_whole_run2(job, old_file, sig_file, 4 * new_block_len, + 12 + 4 * (4 + strong_len)); + if (stats) + memcpy(stats, &job->stats, sizeof *stats); + rs_job_free(job); + + return r; +} + +rs_result rs_delta_file2(rs_signature_t *sig, FILE *new_file, FILE *delta_file, + rs_stats_t *stats) +{ + rs_job_t *job; + rs_result r; + + job = rs_delta_begin(sig); + /* Size inbuf for 1 block, outbuf for literal cmd + 4 blocks. */ + r = rs_whole_run2(job, new_file, delta_file, sig->block_len, + 10 + 4 * sig->block_len); + if (stats) + memcpy(stats, &job->stats, sizeof *stats); + rs_job_free(job); + return r; +} + +rs_result rs_patch_file2(FILE *basis_file, FILE *delta_file, FILE *new_file, + rs_stats_t *stats) +{ + rs_job_t *job; + rs_result r; + + job = rs_patch_begin(rs_file_copy_cb2, basis_file); + /* Default size inbuf and outbuf 64K. */ + r = rs_whole_run2(job, delta_file, new_file, 64 * 1024, 64 * 1024); + if (stats) + memcpy(stats, &job->stats, sizeof *stats); + rs_job_free(job); + return r; +} + +bool serverHandleRsync(SOCKET socket, const wchar_t* baseFileName, const wchar_t* newFileName, FILETIME lastWriteTime, RsyncStats& outStats) +{ + //rs_inbuflen = 512*1024; + //rs_outbuflen = 512*1024; + + u64 startMs = getTimeMs(); + + HANDLE baseFile; + if (!openFileRead(baseFileName, baseFile, true)) + return false; + ScopeGuard fileGuard([&] { CloseHandle(baseFile); }); + + // Send signature data to client + { + + WriteInfo writeInfo { L"RsyncSignatureMem", outStats }; + writeInfo.socket = socket; + char buffer[SocketBlockSize]; + writeInfo.buffer = buffer; + + size_t block_len = DefaultRSyncBlockLen; + size_t strong_len = 0; + + ReadInfo readInfo { baseFileName, outStats }; + readInfo.file = baseFile; + FILE* basis_file = (FILE*)&readInfo; + FILE* sig_file = (FILE*)&writeInfo; + + rs_result result = rs_sig_file2(basis_file, sig_file, block_len, strong_len, RS_BLAKE2_SIG_MAGIC, nullptr); + if (result != RS_DONE) + return false; + + if (!writeInfo.flush()) + return false; + } + + // Receive delta data from client and combine with base file to create new file + { + if (!setFilePosition(baseFileName, baseFile, 0)) + return false; + + HANDLE newFile; + if (!openFileWrite(newFileName, newFile, true)) + return false; + ScopeGuard newFileGuard([&] { CloseHandle(newFile); }); + + ReadInfo basisReadInfo { baseFileName, outStats }; + basisReadInfo.file = baseFile; + ReadInfo deltaReadInfo { L"RSyncDeltaPatch", outStats }; + deltaReadInfo.socket = socket; + + WriteInfo writeInfo { newFileName, outStats }; + writeInfo.file = newFile; + + rs_result result = rs_patch_file2((FILE*)&basisReadInfo, (FILE*)&deltaReadInfo, (FILE*)&writeInfo, nullptr); + if (result != RS_DONE) + return false; + + if (!setFileLastWriteTime(newFileName, newFile, lastWriteTime)) + return false; + } + + outStats.rsyncTimeMs = getTimeMs() - startMs - outStats.readTimeMs - outStats.receiveTimeMs - outStats.sendTimeMs - outStats.writeTimeMs; + + return true; +} + +bool clientHandleRsync(SOCKET socket, const wchar_t* fileName, RsyncStats& outStats) +{ + u64 startMs = getTimeMs(); + + rs_signature_t* sumset = nullptr; + ScopeGuard cleanSumset([&]() { rs_free_sumset(sumset); }); + + { + ReadInfo readInfo { L"RsyncSignature", outStats }; + readInfo.socket = socket; + + rs_job_t* job = rs_loadsig_begin(&sumset); + + // We read 0 bytes which will read first blocks size (which is very likely that it is the entire file) + DWORD bytesRead = 0; + int eof; + if (!readInfo.readFromSocket(bytesRead, eof, nullptr, 0)) + return false; + + job->sig_fsize = readInfo.bufferSize < SocketBlockSize ? readInfo.bufferSize : 0; // If bufferSize is more than SocketBlockSize we don't know how big the signature data is + + // Size inbuf for 1024x 16 byte blocksums. + rs_result result = rs_whole_run2(job, (FILE*)&readInfo, NULL, 1024 * 16, 0); + rs_job_free(job); + + if (result != RS_DONE) + { + logErrorf(L"Rsync failed while receiving signature data"); + return false; + } + } + + { + rs_result result = rs_build_hash_table(sumset); + if (result != RS_DONE) + { + logErrorf(L"Rsync failed while building hashtable"); + return false; + } + } + + { + HANDLE newFileHandle; + if (!openFileRead(fileName, newFileHandle, true)) + return false; + ScopeGuard fileGuard([&] { CloseHandle(newFileHandle); }); + ReadInfo readInfo = { fileName, outStats }; + readInfo.file = newFileHandle; + WriteInfo writeInfo = { L"RSyncDeltaPatchMemory", outStats }; + writeInfo.socket = socket; + char buffer[SocketBlockSize]; + writeInfo.buffer = buffer; + + u64 startMs = getTimeMs(); + rs_result result = rs_delta_file2(sumset, (FILE*)&readInfo, (FILE*)&writeInfo, nullptr); + logInfof(L"BUILD DELTA: %s\r\n", toHourMinSec(getTimeMs() - startMs).c_str()); + + if (!writeInfo.flush()) + return false; + } + + outStats.rsyncTimeMs = getTimeMs() - startMs - outStats.readTimeMs - outStats.receiveTimeMs - outStats.sendTimeMs - outStats.writeTimeMs; + + return true; +} + +/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +}