/* -*- mode: c; indent-tabs-mode: t; -*-
* vim:noexpandtab:
*
* Editing with tabs allows different users to pick their own indentation
* appearance without changing the file.
*/
/*
* Copyright (c) 2009, Los Alamos National Security, LLC All rights reserved.
* Copyright 2009. Los Alamos National Security, LLC. This software was produced
* under U.S. Government contract DE-AC52-06NA25396 for Los Alamos National
* Laboratory (LANL), which is operated by Los Alamos National Security, LLC for
* the U.S. Department of Energy. The U.S. Government has rights to use,
* reproduce, and distribute this software. NEITHER THE GOVERNMENT NOR LOS
* ALAMOS NATIONAL SECURITY, LLC MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR
* ASSUMES ANY LIABILITY FOR THE USE OF THIS SOFTWARE. If software is
* modified to produce derivative works, such modified software should be
* clearly marked, so as not to confuse it with the version available from
* LANL.
*
* Additionally, redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following conditions are
* met:
*
* • Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* • Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* • Neither the name of Los Alamos National Security, LLC, Los Alamos National
* Laboratory, LANL, the U.S. Government, nor the names of its contributors may be
* used to endorse or promote products derived from this software without specific
* prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY LOS ALAMOS NATIONAL SECURITY, LLC AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
* THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL LOS ALAMOS NATIONAL SECURITY, LLC OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
* OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
* IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
* OF SUCH DAMAGE.
*/
/******************************************************************************
*
* Implementation of abstract IOR interface, for the Amazon S3 API.
* EMC/ViPR supports some useful extensions to S3, which we also implement
* here. There are 3 different mixes:
*
* (1) "Pure S3" uses S3 "Multi-Part Upload" to do N:1 writes. N:N writes
* fail, in the case where IOR "transfer-size" differs from
* "block-size', because this implies an "append", and append is not
* supported in S3. [TBD: The spec also says multi-part upload can't
* have any individual part greater than 5MB, or more then 10k total
* parts. Failing these conditions may produce obscure errors. Should
* we enforce? ]
*
* --> Select this option with the '-a S3-4c' command-line arg to IOR
*
*
* (2) "EMC S3 Extensions" uses the EMC byte-range support for N:1
* writes, eliminating Multi-Part Upload. EMC expects this will
* perform better than MPU, and it avoids some problems that are
* imposed by the S3 MPU spec. [See comments at EMC_Xfer().]
*
* --> Select this option with the '-a S3_EMC' command-line arg to IOR
*
*
* (3) "S3 plus EMC extensions" keeps the pure-S3 path (Multi-Part Upload
* for N:1 writes) but enables the EMC extensions, so N:N writes succeed
* because "append" is supported. [See the ior_aiori_t declarations below.]
*
* --> Select this option with the '-a S3_plus' command-line arg to IOR
*
*
* NOTE: Putting EMC's S3-extensions in the same file with the S3 API
* allows us to share some code that would otherwise be duplicated
* (e.g. s3_connect(), etc). This should also help us avoid losing
* bug fixes that are discovered in one interface or the other. In
* some cases, S3 is incapable of supporting all the needs of IOR.
* (For example, see notes about "append", above S3_Xfer().)
*
******************************************************************************/
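/*
 * Illustrative invocation (a sketch only; exact flags depend on your IOR
 * build, and the host/bucket values below are placeholders): the backend
 * is selected by the name registered below ("S3-4c"), and the module
 * options defined in S3_options() are passed in the usual
 * --<module>.<option> form:
 *
 *     ior -a S3-4c -t 1m -b 16m \
 *         --S3-4c.host=somehost:9020 \
 *         --S3-4c.bucket-name=ior-test \
 *         --S3-4c.user=$USER
 */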
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include <stdio.h>
#include <stdlib.h>
#include <string.h> /* strnstr() */
#include <errno.h>
#include <assert.h>
#include <curl/curl.h>
#include <libxml/parser.h> // from libxml2
#include <libxml/tree.h>
#include "aws4c.h" // extended vers of "aws4c" lib for S3 via libcurl
#include "aws4c_extra.h" // utilities, e.g. for parsing XML in responses
#include "ior.h"
#include "aiori.h"
#include "aiori-debug.h"
extern int rank;
extern MPI_Comm testComm;
#define BUFF_SIZE 1024
const int ETAG_SIZE = 32;
CURLcode rc;
/* TODO: The following stuff goes into options! */
/* REST/S3 variables */
// CURL* curl; /* for libcurl "easy" fns (now managed by aws4c) */
# define IOR_CURL_INIT 0x01 /* curl top-level inits were performed once? */
# define IOR_CURL_NOCONTINUE 0x02
# define IOR_CURL_S3_EMC_EXT 0x04 /* allow EMC extensions to S3? */
#define MAX_UPLOAD_ID_SIZE 256 /* TODO don't know the actual value */
#ifdef USE_S3_4C_AIORI
# include <curl/curl.h>
# include "aws4c.h"
#else
typedef void CURL; /* unused, but needs a type */
typedef void IOBuf; /* unused, but needs a type */
#endif
typedef struct {
/* Any objects we create or delete will be under this bucket */
char* bucket_name;
char* user;
char* host;
/* Runtime data. This isn't yet safe for concurrent access to multiple files; open only one file at a time. */
int curl_flags;
IOBuf* io_buf; /* aws4c places parsed header values here */
IOBuf* etags; /* accumulate ETags for N:1 parts */
size_t part_number;
char UploadId[MAX_UPLOAD_ID_SIZE]; /* key for multi-part-uploads */
int written; /* did we write to the file */
} s3_options_t;
///////////////////////////////////////////////
static aiori_xfer_hint_t * hints = NULL;
static void S3_xfer_hints(aiori_xfer_hint_t * params){
hints = params;
}
/**************************** P R O T O T Y P E S *****************************/
static aiori_fd_t* S3_Create(char *path, int iorflags, aiori_mod_opt_t * options);
static aiori_fd_t* S3_Open(char *path, int flags, aiori_mod_opt_t * options);
static IOR_offset_t S3_Xfer(int access, aiori_fd_t * afd, IOR_size_t * buffer, IOR_offset_t length, IOR_offset_t offset, aiori_mod_opt_t * options);
static void S3_Close(aiori_fd_t * afd, aiori_mod_opt_t * options);
static aiori_fd_t* EMC_Create(char *path, int iorflags, aiori_mod_opt_t * options);
static aiori_fd_t* EMC_Open(char *path, int flags, aiori_mod_opt_t * options);
static IOR_offset_t EMC_Xfer(int access, aiori_fd_t * afd, IOR_size_t * buffer, IOR_offset_t length, IOR_offset_t offset, aiori_mod_opt_t * options);
static void EMC_Close(aiori_fd_t * afd, aiori_mod_opt_t * options);
static void S3_Delete(char *path, aiori_mod_opt_t * options);
static void S3_Fsync(aiori_fd_t *fd, aiori_mod_opt_t * options);
static IOR_offset_t S3_GetFileSize(aiori_mod_opt_t * options, char *testFileName);
static void S3_init(aiori_mod_opt_t * options);
static void S3_finalize(aiori_mod_opt_t * options);
static int S3_check_params(aiori_mod_opt_t * options);
static option_help * S3_options(aiori_mod_opt_t ** init_backend_options, aiori_mod_opt_t * init_values);
/************************** D E C L A R A T I O N S ***************************/
// "Pure S3"
// N:1 writes use multi-part upload
// N:N fails if "transfer-size" != "block-size" (because that requires "append")
ior_aiori_t s3_4c_aiori = {
.name = "S3-4c",
.name_legacy = NULL,
.create = S3_Create,
.open = S3_Open,
.xfer = S3_Xfer,
.xfer_hints = S3_xfer_hints,
.close = S3_Close,
.remove = S3_Delete,
.get_version = aiori_get_version,
.fsync = S3_Fsync,
.get_file_size = S3_GetFileSize,
.initialize = S3_init,
.finalize = S3_finalize,
.check_params = S3_check_params,
.get_options = S3_options,
.enable_mdtest = true
};
// "S3", plus EMC-extensions enabled
// N:1 writes use multi-part upload
// N:N succeeds (because EMC-extensions support "append")
ior_aiori_t s3_plus_aiori = {
.name = "S3_plus",
.create = S3_Create,
.open = S3_Open,
.xfer = S3_Xfer,
.close = S3_Close,
.remove = S3_Delete,
.get_version = aiori_get_version,
.fsync = S3_Fsync,
.get_file_size = S3_GetFileSize,
.initialize = S3_init,
.finalize = S3_finalize
};
// Use EMC-extensions for N:1 write, as well
// N:1 writes use EMC byte-range
// N:N succeeds because EMC-extensions support "append"
ior_aiori_t s3_emc_aiori = {
.name = "S3_EMC",
.create = EMC_Create,
.open = EMC_Open,
.xfer = EMC_Xfer,
.close = EMC_Close,
.remove = S3_Delete,
.get_version = aiori_get_version,
.fsync = S3_Fsync,
.get_file_size = S3_GetFileSize,
.initialize = S3_init,
.finalize = S3_finalize
};
static option_help * S3_options(aiori_mod_opt_t ** init_backend_options, aiori_mod_opt_t * init_values){
s3_options_t * o = malloc(sizeof(s3_options_t));
if (init_values != NULL){
memcpy(o, init_values, sizeof(s3_options_t));
}else{
memset(o, 0, sizeof(s3_options_t));
}
*init_backend_options = (aiori_mod_opt_t*) o;
o->bucket_name = "ior";
option_help h [] = {
{0, "S3-4c.user", "The username (in ~/.awsAuth).", OPTION_OPTIONAL_ARGUMENT, 's', & o->user},
{0, "S3-4C.host", "The host optionally followed by:port.", OPTION_OPTIONAL_ARGUMENT, 's', & o->host},
{0, "S3-4c.bucket-name", "The name of the bucket.", OPTION_OPTIONAL_ARGUMENT, 's', & o->bucket_name},
LAST_OPTION
};
option_help * help = malloc(sizeof(h));
memcpy(help, h, sizeof(h));
return help;
}
static void S3_init(aiori_mod_opt_t * options){
/* This is supposed to be done before *any* threads are created.
* Could MPI_Init() create threads (or call multi-threaded
* libraries)? We'll assume so. */
AWS4C_CHECK( aws_init() );
}
static void S3_finalize(aiori_mod_opt_t * options){
/* done once per program, after exiting all threads.
* NOTE: This fn doesn't return a value that can be checked for success. */
aws_cleanup();
}
static int S3_check_params(aiori_mod_opt_t * test){
if(! hints) return 0;
/* N:1 and N:N */
IOR_offset_t NtoN = hints->filePerProc;
IOR_offset_t Nto1 = ! NtoN;
IOR_offset_t s = hints->segmentCount;
IOR_offset_t t = hints->transferSize;
IOR_offset_t b = hints->blockSize;
if (Nto1 && (s != 1) && (b != t)) {
ERR("N:1 (strided) requires xfer-size == block-size");
return 1;
}
return 0;
}
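/*
 * Illustrative examples of the check above (a sketch, using the standard
 * IOR flags -s/-b/-t/-F for segmentCount/blockSize/transferSize/filePerProc):
 *
 *     ior -a S3-4c -s 2 -b 4m -t 1m      -> rejected here (N:1, s != 1, t != b)
 *     ior -a S3-4c -s 2 -b 4m -t 4m      -> accepted      (t == b)
 *     ior -a S3-4c -F -s 2 -b 4m -t 1m   -> accepted      (N:N, filePerProc)
 */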
/* modelled on similar macros in iordef.h */
#define CURL_ERR(MSG, CURL_ERRNO, PARAM) \
do { \
fprintf(stdout, "ior ERROR: %s: %s (curl-errno=%d) (%s:%d)\n", \
MSG, curl_easy_strerror(CURL_ERRNO), CURL_ERRNO, \
__FILE__, __LINE__); \
fflush(stdout); \
MPI_Abort((PARAM)->testComm, -1); \
} while (0)
#define CURL_WARN(MSG, CURL_ERRNO) \
do { \
fprintf(stdout, "ior WARNING: %s: %s (curl-errno=%d) (%s:%d)\n", \
MSG, curl_easy_strerror(CURL_ERRNO), CURL_ERRNO, \
__FILE__, __LINE__); \
fflush(stdout); \
} while (0)
/***************************** F U N C T I O N S ******************************/
/* ---------------------------------------------------------------------------
* "Connect" to an S3 object-file-system. We're really just initializing
* libcurl. We need this done before any interactions. It is easy for
* ior_aiori.open/create to assure that we connect, if we haven't already
* done so. However, there's not a simple way to assure that we
* "disconnect" at the end. For now, we'll make a special call at the end
* of ior.c
*
* NOTE: It's okay to call this thing whenever you need to be sure the curl
* handle is initialized.
*
* NOTE: Our custom version of aws4c can be configured so that connections
* are reused, instead of opened and closed on every operation. We
* do configure it that way, but you still need to call these
// connect/disconnect functions, in order to ensure that aws4c has
* been configured.
* ---------------------------------------------------------------------------
*/
static void s3_connect( s3_options_t* param ) {
//if (param->verbose >= VERBOSE_2) {
// printf("-> s3_connect\n"); /* DEBUGGING */
//}
if ( param->curl_flags & IOR_CURL_INIT ) {
//if (param->verbose >= VERBOSE_2) {
// printf("<- s3_connect [nothing to do]\n"); /* DEBUGGING */
//}
return;
}
// --- Done once-only (per rank). Perform all first-time inits.
//
// The aws library requires a config file, as illustrated below. We
// assume that the user running the test has an entry in this file,
// using their login moniker (i.e. `echo $USER`) as the key, as
// suggested in the example:
//
// <user>:<s3_login_id>:<s3_private_key>
//
// This file must not be readable by anyone other than the user.
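// An illustrative way to satisfy that requirement (a sketch; adapt to
// your site's conventions):
//
//     chmod 600 ~/.awsAuth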
//
// NOTE: These inits could be done in init_IORParam_t(), in ior.c, but
// would require conditional compilation, there.
aws_set_debug(0); // param->verbose >= 4
aws_read_config(param->user); // requires ~/.awsAuth
aws_reuse_connections(1);
// initialize IOBufs. These are basically dynamically-extensible
// linked-lists. "growth size" controls the increment of new memory
// allocated, whenever storage is used up.
param->io_buf = aws_iobuf_new();
aws_iobuf_growth_size(param->io_buf, 1024*1024*1);
param->etags = aws_iobuf_new();
aws_iobuf_growth_size(param->etags, 1024*1024*8);
// WARNING: if you have http_proxy set in your environment, you may need
// to override it here. TBD: add a command-line variable to
// allow you to define a proxy.
//
// our hosts are currently 10.140.0.15 - 10.140.0.18
// TBD: Try DNS-round-robin server at vi-lb.ccstar.lanl.gov
// TBD: try HAProxy round-robin at 10.143.0.1
#if 1
// snprintf(buff, BUFF_SIZE, "10.140.0.%d:9020", 15 + (rank % 4));
// s3_set_proxy(buff);
//
// snprintf(buff, BUFF_SIZE, "10.140.0.%d", 15 + (rank % 4));
// s3_set_host(buff);
//snprintf(options->buff, BUFF_SIZE, "10.140.0.%d:9020", 15 + (rank % 4));
//s3_set_host(options->buff);
#else
/*
* If you just want to go to one of the ECS nodes, put that IP
* address in here directly with port 9020.
*
*/
// s3_set_host("10.140.0.15:9020");
/*
* If you want to go to haproxy.ccstar.lanl.gov, this is its IP
* address.
*
*/
// s3_set_proxy("10.143.0.1:80");
// s3_set_host( "10.143.0.1:80");
#endif
s3_set_host(param->host);
// make sure test-bucket exists
s3_set_bucket((char*) param->bucket_name);
if (rank == 0) {
AWS4C_CHECK( s3_head(param->io_buf, "") );
if ( param->io_buf->code == 404 ) { // "404 Not Found"
printf(" bucket '%s' doesn't exist\n", param->bucket_name);
AWS4C_CHECK( s3_put(param->io_buf, "") ); /* creates URL as bucket + obj */
AWS4C_CHECK_OK( param->io_buf ); // assure "200 OK"
printf("created bucket '%s'\n", param->bucket_name);
}
else { // assure "200 OK"
AWS4C_CHECK_OK( param->io_buf );
}
}
MPI_CHECK(MPI_Barrier(testComm), "barrier error");
// Maybe allow EMC extensions to S3
s3_enable_EMC_extensions(param->curl_flags & IOR_CURL_S3_EMC_EXT);
// don't perform these inits more than once
param->curl_flags |= IOR_CURL_INIT;
//if (param->verbose >= VERBOSE_2) {
// printf("<- s3_connect [success]\n");
//}
}
static
void
s3_disconnect( s3_options_t* param ) {
//if (param->verbose >= VERBOSE_2) {
// printf("-> s3_disconnect\n");
//}
// nothing to do here, if using new aws4c ...
//if (param->verbose >= VERBOSE_2) {
// printf("<- s3_disconnect\n");
//}
}
// After finalizing an S3 multi-part-upload, you must reset some things
// before you can use multi-part-upload again. This will also avoid (one
// particular set of) memory-leaks.
void s3_MPU_reset(s3_options_t* param) {
aws_iobuf_reset(param->io_buf);
aws_iobuf_reset(param->etags);
param->part_number = 0;
}
/* ---------------------------------------------------------------------------
* direct support for the IOR S3 interface
* ---------------------------------------------------------------------------
*/
/*
* One doesn't "open" an object, in REST semantics. All we really care
* about is whether caller expects the object to have zero-size, when we
* return. If so, we conceptually delete it, then recreate it empty.
*
* ISSUE: If the object is going to receive "appends" (supported in EMC S3
* extensions), the object has to exist before the first append
* operation. On the other hand, there appears to be a bug in the
* EMC implementation, such that if an object ever receives appends,
* and then is deleted, and then recreated, the recreated object will
* always return "500 Server Error" on GET (whether it has been
* appended or not).
*
* Therefore, a safer thing to do here is write zero-length contents,
* instead of deleting.
*
* NOTE: There's also no file-descriptor to return, in REST semantics. On
* the other hand, we keep needing the file *NAME*. Therefore, we
* will return the file-name, and let IOR pass it around to our
* functions, in place of what IOR understands to be a
* file-descriptor.
*
*/
static aiori_fd_t * S3_Create_Or_Open_internal(char* testFileName, int openFlags, s3_options_t* param, int multi_part_upload_p ) {
unsigned char createFile = openFlags & IOR_CREAT;
//if (param->verbose >= VERBOSE_2) {
// printf("-> S3_Create_Or_Open('%s', ,%d, %d)\n",
// testFileName, createFile, multi_part_upload_p);
//}
/* initialize curl, if needed */
s3_connect( param );
/* Check for unsupported flags */
//if ( param->openFlags & IOR_EXCL ) {
// fprintf( stdout, "Opening in Exclusive mode is not implemented in S3\n" );
//}
//if ( param->useO_DIRECT == TRUE ) {
// fprintf( stdout, "Direct I/O mode is not implemented in S3\n" );
//}
// easier to think
int n_to_n = hints->filePerProc;
int n_to_1 = ! n_to_n;
/* check whether object needs reset to zero-length */
int needs_reset = 0;
if (! multi_part_upload_p)
needs_reset = 1; /* so "append" can work */
else if ( openFlags & IOR_TRUNC )
needs_reset = 1; /* so "append" can work */
else if (createFile) {
// AWS4C_CHECK( s3_head(param->io_buf, testFileName) );
// if ( ! AWS4C_OK(param->io_buf) )
needs_reset = 1;
}
char buff[BUFF_SIZE]; /* buffer is used to generate URLs, err_msgs, etc */
param->written = 0;
if ( openFlags & IOR_WRONLY || openFlags & IOR_RDWR ) {
param->written = 1;
/* initializations for N:1 or N:N writes using multi-part upload */
if (multi_part_upload_p) {
// For N:N, all ranks do their own MPU open/close. For N:1, only
// rank0 does that. Either way, the response from the server
// includes an "uploadId", which must be used to upload parts to
// the same object.
if ( n_to_n || (rank == 0) ) {
// rank0 handles truncate
if ( needs_reset) {
aws_iobuf_reset(param->io_buf);
AWS4C_CHECK( s3_put(param->io_buf, testFileName) ); /* 0-length write */
AWS4C_CHECK_OK( param->io_buf );
}
// POST request with URL+"?uploads" initiates multi-part upload
snprintf(buff, BUFF_SIZE, "%s?uploads", testFileName);
IOBuf* response = aws_iobuf_new();
AWS4C_CHECK( s3_post2(param->io_buf, buff, NULL, response) );
AWS4C_CHECK_OK( param->io_buf );
// parse XML returned from server, into a tree structure
aws_iobuf_realloc(response);
xmlDocPtr doc = xmlReadMemory(response->first->buf,
response->first->len,
NULL, NULL, 0);
if (doc == NULL)
ERR("Rank0 Failed to find POST response\n");
// navigate parsed XML-tree to find UploadId
xmlNode* root_element = xmlDocGetRootElement(doc);
const char* upload_id = find_element_named(root_element, (char*)"UploadId");
if (! upload_id)
ERR("couldn't find 'UploadId' in returned XML\n");
//if (param->verbose >= VERBOSE_3)
// printf("got UploadId = '%s'\n", upload_id);
const size_t upload_id_len = strlen(upload_id);
if (upload_id_len >= MAX_UPLOAD_ID_SIZE) { /* need room for the terminating NUL */
snprintf(buff, BUFF_SIZE, "UploadId length %zu does not fit in UploadId buffer (%d bytes)", upload_id_len, MAX_UPLOAD_ID_SIZE);
ERR(buff);
}
// save the UploadId we found
memcpy(param->UploadId, upload_id, upload_id_len);
param->UploadId[upload_id_len] = 0;
// free storage for parsed XML tree
xmlFreeDoc(doc);
aws_iobuf_free(response);
// For N:1, share UploadId across all ranks
if (n_to_1)
MPI_Bcast(param->UploadId, MAX_UPLOAD_ID_SIZE, MPI_BYTE, 0, testComm);
}
else
// N:1, and we're not rank0. recv UploadID from Rank 0
MPI_Bcast(param->UploadId, MAX_UPLOAD_ID_SIZE, MPI_BYTE, 0, testComm);
}
/* initializations for N:N or N:1 writes using EMC byte-range extensions */
else {
/* maybe reset to zero-length, so "append" can work */
if (needs_reset) {
if (verbose >= VERBOSE_3) {
fprintf( stdout, "rank %d resetting\n",
rank);
}
aws_iobuf_reset(param->io_buf);
AWS4C_CHECK( s3_put(param->io_buf, testFileName) );
AWS4C_CHECK_OK( param->io_buf );
}
}
}
//if (param->verbose >= VERBOSE_2) {
// printf("<- S3_Create_Or_Open\n");
//}
return ((aiori_fd_t *) testFileName );
}
static aiori_fd_t * S3_Create( char *testFileName, int iorflags, aiori_mod_opt_t * param ) {
//if (param->verbose >= VERBOSE_2) {
// printf("-> S3_Create\n");
//}
//if (param->verbose >= VERBOSE_2) {
// printf("<- S3_Create\n");
//}
return S3_Create_Or_Open_internal( testFileName, iorflags, (s3_options_t*) param, TRUE );
}
static aiori_fd_t * EMC_Create( char *testFileName, int iorflags, aiori_mod_opt_t * param ) {
//if (param->verbose >= VERBOSE_2) {
// printf("-> EMC_Create\n");
//}
//if (param->verbose >= VERBOSE_2) {
// printf("<- EMC_Create\n");
//}
return S3_Create_Or_Open_internal( testFileName, iorflags, (s3_options_t*) param, FALSE );
}
static aiori_fd_t * S3_Open( char *testFileName, int flags, aiori_mod_opt_t * param ) {
//if (param->verbose >= VERBOSE_2) {
// printf("-> S3_Open\n");
//}
return S3_Create_Or_Open_internal( testFileName, flags, (s3_options_t*) param, TRUE );
}
static aiori_fd_t * EMC_Open( char *testFileName, int flags, aiori_mod_opt_t * param ) {
//if (param->verbose >= VERBOSE_2) {
// printf("-> S3_Open\n");
//}
return S3_Create_Or_Open_internal( testFileName, flags, (s3_options_t*) param, FALSE );
}
/*
* transfer (more) data to an object. <file> is just the obj name.
*
* For N:1, param->offset is understood as offset for a given client to
* write into the "file". This translates to a byte-range in the HTTP
* request. Each write in the N:1 case is treated as a complete "part",
* so there is no such thing as a partial write.
*
* For N:N, when IOR "transfer-size" differs from "block-size", IOR treats
* Xfer as a partial write (i.e. there are multiple calls to XFER, to write
* any one of the "N" objects, as a series of "append" operations). This
* is not supported in S3/REST. Therefore, we depend on an EMC extension,
* in this case. This EMC extension allows appends using a byte-range
* header spec of "Range: bytes=-1-". aws4c now provides
* s3_enable_EMC_extensions(), to allow this behavior. If EMC-extensions
* are not enabled, the aws4c library will generate a run-time error, in
* this case.
*
* Each write-request returns an ETag which is a hash of the data. (The
* ETag could also be computed directly, if we wanted.) We must save the
* etags for later use by S3_close().
*
* WARNING: "Pure" S3 doesn't allow byte-ranges for writes to an object.
* Thus, you also can not append to an object. In the context of IOR,
* this causes objects to have only the size of the most-recent write.
* Thus, If the IOR "transfer-size" is different from the IOR
* "block-size", the files will be smaller than the amount of data
* that was written to them.
*
* EMC does support "append" to an object. In order to allow this,
* you must enable the EMC-extensions in the aws4c library, by calling
* s3_enable_EMC_extensions() with a non-zero argument.
*
* NOTE: I don't think REST allows us to read/write an amount other than
* the size we request. Maybe our callback-handlers (above) could
* tell us? For now, this is assuming we only have to send one
* request, to transfer any amount of data. (But see above, re EMC
* support for "append".)
*/
/* In the EMC case, instead of Multi-Part Upload we can use HTTP
* "byte-range" headers to write parts of a single object. This appears to
* have several advantages over the S3 MPU spec:
*
* (a) no need for a special "open" operation, to capture an "UploadID".
* Instead we simply write byte-ranges, and the server-side resolves
* any races, producing a single winner. In the IOR case, there should
* be no races, anyhow.
*
* (b) individual write operations don't have to refer to an ID, or to
* parse and save ETags returned from every write.
*
* (c) no need for a special "close" operation, in which all the saved
* ETags are gathered at a single rank, placed into XML, and shipped to
* the server, to finalize the MPU. That special close appears to
* impose two scaling problems: (1) requires all ETags to be shipped at
* the BW available to a single process, and (2) requires either that they
* all fit into memory of a single process, or be written to disk
* (imposes additional BW constraints), or make a more-complex
* interaction with a threaded curl writefunction, to present the
* appearance of a single thread to curl, whilst allowing streaming
* reception of non-local ETags.
*
* (d) no constraints on the number or size of individual parts. (These
* exist in the S3 spec; EMC's impl of the S3 multi-part upload is
* also free of these constraints.)
*
* Instead, parallel processes can write any number and/or size of updates,
* using a "byte-range" header. After each write returns, that part of the
* global object is visible to any reader. Places that are not updated
* read as zeros.
*/
static IOR_offset_t S3_Xfer_internal(int access,
aiori_fd_t* file,
IOR_size_t* buffer,
IOR_offset_t length,
IOR_offset_t offset,
s3_options_t* param,
int multi_part_upload_p ) {
//if (param->verbose >= VERBOSE_2) {
// printf("-> S3_Xfer(acc:%d, target:%s, buf:0x%llx, len:%llu, 0x%llx)\n",
// access, (char*)file, buffer, length, param);
//}
char* fname = (char*)file; /* see NOTE above S3_Create_Or_Open() */
size_t remaining = (size_t)length;
char* data_ptr = (char *)buffer;
// easier to think
int n_to_n = hints->filePerProc;
int n_to_1 = (! n_to_n);
int segmented = (hints->segmentCount == 1);
if (access == WRITE) { /* WRITE */
//if (verbose >= VERBOSE_3) {
// fprintf( stdout, "rank %d writing length=%lld to offset %lld\n",
// rank,
// remaining,
// param->offset + length - remaining);
//}
if (multi_part_upload_p) {
// For N:1, part-numbers must have a global ordering for the
// components of the final object. param->part_number is
// incremented by 1 per write, on each rank. This lets us use it
// to compute a global part-numbering.
//
// In the N:N case, we only need to increment part-numbers within
// each rank.
//
// In the N:1 case, the global order of part-numbers we're writing
// depends on whether we're writing strided or segmented, in
// other words, how <offset> and <remaining> are actually
// positioning the parts being written. [See discussion at
// S3_Close_internal().]
//
// NOTE: 's3curl.pl --debug' shows StringToSign having partNumber
// first, even if I put uploadId first in the URL. Maybe
// that's what the server will do. GetStringToSign() in
// aws4c is not clever about this, so we spoon-feed args in
// the proper order.
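// Worked example (illustrative numbers only): with numTasks=4 and
// blockSize/transferSize = 4 parts per rank, rank 1's third write
// (param->part_number == 2) becomes global part 1*4 + 2 = 6 in the
// segmented layout, but 2*4 + 1 = 9 in the strided layout.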
size_t part_number;
if (n_to_1) {
if (segmented) { // segmented
size_t parts_per_rank = hints->blockSize / hints->transferSize;
part_number = (rank * parts_per_rank) + param->part_number;
}
else // strided
part_number = (param->part_number * hints->numTasks) + rank;
}
else
part_number = param->part_number;
++ param->part_number;
// if (verbose >= VERBOSE_3) {
// fprintf( stdout, "rank %d of %d writing (%s,%s) part_number %lld\n",
// rank,
// hints->numTasks,
// (n_to_1 ? "N:1" : "N:N"),
// (segmented ? "segmented" : "strided"),
// part_number);
// }
char buff[BUFF_SIZE]; /* buffer is used to generate URLs, err_msgs, etc */
snprintf(buff, BUFF_SIZE,
"%s?partNumber=%zd&uploadId=%s",
fname, part_number, param->UploadId);
// For performance, we append <data_ptr> directly into the linked list
// of data in param->io_buf. We are "appending" rather than
// "extending", so the added buffer is seen as written data, rather
// than empty storage.
//
// aws4c parses some header-fields automatically for us (into members
// of the IOBuf). After s3_put2(), we can just read the etag from
// param->io_buf->eTag. The server actually returns literal
// quote-marks, at both ends of the string.
aws_iobuf_reset(param->io_buf);
aws_iobuf_append_static(param->io_buf, data_ptr, remaining);
AWS4C_CHECK( s3_put(param->io_buf, buff) );
AWS4C_CHECK_OK( param->io_buf );
// if (verbose >= VERBOSE_3) {
// printf("rank %d: read ETag = '%s'\n", rank, param->io_buf->eTag);
// if (strlen(param->io_buf->eTag) != ETAG_SIZE+2) { /* quotes at both ends */
// fprintf(stderr, "Rank %d: ERROR: expected ETag to be %d hex digits\n",
// rank, ETAG_SIZE);
// exit(1);
// }
// }
//if (verbose >= VERBOSE_3) {
// fprintf( stdout, "rank %d of %d (%s,%s) offset %lld, part# %lld --> ETag %s\n",
// rank,
// hints->numTasks,
// (n_to_1 ? "N:1" : "N:N"),
// (segmented ? "segmented" : "strided"),
// offset,
// part_number,
// param->io_buf->eTag); // incl quote-marks at [0] and [len-1]
//}
if (strlen(param->io_buf->eTag) != ETAG_SIZE+2) { /* quotes at both ends */
fprintf(stderr, "Rank %d: ERROR: expected ETag to be %d hex digits\n",
rank, ETAG_SIZE);
exit(EXIT_FAILURE);
}
// save the eTag for later
//
// memcpy(etag, param->io_buf->eTag +1, strlen(param->io_buf->eTag) -2);
// etag[ETAG_SIZE] = 0;
aws_iobuf_append(param->etags,
param->io_buf->eTag +1,
strlen(param->io_buf->eTag) -2);
// DEBUGGING
//if (verbose >= VERBOSE_4) {
// printf("rank %d: part %d = ETag %s\n", rank, part_number, param->io_buf->eTag);
//}
// drop ptrs to <data_ptr>, in param->io_buf
aws_iobuf_reset(param->io_buf);
}
else { // use EMC's byte-range write-support, instead of MPU
// NOTE: You must call 's3_enable_EMC_extensions(1)' for
// byte-ranges to work for writes.
if (n_to_n)
s3_set_byte_range(-1,-1); // EMC header "Range: bytes=-1-" means "append"
else
s3_set_byte_range(offset, remaining);
// For performance, we append <data_ptr> directly into the linked list
// of data in param->io_buf. We are "appending" rather than
// "extending", so the added buffer is seen as written data, rather
// than empty storage.
aws_iobuf_reset(param->io_buf);
aws_iobuf_append_static(param->io_buf, data_ptr, remaining);
AWS4C_CHECK ( s3_put(param->io_buf, (char*) file) );
AWS4C_CHECK_OK( param->io_buf );
// drop ptrs to <data_ptr>, in param->io_buf
aws_iobuf_reset(param->io_buf);
}
if ( hints->fsyncPerWrite == TRUE ) {
WARN("S3 doesn't support 'fsync'" ); /* does it? */
}
}
else { /* READ or CHECK */
//if (verbose >= VERBOSE_3) {
// fprintf( stdout, "rank %d reading from offset %lld\n",
// rank,
// hints->offset + length - remaining );
//}
// read specific byte-range from the object
// [This is included in the "pure" S3 spec.]
s3_set_byte_range(offset, remaining);
// For performance, we append <data_ptr> directly into the linked
// list of data in param->io_buf. In this case (i.e. reading),
// we're "extending" rather than "appending". That means the
// buffer represents empty storage, which will be filled by the
// libcurl writefunction, invoked via aws4c.
aws_iobuf_reset(param->io_buf);
aws_iobuf_extend_static(param->io_buf, data_ptr, remaining);
AWS4C_CHECK( s3_get(param->io_buf, (char*) file) );
if (param->io_buf->code != 206) { /* '206 Partial Content' */
char buff[BUFF_SIZE]; /* buffer is used to generate URLs, err_msgs, etc */
snprintf(buff, BUFF_SIZE,
"Unexpected result (%d, '%s')",
param->io_buf->code, param->io_buf->result);
ERR(buff);
}
// drop refs to <data_ptr>, in param->io_buf
aws_iobuf_reset(param->io_buf);
}
//if (verbose >= VERBOSE_2) {
// printf("<- S3_Xfer\n");
//}
return ( length );
}
static IOR_offset_t S3_Xfer(int access,
aiori_fd_t* file,
IOR_size_t* buffer,
IOR_offset_t length,
IOR_offset_t offset,
aiori_mod_opt_t* param ) {
return S3_Xfer_internal(access, file, buffer, length, offset, (s3_options_t*) param, TRUE);
}
static
IOR_offset_t
EMC_Xfer(int access,
aiori_fd_t* file,
IOR_size_t* buffer,
IOR_offset_t length,
IOR_offset_t offset,
aiori_mod_opt_t* param ) {
return S3_Xfer_internal(access, file, buffer, length, offset, (s3_options_t*) param, FALSE);
}
/*
* Does this even mean anything, for HTTP/S3 ?
*
* I believe all interactions with the server are considered complete at
* the time we get a response, e.g. from s3_put(). Therefore, fsync is
* kind of meaningless, for REST/S3.
*
* In future, we could extend our interface so as to allow a non-blocking
* semantics, for example with the libcurl "multi" interface, and/or by
* adding threaded callback handlers to obj_put(). *IF* we do that, *THEN*
* we should revisit 'fsync'.
*
* Another special case is multi-part upload, where many parallel clients
* may be writing to the same "file". (It looks like param->filePerProc
* would be the flag to check, for this.) Maybe when you called 'fsync',
* you meant that you wanted *all* the clients to be complete? That's not
* really what fsync would do. In the N:1 case, this is accomplished by
* S3_Close(). If you really wanted this behavior from S3_Fsync, we could
* have S3_Fsync call S3_close.
*
* As explained above, we may eventually want to consider the following:
*
* (1) thread interaction with any handlers that are doing ongoing
* interactions with the socket, to make sure they have finished all
* actions and gotten responses.
*
* (2) MPI barrier for all clients involved in a multi-part upload.
* Presumably, for IOR, when we are doing N:1, all clients are
* involved in that transfer, so this would amount to a barrier on
* MPI_COMM_WORLD.
*/
static void S3_Fsync( aiori_fd_t *fd, aiori_mod_opt_t * param ) {
//if (param->verbose >= VERBOSE_2) {
// printf("-> S3_Fsync [no-op]\n");
//}
}
/*
* It seems the only kind of "close" that ever needs doing for S3 is in the
* case of multi-part upload (i.e. N:1). In this case, all the parties to
* the upload must provide their ETags to a single party (e.g. rank 0 in an
* MPI job). Then the rank doing the closing can generate XML and complete
* the upload.
*