
How to Build a Job Queue With Node.js


If you want to make an app that handles long-running tasks, you need a job queue running in the background. Otherwise, your users will be kept waiting for their requests to finish, and the server hosting your app may hang. That’s not a pleasant user experience for anyone. Node.js has libraries for building a job queue that runs in the background without too much hassle.


Preparation

In this piece, we’ll build a YouTube video downloader that lets users enter a YouTube URL. Our app will download the video to a local folder on the server, and once it’s done, the file will be downloaded automatically from the UI. The download progress is displayed while the video is being downloaded, and the user can’t start another download until the first one is finished. Here’s how it works: when a user enters a valid YouTube video URL, an entry for the job is recorded in the database. Then a background job is created to download the video. The job’s progress is reported back via Socket.io so it can be displayed to the user. Once the job is done, it’s marked as done in its database entry; if it fails, it’s removed from the queue. The URL for the finished file is sent back to the user, and the file is then downloaded automatically.

We’ll build a back end app with Express and a front end app with Angular. To scaffold the back end, we use Express Generator. With recent versions of Node.js, we can run npx express-generator after we make a folder for our back end app. This generates the project files. Next, we need to install the default packages, which we do by running npm i in our back end project folder’s root.
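Assuming we call the back end folder backend (the name is up to you), the setup looks like this:

$ mkdir backend && cd backend
$ npx express-generator
$ npm i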

We’ll need to install some libraries in order to use the latest JavaScript features, build our queue, store our environment variables, and manipulate our database. We install these libraries by running npm i sequelize @babel/register babel-polyfill body-parser bull cors dotenv pg pg-hstore uuid ytdl-core. We’ll use PostgreSQL as our database, which is why we need the pg and pg-hstore packages. We need the uuid package to generate UUIDs. ytdl-core is the YouTube download library. babel-polyfill and @babel/register allow us to use the latest JavaScript features. We also need the Sequelize CLI to create our models and run database migrations to change our database’s structure. To install it, we run npm i -g sequelize-cli.

Now, we need to create our database. First, we create an empty database with pgAdmin by connecting to our server, right-clicking the Databases item, and clicking New Database. pgAdmin 3.x is used here because it’s much faster than 4.x and has more features.

Finally, we need to initialize our Sequelize code. We run npx sequelize-cli init in our back end app’s project folder to do this.


The Code

Now we can write some code.

Building the back end

In bin/www, we put:

#!/usr/bin/env node

/**
 * Module dependencies.
 */

const app = require('../app');
const debug = require('debug')('backend:server');
const http = require('http');
/**
 * Get port from environment and store in Express.
 */

const port = normalizePort(process.env.PORT || '3000');
app.set('port', port);

/**
 * Create HTTP server.
 */

const server = http.createServer(app);
const io = require('socket.io')(server, { origins: '*:*' });
global.io = io;

/**
 * Listen on provided port, on all network interfaces.
 */

server.listen(port);
server.on('error', onError);
server.on('listening', onListening);
io.on('connection', (socket) => {
  socket.emit('connected', { message: 'connected' });
});

/**
 * Normalize a port into a number, string, or false.
 */

function normalizePort(val) {
  const port = parseInt(val, 10);

  if (isNaN(port)) {
    // named pipe
    return val;
  }

  if (port >= 0) {
    // port number
    return port;
  }

  return false;
}

/**
 * Event listener for HTTP server "error" event.
 */

function onError(error) {
  if (error.syscall !== 'listen') {
    throw error;
  }

  const bind = typeof port === 'string'
    ? 'Pipe ' + port
    : 'Port ' + port;

  // handle specific listen errors with friendly messages
  switch (error.code) {
    case 'EACCES':
      console.error(bind + ' requires elevated privileges');
      process.exit(1);
      break;
    case 'EADDRINUSE':
      console.error(bind + ' is already in use');
      process.exit(1);
      break;
    default:
      throw error;
  }
}

/**
 * Event listener for HTTP server "listening" event.
 */

function onListening() {
  const addr = server.address();
  const bind = typeof addr === 'string'
    ? 'pipe ' + addr
    : 'port ' + addr.port;
  debug('Listening on ' + bind);
}

This is the entry point of our app. We initialize Socket.io here so we can communicate with the client side, and we set the Socket.io object on the global object so it can be used in other files.

Next, in the config folder, we rename config.json, which was generated by npx sequelize-cli init, to config.js and add the following:

require('dotenv').config();
const dbHost = process.env.DB_HOST;
const dbName = process.env.DB_NAME;
const dbUsername = process.env.DB_USERNAME;
const dbPassword = process.env.DB_PASSWORD;
const dbPort = process.env.DB_PORT || 5432;

module.exports = {
    development: {
        username: dbUsername,
        password: dbPassword,
        database: dbName,
        host: dbHost,
        port: dbPort,
        dialect: 'postgres'
    },
    test: {
        username: dbUsername,
        password: dbPassword,
        database: 'youtube_app_test',
        host: dbHost,
        port: dbPort,
        dialect: 'postgres'
    },
    production: {
        use_env_variable: 'DATABASE_URL',
        username: dbUsername,
        password: dbPassword,
        database: dbName,
        host: dbHost,
        port: dbPort,
        dialect: 'postgres'
    }
};

This lets us use environment variables instead of hard-coding our database credentials. Then we make a files folder in the project root and put an empty .gitkeep file in it so the folder can be committed to Git.
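On the command line, that’s:

$ mkdir files && touch files/.gitkeep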

Then, we make a database migration with Sequelize to build our database. We run:

npx sequelize-cli model:generate --name Job --attributes status:enum,url:string,fileLocation:string

to create a migration file and its corresponding model file. In the model file, which should be called job.js in the models folder, we put:

'use strict';
module.exports = (sequelize, DataTypes) => {
  const Job = sequelize.define('Job', {
    status: DataTypes.ENUM('started', 'cancelled', 'done'),
    url: DataTypes.STRING,
    fileLocation: DataTypes.STRING
  }, {});
  Job.associate = function(models) {
    // associations can be defined here
  };
  return Job;
};

and in index.js in the models folder, we put:

'use strict';

const fs = require('fs');
const path = require('path');
const Sequelize = require('sequelize');
const basename = path.basename(__filename);
const env = process.env.NODE_ENV || 'development';
const config = require(__dirname + '/../config/config.js')[env];
const db = {};

let sequelize;
if (config.use_env_variable) {
  sequelize = new Sequelize(process.env[config.use_env_variable], config);
} else {
  sequelize = new Sequelize(config.database, config.username, config.password, config);
}

fs
  .readdirSync(__dirname)
  .filter(file => {
    return (file.indexOf('.') !== 0) && (file !== basename) && (file.slice(-3) === '.js');
  })
  .forEach(file => {
    const model = sequelize['import'](path.join(__dirname, file));
    db[model.name] = model;
  });

Object.keys(db).forEach(modelName => {
  if (db[modelName].associate) {
    db[modelName].associate(db);
  }
});

db.sequelize = sequelize;
db.Sequelize = Sequelize;

module.exports = db;

The most important part is changing config.json to config.js in const config = require(__dirname + '/../config/config.js')[env];.
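The model:generate command from earlier also created a timestamped migration file in the migrations folder. Since we used the enum type, the allowed values have to be filled in there as well before running the migration. A minimal sketch of that migration, assuming the standard Sequelize 5 boilerplate (your file will have a timestamped name), looks like this:

'use strict';
module.exports = {
  up: (queryInterface, Sequelize) => {
    // create the Jobs table with the same columns as the model
    return queryInterface.createTable('Jobs', {
      id: {
        allowNull: false,
        autoIncrement: true,
        primaryKey: true,
        type: Sequelize.INTEGER
      },
      status: {
        // the enum values must match the ones in models/job.js
        type: Sequelize.ENUM('started', 'cancelled', 'done')
      },
      url: {
        type: Sequelize.STRING
      },
      fileLocation: {
        type: Sequelize.STRING
      },
      createdAt: {
        allowNull: false,
        type: Sequelize.DATE
      },
      updatedAt: {
        allowNull: false,
        type: Sequelize.DATE
      }
    });
  },
  down: (queryInterface, Sequelize) => {
    return queryInterface.dropTable('Jobs');
  }
};

We then apply it to the database by running npx sequelize-cli db:migrate.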

Next, we build our queue with the bull package. We create a folder called queue in the project root folder and add video.js. In that file, we put:

const Queue = require('bull');
const fs = require('fs');
const models = require('../models');
const ytdl = require('ytdl-core');
const uuidv1 = require('uuid/v1');
const util = require('util');

const createVideoQueue = () => {
    const videoQueue = new Queue('video transcoding', {
        redis: {
            port: process.env.REDIS_PORT,
            host: process.env.REDIS_URL
        }
    });

    videoQueue.process(async (job, done) => {
        const data = job.data;
        try {
            job.progress(0);
            global.io.emit('progress', { progress: 0, jobId: data.id });
            const uuid = uuidv1();
            const fileLocation = `./files/${uuid}.mp4`;
            await new Promise((resolve) => {
                ytdl(data.url)
                    .on('progress', (length, downloaded, totallength) => {
                        const progress = (downloaded / totallength) * 100;
                        global.io.emit('progress', { progress, jobId: data.id });
                        if (progress >= 100) {
                            global.io.emit('videoDone', { fileLocation: `${uuid}.mp4`, jobId: data.id });
                            global.io.emit('progress', { progress: 100, jobId: data.id });
                        }
                    })
                    .pipe(fs.createWriteStream(fileLocation))
                    .on('finish', () => {
                        resolve();
                    })
            })
            await models.Job.update({
                status: 'done',
                fileLocation: `${uuid}.mp4`
            }, {
                    where: {
                        id: data.id
                    }
                })
            done();
        }
        catch (ex) {
            console.log(ex);
            // pass the error to done() so Bull marks the job as failed
            done(ex);
        }
    });
    return videoQueue;
}

module.exports = { createVideoQueue };

Note that the queue uses the global Socket.io object we set in bin/www to send progress back to the client, and that we wrap the download in a promise so the steps run sequentially. We use ytdl-core to download YouTube videos. It has a progress event which reports the progress of the download, and we forward that to the client with io.emit. This sends messages to all connected clients, so we filter out the irrelevant messages on the client side. Any job that throws an error is marked as failed.

Next, we create our routes. In the routes folder, we add a new file called jobs.js and put:

const express = require('express');
const models = require('../models');
const path = require('path');
const router = express.Router();
const ytdl = require('ytdl-core');
const { createVideoQueue } = require('../queue/video');

router.post('/new', async (req, res) => {
  const url = req.body.url;
  try {
    const isValidUrl = ytdl.validateURL(url);
    if (!isValidUrl) {
      res.status(400);
      return res.send({ error: 'invalid URL' });
    }
    const job = await models.Job.create({
      url,
      status: 'started'
    })
    await createVideoQueue().add({ url, id: job.id });
    return res.send(job);
  }
  catch (ex) {
    console.log(ex);
    res.status(400);
    return res.send({ error: ex });
  }
});

router.get('/file/:fileName', (req, res) => {
  const fileName = req.params.fileName;
  const file = path.resolve(__dirname, `../files/${fileName}`);
  res.download(file);
})

module.exports = router;

We need a route to add new jobs and to download the generated files. We validate the URL submitted before creating the job to minimize errors. In this line:

await createVideoQueue().add({ url, id: job.id });

the queue we create uses the global io object set up in bin/www, so the job’s progress can be reported to connected clients. Note that we don’t wait for the job to be done before returning a response. This is why we need Socket.io: to communicate the results back to the client once the job finishes.

In app.js, we add the initialization code. We add the following code to the file:

require("@babel/register");
require("babel-polyfill");
require('dotenv').config();
const createError = require('http-errors');
const express = require('express');
const path = require('path');
const cookieParser = require('cookie-parser');
const logger = require('morgan');
const bodyParser = require('body-parser')
const cors = require('cors')
const indexRouter = require('./routes/index');
const usersRouter = require('./routes/users');
const jobsRouter = require('./routes/jobs');
const app = express();

// view engine setup
app.set('views', path.join(__dirname, 'views'));
app.set('view engine', 'jade');

app.use(logger('dev'));
app.use(express.json());
app.use(express.urlencoded({ extended: false }));
app.use(cookieParser());
app.use(express.static(path.join(__dirname, 'public')));
app.use(express.static(path.join(__dirname, 'files')));
app.use(bodyParser.urlencoded({ extended: true }));
app.use(bodyParser.json())
app.use(cors())
app.use('/', indexRouter);
app.use('/users', usersRouter);
app.use('/jobs', jobsRouter);

// catch 404 and forward to error handler
app.use((req, res, next) => {
  next(createError(404));
});

// error handler
app.use((err, req, res, next) => {
  // set locals, only providing error in development
  res.locals.message = err.message;
  res.locals.error = req.app.get('env') === 'development' ? err : {};

  // render the error page
  res.status(err.status || 500);
  res.render('error');
});

module.exports = app;

We add app.use(express.static(path.join(__dirname, 'files'))); to expose the files folder that we created to the public, and we add:

const jobsRouter = require('./routes/jobs');

and

app.use('/jobs', jobsRouter);

so that clients can access the route we created.

Finally, we create an .env file and put the following:

REDIS_URL='localhost'
REDIS_PORT='6379'
DB_HOST='localhost'
DB_NAME='youtube_app_development'
DB_USERNAME='postgres'
DB_PASSWORD='postgres'
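With these values in place, the development database can also be created from the command line instead of pgAdmin by running npx sequelize-cli db:create.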

The bull package requires Redis, so we have to install it. To do so, we run the following in Ubuntu or related Linux distributions:

$ sudo apt-get update
$ sudo apt-get upgrade
$ sudo apt-get install redis-server
$ sudo systemctl enable redis-server.service
$ sudo service redis-server restart

The first two commands are run to update the package repository references and to update our Linux packages. We run sudo apt-get install redis-server to install Redis, and we run the fourth line to enable Redis on startup. If Redis is not started or needs restarting, we run sudo service redis-server restart.
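To check that Redis is up, we can run redis-cli ping, which should reply with PONG.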

Note—there is no recent Windows version of Redis, so Linux is required. Now we have everything needed to run the back end.

Building the UI

The back end is done and we can move on to building the UI. We build it with Angular and Angular Material. To get started, we install the Angular CLI by running npm i -g @angular/cli . Then we run ng new frontend in our top-level project folder to create the app. Be sure to choose to include routing and use SCSS for styling when prompted. After that, we run npm i @angular/cdk @angular/material file-saver socket.io-client. The first two packages are Angular Material packages. file-saver helps us download files, and socket.io-client allows us to connect to the back end to get download progress and file location.
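To recap, the front end setup commands are:

$ npm i -g @angular/cli
$ ng new frontend
$ cd frontend
$ npm i @angular/cdk @angular/material file-saver socket.io-client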

In environment.ts, we put:

export const environment = {
  production: false,
  apiUrl: 'http://localhost:3000',
  socketIoUrl: 'http://localhost:3000'
};
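For a production build, environment.prod.ts should point at wherever the back end is deployed. For example (the URL below is just a placeholder):

export const environment = {
  production: true,
  apiUrl: 'https://your-backend.example.com',
  socketIoUrl: 'https://your-backend.example.com'
};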

Then we create our components and services.

We run ng g component homePage and ng g service video to create our code files.

In video.service.ts, we put:

import { Injectable } from '@angular/core';
import { HttpClient, HttpHeaders } from '@angular/common/http';
import { environment } from 'src/environments/environment';

@Injectable({
  providedIn: 'root'
})
export class VideoService {

  constructor(
    private http: HttpClient
  ) { }

  addVideoToQueue(data) {
    return this.http.post(`${environment.apiUrl}/jobs/new`, data);
  }

  getVideo(videoUrl: string) {
    return this.http.get<Blob>(videoUrl, {
      headers: new HttpHeaders({
        'accept': 'application/octet-stream',
        'content-type': 'application/json'
      }),
      responseType: 'blob' as 'json'
    })
  }
}

to let our app make requests to add YouTube videos to the download queue, and we call getVideo to download the finished file. Note that we set the accept header to ‘application/octet-stream’ so that we can download video files.

Next in home-page.component.ts, we put:

import { Component, OnInit } from '@angular/core';
import { VideoService } from '../video.service';
import { NgForm } from '@angular/forms';
import io from 'socket.io-client';
import { environment } from 'src/environments/environment';
import { saveAs } from 'file-saver';

@Component({
  selector: 'app-home-page',
  templateUrl: './home-page.component.html',
  styleUrls: ['./home-page.component.scss']
})
export class HomePageComponent implements OnInit {
  videoData: any = <any>{};
  progress: number = 0;
  fileLocation: string;
  downloaded: boolean = false;
  jobId: number;
  connected: boolean = false;
  socket;
  getVideoSub;

  constructor(
    private videoService: VideoService
  ) { }

  ngOnInit() {
    this.addConnectionHandlers();
  }

  addConnectionHandlers() {
    const manager = io.Manager(environment.socketIoUrl);
    manager.on('connect_error', () => {
      this.socket = io.connect(environment.socketIoUrl);
    });

    this.socket = io.connect(environment.socketIoUrl);
    this.socket.on('connect', (data) => {
      this.socket.on('connected', (msg) => { });

      this.socket.on('progress', (msg) => {
        if (this.jobId != msg.jobId) {
          return;
        }
        this.progress = msg.progress;
        if (msg.progress == 100) {
          this.progress = 0;
        }
      });

      this.socket.on('videoDone', (msg) => {
        if (this.jobId != msg.jobId || this.downloaded) {
          return;
        }
        this.getVideoSub = this.videoService.getVideo(`${environment.apiUrl}/jobs/file/${msg.fileLocation}`)
          .subscribe(res => {
            if (!this.downloaded) {
              saveAs(res, `${msg.fileLocation}.mp4`);
              this.progress = 0;
              this.downloaded = true;
              this.getVideoSub.unsubscribe();
            }
          })
      });
    });
  }

  addVideoToQueue(videoForm: NgForm) {
    this.downloaded = false;
    if (videoForm.invalid) {
      return;
    }
    this.videoService.addVideoToQueue(this.videoData)
      .subscribe(res => {
        this.jobId = (res as any).id;
      }, err => {
        alert('Invalid URL');
      })
  }
}

This provides the logic for the UI to let the user enter a YouTube URL, watch the video’s download progress, and download the file when it’s done. Since the back end emits events to every connected client, we have to filter them on the front end. The back end returns the id of the download job, so we can filter by jobId. We also retry the connection in the connect_error handler in case the back end app goes down. The this.downloaded flag checks whether the same file has already been downloaded so it won’t be downloaded again; otherwise, it might try to download too many times, causing freezes and crashes.

In home-page.component.html, we put:

<div class="center">
    <h1>Download Video From YouTube</h1>
</div>
<div id='content'>
    <form #videoForm='ngForm' (ngSubmit)='addVideoToQueue(videoForm)'>
        <mat-form-field>
            <input matInput placeholder="YouTube URL" required #url='ngModel' name='url' [(ngModel)]='videoData.url'
                [disabled]='progress != 0'>
            <mat-error *ngIf="url.invalid && (url.dirty || url.touched)">
                <div *ngIf="url.errors.required">
                    URL is required.
                </div>
            </mat-error>
        </mat-form-field>
        <br>
        <button mat-raised-button type='submit'>Convert</button>
    </form>
    <br>
    <mat-card *ngIf='progress > 0'>
        Downloading: {{progress}}%
    </mat-card>
</div>

to let the user enter their YouTube URL and display the progress. Note that we disable the input while a video is downloading, so users can’t keep submitting new requests.

In home-page.component.scss, we put:

#content {
  width: 95vw;
  margin: 0 auto;
}

to constrain the form’s width and center it on the page.

In app-routing.module.ts, we put:

import { NgModule } from '@angular/core';
import { Routes, RouterModule } from '@angular/router';
import { HomePageComponent } from './home-page/home-page.component';

const routes: Routes = [
  { path: '', component: HomePageComponent }
];

@NgModule({
  imports: [RouterModule.forRoot(routes)],
  exports: [RouterModule]
})
export class AppRoutingModule { }

so that users can see our page.

In app.component.html, we put:

<router-outlet></router-outlet>

so that our page will be displayed. In app.module.ts, we put:

import { BrowserModule } from '@angular/platform-browser';
import { NgModule } from '@angular/core';
import { BrowserAnimationsModule } from '@angular/platform-browser/animations';
import {
  MatButtonModule,
  MatCheckboxModule,
  MatInputModule,
  MatMenuModule,
  MatSidenavModule,
  MatToolbarModule,
  MatTableModule,
  MatDialogModule,
  MAT_DIALOG_DEFAULT_OPTIONS,
  MatDatepickerModule,
  MatSelectModule,
  MatCardModule,
  MatFormFieldModule
} from '@angular/material';
import { AppRoutingModule } from './app-routing.module';
import { AppComponent } from './app.component';
import { HomePageComponent } from './home-page/home-page.component';
import { FormsModule } from '@angular/forms';
import { HttpClientModule } from '@angular/common/http';

@NgModule({
  declarations: [
    AppComponent,
    HomePageComponent
  ],
  imports: [
    BrowserModule,
    AppRoutingModule,
    MatButtonModule,
    BrowserAnimationsModule,
    MatCheckboxModule,
    MatFormFieldModule,
    MatInputModule,
    MatMenuModule,
    MatSidenavModule,
    MatToolbarModule,
    MatTableModule,
    FormsModule,
    HttpClientModule,
    MatDialogModule,
    MatDatepickerModule,
    MatSelectModule,
    MatCardModule
  ],
  providers: [],
  bootstrap: [AppComponent]
})
export class AppModule { }

so that we can use Angular Material widgets in our app.

In styles.scss, we put:

/* You can add global styles to this file, and also import other style files */
@import "~@angular/material/prebuilt-themes/indigo-pink.css";
body {
  font-family: "Roboto", sans-serif;
  margin: 0;
}

form {
  mat-form-field {
    width: 95vw;
    margin: 0 auto;
  }
}

.center {
  text-align: center;
}

to include the Material Design theme, size our form fields, and add a class for centering text.

In index.html, we put:

<!doctype html>
<html lang="en">

<head>
  <meta charset="utf-8">
  <title>YouTube Download App</title>
  <base href="/">
  <link href="https://fonts.googleapis.com/css?family=Roboto&display=swap" rel="stylesheet">
  <link href="https://fonts.googleapis.com/icon?family=Material+Icons" rel="stylesheet">
  <meta name="viewport" content="width=device-width, initial-scale=1">
  <link rel="icon" type="image/x-icon" href="favicon.ico">
</head>

<body>
  <app-root></app-root>
</body>

</html>

to include Material Icons and Roboto font.
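To try everything out, we start the back end with npm start in the back end project folder (it listens on port 3000, matching the URLs in environment.ts) and the front end with ng serve in the frontend folder, then open http://localhost:4200 in the browser.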

By John Au-Yeung

Web developer specializing in React, Vue, and front end development.
