Atomic MySQL Read-write Operations With Go 1.19

My approach to keeping things in sync.

Image by Mackenzie Marco via unsplash

Have you ever bought an item on Amazon and later received an email informing you that your order was canceled because the item was out of stock?

I have, a couple of times. As a consumer, this may not be a big deal: you read the email and wait for the stock to be replenished. Developers, however, need to write all the code that supports that functionality.

What if I told you that you can forgo implementing such processes by synchronizing your reads and writes? Go web servers are concurrent by default: each request is handled in its own goroutine. With this in mind, we should anticipate the errors that occur in a concurrent environment, such as the overselling example at the beginning of this post.

In this post, I’ll define an HTTP endpoint that performs a sale on a specified item. The sale process will reduce the quantity of the item, and if there are no items left, it should return an error. I’ll be using github.com/go-sql-driver/mysql as my DB driver.

Setting up MySQL

Prior to writing any Go code, I’ll set up the test environment. Please note that all the code shown in this section consists of MySQL statements. I’ll start by defining a MySQL database called Stock:

CREATE DATABASE Stock;

Next, I’ll add a table with two columns: one for the product ID and the other for the product’s stock amount:

CREATE TABLE `Inventory` (
`ProductID` INT,
`Count` INT COMMENT 'Will store product count'
) ENGINE=InnoDB;

This table will be responsible for tracking the inventory stock count. I’ll then add one item to the table, with a stock count of 1:

INSERT INTO Inventory(ProductID, Count) VALUES (10, 1);

One last thing, please make sure you have the MySQL credentials (username, password) capable of accessing the table defined above.

The Go Code

To begin, I’ll define my app type, including the fields I’ll be using to manage the state. Here is the App type:

type App struct {
	Db      *sql.DB
	mu      *sync.Mutex
	Timeout atomic.Bool
}

Here is the desired use for each field:

  • Db: stores the MySQL database connection
  • mu: the star of the show, which will enable synchronized operations
  • Timeout: an atomic variable that I’ll use to simulate a slow network later on

Once I had defined the app, it was time to initialize it and set the appropriate fields. I copied the sample code from the MySQL driver page to set up my database connection.

After that, I assigned the db variable to my struct field Db. The mu field is a pointer to a mutex declared in the main function. Storing a pointer means every copy of App shares the same mutex (a sync.Mutex must not be copied after first use), and because the variable is declared in main, it stays alive for as long as the program runs. Here is what the main function looks like initially:

// The "mysql" driver is registered by the blank import
// _ "github.com/go-sql-driver/mysql".
func main() {
	db, err := sql.Open("mysql", "root:[email protected]/Stock")
	if err != nil {
		log.Fatal(err)
	}
	var mu sync.Mutex
	db.SetConnMaxLifetime(time.Minute * 3)
	db.SetMaxOpenConns(10)
	db.SetMaxIdleConns(10)
	_ = App{Db: db, mu: &mu}
}

Now that I have a basic setup, it is time to start writing my HTTP handler. The HTTP handler will be a method of type App, as this will allow me to manage and access the state from the handler. The handler will call another method, PerformPurchase, to perform the sale operation. Here is the definition of the handler:

func (a *App) BuyItem(w http.ResponseWriter, r *http.Request) {
	id, amount, err := GetQuery(r)
	if err != nil {
		w.WriteHeader(http.StatusBadRequest)
		w.Write([]byte("Nope!"))
		return
	}
	// Give the purchase at most 30 seconds. Calling cancel releases the
	// context's resources as soon as the handler returns.
	ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second)
	defer cancel()
	err = a.PerformPurchase(ctx, id, amount)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		w.Write([]byte("Nope!"))
		return
	}
	w.Write([]byte("Ok"))
}

The first function invoked by the handler is GetQuery. This is the function that will extract user input from the request. Here is the code for GetQuery:

func GetQuery(r *http.Request) (id int, amount int, err error) {
	amount, err = strconv.Atoi(r.FormValue("amount"))
	if err != nil {
		return
	}
	id, err = strconv.Atoi(r.FormValue("id"))
	if err != nil {
		return
	}
	return
}

The function returns an error if either value is not a valid integer. Rejecting non-numeric input early is good hygiene, although the protection against SQL injection really comes from the ? placeholders in the queries, which pass the values to MySQL as parameters. I chose named return values to keep the code clean; otherwise, I would have to write out three values at every return. Finally, here is the definition of PerformPurchase:

const (
	StockError = "Stock is finished"
)

func (a *App) PerformPurchase(ctx context.Context, id, amount int) error {
	// Optionally simulate a slow network before the first query.
	if a.Timeout.Load() {
		time.Sleep(5 * time.Second)
	}
	// Read the current stock for the product.
	rows, err := a.Db.QueryContext(
		ctx,
		"SELECT Count FROM Inventory WHERE ProductID=?",
		id,
	)
	if err != nil {
		return err
	}
	defer rows.Close()
	var stock int
	if rows.Next() {
		if err := rows.Scan(&stock); err != nil {
			return err
		}
	}
	// Surface any error that ended the iteration early.
	if err := rows.Err(); err != nil {
		return err
	}
	if stock <= 0 {
		return errors.New(StockError)
	}
	// Reduce the stock by the requested amount.
	_, err = a.Db.ExecContext(
		ctx,
		"UPDATE Inventory SET Count = Count - ? WHERE ProductID = ?",
		amount,
		id,
	)
	if err != nil {
		return err
	}
	return nil
}

As you can see, the Timeout field declared earlier is used to force the function to wait 5 seconds. This simulates a slow network delaying the first MySQL query. I’ll come back to this field in the next section.

The function retrieves the item’s record and checks whether the stock count is above 0. If it is not, the function returns an error. If the stock is above 0, the function runs a query to update the stock amount, reducing it by the amount specified in the request. Now that my app components are defined, I’ll write a test to perform 2 simultaneous purchase requests.

Here is the final version of the main function, with the HTTP bells and whistles:

func main() {
	db, err := sql.Open("mysql", "root:[email protected]/Stock")
	if err != nil {
		log.Fatal(err)
	}
	var mu sync.Mutex
	db.SetConnMaxLifetime(time.Minute * 3)
	db.SetMaxOpenConns(10)
	db.SetMaxIdleConns(10)
	app := App{Db: db, mu: &mu}
	http.HandleFunc("/buy", app.BuyItem)
	log.Fatal(http.ListenAndServe(":8080", nil))
}

The Test

The test will first execute PerformPurchase in a goroutine. For this invocation, Timeout will be set to true, which makes the goroutine wait 5 seconds before it runs its query. Again, this is to simulate a bad network. Within those 5 seconds, I’ll invoke PerformPurchase from the test’s own goroutine and check for an error with the message Stock is finished. If no error or a different error is returned, the test will fail. Here is my test’s code:

func TestPerformPurchase(t *testing.T) {
	db, err := sql.Open("mysql", "root:[email protected]/Stock")
	if err != nil {
		t.Fatal(err)
	}
	var mu sync.Mutex
	app := App{Db: db, mu: &mu}
	ctx := context.Background()
	// Launch the first (slow) purchase in the background.
	app.Timeout.Store(true)
	go app.PerformPurchase(ctx, 10, 1)
	// Give the goroutine time to read Timeout before it is reset;
	// otherwise Timeout would be false for the first invocation.
	time.Sleep(2 * time.Second)
	app.Timeout.Store(false)
	// The second purchase should find the stock exhausted.
	err = app.PerformPurchase(ctx, 10, 1)
	if err == nil || err.Error() != StockError {
		t.Fatalf("expected error %v, but got %v", StockError, err)
	}
}

Now to perform the test:


Atomic MySQL Read-write Operations With Go 1.19 was originally published in Better Programming on Medium, where people are continuing the conversation by highlighting and responding to this story.
