Apache Flink - bwHPC Wiki Apache Flink - bwHPC Wiki

Apache Flink

From bwHPC Wiki
Jump to: navigation, search

1 Introduction to Flink

Apache Flink is described on their website as ‘an open-source stream processing frame- work for distributed, high-performing, always-available, and accurate data streaming applications. This itself doesn’t sound like MapReduce, but the data processing in Flink has its roots in the MapReduce paradigm. Apache Flink is written in Java, so every program that you want to run on Flink must be written in Java or Scala. To run the written code, you can use Maven to package your code into a jar-file. This jar-file can then be executed by Flink. As data sources Flink can use Message Queues, from which the events can be acquired after subscribing, it can use Socket streams, or a simple file which is read from the file system.

1.1 Flink on the BwUniCluster

For a small test-setup we want to use 1 node as a Master node running the Flink JobManager, and multiple nodes running in the Multinode Queue for the TaskManagers. As mentioned in the description of the BwUniCluster, it is managed with the Workload Manager Moab. So we have to allocate the resources for the Master node, and the other nodes. The resources for the master Node can be allocated with

msub -l nodes =1 -l walltime =0:00:30:00 -I -V

The parameter -I gives us an interactive session, which allows us to have a Shell running on the node. We don't need the Master node running in a special Queue, because there is only one JobManager thread. The walltime parameter declares how long we want to use the resources, and with -V we tell Moab to use the environment variables for our user in the initiated interactive session. For The TaskManagers we use

msub -q multinode -l nodes =2 -l walltime =0:00:30:00 -I -V

The added queue parameter for Multinode allows us to allocate multiple nodes with one command, so that there are multiple nodes running in the same environment. With this setup we have to use different allocated nodes because there were problems with running different software on different nodes in the multinode queue. The Flink application can be downloaded from its homepage (https://flink.apache.org/downloads.html), and the implementation stored in the HOME directory, so that it is accessible from all nodes. Therefore all nodes use the same executables and configuration files. After the resources are assigned, and the interactive sessions are started, the ip-address or the domain name of the master node has to be distributed to the the working nodes. For this we have to edit the config file at FLINK-DIR/conf/flink-conf.yaml. In jobman- ager.rpc.address, the Ip-Address / DomainName of the Master node has to be entered. While configuring this, we can also define the amount of memory, and the number of TaskSlots that should be used by Flink. When everything is configured we can start the JobManager on the Master Node with

$FLINK - DIR / bin / jobmanager . sh start cluster

and the TaskManagers on the other nodes with

$FLINK - DIR / bin / taskmanager . sh start

After this Flink should be ready to run packed.jar files. These can be executed using flinks own command line tool (FLINK-DIR/bin/flink) on either of the nodes. If you want to get a nicer overview of the running programs, there is also a Web-Interface which can be used to manage the Flink installation. One possibility to access the web- interface is, running on the master node is to use a SOCKS proxy running on the login node that connects the browser to the subnet of the cluster. For this we can use SSH to initialize a SOCKS Proxy on port 7070 with the following command

ssh -D 7070 username@bwunicluster.scc.kit.edu

After setting localhost and port 7070 in your Browsers Proxy Settings, the Browser should be able to access the cluster subnet. To find the IP-Address of the Masternode we can use the ifconfig command, that will give us the ip-address used on the main Ethernet interface. This IP-Adress can now be used in the browser with the TCP-Port set in the config file, to access the Web Interface (for example: 10.20.4.251:8081 )