Skip to content
1 change: 1 addition & 0 deletions cvrplib/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
from .download import download_instance, download_solution, list_names
from .read import read_instance, read_solution
from .write import write_instance, write_solution
35 changes: 15 additions & 20 deletions cvrplib/read/parse_solution.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from typing import Dict, List, Union

from .utils import infer_type

def parse_solution(lines: List[str]) -> Dict[str, Union[List, float]]:
Solution = Dict[str, Union[int, float, str, List]]


def parse_solution(lines: List[str]) -> Solution:
"""
Parses the text of a solution file formatted in VRPLIB style. A solution
consists of routes, which are indexed from 1 to n, and possibly other data.
Expand All @@ -16,28 +20,19 @@ def parse_solution(lines: List[str]) -> Dict[str, Union[List, float]]:
A dictionary that contains solution data.

"""
data: Dict[str, Union[List, float]] = {}

routes = []
solution: Solution = {"routes": []}

for line in lines:
line = line.strip().lower()

if not line.startswith("route"):
if "route" in line:
route = [int(idx) for idx in line.split(":")[1].split(" ") if idx]
solution["routes"].append(route) # type: ignore
elif ":" in line or " " in line: # Split at first colon or whitespace
split_at = ":" if ":" in line else " "
k, v = [word.strip() for word in line.split(split_at, 1)]
solution[k] = infer_type(v)
else: # Ignore lines without keyword-value pairs
continue

route = [int(cust) for cust in line.split(":")[1].split(" ") if cust]
routes.append(route)

data["routes"] = routes

# Find the cost
for line in lines:
line = line.strip().lower()

if "cost" in line:
cost = line.lstrip("cost ")
data["cost"] = int(cost) if cost.isdigit() else float(cost)
break

return data
return solution
28 changes: 14 additions & 14 deletions cvrplib/read/parse_vrplib.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import numpy as np

from .utils import euclidean
from .utils import euclidean, infer_type


def parse_vrplib(lines: List[str]):
Expand All @@ -20,7 +20,9 @@ def parse_vrplib(lines: List[str]):
"""
data = parse_specifications(lines)
data.update(parse_sections(lines))
data.update(parse_distances(data))

distances = parse_distances(data)
data.update(distances if distances else {})

return data

Expand All @@ -35,7 +37,7 @@ def parse_specifications(lines: List[str]) -> Dict[str, Any]:
for line in lines:
if ": " in line:
k, v = [x.strip() for x in re.split("\\s*: ", line, maxsplit=1)]
data[k.lower()] = int(v) if v.isnumeric() else v
data[k.lower()] = infer_type(v)

return data

Expand All @@ -49,14 +51,14 @@ def parse_sections(lines: List[str]) -> Dict[str, Any]:
sections = defaultdict(list)

for line in lines:
if "_SECTION" in line:
name = line.split("_SECTION")[0].strip()
if "EOF" in line:
break

elif "EOF" in line:
continue
elif "_SECTION" in line:
name = line.split("_SECTION")[0].strip()

elif name is not None:
row = [_int_or_float(num) for num in line.split()]
row = [infer_type(num) for num in line.split()]

# Most sections start with an index that we do not want to keep
if name not in ["EDGE_WEIGHT", "DEPOT"]:
Expand All @@ -70,7 +72,10 @@ def parse_sections(lines: List[str]) -> Dict[str, Any]:
section_name = section_name.lower()

if section_name == "depot":
data[section_name] = section_data[0][0] - 1
depot_data = np.array(section_data)
# TODO Keep this or remove?
depot_data[:-1] -= 1 # Normalize depot indices to start at zero
data[section_name] = depot_data
elif section_name == "edge_weight":
data[section_name] = section_data
else:
Expand Down Expand Up @@ -167,8 +172,3 @@ def from_flattened(edge_weights: List[List[int]], n: int) -> List[List[int]]:
distances[j][i] = d_ij

return distances


def _int_or_float(num: str):
"""Return an integer if num is an integer string and float otherwise."""
return int(num) if num.isnumeric() else float(num)
10 changes: 10 additions & 0 deletions cvrplib/read/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,13 @@ def strip_lines(lines):
Strip all lines and return the non-empty ones.
"""
return [line1 for line1 in (line.strip() for line in lines) if line1]


def infer_type(s):
try:
return int(s)
except ValueError:
try:
return float(s)
except ValueError:
return s
2 changes: 2 additions & 0 deletions cvrplib/write/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .write_instance import write_instance
from .write_solution import write_solution
80 changes: 80 additions & 0 deletions cvrplib/write/write_instance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
from typing import Any, Dict, Iterable

import numpy as np


def write_instance(path: str, instance: Dict[str, Any]):
"""
Writes a VRP instance to file following the VRPLIB format [1].

path
The path of the file.
instance
The instance dictionary, containing problem specifications and data.

References
----------
.. [1] Helsgaun, K. (2017). An Extension of the Lin-Kernighan-Helsgaun TSP
Solver for Constrained Traveling Salesman and Vehicle Routing
Problems.
http://webhotel4.ruc.dk/~keld/research/LKH-3/LKH-3_REPORT.pdf

"""
with open(path, "w") as fi:
for k, v in instance.items():
if isinstance(v, (np.ndarray, list)):
write_section(fi, k.upper(), v)
else:
fi.write(f"{k.upper()} : {v}")
fi.write("\n")

fi.write("EOF\n")


