Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Select interface in TFTP automatons #4684

Merged
merged 3 commits into from
Mar 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 50 additions & 3 deletions scapy/layers/tftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,30 @@

"""
TFTP (Trivial File Transfer Protocol).

This provides TFTP implementation and 4 small automata:
- TFTP_read: read a remote file
- TFTP_RRQ_server: server that answers to read requests
- TFTP_write: write a remote file
- TFTP_WRQ_server: server than accepts write requests
"""

import os
import random

from scapy.packet import Packet, bind_layers, split_bottom_up, bind_bottom_up
from scapy.fields import PacketListField, ShortEnumField, ShortField, \
StrNullField
from scapy.fields import (
PacketListField,
ShortEnumField,
ShortField,
StrNullField,
)
from scapy.automaton import ATMT, Automaton
from scapy.layers.inet import UDP, IP
from scapy.base_classes import Net
from scapy.config import conf
from scapy.volatile import RandShort

from scapy.layers.inet import UDP, IP

TFTP_operations = {1: "RRQ", 2: "WRQ", 3: "DATA", 4: "ACK", 5: "ERROR", 6: "OACK"} # noqa: E501

Expand Down Expand Up @@ -138,9 +149,17 @@ def answers(self, other):
class TFTP_read(Automaton):
"""
TFTP automaton to read a remote file on a TFTP server.

:param filename: the name of the remote file to read.
:param server: the host on which to read (IP or name).
:param sport: (optional) the source port to use. (default: random)
:param port: (optional) the TFTP port (default: 69)
"""

def parse_args(self, filename, server, sport=None, port=69, **kargs):
if "iface" not in kargs:
server = str(Net(server))
kargs["iface"] = conf.route.route(server)[0]
Automaton.parse_args(self, **kargs)
self.filename = filename
self.server = server
Expand Down Expand Up @@ -229,9 +248,18 @@ def END(self):
class TFTP_write(Automaton):
"""
TFTP automaton to write a local file onto a TFTP server.

:param filename: the name of the remote file to write.
:param data: the bytes data to write.
:param server: the host on which to read (IP or name).
:param sport: (optional) the source port to use. (default: random)
:param port: (optional) the TFTP port (default: 69)
"""

def parse_args(self, filename, data, server, sport=None, port=69, **kargs):
if "iface" not in kargs:
server = str(Net(server))
kargs["iface"] = conf.route.route(server)[0]
Automaton.parse_args(self, **kargs)
self.filename = filename
self.server = server
Expand Down Expand Up @@ -313,9 +341,15 @@ def END(self):
class TFTP_WRQ_server(Automaton):
"""
TFTP automaton to wait for incoming files

:param ip: (optional) the local IP to listen on.
:param sport: (optional) the local port (by default: random)
"""

def parse_args(self, ip=None, sport=None, *args, **kargs):
if "iface" not in kargs:
ip = str(Net(ip))
kargs["iface"] = conf.route.route(ip)[0]
Automaton.parse_args(self, *args, **kargs)
self.ip = ip
self.sport = sport
Expand Down Expand Up @@ -393,9 +427,22 @@ def END(self):
class TFTP_RRQ_server(Automaton):
"""
TFTP automaton to serve local files

You can't use 'store' and 'dir' at the same time.

:param store: (optional) a dictionary that contains the file data, like
{"thefile": b"data"}.
:param dir: (optional) a folder that contains the data file data.
:param joker: (optional) data to return when no file/data is found.
:param ip: (optional) the local IP to listen on.
:param sport: (optional) the local port (by default: random)
:param serve_one: (optional) close after serving one client (default: False)
"""

def parse_args(self, store=None, joker=None, dir=None, ip=None, sport=None, serve_one=False, **kargs): # noqa: E501
if "iface" not in kargs:
ip = str(Net(ip))
kargs["iface"] = conf.route.route(ip)[0]
Automaton.parse_args(self, **kargs)
if store is None:
store = {}
Expand Down
178 changes: 174 additions & 4 deletions test/scapy/layers/tftp.uts
Original file line number Diff line number Diff line change
@@ -1,16 +1,186 @@
% TFTP regression tests for Scapy
% Regression tests for TFTP

# More information at http://www.secdev.org/projects/UTscapy/

+ TFTP coverage tests

############
############
+ TFTP tests
= Test answers

assert TFTP_DATA(block=1).answers(TFTP_RRQ())
assert not TFTP_WRQ().answers(TFTP_RRQ())
assert not TFTP_RRQ().answers(TFTP_WRQ())
assert TFTP_ACK(block=1).answers(TFTP_DATA(block=1))
assert not TFTP_ACK(block=0).answers(TFTP_DATA(block=1))
assert TFTP_ACK(block=0).answers(TFTP_RRQ())
assert not TFTP_ACK().answers(TFTP_ACK())
assert TFTP_ERROR().answers(TFTP_DATA()) and TFTP_ERROR().answers(TFTP_ACK())
assert TFTP_OACK().answers(TFTP_WRQ())

= TFTP Options

x=IP()/UDP(sport=12345)/TFTP()/TFTP_RRQ(filename="fname")/TFTP_Options(options=[TFTP_Option(oname="blksize", value="8192"),TFTP_Option(oname="other", value="othervalue")])
assert raw(x) == b'E\x00\x00H\x00\x01\x00\x00@\x11|\xa2\x7f\x00\x00\x01\x7f\x00\x00\x0109\x00E\x004B6\x00\x01fname\x00octet\x00blksize\x008192\x00other\x00othervalue\x00'
y=IP(raw(x))
y[TFTP_Option].oname
y[TFTP_Option:2].oname
assert len(y[TFTP_Options].options) == 2 and y[TFTP_Option].oname == b"blksize"


+ TFTP Automatons
~ linux

= Utilities
~ linux

from scapy.automaton import select_objects

class MockTFTPSocket(object):
packets = []
def __init__(self, iface):
self.iface = iface
def recv(self, n=None):
pkt = self.packets.pop(0)
return pkt
def send(self, *args, **kargs):
pass
def close(self):
pass
@classmethod
def select(classname, inputs, remain):
test = [s for s in inputs if isinstance(s, classname)]
if test:
if len(test[0].packets):
return test
else:
inputs = [s for s in inputs if not isinstance(s, classname)]
return select_objects(inputs, remain)


= TFTP_read() automaton
~ linux

class MockReadSocket(MockTFTPSocket):
packets = [IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_DATA(block=1) / ("P" * 512),
IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_DATA(block=2) / "<3"]

tftp_read = TFTP_read("file.txt", "1.2.3.4", sport=0x2807,
ll=MockReadSocket,
recvsock=MockReadSocket, debug=5)

res = tftp_read.run()
assert res == (b"P" * 512 + b"<3")

= TFTP_read() automaton error
~ linux

class MockReadSocket(MockTFTPSocket):
packets = [IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_ERROR(errorcode=2, errormsg="Fatal error")]

tftp_read = TFTP_read("file.txt", "1.2.3.4", sport=0x2807,
ll=MockReadSocket,
recvsock=MockReadSocket)

try:
tftp_read.run()
assert False
except Automaton.ErrorState as e:
assert "Reached ERROR" in str(e)
assert "ERROR Access violation" in str(e)


= TFTP_write() automaton
~ linux

data_received = b""

class MockWriteSocket(MockTFTPSocket):
packets = [IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_ACK(block=0),
IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_ACK(block=1) ]
def send(self, *args, **kargs):
if len(args) and Raw in args[0]:
global data_received
data_received += args[0][Raw].load

tftp_write = TFTP_write("file.txt", "P" * 767 + "Scapy <3", "1.2.3.4", sport=0x2807,
ll=MockWriteSocket,
recvsock=MockWriteSocket)

tftp_write.run()
assert data_received == (b"P" * 767 + b"Scapy <3")

= TFTP_write() automaton error
~ linux

class MockWriteSocket(MockTFTPSocket):
packets = [IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_ERROR(errorcode=2, errormsg="Fatal error")]

tftp_write = TFTP_write("file.txt", "P" * 767 + "Scapy <3", "1.2.3.4", sport=0x2807,
ll=MockWriteSocket,
recvsock=MockWriteSocket)

try:
tftp_write.run()
assert False
except Automaton.ErrorState as e:
assert "Reached ERROR" in str(e)
assert "ERROR Access violation" in str(e)


= TFTP_WRQ_server() automaton
~ linux

class MockWRQSocket(MockTFTPSocket):
packets = [IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_WRQ(filename="scapy.txt"),
IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_DATA(block=1) / ("P" * 512),
IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_DATA(block=2) / "<3"]

tftp_wrq = TFTP_WRQ_server(ip="1.2.3.4", sport=0x2807,
ll=MockWRQSocket,
recvsock=MockWRQSocket)
assert tftp_wrq.run() == (b"scapy.txt", (b"P" * 512 + b"<3"))

= TFTP_WRQ_server() automaton with options
~ linux

class MockWRQSocket(MockTFTPSocket):
packets = [IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_WRQ(filename="scapy.txt") / TFTP_Options(options=[TFTP_Option(oname="blksize", value="100")]),
IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_DATA(block=1) / ("P" * 100),
IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_DATA(block=2) / "<3"]

tftp_wrq = TFTP_WRQ_server(ip="1.2.3.4", sport=0x2807,
ll=MockWRQSocket,
recvsock=MockWRQSocket)
assert tftp_wrq.run() == (b"scapy.txt", (b"P" * 100 + b"<3"))

= TFTP_RRQ_server() automaton
~ linux

sent_data = "P" * 512 + "<3"
import tempfile
filename = tempfile.mktemp(suffix=".txt")
fdesc = open(filename, "w")
fdesc.write(sent_data)
fdesc.close()

received_data = ""

class MockRRQSocket(MockTFTPSocket):
packets = [IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_RRQ(filename="scapy.txt") / TFTP_Options(options=[TFTP_Option(oname="blksize", value="100")]),
IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_RRQ(filename=filename[5:]) / TFTP_Options(),
IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_ACK(block=1),
IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_ACK(block=2) ]
def send(self, *args, **kargs):
if len(args):
pkt = args[0]
if TFTP_DATA in pkt:
global received_data
received_data += pkt[Raw].load.decode("utf-8")

tftp_rrq = TFTP_RRQ_server(ip="1.2.3.4", sport=0x2807, dir="/tmp/", serve_one=True,
ll=MockRRQSocket,
recvsock=MockRRQSocket, debug=4)
tftp_rrq.run()
assert received_data == sent_data

import os
os.unlink(filename)
Loading