Of course! Using Python with the paramiko library to "tail" a remote file is a common requirement, and it works well for real-time log monitoring.

Here’s a complete guide, starting with the basic concept and moving to a robust, production-ready solution.
Core Concept: How to "Tail" with Paramiko
Paramiko has no built-in equivalent of `tail -f`. The usual approach is to simulate it over SFTP by:
- Connecting to the remote server via SSH and opening an SFTP session.
- Recording the file's current size with a `stat` call.
- Looping:
  - Wait a short interval.
  - `stat` the file again to get its new size.
  - If the size has grown, open the file, seek to the last known offset, and read only the new bytes.
  - Print the new bytes and remember the new offset for the next iteration.
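Strictly speaking, you can also stream `tail -f` itself through `exec_command` and read its output line by line; it ties up an SSH channel for as long as it runs, but it is the quickest option when you just want a raw stream. A minimal sketch, reusing the same placeholder connection details as the examples below:

```python
import paramiko

# Minimal sketch: stream `tail -f` itself over an exec channel.
# The connection placeholders are the same ones used in the examples below.
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())  # dev/testing only
client.connect("your_server_ip", username="your_username",
               key_filename="/path/to/your/private_key")

# tail -f never exits on its own, so this loop runs until you stop the script.
stdin, stdout, stderr = client.exec_command("tail -f /var/log/syslog")
try:
    for line in stdout:   # paramiko's ChannelFile can be iterated line by line
        print(line, end="")
finally:
    client.close()
```

The rest of this guide sticks with the SFTP polling approach, since it gives you control over offsets, error handling, and shutdown.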
Method 1: The Simple, Blocking Example
This example is great for understanding the basic logic. It will run indefinitely and block your script.
```python
import paramiko
import time

# --- Configuration ---
HOSTNAME = "your_server_ip"
USERNAME = "your_username"
KEY_FILENAME = "/path/to/your/private_key"  # Or use password
PORT = 22
REMOTE_FILE_PATH = "/var/log/syslog"  # The file you want to tail

# --- SSH Connection ---
try:
    # Create an SSH client
    client = paramiko.SSHClient()

    # Automatically add the server's host key (for testing/dev only)
    # In production, you should load known hosts.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    # Connect to the server
    print(f"Connecting to {USERNAME}@{HOSTNAME}...")
    client.connect(hostname=HOSTNAME, username=USERNAME, port=PORT, key_filename=KEY_FILENAME)
    print("Connected successfully.")

    # --- Initial File Size Check ---
    # First, get the initial size of the file
    sftp = client.open_sftp()
    initial_stat = sftp.stat(REMOTE_FILE_PATH)
    initial_size = initial_stat.st_size
    print(f"Initial file size: {initial_size} bytes")

    # --- The Tail Loop ---
    while True:
        try:
            # Get the current file size
            current_stat = sftp.stat(REMOTE_FILE_PATH)
            current_size = current_stat.st_size

            # If the file has grown
            if current_size > initial_size:
                # Read the new content from the last known position
                with sftp.file(REMOTE_FILE_PATH, 'r') as remote_file:
                    remote_file.set_pipelining(True)  # Can improve performance for large reads
                    remote_file.seek(initial_size)    # Go to the end of where we last read
                    new_data = remote_file.read()     # Read the new bytes

                # Print the new data
                if new_data:
                    print(new_data.decode('utf-8'), end='')

                # Update the size for the next loop iteration
                initial_size = current_size

            # Wait for a short period before checking again
            time.sleep(1)

        except FileNotFoundError:
            print(f"Error: File {REMOTE_FILE_PATH} not found on the remote server.")
            break
        except Exception as e:
            print(f"An error occurred during tailing: {e}")
            break

except paramiko.AuthenticationException:
    print("Authentication failed, please verify your credentials.")
except paramiko.SSHException as e:
    print(f"Unable to establish SSH connection: {e}")
except Exception as e:
    print(f"An unexpected error occurred: {e}")
finally:
    # Ensure the connection is closed
    if 'client' in locals() and client:
        client.close()
        print("SSH connection closed.")
```
How to Run It:

- Install Paramiko: `pip install paramiko`
- Fill in your details: replace `your_server_ip`, `your_username`, and `/path/to/your/private_key` with your own values.
- Run the script: `python your_script_name.py`
You will see new lines from the log file appear in your console as they are written on the remote server.
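One caveat with this loop: files like /var/log/syslog get rotated, and after rotation the path points at a new, smaller file, so a size-only check goes silent. A sketch of the extra branch you could add inside the loop (same variable names as Method 1):

```python
# Sketch: handle truncation/rotation inside the tail loop (same names as Method 1).
current_stat = sftp.stat(REMOTE_FILE_PATH)
current_size = current_stat.st_size

if current_size < initial_size:
    # The file shrank: it was truncated or rotated, so start over from offset 0.
    print("[file truncated or rotated, restarting from offset 0]")
    initial_size = 0

if current_size > initial_size:
    with sftp.file(REMOTE_FILE_PATH, 'r') as remote_file:
        remote_file.seek(initial_size)
        print(remote_file.read().decode('utf-8', errors='replace'), end='')
    initial_size = current_size
```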
Method 2: A Robust, Non-Blocking Class (Recommended)
For real applications you usually want a non-blocking solution that can be embedded in a larger program (a web service, a GUI, or anything else with its own event loop). A natural way to do that in Python is with asyncio.
This class runs the tailing loop in the background without blocking your main program: every blocking paramiko call is pushed onto a thread pool executor.
```python
import asyncio
import logging

import paramiko

# Optional: set up basic logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


class AsyncRemoteFileTailer:
    def __init__(self, hostname, username, key_filename, remote_path, port=22, poll_interval=1):
        self.hostname = hostname
        self.username = username
        self.key_filename = key_filename
        self.remote_path = remote_path
        self.port = port
        self.poll_interval = poll_interval
        self.ssh_client = None
        self.sftp_client = None
        self.file_size = 0
        self._stop_event = asyncio.Event()

    async def connect(self):
        """Establishes the SSH and SFTP connection."""
        loop = asyncio.get_running_loop()

        # Run the blocking SSH and SFTP setup in the default thread pool executor
        self.ssh_client = await loop.run_in_executor(None, self._create_ssh_client)
        self.sftp_client = await loop.run_in_executor(None, self.ssh_client.open_sftp)

        # Get the initial file size
        stat = await loop.run_in_executor(None, self.sftp_client.stat, self.remote_path)
        self.file_size = stat.st_size
        logging.info(f"Connected and tailing {self.remote_path} from size {self.file_size}")

    def _create_ssh_client(self):
        """Helper function for the blocking SSH connection."""
        client = paramiko.SSHClient()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(
            hostname=self.hostname,
            username=self.username,
            port=self.port,
            key_filename=self.key_filename,
            timeout=10
        )
        return client

    async def tail(self, callback):
        """
        Tails the remote file and calls the callback with new data.

        :param callback: An async function (coroutine) called with new data (bytes).
        """
        if not self.sftp_client:
            raise ConnectionError("Not connected. Call connect() first.")

        while not self._stop_event.is_set():
            try:
                # Get the current file size in a thread pool executor
                loop = asyncio.get_running_loop()
                stat = await loop.run_in_executor(None, self.sftp_client.stat, self.remote_path)
                current_size = stat.st_size

                if current_size > self.file_size:
                    # Read new data in a thread pool executor
                    new_data = await loop.run_in_executor(
                        None, self._read_new_data, self.file_size
                    )
                    if new_data:
                        # Call the callback with the new data
                        await callback(new_data)
                    self.file_size = current_size

                # Wait for the next poll interval
                await asyncio.sleep(self.poll_interval)

            except FileNotFoundError:
                logging.error(f"File {self.remote_path} not found.")
                await self.stop()
                break
            except Exception as e:
                logging.error(f"Error during tailing: {e}")
                await self.stop()
                break

    def _read_new_data(self, offset):
        """Helper function to read data from a specific offset."""
        with self.sftp_client.file(self.remote_path, 'r') as f:
            f.seek(offset)
            return f.read()

    async def stop(self):
        """Stops the tailing process and closes the connection."""
        self._stop_event.set()
        if self.sftp_client:
            self.sftp_client.close()
        if self.ssh_client:
            self.ssh_client.close()
        logging.info("Connection closed.")


# --- Example Usage ---
async def print_new_data(data: bytes):
    """A simple callback to print new data to the console."""
    # Decode and print without adding extra newlines
    print(data.decode('utf-8'), end='')


async def main():
    # --- Configuration ---
    HOSTNAME = "your_server_ip"
    USERNAME = "your_username"
    KEY_FILENAME = "/path/to/your/private_key"
    REMOTE_FILE_PATH = "/var/log/syslog"

    tailer = AsyncRemoteFileTailer(
        hostname=HOSTNAME,
        username=USERNAME,
        key_filename=KEY_FILENAME,
        remote_path=REMOTE_FILE_PATH,
        poll_interval=0.5  # Check every 0.5 seconds
    )

    try:
        await tailer.connect()
        await tailer.tail(callback=print_new_data)
    except KeyboardInterrupt:
        logging.info("Shutdown requested by user.")
    finally:
        await tailer.stop()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass
```
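To see the non-blocking part pay off, run the tailer next to another coroutine. A small sketch reusing the class and callback defined above (the heartbeat task is just an illustrative stand-in for real application work):

```python
import asyncio


async def heartbeat():
    # Stand-in for other application work that keeps running while we tail.
    while True:
        print("[heartbeat] event loop is still responsive")
        await asyncio.sleep(5)


async def run_with_heartbeat():
    tailer = AsyncRemoteFileTailer(
        hostname="your_server_ip",
        username="your_username",
        key_filename="/path/to/your/private_key",
        remote_path="/var/log/syslog",
    )
    await tailer.connect()
    try:
        # Both coroutines share one event loop; neither blocks the other.
        await asyncio.gather(tailer.tail(callback=print_new_data), heartbeat())
    finally:
        await tailer.stop()


# To try it: asyncio.run(run_with_heartbeat())
```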
Key Differences and Why the Async Version is Better:
| Feature | Simple Blocking Example | Robust Async Class |
|---|---|---|
| Blocking | Yes. The `while True` loop blocks the entire script. | No. Blocking paramiko calls run in a thread pool executor, so the asyncio event loop stays free. |
| Integration | Hard to embed; the loop owns the whole process. | Drops into a larger asyncio application (web service, GUI, other tasks) alongside other coroutines. |
| Shutdown | Stops only on an error or Ctrl-C. | Clean shutdown via `stop()` and an internal `asyncio.Event`. |

