Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions orbit/Containerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ FROM alpine:3.19 AS build
RUN apk update && apk upgrade && apk add \
python3-dev \
py3-pip \
sqlite \
build-base \
libffi-dev \
envsubst \
Expand All @@ -19,7 +18,8 @@ COPY . /orbit
WORKDIR /orbit

RUN mkdir -p /var/orbit/ && \
sqlite3 /var/orbit/orbit.db ".read init-db.sql" && \
source /radius-venv/bin/activate && \
./db.py \
:

ARG orbit_version_info
Expand Down
2 changes: 0 additions & 2 deletions orbit/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,3 @@

# duration of authentication token validity period
minutes_each_session_token_is_valid = 180

sql_verbose = False
155 changes: 21 additions & 134 deletions orbit/db.py
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,146 +1,33 @@
import sqlite3
#!/usr/bin/env python3
import peewee
import config
# nickname table name
# USR => users
# SES => sessions
# REG => newusers
import sys

DB = peewee.SqliteDatabase(config.database)

def _do(cmd, reps=(), set_=False, get_=False):
if config.sql_verbose:
print("SQL", cmd, file=sys.stderr)
reps = (lambda x: x if type(x) is tuple else (x,))(reps)
dat = None
con = sqlite3.connect(config.database)
new = con.cursor()
ret = new.execute(cmd, reps)
if get_:
dat = ret.fetchall()
if len(dat) < 1:
dat = [None]
if config.sql_verbose:
print("SQLRET", dat, file=sys.stderr)
# works when get lookup fails
if set_:
ret.execute("COMMIT;")
con.close()
return dat

class BaseModel(peewee.Model):
class Meta:
database = DB
strict_tables = True

def _set(cmd, reps=()): return _do(cmd, reps, set_=True, get_=True)
def _get(cmd, reps=()): return _do(cmd, reps, get_=True)

class User(BaseModel):
username = peewee.TextField(unique=True)
pwdhash = peewee.TextField()
student_id = peewee.TextField(unique=True, null=True)

# session table interface

SES_GETBY_TOKEN = """
SELECT token, username, expiry
FROM sessions
WHERE token = ?;
""".strip()
def ses_getby_token(tok): return _get(SES_GETBY_TOKEN, tok)
class Session(BaseModel):
token = peewee.TextField(primary_key=True)
username = peewee.TextField(unique=True)
expiry = peewee.FloatField()


SES_GETBY_USERNAME = """
SELECT token, username, expiry
FROM sessions
WHERE username = ?;
""".strip()
def ses_getby_username(usn): return _get(SES_GETBY_USERNAME, usn)
class Registration(BaseModel):
student_id = peewee.TextField(unique=True)
username = peewee.TextField(unique=True)
password = peewee.TextField()


SES_INS = """
INSERT INTO sessions (token, username, expiry)
VALUES (?, ?, ?)
RETURNING username;
""".strip()
def ses_ins(tpl): return _set(SES_INS, tpl)


SES_DELBY_TOKEN = """
DELETE FROM sessions
WHERE token = ?
RETURNING username;
""".strip()
def ses_delby_token(tok): return _set(SES_DELBY_TOKEN, tok)


SES_DELBY_USERNAME = """
DELETE FROM sessions
WHERE username = ?
RETURNING username;
""".strip()
def ses_delby_username(usn): return _set(SES_DELBY_USERNAME, usn)


SES_GET = """
SELECT token, username, expiry
FROM sessions;
""".strip()
def ses_get(): return _get(SES_GET)


# users table interface

USR_PWDHASHFOR_USERNAME = """
SELECT pwdhash
FROM users
WHERE username = ?;
""".strip()
def usr_pwdhashfor_username(usn): return _get(USR_PWDHASHFOR_USERNAME, usn)


USR_INS = """
INSERT INTO users (username, pwdhash, student_id)
VALUES (?, ?, ?);
""".strip()
def usr_ins(usr): return _set(USR_INS, usr)


USR_DELBY_USERNAME = """
DELETE FROM users
WHERE username = ?
RETURNING username;
""".strip()
def usr_delby_username(usn): return _set(USR_DELBY_USERNAME, usn)


USR_SETPWDHASH_USERNAME = """
UPDATE users
SET pwdhash = ?
WHERE username = ?;
""".strip()
def usr_setpwdhash_username(usr): return _set(USR_SETPWDHASH_USERNAME, usr)


USR_GET = """
SELECT username, pwdhash, student_id
FROM users;
""".strip()
def usr_get(): return _get(USR_GET)


USR_GETBY_USERNAME = """
SELECT username, pwdhash, student_id
FROM users
WHERE username = ?;
""".strip()
def usr_getby_username(usn): return _get(USR_GETBY_USERNAME, usn)


# registration table inferface

REG_INS = """
INSERT INTO newusers (username, password, student_id)
VALUES (?,?,?);
""".strip()
def reg_ins(tpl): return _set(REG_INS, tpl)