def write_section(fi, name: str, data: Iterable):
"""
Writes a data section to file.

A data section starts with the section name in all uppercase. It is then
followed by row entries consisting of one or multiple values.
"""
if name == "EDGE_WEIGHT":
write_edge_weight_section(fi, data)
elif name == "DEPOT":
write_depot_section(fi, data)
else:
fi.write(f"{name}_SECTION\n")

# TODO Refactor this
if len(np.shape(data)) == 1:
for idx, elt in enumerate(data, 1):
row = f"{idx}\t{elt}"
fi.write(row + "\n")
else:
for idx, elts in enumerate(data, 1):
row = f"{idx}\t" + "\t".join(str(elt) for elt in elts)
fi.write(row + "\n")


def write_edge_weight_section(fi, duration_matrix):
"""
Writes the edge weight section. Rows do not start with index.
"""
fi.write("EDGE_WEIGHT_SECTION\n")

for row in duration_matrix:
fi.write("\t".join(map(str, row)))
fi.write("\n")


def write_depot_section(fi, depots):
"""
Writes the depot section. Rows correspond to the index of the depot(s),
where the final value is -1 to indicate termination.
"""
fi.write("DEPOT_SECTION\n")

for idx in depots[:-1].flatten():
fi.write(f"{idx + 1}\n")

fi.write("-1\n")
26 changes: 26 additions & 0 deletions cvrplib/write/write_solution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from typing import Dict, List, Union

Solution = Dict[str, Union[int, float, str, List[List[int]]]]


def write_solution(path: str, solution: Solution):
"""
Writes a VRP solution to file following the VRPLIB convention.

path
The file path.
solution
The dictionary containing solution data.

"""
with open(path, "w") as fi:
for k, v in solution.items():
if k == "routes":
for idx, route in enumerate(v, 1): # type: ignore
fi.write(
f"Route {idx} : {' '.join([str(s) for s in route])}"
)
fi.write("\n")
else:
fi.write(f"{k.capitalize()} : {v}")
fi.write("\n")
3 changes: 0 additions & 3 deletions tests/read/test_read_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

from .._utils import CVRPLIB_DATA_DIR, LKH_3_DATA_DIR, selected_cases

# TODO Rename "cvrp" to VRPLIB
# TODO Add more tests to this - maybe make a csv?

instances = [
Expand Down Expand Up @@ -83,7 +82,6 @@ def test_C101():
assert instance["n_vehicles"] == 25
assert instance["capacity"] == 200
assert instance["node_coord"][N] == [55, 85]
assert instance["distances"][0][1] == 19
assert instance["demands"][N] == 20
assert instance["service_times"][N] == 90
assert instance["earliest"][N] == 647
Expand All @@ -95,7 +93,6 @@ def test_C101():
)
def test_lkh_3_vrplib(path):
"""
TODO Maybe add more instances
TODO Test for instance values
"""
read_instance(path)
3 changes: 2 additions & 1 deletion tests/read/test_read_solution.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from pathlib import Path

import pytest
from numpy.testing import assert_equal

from cvrplib import read_solution

Expand All @@ -13,7 +14,7 @@ def test_read_solution(case):
Read the case solution and verify its cost.
"""
solution = read_solution(case.solution_path)
assert solution["cost"] == pytest.approx(case.cost, 2)
assert_equal(solution["cost"], case.cost)


@pytest.mark.parametrize(
Expand Down
Empty file added tests/write/__init__.py
Empty file.
84 changes: 84 additions & 0 deletions tests/write/test_write_instance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
from pathlib import Path

import pytest
from numpy.testing import assert_equal

from cvrplib import read_instance, write_instance

from .._utils import LKH_3_DATA_DIR, selected_cases


def test_dummy(tmp_path):
"""
Tests if writing a small dummy instance yields the correct result.
"""
name = "C101"
instance = dict(
name=name,
type="VRPTW",
dimension=101,
capacity=200,
node_coord=[[40, 50], [45, 68], [45, 70], [42, 66]],
demand=[0, 10, 30, 10],
)

write_instance(tmp_path / name, instance)

target = "\n".join(
[
"NAME : C101",
"TYPE : VRPTW",
"DIMENSION : 101",
"CAPACITY : 200",
"NODE_COORD_SECTION",
"1\t40\t50",
"2\t45\t68",
"3\t45\t70",
"4\t42\t66",
"DEMAND_SECTION",
"1\t0",
"2\t10",
"3\t30",
"4\t10",
"EOF",
"",
]
)

with open(tmp_path / name, "r") as fi:
assert_equal(fi.read(), target)


@pytest.mark.parametrize("case", selected_cases())
def test_cvrplib(tmp_path, case):
"""
Tests if writing a CVRPLIB instance and reading it yields the same result.
"""
desired = read_instance(case.instance_path)

write_instance(tmp_path / case.instance_name, desired)
actual = read_instance(tmp_path / case.instance_name)

assert_equal(actual, desired)


@pytest.mark.parametrize(
"instance_path", Path(LKH_3_DATA_DIR).glob("*/INSTANCES/*vrp*")
)
def test_lkh_3(tmp_path, instance_path):
"""
Tests if writing an LKH-3 instance and reading it yields the same result.
"""
# These instances are incorrectly formatted, because the depot section
# does not terminate with -1.
invalid = ["S-E016-03m", "D022-04g", "R-E016-03m"]

if any(name in str(instance_path) for name in invalid):
return

desired = read_instance(instance_path)

write_instance(tmp_path / "test.vrp", desired)
actual = read_instance(tmp_path / "test.vrp")

assert_equal(actual, desired)
Loading