Linking flask and sqlite

We now need to import and use our database module inside our server module. In order to make this easier, we should first wrap all of our functions in a class.

Update your database.py file, creating a class named Database and adding the necessary self instances to your methods. We can also move the database name out to an attribute in the __init__ method:

import sqlite3

class Database:
def __init__(self):
self.database = "chat.db"

def perform_insert(self, sql, params):
conn = sqlite3.connect(self.database)
...

def perform_select(self, sql, params):
conn = sqlite3.connect(self.database)
...

# update the rest of your methods to include self where necessary

Now that we have that done, we can import and instantiate the database in our server.py file:

...
from database import Database
...
database = Database()
...

With our database in place, we can go ahead and turn the get_all_users method into an endpoint available via our flask server.

Get rid of the testing endpoints and add this one in their place:

@app.route("/get_all_users")
def get_all_users():
all_users = database.get_all_users()

return jsonify(all_users)

This will simply call the method from our Database class with the same name and return the results as JSON.

Visit http://1227.0.0.1:5000/get_all_users in your web browser and view the results. You should see the user you created has appeared on screen. You will notice, however, that the data is not labeled in any way. It would be much more useful if we could have this as a dictionary instead.

To make this change, we shall go back to our Database class and modify the perform_select method to return a list of dictionaries instead of a list of tuples:

def perform_select(self, sql, params):
conn = sqlite3.connect(self.database)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute(sql, params)
results = [dict(row) for row in cursor.fetchall()]
conn.close()

return results

We can pass the Row class from sqlite3 as the row_factory attribute of our connection to receive Row instances instead of tuples as the result of our select statement.

These Row instances are similar to dictionaries, but are not compatible with JSON. To ensure that our results can be returned as JSON, we need to explicitly convert them to dictionaries. We do this using the dict method inside a list comprehension, so that the results we return will definitely be a list of dictionaries.

Head back to your web browser and refresh the page. You should now see dictionary results come back from your endpoint:

[
{
"real_name": "David Love",
"username": "davidlove"
}
]

We now can simply visit a server endpoint to perform a select query! How about an insert?

To perform an insert, we would need to send data to our web server from our Python application, then have flask store it in sqlite. To try this out, let's write an endpoint that will create new users:

@app.route("/add_user")
def add_user():
data = request.form
username = data["username"]
real_name = data["real_name"]
database.add_user(username, real_name)

return jsonify(
"User Created"
)

As before, we use request.form to access POST data, then extract values from it like a normal dictionary. We grab the username and real_name values and pass them over to our Database's add_user method.

We need to return something for the requester to see; so, we just return the string User Created.

One last endpoint to make is to check whether a user already exists. After that, we can go ahead and begin linking our chat application to our web server:

@app.route("/user_exists", methods=["POST"])
def user_exists():
username = request.form.get("username")
exists = database.user_exists(username)

return jsonify({
"exists": exists
})

This endpoint will return JSON with one key—exists. The requester can then extract the value of this key to tell whether or not the username is already in the database.

Of course, we need a method of the same name in our Database class now:

def user_exists(self, username):
sql = "SELECT username FROM users WHERE username = ?"
params = (username,)

results = self.perform_select(sql, params)

if len(results):
return True

return False

The SQL used by this method uses a new element of a select query—the where clause. As you may be able to tell, this is a way of filtering the results to include only those which match certain criteria. In this case, we want to return records which have a username which matches a provided variable. To achieve this, we add WHERE username = ? to the end of our select statement. Recall that the question mark will be replaced with our username variable when executing the query.

Since we do not actually want the returned record, merely to check for its existence, we can use len(results) to check how many records were returned. If this function returns any number above 0, the username must exist; so, we return True. Otherwise, the username is not in our database and we can return False.

With the setup finished, we can finally go back to our chat application and begin utilizing our web service.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset