-
Notifications
You must be signed in to change notification settings - Fork 20
How to read files via FTP and write them into HDFS filesystem.
Ahmad Nouri edited this page Dec 18, 2018
·
3 revisions
The following SPL application demonstrates how to connect to an FTP server, read files from a directory,
and write these files into an HDFS file system via HDFS2FileSink.
/*
* The stream 'SignalOutput' creates a connection to the FTP server and reads the file names from a directory.
* The stream 'FileContents' reads the file contents and forwards the lines to the 'HdfsWriter'.
* Check if the FTP server is running.
* service vsftpd status
* The stream 'HdfsWriter' writes all incoming lines into a file in your HDFS server in your user directory.
* Make sure that the directory defined in the "HDFSPath" parameter exists and the user has permission to write to this directory.
* After a successful copy of a file to HDFS, 'DeleteFtpFile' deletes the file on the FTP server with an FTP command.
*
* When your Hadoop cluster is kerberized, you can add the 'authKeytab' and 'authPrincipal' parameters
* to the HDFS2FileSink.
* Copy the 'core-site.xml' file and the authentication keytab from the Hadoop server into the 'etc' directory of your project.
* Before you submit your SPL application, make sure that your keytab and principal work,
* for example:
* kinit -k -t etc/hdfs.headless.keytab <principal>@<REALM>
*
* The print streams are only to help for a better understanding of the whole streams.
* They can be removed for production application.
*
* Adapt the first parameters in this SPL file.
* Replace only the values of these parameters with your FTP and HDFS credentials.
*/
namespace application ;
use com.ibm.streamsx.inet.ftp::* ;
use com.ibm.streamsx.hdfs::HDFS2FileSink ;
/**
 * Copies files from an FTP/SFTP directory into HDFS and removes each file
 * from the FTP server once HDFS2FileSink has reported it as written.
 *
 * Data flow:
 *   TriggerStream -> SignalOutput (directory listing) -> FileContents (file lines)
 *     -> HdfsWriter (HDFS2FileSink) -> CommandStream ("rm" commands) -> DeleteFtpFile
 *
 * All parameters are submission-time values; the literals below are only the
 * defaults used when a value is not supplied at submission.
 */
composite SFTP2HDFS
{
	param
		expression<Protocol> $protocol : (Protocol)getSubmissionTimeValue("protocol", "sftp");
		expression<rstring> $host : getSubmissionTimeValue("host", "myftphost.fyre.ibm.com");
		expression<rstring> $path : getSubmissionTimeValue("path", "/Inbox/");
		expression<rstring> $username : getSubmissionTimeValue("username", "ftpuser");
		expression<rstring> $password : getSubmissionTimeValue("password", "myftppassword");
		// Prefix that is prepended to every FTP file name to build the HDFS target file name.
		expression<rstring> $HDFSPath : getSubmissionTimeValue("HDFSPath", "/user/hdfs/testDirectory/");
		expression<rstring> $authKeytab : getSubmissionTimeValue("authKeytab", "etc/hdfs.headless.keytab") ;
		// NOTE(review): this default looks like a placeholder/garbled value - supply the
		// real Kerberos principal via the "authPrincipal" submission-time value.
		expression<rstring> $authPrincipal : getSubmissionTimeValue("authPrincipal", "[email protected]") ;
		expression<rstring> $configPath : getSubmissionTimeValue("configPath", "etc") ;

	graph
		// A trigger stream for the FTP directory scan: a single tuple (iterations : 1)
		// sent after an initial delay of 2 seconds.
		stream<int32 count> TriggerStream = Beacon()
		{
			param
				initDelay : 2.0 ;
				iterations : 1 ;
				period : 10.0 ;
			output
				TriggerStream : count =(int32) IterationCount() ;
			config
				placement : partitionColocation("DIR") ;
		}

		// Directory scan (isDirReader : true) of $path on the FTP server, started by each trigger tuple.
		// NOTE(review): tuples with isFile == false are forwarded to FileContents as well -
		// confirm whether directory entries need to be filtered out.
		stream<rstring line, rstring fileName, uint64 size, rstring date, rstring user, boolean isFile,
			uint32 transferCount, uint32 failureCount, uint64 bytesTransferred, float64 speed> SignalOutput
			as OUT = FTPReader(TriggerStream)
		{
			param
				protocol : $protocol ;
				isDirReader : true ;
				host : $host ;
				path : $path ;
				username : $username ;
				password : $password ;
			output
				OUT : line = Line(), fileName = FileName(), size = FileSize(), date = FileDate(), user =
					FileUser(), isFile = IsFile(), transferCount = TransferCount(), failureCount =
					TransferFailureCount(), bytesTransferred = BytesTransferred(), speed = TransferSpeed() ;
		}

		// File reader (isDirReader : false): reads the file named by each incoming tuple.
		// The outgoing fileName is already the HDFS target name ($HDFSPath + FTP file name).
		stream<rstring line, rstring fileName> FileContents as OUT1 = FTPReader(SignalOutput as IN)
		{
			param
				protocol : $protocol ;
				isDirReader : false ;
				filename : IN.fileName ;
				host : $host ;
				path : $path ;
				username : $username ;
				password : $password ;
			output
				OUT1 : line = Line(), fileName = $HDFSPath + IN.fileName ;
		}

		// Debug output of the FileContents stream (can be removed for production).
		() as printFTPReader=Custom(FileContents){
			logic
				onTuple FileContents :
				{
					// Labelled with the stream it actually prints, so that it can be told
					// apart from the trace written by HdfsWriter below.
					printStringLn("FileContents line : " + line + " fileName : " + fileName) ;
				}
		}

		// Writes the incoming tuples to HDFS; the target file is taken from the
		// 'fileName' attribute. The output stream carries a file name and a size.
		stream<rstring out_file_name, uint64 size> HdfsWriter = HDFS2FileSink(FileContents)
		{
			logic
				onTuple FileContents :
				{
					printStringLn("HdfsWriter line : " + line + " fileName : " + fileName) ;
				}
			param
				authKeytab : $authKeytab ;
				authPrincipal : $authPrincipal ;
				// Use the submission-time parameter instead of a hard-coded "etc",
				// otherwise a "configPath" value given at submission is ignored.
				configPath : $configPath ;
				fileAttributeName : "fileName" ;
				// vmArg : "-Djava.security.krb5.conf=/path/krb5.conf" ;
		}

		// Prepare command stream: one "rm" command per file reported by HdfsWriter.
		stream<rstring command, rstring file> CommandStream as OUT = Custom(HdfsWriter as IN)
		{
			logic
				onTuple IN :
				{
					printStringLn("file name " + IN.out_file_name) ;
					// Extract only the file name without its path: everything after the last "/".
					// findLast yields -1 when there is no "/", which makes the start index 0
					// and keeps the whole name.
					int32 nameLength = length(IN.out_file_name) ;
					int32 lastSlash = findLast(IN.out_file_name, "/", nameLength - 1) ;
					// The third argument of substring is a length, not an end index.
					rstring baseName = substring(IN.out_file_name, lastSlash + 1, nameLength - lastSlash - 1) ;
					submit({ command = "rm", file = baseName }, OUT) ;
				}
			config
				placement : partitionColocation("DELETE") ;
		}

		// Debug output of the CommandStream stream (can be removed for production).
		() as printCommandStream=Custom(CommandStream){
			logic
				onTuple CommandStream :
				{
					printStringLn("printCommandStream line : " + command + " fileName : " + file) ;
				}
		}

		// Executes the incoming command for the given file in $path on the FTP server
		// and reports the result in 'success'.
		stream<boolean success, rstring fileName> DeleteFtpFile as OUT = FTPCommand(CommandStream as IN)
		{
			param
				protocol : $protocol ;
				filename : IN.file ;
				command : IN.command ;
				host : $host ;
				path : $path ;
				username : $username ;
				password : $password ;
				connectionCloseMode : never ;
				curlVerbose : false;
			output
				OUT : fileName = IN.file, success = Success() ;
			config
				placement : partitionColocation("DELETE") ;
		}

		// Debug output of the DeleteFtpFile stream (can be removed for production).
		() as printDeleteFtpFile=Custom(DeleteFtpFile){
			logic
				onTuple DeleteFtpFile :
				{
					printStringLn("DeleteFtpFile line : " + (rstring)success + " fileName : " + fileName) ;
				}
		}
}