
How do I run a Python job with spark-submit?

Of course! spark-submit is the primary utility for launching PySpark (and other Spark) applications, whether on a cluster or locally on a single machine. It's the bridge between your Python script and the Spark execution environment.


Here’s a comprehensive guide covering everything from the basics to advanced configurations.


What is spark-submit?

spark-submit is a command-line tool packaged with Spark. Its main jobs are:

  1. Package Your Application: It bundles your Python script (.py file) along with any required dependencies (.zip, .egg, or .jar files).
  2. Connect to the Cluster: It connects to a Spark cluster (YARN, Mesos, Kubernetes, or Spark's standalone cluster manager).
  3. Configure Resources: It allocates the necessary resources (CPU, memory) for your application to run.
  4. Launch the Driver: It starts the Spark driver, which is the process that runs your main() function and coordinates the execution.

Basic Syntax

The fundamental syntax is:

spark-submit [options] <your_script.py> [script_arguments]
  • [options]: These are Spark-specific configurations to control how your application runs (e.g., memory, number of cores).
  • <your_script.py>: Your Python application containing the Spark code.
  • [script_arguments]: Any arguments you want to pass to your Python script.
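
Putting the pieces together, a single command might combine configuration options and script arguments like this (the master URL, application name, and paths are purely illustrative):

# [options] come first, then the script, then its own arguments
$SPARK_HOME/bin/spark-submit \
  --master local[4] \
  --name "SyntaxDemo" \
  my_app.py /data/input /data/output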

A Simple Example

Let's say you have a file named my_app.py:


my_app.py

from pyspark.sql import SparkSession
def main():
    # Create a SparkSession
    spark = SparkSession.builder \
        .appName("MySimpleApp") \
        .getOrCreate()
    # Create a simple RDD
    data = [1, 2, 3, 4, 5]
    rdd = spark.sparkContext.parallelize(data)
    # Perform a transformation (square the numbers)
    squared_rdd = rdd.map(lambda x: x * x)
    # Collect and print the result
    result = squared_rdd.collect()
    print(f"Squared numbers: {result}")
    # Stop the SparkSession
    spark.stop()
if __name__ == "__main__":
    main()

You can run this locally using spark-submit:

# Assuming SPARK_HOME is set in your environment
$SPARK_HOME/bin/spark-submit my_app.py

Output:

Squared numbers: [1, 4, 9, 16, 25]

Common and Important Options

Here are the most frequently used options for spark-submit.

A. Resource Allocation

These options control how many resources Spark requests for your application.

  • --master <master-url>: The URL of the Spark cluster. Example: --master local[*] (local mode), --master yarn (YARN cluster)
  • --deploy-mode <client|cluster>: Whether to deploy the driver on the client machine (client) or on one of the worker nodes (cluster). Example: --deploy-mode client
  • --driver-memory: Memory for the Spark driver (e.g., 1g, 2g). Example: --driver-memory 2g
  • --executor-memory: Memory for each executor (e.g., 4g, 8g). Example: --executor-memory 4g
  • --executor-cores: Number of CPU cores to use for each executor. Example: --executor-cores 2
  • --num-executors: Total number of executors to run. Example: --num-executors 10

Example: Running on a YARN cluster with specific resources.

$SPARK_HOME/bin/spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 1g \
  --executor-memory 4g \
  --executor-cores 2 \
  --num-executors 5 \
  my_app.py

B. Application Packaging

These options help manage dependencies.

  • --py-files: Comma-separated list of .zip, .egg, or .py files to place on the Python path. Example: --py-files my_utils.zip,other_lib.py
  • --archives: Comma-separated list of archives to be extracted into the working directory of each executor. Example: --archives hdfs://path/to/my_data.zip#data_dir
  • --files: Comma-separated list of files to be placed in the working directory of each executor. Example: --files config.properties

Example: Running an app that needs a utility module.

# my_utils.py contains helper functions
$SPARK_HOME/bin/spark-submit \
  --py-files my_utils.zip \
  my_app.py
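
As a quick sketch of how that archive might be produced and consumed (my_utils.py is the helper module mentioned in the comment above; the file layout is an assumption):

# Bundle the helper module(s) into a zip archive
zip my_utils.zip my_utils.py

# Submit; Spark adds my_utils.zip to the executors' Python path
$SPARK_HOME/bin/spark-submit \
  --py-files my_utils.zip \
  my_app.py

# Inside my_app.py the module is then importable as usual:
#   import my_utils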

C. Python Environment

These settings control which Python interpreter Spark uses on the driver and executors. Note that spark-submit has no --python flag; the interpreter is chosen through environment variables or configuration properties.

  • PYSPARK_PYTHON / PYSPARK_DRIVER_PYTHON: Environment variables that select the interpreter for the executors and the driver, respectively. Example: export PYSPARK_PYTHON=/usr/bin/python3
  • --conf spark.pyspark.python / --conf spark.pyspark.driver.python: The equivalent Spark configuration properties, passed on the command line. Example: --conf spark.pyspark.python=/path/to/conda/bin/python
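
A minimal sketch, assuming a Python 3 interpreter installed at /usr/bin/python3 on every node (the path is an assumption; adjust it to your environment):

# Select the interpreter via configuration properties
$SPARK_HOME/bin/spark-submit \
  --conf spark.pyspark.python=/usr/bin/python3 \
  --conf spark.pyspark.driver.python=/usr/bin/python3 \
  my_app.py

# Equivalent, using environment variables instead:
#   export PYSPARK_PYTHON=/usr/bin/python3
#   export PYSPARK_DRIVER_PYTHON=/usr/bin/python3
#   $SPARK_HOME/bin/spark-submit my_app.py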

Client vs. Cluster Mode (--deploy-mode)

This is a critical concept for production jobs.

--deploy-mode client (the default)

  • How it works: The driver process runs on the machine where you execute spark-submit. The executors run on the cluster nodes.
  • Pros: Easy to debug because you can see logs directly in your terminal. You can use local libraries and files.
  • Cons: The client machine must be running and have network access to all worker nodes for the duration of the job. Not suitable for large-scale or long-running jobs.
  • Use Case: Interactive development, debugging, and small jobs.

--deploy-mode cluster

  • How it works: The driver process is launched on one of the worker nodes in the cluster. The client machine that submitted the job can be shut down immediately after submission.
  • Pros: The client is not a bottleneck. The driver has full access to the cluster's resources and network. This is the standard for production jobs.
  • Cons: Debugging is harder. You must ensure all dependencies (including Python packages) are available on the cluster nodes (e.g., via --py-files or a cluster-shared Python environment like Conda).
  • Use Case: Production, large-scale, and long-running batch jobs.
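
For comparison, here is the same application submitted in each mode (resource options omitted for brevity; the my_utils.zip archive from the earlier example is assumed):

# Client mode: the driver runs where you type this command, and logs stream to your terminal
$SPARK_HOME/bin/spark-submit --master yarn --deploy-mode client my_app.py

# Cluster mode: the driver runs on a cluster node; on YARN, retrieve its logs afterwards with
#   yarn logs -applicationId <application_id>
$SPARK_HOME/bin/spark-submit --master yarn --deploy-mode cluster \
  --py-files my_utils.zip \
  my_app.py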

Passing Arguments to Your Python Script

Any arguments after your script name are passed directly to it.

my_app_with_args.py

from pyspark.sql import SparkSession
import sys
def main(input_path, output_path):
    print(f"Input path: {input_path}")
    print(f"Output path: {output_path}")
    spark = SparkSession.builder.appName("AppWithArgs").getOrCreate()
    # ... your Spark logic here to read from input_path and write to output_path
    spark.stop()
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: my_app_with_args.py <input_path> <output_path>")
        sys.exit(-1)
    main(sys.argv[1], sys.argv[2])

Running it:

$SPARK_HOME/bin/spark-submit my_app_with_args.py hdfs://data/input/ hdfs://data/output/

Best Practices

  1. Use if __name__ == "__main__":: This ensures your Spark code only runs when the script is executed directly, not when it's imported as a module.
  2. Package Dependencies: Don't rely on Python packages installed only on your local machine. Use --py-files for your own modules, and ship a packed virtual environment with --archives together with spark.pyspark.python for third-party packages (see the sketch after this list). In containerized environments (Docker/Kubernetes) the image itself can carry the dependencies, so this is less of an issue.
  3. Log Verbosity: Control Spark's log level to reduce noise, either in code with spark.sparkContext.setLogLevel("WARN") or, on recent Spark versions, at submit time:
    --conf spark.log.level=WARN
  4. Resource Tuning: Don't just use maximum resources. Tune --executor-memory, --executor-cores, and --num-executors based on your workload and cluster capacity. A common rule of thumb is to have 4-5 cores per executor.
  5. Use a Cluster Manager: For any serious work, use a proper cluster manager like YARN or Kubernetes instead of local[*]. local[*] is great for testing and learning.
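
A minimal sketch of the environment-packaging approach mentioned in point 2, assuming you build and pack a virtual environment with venv-pack (the environment name, archive name, and the pandas dependency are placeholders):

# Build and pack a virtual environment once, on a machine matching the cluster's OS/architecture
python3 -m venv pyspark_env
source pyspark_env/bin/activate
pip install venv-pack pandas        # pandas stands in for your real dependencies
venv-pack -o pyspark_env.tar.gz

# Ship the archive; each node unpacks it into a directory named "environment"
$SPARK_HOME/bin/spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --archives pyspark_env.tar.gz#environment \
  --conf spark.pyspark.python=./environment/bin/python \
  my_app.py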