杰瑞科技汇

How to Use Python with Hadoop?

Using Hadoop with Python is a very common and powerful combination. While Java is Hadoop's native language, Python is often preferred for its simplicity, readability, and vast ecosystem of data science libraries.


Here’s a comprehensive guide covering the main ways to use Hadoop with Python, from the most common to the more advanced.


The Core Idea: Why Python + Hadoop?

Hadoop's core strength is its distributed storage (HDFS) and processing (MapReduce/YARN). Python's strength is in rapid development, data manipulation (Pandas, NumPy), and machine learning (Scikit-learn, TensorFlow).

Combining them allows you to:

  • Leverage Hadoop's scalability for massive datasets that don't fit on a single machine.
  • Use Python's rich ecosystem for data analysis, cleaning, and modeling.
  • Write complex data pipelines more easily than with raw Java MapReduce.

The Main Approaches for Python on Hadoop

There are three primary ways to integrate Python with Hadoop:

  1. Hadoop Streaming: The classic, built-in method. It treats Python scripts as standard executables that communicate with Hadoop via stdin and stdout.
  2. PySpark (Apache Spark with Python API): The modern, de facto standard. Spark is a faster, more powerful successor to MapReduce and has excellent native Python support.
  3. Hadoop Pipes (C++ API): An older, lower-level API. It's generally not used for Python development and is mentioned here only for completeness.

Let's dive into the first two, which are the most important.


Approach 1: Hadoop Streaming (The Classic Method)

Hadoop Streaming allows you to create and run MapReduce jobs using any executable as the mapper or reducer. This executable reads from stdin and writes to stdout.

How it Works:

  1. Mapper: A Python script that reads input line by line, processes it, and outputs key-value pairs (e.g., word\t1).
  2. Reducer: Another Python script that reads the sorted key-value pairs from the mapper, aggregates them (e.g., sums the counts for each word), and outputs the final result (e.g., hadoop\t42).
  3. Hadoop Job: The Hadoop framework pipes the data between your Python scripts.

Step-by-Step Example: Word Count

Prerequisites:

  • A working Hadoop cluster (or a single-node setup like Hadoop on Docker/Cloudera).
  • Python 3 installed on all nodes.
  • Hadoop Streaming JAR file (usually found in your Hadoop installation, e.g., $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar).

Step 1: Create the Mapper Script (mapper.py)

This script splits each line into words and emits each word with a count of 1.

#!/usr/bin/env python3
import sys
# Read input from stdin line by line
for line in sys.stdin:
    # Remove leading/trailing whitespace and convert to lowercase
    line = line.strip().lower()
    # Split the line into words
    words = line.split()
    # Emit each word with a count of 1
    for word in words:
        # The output format is: key \t value
        print(f"{word}\t1")

Step 2: Create the Reducer Script (reducer.py)

This script sums up the counts for each word.

#!/usr/bin/env python3
import sys

current_word = None
current_count = 0

# Hadoop sorts the mapper output by key before it reaches the reducer,
# so all counts for a given word arrive on consecutive lines.
for line in sys.stdin:
    # Remove leading/trailing whitespace
    line = line.strip()
    # Parse the key-value pair produced by mapper.py
    try:
        word, count = line.split('\t', 1)
        count = int(count)
    except ValueError:
        # Skip malformed lines instead of crashing the task
        continue
    # This part handles the aggregation
    if current_word == word:
        current_count += count
    else:
        # The key has changed: emit the previous word's total
        if current_word is not None:
            print(f"{current_word}\t{current_count}")
        # Reset for the new word
        current_word = word
        current_count = count

# Emit the last word (if any input was seen at all)
if current_word is not None:
    print(f"{current_word}\t{current_count}")

Step 3: Make Scripts Executable

chmod +x mapper.py
chmod +x reducer.py
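Because Hadoop Streaming is nothing more than stdin/stdout plus a sort between the two phases, you can sanity-check the scripts locally with `cat input.txt | ./mapper.py | sort | ./reducer.py` before ever touching the cluster. The same map/shuffle/reduce flow can also be simulated in a few lines of pure Python (the helper names below are illustrative, not part of Hadoop):

```python
# Local simulation of the Hadoop Streaming pipeline:
# map -> sort by key (Hadoop's shuffle) -> reduce, in one process.

def map_phase(lines):
    # Mirror mapper.py: emit (word, 1) for every word
    for line in lines:
        for word in line.strip().lower().split():
            yield (word, 1)

def reduce_phase(sorted_pairs):
    # Mirror reducer.py: sum consecutive counts for each key
    current_word, current_count = None, 0
    for word, count in sorted_pairs:
        if word == current_word:
            current_count += count
        else:
            if current_word is not None:
                yield (current_word, current_count)
            current_word, current_count = word, count
    if current_word is not None:
        yield (current_word, current_count)

lines = ["Hello Hadoop", "hello world"]
pairs = sorted(map_phase(lines))  # stands in for Hadoop's shuffle/sort
result = dict(reduce_phase(pairs))
print(result)  # {'hadoop': 1, 'hello': 2, 'world': 1}
```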

Step 4: Run the Hadoop Streaming Job

Let's assume you have a text file input.txt in your local directory. First, upload it to HDFS.

# Create an input directory in HDFS
hdfs dfs -mkdir -p /user/hadoop/streaming_input
# Upload your local file to HDFS
hdfs dfs -put input.txt /user/hadoop/streaming_input

Now, submit the job:

hadoop jar \
  $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
  -input /user/hadoop/streaming_input/input.txt \
  -output /user/hadoop/streaming_output \
  -mapper ./mapper.py \
  -reducer ./reducer.py \
  -file mapper.py \
  -file reducer.py

Explanation of Arguments:

  • hadoop jar: Runs the specified JAR, here the Hadoop Streaming library.
  • -input: The HDFS path for input data.
  • -output: The HDFS path for the output (must not exist).
  • -mapper: The command to execute the mapper (our Python script).
  • -reducer: The command to execute the reducer (our Python script).
  • -file: Copies the local file to the Hadoop cluster so each task node can access it.
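If the task nodes might not honor the shebang line, invoke the interpreter explicitly in the mapper/reducer commands. Newer Hadoop releases also deprecate -file in favor of the generic -files option, which must appear before the streaming-specific options. An equivalent submission would look roughly like this:

```shell
# Generic options (-files) come before streaming options (-input, -mapper, ...)
hadoop jar \
  $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
  -files mapper.py,reducer.py \
  -input /user/hadoop/streaming_input/input.txt \
  -output /user/hadoop/streaming_output \
  -mapper "python3 mapper.py" \
  -reducer "python3 reducer.py"
```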

Step 5: Check the Output

The output will be in part-00000 (and other part-* files if the job is split).

# View the output files in HDFS
hdfs dfs -cat /user/hadoop/streaming_output/part-00000
# Copy the output to your local machine to view it easily
hdfs dfs -get /user/hadoop/streaming_output/* ./local_output/
cat ./local_output/part-00000

Approach 2: PySpark (The Modern & Recommended Method)

Apache Spark is a unified analytics engine for large-scale data processing. It's much faster than MapReduce because it uses in-memory processing and a more efficient execution model. PySpark is its Python API.

Prerequisites:

  • A working Spark cluster (or a local installation).
  • Python and pyspark installed (pip install pyspark).

How it Works:

Instead of writing separate mapper/reducer scripts, you write a single Python script that uses the PySpark API. The Spark framework handles distributing the code and data across the cluster.

Step-by-Step Example: Word Count with PySpark

Step 1: Create the PySpark Script (wordcount_pyspark.py)

This script defines the entire data processing logic within a single application.

from pyspark.sql import SparkSession
# 1. Initialize a SparkSession
# This is the entry point for any PySpark application.
spark = SparkSession.builder \
    .appName("PythonWordCount") \
    .getOrCreate()
# 2. Read the input file from HDFS
# `textFile` reads each line of the text file as an element in an RDD (Resilient Distributed Dataset).
lines = spark.sparkContext.textFile("hdfs:///user/hadoop/streaming_input/input.txt")
# 3. Perform the Map operations
# flatMap: Splits each line into words. The result is an RDD of words.
# map: Transforms each word into a (word, 1) pair.
pairs = lines.flatMap(lambda x: x.split(" ")) \
             .map(lambda word: (word, 1))
# 4. Perform the Reduce operation
# reduceByKey: Aggregates pairs by key, summing up the values.
counts = pairs.reduceByKey(lambda a, b: a + b)
# 5. Collect and print the results
# `collect()` brings the data from the cluster to the driver node.
# Use this for small final results, not for large datasets.
output = counts.collect()
for (word, count) in output:
    print(f"{word}: {count}")
# 6. Stop the SparkSession
spark.stop()

Step 2: Run the PySpark Application

You can run this script in two ways:

A) Locally (for testing): This runs Spark on your local machine using multiple cores. Note that --master is an option of spark-submit, not of the script itself.

# local[*] uses all available cores on your machine
spark-submit --master "local[*]" wordcount_pyspark.py

B) On a Cluster (e.g., YARN): This is how you run it on a real Hadoop cluster.

# Submit the application to the YARN resource manager
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --name PythonWordCount \
  wordcount_pyspark.py

Explanation of Arguments:

  • --master: Specifies the cluster manager. yarn for Hadoop YARN, local[*] for local mode.
  • --deploy-mode: cluster deploys your driver on the cluster (safer for production), client runs the driver on your local machine (easier for debugging).
  • --name: A descriptive name for your application.
  • --py-files: Extra .py, .zip, or .egg files to ship to the executors. Use it for your application's dependencies, not for the main script itself.
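As a concrete example of --py-files, suppose the application imports a helper module (the file and directory names below are hypothetical):

```shell
# Package the hypothetical dependency modules and ship them with the job
zip -r deps.zip utils.py helpers/
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --py-files deps.zip \
  wordcount_pyspark.py
```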

Comparison: Hadoop Streaming vs. PySpark

| Feature | Hadoop Streaming | PySpark (Spark) |
| --- | --- | --- |
| Performance | Slower; relies on process-level communication | Much faster; in-memory processing, DAG execution |
| Ease of use | More manual; you manage pipes and I/O | Easier; high-level DataFrame API similar to Pandas |
| Ecosystem | Limited; just your Python scripts | Rich; integrates with SQL, ML (MLlib), graph (GraphFrames), streaming |
| Abstraction level | Low-level (MapReduce paradigm) | High-level; RDDs or the even higher-level DataFrame API |
| Cluster overhead | Lower overhead per task | Higher overhead due to the Spark application context |
| Primary use case | Simple, one-off MapReduce jobs; integrating legacy scripts | Modern data pipelines, ETL, machine learning on large data |

Conclusion and Recommendation

  • For new projects, use PySpark. It is the modern industry standard, significantly more powerful and easier to work with for most data processing tasks. The DataFrame API provides a level of abstraction that is both highly optimized and intuitive for those familiar with Pandas or SQL.

  • Use Hadoop Streaming when:

    • You have existing Python scripts that you want to run on Hadoop with minimal changes.
    • You are in an environment where Spark is not available or is overkill for a very simple, ad-hoc job.
    • You need to run a non-Python executable (e.g., a sed script or a compiled binary) as part of your pipeline.