REG_GETDEL_BY_STUID = """
DELETE FROM newusers
WHERE student_id = ?
RETURNING username, password;
"""
def reg_get_and_del_by_stuid(sid): return _set(REG_GETDEL_BY_STUID, sid)
if __name__ == '__main__':
DB.create_tables(BaseModel.__subclasses__())
107 changes: 50 additions & 57 deletions orbit/hyperspace.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,36 +29,34 @@ def nou(u):
errx(f'no such user "{u}". Bye.')


USR_FMT = """
Username : {}
Hashed Password : {}
Student ID : {}
""".strip()


def do_query_username(args):
need(args, u=True)
u = db.usr_getby_username(args.username)[0]
if u is None:
if not (user := db.User.get_or_none(db.User.username == args.username)):
nou(args.username)
print(USR_FMT.format(*u))
print(f'Username : {user.username}\n'
f'Hashed Password : {user.pwdhash}\n'
f'Student ID : {user.student_id}')


def do_validate_token(args):
need(args, t=True)

ses = db.ses_getby_token(args.token)[0]
ses = db.Session.get_or_none(db.Session.token == args.token)
if ses:
print(ses[1])
print(ses.username)
else:
print('null')


def do_drop_session(args):
need(args, u=True)
dropped = db.ses_delby_username(args.username)[0]
if dropped:
print(dropped[0])
query = (db.Session
Comment thread
charliemirabile marked this conversation as resolved.
.delete()
.where(db.Session.username == args.username)
.returning(db.Session))

if ses := next(iter(query.execute()), None):
print(ses.username)
else:
print('null')

Expand All @@ -71,35 +69,34 @@ def do_create_session(args):

def do_validate_creds(args):
need(args, u=True, p=True)
u, p = args.username, args.password
pwdhash = db.usr_pwdhashfor_username(u)[0]
if pwdhash is None:
nou(u)
pwdhash = pwdhash[0]

if bcrypt.checkpw(bytes(p, "UTF-8"), bytes(pwdhash, "UTF-8")):
print('credentials(username: {}, password:{})'.format(u, p))
else:
if not (user := db.User.get_or_none(db.User.username == args.username)):
nou(args.username)
if not bcrypt.checkpw(args.password.encode('utf-8'),
Comment thread
charliemirabile marked this conversation as resolved.
user.pwdhash.encode('utf-8')):
print('null')
return
print(f'credentials(username: {args.username}, password:{args.password})')


def do_change_password(args):
need(args, u=True, p=True)
u, _ = args.username, args.password
if db.usr_getby_username(u)[0]:
db.usr_setpwdhash_username((do_bcrypt_hash(args, get=True), u))
do_validate_creds(args)
else:
nou(u)
new_hash = do_bcrypt_hash(args, get=True)
query = (db.User
.update({db.User.pwdhash: new_hash})
.where(db.User.username == args.username))
if query.execute() < 1:
nou(args.username)
print(f'credentials(username: {args.username}, password:{args.password})')


def do_delete_user(args):
need(args, u=True)
deleted = db.usr_delby_username(args.username)[0]
if deleted:
print(deleted[0])
else:
print('null')
query = (db.User
.delete()
.where(db.User.username == args.username))
if query.execute() < 1:
nou(args.username)
print(args.username)


def do_bcrypt_hash(args, get=False):
Expand All @@ -114,34 +111,30 @@ def do_bcrypt_hash(args, get=False):

def do_newuser(args):
need(args, u=True, p=True)
if db.usr_getby_username(args.username)[0]:
errx(f'cannot create duplicate user "{args.username}"')
else:
db.usr_ins((args.username, do_bcrypt_hash(args, get=True),
args.studentid or 0))
if args.studentid:
db.reg_ins((args.username, args.password, args.studentid))
do_validate_creds(args)
new_hash = do_bcrypt_hash(args, get=True)
try:
db.User.create(username=args.username, pwdhash=new_hash,
student_id=args.studentid)
if args.studentid:
db.Registration.create(username=args.username,
password=args.password,
student_id=args.studentid)
do_validate_creds(args)
except db.peewee.IntegrityError as e:
errx(f'cannot create user with duplicate field: "{e}"')


def do_roster(args):
r = db.usr_get()
print(r)


SES_FMT = """
{} until {}: {}
""".strip()
print('Users:')
for u in db.User.select():
print(f'{u.username}, {u.pwdhash}, {u.student_id}')


def do_list_sessions(args):
raw_list = db.ses_get()
if raw_list[0] is None:
print("(no sessions)")
else:
print('\n'.join([SES_FMT.format(session[1],
datetime.fromtimestamp(session[2]),
session[0]) for session in raw_list]))
print('Sessions:')
for s in db.Session.select():
expiry = datetime.fromtimestamp(s.expiry)
print(f'{s.username} until {expiry}: {s.token}')


def hyperspace_main(raw_args):
Expand Down
15 changes: 0 additions & 15 deletions orbit/init-db.sql

This file was deleted.

Loading